Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API support for bound instruments #1444

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
13 changes: 13 additions & 0 deletions opentelemetry-sdk/benches/metric_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ fn counter_add(c: &mut Criterion) {
);
});
});

let bounded_counter = counter.bind(&[
KeyValue::new("attribute2", attribute_values[0]),
KeyValue::new("attribute3", attribute_values[1]),
KeyValue::new("attribute1", attribute_values[2]),
KeyValue::new("attribute4", attribute_values[3]),
]);

c.bench_function("Counter_Bounded", |b| {
b.iter(|| {
bounded_counter.add(1);
});
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
80 changes: 59 additions & 21 deletions opentelemetry-sdk/src/metrics/instrument.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
use std::{any::Any, borrow::Cow, collections::HashSet, hash::Hash, marker, sync::Arc};

use opentelemetry::metrics::BoundSyncUpDownCounter;
use opentelemetry::{
metrics::{
AsyncInstrument, MetricsError, Result, SyncCounter, SyncGauge, SyncHistogram,
SyncUpDownCounter, Unit,
AsyncInstrument, BoundSyncCounter, MetricsError, Result, SyncCounter, SyncGauge,
SyncHistogram, SyncUpDownCounter, Unit,
},
Key, KeyValue,
};

use crate::{
attributes::AttributeSet,
instrumentation::Scope,
metrics::{aggregation::Aggregation, internal::Measure},
};
use crate::metrics::internal::{BoundedMeasure, MeasureSet};
use crate::{attributes::AttributeSet, instrumentation::Scope, metrics::aggregation::Aggregation};

pub(crate) const EMPTY_MEASURE_MSG: &str = "no aggregators for observable instrument";

Expand Down Expand Up @@ -255,37 +253,77 @@
}

pub(crate) struct ResolvedMeasures<T> {
pub(crate) measures: Vec<Arc<dyn Measure<T>>>,
pub(crate) measure_sets: Vec<MeasureSet<T>>,
}

impl<T: Copy + 'static> SyncCounter<T> for ResolvedMeasures<T> {
fn add(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(val, AttributeSet::from(attrs))
for set in &self.measure_sets {
set.measure.call(val, AttributeSet::from(attrs))
}
}

fn bind(&self, attributes: &[KeyValue]) -> Arc<dyn BoundSyncCounter<T> + Send + Sync> {
Arc::new(BoundResolveMeasures::new(attributes, &self.measure_sets))
}
}

impl<T: Copy + 'static> SyncUpDownCounter<T> for ResolvedMeasures<T> {
fn add(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(val, AttributeSet::from(attrs))
for set in &self.measure_sets {
set.measure.call(val, AttributeSet::from(attrs))
}
}

fn bind(&self, attributes: &[KeyValue]) -> Arc<dyn BoundSyncUpDownCounter<T> + Send + Sync> {
Arc::new(BoundResolveMeasures::new(attributes, &self.measure_sets))
}
}

impl<T: Copy + 'static> SyncGauge<T> for ResolvedMeasures<T> {
fn record(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(val, AttributeSet::from(attrs))
for set in &self.measure_sets {
set.measure.call(val, AttributeSet::from(attrs))

Check warning on line 286 in opentelemetry-sdk/src/metrics/instrument.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/instrument.rs#L285-L286

Added lines #L285 - L286 were not covered by tests
}
}
}

impl<T: Copy + 'static> SyncHistogram<T> for ResolvedMeasures<T> {
fn record(&self, val: T, attrs: &[KeyValue]) {
for set in &self.measure_sets {
set.measure.call(val, AttributeSet::from(attrs))
}
}
}

pub(crate) struct BoundResolveMeasures<T> {
pub(crate) measures: Vec<Arc<dyn BoundedMeasure<T>>>,
}

impl<T: Copy + 'static> BoundResolveMeasures<T> {
fn new(attrs: &[KeyValue], measure_sets: &[MeasureSet<T>]) -> Self {
let attrs = AttributeSet::from(attrs);
let measures = measure_sets
.iter()
.map(|s| s.bound_measure_generator.generate(attrs.clone()))
.collect::<Vec<_>>();

BoundResolveMeasures { measures }
}
}

impl<T: Copy + 'static> BoundSyncCounter<T> for BoundResolveMeasures<T> {
fn add(&self, value: T) {
for measure in &self.measures {
measure.call(val, AttributeSet::from(attrs))
measure.call(value);
}
}
}

impl<T: Copy + 'static> BoundSyncUpDownCounter<T> for BoundResolveMeasures<T> {
fn add(&self, value: T) {
for measure in &self.measures {
measure.call(value);
}
}
}
Expand Down Expand Up @@ -328,7 +366,7 @@
#[derive(Clone)]
pub(crate) struct Observable<T> {
pub(crate) id: ObservableId<T>,
measures: Vec<Arc<dyn Measure<T>>>,
measure_sets: Vec<MeasureSet<T>>,
}

impl<T> Observable<T> {
Expand All @@ -338,7 +376,7 @@
name: Cow<'static, str>,
description: Cow<'static, str>,
unit: Unit,
measures: Vec<Arc<dyn Measure<T>>>,
measures: Vec<MeasureSet<T>>,

Check warning on line 379 in opentelemetry-sdk/src/metrics/instrument.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/instrument.rs#L379

Added line #L379 was not covered by tests
) -> Self {
Self {
id: ObservableId {
Expand All @@ -351,7 +389,7 @@
},
_marker: marker::PhantomData,
},
measures,
measure_sets: measures,

Check warning on line 392 in opentelemetry-sdk/src/metrics/instrument.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/instrument.rs#L392

Added line #L392 was not covered by tests
}
}

Expand All @@ -362,7 +400,7 @@
/// any aggregators. Also, an error is returned if scope defines a Meter other
/// than the observable it was created by.
pub(crate) fn registerable(&self, scope: &Scope) -> Result<()> {
if self.measures.is_empty() {
if self.measure_sets.is_empty() {

Check warning on line 403 in opentelemetry-sdk/src/metrics/instrument.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/instrument.rs#L403

Added line #L403 was not covered by tests
return Err(MetricsError::Other(EMPTY_MEASURE_MSG.into()));
}
if &self.id.inner.scope != scope {
Expand All @@ -378,8 +416,8 @@

impl<T: Copy + Send + Sync + 'static> AsyncInstrument<T> for Observable<T> {
fn observe(&self, measurement: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(measurement, AttributeSet::from(attrs))
for set in &self.measure_sets {
set.measure.call(measurement, AttributeSet::from(attrs))

Check warning on line 420 in opentelemetry-sdk/src/metrics/instrument.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/instrument.rs#L419-L420

Added lines #L419 - L420 were not covered by tests
}
}

Expand Down