From 96be7f2ffde7dd8a0f1d24c5278abf51a0348c73 Mon Sep 17 00:00:00 2001
From: Mindaugas Vinkelis
Date: Thu, 12 Sep 2024 13:21:06 +0300
Subject: [PATCH] Unify Histogram and ExpHistogram aggregation

---
 opentelemetry-sdk/benches/metric.rs                |  46 ++-
 .../internal/attribute_set_aggregation.rs          | 183 +++++++++
 .../metrics/internal/exponential_histogram.rs      | 358 ++++++++----------
 .../src/metrics/internal/histogram.rs              | 345 +++++------------
 opentelemetry-sdk/src/metrics/internal/mod.rs      |  17 +-
 opentelemetry-sdk/src/metrics/internal/sum.rs      |  13 +
 opentelemetry-sdk/src/metrics/mod.rs               |   7 +-
 7 files changed, 493 insertions(+), 476 deletions(-)
 create mode 100644 opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs

diff --git a/opentelemetry-sdk/benches/metric.rs b/opentelemetry-sdk/benches/metric.rs
index 88143bccff..23d05008e9 100644
--- a/opentelemetry-sdk/benches/metric.rs
+++ b/opentelemetry-sdk/benches/metric.rs
@@ -3,6 +3,7 @@ use std::sync::{Arc, Weak};
 use criterion::{criterion_group, criterion_main, Bencher, Criterion};
 use opentelemetry::{
+    global,
     metrics::{Counter, Histogram, MeterProvider as _, Result},
     Key, KeyValue,
 };
@@ -344,7 +345,7 @@ fn counters(c: &mut Criterion) {
 const MAX_BOUND: usize = 100000;
 
 fn bench_histogram(bound_count: usize) -> (SharedReader, Histogram<u64>) {
-    let mut bounds = vec![0; bound_count];
+    let mut bounds: Vec<usize> = vec![0; bound_count];
     #[allow(clippy::needless_range_loop)]
     for i in 0..bounds.len() {
         bounds[i] = i * MAX_BOUND / bound_count
@@ -394,22 +395,35 @@ fn histograms(c: &mut Criterion) {
             );
         }
     }
-    group.bench_function("CollectOne", |b| benchmark_collect_histogram(b, 1));
-    group.bench_function("CollectFive", |b| benchmark_collect_histogram(b, 5));
-    group.bench_function("CollectTen", |b| benchmark_collect_histogram(b, 10));
-    group.bench_function("CollectTwentyFive", |b| benchmark_collect_histogram(b, 25));
+    for metrics_size in [1, 5, 20] {
+        for attr_sets in [1, 5, 20, 200] {
+            group.bench_function(
+                format!("Collect{metrics_size}Metric{attr_sets}AttrSets"),
+                |b| benchmark_collect_histogram(b, metrics_size, attr_sets),
+            );
+        }
+    }
 }
 
-fn benchmark_collect_histogram(b: &mut Bencher, n: usize) {
+fn benchmark_collect_histogram(b: &mut Bencher, metrics_size: usize, attr_sets: usize) {
     let r = SharedReader(Arc::new(ManualReader::default()));
-    let mtr = SdkMeterProvider::builder()
-        .with_reader(r.clone())
-        .build()
-        .meter("sdk/metric/bench/histogram");
-
-    for i in 0..n {
-        let h = mtr.u64_histogram(format!("fake_data_{i}")).init();
-        h.record(1, &[]);
+    let provider = SdkMeterProvider::builder().with_reader(r.clone()).build();
+    let mtr = provider.meter("sdk/metric/bench/histogram");
+    global::set_meter_provider(provider);
+
+    let mut rng = rand::thread_rng();
+    for m in 0..metrics_size {
+        let h = mtr.u64_histogram(format!("fake_data_{m}")).init();
+        for _att in 0..attr_sets {
+            let mut attributes: Vec<KeyValue> = Vec::new();
+            for _i in 0..rng.gen_range(0..3) {
+                attributes.push(KeyValue::new(
+                    format!("K{}", rng.gen_range::<u32, _>(0..10)),
+                    format!("V{}", rng.gen_range::<u32, _>(0..10)),
+                ))
+            }
+            h.record(1, &attributes)
+        }
     }
 
     let mut rm = ResourceMetrics {
@@ -418,8 +432,8 @@ fn benchmark_collect_histogram(b: &mut Bencher, n: usize) {
     };
 
     b.iter(|| {
-        let _ = r.collect(&mut rm);
-        assert_eq!(rm.scope_metrics[0].metrics.len(), n);
+        r.collect(&mut rm).unwrap();
+        assert_eq!(rm.scope_metrics[0].metrics.len(), metrics_size);
     })
 }
diff --git a/opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs b/opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs
new file mode 100644
index 0000000000..0a11187343
--- /dev/null
+++ b/opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs
@@ -0,0 +1,183 @@
+use std::{
+    collections::HashMap,
+    fmt::Debug,
+    ops::Deref,
+    sync::{Arc, Mutex, RwLock},
+};
+
+use opentelemetry::{global, metrics::MetricsError, KeyValue};
+
+use crate::metrics::AttributeSet;
+
+use super::{
+    aggregate::is_under_cardinality_limit, Number, STREAM_OVERFLOW_ATTRIBUTES,
+    STREAM_OVERFLOW_ATTRIBUTES_ERR,
+};
+
+/// Aggregator interface
+pub(crate) trait Aggregator<T>: Debug + Clone
+where
+    T: Number,
+{
+    /// A static configuration that is needed by aggregators,
+    /// e.g. the bucket count at creation time and the bucket list at update time.
+    type Config;
+
+    /// Called every time a new attribute-set is stored.
+    fn create(init: &Self::Config) -> Self;
+
+    /// Called for each measurement.
+    fn update(&mut self, config: &Self::Config, measurement: T);
+}
+
+/// Hashing and sorting are expensive, so we keep two maps:
+/// the sorted one is mainly needed for a fast collection phase.
+struct WithAttribsAggregators<A> {
+    // put all attribute combinations in this map
+    all: HashMap<Vec<KeyValue>, Arc<Mutex<A>>>,
+    sorted: HashMap<Vec<KeyValue>, Arc<Mutex<A>>>,
+}
+
+/// This struct is responsible for two things:
+/// * sending measurement information to the matching aggregator (per attribute-set)
+/// * collecting all attribute-sets + aggregators (either read-only OR reset)
+///
+/// Even though its responsibility is simple to understand,
+/// the implementation is a lot more complex, in order to be very performant.
+pub(crate) struct AttributeSetAggregation<T, A>
+where
+    T: Number,
+    A: Aggregator<T>,
+{
+    /// Aggregator for values with no attributes attached.
+    no_attribs: Mutex<Option<A>>,
+    list: RwLock<WithAttribsAggregators<A>>,
+    /// Configuration required to create and update the [`Aggregator`]
+    config: A::Config,
+}
+
+impl<T, A> AttributeSetAggregation<T, A>
+where
+    T: Number,
+    A: Aggregator<T>,
+{
+    /// Initiate aggregators by specifying the [`Aggregator`] configuration.
+    pub(crate) fn new(init_data: A::Config) -> Self {
+        Self {
+            no_attribs: Mutex::new(None),
+            list: RwLock::new(WithAttribsAggregators {
+                all: Default::default(),
+                sorted: Default::default(),
+            }),
+            config: init_data,
+        }
+    }
+
+    /// Update the matching aggregator depending on the provided attributes.
+    pub(crate) fn measure(&self, attrs: &[KeyValue], measurement: T) {
+        if attrs.is_empty() {
+            if let Ok(mut aggr) = self.no_attribs.lock() {
+                aggr.get_or_insert_with(|| A::create(&self.config))
+                    .update(&self.config, measurement);
+            }
+            return;
+        }
+        let Ok(list) = self.list.read() else {
+            return;
+        };
+        if let Some(aggr) = list.all.get(attrs) {
+            if let Ok(mut aggr) = aggr.lock() {
+                aggr.update(&self.config, measurement);
+            }
+            return;
+        }
+        drop(list);
+        let Ok(mut list) = self.list.write() else {
+            return;
+        };
+
+        // Check again in case another thread has already inserted it
+        if let Some(aggr) = list.all.get(attrs) {
+            if let Ok(mut aggr) = aggr.lock() {
+                aggr.update(&self.config, measurement);
+            }
+        } else if is_under_cardinality_limit(list.all.len()) {
+            let mut aggr = A::create(&self.config);
+            aggr.update(&self.config, measurement);
+            let aggr = Arc::new(Mutex::new(aggr));
+            list.all.insert(attrs.into(), aggr.clone());
+            let sorted_attribs = AttributeSet::from(attrs).into_vec();
+            list.sorted.insert(sorted_attribs, aggr);
+        } else if let Some(aggr) = list.sorted.get(STREAM_OVERFLOW_ATTRIBUTES.deref()) {
+            if let Ok(mut aggr) = aggr.lock() {
+                aggr.update(&self.config, measurement);
+            }
+        } else {
+            let mut aggr = A::create(&self.config);
+            aggr.update(&self.config, measurement);
+            list.sorted.insert(
+                STREAM_OVERFLOW_ATTRIBUTES.clone(),
+                Arc::new(Mutex::new(aggr)),
+            );
+            global::handle_error(MetricsError::Other(STREAM_OVERFLOW_ATTRIBUTES_ERR.into()));
+        }
+    }
+
+    /// Iterate through all attribute sets and populate `DataPoints` in read-only mode.
+    pub(crate) fn collect_readonly<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
+    where
+        MapFn: FnMut(Vec<KeyValue>, A) -> Res,
+    {
+        let Ok(list) = self.list.read() else {
+            return;
+        };
+        prepare_data(dest, list.sorted.len());
+        if let Ok(aggr) = self.no_attribs.lock() {
+            if let Some(aggr) = aggr.deref() {
+                dest.push(map_fn(Default::default(), aggr.clone()));
+            }
+        };
+        dest.extend(
+            list.sorted
+                .iter()
+                .filter_map(|(k, v)| v.lock().ok().map(|v| map_fn(k.clone(), v.clone()))),
+        )
+    }
+
+    /// Iterate through all attribute sets and populate `DataPoints`, while also consuming (resetting) the aggregators.
+    pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
+    where
+        MapFn: FnMut(Vec<KeyValue>, A) -> Res,
+    {
+        let Ok(mut list) = self.list.write() else {
+            return;
+        };
+        prepare_data(dest, list.sorted.len());
+        if let Ok(mut aggr) = self.no_attribs.lock() {
+            if let Some(aggr) = aggr.take() {
+                dest.push(map_fn(Default::default(), aggr));
+            }
+        };
+        list.all.clear();
+        dest.extend(list.sorted.drain().filter_map(|(k, v)| {
+            Arc::try_unwrap(v)
+                .expect("this is the last instance, so we cannot fail to get it")
+                .into_inner()
+                .ok()
+                .map(|v| map_fn(k, v))
+        }));
+    }
+
+    pub(crate) fn config(&self) -> &A::Config {
+        &self.config
+    }
+}
+
+/// Clear and allocate exactly the required amount of space for all attribute-sets
+fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
+    data.clear();
+    let total_len = list_len + 1; // to account for the no-attributes case
+    if total_len > data.capacity() {
+        data.reserve_exact(total_len - data.capacity());
+    }
+}
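A note on the new abstraction: `Aggregator` only defines `create`/`update` against a shared `Config`, and `AttributeSetAggregation` owns all locking and cardinality handling. As a minimal sketch (not part of the patch), a hypothetical `Count` aggregator with no static configuration would plug in like this:

use opentelemetry::KeyValue;

// Hypothetical aggregator: counts how many measurements were recorded.
#[derive(Debug, Clone)]
struct Count(u64);

impl Aggregator<u64> for Count {
    type Config = (); // no static configuration needed

    fn create(_init: &()) -> Self {
        Count(0)
    }

    fn update(&mut self, _config: &(), _measurement: u64) {
        self.0 += 1;
    }
}

fn example() {
    let aggr: AttributeSetAggregation<u64, Count> = AttributeSetAggregation::new(());
    aggr.measure(&[KeyValue::new("k", "v")], 10); // map lookup path
    aggr.measure(&[], 20); // fast path: dedicated no-attributes slot

    // Delta-style collection drains and resets every aggregator.
    let mut points: Vec<(Vec<KeyValue>, u64)> = Vec::new();
    aggr.collect_and_reset(&mut points, |attrs, c| (attrs, c.0));
    assert_eq!(points.len(), 2); // one attribute set plus the no-attributes point
}

Note the fast paths in `measure`: the no-attributes slot avoids the map entirely, and a read lock followed by a re-check under the write lock keeps concurrent updates of existing attribute sets on the cheap path.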
diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
index c23b441663..39249d273d 100644
--- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
+++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
@@ -1,63 +1,46 @@
-use std::{collections::HashMap, f64::consts::LOG2_E, sync::Mutex, time::SystemTime};
+use std::{f64::consts::LOG2_E, sync::Mutex, time::SystemTime};
 
 use once_cell::sync::Lazy;
 use opentelemetry::{metrics::MetricsError, KeyValue};
 
-use crate::{
-    metrics::data::{self, Aggregation, Temporality},
-    metrics::AttributeSet,
-};
+use crate::metrics::data::{self, Aggregation, Temporality};
 
-use super::Number;
+use super::{
+    attribute_set_aggregation::{Aggregator, AttributeSetAggregation},
+    Number,
+};
 
 pub(crate) const EXPO_MAX_SCALE: i8 = 20;
 pub(crate) const EXPO_MIN_SCALE: i8 = -10;
 
+struct HistConfig {
+    max_size: i32,
+    max_scale: i8,
+    record_min_max: bool,
+    record_sum: bool,
+}
+
 /// A single data point in an exponential histogram.
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Clone)]
 struct ExpoHistogramDataPoint<T> {
     count: usize,
     min: T,
     max: T,
     sum: T,
-
-    max_size: i32,
-    record_min_max: bool,
-    record_sum: bool,
-
     scale: i8,
-
     pos_buckets: ExpoBuckets,
     neg_buckets: ExpoBuckets,
     zero_count: u64,
 }
 
-impl<T: Number> ExpoHistogramDataPoint<T> {
-    fn new(max_size: i32, max_scale: i8, record_min_max: bool, record_sum: bool) -> Self {
-        ExpoHistogramDataPoint {
-            count: 0,
-            min: T::max(),
-            max: T::min(),
-            sum: T::default(),
-            max_size,
-            record_min_max,
-            record_sum,
-            scale: max_scale,
-            pos_buckets: ExpoBuckets::default(),
-            neg_buckets: ExpoBuckets::default(),
-            zero_count: 0,
-        }
-    }
-}
-
 impl<T: Number> ExpoHistogramDataPoint<T> {
     /// Adds a new measurement to the histogram.
     ///
     /// It will rescale the buckets if needed.
-    fn record(&mut self, v: T) {
+    fn record(&mut self, config: &HistConfig, v: T) {
         self.count += 1;
 
-        if self.record_min_max {
+        if config.record_min_max {
             if v < self.min {
                 self.min = v;
             }
@@ -65,7 +48,7 @@ impl<T: Number> ExpoHistogramDataPoint<T> {
                 self.max = v;
             }
         }
-        if self.record_sum {
+        if config.record_sum {
             self.sum += v;
         }
 
@@ -90,7 +73,7 @@ impl<T: Number> ExpoHistogramDataPoint<T> {
         };
 
         scale_change(
-            self.max_size,
+            config.max_size,
             bin,
             bucket.start_bin,
             bucket.counts.len() as i32,
@@ -222,7 +205,7 @@ fn frexp(x: f64) -> (f64, i32) {
 }
 
 /// A set of buckets in an exponential histogram.
-#[derive(Default, Debug, PartialEq)]
+#[derive(Default, Debug, PartialEq, Clone)]
 struct ExpoBuckets {
     start_bin: i32,
     counts: Vec<u64>,
@@ -306,19 +289,45 @@ impl ExpoBuckets {
     }
 }
 
+impl<T> Aggregator<T> for ExpoHistogramDataPoint<T>
+where
+    T: Number,
+{
+    type Config = HistConfig;
+
+    fn create(config: &HistConfig) -> Self {
+        ExpoHistogramDataPoint {
+            count: 0,
+            min: T::max(),
+            max: T::min(),
+            sum: T::default(),
+            scale: config.max_scale,
+            pos_buckets: ExpoBuckets::default(),
+            neg_buckets: ExpoBuckets::default(),
+            zero_count: 0,
+        }
+    }
+
+    fn update(&mut self, config: &HistConfig, value: T) {
+        let f_value = value.into_float();
+        // Ignore NaN and infinity.
+        if f_value.is_infinite() || f_value.is_nan() {
+            return;
+        }
+        self.record(config, value);
+    }
+}
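For intuition about the scale handling above: an exponential histogram puts a value v into bucket index floor(log2(v) * 2^scale), the same formula quoted in the tests below. The real `record` computes this via `frexp` rather than a floating-point `log2`, so that values exactly at a power of two land in the lower bucket. A rough standalone sketch of the mapping (hypothetical helper, deliberately naive at exact power-of-two boundaries):

// Naive index formula; the production code corrects exact powers of two.
fn bucket_index(value: f64, scale: i8) -> i32 {
    (value.log2() * (scale as f64).exp2()).floor() as i32
}

fn main() {
    // scale 0: one bucket per power of two; 7.0 lies in (4, 8] -> index 2
    assert_eq!(bucket_index(7.0, 0), 2);
    // scale 1: each power-of-two range splits into 2 buckets
    assert_eq!(bucket_index(3.0, 1), 3); // log2(3) ~ 1.585, * 2 -> 3.17 -> 3
    // negative scale: buckets widen; at -1 each bucket spans a factor of 4
    assert_eq!(bucket_index(7.0, -1), 1); // (4, 16] -> index 1
}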
+
 /// An aggregator that summarizes a set of measurements as an exponential
 /// histogram.
 ///
 /// Each histogram is scoped by attributes and the aggregation cycle the
 /// measurements were made in.
-pub(crate) struct ExpoHistogram<T> {
-    record_sum: bool,
-    record_min_max: bool,
-    max_size: i32,
-    max_scale: i8,
-
-    values: Mutex<HashMap<AttributeSet, ExpoHistogramDataPoint<T>>>,
-
+pub(crate) struct ExpoHistogram<T>
+where
+    T: Number,
+{
+    aggregators: AttributeSetAggregation<T, ExpoHistogramDataPoint<T>>,
     start: Mutex<SystemTime>,
 }
 
@@ -331,34 +340,18 @@ impl<T: Number> ExpoHistogram<T> {
         record_sum: bool,
     ) -> Self {
         ExpoHistogram {
-            record_sum,
-            record_min_max,
-            max_size: max_size as i32,
-            max_scale,
-            values: Mutex::new(HashMap::default()),
+            aggregators: AttributeSetAggregation::new(HistConfig {
+                max_size: max_size as i32,
+                max_scale,
+                record_min_max,
+                record_sum,
+            }),
             start: Mutex::new(SystemTime::now()),
         }
     }
 
     pub(crate) fn measure(&self, value: T, attrs: &[KeyValue]) {
-        let f_value = value.into_float();
-        // Ignore NaN and infinity.
-        if f_value.is_infinite() || f_value.is_nan() {
-            return;
-        }
-
-        let attrs: AttributeSet = attrs.into();
-        if let Ok(mut values) = self.values.lock() {
-            let v = values.entry(attrs).or_insert_with(|| {
-                ExpoHistogramDataPoint::new(
-                    self.max_size,
-                    self.max_scale,
-                    self.record_min_max,
-                    self.record_sum,
-                )
-            });
-            v.record(value)
-        }
+        self.aggregators.measure(attrs, value);
     }
 
     pub(crate) fn delta(
@@ -383,59 +376,19 @@ impl<T: Number> ExpoHistogram<T> {
         };
         let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
         h.temporality = Temporality::Delta;
-        h.data_points.clear();
-
-        let mut values = match self.values.lock() {
-            Ok(g) => g,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len();
-        if n > h.data_points.capacity() {
-            h.data_points.reserve_exact(n - h.data_points.capacity());
-        }
-
-        for (a, b) in values.drain() {
-            h.data_points.push(data::ExponentialHistogramDataPoint {
-                attributes: a
-                    .iter()
-                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
-                    .collect(),
-                start_time: start,
-                time: t,
-                count: b.count,
-                min: if self.record_min_max {
-                    Some(b.min)
-                } else {
-                    None
-                },
-                max: if self.record_min_max {
-                    Some(b.max)
-                } else {
-                    None
-                },
-                sum: if self.record_sum { b.sum } else { T::default() },
-                scale: b.scale,
-                zero_count: b.zero_count,
-                positive_bucket: data::ExponentialBucket {
-                    offset: b.pos_buckets.start_bin,
-                    counts: b.pos_buckets.counts.clone(),
-                },
-                negative_bucket: data::ExponentialBucket {
-                    offset: b.neg_buckets.start_bin,
-                    counts: b.neg_buckets.counts.clone(),
-                },
-                zero_threshold: 0.0,
-                exemplars: vec![],
+        let config = self.aggregators.config();
+        self.aggregators
+            .collect_and_reset(&mut h.data_points, |attributes, aggr| {
+                to_data_point(start, t, config, attributes, aggr)
             });
-        }
 
         // The delta collection cycle resets.
         if let Ok(mut start) = self.start.lock() {
             *start = t;
         }
 
-        (n, new_agg.map(|a| Box::new(a) as Box<_>))
+        (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
     }
 
     pub(crate) fn cumulative(
@@ -461,63 +414,64 @@ impl<T: Number> ExpoHistogram<T> {
         let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
         h.temporality = Temporality::Cumulative;
 
-        let values = match self.values.lock() {
-            Ok(g) => g,
-            Err(_) => return (0, None),
-        };
-        h.data_points.clear();
-
-        let n = values.len();
-        if n > h.data_points.capacity() {
-            h.data_points.reserve_exact(n - h.data_points.capacity());
-        }
-
-        // TODO: This will use an unbounded amount of memory if there
-        // are unbounded number of attribute sets being aggregated. Attribute
-        // sets that become "stale" need to be forgotten so this will not
-        // overload the system.
-        for (a, b) in values.iter() {
-            h.data_points.push(data::ExponentialHistogramDataPoint {
-                attributes: a
-                    .iter()
-                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
-                    .collect(),
-                start_time: start,
-                time: t,
-                count: b.count,
-                min: if self.record_min_max {
-                    Some(b.min)
-                } else {
-                    None
-                },
-                max: if self.record_min_max {
-                    Some(b.max)
-                } else {
-                    None
-                },
-                sum: if self.record_sum { b.sum } else { T::default() },
-                scale: b.scale,
-                zero_count: b.zero_count,
-                positive_bucket: data::ExponentialBucket {
-                    offset: b.pos_buckets.start_bin,
-                    counts: b.pos_buckets.counts.clone(),
-                },
-                negative_bucket: data::ExponentialBucket {
-                    offset: b.neg_buckets.start_bin,
-                    counts: b.neg_buckets.counts.clone(),
-                },
-                zero_threshold: 0.0,
-                exemplars: vec![],
+        let config = self.aggregators.config();
+        self.aggregators
+            .collect_readonly(&mut h.data_points, |attributes, aggr| {
+                to_data_point(start, t, config, attributes, aggr)
             });
-        }
 
-        (n, new_agg.map(|a| Box::new(a) as Box<_>))
+        (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
+    }
+}
+
+fn to_data_point<T>(
+    start_time: SystemTime,
+    time: SystemTime,
+    config: &HistConfig,
+    attributes: Vec<KeyValue>,
+    aggr: ExpoHistogramDataPoint<T>,
+) -> data::ExponentialHistogramDataPoint<T>
+where
+    T: Default,
+{
+    data::ExponentialHistogramDataPoint {
+        attributes,
+        start_time,
+        time,
+        count: aggr.count,
+        min: if config.record_min_max {
+            Some(aggr.min)
+        } else {
+            None
+        },
+        max: if config.record_min_max {
+            Some(aggr.max)
+        } else {
+            None
+        },
+        sum: if config.record_sum {
+            aggr.sum
+        } else {
+            T::default()
+        },
+        scale: aggr.scale,
+        zero_count: aggr.zero_count,
+        positive_bucket: data::ExponentialBucket {
+            offset: aggr.pos_buckets.start_bin,
+            counts: aggr.pos_buckets.counts,
+        },
+        negative_bucket: data::ExponentialBucket {
+            offset: aggr.neg_buckets.start_bin,
+            counts: aggr.neg_buckets.counts,
+        },
+        zero_threshold: 0.0,
+        exemplars: vec![],
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::ops::Neg;
+    use std::{collections::HashMap, ops::Neg};
 
     use opentelemetry::KeyValue;
 
@@ -619,12 +573,18 @@
                 expected_scale: -1,
             },
         ];
-
         for test in test_cases {
-            let mut dp = ExpoHistogramDataPoint::<f64>::new(test.max_size, 20, true, true);
+            let config = HistConfig {
+                max_size: test.max_size,
+                max_scale: 20,
+                record_min_max: true,
+                record_sum: true,
+            };
+
+            let mut dp = ExpoHistogramDataPoint::<f64>::create(&config);
             for v in test.values {
-                dp.record(v);
-                dp.record(-v);
+                dp.record(&config, v);
+                dp.record(&config, -v);
             }
 
             assert_eq!(test.expected_buckets, dp.pos_buckets, "positive buckets");
@@ -684,9 +644,11 @@
             for v in test.values {
                 h.measure(v, alice);
             }
-            let values = h.values.lock().unwrap();
-            let alice: AttributeSet = alice.into();
-            let dp = values.get(&alice).unwrap();
+            let mut values = Vec::new();
+            h.aggregators
+                .collect_and_reset(&mut values, |attrb, aggr| (attrb, aggr));
+            let values: HashMap<_, _> = values.into_iter().collect();
+            let dp = values.get(alice).unwrap();
 
             assert_eq!(test.expected.max, dp.max);
             assert_eq!(test.expected.min, dp.min);
@@ -736,9 +698,11 @@
             for v in test.values {
                 h.measure(v, alice);
             }
-            let values = h.values.lock().unwrap();
-            let alice: AttributeSet = alice.into();
-            let dp = values.get(&alice).unwrap();
+            let mut values = Vec::new();
+            h.aggregators
+                .collect_and_reset(&mut values, |attrb, aggr| (attrb, aggr));
+            let values: HashMap<_, _> = values.into_iter().collect();
+            let dp = values.get(alice).unwrap();
 
             assert_eq!(test.expected.max, dp.max);
             assert_eq!(test.expected.min, dp.min);
@@ -821,10 +785,16 @@
             },
         ];
         for test in test_cases {
-            let mut dp = ExpoHistogramDataPoint::new(test.max_size, 20, true, true);
+            let config = HistConfig {
+                max_size: test.max_size,
+                max_scale: 20,
+                record_min_max: true,
+                record_sum: true,
+            };
+            let mut dp = ExpoHistogramDataPoint::create(&config);
             for v in test.values {
-                dp.record(v);
-                dp.record(-v);
+                dp.record(&config, v);
+                dp.record(&config, -v);
             }
 
             assert_eq!(test.expected_buckets, dp.pos_buckets);
@@ -837,25 +807,30 @@
     fn data_point_record_limits() {
         // These bins are calculated from the following formula:
        // floor( log2( value) * 2^20 ) using an arbitrary precision calculator.
-
-        let mut fdp = ExpoHistogramDataPoint::new(4, 20, true, true);
-        fdp.record(f64::MAX);
+        let config = HistConfig {
+            max_size: 4,
+            max_scale: 20,
+            record_min_max: true,
+            record_sum: true,
+        };
+        let mut fdp = ExpoHistogramDataPoint::create(&config);
+        fdp.record(&config, f64::MAX);
 
         assert_eq!(
             fdp.pos_buckets.start_bin, 1073741823,
             "start bin does not match for large f64 values",
         );
 
-        let mut fdp = ExpoHistogramDataPoint::new(4, 20, true, true);
-        fdp.record(f64::MIN_POSITIVE);
+        let mut fdp = ExpoHistogramDataPoint::create(&config);
+        fdp.record(&config, f64::MIN_POSITIVE);
 
         assert_eq!(
             fdp.pos_buckets.start_bin, -1071644673,
             "start bin does not match for small positive values",
         );
 
-        let mut idp = ExpoHistogramDataPoint::new(4, 20, true, true);
-        idp.record(i64::MAX);
+        let mut idp = ExpoHistogramDataPoint::create(&config);
+        idp.record(&config, i64::MAX);
 
         assert_eq!(
             idp.pos_buckets.start_bin, 66060287,
@@ -1184,14 +1159,19 @@
 
     #[test]
     fn sub_normal() {
-        let want = ExpoHistogramDataPoint {
+        let config = HistConfig {
             max_size: 4,
+            max_scale: 20,
+            record_min_max: true,
+            record_sum: true,
+        };
+        let want = ExpoHistogramDataPoint {
             count: 3,
             min: f64::MIN_POSITIVE,
             max: f64::MIN_POSITIVE,
             sum: 3.0 * f64::MIN_POSITIVE,
-            scale: 20,
+            scale: config.max_scale,
             pos_buckets: ExpoBuckets {
                 start_bin: -1071644673,
                 counts: vec![3],
@@ -1200,15 +1180,13 @@
                 start_bin: 0,
                 counts: vec![],
             },
-            record_min_max: true,
-            record_sum: true,
             zero_count: 0,
         };
 
-        let mut ehdp = ExpoHistogramDataPoint::new(4, 20, true, true);
-        ehdp.record(f64::MIN_POSITIVE);
-        ehdp.record(f64::MIN_POSITIVE);
-        ehdp.record(f64::MIN_POSITIVE);
+        let mut ehdp = ExpoHistogramDataPoint::create(&config);
+        ehdp.record(&config, f64::MIN_POSITIVE);
+        ehdp.record(&config, f64::MIN_POSITIVE);
+        ehdp.record(&config, f64::MIN_POSITIVE);
 
         assert_eq!(want, ehdp);
     }
diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs
index 089415ba7c..cad973daf1 100644
--- a/opentelemetry-sdk/src/metrics/internal/histogram.rs
+++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs
@@ -1,51 +1,19 @@
-use std::collections::HashSet;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
 use std::{sync::Mutex, time::SystemTime};
 
 use crate::metrics::data::HistogramDataPoint;
 use crate::metrics::data::{self, Aggregation, Temporality};
 use opentelemetry::KeyValue;
 
+use super::attribute_set_aggregation::{Aggregator, AttributeSetAggregation};
 use super::Number;
-use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap};
 
-struct HistogramUpdate;
-
-impl Operation for HistogramUpdate {
-    fn update_tracker<T: Default, AT: AtomicTracker<T>>(tracker: &AT, value: T, index: usize) {
-        tracker.update_histogram(index, value);
-    }
-}
-
-struct HistogramTracker<T> {
-    buckets: Mutex<Buckets<T>>,
-}
-
-impl<T: Number> AtomicTracker<T> for HistogramTracker<T> {
-    fn update_histogram(&self, index: usize, value: T) {
-        let mut buckets = match self.buckets.lock() {
-            Ok(guard) => guard,
-            Err(_) => return,
-        };
-
-        buckets.bin(index, value);
-        buckets.sum(value);
-    }
-}
-
-impl<T: Number> AtomicallyUpdate<T> for HistogramTracker<T> {
-    type AtomicTracker = HistogramTracker<T>;
-
-    fn new_atomic_tracker(buckets_count: Option<usize>) -> Self::AtomicTracker {
-        let count = buckets_count.unwrap();
-        HistogramTracker {
-            buckets: Mutex::new(Buckets::<T>::new(count)),
-        }
-    }
+struct BucketsConfig {
+    bounds: Vec<f64>,
+    record_min_max: bool,
+    record_sum: bool,
 }
 
-#[derive(Default)]
+#[derive(Default, Debug, Clone)]
 struct Buckets<T> {
     counts: Vec<u64>,
     count: u64,
@@ -54,82 +22,72 @@
     max: T,
 }
 
-impl<T: Number> Buckets<T> {
-    /// returns buckets with `n` bins.
-    fn new(n: usize) -> Buckets<T> {
+impl<T> Aggregator<T> for Buckets<T>
+where
+    T: Number,
+{
+    type Config = BucketsConfig;
+
+    fn create(config: &BucketsConfig) -> Self {
+        let size = config.bounds.len() + 1;
         Buckets {
-            counts: vec![0; n],
+            counts: vec![0; size],
             min: T::max(),
             max: T::min(),
             ..Default::default()
         }
     }
 
-    fn sum(&mut self, value: T) {
-        self.total += value;
-    }
-
-    fn bin(&mut self, idx: usize, value: T) {
+    fn update(&mut self, config: &BucketsConfig, measurement: T) {
+        let f_value = measurement.into_float();
+        // Ignore NaN and infinity.
+        if f_value.is_infinite() || f_value.is_nan() {
+            return;
+        }
+        // This search will return an index in the range `[0, bounds.len()]`, where
+        // it will return `bounds.len()` if value is greater than the last element
+        // of `bounds`. This aligns with the buckets in that the length of buckets
+        // is `bounds.len()+1`, with the last bucket representing:
+        // `(bounds[bounds.len()-1], +∞)`.
+        let idx = config.bounds.partition_point(|&x| x < f_value);
        self.counts[idx] += 1;
         self.count += 1;
-        if value < self.min {
-            self.min = value;
-        }
-        if value > self.max {
-            self.max = value
-        }
-    }
-
-    fn reset(&mut self) {
-        for item in &mut self.counts {
-            *item = 0;
+        if config.record_min_max {
+            if measurement < self.min {
+                self.min = measurement;
+            }
+            if measurement > self.max {
+                self.max = measurement
+            }
         }
-        self.count = Default::default();
-        self.total = Default::default();
-        self.min = T::max();
-        self.max = T::min();
+        // It's very cheap to update the total, even when record_sum is not configured.
+        self.total += measurement;
     }
 }
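The `partition_point` search above assumes `bounds` is sorted, which `Histogram::new` below now guarantees. A standalone check of the index semantics:

fn main() {
    let bounds = [0.0_f64, 5.0, 10.0]; // must already be sorted
    let index = |v: f64| bounds.partition_point(|&x| x < v);

    assert_eq!(index(-1.0), 0); // (-inf, 0]
    assert_eq!(index(5.0), 1);  // (0, 5]  -- bounds are upper-inclusive
    assert_eq!(index(7.5), 2);  // (5, 10]
    assert_eq!(index(99.0), 3); // (10, +inf): the extra `bounds.len()` bucket
}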
 
 /// Summarizes a set of measurements as a histogram with explicitly defined
 /// buckets.
 pub(crate) struct Histogram<T> {
-    value_map: ValueMap<HistogramTracker<T>, T, HistogramUpdate>,
-    bounds: Vec<f64>,
-    record_min_max: bool,
-    record_sum: bool,
+    aggregators: AttributeSetAggregation<T, Buckets<T>>,
     start: Mutex<SystemTime>,
 }
 
 impl<T: Number> Histogram<T> {
-    pub(crate) fn new(boundaries: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
-        let buckets_count = boundaries.len() + 1;
-        let mut histogram = Histogram {
-            value_map: ValueMap::new_with_buckets_count(buckets_count),
-            bounds: boundaries,
-            record_min_max,
-            record_sum,
+    pub(crate) fn new(mut bounds: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
+        bounds.retain(|v| !v.is_nan());
+        bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
+        Self {
+            aggregators: AttributeSetAggregation::new(BucketsConfig {
+                record_min_max,
+                record_sum,
+                bounds,
+            }),
             start: Mutex::new(SystemTime::now()),
-        };
-
-        histogram.bounds.retain(|v| !v.is_nan());
-        histogram
-            .bounds
-            .sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
-
-        histogram
+        }
     }
 
     pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
-        let f = measurement.into_float();
-
-        // This search will return an index in the range `[0, bounds.len()]`, where
-        // it will return `bounds.len()` if value is greater than the last element
-        // of `bounds`. This aligns with the buckets in that the length of buckets
-        // is `bounds.len()+1`, with the last bucket representing:
-        // `(bounds[bounds.len()-1], +∞)`.
-        let index = self.bounds.partition_point(|&x| x < f);
-        self.value_map.measure(measurement, attrs, index);
+        self.aggregators.measure(attrs, measurement)
     }
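Before the refactor the bounds were sanitized after construction; now `new` normalizes them up front. A quick standalone check that dropping NaNs first is what makes the `partial_cmp` unwrap in the sort safe:

fn main() {
    let mut bounds = vec![10.0_f64, f64::NAN, 0.0, 5.0];
    bounds.retain(|v| !v.is_nan());
    bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
    assert_eq!(bounds, vec![0.0, 5.0, 10.0]);
}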
 
     pub(crate) fn delta(
@@ -153,92 +111,17 @@
         };
         let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
         h.temporality = Temporality::Delta;
-        h.data_points.clear();
 
-        // Max number of data points need to account for the special casing
-        // of the no attribute value + overflow attribute.
-        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
-        if n > h.data_points.capacity() {
-            h.data_points.reserve_exact(n - h.data_points.capacity());
-        }
-
-        if self
-            .value_map
-            .has_no_attribute_value
-            .swap(false, Ordering::AcqRel)
-        {
-            if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() {
-                h.data_points.push(HistogramDataPoint {
-                    attributes: vec![],
-                    start_time: start,
-                    time: t,
-                    count: b.count,
-                    bounds: self.bounds.clone(),
-                    bucket_counts: b.counts.clone(),
-                    sum: if self.record_sum {
-                        b.total
-                    } else {
-                        T::default()
-                    },
-                    min: if self.record_min_max {
-                        Some(b.min)
-                    } else {
-                        None
-                    },
-                    max: if self.record_min_max {
-                        Some(b.max)
-                    } else {
-                        None
-                    },
-                    exemplars: vec![],
-                });
-
-                b.reset();
-            }
-        }
-
-        let mut trackers = match self.value_map.trackers.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.drain() {
-            if seen.insert(Arc::as_ptr(&tracker)) {
-                if let Ok(b) = tracker.buckets.lock() {
-                    h.data_points.push(HistogramDataPoint {
-                        attributes: attrs.clone(),
-                        start_time: start,
-                        time: t,
-                        count: b.count,
-                        bounds: self.bounds.clone(),
-                        bucket_counts: b.counts.clone(),
-                        sum: if self.record_sum {
-                            b.total
-                        } else {
-                            T::default()
-                        },
-                        min: if self.record_min_max {
-                            Some(b.min)
-                        } else {
-                            None
-                        },
-                        max: if self.record_min_max {
-                            Some(b.max)
-                        } else {
-                            None
-                        },
-                        exemplars: vec![],
-                    });
-                }
-            }
-        }
+        let config = self.aggregators.config();
+        self.aggregators
+            .collect_and_reset(&mut h.data_points, |attributes, buckets| {
+                to_data_point(start, t, config, attributes, buckets)
+            });
 
         // The delta collection cycle resets.
         if let Ok(mut start) = self.start.lock() {
             *start = t;
         }
-        self.value_map.count.store(0, Ordering::SeqCst);
 
         (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
     }
@@ -264,89 +147,49 @@
         };
         let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
         h.temporality = Temporality::Cumulative;
-        h.data_points.clear();
-
-        // Max number of data points need to account for the special casing
-        // of the no attribute value + overflow attribute.
-        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
-        if n > h.data_points.capacity() {
-            h.data_points.reserve_exact(n - h.data_points.capacity());
-        }
 
-        if self
-            .value_map
-            .has_no_attribute_value
-            .load(Ordering::Acquire)
-        {
-            if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() {
-                h.data_points.push(HistogramDataPoint {
-                    attributes: vec![],
-                    start_time: start,
-                    time: t,
-                    count: b.count,
-                    bounds: self.bounds.clone(),
-                    bucket_counts: b.counts.clone(),
-                    sum: if self.record_sum {
-                        b.total
-                    } else {
-                        T::default()
-                    },
-                    min: if self.record_min_max {
-                        Some(b.min)
-                    } else {
-                        None
-                    },
-                    max: if self.record_min_max {
-                        Some(b.max)
-                    } else {
-                        None
-                    },
-                    exemplars: vec![],
-                });
-            }
-        }
-
-        let trackers = match self.value_map.trackers.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        // TODO: This will use an unbounded amount of memory if there
-        // are unbounded number of attribute sets being aggregated. Attribute
-        // sets that become "stale" need to be forgotten so this will not
-        // overload the system.
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.iter() {
-            if seen.insert(Arc::as_ptr(tracker)) {
-                if let Ok(b) = tracker.buckets.lock() {
-                    h.data_points.push(HistogramDataPoint {
-                        attributes: attrs.clone(),
-                        start_time: start,
-                        time: t,
-                        count: b.count,
-                        bounds: self.bounds.clone(),
-                        bucket_counts: b.counts.clone(),
-                        sum: if self.record_sum {
-                            b.total
-                        } else {
-                            T::default()
-                        },
-                        min: if self.record_min_max {
-                            Some(b.min)
-                        } else {
-                            None
-                        },
-                        max: if self.record_min_max {
-                            Some(b.max)
-                        } else {
-                            None
-                        },
-                        exemplars: vec![],
-                    });
-                }
-            }
-        }
+        let config = self.aggregators.config();
+        self.aggregators
+            .collect_readonly(&mut h.data_points, |attributes, buckets| {
+                to_data_point(start, t, config, attributes, buckets)
+            });
 
         (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
     }
 }
+
+fn to_data_point<T>(
+    start_time: SystemTime,
+    time: SystemTime,
+    config: &BucketsConfig,
+    attributes: Vec<KeyValue>,
+    buckets: Buckets<T>,
+) -> HistogramDataPoint<T>
+where
+    T: Default,
+{
+    HistogramDataPoint {
+        attributes,
+        start_time,
+        time,
+        count: buckets.count,
+        bounds: config.bounds.clone(),
+        bucket_counts: buckets.counts,
+        sum: if config.record_sum {
+            buckets.total
+        } else {
+            T::default()
+        },
+        min: if config.record_min_max {
+            Some(buckets.min)
+        } else {
+            None
+        },
+        max: if config.record_min_max {
+            Some(buckets.max)
+        } else {
+            None
+        },
+        exemplars: vec![],
+    }
+}
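Both collect paths now funnel into the shared `to_data_point`, so the only difference between temporalities is the collect flavor. A sketch, reusing the hypothetical `Count` aggregator from the earlier sketch:

fn collect_example(aggr: &AttributeSetAggregation<u64, Count>) {
    let mut points: Vec<(Vec<KeyValue>, u64)> = Vec::new();

    // Cumulative: aggregators are cloned and kept, so every cycle reports the
    // running state since the stream started.
    aggr.collect_readonly(&mut points, |attrs, c| (attrs, c.0));

    // Delta: aggregators are drained (`Arc::try_unwrap` avoids the clone) and
    // the next cycle starts from empty state.
    aggr.collect_and_reset(&mut points, |attrs, c| (attrs, c.0));
}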
diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs
index abc691b2fc..5ab142bc1c 100644
--- a/opentelemetry-sdk/src/metrics/internal/mod.rs
+++ b/opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -1,4 +1,5 @@
 mod aggregate;
+mod attribute_set_aggregation;
 mod exponential_histogram;
 mod histogram;
 mod last_value;
@@ -21,6 +22,8 @@
 use opentelemetry::{global, KeyValue};
 
 use crate::metrics::AttributeSet;
 
+pub(crate) static STREAM_OVERFLOW_ATTRIBUTES_ERR: &str = "Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.";
+
 pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
     Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);
 
@@ -80,17 +83,6 @@ impl<AU: AtomicallyUpdate<T>, T: Number, O> ValueMap<AU, T, O> {
             phantom: PhantomData,
         }
     }
-
-    fn new_with_buckets_count(buckets_count: usize) -> Self {
-        ValueMap {
-            trackers: RwLock::new(HashMap::new()),
-            has_no_attribute_value: AtomicBool::new(false),
-            no_attribute_tracker: AU::new_atomic_tracker(Some(buckets_count)),
-            count: AtomicUsize::new(0),
-            buckets_count: Some(buckets_count),
-            phantom: PhantomData,
-        }
-    }
 }
 
 impl<AU: AtomicallyUpdate<T>, T: Number, O: Operation> ValueMap<AU, T, O> {
@@ -146,7 +138,7 @@ impl<AU: AtomicallyUpdate<T>, T: Number, O: Operation> ValueMap<AU, T, O> {
                 let new_tracker = AU::new_atomic_tracker(self.buckets_count);
                 O::update_tracker(&new_tracker, measurement, index);
                 trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
-                global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
+                global::handle_error(MetricsError::Other(STREAM_OVERFLOW_ATTRIBUTES_ERR.into()));
             }
         }
     }
@@ -162,7 +154,6 @@ pub(crate) trait AtomicTracker<T>: Sync + Send + 'static {
     fn get_and_reset_value(&self) -> T {
         T::default()
    }
-    fn update_histogram(&self, _index: usize, _value: T) {}
 }
 
 /// Marks a type that can have an atomic tracker generated for it
diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs
index 66af75734d..a1f944233f 100644
--- a/opentelemetry-sdk/src/metrics/internal/sum.rs
+++ b/opentelemetry-sdk/src/metrics/internal/sum.rs
@@ -13,6 +13,7 @@ use super::{Increment, ValueMap};
 /// Summarizes a set of measurements made as their arithmetic sum.
 pub(crate) struct Sum<T: Number> {
     value_map: ValueMap<T, T, Increment>,
+    // value_map2: ValueMap2>,
     monotonic: bool,
     start: Mutex<SystemTime>,
 }
@@ -26,6 +27,7 @@ impl<T: Number> Sum<T> {
     pub(crate) fn new(monotonic: bool) -> Self {
         Sum {
             value_map: ValueMap::new(),
+            // value_map2: ValueMap2::new(T::default()),
             monotonic,
             start: Mutex::new(SystemTime::now()),
         }
@@ -34,6 +36,7 @@ impl<T: Number> Sum<T> {
     pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
         // The argument index is not applicable to Sum.
         self.value_map.measure(measurement, attrs, 0);
+        // self.value_map2.measure(measurement, attrs);
     }
 
     pub(crate) fn delta(
@@ -67,6 +70,16 @@ impl<T: Number> Sum<T> {
         }
 
         let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
+
+        // self.value_map2
+        //     .read_measurements(&mut s_data.data_points, |attributes, tracker| DataPoint {
+        //         attributes,
+        //         start_time: Some(prev_start),
+        //         time: Some(t),
+        //         value: tracker.value.get(),
+        //         exemplars: vec![],
+        //     });
+
         if self
             .value_map
             .has_no_attribute_value
diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs
index cf6e3fb928..55bdc638b1 100644
--- a/opentelemetry-sdk/src/metrics/mod.rs
+++ b/opentelemetry-sdk/src/metrics/mod.rs
@@ -64,7 +64,7 @@
 use std::collections::hash_map::DefaultHasher;
 use std::collections::HashSet;
 use std::hash::{Hash, Hasher};
 
-use opentelemetry::{Key, KeyValue, Value};
+use opentelemetry::KeyValue;
 
 /// A unique set of attributes that can be used as instrument identifiers.
 ///
@@ -108,11 +108,6 @@ impl AttributeSet {
         AttributeSet(values, hash)
     }
 
-    /// Iterate over key value pairs in the set
-    pub(crate) fn iter(&self) -> impl Iterator<Item = (&Key, &Value)> {
-        self.0.iter().map(|kv| (&kv.key, &kv.value))
-    }
-
     /// Returns the underlying Vec of KeyValue pairs
     pub(crate) fn into_vec(self) -> Vec<KeyValue> {
         self.0