feat: try split batch if payload size larger than max_package_size (#619)

* feat: auto split batch

* recover unexpected delete

* reduce overhead if auto split is off

* improve documentation.

* docs: auto_split

* docs: fix format.

* regenerate protobuf types.

Co-authored-by: Zhongyang Wu <zhongyang.wu@outlook.com>
DCjanus and TommyCpp committed Aug 23, 2021
1 parent df01fb3 commit a0899d9
Showing 13 changed files with 138 additions and 55 deletions.
116 changes: 89 additions & 27 deletions opentelemetry-jaeger/src/exporter/agent.rs
@@ -40,13 +40,15 @@ pub(crate) struct AgentSyncClientUdp {
conn: UdpSocket,
buffer_client: BufferClient,
max_packet_size: usize,
+auto_split: bool,
}

impl AgentSyncClientUdp {
/// Create a new UDP agent client
pub(crate) fn new<T: ToSocketAddrs>(
host_port: T,
max_packet_size: Option<usize>,
+auto_split: bool,
) -> thrift::Result<Self> {
let max_packet_size = max_packet_size.unwrap_or(UDP_PACKET_MAX_LENGTH);
let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?;
@@ -62,28 +64,28 @@ impl AgentSyncClientUdp {
conn,
buffer_client: BufferClient { buffer, client },
max_packet_size,
+auto_split,
})
}

/// Emit standard Jaeger batch
pub(crate) fn emit_batch(&mut self, batch: jaeger::Batch) -> thrift::Result<()> {
-// Write payload to buffer
-self.buffer_client.client.emit_batch(batch)?;
-let payload = self.buffer_client.buffer.take_bytes();
-
-if payload.len() > self.max_packet_size {
-return Err(thrift::ProtocolError::new(
-thrift::ProtocolErrorKind::SizeLimit,
-format!(
-"jaeger exporter payload size of {} bytes over max UDP packet size of {} bytes. Try setting a smaller batch size.",
-payload.len(),
-self.max_packet_size,
-),
-)
-.into());
-}
-
-self.conn.send(&payload)?;
+if !self.auto_split {
+let payload = serialize_batch(&mut self.buffer_client, batch, self.max_packet_size)?;
+self.conn.send(&payload)?;
+return Ok(());
+}
+
+let mut buffers = vec![];
+serialize_batch_vectored(
+&mut self.buffer_client,
+batch,
+self.max_packet_size,
+&mut buffers,
+)?;
+for payload in buffers {
+self.conn.send(&payload)?;
+}

Ok(())
}
@@ -97,6 +99,7 @@ pub(crate) struct AgentAsyncClientUdp<R: JaegerTraceRuntime> {
conn: <R as JaegerTraceRuntime>::Socket,
buffer_client: BufferClient,
max_packet_size: usize,
+auto_split: bool,
}

impl<R: JaegerTraceRuntime> AgentAsyncClientUdp<R> {
@@ -105,6 +108,7 @@ impl<R: JaegerTraceRuntime> AgentAsyncClientUdp<R> {
host_port: T,
max_packet_size: Option<usize>,
runtime: R,
+auto_split: bool,
) -> thrift::Result<Self> {
let max_packet_size = max_packet_size.unwrap_or(UDP_PACKET_MAX_LENGTH);
let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?;
@@ -120,30 +124,88 @@ impl<R: JaegerTraceRuntime> AgentAsyncClientUdp<R> {
conn,
buffer_client: BufferClient { buffer, client },
max_packet_size,
+auto_split,
})
}

/// Emit standard Jaeger batch
pub(crate) async fn emit_batch(&mut self, batch: jaeger::Batch) -> thrift::Result<()> {
-// Write payload to buffer
-self.buffer_client.client.emit_batch(batch)?;
-let payload = self.buffer_client.buffer.take_bytes();
-
-if payload.len() > self.max_packet_size {
-return Err(thrift::ProtocolError::new(
-thrift::ProtocolErrorKind::SizeLimit,
-format!(
-"jaeger exporter payload size of {} bytes over max UDP packet size of {} bytes. Try setting a smaller batch size.",
-payload.len(),
-self.max_packet_size,
-),
-)
-.into());
-}
-
-// Write async to socket, reading from buffer
-self.runtime.write_to_socket(&self.conn, payload).await?;
-
-Ok(())
+if !self.auto_split {
+let payload = serialize_batch(&mut self.buffer_client, batch, self.max_packet_size)?;
+self.runtime.write_to_socket(&self.conn, payload).await?;
+return Ok(());
+}
+
+let mut buffers = vec![];
+serialize_batch_vectored(
+&mut self.buffer_client,
+batch,
+self.max_packet_size,
+&mut buffers,
+)?;
+for payload in buffers {
+self.runtime.write_to_socket(&self.conn, payload).await?;
+}
+
+Ok(())
}
}

+fn serialize_batch(
+client: &mut BufferClient,
+batch: jaeger::Batch,
+max_packet_size: usize,
+) -> thrift::Result<Vec<u8>> {
+client.client.emit_batch(batch)?;
+let payload = client.buffer.take_bytes();
+
+if payload.len() > max_packet_size {
+return Err(thrift::ProtocolError::new(
+thrift::ProtocolErrorKind::SizeLimit,
+format!(
+"jaeger exporter payload size of {} bytes over max UDP packet size of {} bytes. Try setting a smaller batch size or turn auto split on.",
+payload.len(),
+max_packet_size,
+),
+)
+.into());
+}
+
+Ok(payload)
+}
+fn serialize_batch_vectored(
+client: &mut BufferClient,
+mut batch: jaeger::Batch,
+max_packet_size: usize,
+output: &mut Vec<Vec<u8>>,
+) -> thrift::Result<()> {
+client.client.emit_batch(batch.clone())?;
+let payload = client.buffer.take_bytes();
+
+if payload.len() <= max_packet_size {
+output.push(payload);
+return Ok(());
+}
+
+if batch.spans.len() <= 1 {
+return Err(thrift::ProtocolError::new(
+thrift::ProtocolErrorKind::SizeLimit,
+format!(
+"single span's jaeger exporter payload size of {} bytes over max UDP packet size of {} bytes",
+payload.len(),
+max_packet_size,
+),
+)
+.into());
+}
+
+let mid = batch.spans.len() / 2;
+let new_spans = batch.spans.drain(mid..).collect::<Vec<_>>();
+let new_batch = jaeger::Batch::new(batch.process.clone(), new_spans);
+
+serialize_batch_vectored(client, batch, max_packet_size, output)?;
+serialize_batch_vectored(client, new_batch, max_packet_size, output)?;
+
+Ok(())
+}
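The recursion in serialize_batch_vectored is the heart of auto split: serialize the whole batch, and if the payload exceeds the UDP budget, halve the span list and retry on each half, until every packet fits or a lone span is still too large to send. Below is a minimal, self-contained sketch of that strategy with spans modelled only by their serialized byte sizes; the names split_spans and SplitError are illustrative and not part of the exporter's API.

// Sketch of the recursive halving used by serialize_batch_vectored, with spans
// represented by their serialized byte lengths. Illustrative only, not exporter API.
#[derive(Debug)]
struct SplitError {
    payload_size: usize,
}

fn split_spans(
    spans: &[usize],
    max_packet_size: usize,
    output: &mut Vec<Vec<usize>>,
) -> Result<(), SplitError> {
    let payload_size: usize = spans.iter().sum();

    // The whole batch fits into a single packet: emit it as-is.
    if payload_size <= max_packet_size {
        output.push(spans.to_vec());
        return Ok(());
    }

    // A single span that is still too large cannot be split any further.
    if spans.len() <= 1 {
        return Err(SplitError { payload_size });
    }

    // Otherwise halve the batch and recurse on both halves, preserving span order.
    let mid = spans.len() / 2;
    split_spans(&spans[..mid], max_packet_size, output)?;
    split_spans(&spans[mid..], max_packet_size, output)?;
    Ok(())
}

fn main() {
    let spans = [300, 200, 400, 100, 250]; // pretend serialized span sizes in bytes
    let mut packets = Vec::new();
    split_spans(&spans, 600, &mut packets).expect("batch should be splittable");
    // Splitting yields [[300, 200], [400], [100, 250]]; every packet fits the budget.
    assert!(packets.iter().all(|p| p.iter().sum::<usize>() <= 600));
}

Unlike this sketch, the real implementation re-serializes each half with thrift on every recursion step, which is where the performance cost mentioned in the builder documentation comes from.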
33 changes: 27 additions & 6 deletions opentelemetry-jaeger/src/exporter/mod.rs
@@ -131,6 +131,7 @@ pub struct PipelineBuilder {
service_name: Option<String>,
tags: Option<Vec<KeyValue>>,
max_packet_size: Option<usize>,
+auto_split: bool,
config: Option<sdk::trace::Config>,
}

@@ -151,6 +152,7 @@ impl Default for PipelineBuilder {
service_name: None,
tags: None,
max_packet_size: None,
+auto_split: false,
config: None,
};

@@ -245,6 +247,20 @@ impl PipelineBuilder {
self
}

+/// Configure whether to automatically split batches.
+///
+/// When auto split is enabled, the exporter will try to split an oversized
+/// batch into smaller ones so that data loss is minimized. This comes at a
+/// performance cost, since the batch may be serialized more than once.
+///
+/// Note that if a single span is too large to export, other spans within the
+/// same batch may or may not be exported. In that case the exporter returns
+/// an error, since an individual span cannot be split.
+pub fn with_auto_split_batch(mut self, auto_split: bool) -> Self {
+self.auto_split = auto_split;
+self
+}

/// Assign the SDK config for the exporter pipeline.
pub fn with_trace_config(self, config: sdk::trace::Config) -> Self {
PipelineBuilder {
@@ -401,9 +417,12 @@ impl PipelineBuilder {
}

fn init_sync_uploader(self) -> Result<Box<dyn Uploader>, TraceError> {
-let agent =
-agent::AgentSyncClientUdp::new(self.agent_endpoint.as_slice(), self.max_packet_size)
-.map_err::<Error, _>(Into::into)?;
+let agent = agent::AgentSyncClientUdp::new(
+self.agent_endpoint.as_slice(),
+self.max_packet_size,
+self.auto_split,
+)
+.map_err::<Error, _>(Into::into)?;
Ok(Box::new(SyncUploader::Agent(agent)))
}

@@ -416,6 +435,7 @@ impl PipelineBuilder {
self.agent_endpoint.as_slice(),
self.max_packet_size,
runtime,
+self.auto_split,
)
.map_err::<Error, _>(Into::into)?;
Ok(Box::new(AsyncUploader::Agent(agent)))
@@ -512,8 +532,9 @@ impl PipelineBuilder {
Ok(Box::new(uploader))
} else {
let endpoint = self.agent_endpoint.as_slice();
-let agent = AgentAsyncClientUdp::new(endpoint, self.max_packet_size, runtime)
-.map_err::<Error, _>(Into::into)?;
+let agent =
+AgentAsyncClientUdp::new(endpoint, self.max_packet_size, runtime, self.auto_split)
+.map_err::<Error, _>(Into::into)?;
Ok(Box::new(AsyncUploader::Agent(agent)))
}
}
@@ -537,7 +558,7 @@ impl PipelineBuilder {
Ok(Box::new(AsyncUploader::Collector(collector)))
} else {
let endpoint = self.agent_endpoint.as_slice();
-let agent = AgentAsyncClientUdp::new(endpoint, self.max_packet_size)
+let agent = AgentAsyncClientUdp::new(endpoint, self.max_packet_size, self.auto_split)
.map_err::<Error, _>(Into::into)?;
Ok(Box::new(AsyncUploader::Agent(agent)))
}
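From application code, the new behaviour is a single switch on the pipeline builder. The sketch below shows how it might be wired up; apart from with_auto_split_batch, the builder calls (new_pipeline, with_service_name, with_max_packet_size, install_batch) are assumed from the crate's pipeline API around the time of this commit and are not part of this change.

// Sketch only: every call other than with_auto_split_batch is assumed from the
// opentelemetry-jaeger pipeline API of this era and may differ in other versions.
use opentelemetry::trace::TraceError;

fn init_tracer() -> Result<opentelemetry::sdk::trace::Tracer, TraceError> {
    opentelemetry_jaeger::new_pipeline()
        .with_service_name("my-service")
        .with_max_packet_size(9_216)    // per-packet UDP payload budget in bytes
        .with_auto_split_batch(true)    // split oversized batches instead of failing them
        .install_batch(opentelemetry::runtime::Tokio)
}

With auto split off (the default), an oversized batch is rejected with the SizeLimit error shown in serialize_batch; with it on, the exporter pays the extra serialization cost and delivers the batch as multiple UDP packets.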
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/proto/grpcio/common.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.25.0. Do not edit
// This file is generated by rust-protobuf 2.25.1. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))]
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/proto/grpcio/metrics.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.25.0. Do not edit
// This file is generated by rust-protobuf 2.25.1. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))]
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/proto/grpcio/metrics_service.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.25.0. Do not edit
// This file is generated by rust-protobuf 2.25.1. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))]
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/proto/grpcio/resource.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.25.0. Do not edit
// This file is generated by rust-protobuf 2.25.1. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))]
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/proto/grpcio/trace.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.25.0. Do not edit
// This file is generated by rust-protobuf 2.25.1. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))]
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/proto/grpcio/trace_config.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.25.0. Do not edit
// This file is generated by rust-protobuf 2.25.1. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))]
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/proto/grpcio/trace_service.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.25.0. Do not edit
// This file is generated by rust-protobuf 2.25.1. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))]
4 changes: 2 additions & 2 deletions opentelemetry-zpages/src/proto/common.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.25.0. Do not edit
// This file is generated by rust-protobuf 2.25.1. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))]
4 changes: 2 additions & 2 deletions opentelemetry-zpages/src/proto/resource.rs
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.25.0. Do not edit
// This file is generated by rust-protobuf 2.25.1. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
@@ -21,7 +21,7 @@

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_0;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_25_1;

#[derive(PartialEq,Clone,Default)]
#[cfg_attr(feature = "with-serde", derive(::serde::Serialize, ::serde::Deserialize))]
