Skip to content

Commit

Permalink
Add more observable counter tests (#1818)
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas authored May 24, 2024
1 parent f8ee551 commit 57f87d6
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 70 deletions.
3 changes: 3 additions & 0 deletions .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
"Cijo",
"clippy",
"codecov",
"datapoint",
"deque",
"Dirkjan",
"EPYC",
"hasher",
"isahc",
"Isobel",
Expand All @@ -45,6 +47,7 @@
"msrv",
"mykey",
"myvalue",
"nocapture",
"Ochtman",
"opentelemetry",
"OTLP",
Expand Down
152 changes: 82 additions & 70 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,10 @@ mod tests {
use crate::metrics::reader::TemporalitySelector;
use crate::testing::metrics::InMemoryMetricsExporterBuilder;
use crate::{runtime, testing::metrics::InMemoryMetricsExporter};
use opentelemetry::metrics::{Counter, UpDownCounter};
use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
use std::borrow::Cow;
use std::sync::{Arc, Mutex};

// Run all tests in this mod
// cargo test metrics::tests --features=metrics,testing
Expand Down Expand Up @@ -213,86 +214,94 @@ mod tests {
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation() {
async fn observable_counter_aggregation_cumulative_non_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation --features=metrics,testing -- --nocapture
// cargo test observable_counter_aggregation_cumulative_non_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4);
}

// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_delta_non_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_delta_non_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4);
}

// Act
let meter = meter_provider.meter("test");
let _counter = meter
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_cumulative_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_cumulative_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[ignore = "Aggregation bug! https://github.com/open-telemetry/opentelemetry-rust/issues/1517"]
async fn observable_counter_aggregation_delta_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_delta_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4);
}

fn observable_counter_aggregation_helper(
temporality: Temporality,
start: u64,
increment: u64,
length: u64,
) {
// Arrange
let mut test_context = TestContext::new(temporality);
// The Observable counter reports values[0], values[1],....values[n] on each flush.
let values: Vec<u64> = (0..length).map(|i| start + i * increment).collect();
println!("Testing with observable values: {:?}", values);
let values = Arc::new(values);
let values_clone = values.clone();
let i = Arc::new(Mutex::new(0));
let _observable_counter = test_context
.meter()
.u64_observable_counter("my_observable_counter")
.with_unit("my_unit")
.with_callback(|observer| {
observer.observe(100, &[KeyValue::new("key1", "value1")]);
observer.observe(200, &[KeyValue::new("key1", "value2")]);
.with_callback(move |observer| {
let mut index = i.lock().unwrap();
if *index < values.len() {
observer.observe(values[*index], &[KeyValue::new("key1", "value1")]);
*index += 1;
}
})
.init();

meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
assert!(!resource_metrics.is_empty());
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
assert_eq!(metric.name, "my_observable_counter");
assert_eq!(metric.unit, "my_unit");
let sum = metric
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for ObservableCounter instruments by default");

// Expecting 2 time-series.
assert_eq!(sum.data_points.len(), 2);
assert!(sum.is_monotonic, "Counter should produce monotonic.");
assert_eq!(
sum.temporality,
data::Temporality::Cumulative,
"Should produce cumulative by default."
);

// find and validate key1=value1 datapoint
let mut data_point1 = None;
for datapoint in &sum.data_points {
if datapoint
.attributes
.iter()
.any(|kv| kv.key.as_str() == "key1" && kv.value.as_str() == "value1")
{
data_point1 = Some(datapoint);
for (iter, v) in values_clone.iter().enumerate() {
test_context.flush_metrics();
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_observable_counter", None);
assert_eq!(sum.data_points.len(), 1);
assert!(sum.is_monotonic, "Counter should produce monotonic.");
if let Temporality::Cumulative = temporality {
assert_eq!(
sum.temporality,
Temporality::Cumulative,
"Should produce cumulative"
);
} else {
assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
}
}
assert_eq!(
data_point1
.expect("datapoint with key1=value1 expected")
.value,
100
);

// find and validate key1=value2 datapoint
let mut data_point1 = None;
for datapoint in &sum.data_points {
if datapoint
.attributes
.iter()
.any(|kv| kv.key.as_str() == "key1" && kv.value.as_str() == "value2")
{
data_point1 = Some(datapoint);
// find and validate key1=value1 datapoint
let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected");
if let Temporality::Cumulative = temporality {
// Cumulative counter should have the value as is.
assert_eq!(data_point.value, *v);
} else {
// Delta counter should have the increment value.
// Except for the first value which should be the start value.
if iter == 0 {
assert_eq!(data_point.value, start);
} else {
assert_eq!(data_point.value, increment);
}
}

test_context.reset_metrics();
}
assert_eq!(
data_point1
.expect("datapoint with key1=value2 expected")
.value,
200
);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down Expand Up @@ -566,7 +575,6 @@ mod tests {
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
// #[ignore = "Spatial aggregation is not yet implemented."]
async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
// cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=metrics,testing

Expand Down Expand Up @@ -1169,6 +1177,10 @@ mod tests {
updown_counter_builder.init()
}

fn meter(&self) -> Meter {
self.meter_provider.meter("test")
}

fn flush_metrics(&self) {
self.meter_provider.force_flush().unwrap();
}
Expand Down

0 comments on commit 57f87d6

Please sign in to comment.