Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simpler usage of Observable instruments #1715

Merged
95 changes: 43 additions & 52 deletions examples/metrics-basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,21 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
);

// Create a ObservableCounter instrument and register a callback that reports the measurement.
let observable_counter = meter
let _observable_counter = meter
.u64_observable_counter("my_observable_counter")
.with_description("My observable counter example description")
.with_unit(Unit::new("myunit"))
.with_callback(|observer| {
observer.observe(
100,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is still type-safe. i.e passing 100.0 will be blocked by compiler. But the API itself is more consistent with sync versions, where we don't use record_i64 or record_u64 etc. instead they are just record,add.

&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
)
})
.init();

meter.register_callback(&[observable_counter.as_any()], move |observer| {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From Community Meeting 5/7:

Lalit, Zhongyang:

  1. Mark this under unstable flag.
  2. Unregister callback is only available with this.
  3. This is part of API, so higher bar to change than SDK/View/Exporter
  4. Removing this and adding in future might be difficult to achieve without breaking change.

Zhongyang: Opposed to removing it, because it was part of documented API, not because it is required.
Suggestion: Mark as unstable, and make it just compile - so let user take the risk. (docs are showing the right way).
Okay with: If this is too complex to achieve, Zhongyang won't block from proceeding with removal.

Lalit: +1 to above.

observer.observe_u64(
&observable_counter,
100,
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
)
})?;

// Create a UpCounter Instrument.
let updown_counter = meter.i64_up_down_counter("my_updown_counter").init();

Expand All @@ -73,23 +71,21 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
);

// Create a Observable UpDownCounter instrument and register a callback that reports the measurement.
let observable_up_down_counter = meter
let _observable_up_down_counter = meter
.i64_observable_up_down_counter("my_observable_updown_counter")
.with_description("My observable updown counter example description")
.with_unit(Unit::new("myunit"))
.with_callback(|observer| {
observer.observe(
100,
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
)
})
.init();

meter.register_callback(&[observable_up_down_counter.as_any()], move |observer| {
observer.observe_i64(
&observable_up_down_counter,
100,
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
)
})?;

// Create a Histogram Instrument.
let histogram = meter
.f64_histogram("my_histogram")
Expand All @@ -108,41 +104,36 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Note that there is no ObservableHistogram instrument.

// Create a Gauge Instrument.
{
let gauge = meter
.f64_gauge("my_gauge")
.with_description("A gauge set to 1.0")
.with_unit(Unit::new("myunit"))
.init();

gauge.record(
1.0,
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
);
}
let gauge = meter
.f64_gauge("my_gauge")
.with_description("A gauge set to 1.0")
.with_unit(Unit::new("myunit"))
.init();

gauge.record(
1.0,
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
);

// Create a ObservableGauge instrument and register a callback that reports the measurement.
let observable_gauge = meter
let _observable_gauge = meter
.f64_observable_gauge("my_observable_gauge")
.with_description("An observable gauge set to 1.0")
.with_unit(Unit::new("myunit"))
.with_callback(|observer| {
observer.observe(
1.0,
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
)
})
.init();

// Register a callback that reports the measurement.
meter.register_callback(&[observable_gauge.as_any()], move |observer| {
observer.observe_f64(
&observable_gauge,
1.0,
&[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
],
)
})?;

// Metrics are exported by default every 30 seconds when using stdout exporter,
// however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval.
Expand Down
7 changes: 2 additions & 5 deletions opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,12 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let tracer = global::tracer("ex.com/basic");
let meter = global::meter("ex.com/basic");

let gauge = meter
let _gauge = meter
.f64_observable_gauge("ex.com.one")
.with_description("A gauge set to 1.0")
.with_callback(|observer| observer.observe(1.0, COMMON_ATTRIBUTES.as_ref()))
.init();

meter.register_callback(&[gauge.as_any()], move |observer| {
observer.observe_f64(&gauge, 1.0, COMMON_ATTRIBUTES.as_ref())
})?;

let histogram = meter.f64_histogram("ex.com.two").init();
histogram.record(5.5, COMMON_ATTRIBUTES.as_ref());

Expand Down
122 changes: 99 additions & 23 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,91 @@ mod tests {
);
}

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation --features=metrics,testing -- --nocapture

// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

// Act
let meter = meter_provider.meter("test");
let _counter = meter
.u64_observable_counter("my_observable_counter")
.with_unit(Unit::new("my_unit"))
.with_callback(|observer| {
observer.observe(100, &[KeyValue::new("key1", "value1")]);
observer.observe(200, &[KeyValue::new("key1", "value2")]);
})
.init();

meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
assert!(!resource_metrics.is_empty());
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
assert_eq!(metric.name, "my_observable_counter");
assert_eq!(metric.unit.as_str(), "my_unit");
let sum = metric
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for ObservableCounter instruments by default");

// Expecting 2 time-series.
assert_eq!(sum.data_points.len(), 2);
assert!(sum.is_monotonic, "Counter should produce monotonic.");
assert_eq!(
sum.temporality,
data::Temporality::Cumulative,
"Should produce cumulative by default."
);

// find and validate key1=value1 datapoint
let mut data_point1 = None;
for datapoint in &sum.data_points {
if datapoint
.attributes
.iter()
.any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value1")
{
data_point1 = Some(datapoint);
}
}
assert_eq!(
data_point1
.expect("datapoint with key1=value1 expected")
.value,
100
);

// find and validate key1=value2 datapoint
let mut data_point1 = None;
for datapoint in &sum.data_points {
if datapoint
.attributes
.iter()
.any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value2")
{
data_point1 = Some(datapoint);
}
}
assert_eq!(
data_point1
.expect("datapoint with key1=value2 expected")
.value,
200
);
}

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down Expand Up @@ -445,7 +530,7 @@ mod tests {
// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[ignore = "Spatial aggregation is not yet implemented."]
// #[ignore = "Spatial aggregation is not yet implemented."]
async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
// cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=metrics,testing

Expand All @@ -465,43 +550,34 @@ mod tests {

// Act
let meter = meter_provider.meter("test");
let observable_counter = meter.u64_observable_counter("my_observable_counter").init();

// Normally, these callbacks would generate 3 time-series, but since the view
// drops all attributes, we expect only 1 time-series.
meter
.register_callback(&[observable_counter.as_any()], move |observer| {
observer.observe_u64(
&observable_counter,
let _observable_counter = meter
.u64_observable_counter("my_observable_counter")
.with_callback(|observer| {
observer.observe(
100,
[
&[
KeyValue::new("statusCode", "200"),
KeyValue::new("verb", "get"),
]
.as_ref(),
],
);

observer.observe_u64(
&observable_counter,
observer.observe(
100,
[
&[
KeyValue::new("statusCode", "200"),
KeyValue::new("verb", "post"),
]
.as_ref(),
],
);

observer.observe_u64(
&observable_counter,
observer.observe(
100,
[
&[
KeyValue::new("statusCode", "500"),
KeyValue::new("verb", "get"),
]
.as_ref(),
],
);
})
.expect("Expected to register callback");
.init();

meter_provider.force_flush().unwrap();

Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,12 +428,12 @@ mod tests {
// Act
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = meter_provider.meter("test");
let counter = meter.u64_observable_counter("testcounter").init();
meter
.register_callback(&[counter.as_any()], move |_| {
let _counter = meter
.u64_observable_counter("testcounter")
.with_callback(move |_| {
sender.send(()).expect("channel should still be open");
})
.expect("callback registration should succeed");
.init();

_ = meter_provider.force_flush();

Expand Down