Skip to content

Commit

Permalink
Fix observable counter cumulative aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
stormshield-fabs committed May 6, 2024
1 parent ccd5f08 commit 12e0a03
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 25 deletions.
1 change: 1 addition & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
- `shutdown` methods in `LoggerProvider` and `LogProcessor` now take an immutable reference
- After `shutdown`, `LoggerProvider` will return noop `Logger`
- After `shutdown`, `LogProcessor` will not process any new logs
- [#1644](https://github.com/open-telemetry/opentelemetry-rust/pull/1644) Fix cumulative aggregation for observable counters.

## v0.22.1

Expand Down
60 changes: 37 additions & 23 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{
collections::{hash_map::Entry, HashMap},
Expand All @@ -14,30 +15,56 @@ use super::{
AtomicTracker, Number,
};

/// Strategy that decides how an incoming measurement is folded into the
/// value already stored for a given attribute set.
trait UpdateOperation<T> {
    /// Folds the new measurement `b` into the stored value `a`.
    fn update(a: &mut T, b: T);
}

/// Accumulates measurements by summation (used by `Sum`, where each
/// measurement is an increment).
struct AddAssign;

impl<T: std::ops::AddAssign<T>> UpdateOperation<T> for AddAssign {
    fn update(a: &mut T, b: T) {
        *a += b;
    }
}

/// Replaces the stored value with the latest measurement (used by
/// `PrecomputedSum`, whose callbacks report already-cumulative values).
struct Assign;

impl<T> UpdateOperation<T> for Assign {
    fn update(a: &mut T, b: T) {
        *a = b;
    }
}

/// The storage for sums.
struct ValueMap<T: Number<T>> {
/// `O` selects how repeated measurements for the same attribute set are
/// combined: summed (`AddAssign`) or overwritten with the latest value
/// (`Assign`); see `UpdateOperation`.
struct ValueMap<T: Number<T>, O> {
    // Aggregated value per attribute set.
    values: Mutex<HashMap<AttributeSet, T>>,
    // Presumably flags that a measurement with an empty attribute set was
    // recorded — TODO(review): confirm against `measure`, which is not fully
    // visible here.
    has_no_value_attribute_value: AtomicBool,
    // Aggregate for measurements recorded with an empty attribute set,
    // stored outside the map.
    no_attribute_value: T::AtomicTracker,
    // Zero-sized marker tying the chosen update strategy `O` to this map.
    _operation: PhantomData<O>,
}

impl<T: Number<T>> Default for ValueMap<T> {
impl<T: Number<T>, O: UpdateOperation<T>> Default for ValueMap<T, O> {
    /// Creates an empty map; equivalent to [`ValueMap::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T: Number<T>> ValueMap<T> {
impl<T: Number<T>, O: UpdateOperation<T>> ValueMap<T, O> {
    /// Creates an empty `ValueMap` with no recorded measurements.
    fn new() -> Self {
        let values = Mutex::new(HashMap::new());
        Self {
            values,
            has_no_value_attribute_value: AtomicBool::new(false),
            no_attribute_value: T::new_atomic_tracker(),
            _operation: PhantomData,
        }
    }
}

impl<T: Number<T>> ValueMap<T> {
impl<T: Number<T>, O: UpdateOperation<T>> ValueMap<T, O> {
fn measure(&self, measurement: T, attrs: AttributeSet) {
if attrs.is_empty() {
self.no_attribute_value.add(measurement);
Expand All @@ -47,16 +74,16 @@ impl<T: Number<T>> ValueMap<T> {
let size = values.len();
match values.entry(attrs) {
Entry::Occupied(mut occupied_entry) => {
let sum = occupied_entry.get_mut();
*sum += measurement;
let value = occupied_entry.get_mut();
O::update(value, measurement);
}
Entry::Vacant(vacant_entry) => {
if is_under_cardinality_limit(size) {
vacant_entry.insert(measurement);
} else {
values
.entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
.and_modify(|val| *val += measurement)
.and_modify(|val| O::update(val, measurement))
.or_insert(measurement);
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
}
Expand All @@ -68,7 +95,7 @@ impl<T: Number<T>> ValueMap<T> {

/// Summarizes a set of measurements made as their arithmetic sum.
pub(crate) struct Sum<T: Number<T>> {
value_map: ValueMap<T>,
value_map: ValueMap<T, AddAssign>,
monotonic: bool,
start: Mutex<SystemTime>,
}
Expand Down Expand Up @@ -232,7 +259,7 @@ impl<T: Number<T>> Sum<T> {

/// Summarizes a set of pre-computed sums as their arithmetic sum.
pub(crate) struct PrecomputedSum<T: Number<T>> {
value_map: ValueMap<T>,
value_map: ValueMap<T, Assign>,
monotonic: bool,
start: Mutex<SystemTime>,
reported: Mutex<HashMap<AttributeSet, T>>,
Expand Down Expand Up @@ -367,11 +394,6 @@ impl<T: Number<T>> PrecomputedSum<T> {
.data_points
.reserve_exact(n - s_data.data_points.capacity());
}
let mut new_reported = HashMap::with_capacity(n);
let mut reported = match self.reported.lock() {
Ok(r) => r,
Err(_) => return (0, None),
};

if self
.value_map
Expand All @@ -387,24 +409,16 @@ impl<T: Number<T>> PrecomputedSum<T> {
});
}

let default = T::default();
for (attrs, value) in values.iter() {
let delta = *value - *reported.get(attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), *value);
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
value: *value,
exemplars: vec![],
});
}

*reported = new_reported;
drop(reported); // drop before values guard is dropped

(
s_data.data_points.len(),
new_agg.map(|a| Box::new(a) as Box<_>),
Expand Down
187 changes: 187 additions & 0 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ mod tests {
KeyValue,
};
use std::borrow::Cow;
use std::sync::{Arc, Mutex};

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
Expand Down Expand Up @@ -980,4 +981,190 @@ mod tests {
.expect("Failed to cast aggregation to expected type")
}
}

/// Observable counter in delta aggregation.
///
/// ObservableCounter provides the current (i.e. cumulative) value of the counter at the
/// time of observation, and the SDK is expected to remember the previous value, so that
/// it can do cumulative-to-delta conversion before exporting.
#[tokio::test(flavor = "multi_thread")]
async fn observable_counter_delta() {
    // cargo test observable_counter_delta --features=metrics,testing -- --nocapture

    // Arrange
    let test_context = TestContext::new(Some(Temporality::Delta));
    let meter_provider = test_context.meter_provider;
    let exporter = test_context.exporter;
    let meter = meter_provider.meter("test");
    let observable_counter = meter.u64_observable_counter("my_observable_counter").init();

    // Act
    // The observable counter reports 100, 200, 300 and so on: each callback invocation
    // bumps the shared counter `i` and observes `i * 100`.
    let i = Arc::new(Mutex::new(0));
    meter
        .register_callback(&[observable_counter.as_any()], move |observer| {
            let mut num = i.lock().unwrap();
            *num += 1;

            println!("Observable Counter is reporting: {}", *num * 100);

            observer.observe_u64(
                &observable_counter,
                *num * 100,
                [
                    KeyValue::new("statusCode", "200"),
                    KeyValue::new("verb", "get"),
                ]
                .as_ref(),
            );
        })
        .expect("Expected to register callback");

    // Each force_flush triggers one collection cycle, which invokes the callback once.
    meter_provider.force_flush().unwrap();

    // Assert
    let resource_metrics = exporter
        .get_finished_metrics()
        .expect("metrics are expected to be exported.");
    assert!(!resource_metrics.is_empty());
    let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
    assert_eq!(metric.name, "my_observable_counter");

    let sum = metric
        .data
        .as_any()
        .downcast_ref::<data::Sum<u64>>()
        .expect("Sum aggregation expected for ObservableCounter instruments by default");

    assert_eq!(
        sum.temporality,
        data::Temporality::Delta,
        "Should produce Delta as configured."
    );

    assert_eq!(sum.data_points.len(), 1);

    // Find and validate the single datapoint: the first observation is 100.
    let data_point = &sum.data_points[0];
    assert_eq!(data_point.value, 100);

    // Flush again, to trigger the next collection (second callback invocation).
    exporter.reset();
    meter_provider.force_flush().unwrap();

    // Assert
    let resource_metrics = exporter
        .get_finished_metrics()
        .expect("metrics are expected to be exported.");
    assert!(!resource_metrics.is_empty());
    let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
    assert_eq!(metric.name, "my_observable_counter");

    let sum = metric
        .data
        .as_any()
        .downcast_ref::<data::Sum<u64>>()
        .expect("Sum aggregation expected for ObservableCounter instruments by default");

    assert_eq!(sum.data_points.len(), 1);

    // Find and validate the single datapoint.
    let data_point = &sum.data_points[0];

    // The second observation is 200 (cumulative), but with Delta temporality the
    // SDK exports the difference from the previous observation: 200 - 100 = 100.
    assert_eq!(data_point.value, 100);
}

/// Tests Observable counter in cumulative aggregation.
///
/// ObservableCounter provides the current (i.e. cumulative) value of the counter at the
/// time of observation, and the SDK is expected to aggregate and export the reported
/// value as-is — no delta conversion.
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_cumulative() {
    // cargo test observable_counter_cumulative --features=metrics,testing -- --nocapture

    // Arrange
    let test_context = TestContext::new(Some(Temporality::Cumulative));
    let meter_provider = test_context.meter_provider;
    let exporter = test_context.exporter;
    let meter = meter_provider.meter("test");
    let observable_counter = meter.u64_observable_counter("my_observable_counter").init();

    // Act
    // The observable counter reports 100, 200, 300 and so on: each callback invocation
    // bumps the shared counter `i` and observes `i * 100`.
    let i = Arc::new(Mutex::new(0));
    meter
        .register_callback(&[observable_counter.as_any()], move |observer| {
            let mut num = i.lock().unwrap();
            *num += 1;

            println!("Observable Counter is reporting: {}", *num * 100);

            observer.observe_u64(
                &observable_counter,
                *num * 100,
                [
                    KeyValue::new("statusCode", "200"),
                    KeyValue::new("verb", "get"),
                ]
                .as_ref(),
            );
        })
        .expect("Expected to register callback");

    // Each force_flush triggers one collection cycle, which invokes the callback once.
    meter_provider.force_flush().unwrap();

    // Assert
    let resource_metrics = exporter
        .get_finished_metrics()
        .expect("metrics are expected to be exported.");
    assert!(!resource_metrics.is_empty());
    let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
    assert_eq!(metric.name, "my_observable_counter");

    let sum = metric
        .data
        .as_any()
        .downcast_ref::<data::Sum<u64>>()
        .expect("Sum aggregation expected for ObservableCounter instruments by default");

    assert_eq!(
        sum.temporality,
        data::Temporality::Cumulative,
        "Should produce Cumulative by default."
    );

    assert_eq!(sum.data_points.len(), 1);

    // Find and validate the single datapoint.
    let data_point = &sum.data_points[0];
    // 100 is the first observation.
    assert_eq!(data_point.value, 100);

    // Flush again, to trigger the next collection (second callback invocation).
    exporter.reset();
    meter_provider.force_flush().unwrap();

    // Assert
    let resource_metrics = exporter
        .get_finished_metrics()
        .expect("metrics are expected to be exported.");

    assert!(!resource_metrics.is_empty());
    let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
    assert_eq!(metric.name, "my_observable_counter");

    let sum = metric
        .data
        .as_any()
        .downcast_ref::<data::Sum<u64>>()
        .expect("Sum aggregation expected for ObservableCounter instruments by default");

    assert_eq!(sum.data_points.len(), 1);

    // Find and validate the single datapoint.
    let data_point = &sum.data_points[0];
    // The second observation should be exported unchanged as 200, since the
    // cumulative value reported by the callback is taken as-is (the bug this
    // commit fixes was summing observations to 300 instead).
    assert_eq!(data_point.value, 200);
}
}
2 changes: 1 addition & 1 deletion opentelemetry/src/metrics/instruments/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<T> fmt::Debug for ObservableCounter<T> {
}

impl<T> ObservableCounter<T> {
/// Records an increment to the counter.
/// Records the absolute value of the counter.
///
/// It is only valid to call this within a callback. If called outside of the
/// registered callback it should have no effect on the instrument, and an
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry/src/metrics/instruments/up_down_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl<T> ObservableUpDownCounter<T> {
ObservableUpDownCounter(inner)
}

/// Records the increment or decrement to the counter.
/// Records the absolute value of the counter.
///
/// It is only valid to call this within a callback. If called outside of the
/// registered callback it should have no effect on the instrument, and an
Expand Down

0 comments on commit 12e0a03

Please sign in to comment.