Skip to content

Commit

Permalink
Proposal: create metrics from traces and agent based sampling (#17)
Browse files Browse the repository at this point in the history
Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
  • Loading branch information
Hartigan and cijothomas committed Mar 20, 2024
1 parent e23abe4 commit a84e207
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 22 deletions.
6 changes: 6 additions & 0 deletions opentelemetry-datadog/CHANGELOG.md
Expand Up @@ -12,6 +12,12 @@

- Bump opentelemetry version to 0.22, opentelemetry_sdk version to 0.22

### Changed

- Allow sending all traces to `datadog-agent` with the `agent-sampling` feature.
- Allow `datadog-agent` to generate metrics from spans for [APM](https://docs.datadoghq.com/tracing/metrics/).


## v0.9.0

### Changed
Expand Down
6 changes: 6 additions & 0 deletions opentelemetry-datadog/Cargo.toml
Expand Up @@ -19,6 +19,7 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[features]
agent-sampling = []
reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest"]
reqwest-client = ["reqwest", "opentelemetry-http/reqwest"]

Expand Down Expand Up @@ -49,3 +50,8 @@ opentelemetry_sdk = { workspace = true, features = ["trace", "testing"] }
[[example]]
name = "datadog"
path = "examples/datadog.rs"

[[example]]
name = "agent_sampling"
path = "examples/agent_sampling.rs"
required-features = ["agent-sampling"]
1 change: 1 addition & 0 deletions opentelemetry-datadog/README.md
Expand Up @@ -24,6 +24,7 @@ to [`Datadog`].

`opentelemetry-datadog` supports the following features:

- `agent-sampling`: move sampling decisions to `datadog-agent` (see the `agent_sampling.rs` example).
- `reqwest-blocking-client`: use `reqwest` blocking http client to send spans.
- `reqwest-client`: use `reqwest` http client to send spans.
- `surf-client`: use `surf` http client to send spans.
Expand Down
78 changes: 78 additions & 0 deletions opentelemetry-datadog/examples/agent_sampling.rs
@@ -0,0 +1,78 @@
use opentelemetry::{
global::{self, shutdown_tracer_provider},
trace::{SamplingResult, Span, TraceContextExt, Tracer},
Key,
};
use opentelemetry_datadog::{new_pipeline, ApiVersion, DatadogTraceStateBuilder};
use opentelemetry_sdk::trace::{self, RandomIdGenerator, ShouldSample};
use std::thread;
use std::time::Duration;

/// Emits a child span named `bar` that looks like a SQL query.
///
/// The span is tagged with a span type and a query string, kept open for a
/// few milliseconds so it has a visible duration, and then ended explicitly.
fn bar() {
    let tracer = global::tracer("component-bar");
    let mut span = tracer.start("bar");

    // String attributes describing the simulated database call.
    for (name, value) in [
        ("span.type", "sql"),
        ("sql.query", "SELECT * FROM table"),
    ] {
        span.set_attribute(Key::new(name).string(value));
    }

    // Simulate some work so the span has a non-trivial duration.
    thread::sleep(Duration::from_millis(6));
    span.end();
}

/// Sampler that records and exports every span, so that the sampling
/// decision can be made by `datadog-agent` rather than in-process.
#[derive(Debug, Clone)]
struct AgentBasedSampler;

impl ShouldSample for AgentBasedSampler {
    /// Always returns `RecordAndSample` with no extra attributes.
    ///
    /// The trace state is taken from the parent context's span when a parent
    /// context is supplied; otherwise a new Datadog trace state is built with
    /// priority sampling and measuring both enabled. All other arguments are
    /// ignored.
    fn should_sample(
        &self,
        parent_context: Option<&opentelemetry::Context>,
        _trace_id: opentelemetry::trace::TraceId,
        _name: &str,
        _span_kind: &opentelemetry::trace::SpanKind,
        _attributes: &[opentelemetry::KeyValue],
        _links: &[opentelemetry::trace::Link],
    ) -> opentelemetry::trace::SamplingResult {
        let trace_state = match parent_context {
            // Inherit the sample decision from the parent span.
            Some(parent) => parent.span().span_context().trace_state().clone(),
            // Root span (no remote or local parent): always sample it and ask
            // datadog-agent to create APM metrics for it.
            None => DatadogTraceStateBuilder::default()
                .with_priority_sampling(true)
                .with_measuring(true)
                .build(),
        };

        SamplingResult {
            // Send all spans to datadog-agent.
            decision: opentelemetry::trace::SamplingDecision::RecordAndSample,
            attributes: vec![],
            trace_state,
        }
    }
}

/// Installs a Datadog pipeline that uses [`AgentBasedSampler`], emits a
/// `foo` span with a nested `bar` span, and shuts the tracer provider down.
///
/// # Errors
///
/// Returns the error from `install_simple` if the pipeline cannot be installed.
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    // Every span is recorded; the sampler only decides which trace state to attach.
    let trace_config = trace::config()
        .with_sampler(AgentBasedSampler)
        .with_id_generator(RandomIdGenerator::default());

    let dd_tracer = new_pipeline()
        .with_service_name("agent-sampling-demo")
        .with_api_version(ApiVersion::Version05)
        .with_trace_config(trace_config)
        .install_simple()?;

    dd_tracer.in_span("foo", |ctx| {
        let span = ctx.span();

        // String attributes describing the simulated HTTP request.
        for (name, value) in [
            ("span.type", "web"),
            ("http.url", "http://localhost:8080/foo"),
            ("http.method", "GET"),
        ] {
            span.set_attribute(Key::new(name).string(value));
        }
        span.set_attribute(Key::new("http.status_code").i64(200));

        // Simulated work before and after the nested `bar` span.
        thread::sleep(Duration::from_millis(6));
        bar();
        thread::sleep(Duration::from_millis(6));
    });

    shutdown_tracer_provider();

    Ok(())
}
3 changes: 3 additions & 0 deletions opentelemetry-datadog/src/exporter/model/mod.rs
Expand Up @@ -20,6 +20,9 @@ mod v05;
// Key under which the exporter writes a span's sampling priority in the
// span metrics map. Reference for the key name:
// https://github.com/DataDog/dd-trace-js/blob/c89a35f7d27beb4a60165409376e170eacb194c5/packages/dd-trace/src/constants.js#L4
static SAMPLING_PRIORITY_KEY: &str = "_sampling_priority_v1";

// Key under which the exporter writes a span's "measured" flag in the span
// metrics map. Reference for the key name:
// https://github.com/DataDog/datadog-agent/blob/ec96f3c24173ec66ba235bda7710504400d9a000/pkg/trace/traceutil/span.go#L20
static DD_MEASURED_KEY: &str = "_dd.measured";

/// Custom mapping between opentelemetry spans and datadog spans.
///
/// User can provide custom function to change the mapping. It currently supports customizing the following
Expand Down
42 changes: 32 additions & 10 deletions opentelemetry-datadog/src/exporter/model/v05.rs
@@ -1,13 +1,15 @@
use crate::exporter::intern::StringInterner;
use crate::exporter::model::SAMPLING_PRIORITY_KEY;
use crate::exporter::model::{DD_MEASURED_KEY, SAMPLING_PRIORITY_KEY};
use crate::exporter::{Error, ModelConfig};
use crate::propagator::DatadogTraceState;
use opentelemetry::trace::Status;
use opentelemetry_sdk::export::trace::SpanData;
use std::time::SystemTime;

use super::unified_tags::{UnifiedTagField, UnifiedTags};

const SPAN_NUM_ELEMENTS: u32 = 12;
// Number of entries written to each span's metrics map: the sampling
// priority and the measured flag.
const METRICS_LEN: u32 = 2;
const GIT_META_TAGS_COUNT: u32 = if matches!(
(
option_env!("DD_GIT_REPOSITORY_URL"),
Expand Down Expand Up @@ -125,6 +127,28 @@ fn write_unified_tag(
Ok(())
}

/// Sampling priority written for a span when the `agent-sampling` feature is
/// disabled: always `1.0`, regardless of the span.
#[cfg(not(feature = "agent-sampling"))]
fn get_sampling_priority(_span: &SpanData) -> f64 {
    1.0
}

/// Sampling priority written for a span when the `agent-sampling` feature is
/// enabled.
///
/// Returns `1.0` if the span's trace state reports priority sampling as
/// enabled, and `0.0` otherwise.
#[cfg(feature = "agent-sampling")]
fn get_sampling_priority(span: &SpanData) -> f64 {
    let priority_enabled = span.span_context.trace_state().priority_sampling_enabled();
    // `true` maps to 1.0 and `false` to 0.0.
    f64::from(u8::from(priority_enabled))
}

/// Value of the "measured" metric written for a span.
///
/// Returns `1.0` if the span's trace state reports measuring as enabled, and
/// `0.0` otherwise.
fn get_measuring(span: &SpanData) -> f64 {
    let measured = span.span_context.trace_state().measuring_enabled();
    // `true` maps to 1.0 and `false` to 0.0.
    f64::from(u8::from(measured))
}

fn encode_traces<S, N, R>(
interner: &mut StringInterner,
model_config: &ModelConfig,
Expand Down Expand Up @@ -228,16 +252,14 @@ where
rmp::encode::write_u32(&mut encoded, interner.intern(commit_sha))?;
}

rmp::encode::write_map_len(&mut encoded, 1)?;
rmp::encode::write_map_len(&mut encoded, METRICS_LEN)?;
rmp::encode::write_u32(&mut encoded, interner.intern(SAMPLING_PRIORITY_KEY))?;
rmp::encode::write_f64(
&mut encoded,
if span.span_context.is_sampled() {
1.0
} else {
0.0
},
)?;
let sampling_priority = get_sampling_priority(&span);
rmp::encode::write_f64(&mut encoded, sampling_priority)?;

rmp::encode::write_u32(&mut encoded, interner.intern(DD_MEASURED_KEY))?;
let measuring = get_measuring(&span);
rmp::encode::write_f64(&mut encoded, measuring)?;
rmp::encode::write_u32(&mut encoded, span_type)?;
}
}
Expand Down

0 comments on commit a84e207

Please sign in to comment.