Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add shutdown in TracerProvider #1855

Merged
merged 10 commits into from
Jun 10, 2024
Merged
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider {

fn library_logger(&self, library: Arc<InstrumentationLibrary>) -> Self::Logger {
// If the provider is shutdown, new logger will refer a no-op logger provider.
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
if self.is_shutdown.load(Ordering::Relaxed) {
return Logger::new(library, NOOP_LOGGER_PROVIDER.clone());
}
Logger::new(library, self.clone())
Expand Down
181 changes: 164 additions & 17 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,37 @@
//! not duplicate this data to avoid that different [`Tracer`] instances
//! of the [`TracerProvider`] have different versions of these data.
use crate::runtime::RuntimeChannel;
use crate::trace::{BatchSpanProcessor, SimpleSpanProcessor, Tracer};
use crate::trace::{
BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer,
};
use crate::{export::trace::SpanExporter, trace::SpanProcessor};
use crate::{InstrumentationLibrary, Resource};
use once_cell::sync::OnceCell;
use once_cell::sync::{Lazy, OnceCell};
use opentelemetry::trace::TraceError;
use opentelemetry::{global, trace::TraceResult};
use std::borrow::Cow;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Default tracer name if empty string is provided.
const DEFAULT_COMPONENT_NAME: &str = "rust.opentelemetry.io/sdk/tracer";
static PROVIDER_RESOURCE: OnceCell<Resource> = OnceCell::new();

// a no nop tracer provider used as placeholder when the provider is shutdown
static NOOP_TRACER_PROVIDER: Lazy<TracerProvider> = Lazy::new(|| TracerProvider {
inner: Arc::new(TracerProviderInner {
processors: Vec::new(),
config: Config {
// cannot use default here as the default resource is not empty
sampler: Box::new(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))),
id_generator: Box::<RandomIdGenerator>::default(),
span_limits: SpanLimits::default(),
resource: Cow::Owned(Resource::empty()),
},
}),
is_shutdown: Arc::new(AtomicBool::new(true)),
});

/// TracerProvider inner type
#[derive(Debug)]
pub(crate) struct TracerProviderInner {
Expand All @@ -39,9 +58,14 @@
}

/// Creator and registry of named [`Tracer`] instances.
///
/// `TracerProvider` is lightweight container holding pointers to `SpanProcessor` and other components.
/// Cloning and dropping them will not stop the span processing. To stop span processing, users
/// must either call `shutdown` method explicitly, or drop every clone of `TracerProvider`.
#[derive(Clone, Debug)]
pub struct TracerProvider {
inner: Arc<TracerProviderInner>,
is_shutdown: Arc<AtomicBool>,
Copy link
Contributor

@utpilla utpilla Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any benefit to creating another Arc pointer just for is_shutdown? Could we reuse the existing Arc pointer inner to also hold is_shutdown inside it?

}

impl Default for TracerProvider {
Expand All @@ -52,8 +76,11 @@

impl TracerProvider {
/// Build a new tracer provider
pub(crate) fn new(inner: Arc<TracerProviderInner>) -> Self {
TracerProvider { inner }
pub(crate) fn new(inner: TracerProviderInner) -> Self {
TracerProvider {
inner: Arc::new(inner),
is_shutdown: Arc::new(AtomicBool::new(false)),
}
}

/// Create a new [`TracerProvider`] builder.
Expand All @@ -71,6 +98,12 @@
&self.inner.config
}

/// true if the provider has been shutdown
/// Don't start span or export spans when provider is shutdown
pub(crate) fn is_shutdown(&self) -> bool {
self.is_shutdown.load(Ordering::Relaxed)
}

/// Force flush all remaining spans in span processors and return results.
///
/// # Examples
Expand Down Expand Up @@ -114,11 +147,41 @@
.map(|processor| processor.force_flush())
.collect()
}

/// Shuts down the current `TracerProvider`.
///
/// Note that shut down doesn't means the TracerProvider has dropped
pub fn shutdown(&self) -> TraceResult<()> {
if self
.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
// propagate the shutdown signal to processors
// it's up to the processor to properly block new spans after shutdown
let mut errs = vec![];
for processor in &self.inner.processors {
if let Err(err) = processor.shutdown() {
errs.push(err);
}
}

if errs.is_empty() {
Ok(())
} else {
Err(TraceError::Other(format!("{errs:?}").into()))
}
} else {
Err(TraceError::Other(
"tracer provider already shut down".into(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably use Mutex or RwLock instead of atomic variable to track shutdown. With the current setup, if there are two threads that are racing to shutdown the tracerprovider, the thread which fails the compare and swap and immediately come to the else condition. It wouldn't return an error message saying "tracer provider already shut down" without even waiting or verifying that the other thread has indeed completed the shutdown.

Copy link
Member

@lalitb lalitb Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will rephrasing the error to say "tracer provider is already shutting down or has been shut down" be useful here, instead of using the Mutex/RwLock ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on what kind of experience we are targeting. I feel that shutting down a tracer provider does not have to be a perf optimized operation so it's okay to use locks. Shutdown would anyway not be a frequent scenario. I also like the clear status that locking offers about whether the provider is shutdown or not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mutex/rwlock/atomic - is checked in hot path, so it needs to be performant!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-sdk/src/logs/log_processor.rs#L93-L97 Logs.
Looks like we are inconsistent here. so that need to be taken care as well!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mutex/rwlock/atomic - is checked in hot path, so it needs to be performant!

Got it! In that case, we should make use of Arc<Cell<bool>> to track is_shutdown instead of atomics as that should offer better performance.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why Arc<Cell<bool>> - this can cause race condition and data corruption right, as it's not thread safe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad! I thought we could use Mutex in conjunction with Arc<Cell<T>> to save the atomic read operation on hot-path. Rust wouldn't allow that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cell<T> is not Send and Sync so compiler won't allow this

))

Check warning on line 177 in opentelemetry-sdk/src/trace/provider.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/provider.rs#L175-L177

Added lines #L175 - L177 were not covered by tests
}
}
}

