Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a method (and benchmark) to Context to speed up active span determination by 7x #1140

Merged
merged 5 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions opentelemetry-api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::sync::Arc;

thread_local! {
static CURRENT_CONTEXT: RefCell<Context> = RefCell::new(Context::default());
static DEFAULT_CONTEXT: Context = Context::default();
}

/// An execution-scoped collection of values.
Expand Down Expand Up @@ -107,7 +106,19 @@ impl Context {
/// do_work()
/// ```
pub fn current() -> Self {
get_current(|cx| cx.clone())
Context::map_current(|cx| cx.clone())
}

/// Applys a function to the current context returning its value.
///
/// This can be used to build higher performing algebraic expressions for
/// optionally creating a new context without the overhead of cloning the
/// current one and dropping it.
///
/// Note: This function will panic if you attempt to attach another context
/// while the current one is still borrowed.
pub fn map_current<T>(f: impl FnOnce(&Context) -> T) -> T {
CURRENT_CONTEXT.with(|cx| f(&cx.borrow()))
}

/// Returns a clone of the current thread's context with the given value.
Expand Down Expand Up @@ -318,16 +329,6 @@ impl Drop for ContextGuard {
}
}

/// Executes a closure with a reference to this thread's current context.
///
/// Note: This function will panic if you attempt to attach another context
/// while the context is still borrowed.
fn get_current<F: FnMut(&Context) -> T, T>(mut f: F) -> T {
CURRENT_CONTEXT
.try_with(|cx| f(&cx.borrow()))
.unwrap_or_else(|_| DEFAULT_CONTEXT.with(|cx| f(cx)))
}

/// With TypeIds as keys, there's no need to hash them. They are already hashes
/// themselves, coming from the compiler. The IdHasher holds the u64 of
/// the TypeId, and then returns it, instead of doing any bit fiddling.
Expand Down Expand Up @@ -373,11 +374,23 @@ mod tests {
let current = Context::current();
assert_eq!(current.get(), Some(&ValueA("a")));
assert_eq!(current.get(), Some(&ValueB(42)));

assert!(Context::map_current(|cx| {
assert_eq!(cx.get(), Some(&ValueA("a")));
assert_eq!(cx.get(), Some(&ValueB(42)));
true
}));
}

// Resets to only value `a` when inner guard is dropped
let current = Context::current();
assert_eq!(current.get(), Some(&ValueA("a")));
assert_eq!(current.get::<ValueB>(), None);

assert!(Context::map_current(|cx| {
assert_eq!(cx.get(), Some(&ValueA("a")));
assert_eq!(cx.get::<ValueB>(), None);
true
}));
}
}
4 changes: 2 additions & 2 deletions opentelemetry-api/src/propagation/text_map_propagator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
/// [`Context`]: crate::Context
/// [`Injector`]: crate::propagation::Injector
fn inject(&self, injector: &mut dyn Injector) {
self.inject_context(&Context::current(), injector)
Context::map_current(|cx| self.inject_context(cx, injector))

Check warning on line 21 in opentelemetry-api/src/propagation/text_map_propagator.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-api/src/propagation/text_map_propagator.rs#L21

Added line #L21 was not covered by tests
}

/// Properly encodes the values of the [`Context`] and injects them into the
Expand All @@ -35,7 +35,7 @@
/// [`Context`]: crate::Context
/// [`Injector`]: crate::propagation::Extractor
fn extract(&self, extractor: &dyn Extractor) -> Context {
self.extract_with_context(&Context::current(), extractor)
Context::map_current(|cx| self.extract_with_context(cx, extractor))
}

