diff --git a/opentelemetry-jaeger/src/exporter/agent.rs b/opentelemetry-jaeger/src/exporter/agent.rs index 87cebfc318..d0842f31fb 100644 --- a/opentelemetry-jaeger/src/exporter/agent.rs +++ b/opentelemetry-jaeger/src/exporter/agent.rs @@ -40,6 +40,7 @@ pub(crate) struct AgentSyncClientUdp { conn: UdpSocket, buffer_client: BufferClient, max_packet_size: usize, + auto_split: bool, } impl AgentSyncClientUdp { @@ -47,6 +48,7 @@ impl AgentSyncClientUdp { pub(crate) fn new( host_port: T, max_packet_size: Option, + auto_split: bool, ) -> thrift::Result { let max_packet_size = max_packet_size.unwrap_or(UDP_PACKET_MAX_LENGTH); let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?; @@ -62,28 +64,28 @@ impl AgentSyncClientUdp { conn, buffer_client: BufferClient { buffer, client }, max_packet_size, + auto_split, }) } /// Emit standard Jaeger batch pub(crate) fn emit_batch(&mut self, batch: jaeger::Batch) -> thrift::Result<()> { - // Write payload to buffer - self.buffer_client.client.emit_batch(batch)?; - let payload = self.buffer_client.buffer.take_bytes(); - - if payload.len() > self.max_packet_size { - return Err(thrift::ProtocolError::new( - thrift::ProtocolErrorKind::SizeLimit, - format!( - "jaeger exporter payload size of {} bytes over max UDP packet size of {} bytes. Try setting a smaller batch size.", - payload.len(), - self.max_packet_size, - ), - ) - .into()); + if !self.auto_split { + let payload = serialize_batch(&mut self.buffer_client, batch, self.max_packet_size)?; + self.conn.send(&payload)?; + return Ok(()); } - self.conn.send(&payload)?; + let mut buffers = vec![]; + serialize_batch_vectored( + &mut self.buffer_client, + batch, + self.max_packet_size, + &mut buffers, + )?; + for payload in buffers { + self.conn.send(&payload)?; + } Ok(()) } @@ -97,6 +99,7 @@ pub(crate) struct AgentAsyncClientUdp { conn: ::Socket, buffer_client: BufferClient, max_packet_size: usize, + auto_split: bool, } impl AgentAsyncClientUdp { @@ -105,6 +108,7 @@ impl AgentAsyncClientUdp { host_port: T, max_packet_size: Option, runtime: R, + auto_split: bool, ) -> thrift::Result { let max_packet_size = max_packet_size.unwrap_or(UDP_PACKET_MAX_LENGTH); let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?; @@ -120,30 +124,88 @@ impl AgentAsyncClientUdp { conn, buffer_client: BufferClient { buffer, client }, max_packet_size, + auto_split, }) } /// Emit standard Jaeger batch pub(crate) async fn emit_batch(&mut self, batch: jaeger::Batch) -> thrift::Result<()> { - // Write payload to buffer - self.buffer_client.client.emit_batch(batch)?; - let payload = self.buffer_client.buffer.take_bytes(); + if !self.auto_split { + let payload = serialize_batch(&mut self.buffer_client, batch, self.max_packet_size)?; + self.runtime.write_to_socket(&self.conn, payload).await?; + return Ok(()); + } + + let mut buffers = vec![]; + serialize_batch_vectored( + &mut self.buffer_client, + batch, + self.max_packet_size, + &mut buffers, + )?; + for payload in buffers { + self.runtime.write_to_socket(&self.conn, payload).await?; + } + + Ok(()) + } +} - if payload.len() > self.max_packet_size { - return Err(thrift::ProtocolError::new( +fn serialize_batch( + client: &mut BufferClient, + batch: jaeger::Batch, + max_packet_size: usize, +) -> thrift::Result> { + client.client.emit_batch(batch)?; + let payload = client.buffer.take_bytes(); + + if payload.len() > max_packet_size { + return Err(thrift::ProtocolError::new( thrift::ProtocolErrorKind::SizeLimit, format!( - "jaeger exporter payload size of {} bytes over max UDP packet size of {} bytes. Try setting a smaller batch size.", + "jaeger exporter payload size of {} bytes over max UDP packet size of {} bytes. Try setting a smaller batch size or turn auto split on.", payload.len(), - self.max_packet_size, + max_packet_size, ), ) - .into()); - } + .into()); + } - // Write async to socket, reading from buffer - self.runtime.write_to_socket(&self.conn, payload).await?; + Ok(payload) +} - Ok(()) +fn serialize_batch_vectored( + client: &mut BufferClient, + mut batch: jaeger::Batch, + max_packet_size: usize, + output: &mut Vec>, +) -> thrift::Result<()> { + client.client.emit_batch(batch.clone())?; + let payload = client.buffer.take_bytes(); + + if payload.len() <= max_packet_size { + output.push(payload); + return Ok(()); + } + + if batch.spans.len() <= 1 { + return Err(thrift::ProtocolError::new( + thrift::ProtocolErrorKind::SizeLimit, + format!( + "single span's jaeger exporter payload size of {} bytes over max UDP packet size of {} bytes", + payload.len(), + max_packet_size, + ), + ) + .into()); } + + let mid = batch.spans.len() / 2; + let new_spans = batch.spans.drain(mid..).collect::>(); + let new_batch = jaeger::Batch::new(batch.process.clone(), new_spans); + + serialize_batch_vectored(client, batch, max_packet_size, output)?; + serialize_batch_vectored(client, new_batch, max_packet_size, output)?; + + Ok(()) } diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs index 62b3e52c5e..687402b33e 100644 --- a/opentelemetry-jaeger/src/exporter/mod.rs +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -131,6 +131,7 @@ pub struct PipelineBuilder { service_name: Option, tags: Option>, max_packet_size: Option, + auto_split: bool, config: Option, } @@ -151,6 +152,7 @@ impl Default for PipelineBuilder { service_name: None, tags: None, max_packet_size: None, + auto_split: false, config: None, }; @@ -245,6 +247,20 @@ impl PipelineBuilder { self } + /// Config whether to auto split batches. + /// + /// When auto split is set to true, the exporter will try to split the + /// batch into smaller ones so that there will be minimal data loss. It + /// will impact the performance. + /// + /// Note that if one span is too large to export, other spans within the + /// same batch may or may not be exported. In this case, exporter will + /// return errors as we cannot split spans. + pub fn with_auto_split_batch(mut self, auto_split: bool) -> Self { + self.auto_split = auto_split; + self + } + /// Assign the SDK config for the exporter pipeline. pub fn with_trace_config(self, config: sdk::trace::Config) -> Self { PipelineBuilder { @@ -401,9 +417,12 @@ impl PipelineBuilder { } fn init_sync_uploader(self) -> Result, TraceError> { - let agent = - agent::AgentSyncClientUdp::new(self.agent_endpoint.as_slice(), self.max_packet_size) - .map_err::(Into::into)?; + let agent = agent::AgentSyncClientUdp::new( + self.agent_endpoint.as_slice(), + self.max_packet_size, + self.auto_split, + ) + .map_err::(Into::into)?; Ok(Box::new(SyncUploader::Agent(agent))) } @@ -416,6 +435,7 @@ impl PipelineBuilder { self.agent_endpoint.as_slice(), self.max_packet_size, runtime, + self.auto_split, ) .map_err::(Into::into)?; Ok(Box::new(AsyncUploader::Agent(agent))) @@ -512,8 +532,9 @@ impl PipelineBuilder { Ok(Box::new(uploader)) } else { let endpoint = self.agent_endpoint.as_slice(); - let agent = AgentAsyncClientUdp::new(endpoint, self.max_packet_size, runtime) - .map_err::(Into::into)?; + let agent = + AgentAsyncClientUdp::new(endpoint, self.max_packet_size, runtime, self.auto_split) + .map_err::(Into::into)?; Ok(Box::new(AsyncUploader::Agent(agent))) } } @@ -537,7 +558,7 @@ impl PipelineBuilder { Ok(Box::new(AsyncUploader::Collector(collector))) } else { let endpoint = self.agent_endpoint.as_slice(); - let agent = AgentAsyncClientUdp::new(endpoint, self.max_packet_size) + let agent = AgentAsyncClientUdp::new(endpoint, self.max_packet_size, self.auto_split) .map_err::(Into::into)?; Ok(Box::new(AsyncUploader::Agent(agent))) } diff --git a/opentelemetry-otlp/src/proto/grpcio/common.rs b/opentelemetry-otlp/src/proto/grpcio/common.rs index c52abc36a9..04d3add108 100644 --- a/opentelemetry-otlp/src/proto/grpcio/common.rs +++ b/opentelemetry-otlp/src/proto/grpcio/common.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.25.0. Do not edit +// This file is generated by rust-protobuf 2.25.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))] diff --git a/opentelemetry-otlp/src/proto/grpcio/metrics.rs b/opentelemetry-otlp/src/proto/grpcio/metrics.rs index 59542607fa..6434e21147 100644 --- a/opentelemetry-otlp/src/proto/grpcio/metrics.rs +++ b/opentelemetry-otlp/src/proto/grpcio/metrics.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.25.0. Do not edit +// This file is generated by rust-protobuf 2.25.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))] diff --git a/opentelemetry-otlp/src/proto/grpcio/metrics_service.rs b/opentelemetry-otlp/src/proto/grpcio/metrics_service.rs index 83316de80c..751fc2a269 100644 --- a/opentelemetry-otlp/src/proto/grpcio/metrics_service.rs +++ b/opentelemetry-otlp/src/proto/grpcio/metrics_service.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.25.0. Do not edit +// This file is generated by rust-protobuf 2.25.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))] diff --git a/opentelemetry-otlp/src/proto/grpcio/resource.rs b/opentelemetry-otlp/src/proto/grpcio/resource.rs index 6b00f05689..a751f478e1 100644 --- a/opentelemetry-otlp/src/proto/grpcio/resource.rs +++ b/opentelemetry-otlp/src/proto/grpcio/resource.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.25.0. Do not edit +// This file is generated by rust-protobuf 2.25.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))] diff --git a/opentelemetry-otlp/src/proto/grpcio/trace.rs b/opentelemetry-otlp/src/proto/grpcio/trace.rs index 0cdc8b11b5..2debf8cefb 100644 --- a/opentelemetry-otlp/src/proto/grpcio/trace.rs +++ b/opentelemetry-otlp/src/proto/grpcio/trace.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.25.0. Do not edit +// This file is generated by rust-protobuf 2.25.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))] diff --git a/opentelemetry-otlp/src/proto/grpcio/trace_config.rs b/opentelemetry-otlp/src/proto/grpcio/trace_config.rs index 29bab70ac3..2d4030aab6 100644 --- a/opentelemetry-otlp/src/proto/grpcio/trace_config.rs +++ b/opentelemetry-otlp/src/proto/grpcio/trace_config.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.25.0. Do not edit +// This file is generated by rust-protobuf 2.25.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))] diff --git a/opentelemetry-otlp/src/proto/grpcio/trace_service.rs b/opentelemetry-otlp/src/proto/grpcio/trace_service.rs index 510a7e7e9d..7b957b9488 100644 --- a/opentelemetry-otlp/src/proto/grpcio/trace_service.rs +++ b/opentelemetry-otlp/src/proto/grpcio/trace_service.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.25.0. Do not edit +// This file is generated by rust-protobuf 2.25.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))] diff --git a/opentelemetry-zpages/src/proto/common.rs b/opentelemetry-zpages/src/proto/common.rs index 41c52a7534..7c80a2e0f6 100644 --- a/opentelemetry-zpages/src/proto/common.rs +++ b/opentelemetry-zpages/src/proto/common.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.25.0. Do not edit +// This file is generated by rust-protobuf 2.25.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))] diff --git a/opentelemetry-zpages/src/proto/resource.rs b/opentelemetry-zpages/src/proto/resource.rs index 6b00f05689..a751f478e1 100644 --- a/opentelemetry-zpages/src/proto/resource.rs +++ b/opentelemetry-zpages/src/proto/resource.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.25.0. Do not edit +// This file is generated by rust-protobuf 2.25.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))] diff --git a/opentelemetry-zpages/src/proto/trace.rs b/opentelemetry-zpages/src/proto/trace.rs index 0cdc8b11b5..2debf8cefb 100644 --- a/opentelemetry-zpages/src/proto/trace.rs +++ b/opentelemetry-zpages/src/proto/trace.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.25.0. Do not edit +// This file is generated by rust-protobuf 2.25.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))] diff --git a/opentelemetry-zpages/src/proto/tracez.rs b/opentelemetry-zpages/src/proto/tracez.rs index 6c901a2f0d..79200af631 100644 --- a/opentelemetry-zpages/src/proto/tracez.rs +++ b/opentelemetry-zpages/src/proto/tracez.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.25.0. Do not edit +// This file is generated by rust-protobuf 2.25.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -21,7 +21,7 @@ /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1; #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))]