impl opentelemetry::trace::TracerProvider for TracerProvider {
/// This implementation of `TracerProvider` produces `Tracer` instances.
type Tracer = crate::trace::Tracer;
type Tracer = Tracer;

/// Create a new versioned `Tracer` instance.
fn versioned_tracer(
Expand Down Expand Up @@ -152,7 +215,10 @@
}

fn library_tracer(&self, library: Arc<InstrumentationLibrary>) -> Self::Tracer {
Tracer::new(library, Arc::downgrade(&self.inner))
if self.is_shutdown.load(Ordering::Relaxed) {
return Tracer::new(library, NOOP_TRACER_PROVIDER.clone());
}
Tracer::new(library, self.clone())
}
}

Expand Down Expand Up @@ -226,9 +292,7 @@
p.set_resource(config.resource.as_ref());
}

TracerProvider {
inner: Arc::new(TracerProviderInner { processors, config }),
}
TracerProvider::new(TracerProviderInner { processors, config })
}
}

Expand All @@ -241,24 +305,59 @@
use crate::trace::provider::TracerProviderInner;
use crate::trace::{Config, Span, SpanProcessor};
use crate::Resource;
use opentelemetry::trace::{TraceError, TraceResult};
use opentelemetry::trace::{TraceError, TraceResult, Tracer, TracerProvider};
use opentelemetry::{Context, Key, KeyValue, Value};
use std::borrow::Cow;
use std::env;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;

// fields below is wrapped with Arc so we can assert it
#[derive(Default, Debug)]
struct AssertInfo {
started_span: AtomicU32,
is_shutdown: AtomicBool,
}

#[derive(Default, Debug, Clone)]
struct SharedAssertInfo(Arc<AssertInfo>);

impl SharedAssertInfo {
fn started_span_count(&self, count: u32) -> bool {
self.0.started_span.load(Ordering::SeqCst) == count
}
}

#[derive(Debug)]
struct TestSpanProcessor {
success: bool,
assert_info: SharedAssertInfo,
}

impl TestSpanProcessor {
fn new(success: bool) -> TestSpanProcessor {
TestSpanProcessor {
success,
assert_info: SharedAssertInfo::default(),
}
}

// get handle to assert info
fn assert_info(&self) -> SharedAssertInfo {
self.assert_info.clone()
}
}

impl SpanProcessor for TestSpanProcessor {
fn on_start(&self, _span: &mut Span, _cx: &Context) {
unimplemented!()
self.assert_info
.0
.started_span
.fetch_add(1, Ordering::SeqCst);
}

fn on_end(&self, _span: SpanData) {
unimplemented!()
// ignore
}

fn force_flush(&self) -> TraceResult<()> {
Expand All @@ -270,19 +369,29 @@
}

fn shutdown(&self) -> TraceResult<()> {
self.force_flush()
if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) {
Ok(())
} else {
let _ = self.assert_info.0.is_shutdown.compare_exchange(
false,
true,
Ordering::SeqCst,
Ordering::SeqCst,
);
self.force_flush()
}
}
}

#[test]
fn test_force_flush() {
let tracer_provider = super::TracerProvider::new(Arc::from(TracerProviderInner {
let tracer_provider = super::TracerProvider::new(TracerProviderInner {
processors: vec![
Box::from(TestSpanProcessor { success: true }),
Box::from(TestSpanProcessor { success: false }),
Box::from(TestSpanProcessor::new(true)),
Box::from(TestSpanProcessor::new(false)),
],
config: Default::default(),
}));
});

let results = tracer_provider.force_flush();
assert_eq!(results.len(), 2);
Expand Down Expand Up @@ -417,4 +526,42 @@

assert_eq!(no_service_name.config().resource.len(), 0)
}

