From a4c51ecfe6b1bf88e64cb8860779ac302b1e99b0 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 23:36:41 -0700 Subject: [PATCH 1/8] Fix few Observable Counter and UpDownCounter bugs --- opentelemetry-sdk/CHANGELOG.md | 5 +- opentelemetry-sdk/src/metrics/internal/mod.rs | 14 ++++ opentelemetry-sdk/src/metrics/internal/sum.rs | 84 ++++++++++++------- opentelemetry-sdk/src/metrics/mod.rs | 56 +++++++++++-- 4 files changed, 119 insertions(+), 40 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 942086e333..45d46ce670 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -16,7 +16,10 @@ The custom exporters and processors can't directly access the `LogData::LogRecord::attributes`, as these are private to opentelemetry-sdk. Instead, they would now use LogRecord::attributes_iter() method to access them. - +- Fixed various Metric aggregation bug related to + ObservableCounter,UpDownCounter including + [1517](https://github.com/open-telemetry/opentelemetry-rust/issues/1517). + [#1990](https://github.com/open-telemetry/opentelemetry-rust/pull/1990) ## v0.24.1 diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 92bc3d947f..cf0edeb47c 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -15,6 +15,7 @@ pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; /// Marks a type that can have a value added and retrieved atomically. Required since /// different types have different backing atomic mechanisms pub(crate) trait AtomicTracker: Sync + Send + 'static { + fn store(&self, value: T); fn add(&self, value: T); fn get_value(&self) -> T; fn get_and_reset_value(&self) -> T; @@ -90,6 +91,10 @@ impl Number for f64 { } impl AtomicTracker for AtomicU64 { + fn store(&self, value: u64) { + self.store(value, Ordering::Relaxed); + } + fn add(&self, value: u64) { self.fetch_add(value, Ordering::Relaxed); } @@ -112,6 +117,10 @@ impl AtomicallyUpdate for u64 { } impl AtomicTracker for AtomicI64 { + fn store(&self, value: i64) { + self.store(value, Ordering::Relaxed); + } + fn add(&self, value: i64) { self.fetch_add(value, Ordering::Relaxed); } @@ -146,6 +155,11 @@ impl F64AtomicTracker { } impl AtomicTracker for F64AtomicTracker { + fn store(&self, value: f64) { + let mut guard = self.inner.lock().expect("F64 mutex was poisoned"); + *guard = value; + } + fn add(&self, value: f64) { let mut guard = self.inner.lock().expect("F64 mutex was poisoned"); *guard += value; diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 912fbacd58..8a51f6deac 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -25,21 +25,23 @@ struct ValueMap> { has_no_value_attribute_value: AtomicBool, no_attribute_value: T::AtomicTracker, count: AtomicUsize, + assign_only: bool, // if true, only assign incoming value, instead of adding to existing value } impl> Default for ValueMap { fn default() -> Self { - ValueMap::new() + ValueMap::new(false) } } impl> ValueMap { - fn new() -> Self { + fn new(assign_only: bool) -> Self { ValueMap { values: RwLock::new(HashMap::new()), has_no_value_attribute_value: AtomicBool::new(false), no_attribute_value: T::new_atomic_tracker(), count: AtomicUsize::new(0), + assign_only: assign_only, } } } @@ -47,18 +49,30 @@ impl> ValueMap { impl> ValueMap { fn measure(&self, measurement: T, attrs: &[KeyValue]) { if attrs.is_empty() { - self.no_attribute_value.add(measurement); + if self.assign_only { + self.no_attribute_value.store(measurement); + } else { + self.no_attribute_value.add(measurement); + } self.has_no_value_attribute_value .store(true, Ordering::Release); } else if let Ok(values) = self.values.read() { // Try incoming order first if let Some(value_to_update) = values.get(attrs) { - value_to_update.add(measurement); + if self.assign_only { + value_to_update.store(measurement); + } else { + value_to_update.add(measurement); + } } else { // Then try sorted order. let sorted_attrs = AttributeSet::from(attrs).into_vec(); if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) { - value_to_update.add(measurement); + if self.assign_only { + value_to_update.store(measurement); + } else { + value_to_update.add(measurement); + } } else { // Give up the lock, before acquiring write lock. drop(values); @@ -67,12 +81,24 @@ impl> ValueMap { // write lock, in case another thread has added the // value. if let Some(value_to_update) = values.get(attrs) { - value_to_update.add(measurement); + if self.assign_only { + value_to_update.store(measurement); + } else { + value_to_update.add(measurement); + } } else if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) { - value_to_update.add(measurement); + if self.assign_only { + value_to_update.store(measurement); + } else { + value_to_update.add(measurement); + } } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { let new_value = T::new_atomic_tracker(); - new_value.add(measurement); + if self.assign_only { + new_value.store(measurement); + } else { + new_value.add(measurement); + } let new_value = Arc::new(new_value); // Insert original order @@ -85,10 +111,18 @@ impl> ValueMap { } else if let Some(overflow_value) = values.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { - overflow_value.add(measurement); + if self.assign_only { + overflow_value.store(measurement); + } else { + overflow_value.add(measurement); + } } else { let new_value = T::new_atomic_tracker(); - new_value.add(measurement); + if self.assign_only { + new_value.store(measurement); + } else { + new_value.add(measurement); + } values.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_value)); global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into())); } @@ -114,7 +148,7 @@ impl> Sum { /// were made in. pub(crate) fn new(monotonic: bool) -> Self { Sum { - value_map: ValueMap::new(), + value_map: ValueMap::new(false), monotonic, start: Mutex::new(SystemTime::now()), } @@ -282,7 +316,7 @@ pub(crate) struct PrecomputedSum> { impl> PrecomputedSum { pub(crate) fn new(monotonic: bool) -> Self { PrecomputedSum { - value_map: ValueMap::new(), + value_map: ValueMap::new(true), monotonic, start: Mutex::new(SystemTime::now()), reported: Mutex::new(Default::default()), @@ -334,11 +368,16 @@ impl> PrecomputedSum { .has_no_value_attribute_value .swap(false, Ordering::AcqRel) { + let default = T::default(); + let delta = self.value_map.no_attribute_value.get_value() + - *reported.get(&vec![]).unwrap_or(&default); + new_reported.insert(vec![], self.value_map.no_attribute_value.get_value()); + s_data.data_points.push(DataPoint { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_value.get_and_reset_value(), + value: delta, exemplars: vec![], }); } @@ -351,9 +390,7 @@ impl> PrecomputedSum { let default = T::default(); for (attrs, value) in values.drain() { let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default); - if delta != default { - new_reported.insert(attrs.clone(), value.get_value()); - } + new_reported.insert(attrs.clone(), value.get_value()); s_data.data_points.push(DataPoint { attributes: attrs.clone(), start_time: Some(prev_start), @@ -408,11 +445,6 @@ impl> PrecomputedSum { .data_points .reserve_exact(n - s_data.data_points.capacity()); } - let mut new_reported = HashMap::with_capacity(n); - let mut reported = match self.reported.lock() { - Ok(r) => r, - Err(_) => return (0, None), - }; if self .value_map @@ -432,24 +464,16 @@ impl> PrecomputedSum { Ok(v) => v, Err(_) => return (0, None), }; - let default = T::default(); for (attrs, value) in values.iter() { - let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default); - if delta != default { - new_reported.insert(attrs.clone(), value.get_value()); - } s_data.data_points.push(DataPoint { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: delta, + value: value.get_value(), exemplars: vec![], }); } - *reported = new_reported; - drop(reported); // drop before values guard is dropped - ( s_data.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>), diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 46904095e8..86c439150a 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -271,29 +271,56 @@ mod tests { async fn observable_counter_aggregation_cumulative_non_zero_increment() { // Run this test with stdout enabled to see output. // cargo test observable_counter_aggregation_cumulative_non_zero_increment --features=testing -- --nocapture - observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4); + observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, false); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn observable_counter_aggregation_cumulative_non_zero_increment_no_attrs() { + // Run this test with stdout enabled to see output. + // cargo test observable_counter_aggregation_cumulative_non_zero_increment_no_attrs --features=testing -- --nocapture + observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, true); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn observable_counter_aggregation_delta_non_zero_increment() { // Run this test with stdout enabled to see output. // cargo test observable_counter_aggregation_delta_non_zero_increment --features=testing -- --nocapture - observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4); + observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, false); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn observable_counter_aggregation_delta_non_zero_increment_no_attrs() { + // Run this test with stdout enabled to see output. + // cargo test observable_counter_aggregation_delta_non_zero_increment_no_attrs --features=testing -- --nocapture + observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, true); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn observable_counter_aggregation_cumulative_zero_increment() { // Run this test with stdout enabled to see output. // cargo test observable_counter_aggregation_cumulative_zero_increment --features=testing -- --nocapture - observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4); + observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, false); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn observable_counter_aggregation_cumulative_zero_increment_no_attrs() { + // Run this test with stdout enabled to see output. + // cargo test observable_counter_aggregation_cumulative_zero_increment_no_attrs --features=testing -- --nocapture + observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, true); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - #[ignore = "Aggregation bug! https://github.com/open-telemetry/opentelemetry-rust/issues/1517"] async fn observable_counter_aggregation_delta_zero_increment() { // Run this test with stdout enabled to see output. // cargo test observable_counter_aggregation_delta_zero_increment --features=testing -- --nocapture - observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4); + observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, false); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn observable_counter_aggregation_delta_zero_increment_no_attrs() { + // Run this test with stdout enabled to see output. + // cargo test observable_counter_aggregation_delta_zero_increment_no_attrs --features=testing -- --nocapture + observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, true); } fn observable_counter_aggregation_helper( @@ -301,9 +328,15 @@ mod tests { start: u64, increment: u64, length: u64, + is_empty_attributes: bool, ) { // Arrange let mut test_context = TestContext::new(temporality); + let attributes = if is_empty_attributes { + vec![] + } else { + vec![KeyValue::new("key1", "value1")] + }; // The Observable counter reports values[0], values[1],....values[n] on each flush. let values: Vec = (0..length).map(|i| start + i * increment).collect(); println!("Testing with observable values: {:?}", values); @@ -317,7 +350,7 @@ mod tests { .with_callback(move |observer| { let mut index = i.lock().unwrap(); if *index < values.len() { - observer.observe(values[*index], &[KeyValue::new("key1", "value1")]); + observer.observe(values[*index], &attributes); *index += 1; } }) @@ -338,9 +371,14 @@ mod tests { assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); } - // find and validate key1=value1 datapoint - let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); + // find and validate datapoint + let data_point = if is_empty_attributes { + &sum.data_points[0] + } else { + find_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected") + }; + if let Temporality::Cumulative = temporality { // Cumulative counter should have the value as is. assert_eq!(data_point.value, *v); From 223eea7681a54a15be3affee9e9a8a53fa0f5c13 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 23:43:15 -0700 Subject: [PATCH 2/8] clipp --- opentelemetry-sdk/src/metrics/internal/sum.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 8a51f6deac..30b708ae22 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -41,7 +41,7 @@ impl> ValueMap { has_no_value_attribute_value: AtomicBool::new(false), no_attribute_value: T::new_atomic_tracker(), count: AtomicUsize::new(0), - assign_only: assign_only, + assign_only, } } } From cc3d9280371e3692960582b8543f883471f962d0 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 23:53:19 -0700 Subject: [PATCH 3/8] ignore spatial test --- opentelemetry-sdk/src/metrics/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 86c439150a..6f3f7c951c 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -667,8 +667,9 @@ mod tests { } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + #[ignore = "Spatial aggregation is not yet implemented."] async fn spatial_aggregation_when_view_drops_attributes_observable_counter() { - // cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=testing + // metrics::tests::spatial_aggregation_when_view_drops_attributes_observable_counter // Arrange let exporter = InMemoryMetricsExporter::default(); From b1104fa81aa86f3ef27f669e330e1dbf1306b951 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 23:54:09 -0700 Subject: [PATCH 4/8] comment right --- opentelemetry-sdk/src/metrics/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 6f3f7c951c..3225e660ac 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -669,7 +669,7 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[ignore = "Spatial aggregation is not yet implemented."] async fn spatial_aggregation_when_view_drops_attributes_observable_counter() { - // metrics::tests::spatial_aggregation_when_view_drops_attributes_observable_counter + // cargo test metrics::tests::spatial_aggregation_when_view_drops_attributes_observable_counter --features=testing // Arrange let exporter = InMemoryMetricsExporter::default(); From c77065673c8681df87d4c26f78abfb0d36a40a18 Mon Sep 17 00:00:00 2001 From: Fabien Savy Date: Thu, 8 Aug 2024 15:56:55 +0200 Subject: [PATCH 5/8] refactor: use generics to dispatch updates --- opentelemetry-sdk/CHANGELOG.md | 4 +- opentelemetry-sdk/src/metrics/internal/sum.rs | 224 +++++++++--------- 2 files changed, 113 insertions(+), 115 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 45d46ce670..eb4d9818c0 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -18,8 +18,8 @@ method to access them. - Fixed various Metric aggregation bug related to ObservableCounter,UpDownCounter including - [1517](https://github.com/open-telemetry/opentelemetry-rust/issues/1517). - [#1990](https://github.com/open-telemetry/opentelemetry-rust/pull/1990) + [#1517](https://github.com/open-telemetry/opentelemetry-rust/issues/1517). + [#1992](https://github.com/open-telemetry/opentelemetry-rust/pull/1992) ## v0.24.1 diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 30b708ae22..a55274639c 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; +use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::vec; @@ -19,123 +20,122 @@ use super::{aggregate::is_under_cardinality_limit, AtomicTracker, Number}; pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); +/// Abstracts the update operation for a measurement. +trait Operation { + fn update_tracker(tracker: &dyn AtomicTracker, value: T); +} + +struct Increment; + +impl Operation for Increment { + fn update_tracker(tracker: &dyn AtomicTracker, value: T) { + tracker.add(value); + } +} + +struct Assign; + +impl Operation for Assign { + fn update_tracker(tracker: &dyn AtomicTracker, value: T) { + tracker.store(value); + } +} + /// The storage for sums. -struct ValueMap> { - values: RwLock, Arc>>, - has_no_value_attribute_value: AtomicBool, - no_attribute_value: T::AtomicTracker, +/// +/// This structure is parametrized by an `Operation` that indicates how +/// updates to the underlying value trackers should be performed. +struct ValueMap, O> { + /// Trackers store the values associated with different attribute sets. + trackers: RwLock, Arc>>, + /// Number of different attribute set stored in the `trackers` map. count: AtomicUsize, - assign_only: bool, // if true, only assign incoming value, instead of adding to existing value + /// Indicates whether a value with no attributes has been stored. + has_no_attribute_value: AtomicBool, + /// Tracker for values with no attributes attached. + no_attribute_value: T::AtomicTracker, + phantom: PhantomData, } -impl> Default for ValueMap { +impl, O> Default for ValueMap { fn default() -> Self { - ValueMap::new(false) + ValueMap::new() } } -impl> ValueMap { - fn new(assign_only: bool) -> Self { +impl, O> ValueMap { + fn new() -> Self { ValueMap { - values: RwLock::new(HashMap::new()), - has_no_value_attribute_value: AtomicBool::new(false), + trackers: RwLock::new(HashMap::new()), + has_no_attribute_value: AtomicBool::new(false), no_attribute_value: T::new_atomic_tracker(), count: AtomicUsize::new(0), - assign_only, + phantom: PhantomData, } } } -impl> ValueMap { - fn measure(&self, measurement: T, attrs: &[KeyValue]) { - if attrs.is_empty() { - if self.assign_only { - self.no_attribute_value.store(measurement); - } else { - self.no_attribute_value.add(measurement); - } - self.has_no_value_attribute_value - .store(true, Ordering::Release); - } else if let Ok(values) = self.values.read() { - // Try incoming order first - if let Some(value_to_update) = values.get(attrs) { - if self.assign_only { - value_to_update.store(measurement); - } else { - value_to_update.add(measurement); - } - } else { - // Then try sorted order. - let sorted_attrs = AttributeSet::from(attrs).into_vec(); - if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) { - if self.assign_only { - value_to_update.store(measurement); - } else { - value_to_update.add(measurement); - } - } else { - // Give up the lock, before acquiring write lock. - drop(values); - if let Ok(mut values) = self.values.write() { - // Recheck both incoming and sorted after acquiring - // write lock, in case another thread has added the - // value. - if let Some(value_to_update) = values.get(attrs) { - if self.assign_only { - value_to_update.store(measurement); - } else { - value_to_update.add(measurement); - } - } else if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) { - if self.assign_only { - value_to_update.store(measurement); - } else { - value_to_update.add(measurement); - } - } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { - let new_value = T::new_atomic_tracker(); - if self.assign_only { - new_value.store(measurement); - } else { - new_value.add(measurement); - } - let new_value = Arc::new(new_value); - - // Insert original order - values.insert(attrs.to_vec(), new_value.clone()); - - // Insert sorted order - values.insert(sorted_attrs, new_value); - - self.count.fetch_add(1, Ordering::SeqCst); - } else if let Some(overflow_value) = - values.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) - { - if self.assign_only { - overflow_value.store(measurement); - } else { - overflow_value.add(measurement); - } - } else { - let new_value = T::new_atomic_tracker(); - if self.assign_only { - new_value.store(measurement); - } else { - new_value.add(measurement); - } - values.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_value)); - global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into())); - } - } - } - } +impl, O: Operation> ValueMap { + fn measure(&self, measurement: T, attributes: &[KeyValue]) { + if attributes.is_empty() { + O::update_tracker(&self.no_attribute_value, measurement); + self.has_no_attribute_value.store(true, Ordering::Release); + return; + } + + let Ok(trackers) = self.trackers.read() else { + return; + }; + + // Try to retrieve and update the tracker with the attributes in the provided order first + if let Some(tracker) = trackers.get(attributes) { + O::update_tracker(&**tracker, measurement); + return; + } + + // Try to retrieve and update the tracker with the attributes sorted. + let sorted_attrs = AttributeSet::from(attributes).into_vec(); + if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { + O::update_tracker(&**tracker, measurement); + return; + } + + // Give up the read lock before acquiring the write lock. + drop(trackers); + + let Ok(mut trackers) = self.trackers.write() else { + return; + }; + + // Recheck both the provided and sorted orders after acquiring the write lock + // in case another thread has pushed an update in the meantime. + if let Some(tracker) = trackers.get(attributes) { + O::update_tracker(&**tracker, measurement); + } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { + O::update_tracker(&**tracker, measurement); + } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { + let new_tracker = Arc::new(T::new_atomic_tracker()); + O::update_tracker(&*new_tracker, measurement); + + // Insert tracker with the attributes in the provided and sorted orders + trackers.insert(attributes.to_vec(), new_tracker.clone()); + trackers.insert(sorted_attrs, new_tracker); + + self.count.fetch_add(1, Ordering::SeqCst); + } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { + O::update_tracker(&**overflow_value, measurement); + } else { + let new_tracker = T::new_atomic_tracker(); + O::update_tracker(&new_tracker, measurement); + trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker)); + global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into())); } } } /// Summarizes a set of measurements made as their arithmetic sum. pub(crate) struct Sum> { - value_map: ValueMap, + value_map: ValueMap, monotonic: bool, start: Mutex, } @@ -148,7 +148,7 @@ impl> Sum { /// were made in. pub(crate) fn new(monotonic: bool) -> Self { Sum { - value_map: ValueMap::new(false), + value_map: ValueMap::new(), monotonic, start: Mutex::new(SystemTime::now()), } @@ -191,7 +191,7 @@ impl> Sum { let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); if self .value_map - .has_no_value_attribute_value + .has_no_attribute_value .swap(false, Ordering::AcqRel) { s_data.data_points.push(DataPoint { @@ -203,7 +203,7 @@ impl> Sum { }); } - let mut values = match self.value_map.values.write() { + let mut values = match self.value_map.trackers.write() { Ok(v) => v, Err(_) => return (0, None), }; @@ -267,7 +267,7 @@ impl> Sum { if self .value_map - .has_no_value_attribute_value + .has_no_attribute_value .load(Ordering::Acquire) { s_data.data_points.push(DataPoint { @@ -279,7 +279,7 @@ impl> Sum { }); } - let values = match self.value_map.values.write() { + let values = match self.value_map.trackers.write() { Ok(v) => v, Err(_) => return (0, None), }; @@ -307,7 +307,7 @@ impl> Sum { /// Summarizes a set of pre-computed sums as their arithmetic sum. pub(crate) struct PrecomputedSum> { - value_map: ValueMap, + value_map: ValueMap, monotonic: bool, start: Mutex, reported: Mutex, T>>, @@ -316,7 +316,7 @@ pub(crate) struct PrecomputedSum> { impl> PrecomputedSum { pub(crate) fn new(monotonic: bool) -> Self { PrecomputedSum { - value_map: ValueMap::new(true), + value_map: ValueMap::new(), monotonic, start: Mutex::new(SystemTime::now()), reported: Mutex::new(Default::default()), @@ -365,13 +365,12 @@ impl> PrecomputedSum { if self .value_map - .has_no_value_attribute_value + .has_no_attribute_value .swap(false, Ordering::AcqRel) { - let default = T::default(); - let delta = self.value_map.no_attribute_value.get_value() - - *reported.get(&vec![]).unwrap_or(&default); - new_reported.insert(vec![], self.value_map.no_attribute_value.get_value()); + let value = self.value_map.no_attribute_value.get_value(); + let delta = value - *reported.get(&vec![]).unwrap_or(&T::default()); + new_reported.insert(vec![], value); s_data.data_points.push(DataPoint { attributes: vec![], @@ -382,14 +381,13 @@ impl> PrecomputedSum { }); } - let mut values = match self.value_map.values.write() { + let mut values = match self.value_map.trackers.write() { Ok(v) => v, Err(_) => return (0, None), }; - let default = T::default(); for (attrs, value) in values.drain() { - let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default); + let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&T::default()); new_reported.insert(attrs.clone(), value.get_value()); s_data.data_points.push(DataPoint { attributes: attrs.clone(), @@ -448,7 +446,7 @@ impl> PrecomputedSum { if self .value_map - .has_no_value_attribute_value + .has_no_attribute_value .load(Ordering::Acquire) { s_data.data_points.push(DataPoint { @@ -460,7 +458,7 @@ impl> PrecomputedSum { }); } - let values = match self.value_map.values.write() { + let values = match self.value_map.trackers.write() { Ok(v) => v, Err(_) => return (0, None), }; From df2f26e7562751c186868752670e63c8bb9ca82f Mon Sep 17 00:00:00 2001 From: Fabien Savy Date: Fri, 9 Aug 2024 08:15:10 +0200 Subject: [PATCH 6/8] refactor: don't use dynamic dispatch for Operation --- opentelemetry-sdk/src/metrics/internal/sum.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index a55274639c..52d835442c 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -22,13 +22,13 @@ pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = /// Abstracts the update operation for a measurement. trait Operation { - fn update_tracker(tracker: &dyn AtomicTracker, value: T); + fn update_tracker>(tracker: &AT, value: T); } struct Increment; impl Operation for Increment { - fn update_tracker(tracker: &dyn AtomicTracker, value: T) { + fn update_tracker>(tracker: &AT, value: T) { tracker.add(value); } } @@ -36,7 +36,7 @@ impl Operation for Increment { struct Assign; impl Operation for Assign { - fn update_tracker(tracker: &dyn AtomicTracker, value: T) { + fn update_tracker>(tracker: &AT, value: T) { tracker.store(value); } } From 8b11e1b704c1a890a62c7bfd98b522dd3a70d54d Mon Sep 17 00:00:00 2001 From: Fabien Savy Date: Fri, 9 Aug 2024 08:15:32 +0200 Subject: [PATCH 7/8] fix: avoid double atomic readings and rename values to trackers --- opentelemetry-sdk/src/metrics/internal/sum.rs | 43 ++++++++++--------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 52d835442c..1ed76fdae9 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -53,7 +53,7 @@ struct ValueMap, O> { /// Indicates whether a value with no attributes has been stored. has_no_attribute_value: AtomicBool, /// Tracker for values with no attributes attached. - no_attribute_value: T::AtomicTracker, + no_attribute_tracker: T::AtomicTracker, phantom: PhantomData, } @@ -68,7 +68,7 @@ impl, O> ValueMap { ValueMap { trackers: RwLock::new(HashMap::new()), has_no_attribute_value: AtomicBool::new(false), - no_attribute_value: T::new_atomic_tracker(), + no_attribute_tracker: T::new_atomic_tracker(), count: AtomicUsize::new(0), phantom: PhantomData, } @@ -78,7 +78,7 @@ impl, O> ValueMap { impl, O: Operation> ValueMap { fn measure(&self, measurement: T, attributes: &[KeyValue]) { if attributes.is_empty() { - O::update_tracker(&self.no_attribute_value, measurement); + O::update_tracker(&self.no_attribute_tracker, measurement); self.has_no_attribute_value.store(true, Ordering::Release); return; } @@ -198,24 +198,24 @@ impl> Sum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_value.get_and_reset_value(), + value: self.value_map.no_attribute_tracker.get_and_reset_value(), exemplars: vec![], }); } - let mut values = match self.value_map.trackers.write() { + let mut trackers = match self.value_map.trackers.write() { Ok(v) => v, Err(_) => return (0, None), }; let mut seen = HashSet::new(); - for (attrs, value) in values.drain() { - if seen.insert(Arc::as_ptr(&value)) { + for (attrs, tracker) in trackers.drain() { + if seen.insert(Arc::as_ptr(&tracker)) { s_data.data_points.push(DataPoint { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: value.get_value(), + value: tracker.get_value(), exemplars: vec![], }); } @@ -274,12 +274,12 @@ impl> Sum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_value.get_value(), + value: self.value_map.no_attribute_tracker.get_value(), exemplars: vec![], }); } - let values = match self.value_map.trackers.write() { + let trackers = match self.value_map.trackers.write() { Ok(v) => v, Err(_) => return (0, None), }; @@ -288,12 +288,12 @@ impl> Sum { // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not // overload the system. - for (attrs, value) in values.iter() { + for (attrs, tracker) in trackers.iter() { s_data.data_points.push(DataPoint { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: value.get_value(), + value: tracker.get_value(), exemplars: vec![], }); } @@ -368,7 +368,7 @@ impl> PrecomputedSum { .has_no_attribute_value .swap(false, Ordering::AcqRel) { - let value = self.value_map.no_attribute_value.get_value(); + let value = self.value_map.no_attribute_tracker.get_value(); let delta = value - *reported.get(&vec![]).unwrap_or(&T::default()); new_reported.insert(vec![], value); @@ -381,14 +381,15 @@ impl> PrecomputedSum { }); } - let mut values = match self.value_map.trackers.write() { + let mut trackers = match self.value_map.trackers.write() { Ok(v) => v, Err(_) => return (0, None), }; - for (attrs, value) in values.drain() { - let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&T::default()); - new_reported.insert(attrs.clone(), value.get_value()); + for (attrs, tracker) in trackers.drain() { + let value = tracker.get_value(); + let delta = value - *reported.get(&attrs).unwrap_or(&T::default()); + new_reported.insert(attrs.clone(), value); s_data.data_points.push(DataPoint { attributes: attrs.clone(), start_time: Some(prev_start), @@ -453,21 +454,21 @@ impl> PrecomputedSum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_value.get_value(), + value: self.value_map.no_attribute_tracker.get_value(), exemplars: vec![], }); } - let values = match self.value_map.trackers.write() { + let trackers = match self.value_map.trackers.write() { Ok(v) => v, Err(_) => return (0, None), }; - for (attrs, value) in values.iter() { + for (attrs, tracker) in trackers.iter() { s_data.data_points.push(DataPoint { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: value.get_value(), + value: tracker.get_value(), exemplars: vec![], }); } From 153f6d030c16159922560e74e3352e8f10bd6fb8 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Fri, 9 Aug 2024 10:05:18 -0700 Subject: [PATCH 8/8] Update opentelemetry-sdk/CHANGELOG.md Co-authored-by: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> --- opentelemetry-sdk/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index eb4d9818c0..eb184e7a99 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -19,7 +19,7 @@ - Fixed various Metric aggregation bug related to ObservableCounter,UpDownCounter including [#1517](https://github.com/open-telemetry/opentelemetry-rust/issues/1517). - [#1992](https://github.com/open-telemetry/opentelemetry-rust/pull/1992) + [#2004](https://github.com/open-telemetry/opentelemetry-rust/pull/2004) ## v0.24.1