From b481cf260d032d4321d487aae40c3a5611a368a2 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 6 May 2024 16:08:53 -0700 Subject: [PATCH 1/3] Simpler usage of Observable instruments --- examples/metrics-basic/src/main.rs | 22 ++++--- opentelemetry-sdk/src/metrics/mod.rs | 85 ++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 12 deletions(-) diff --git a/examples/metrics-basic/src/main.rs b/examples/metrics-basic/src/main.rs index 3a2aa1432d..5db332c8b1 100644 --- a/examples/metrics-basic/src/main.rs +++ b/examples/metrics-basic/src/main.rs @@ -43,23 +43,21 @@ async fn main() -> Result<(), Box> { ); // Create a ObservableCounter instrument and register a callback that reports the measurement. - let observable_counter = meter + let _observable_counter = meter .u64_observable_counter("my_observable_counter") .with_description("My observable counter example description") .with_unit(Unit::new("myunit")) + .with_callback(|observer| { + observer.observe( + 100, + &[ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ], + ) + }) .init(); - meter.register_callback(&[observable_counter.as_any()], move |observer| { - observer.observe_u64( - &observable_counter, - 100, - &[ - KeyValue::new("mykey1", "myvalue1"), - KeyValue::new("mykey2", "myvalue2"), - ], - ) - })?; - // Create a UpCounter Instrument. let updown_counter = meter.i64_up_down_counter("my_updown_counter").init(); diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 80ff89469b..52706f32e6 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -163,6 +163,91 @@ mod tests { ); } + // "multi_thread" tokio flavor must be used else flush won't + // be able to make progress! + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn observable_counter_aggregation() { + // Run this test with stdout enabled to see output. + // cargo test observable_counter_aggregation --features=metrics,testing -- --nocapture + + // Arrange + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + + // Act + let meter = meter_provider.meter("test"); + let _counter = meter + .u64_observable_counter("my_observable_counter") + .with_unit(Unit::new("my_unit")) + .with_callback(|observer| { + observer.observe(100, &[KeyValue::new("key1", "value1")]); + observer.observe(200, &[KeyValue::new("key1", "value2")]); + }) + .init(); + + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert!(!resource_metrics.is_empty()); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + assert_eq!(metric.name, "my_observable_counter"); + assert_eq!(metric.unit.as_str(), "my_unit"); + let sum = metric + .data + .as_any() + .downcast_ref::>() + .expect("Sum aggregation expected for ObservableCounter instruments by default"); + + // Expecting 2 time-series. + assert_eq!(sum.data_points.len(), 2); + assert!(sum.is_monotonic, "Counter should produce monotonic."); + assert_eq!( + sum.temporality, + data::Temporality::Cumulative, + "Should produce cumulative by default." + ); + + // find and validate key1=value1 datapoint + let mut data_point1 = None; + for datapoint in &sum.data_points { + if datapoint + .attributes + .iter() + .any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value1") + { + data_point1 = Some(datapoint); + } + } + assert_eq!( + data_point1 + .expect("datapoint with key1=value1 expected") + .value, + 100 + ); + + // find and validate key1=value2 datapoint + let mut data_point1 = None; + for datapoint in &sum.data_points { + if datapoint + .attributes + .iter() + .any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value2") + { + data_point1 = Some(datapoint); + } + } + assert_eq!( + data_point1 + .expect("datapoint with key1=value2 expected") + .value, + 200 + ); + } + // "multi_thread" tokio flavor must be used else flush won't // be able to make progress! #[tokio::test(flavor = "multi_thread", worker_threads = 1)] From b5d7f1c488f586cdf764ee1a3a453d58b13328e5 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 6 May 2024 17:53:19 -0700 Subject: [PATCH 2/3] fix all simple examples --- examples/metrics-basic/src/main.rs | 49 +++++++++++++----------------- 1 file changed, 21 insertions(+), 28 deletions(-) diff --git a/examples/metrics-basic/src/main.rs b/examples/metrics-basic/src/main.rs index 5db332c8b1..5faae46888 100644 --- a/examples/metrics-basic/src/main.rs +++ b/examples/metrics-basic/src/main.rs @@ -71,23 +71,21 @@ async fn main() -> Result<(), Box> { ); // Create a Observable UpDownCounter instrument and register a callback that reports the measurement. - let observable_up_down_counter = meter + let _observable_up_down_counter = meter .i64_observable_up_down_counter("my_observable_updown_counter") .with_description("My observable updown counter example description") .with_unit(Unit::new("myunit")) + .with_callback(|observer| { + observer.observe( + 100, + &[ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ], + ) + }) .init(); - meter.register_callback(&[observable_up_down_counter.as_any()], move |observer| { - observer.observe_i64( - &observable_up_down_counter, - 100, - &[ - KeyValue::new("mykey1", "myvalue1"), - KeyValue::new("mykey2", "myvalue2"), - ], - ) - })?; - // Create a Histogram Instrument. let histogram = meter .f64_histogram("my_histogram") @@ -106,8 +104,7 @@ async fn main() -> Result<(), Box> { // Note that there is no ObservableHistogram instrument. // Create a Gauge Instrument. - { - let gauge = meter + let gauge = meter .f64_gauge("my_gauge") .with_description("A gauge set to 1.0") .with_unit(Unit::new("myunit")) @@ -120,27 +117,23 @@ async fn main() -> Result<(), Box> { KeyValue::new("mykey2", "myvalue2"), ], ); - } // Create a ObservableGauge instrument and register a callback that reports the measurement. - let observable_gauge = meter + let _observable_gauge = meter .f64_observable_gauge("my_observable_gauge") .with_description("An observable gauge set to 1.0") .with_unit(Unit::new("myunit")) + .with_callback(|observer| { + observer.observe( + 1.0, + &[ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ], + ) + }) .init(); - // Register a callback that reports the measurement. - meter.register_callback(&[observable_gauge.as_any()], move |observer| { - observer.observe_f64( - &observable_gauge, - 1.0, - &[ - KeyValue::new("mykey1", "myvalue1"), - KeyValue::new("mykey2", "myvalue2"), - ], - ) - })?; - // Metrics are exported by default every 30 seconds when using stdout exporter, // however shutting down the MeterProvider here instantly flushes // the metrics, instead of waiting for the 30 sec interval. From 1c466ccedd93e1e5d752d55376c814a9b826d9c8 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 6 May 2024 18:13:56 -0700 Subject: [PATCH 3/3] fix all examples and test to use simpler callback --- examples/metrics-basic/src/main.rs | 24 ++++++------ .../examples/basic-otlp/src/main.rs | 7 +--- opentelemetry-sdk/src/metrics/mod.rs | 37 +++++++------------ .../src/metrics/periodic_reader.rs | 8 ++-- 4 files changed, 32 insertions(+), 44 deletions(-) diff --git a/examples/metrics-basic/src/main.rs b/examples/metrics-basic/src/main.rs index 5faae46888..d88f979a0b 100644 --- a/examples/metrics-basic/src/main.rs +++ b/examples/metrics-basic/src/main.rs @@ -105,18 +105,18 @@ async fn main() -> Result<(), Box> { // Create a Gauge Instrument. let gauge = meter - .f64_gauge("my_gauge") - .with_description("A gauge set to 1.0") - .with_unit(Unit::new("myunit")) - .init(); - - gauge.record( - 1.0, - &[ - KeyValue::new("mykey1", "myvalue1"), - KeyValue::new("mykey2", "myvalue2"), - ], - ); + .f64_gauge("my_gauge") + .with_description("A gauge set to 1.0") + .with_unit(Unit::new("myunit")) + .init(); + + gauge.record( + 1.0, + &[ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ], + ); // Create a ObservableGauge instrument and register a callback that reports the measurement. let _observable_gauge = meter diff --git a/opentelemetry-otlp/examples/basic-otlp/src/main.rs b/opentelemetry-otlp/examples/basic-otlp/src/main.rs index 2f228dcc72..97d2fd498f 100644 --- a/opentelemetry-otlp/examples/basic-otlp/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp/src/main.rs @@ -115,15 +115,12 @@ async fn main() -> Result<(), Box> { let tracer = global::tracer("ex.com/basic"); let meter = global::meter("ex.com/basic"); - let gauge = meter + let _gauge = meter .f64_observable_gauge("ex.com.one") .with_description("A gauge set to 1.0") + .with_callback(|observer| observer.observe(1.0, COMMON_ATTRIBUTES.as_ref())) .init(); - meter.register_callback(&[gauge.as_any()], move |observer| { - observer.observe_f64(&gauge, 1.0, COMMON_ATTRIBUTES.as_ref()) - })?; - let histogram = meter.f64_histogram("ex.com.two").init(); histogram.record(5.5, COMMON_ATTRIBUTES.as_ref()); diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 52706f32e6..ddd1f36c4b 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -530,7 +530,7 @@ mod tests { // "multi_thread" tokio flavor must be used else flush won't // be able to make progress! #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - #[ignore = "Spatial aggregation is not yet implemented."] + // #[ignore = "Spatial aggregation is not yet implemented."] async fn spatial_aggregation_when_view_drops_attributes_observable_counter() { // cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=metrics,testing @@ -550,43 +550,34 @@ mod tests { // Act let meter = meter_provider.meter("test"); - let observable_counter = meter.u64_observable_counter("my_observable_counter").init(); - - // Normally, these callbacks would generate 3 time-series, but since the view - // drops all attributes, we expect only 1 time-series. - meter - .register_callback(&[observable_counter.as_any()], move |observer| { - observer.observe_u64( - &observable_counter, + let _observable_counter = meter + .u64_observable_counter("my_observable_counter") + .with_callback(|observer| { + observer.observe( 100, - [ + &[ KeyValue::new("statusCode", "200"), KeyValue::new("verb", "get"), - ] - .as_ref(), + ], ); - observer.observe_u64( - &observable_counter, + observer.observe( 100, - [ + &[ KeyValue::new("statusCode", "200"), KeyValue::new("verb", "post"), - ] - .as_ref(), + ], ); - observer.observe_u64( - &observable_counter, + observer.observe( 100, - [ + &[ KeyValue::new("statusCode", "500"), KeyValue::new("verb", "get"), - ] - .as_ref(), + ], ); }) - .expect("Expected to register callback"); + .init(); meter_provider.force_flush().unwrap(); diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 954874294a..b0ea070513 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -428,12 +428,12 @@ mod tests { // Act let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); let meter = meter_provider.meter("test"); - let counter = meter.u64_observable_counter("testcounter").init(); - meter - .register_callback(&[counter.as_any()], move |_| { + let _counter = meter + .u64_observable_counter("testcounter") + .with_callback(move |_| { sender.send(()).expect("channel should still be open"); }) - .expect("callback registration should succeed"); + .init(); _ = meter_provider.force_flush();