#[test]
fn test_shutdown_noops() {
let processor = TestSpanProcessor::new(false);
let assert_handle = processor.assert_info();
let tracer_provider = super::TracerProvider::new(TracerProviderInner {
processors: vec![Box::from(processor)],
config: Default::default(),
});

let test_tracer_1 = tracer_provider.tracer("test1");
let _ = test_tracer_1.start("test");

assert!(assert_handle.started_span_count(1));

let _ = test_tracer_1.start("test");

assert!(assert_handle.started_span_count(2));

let shutdown = |tracer_provider: super::TracerProvider| {
let _ = tracer_provider.shutdown(); // shutdown once
};

// assert tracer provider can be shutdown using on a cloned version
shutdown(tracer_provider.clone());

// after shutdown we should get noop tracer
let noop_tracer = tracer_provider.tracer("noop");
// noop tracer cannot start anything
let _ = noop_tracer.start("test");
assert!(assert_handle.started_span_count(2));
// noop tracer's tracer provider should be shutdown
assert!(noop_tracer.provider().is_shutdown.load(Ordering::SeqCst));

// existing tracer becomes noops after shutdown
let _ = test_tracer_1.start("test");
assert!(assert_handle.started_span_count(2));
}
}
10 changes: 5 additions & 5 deletions opentelemetry-sdk/src/trace/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,11 @@ impl Span {
None => return,
};

let provider = self.tracer.provider();
// skip if provider has been shut down
let provider = match self.tracer.provider() {
Some(provider) => provider,
None => return,
};
if provider.is_shutdown() {
return;
}

// ensure end time is set via explicit end or implicitly on drop
if let Some(timestamp) = timestamp {
Expand Down Expand Up @@ -719,7 +719,7 @@ mod tests {
let exported_data = span.exported_data();
assert!(exported_data.is_some());

drop(provider);
provider.shutdown().expect("shutdown panicked");
let dropped_span = tracer.start("span_with_dropped_provider");
// return none if the provider has already been dropped
assert!(dropped_span.exported_data().is_none());
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
fn force_flush(&self) -> TraceResult<()>;
/// Shuts down the processor. Called when SDK is shut down. This is an
/// opportunity for processors to do any cleanup required.
///
/// Implementation should make sure shutdown can be called multiple times.
fn shutdown(&self) -> TraceResult<()>;
/// Set the resource for the log processor.
fn set_resource(&mut self, _resource: &Resource) {}
Expand Down
16 changes: 8 additions & 8 deletions opentelemetry-sdk/src/trace/tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! Docs: <https://github.com/open-telemetry/opentelemetry-specification/blob/v1.3.0/specification/trace/api.md#tracer>
use crate::{
trace::{
provider::{TracerProvider, TracerProviderInner},
provider::TracerProvider,
span::{Span, SpanData},
SpanLimits, SpanLinks,
},
Expand All @@ -20,15 +20,15 @@ use opentelemetry::{
Context, KeyValue,
};
use std::fmt;
use std::sync::{Arc, Weak};
use std::sync::Arc;

use super::SpanEvents;

/// `Tracer` implementation to create and manage spans
#[derive(Clone)]
pub struct Tracer {
instrumentation_lib: Arc<InstrumentationLibrary>,
provider: Weak<TracerProviderInner>,
provider: TracerProvider,
}

impl fmt::Debug for Tracer {
Expand All @@ -46,7 +46,7 @@ impl Tracer {
/// Create a new tracer (used internally by `TracerProvider`s).
pub(crate) fn new(
instrumentation_lib: Arc<InstrumentationLibrary>,
provider: Weak<TracerProviderInner>,
provider: TracerProvider,
) -> Self {
Tracer {
instrumentation_lib,
Expand All @@ -55,8 +55,8 @@ impl Tracer {
}

/// TracerProvider associated with this tracer.
pub(crate) fn provider(&self) -> Option<TracerProvider> {
self.provider.upgrade().map(TracerProvider::new)
pub(crate) fn provider(&self) -> &TracerProvider {
&self.provider
}

/// Instrumentation library information of this tracer.
Expand Down Expand Up @@ -175,7 +175,8 @@ impl opentelemetry::trace::Tracer for Tracer {
/// spans in the trace.
fn build_with_context(&self, mut builder: SpanBuilder, parent_cx: &Context) -> Self::Span {
let provider = self.provider();
if provider.is_none() {
// no point start a span if the tracer provider has already being shutdown
if provider.is_shutdown() {
return Span::new(
SpanContext::empty_context(),
None,
Expand All @@ -184,7 +185,6 @@ impl opentelemetry::trace::Tracer for Tracer {
);
}

let provider = provider.unwrap();
let config = provider.config();
let span_id = builder
.span_id
Expand Down
Loading