Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more observable counter tests #1818

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
"Cijo",
"clippy",
"codecov",
"datapoint",
"deque",
"Dirkjan",
"EPYC",
"hasher",
"isahc",
"Isobel",
Expand All @@ -45,6 +47,7 @@
"msrv",
"mykey",
"myvalue",
"nocapture",
"Ochtman",
"opentelemetry",
"OTLP",
Expand Down
152 changes: 82 additions & 70 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,10 @@
use crate::metrics::reader::TemporalitySelector;
use crate::testing::metrics::InMemoryMetricsExporterBuilder;
use crate::{runtime, testing::metrics::InMemoryMetricsExporter};
use opentelemetry::metrics::{Counter, UpDownCounter};
use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
use std::borrow::Cow;
use std::sync::{Arc, Mutex};

// Run all tests in this mod
// cargo test metrics::tests --features=metrics,testing
Expand Down Expand Up @@ -213,86 +214,94 @@
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation() {
async fn observable_counter_aggregation_cumulative_non_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation --features=metrics,testing -- --nocapture
// cargo test observable_counter_aggregation_cumulative_non_zero_increment --features=testing -- --nocapture
Copy link
Member

@lalitb lalitb May 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - I know this is the existing comment. Do we know what output tester may want to see with nocapture as mentioned in this comment. Not blocker, just curious.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

println() text is not captured in test output without this. It is used when debugging things quickly. Sometimes it is a lot easier that way, as opposed to running with RUST_FLAG to show stack when tests panic!

observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4);
}

// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_delta_non_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_delta_non_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4);
}

// Act
let meter = meter_provider.meter("test");
let _counter = meter
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_cumulative_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_cumulative_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[ignore = "Aggregation bug! https://github.com/open-telemetry/opentelemetry-rust/issues/1517"]
async fn observable_counter_aggregation_delta_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_delta_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4);
}

Check warning on line 243 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L239-L243

Added lines #L239 - L243 were not covered by tests

fn observable_counter_aggregation_helper(
temporality: Temporality,
start: u64,
increment: u64,
length: u64,
) {
// Arrange
let mut test_context = TestContext::new(temporality);
// The Observable counter reports values[0], values[1],....values[n] on each flush.
let values: Vec<u64> = (0..length).map(|i| start + i * increment).collect();
println!("Testing with observable values: {:?}", values);
let values = Arc::new(values);
let values_clone = values.clone();
let i = Arc::new(Mutex::new(0));
let _observable_counter = test_context
.meter()
.u64_observable_counter("my_observable_counter")
.with_unit("my_unit")
.with_callback(|observer| {
observer.observe(100, &[KeyValue::new("key1", "value1")]);
observer.observe(200, &[KeyValue::new("key1", "value2")]);
.with_callback(move |observer| {
let mut index = i.lock().unwrap();
if *index < values.len() {
observer.observe(values[*index], &[KeyValue::new("key1", "value1")]);
*index += 1;
}
})
.init();

meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
assert!(!resource_metrics.is_empty());
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
assert_eq!(metric.name, "my_observable_counter");
assert_eq!(metric.unit, "my_unit");
let sum = metric
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for ObservableCounter instruments by default");

// Expecting 2 time-series.
assert_eq!(sum.data_points.len(), 2);
assert!(sum.is_monotonic, "Counter should produce monotonic.");
assert_eq!(
sum.temporality,
data::Temporality::Cumulative,
"Should produce cumulative by default."
);

// find and validate key1=value1 datapoint
let mut data_point1 = None;
for datapoint in &sum.data_points {
if datapoint
.attributes
.iter()
.any(|kv| kv.key.as_str() == "key1" && kv.value.as_str() == "value1")
{
data_point1 = Some(datapoint);
for (iter, v) in values_clone.iter().enumerate() {
test_context.flush_metrics();
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_observable_counter", None);
assert_eq!(sum.data_points.len(), 1);
assert!(sum.is_monotonic, "Counter should produce monotonic.");
if let Temporality::Cumulative = temporality {
assert_eq!(
sum.temporality,
Temporality::Cumulative,
"Should produce cumulative"

Check warning on line 281 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L281

Added line #L281 was not covered by tests
);
} else {
assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
}
}
assert_eq!(
data_point1
.expect("datapoint with key1=value1 expected")
.value,
100
);

// find and validate key1=value2 datapoint
let mut data_point1 = None;
for datapoint in &sum.data_points {
if datapoint
.attributes
.iter()
.any(|kv| kv.key.as_str() == "key1" && kv.value.as_str() == "value2")
{
data_point1 = Some(datapoint);
// find and validate key1=value1 datapoint
let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected");
if let Temporality::Cumulative = temporality {
// Cumulative counter should have the value as is.
assert_eq!(data_point.value, *v);
} else {
// Delta counter should have the increment value.
// Except for the first value which should be the start value.
if iter == 0 {
assert_eq!(data_point.value, start);
} else {
assert_eq!(data_point.value, increment);
}
}

test_context.reset_metrics();
}
assert_eq!(
data_point1
.expect("datapoint with key1=value2 expected")
.value,
200
);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down Expand Up @@ -566,7 +575,6 @@
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
// #[ignore = "Spatial aggregation is not yet implemented."]
async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
// cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=metrics,testing

Expand Down Expand Up @@ -1169,6 +1177,10 @@
updown_counter_builder.init()
}

fn meter(&self) -> Meter {
self.meter_provider.meter("test")
}

fn flush_metrics(&self) {
self.meter_provider.force_flush().unwrap();
}
Expand Down
Loading