diff --git a/opentelemetry-jaeger/src/exporter/config/agent.rs b/opentelemetry-jaeger/src/exporter/config/agent.rs
index 2b20f403df..466bc03654 100644
--- a/opentelemetry-jaeger/src/exporter/config/agent.rs
+++ b/opentelemetry-jaeger/src/exporter/config/agent.rs
@@ -6,7 +6,7 @@ use crate::exporter::config::{
 use crate::exporter::uploader::{AsyncUploader, SyncUploader, Uploader};
 use crate::{Error, Exporter, JaegerTraceRuntime};
 use opentelemetry::sdk;
-use opentelemetry::sdk::trace::{Config, TracerProvider};
+use opentelemetry::sdk::trace::{BatchConfig, Config, TracerProvider};
 use opentelemetry::trace::TraceError;
 use std::borrow::BorrowMut;
 use std::sync::Arc;
@@ -72,6 +72,7 @@ const DEFAULT_AGENT_ENDPOINT: &str = "127.0.0.1:6831";
 pub struct AgentPipeline {
     transformation_config: TransformationConfig,
     trace_config: Option<Config>,
+    batch_config: Option<BatchConfig>,
     agent_endpoint: Result<Vec<net::SocketAddr>, crate::Error>,
     max_packet_size: usize,
     auto_split_batch: bool,
@@ -82,6 +83,7 @@ impl Default for AgentPipeline {
         let mut pipeline = AgentPipeline {
             transformation_config: Default::default(),
             trace_config: Default::default(),
+            batch_config: Some(Default::default()),
             agent_endpoint: Ok(vec![DEFAULT_AGENT_ENDPOINT.parse().unwrap()]),
             max_packet_size: UDP_PACKET_MAX_LENGTH,
             auto_split_batch: false,
@@ -108,6 +110,10 @@ impl HasRequiredConfig for AgentPipeline {
     fn set_trace_config(&mut self, config: Config) {
         self.trace_config = Some(config)
     }
+
+    fn set_batch_config(&mut self, config: BatchConfig) {
+        self.batch_config = Some(config)
+    }
 }
 
 /// Start a new pipeline to configure a exporter that target a jaeger agent.
@@ -226,6 +232,27 @@ impl AgentPipeline {
         self
     }
 
+    /// Assign the batch span processor configuration for the exporter pipeline.
+    ///
+    /// If a simple span processor is used by [`install_simple`][AgentPipeline::install_simple]
+    /// or [`build_simple`][AgentPipeline::build_simple], then this config will be ignored.
+    ///
+    /// # Examples
+    /// Set max queue size.
+    /// ```rust
+    /// use opentelemetry::sdk::trace::BatchConfig;
+    ///
+    /// let pipeline = opentelemetry_jaeger::new_agent_pipeline()
+    ///     .with_batch_processor_config(
+    ///         BatchConfig::default().with_max_queue_size(200)
+    ///     );
+    ///
+    /// ```
+    pub fn with_batch_processor_config(mut self, config: BatchConfig) -> Self {
+        self.set_batch_config(config);
+        self
+    }
+
     /// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline.
     ///
     /// The exporter will send each span to the agent upon the span ends.
@@ -273,10 +300,14 @@ impl AgentPipeline {
             self.trace_config.take(),
             self.transformation_config.service_name.take(),
         );
+        let batch_config = self.batch_config.take();
         let uploader = self.build_async_agent_uploader(runtime.clone())?;
         let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
+        let batch_processor = sdk::trace::BatchSpanProcessor::builder(exporter, runtime)
+            .with_batch_config(batch_config.unwrap_or_default())
+            .build();
 
-        builder = builder.with_batch_exporter(exporter, runtime);
+        builder = builder.with_span_processor(batch_processor);
         builder = builder.with_config(config);
 
         Ok(builder.build())
diff --git a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs
index 9455763a64..b41e0269f5 100644
--- a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs
+++ b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs
@@ -5,6 +5,7 @@ use crate::exporter::config::{
 use crate::exporter::uploader::{AsyncUploader, Uploader};
 use crate::{Exporter, JaegerTraceRuntime};
 use http::Uri;
+use opentelemetry::sdk::trace::BatchConfig;
 use opentelemetry::{sdk, sdk::trace::Config as TraceConfig, trace::TraceError};
 use std::borrow::BorrowMut;
 use std::convert::TryFrom;
@@ -91,6 +92,7 @@ const ENV_PASSWORD: &str = "OTEL_EXPORTER_JAEGER_PASSWORD";
 pub struct CollectorPipeline {
     transformation_config: TransformationConfig,
     trace_config: Option<TraceConfig>,
+    batch_config: Option<BatchConfig>,
 
     #[cfg(feature = "collector_client")]
     collector_timeout: Duration,
@@ -113,6 +115,7 @@ impl Default for CollectorPipeline {
             client_config: ClientConfig::default(),
             transformation_config: Default::default(),
             trace_config: Default::default(),
+            batch_config: Some(Default::default()),
         };
 
         #[cfg(feature = "collector_client")]
@@ -155,6 +158,10 @@ impl HasRequiredConfig for CollectorPipeline {
     fn set_trace_config(&mut self, config: TraceConfig) {
         self.trace_config = Some(config)
     }
+
+    fn set_batch_config(&mut self, config: BatchConfig) {
+        self.batch_config = Some(config)
+    }
 }
 
 #[derive(Debug)]
@@ -398,6 +405,24 @@ impl CollectorPipeline {
         self
     }
 
+    /// Assign the batch span processor configuration for the exporter pipeline.
+    ///
+    /// # Examples
+    /// Set max queue size.
+    /// ```rust
+    /// use opentelemetry::sdk::trace::BatchConfig;
+    ///
+    /// let pipeline = opentelemetry_jaeger::new_collector_pipeline()
+    ///     .with_batch_processor_config(
+    ///         BatchConfig::default().with_max_queue_size(200)
+    ///     );
+    ///
+    /// ```
+    pub fn with_batch_processor_config(mut self, config: BatchConfig) -> Self {
+        self.set_batch_config(config);
+        self
+    }
+
     /// Build a `TracerProvider` using a async exporter and configurations from the pipeline.
     ///
     /// The exporter will collect spans in a batch and send them to the agent.
@@ -423,10 +448,14 @@ impl CollectorPipeline {
             self.trace_config.take(),
             self.transformation_config.service_name.take(),
         );
+        let batch_config = self.batch_config.take();
         let uploader = self.build_uploader::<R>()?;
         let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
+        let batch_processor = sdk::trace::BatchSpanProcessor::builder(exporter, runtime)
+            .with_batch_config(batch_config.unwrap_or_default())
+            .build();
 
-        builder = builder.with_batch_exporter(exporter, runtime);
+        builder = builder.with_span_processor(batch_processor);
         builder = builder.with_config(config);
 
         Ok(builder.build())
diff --git a/opentelemetry-jaeger/src/exporter/config/mod.rs b/opentelemetry-jaeger/src/exporter/config/mod.rs
index 7b314c2ec8..59071067ad 100644
--- a/opentelemetry-jaeger/src/exporter/config/mod.rs
+++ b/opentelemetry-jaeger/src/exporter/config/mod.rs
@@ -36,13 +36,15 @@ impl Default for TransformationConfig {
     }
 }
 
-// pipeline must have transformation config and trace config.
+// pipeline must have transformation config, trace config and batch config.
 trait HasRequiredConfig {
     fn set_transformation_config<T>(&mut self, f: T)
     where
         T: FnOnce(&mut TransformationConfig);
 
     fn set_trace_config(&mut self, config: sdk::trace::Config);
+
+    fn set_batch_config(&mut self, config: sdk::trace::BatchConfig);
 }
 
 // To reduce the overhead of copying service name in every spans. We convert resource into jaeger tags
diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs
index 8436181d04..d4e01a156b 100644
--- a/opentelemetry-sdk/src/trace/span_processor.rs
+++ b/opentelemetry-sdk/src/trace/span_processor.rs
@@ -554,6 +554,54 @@ impl Default for BatchConfig {
     }
 }
 
+impl BatchConfig {
+    /// Set max_queue_size for [`BatchConfig`].
+    /// It's the maximum queue size to buffer spans for delayed processing.
+    /// If the queue gets full, it drops the spans.
+    /// The default value is 2048.
+    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
+        self.max_queue_size = max_queue_size;
+        self
+    }
+
+    /// Set max_export_batch_size for [`BatchConfig`].
+    /// It's the maximum number of spans to process in a single batch. If there is
+    /// more than one batch's worth of spans, the batches are processed
+    /// one after the other without any delay. The default value
+    /// is 512.
+    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
+        self.max_export_batch_size = max_export_batch_size;
+        self
+    }
+
+    /// Set max_concurrent_exports for [`BatchConfig`].
+    /// It's the maximum number of concurrent exports.
+    /// Limits the number of spawned tasks for exports and thus memory consumed by an exporter.
+    /// The default value is 1.
+    /// If max_concurrent_exports is left at the default value, exports are performed
+    /// synchronously on the BatchSpanProcessor task.
+    pub fn with_max_concurrent_exports(mut self, max_concurrent_exports: usize) -> Self {
+        self.max_concurrent_exports = max_concurrent_exports;
+        self
+    }
+
+    /// Set scheduled_delay for [`BatchConfig`].
+    /// It's the delay interval between two consecutive rounds of batch processing.
+    /// The default value is 5000 milliseconds.
+    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
+        self.scheduled_delay = scheduled_delay;
+        self
+    }
+
+    /// Set max_export_timeout for [`BatchConfig`].
+    /// It's the maximum duration to export a batch of data.
+    /// The default value is 30000 milliseconds.
+    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
+        self.max_export_timeout = max_export_timeout;
+        self
+    }
+}
+
 /// A builder for creating [`BatchSpanProcessor`] instances.
 ///
 #[derive(Debug)]
@@ -616,6 +664,11 @@ where
         BatchSpanProcessorBuilder { config, ..self }
     }
 
+    /// Set the `BatchConfig` for the [`BatchSpanProcessorBuilder`].
+    pub fn with_batch_config(self, config: BatchConfig) -> Self {
+        BatchSpanProcessorBuilder { config, ..self }
+    }
+
     /// Build a batch processor
     pub fn build(self) -> BatchSpanProcessor<R> {
         BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime)
@@ -657,6 +710,21 @@ mod tests {
         assert!(rx_shutdown.try_recv().is_ok());
     }
 
+    #[test]
+    fn test_batch_config_with_fields() {
+        let batch = BatchConfig::default()
+            .with_max_export_batch_size(10)
+            .with_scheduled_delay(Duration::from_millis(10))
+            .with_max_export_timeout(Duration::from_millis(10))
+            .with_max_concurrent_exports(10)
+            .with_max_queue_size(10);
+        assert_eq!(batch.max_export_batch_size, 10);
+        assert_eq!(batch.scheduled_delay, Duration::from_millis(10));
+        assert_eq!(batch.max_export_timeout, Duration::from_millis(10));
+        assert_eq!(batch.max_concurrent_exports, 10);
+        assert_eq!(batch.max_queue_size, 10);
+    }
+
     #[test]
     fn test_build_batch_span_processor_builder() {
         std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "500");
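
Taken together, the new `with_batch_processor_config` hook and the `BatchConfig` builder methods let callers tune the batch span processor without constructing it by hand. The following is a minimal end-to-end sketch (not part of the diff) of how the pieces fit; it assumes the `rt-tokio` features of `opentelemetry` and `opentelemetry-jaeger` are enabled, a Tokio runtime is available, and the service name and batch sizes are purely illustrative.

```rust
use opentelemetry::global;
use opentelemetry::sdk::trace::BatchConfig;
use opentelemetry::trace::Tracer;

#[tokio::main]
async fn main() -> Result<(), opentelemetry::trace::TraceError> {
    // Tune the batch span processor through the new pipeline hook.
    let tracer = opentelemetry_jaeger::new_agent_pipeline()
        .with_service_name("my-service") // illustrative name
        .with_batch_processor_config(
            BatchConfig::default()
                .with_max_queue_size(4096)
                .with_max_export_batch_size(512),
        )
        .install_batch(opentelemetry::runtime::Tokio)?;

    tracer.in_span("doing_work", |_cx| {
        // Spans recorded here are buffered and exported in batches
        // according to the BatchConfig above.
    });

    // Flush any remaining spans before the process exits.
    global::shutdown_tracer_provider();
    Ok(())
}
```

Note that spans beyond `max_queue_size` are dropped, so the queue should be sized for the expected burst rate; with a simple span processor (`install_simple`) the batch configuration is ignored entirely.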