diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 19dac050a4..6abe299a6f 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -26,6 +26,7 @@ - `shutdown` methods in `LoggerProvider` and `LogProcessor` now takes a immutable reference - After `shutdown`, `LoggerProvider` will return noop `Logger` - After `shutdown`, `LogProcessor` will not process any new logs +- [#1644](https://github.com/open-telemetry/opentelemetry-rust/pull/1644) Fix cumulative aggregation for observable counters. ## v0.22.1 diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 83a6b07858..6997907253 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,3 +1,4 @@ +use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, Ordering}; use std::{ collections::{hash_map::Entry, HashMap}, @@ -14,30 +15,56 @@ use super::{ AtomicTracker, Number, }; +/// Abstracts the update operation for a measurement. +trait UpdateOperation { + fn update(a: &mut T, b: T); +} + +struct AddAssign; + +impl UpdateOperation for AddAssign +where + T: std::ops::AddAssign, +{ + fn update(a: &mut T, b: T) { + *a += b + } +} + +struct Assign; + +impl UpdateOperation for Assign { + fn update(a: &mut T, b: T) { + *a = b + } +} + /// The storage for sums. -struct ValueMap> { +struct ValueMap, O> { values: Mutex>, has_no_value_attribute_value: AtomicBool, no_attribute_value: T::AtomicTracker, + _operation: PhantomData, } -impl> Default for ValueMap { +impl, O: UpdateOperation> Default for ValueMap { fn default() -> Self { ValueMap::new() } } -impl> ValueMap { +impl, O: UpdateOperation> ValueMap { fn new() -> Self { ValueMap { values: Mutex::new(HashMap::new()), has_no_value_attribute_value: AtomicBool::new(false), no_attribute_value: T::new_atomic_tracker(), + _operation: PhantomData, } } } -impl> ValueMap { +impl, O: UpdateOperation> ValueMap { fn measure(&self, measurement: T, attrs: AttributeSet) { if attrs.is_empty() { self.no_attribute_value.add(measurement); @@ -47,8 +74,8 @@ impl> ValueMap { let size = values.len(); match values.entry(attrs) { Entry::Occupied(mut occupied_entry) => { - let sum = occupied_entry.get_mut(); - *sum += measurement; + let value = occupied_entry.get_mut(); + O::update(value, measurement); } Entry::Vacant(vacant_entry) => { if is_under_cardinality_limit(size) { @@ -56,7 +83,7 @@ impl> ValueMap { } else { values .entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone()) - .and_modify(|val| *val += measurement) + .and_modify(|val| O::update(val, measurement)) .or_insert(measurement); global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into())); } @@ -68,7 +95,7 @@ impl> ValueMap { /// Summarizes a set of measurements made as their arithmetic sum. pub(crate) struct Sum> { - value_map: ValueMap, + value_map: ValueMap, monotonic: bool, start: Mutex, } @@ -232,7 +259,7 @@ impl> Sum { /// Summarizes a set of pre-computed sums as their arithmetic sum. pub(crate) struct PrecomputedSum> { - value_map: ValueMap, + value_map: ValueMap, monotonic: bool, start: Mutex, reported: Mutex>, @@ -367,11 +394,6 @@ impl> PrecomputedSum { .data_points .reserve_exact(n - s_data.data_points.capacity()); } - let mut new_reported = HashMap::with_capacity(n); - let mut reported = match self.reported.lock() { - Ok(r) => r, - Err(_) => return (0, None), - }; if self .value_map @@ -387,24 +409,16 @@ impl> PrecomputedSum { }); } - let default = T::default(); for (attrs, value) in values.iter() { - let delta = *value - *reported.get(attrs).unwrap_or(&default); - if delta != default { - new_reported.insert(attrs.clone(), *value); - } s_data.data_points.push(DataPoint { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: delta, + value: *value, exemplars: vec![], }); } - *reported = new_reported; - drop(reported); // drop before values guard is dropped - ( s_data.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>), diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 80ff89469b..6ad1c15fc0 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -72,6 +72,7 @@ mod tests { KeyValue, }; use std::borrow::Cow; + use std::sync::{Arc, Mutex}; // "multi_thread" tokio flavor must be used else flush won't // be able to make progress! @@ -980,4 +981,190 @@ mod tests { .expect("Failed to cast aggregation to expected type") } } + + /// Observable counter in delta aggregation. + /// + /// ObservableCounter provides the current (i.e cumulative) value of the counter at the time of observation, + /// and the SDK is expected to remember the previous value, so that it can do cumulative to + /// delta conversion. + #[tokio::test(flavor = "multi_thread")] + async fn observable_counter_delta() { + // cargo test observable_counter_delta --features=metrics,testing -- --nocapture + + // Arrange + let test_context = TestContext::new(Some(Temporality::Delta)); + let meter_provider = test_context.meter_provider; + let exporter = test_context.exporter; + let meter = meter_provider.meter("test"); + let observable_counter = meter.u64_observable_counter("my_observable_counter").init(); + + // Act + // The Observable counter reports 100, 200, 300 and so on. + let i = Arc::new(Mutex::new(0)); + meter + .register_callback(&[observable_counter.as_any()], move |observer| { + let mut num = i.lock().unwrap(); + *num += 1; + + println!("Observable Counter is reporting: {}", *num * 100); + + observer.observe_u64( + &observable_counter, + *num * 100, + [ + KeyValue::new("statusCode", "200"), + KeyValue::new("verb", "get"), + ] + .as_ref(), + ); + }) + .expect("Expected to register callback"); + + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert!(!resource_metrics.is_empty()); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + assert_eq!(metric.name, "my_observable_counter"); + + let sum = metric + .data + .as_any() + .downcast_ref::>() + .expect("Sum aggregation expected for ObservableCounter instruments by default"); + + assert_eq!( + sum.temporality, + data::Temporality::Delta, + "Should produce Delta as configured." + ); + + assert_eq!(sum.data_points.len(), 1); + + // find and validate the single datapoint + let data_point = &sum.data_points[0]; + assert_eq!(data_point.value, 100); + + // Flush again, to trigger next collection. + exporter.reset(); + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert!(!resource_metrics.is_empty()); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + assert_eq!(metric.name, "my_observable_counter"); + + let sum = metric + .data + .as_any() + .downcast_ref::>() + .expect("Sum aggregation expected for ObservableCounter instruments by default"); + + assert_eq!(sum.data_points.len(), 1); + + // find and validate the single datapoint + let data_point = &sum.data_points[0]; + + // The second observation should be 100 as well, as temporality is delta + assert_eq!(data_point.value, 100); + } + + /// Tests Observable counter in cumulative aggregation. + /// + /// ObservableCounter provides the current (i.e Cumulative) value of the counter at the time of observation, + /// and the SDK is expected to aggregate the value as-is. + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn observable_counter_cumulative() { + // cargo test observable_counter_cumulative --features=metrics,testing -- --nocapture + + // Arrange + let test_context = TestContext::new(Some(Temporality::Cumulative)); + let meter_provider = test_context.meter_provider; + let exporter = test_context.exporter; + let meter = meter_provider.meter("test"); + let observable_counter = meter.u64_observable_counter("my_observable_counter").init(); + + // Act + // The Observable counter reports 100, 200, 300 and so on. + let i = Arc::new(Mutex::new(0)); + meter + .register_callback(&[observable_counter.as_any()], move |observer| { + let mut num = i.lock().unwrap(); + *num += 1; + + println!("Observable Counter is reporting: {}", *num * 100); + + observer.observe_u64( + &observable_counter, + *num * 100, + [ + KeyValue::new("statusCode", "200"), + KeyValue::new("verb", "get"), + ] + .as_ref(), + ); + }) + .expect("Expected to register callback"); + + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert!(!resource_metrics.is_empty()); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + assert_eq!(metric.name, "my_observable_counter"); + + let sum = metric + .data + .as_any() + .downcast_ref::>() + .expect("Sum aggregation expected for ObservableCounter instruments by default"); + + assert_eq!( + sum.temporality, + data::Temporality::Cumulative, + "Should produce Cumulative by default." + ); + + assert_eq!(sum.data_points.len(), 1); + + // find and validate the single datapoint + let data_point = &sum.data_points[0]; + // 100 is the first observation. + assert_eq!(data_point.value, 100); + + // Flush again, to trigger next collection. + exporter.reset(); + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + + assert!(!resource_metrics.is_empty()); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + assert_eq!(metric.name, "my_observable_counter"); + + let sum = metric + .data + .as_any() + .downcast_ref::>() + .expect("Sum aggregation expected for ObservableCounter instruments by default"); + + assert_eq!(sum.data_points.len(), 1); + + // find and validate the single datapoint + let data_point = &sum.data_points[0]; + // The second observation should be 200 + assert_eq!(data_point.value, 200); + } } diff --git a/opentelemetry/src/metrics/instruments/counter.rs b/opentelemetry/src/metrics/instruments/counter.rs index c10ac70405..33439182b1 100644 --- a/opentelemetry/src/metrics/instruments/counter.rs +++ b/opentelemetry/src/metrics/instruments/counter.rs @@ -82,7 +82,7 @@ impl fmt::Debug for ObservableCounter { } impl ObservableCounter { - /// Records an increment to the counter. + /// Records the absolute value of the counter. /// /// It is only valid to call this within a callback. If called outside of the /// registered callback it should have no effect on the instrument, and an diff --git a/opentelemetry/src/metrics/instruments/up_down_counter.rs b/opentelemetry/src/metrics/instruments/up_down_counter.rs index 5c92025661..6530191d8e 100644 --- a/opentelemetry/src/metrics/instruments/up_down_counter.rs +++ b/opentelemetry/src/metrics/instruments/up_down_counter.rs @@ -88,7 +88,7 @@ impl ObservableUpDownCounter { ObservableUpDownCounter(inner) } - /// Records the increment or decrement to the counter. + /// Records the absolute value of the counter. /// /// It is only valid to call this within a callback. If called outside of the /// registered callback it should have no effect on the instrument, and an