/// Retrieves encoded data using the provided [`Extractor`]. If no data for this
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-api/src/trace/tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ pub trait Tracer {
where
T: Into<Cow<'static, str>>,
{
self.build_with_context(SpanBuilder::from_name(name), &Context::current())
Context::map_current(|cx| self.start_with_context(name, cx))
}

/// Starts a new [`Span`] with a given context.
Expand Down Expand Up @@ -169,7 +169,7 @@ pub trait Tracer {

/// Start a [`Span`] from a [`SpanBuilder`].
fn build(&self, builder: SpanBuilder) -> Self::Span {
self.build_with_context(builder, &Context::current())
Context::map_current(|cx| self.build_with_context(builder, cx))
}

/// Start a span from a [`SpanBuilder`] with a parent context.
Expand Down Expand Up @@ -382,7 +382,7 @@ impl SpanBuilder {

/// Builds a span with the given tracer from this configuration.
pub fn start<T: Tracer>(self, tracer: &T) -> T::Span {
tracer.build_with_context(self, &Context::current())
Context::map_current(|cx| tracer.build_with_context(self, cx))
}

/// Builds a span with the given tracer from this configuration and parent.
Expand Down
8 changes: 6 additions & 2 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ rustdoc-args = ["--cfg", "docsrs"]

[dev-dependencies]
indexmap = "1.8"
criterion = { version = "0.4.0", features = ["html_reports"] }
pprof = { version = "0.11.1", features = ["flamegraph", "criterion"] }
criterion = { version = "0.5", features = ["html_reports"] }
pprof = { version = "0.12", features = ["flamegraph", "criterion"] }

[features]
default = ["trace"]
Expand All @@ -51,6 +51,10 @@ rt-tokio = ["tokio", "tokio-stream"]
rt-tokio-current-thread = ["tokio", "tokio-stream"]
rt-async-std = ["async-std"]

[[bench]]
name = "context"
harness = false

[[bench]]
name = "key_value_map"
harness = false
Expand Down
94 changes: 94 additions & 0 deletions opentelemetry-sdk/benches/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::fmt::Display;

use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use futures_util::future::BoxFuture;
use opentelemetry_api::{
trace::{TraceContextExt, Tracer, TracerProvider},
Context,
};
use opentelemetry_sdk::{
export::trace::{ExportResult, SpanData, SpanExporter},
trace as sdktrace,
};
use pprof::criterion::{Output, PProfProfiler};

fn criterion_benchmark(c: &mut Criterion) {
benchmark_group(c, BenchmarkParameter::NoActiveSpan);
benchmark_group(c, BenchmarkParameter::WithActiveSpan);
}

fn benchmark_group(c: &mut Criterion, p: BenchmarkParameter) {
let _guard = match p {
BenchmarkParameter::NoActiveSpan => None,
BenchmarkParameter::WithActiveSpan => {
let (provider, tracer) = tracer();
let guard = Context::current_with_span(tracer.start("span")).attach();
Some((guard, provider))
}
};

let mut group = c.benchmark_group("context");

group.bench_function(BenchmarkId::new("baseline current()", p), |b| {
b.iter(|| {
black_box(Context::current());
})
});

group.bench_function(BenchmarkId::new("current().has_active_span()", p), |b| {
b.iter(|| {
black_box(Context::current().has_active_span());
})
});

group.bench_function(
BenchmarkId::new("map_current(|cx| cx.has_active_span())", p),
|b| {
b.iter(|| {
black_box(Context::map_current(|cx| cx.has_active_span()));
})
},
);

group.finish();
}

#[derive(Copy, Clone)]
enum BenchmarkParameter {
NoActiveSpan,
WithActiveSpan,
}

impl Display for BenchmarkParameter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match *self {
BenchmarkParameter::NoActiveSpan => write!(f, "no-active-span"),
BenchmarkParameter::WithActiveSpan => write!(f, "with-active-span"),
}
}
}

fn tracer() -> (sdktrace::TracerProvider, sdktrace::Tracer) {
let provider = sdktrace::TracerProvider::builder()
.with_config(sdktrace::config().with_sampler(sdktrace::Sampler::AlwaysOn))
.with_simple_exporter(NoopExporter)
.build();
let tracer = provider.tracer(module_path!());
(provider, tracer)
}

#[derive(Debug)]
struct NoopExporter;

impl SpanExporter for NoopExporter {
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
Box::pin(futures_util::future::ready(Ok(())))
}
}

criterion_group! {
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = criterion_benchmark
}
criterion_main!(benches);
19 changes: 11 additions & 8 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
};
use opentelemetry_api::{
global::{self},
logs::{LogRecord, LogResult},
logs::{LogRecord, LogResult, TraceContext},
trace::TraceContextExt,
Context, InstrumentationLibrary,
};
Expand Down Expand Up @@ -206,16 +206,19 @@
Some(provider) => provider,
None => return,
};

let trace_context = if self.include_trace_context {
Context::map_current(|cx| {
cx.has_active_span()
.then(|| TraceContext::from(cx.span().span_context()))
})

Check warning on line 213 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L209-L213

Added lines #L209 - L213 were not covered by tests
} else {
None

Check warning on line 215 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L215

Added line #L215 was not covered by tests
};
let config = provider.config();
for processor in provider.log_processors() {
let mut record = record.clone();
if self.include_trace_context {
let ctx = Context::current();
if ctx.has_active_span() {
let span = ctx.span();
record.trace_context = Some(span.span_context().into());
}
if let Some(ref trace_context) = trace_context {
record.trace_context = Some(trace_context.clone())

Check warning on line 221 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L220-L221

Added lines #L220 - L221 were not covered by tests
}
let data = LogData {
record,
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/propagation/trace_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ mod tests {
let mut injector: HashMap<String, String> = HashMap::new();
injector.set(TRACESTATE_HEADER, state.to_string());

propagator.inject_context(&Context::current(), &mut injector);
Context::map_current(|cx| propagator.inject_context(cx, &mut injector));

assert_eq!(Extractor::get(&injector, TRACESTATE_HEADER), Some(state))
}
Expand Down
7 changes: 4 additions & 3 deletions opentelemetry-sdk/src/trace/tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,15 +368,16 @@ mod tests {
let span = tracer.span_builder("must_not_be_sampled").start(&tracer);
assert!(!span.span_context().is_sampled());

let _attached = Context::current()
.with_remote_span_context(SpanContext::new(
let context = Context::map_current(|cx| {
cx.with_remote_span_context(SpanContext::new(
TraceId::from_u128(1),
SpanId::from_u64(1),
TraceFlags::default(),
true,
Default::default(),
))
.attach();
});
let _attached = context.attach();
let span = tracer.span_builder("must_not_be_sampled").start(&tracer);

assert!(!span.span_context().is_sampled());
Expand Down
Loading