diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index e8d27466e1..49d4a084e7 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -19,6 +19,7 @@ use opentelemetry_sdk::{ }; use opentelemetry_semantic_conventions::trace; use opentelemetry_stdout::{LogExporter, SpanExporter}; +use std::time::Duration; use std::{convert::Infallible, net::SocketAddr, sync::OnceLock}; use tokio::net::TcpListener; use tracing::info; @@ -131,7 +132,7 @@ impl SpanProcessor for EnrichWithBaggageSpanProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 71547d6d72..55d4853042 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -25,6 +25,7 @@ also modified to suppress telemetry before invoking exporters. - Fixed the overflow attribute to correctly use the boolean value `true` instead of the string `"true"`. [#2878](https://github.com/open-telemetry/opentelemetry-rust/issues/2878) +- The `shutdown_with_timeout` method is added to SpanProcessor, SpanExporter trait and TracerProvider. - The `shutdown_with_timeout` method is added to LogExporter trait. - The `shutdown_with_timeout` method is added to LogProvider and LogProcessor trait. - *Breaking* `MetricError`, `MetricResult` no longer public (except when diff --git a/opentelemetry-sdk/src/trace/export.rs b/opentelemetry-sdk/src/trace/export.rs index 15db1bdaf4..950dfe08d2 100644 --- a/opentelemetry-sdk/src/trace/export.rs +++ b/opentelemetry-sdk/src/trace/export.rs @@ -5,7 +5,7 @@ use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status}; use opentelemetry::{InstrumentationScope, KeyValue}; use std::borrow::Cow; use std::fmt::Debug; -use std::time::SystemTime; +use std::time::{Duration, SystemTime}; /// `SpanExporter` defines the interface that protocol-specific exporters must /// implement so that they can be plugged into OpenTelemetry SDK and support @@ -43,9 +43,13 @@ pub trait SpanExporter: Send + Sync + Debug { /// flush the data and the destination is unavailable). SDK authors /// can decide if they want to make the shutdown timeout /// configurable. - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult { Ok(()) } + /// Shuts down the exporter with default timeout. + fn shutdown(&mut self) -> OTelSdkResult { + self.shutdown_with_timeout(Duration::from_nanos(5)) + } /// This is a hint to ensure that the export of any Spans the exporter /// has received prior to the call to this function SHOULD be completed diff --git a/opentelemetry-sdk/src/trace/in_memory_exporter.rs b/opentelemetry-sdk/src/trace/in_memory_exporter.rs index 099f666a41..34ce2dd817 100644 --- a/opentelemetry-sdk/src/trace/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/trace/in_memory_exporter.rs @@ -3,6 +3,7 @@ use crate::resource::Resource; use crate::trace::{SpanData, SpanExporter}; use crate::InMemoryExporterError; use std::sync::{Arc, Mutex}; +use std::time::Duration; /// An in-memory span exporter that stores span data in memory. /// @@ -140,7 +141,7 @@ impl SpanExporter for InMemorySpanExporter { result } - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult { self.reset(); Ok(()) } diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index 1b9a027326..42b299a606 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -58,8 +58,8 @@ mod runtime_tests; #[cfg(all(test, feature = "testing"))] mod tests { - use super::*; + use crate::error::OTelSdkResult; use crate::{ trace::span_limit::{DEFAULT_MAX_EVENT_PER_SPAN, DEFAULT_MAX_LINKS_PER_SPAN}, trace::{InMemorySpanExporter, InMemorySpanExporterBuilder}, @@ -76,6 +76,7 @@ mod tests { }, Context, KeyValue, }; + use std::time::Duration; #[test] fn span_modification_via_context() { @@ -146,7 +147,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> crate::error::OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 2773b8778b..23473925d6 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -1,3 +1,4 @@ +use super::IdGenerator; use crate::error::{OTelSdkError, OTelSdkResult}; /// # Trace Provider SDK /// @@ -74,8 +75,7 @@ use opentelemetry::{otel_info, InstrumentationScope}; use std::borrow::Cow; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, OnceLock}; - -use super::IdGenerator; +use std::time::Duration; static PROVIDER_RESOURCE: OnceLock = OnceLock::new(); @@ -112,10 +112,10 @@ pub(crate) struct TracerProviderInner { impl TracerProviderInner { /// Crate-private shutdown method to be called both from explicit shutdown /// and from Drop when the last reference is released. - pub(crate) fn shutdown(&self) -> Vec { + pub(crate) fn shutdown_with_timeout(&self, timeout: Duration) -> Vec { let mut results = vec![]; for processor in &self.processors { - let result = processor.shutdown(); + let result = processor.shutdown_with_timeout(timeout); if let Err(err) = &result { // Log at debug level because: // - The error is also returned to the user for handling (if applicable) @@ -128,6 +128,10 @@ impl TracerProviderInner { } results } + /// shutdown with default timeout + pub(crate) fn shutdown(&self) -> Vec { + self.shutdown_with_timeout(Duration::from_secs(5)) + } } impl Drop for TracerProviderInner { @@ -239,7 +243,7 @@ impl SdkTracerProvider { /// Shuts down the current `TracerProvider`. /// /// Note that shut down doesn't means the TracerProvider has dropped - pub fn shutdown(&self) -> OTelSdkResult { + pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { if self .inner .is_shutdown @@ -247,7 +251,7 @@ impl SdkTracerProvider { .is_ok() { // propagate the shutdown signal to processors - let results = self.inner.shutdown(); + let results = self.inner.shutdown_with_timeout(timeout); if results.iter().all(|res| res.is_ok()) { Ok(()) @@ -264,6 +268,11 @@ impl SdkTracerProvider { Err(OTelSdkError::AlreadyShutdown) } } + + /// shutdown with default timeout + pub fn shutdown(&self) -> OTelSdkResult { + self.shutdown_with_timeout(Duration::from_secs(5)) + } } impl opentelemetry::trace::TracerProvider for SdkTracerProvider { @@ -471,6 +480,7 @@ mod tests { use std::env; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; + use std::time::Duration; // fields below is wrapped with Arc so we can assert it #[derive(Default, Debug)] @@ -528,7 +538,7 @@ mod tests { } } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) { Ok(()) } else { @@ -787,7 +797,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { self.shutdown_count.fetch_add(1, Ordering::SeqCst); Ok(()) } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 595099ef7f..c72e3f0ed0 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -90,7 +90,11 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// opportunity for processors to do any cleanup required. /// /// Implementation should make sure shutdown can be called multiple times. - fn shutdown(&self) -> OTelSdkResult; + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult; + /// shutdown the processor with a default timeout. + fn shutdown(&self) -> OTelSdkResult { + self.shutdown_with_timeout(Duration::from_secs(5)) + } /// Set the resource for the span processor. fn set_resource(&mut self, _resource: &Resource) {} } @@ -154,9 +158,9 @@ impl SpanProcessor for SimpleSpanProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { if let Ok(mut exporter) = self.exporter.lock() { - exporter.shutdown() + exporter.shutdown_with_timeout(timeout) } else { Err(OTelSdkError::InternalFailure( "SimpleSpanProcessor mutex poison at shutdown".into(), @@ -285,7 +289,6 @@ pub struct BatchSpanProcessor { message_sender: SyncSender, // Control channel to store control messages. handle: Mutex>>, forceflush_timeout: Duration, - shutdown_timeout: Duration, is_shutdown: AtomicBool, dropped_span_count: Arc, export_span_message_sent: Arc, @@ -424,7 +427,6 @@ impl BatchSpanProcessor { message_sender, handle: Mutex::new(Some(handle)), forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable - shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable is_shutdown: AtomicBool::new(false), dropped_span_count: Arc::new(AtomicUsize::new(0)), max_queue_size, @@ -580,7 +582,7 @@ impl SpanProcessor for BatchSpanProcessor { } /// Shuts down the processor. - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { if self.is_shutdown.swap(true, Ordering::Relaxed) { return Err(OTelSdkError::AlreadyShutdown); } @@ -601,8 +603,8 @@ impl SpanProcessor for BatchSpanProcessor { .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; let result = receiver - .recv_timeout(self.shutdown_timeout) - .map_err(|_| OTelSdkError::Timeout(self.shutdown_timeout))?; + .recv_timeout(timeout) + .map_err(|_| OTelSdkError::Timeout(timeout))?; if let Some(handle) = self.handle.lock().unwrap().take() { if let Err(err) = handle.join() { return Err(OTelSdkError::InternalFailure(format!( diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index cb4d2fc14b..b59c77845e 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -18,6 +18,7 @@ use opentelemetry::{otel_debug, otel_error, otel_warn}; use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::time::Duration; /// A [`SpanProcessor`] that asynchronously buffers finished spans and reports /// them at a preconfigured interval. @@ -133,7 +134,7 @@ impl SpanProcessor for BatchSpanProcessor { })? } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed); let max_queue_size = self.max_queue_size; if dropped_spans > 0 { diff --git a/stress/src/traces.rs b/stress/src/traces.rs index 4cd713e4b2..e0f15099e5 100644 --- a/stress/src/traces.rs +++ b/stress/src/traces.rs @@ -8,7 +8,6 @@ Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs, ~10.6 M /sec */ - use lazy_static::lazy_static; use opentelemetry::{ trace::{Span, SpanBuilder, Tracer, TracerProvider}, @@ -18,6 +17,7 @@ use opentelemetry_sdk::{ error::OTelSdkResult, trace::{self as sdktrace, SpanData, SpanProcessor}, }; +use std::time::Duration; mod throughput; @@ -45,7 +45,7 @@ impl SpanProcessor for NoOpSpanProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } }