Skip to content

experiment: prototype measurement processor #2797

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions examples/metrics-advanced/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use opentelemetry::global;
use opentelemetry::{global, Context};
use opentelemetry::Key;
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::{Aggregation, Instrument, SdkMeterProvider, Stream, Temporality};
use opentelemetry_sdk::metrics::{Aggregation, Instrument, MeasurementProcessor, SdkMeterProvider, Stream, Temporality};
use opentelemetry_sdk::Resource;
use std::error::Error;

@@ -57,6 +57,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
.with_view(my_view_rename_and_unit)
.with_view(my_view_drop_attributes)
.with_view(my_view_change_aggregation)
.with_measurement_processor(UserTypeMeasurementProcessor)
.build();
global::set_meter_provider(provider.clone());
provider
@@ -128,6 +129,8 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
],
);

// Enrich the next measurement with user type
let guard = Context::current_with_value(UserType::Admin).attach();
histogram2.record(
1.2,
&[
@@ -137,6 +140,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
KeyValue::new("mykey4", "myvalue4"),
],
);
drop(guard);

histogram2.record(
1.23,
@@ -154,3 +158,31 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
meter_provider.shutdown()?;
Ok(())
}



enum UserType {
User,
Admin,
}

impl UserType {
fn as_str(&self) -> &'static str {
match self {
UserType::User => "user",
UserType::Admin => "admin",
}
}
}

struct UserTypeMeasurementProcessor;

impl MeasurementProcessor for UserTypeMeasurementProcessor {
fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>> {
Context::current().get::<UserType>().map(|user_type| {
let mut attrs = attributes.to_vec();
attrs.push(KeyValue::new("user_type", user_type.as_str()));
attrs
})
}
}
103 changes: 85 additions & 18 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
@@ -5,11 +5,11 @@ use std::{
sync::{Arc, Mutex},
time::SystemTime,
};

use std::borrow::Cow;
use crate::metrics::{data::Aggregation, Temporality};
use opentelemetry::time::now;
use opentelemetry::KeyValue;

use crate::metrics::measurement_processor::MeasurementProcessor;
use super::{
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
precomputed_sum::PrecomputedSum, sum::Sum, Number,
@@ -106,21 +106,51 @@ type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
#[derive(Clone)]
pub(crate) struct AttributeSetFilter {
filter: Option<Filter>,
processor: Option<Arc<dyn MeasurementProcessor>>
}

impl AttributeSetFilter {
pub(crate) fn new(filter: Option<Filter>) -> Self {
Self { filter }
pub(crate) fn new(filter: Option<Filter>, processors: Vec<Arc<dyn MeasurementProcessor>>) -> Self {
Self { filter, processor: AggregateProcessor::try_create(processors) }
}

pub(crate) fn apply(&self, attrs: &[KeyValue], run: impl FnOnce(&[KeyValue])) {
if let Some(filter) = &self.filter {
let filtered_attrs: Vec<KeyValue> =
attrs.iter().filter(|kv| filter(kv)).cloned().collect();
run(&filtered_attrs);
} else {
run(attrs);
};
match (&self.filter, &self.processor) {
(None, None) => {
run(attrs);
},
(Some(filter), None) => {
let filtered_attrs: Vec<KeyValue> =
attrs.iter().filter(|kv| filter(kv)).cloned().collect();

run(&filtered_attrs);
},
(None, Some(processor)) => {
let attributes = Cow::Borrowed(attrs);

match processor.process(&attributes) {
Some(attributes) => {
run(&attributes);
}
None => {
run(attrs);
}
}
},
(Some(filter), Some(processor)) => {
let filtered_attrs: Vec<KeyValue> =
attrs.iter().filter(|kv| filter(kv)).cloned().collect();

match processor.process(&filtered_attrs) {
Some(attributes) => {
run(&attributes);
}
None => {
run(attrs);
}
}
}
}
}
}

@@ -137,10 +167,10 @@ pub(crate) struct AggregateBuilder<T> {
}

impl<T: Number> AggregateBuilder<T> {
pub(crate) fn new(temporality: Temporality, filter: Option<Filter>) -> Self {
pub(crate) fn new(temporality: Temporality, filter: Option<Filter>, processors: Vec<Arc<dyn MeasurementProcessor>>) -> Self {
AggregateBuilder {
temporality,
filter: AttributeSetFilter::new(filter),
filter: AttributeSetFilter::new(filter, processors),
_marker: marker::PhantomData,
}
}
@@ -201,6 +231,43 @@ impl<T: Number> AggregateBuilder<T> {
}
}


#[derive(Clone)]
struct AggregateProcessor(Arc<Vec<Arc<dyn MeasurementProcessor>>>);

impl AggregateProcessor {
fn try_create(
processors: Vec<Arc<dyn MeasurementProcessor>>,
) -> Option<Arc<dyn MeasurementProcessor>> {

match processors.len() {
0 => return None,
1 => Some(processors[0].clone()),
_ => Some(Arc::new(AggregateProcessor(Arc::new(processors)))),
}
}
}

impl MeasurementProcessor for AggregateProcessor {
fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>> {
// Do not allocate if not necessary.
let mut new_attributes: Option<Vec<KeyValue>> = None;

for processor in self.0.iter() {
let existing_or_new = match &new_attributes {
Some(new) => new,
None => attributes
};

if let Some(new) = processor.process(existing_or_new) {
new_attributes = Some(new);
}
}

new_attributes
}
}

#[cfg(test)]
mod tests {
use crate::metrics::data::{
@@ -214,7 +281,7 @@ mod tests {
#[test]
fn last_value_aggregation() {
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value(None);
AggregateBuilder::<u64>::new(Temporality::Cumulative, None, vec![]).last_value(None);
let mut a = Gauge {
data_points: vec![GaugeDataPoint {
attributes: vec![KeyValue::new("a", 1)],
@@ -240,7 +307,7 @@ mod tests {
fn precomputed_sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
AggregateBuilder::<u64>::new(temporality, None, vec![]).precomputed_sum(true);
let mut a = Sum {
data_points: vec![
SumDataPoint {
@@ -282,7 +349,7 @@ mod tests {
fn sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(temporality, None).sum(true);
AggregateBuilder::<u64>::new(temporality, None, vec![]).sum(true);
let mut a = Sum {
data_points: vec![
SumDataPoint {
@@ -323,7 +390,7 @@ mod tests {
#[test]
fn explicit_bucket_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None, vec![])
.explicit_bucket_histogram(vec![1.0], true, true);
let mut a = Histogram {
data_points: vec![HistogramDataPoint {
@@ -366,7 +433,7 @@ mod tests {
#[test]
fn exponential_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None, vec![])
.exponential_bucket_histogram(4, 20, true, true);
let mut a = ExponentialHistogram {
data_points: vec![ExponentialHistogramDataPoint {
13 changes: 13 additions & 0 deletions opentelemetry-sdk/src/metrics/measurement_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use opentelemetry::KeyValue;

/// A trait for processing measurement attributes.
pub trait MeasurementProcessor: Send + Sync + 'static {

/// Processes the attributes of a measurement.
///
/// The processor might decide to modify the attributes. In that case, it returns
/// `Some` with the modified attributes. If no attribute modification is needed,
/// it returns `None`.
fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>>;
}

13 changes: 9 additions & 4 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
@@ -15,10 +15,7 @@ use opentelemetry::{
use crate::error::OTelSdkResult;
use crate::Resource;

use super::{
exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines,
reader::MetricReader, view::View, PeriodicReader,
};
use super::{exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines, reader::MetricReader, view::View, MeasurementProcessor, PeriodicReader};

/// Handles the creation and coordination of [Meter]s.
///
@@ -223,6 +220,7 @@ pub struct MeterProviderBuilder {
resource: Option<Resource>,
readers: Vec<Box<dyn MetricReader>>,
views: Vec<Arc<dyn View>>,
measurement_processors: Vec<Arc<dyn MeasurementProcessor>>,
}

impl MeterProviderBuilder {
@@ -291,6 +289,12 @@ impl MeterProviderBuilder {
self
}

/// Associates a [MeasurementProcessor] with a [MeterProvider].
pub fn with_measurement_processor<T: MeasurementProcessor>(mut self, processor: T) -> Self {
self.measurement_processors.push(Arc::new(processor));
self
}

/// Construct a new [MeterProvider] with this configuration.
pub fn build(self) -> SdkMeterProvider {
otel_debug!(
@@ -304,6 +308,7 @@ impl MeterProviderBuilder {
self.resource.unwrap_or(Resource::builder().build()),
self.readers,
self.views,
self.measurement_processors,
)),
meters: Default::default(),
shutdown_invoked: AtomicBool::new(false),
4 changes: 4 additions & 0 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -61,6 +61,10 @@ pub(crate) mod view;
#[cfg(any(feature = "testing", test))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
pub mod in_memory_exporter;

mod measurement_processor;
pub use measurement_processor::MeasurementProcessor;

#[cfg(any(feature = "testing", test))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
pub use in_memory_exporter::{InMemoryMetricExporter, InMemoryMetricExporterBuilder};
7 changes: 5 additions & 2 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ use crate::{

use self::internal::AggregateFns;

use super::{Aggregation, Temporality};
use super::{Aggregation, MeasurementProcessor, Temporality};

/// Connects all of the instruments created by a meter provider to a [MetricReader].
///
@@ -39,6 +39,7 @@ pub struct Pipeline {
reader: Box<dyn MetricReader>,
views: Vec<Arc<dyn View>>,
inner: Mutex<PipelineInner>,
processors: Vec<Arc<dyn MeasurementProcessor>>,
}

impl fmt::Debug for Pipeline {
@@ -385,7 +386,7 @@ where
.clone()
.map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>);

let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter);
let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter, self.pipeline.processors.clone());
let AggregateFns { measure, collect } = match aggregate_fn(b, &agg, kind) {
Ok(Some(inst)) => inst,
other => return other.map(|fs| fs.map(|inst| inst.measure)), // Drop aggregator or error
@@ -621,6 +622,7 @@ impl Pipelines {
res: Resource,
readers: Vec<Box<dyn MetricReader>>,
views: Vec<Arc<dyn View>>,
processors: Vec<Arc<dyn MeasurementProcessor>>,
) -> Self {
let mut pipes = Vec::with_capacity(readers.len());
for r in readers {
@@ -629,6 +631,7 @@ impl Pipelines {
reader: r,
views: views.clone(),
inner: Default::default(),
processors: processors.clone(),
});
p.reader.register_pipeline(Arc::downgrade(&p));
pipes.push(p);
Loading
Oops, something went wrong.