From 106c314366e2846ef56b2567ff4b323804c425ab Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 10:44:59 -0700 Subject: [PATCH 01/10] Metric refactor - 2x perf and allocation free --- opentelemetry-sdk/src/metrics/internal/sum.rs | 144 ++++++++++-------- 1 file changed, 82 insertions(+), 62 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 61ceadc21c..0dc47b3b20 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,4 +1,6 @@ -use std::sync::atomic::{AtomicBool, Ordering}; +use std::collections::HashSet; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; use std::vec; use std::{ collections::HashMap, @@ -8,19 +10,21 @@ use std::{ use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; use crate::metrics::AttributeSet; +use once_cell::sync::Lazy; use opentelemetry::KeyValue; use opentelemetry::{global, metrics::MetricsError}; -use super::{ - aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET}, - AtomicTracker, Number, -}; +use super::{aggregate::is_under_cardinality_limit, AtomicTracker, Number}; + +pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = + Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); /// The storage for sums. struct ValueMap> { - values: RwLock>, + values: RwLock, Arc>>, has_no_value_attribute_value: AtomicBool, no_attribute_value: T::AtomicTracker, + count: AtomicUsize, } impl> Default for ValueMap { @@ -35,42 +39,65 @@ impl> ValueMap { values: RwLock::new(HashMap::new()), has_no_value_attribute_value: AtomicBool::new(false), no_attribute_value: T::new_atomic_tracker(), + count: AtomicUsize::new(0), } } } impl> ValueMap { - fn measure(&self, measurement: T, attrs: AttributeSet) { + fn measure(&self, measurement: T, attrs: &[KeyValue]) { if attrs.is_empty() { self.no_attribute_value.add(measurement); self.has_no_value_attribute_value .store(true, Ordering::Release); } else if let Ok(values) = self.values.read() { - if let Some(value_to_update) = values.get(&attrs) { + // Try incoming order first + if let Some(value_to_update) = values.get(attrs) { value_to_update.add(measurement); return; } else { - drop(values); - if let Ok(mut values) = self.values.write() { - // Recheck after acquiring write lock, in case another - // thread has added the value. - if let Some(value_to_update) = values.get(&attrs) { - value_to_update.add(measurement); - return; - } else if is_under_cardinality_limit(values.len()) { - let new_value = T::new_atomic_tracker(); - new_value.add(measurement); - values.insert(attrs, new_value); - } else if let Some(overflow_value) = - values.get_mut(&STREAM_OVERFLOW_ATTRIBUTE_SET) - { - overflow_value.add(measurement); - return; - } else { - let new_value = T::new_atomic_tracker(); - new_value.add(measurement); - values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), new_value); - global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into())); + // Then try sorted order. + let sorted_attrs = AttributeSet::from(attrs).into_vec(); + if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) { + value_to_update.add(measurement); + return; + } else { + // Give up the lock, before acquiring write lock. + drop(values); + if let Ok(mut values) = self.values.write() { + // Recheck both incoming and sorted after acquiring + // write lock, in case another thread has added the + // value. + if let Some(value_to_update) = values.get(attrs) { + value_to_update.add(measurement); + return; + } else if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) { + value_to_update.add(measurement); + return; + } else if is_under_cardinality_limit(self.count.load(Ordering::Acquire)) { + let new_value = T::new_atomic_tracker(); + new_value.add(measurement); + let new_value = Arc::new(new_value); + + // Insert original order + values.insert(attrs.to_vec(), new_value.clone()); + + // Insert sorted order + values.insert(sorted_attrs, new_value); + + self.count.fetch_add(1, Ordering::Release); + + } else if let Some(overflow_value) = + values.get_mut(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) + { + overflow_value.add(measurement); + return; + } else { + let new_value = T::new_atomic_tracker(); + new_value.add(measurement); + values.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_value)); + global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into())); + } } } } @@ -100,7 +127,6 @@ impl> Sum { } pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { - let attrs: AttributeSet = attrs.into(); self.value_map.measure(measurement, attrs) } @@ -124,20 +150,17 @@ impl> Sum { s_data.temporality = Temporality::Delta; s_data.is_monotonic = self.monotonic; s_data.data_points.clear(); - - let mut values = match self.value_map.values.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let n = values.len() + 1; + + // Max number of data points need to account for the special casing + // of the no attribute value + overflow attribute. + let n = self.value_map.count.load(Ordering::Relaxed) + 2; if n > s_data.data_points.capacity() { s_data .data_points .reserve_exact(n - s_data.data_points.capacity()); } - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); + let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); if self .value_map .has_no_value_attribute_value @@ -152,23 +175,29 @@ impl> Sum { }); } + let mut values = match self.value_map.values.write() { + Ok(v) => v, + Err(_) => return (0, None), + }; + + let mut seen = HashSet::new(); for (attrs, value) in values.drain() { - s_data.data_points.push(DataPoint { - attributes: attrs - .iter() - .map(|(k, v)| KeyValue::new(k.clone(), v.clone())) - .collect(), - start_time: Some(prev_start), - time: Some(t), - value: value.get_value(), - exemplars: vec![], - }); + if seen.insert(Arc::as_ptr(&value)) { + s_data.data_points.push(DataPoint { + attributes: attrs.clone(), + start_time: Some(prev_start), + time: Some(t), + value: value.get_value(), + exemplars: vec![], + }); + } } // The delta collection cycle resets. if let Ok(mut start) = self.start.lock() { *start = t; } + self.value_map.count.store(0, Ordering::Release); ( s_data.data_points.len(), @@ -231,10 +260,7 @@ impl> Sum { // overload the system. for (attrs, value) in values.iter() { s_data.data_points.push(DataPoint { - attributes: attrs - .iter() - .map(|(k, v)| KeyValue::new(k.clone(), v.clone())) - .collect(), + attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), value: value.get_value(), @@ -254,7 +280,7 @@ pub(crate) struct PrecomputedSum> { value_map: ValueMap, monotonic: bool, start: Mutex, - reported: Mutex>, + reported: Mutex, T>>, } impl> PrecomputedSum { @@ -268,7 +294,6 @@ impl> PrecomputedSum { } pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { - let attrs: AttributeSet = attrs.into(); self.value_map.measure(measurement, attrs) } @@ -332,10 +357,7 @@ impl> PrecomputedSum { new_reported.insert(attrs.clone(), value.get_value()); } s_data.data_points.push(DataPoint { - attributes: attrs - .iter() - .map(|(k, v)| KeyValue::new(k.clone(), v.clone())) - .collect(), + attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), value: delta, @@ -347,6 +369,7 @@ impl> PrecomputedSum { if let Ok(mut start) = self.start.lock() { *start = t; } + self.value_map.count.store(0, Ordering::Release); *reported = new_reported; drop(reported); // drop before values guard is dropped @@ -417,10 +440,7 @@ impl> PrecomputedSum { new_reported.insert(attrs.clone(), value.get_value()); } s_data.data_points.push(DataPoint { - attributes: attrs - .iter() - .map(|(k, v)| KeyValue::new(k.clone(), v.clone())) - .collect(), + attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), value: delta, From c06696bfa33df0c501b355dd5ac8d26588a4279e Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 11:00:23 -0700 Subject: [PATCH 02/10] fmt --- opentelemetry-sdk/src/metrics/internal/sum.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 0dc47b3b20..37a5f33ea9 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -56,7 +56,7 @@ impl> ValueMap { value_to_update.add(measurement); return; } else { - // Then try sorted order. + // Then try sorted order. let sorted_attrs = AttributeSet::from(attrs).into_vec(); if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) { value_to_update.add(measurement); @@ -81,15 +81,14 @@ impl> ValueMap { // Insert original order values.insert(attrs.to_vec(), new_value.clone()); - + // Insert sorted order values.insert(sorted_attrs, new_value); self.count.fetch_add(1, Ordering::Release); - } else if let Some(overflow_value) = values.get_mut(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) - { + { overflow_value.add(measurement); return; } else { @@ -150,7 +149,7 @@ impl> Sum { s_data.temporality = Temporality::Delta; s_data.is_monotonic = self.monotonic; s_data.data_points.clear(); - + // Max number of data points need to account for the special casing // of the no attribute value + overflow attribute. let n = self.value_map.count.load(Ordering::Relaxed) + 2; @@ -160,7 +159,7 @@ impl> Sum { .reserve_exact(n - s_data.data_points.capacity()); } - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); + let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); if self .value_map .has_no_value_attribute_value From f0fe9bd1df4fcf22e05c18e322d4426b9bed491a Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 12:11:24 -0700 Subject: [PATCH 03/10] unit test and better atomicity model --- opentelemetry-sdk/src/metrics/internal/sum.rs | 4 +- opentelemetry-sdk/src/metrics/mod.rs | 56 +++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 37a5f33ea9..57098440ea 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -74,7 +74,7 @@ impl> ValueMap { } else if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) { value_to_update.add(measurement); return; - } else if is_under_cardinality_limit(self.count.load(Ordering::Acquire)) { + } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { let new_value = T::new_atomic_tracker(); new_value.add(measurement); let new_value = Arc::new(new_value); @@ -85,7 +85,7 @@ impl> ValueMap { // Insert sorted order values.insert(sorted_attrs, new_value); - self.count.fetch_add(1, Ordering::Release); + self.count.fetch_add(1, Ordering::SeqCst); } else if let Some(overflow_value) = values.get_mut(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 2a65b83e9c..5141fd1cfb 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -925,6 +925,62 @@ mod tests { assert_eq!(data_point1.value, 6); } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn counter_aggregation_attribute_ordering() { + // Run this test with stdout enabled to see output. + // cargo test counter_aggregation_attribute_ordering --features=testing -- --nocapture + + // Arrange + let mut test_context = TestContext::new(Temporality::Delta); + let counter = test_context.u64_counter("test", "my_counter", None); + + // Act + // Add the same set of attributes in different order. (they are expected + // to be treated as same attributes) + // start with unsorted order + + let attribute_values = [ + "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9", + "value10", + ]; + let mut rng = rngs::SmallRng::from_entropy(); + + for _ in 0..1000000 { + let mut rands:[usize; 4] = [0; 4]; + rands[0] = rng.gen_range(0..4); + rands[1] = rng.gen_range(0..4); + rands[2] = rng.gen_range(0..10); + rands[3] = rng.gen_range(0..10); + + + let index_first_attribute = rands[0]; + let index_second_attribute = rands[1]; + let index_third_attribute = rands[2]; + let index_fourth_attribute = rands[3]; + counter.add( + 1, + &[ + KeyValue::new("attribute1", attribute_values[index_first_attribute]), + KeyValue::new("attribute2", attribute_values[index_second_attribute]), + KeyValue::new("attribute3", attribute_values[index_third_attribute]), + KeyValue::new("attribute4", attribute_values[index_fourth_attribute]), + ], + ); + } + + test_context.flush_metrics(); + + let sum = test_context.get_aggregation::>("my_counter", None); + + // Expecting 1 time-series. + assert_eq!(sum.data_points.len(), 1600); + + // find and validate key1=value2 datapoint + let data_point1 = find_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true"); + + assert!(data_point1.is_none()); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn no_attr_cumulative_counter() { let mut test_context = TestContext::new(Temporality::Cumulative); From 05dc0b2f5f986a834b44a074f606bef3b03829a5 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Tue, 6 Aug 2024 19:35:04 +0000 Subject: [PATCH 04/10] Fix benchmarks --- opentelemetry-sdk/benches/metric_counter.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/opentelemetry-sdk/benches/metric_counter.rs b/opentelemetry-sdk/benches/metric_counter.rs index a3dda396ca..20b5158ffe 100644 --- a/opentelemetry-sdk/benches/metric_counter.rs +++ b/opentelemetry-sdk/benches/metric_counter.rs @@ -32,13 +32,13 @@ thread_local! { // Run this benchmark with: // cargo bench --bench metric_counter -fn create_counter() -> Counter { +fn create_counter(name: &'static str) -> Counter { let meter_provider: SdkMeterProvider = SdkMeterProvider::builder() .with_reader(ManualReader::builder().build()) .build(); let meter = meter_provider.meter("benchmarks"); - meter.u64_counter("counter_bench").init() + meter.u64_counter(name).init() } fn criterion_benchmark(c: &mut Criterion) { @@ -51,8 +51,8 @@ fn counter_add(c: &mut Criterion) { "value10", ]; - let counter = create_counter(); c.bench_function("Counter_Add_Sorted", |b| { + let counter = create_counter("Counter_Add_Sorted"); b.iter(|| { // 4*4*10*10 = 1600 time series. let rands = CURRENT_RNG.with(|rng| { @@ -81,6 +81,7 @@ fn counter_add(c: &mut Criterion) { }); c.bench_function("Counter_Add_Unsorted", |b| { + let counter = create_counter("Counter_Add_Unsorted"); b.iter(|| { // 4*4*10*10 = 1600 time series. let rands = CURRENT_RNG.with(|rng| { @@ -108,11 +109,13 @@ fn counter_add(c: &mut Criterion) { }); }); - // Cause overflow. - for v in 0..2001 { - counter.add(100, &[KeyValue::new("A", v.to_string())]); - } c.bench_function("Counter_Overflow", |b| { + let counter = create_counter("Counter_Overflow"); + // Cause overflow. + for v in 0..2001 { + counter.add(100, &[KeyValue::new("A", v.to_string())]); + } + b.iter(|| { // 4*4*10*10 = 1600 time series. let rands = CURRENT_RNG.with(|rng| { From 3bc078389785dfb798ad5d7cb5fba53afc192591 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 22:01:48 +0000 Subject: [PATCH 05/10] update results --- opentelemetry-sdk/benches/metric_counter.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/benches/metric_counter.rs b/opentelemetry-sdk/benches/metric_counter.rs index 8ae8a34f6f..34afa4defc 100644 --- a/opentelemetry-sdk/benches/metric_counter.rs +++ b/opentelemetry-sdk/benches/metric_counter.rs @@ -6,9 +6,9 @@ RAM: 64.0 GB | Test | Average time| |--------------------------------|-------------| - | Counter_Add_Sorted | 560 ns | - | Counter_Add_Unsorted | 565 ns | - | Counter_Overflow | 568 ns | + | Counter_Add_Sorted | 193 ns | + | Counter_Add_Unsorted | 209 ns | + | Counter_Overflow | 898 ns | | ThreadLocal_Random_Generator_5 | 37 ns | */ From f4d44ca70ea4335697c9805d2001f5a2404b6dc2 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 22:14:37 +0000 Subject: [PATCH 06/10] fix formats --- opentelemetry-sdk/src/metrics/mod.rs | 16 ++++++++-------- stress/src/metrics_counter.rs | 3 +++ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 5141fd1cfb..5c8dc1dd91 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -940,19 +940,18 @@ mod tests { // start with unsorted order let attribute_values = [ - "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9", - "value10", + "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", + "value9", "value10", ]; let mut rng = rngs::SmallRng::from_entropy(); for _ in 0..1000000 { - let mut rands:[usize; 4] = [0; 4]; + let mut rands: [usize; 4] = [0; 4]; rands[0] = rng.gen_range(0..4); rands[1] = rng.gen_range(0..4); rands[2] = rng.gen_range(0..10); rands[3] = rng.gen_range(0..10); - - + let index_first_attribute = rands[0]; let index_second_attribute = rands[1]; let index_third_attribute = rands[2]; @@ -967,7 +966,7 @@ mod tests { ], ); } - + test_context.flush_metrics(); let sum = test_context.get_aggregation::>("my_counter", None); @@ -976,8 +975,9 @@ mod tests { assert_eq!(sum.data_points.len(), 1600); // find and validate key1=value2 datapoint - let data_point1 = find_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true"); - + let data_point1 = + find_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true"); + assert!(data_point1.is_none()); } diff --git a/stress/src/metrics_counter.rs b/stress/src/metrics_counter.rs index 6cde5ed9cb..452907f2bf 100644 --- a/stress/src/metrics_counter.rs +++ b/stress/src/metrics_counter.rs @@ -4,6 +4,9 @@ Hardware: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz, 16vCPUs, RAM: 64.0 GB ~9.5 M /sec + + Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs, + ~20 M /sec */ use lazy_static::lazy_static; From a28c4a05a7663f459c21f413ea9fa6b4a4e90a56 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 23:53:25 +0000 Subject: [PATCH 07/10] PR comment --- opentelemetry-sdk/benches/metric.rs | 82 ++++--------------- opentelemetry-sdk/src/metrics/internal/sum.rs | 54 ++++++------ opentelemetry-sdk/src/metrics/mod.rs | 14 ++-- 3 files changed, 48 insertions(+), 102 deletions(-) diff --git a/opentelemetry-sdk/benches/metric.rs b/opentelemetry-sdk/benches/metric.rs index d018634e04..a32ee3ef43 100644 --- a/opentelemetry-sdk/benches/metric.rs +++ b/opentelemetry-sdk/benches/metric.rs @@ -72,74 +72,20 @@ impl TemporalitySelector for DeltaTemporalitySelector { } } -// * Summary * - -// rustc 1.68.0 (2c8cc3432 2023-03-06) -// cargo 1.68.0 (115f34552 2023-02-26), OS=Windows 11 Enterprise -// Intel(R) Core(TM) i7-8850H CPU @ 2.60GHz 2.59 GHz -// 12 logical and 6 physical cores - -// Counter/AddNoAttrs time: [65.406 ns 65.535 ns 65.675 ns] -// Counter/AddNoAttrsDelta time: [65.553 ns 65.761 ns 65.981 ns] -// Counter/AddOneAttr time: [341.55 ns 344.40 ns 347.58 ns] -// Counter/AddOneAttrDelta time: [340.11 ns 342.42 ns 344.89 ns] -// Counter/AddThreeAttr time: [619.01 ns 624.16 ns 630.16 ns] -// Counter/AddThreeAttrDelta -// time: [606.71 ns 611.45 ns 616.66 ns] -// Counter/AddFiveAttr time: [3.7551 µs 3.7813 µs 3.8094 µs] -// Counter/AddFiveAttrDelta -// time: [3.7550 µs 3.7870 µs 3.8266 µs] -// Counter/AddTenAttr time: [4.7684 µs 4.7909 µs 4.8146 µs] -// Counter/AddTenAttrDelta time: [4.7682 µs 4.8152 µs 4.8722 µs] -// Counter/AddInvalidAttr time: [469.31 ns 472.97 ns 476.92 ns] -// Counter/AddSingleUseAttrs -// time: [749.15 ns 805.09 ns 868.03 ns] -// Counter/AddSingleUseInvalid -// time: [693.75 ns 702.65 ns 713.20 ns] -// Counter/AddSingleUseFiltered -// time: [677.00 ns 681.63 ns 686.88 ns] -// Counter/CollectOneAttr time: [659.29 ns 681.20 ns 708.04 ns] -// Counter/CollectTenAttrs time: [3.5048 µs 3.5384 µs 3.5777 µs] -// Histogram/Record0Attrs10bounds -// time: [75.790 ns 77.235 ns 78.825 ns] -// Histogram/Record3Attrs10bounds -// time: [580.88 ns 603.84 ns 628.71 ns] -// Histogram/Record5Attrs10bounds -// time: [3.8539 µs 3.8988 µs 3.9519 µs] -// Histogram/Record7Attrs10bounds -// time: [699.46 ns 720.17 ns 742.24 ns] -// Histogram/Record10Attrs10bounds -// time: [844.95 ns 861.92 ns 880.23 ns] -// Histogram/Record0Attrs49bounds -// time: [75.198 ns 77.081 ns 79.449 ns] -// Histogram/Record3Attrs49bounds -// time: [533.82 ns 540.44 ns 547.30 ns] -// Histogram/Record5Attrs49bounds -// time: [583.01 ns 588.27 ns 593.98 ns] -// Histogram/Record7Attrs49bounds -// time: [645.67 ns 652.03 ns 658.35 ns] -// Histogram/Record10Attrs49bounds -// time: [747.24 ns 755.42 ns 764.37 ns] -// Histogram/Record0Attrs50bounds -// time: [72.023 ns 72.218 ns 72.426 ns] -// Histogram/Record3Attrs50bounds -// time: [530.21 ns 534.23 ns 538.63 ns] -// Histogram/Record5Attrs50bounds -// time: [3.2934 µs 3.3069 µs 3.3228 µs] -// Histogram/Record7Attrs50bounds -// time: [633.88 ns 638.87 ns 644.52 ns] -// Histogram/Record10Attrs50bounds -// time: [759.69 ns 768.42 ns 778.12 ns] -// Histogram/Record0Attrs1000bounds -// time: [75.495 ns 75.942 ns 76.529 ns] -// Histogram/Record3Attrs1000bounds -// time: [542.06 ns 548.37 ns 555.31 ns] -// Histogram/Record5Attrs1000bounds -// time: [3.2935 µs 3.3058 µs 3.3215 µs] -// Histogram/Record7Attrs1000bounds -// time: [643.75 ns 649.05 ns 655.14 ns] -// Histogram/Record10Attrs1000bounds -// time: [726.87 ns 736.52 ns 747.09 ns] +/* + The benchmark results: + criterion = "0.5.1" + OS: Ubuntu 22.04.3 LTS (5.15.146.1-microsoft-standard-WSL2) + Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs, + RAM: 64.0 GB + | Test | Average time| + |--------------------------------|-------------| + | Counter_Add_Sorted | 560 ns | + | Counter_Add_Unsorted | 565 ns | + | Counter_Overflow | 568 ns | + | ThreadLocal_Random_Generator_5 | 37 ns | +*/ + fn bench_counter(view: Option>, temporality: &str) -> (SharedReader, Counter) { let rdr = if temporality == "cumulative" { SharedReader(Arc::new(ManualReader::builder().build())) diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 57098440ea..1e7f013d4f 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -54,13 +54,11 @@ impl> ValueMap { // Try incoming order first if let Some(value_to_update) = values.get(attrs) { value_to_update.add(measurement); - return; } else { // Then try sorted order. let sorted_attrs = AttributeSet::from(attrs).into_vec(); if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) { value_to_update.add(measurement); - return; } else { // Give up the lock, before acquiring write lock. drop(values); @@ -70,10 +68,8 @@ impl> ValueMap { // value. if let Some(value_to_update) = values.get(attrs) { value_to_update.add(measurement); - return; } else if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) { value_to_update.add(measurement); - return; } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { let new_value = T::new_atomic_tracker(); new_value.add(measurement); @@ -90,7 +86,6 @@ impl> ValueMap { values.get_mut(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { overflow_value.add(measurement); - return; } else { let new_value = T::new_atomic_tracker(); new_value.add(measurement); @@ -152,7 +147,7 @@ impl> Sum { // Max number of data points need to account for the special casing // of the no attribute value + overflow attribute. - let n = self.value_map.count.load(Ordering::Relaxed) + 2; + let n = self.value_map.count.load(Ordering::SeqCst) + 2; if n > s_data.data_points.capacity() { s_data .data_points @@ -223,14 +218,11 @@ impl> Sum { let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); s_data.temporality = Temporality::Cumulative; s_data.is_monotonic = self.monotonic; - s_data.data_points.clear(); - - let values = match self.value_map.values.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; + s_data.data_points.clear(); - let n = values.len() + 1; + // Max number of data points need to account for the special casing + // of the no attribute value + overflow attribute. + let n = self.value_map.count.load(Ordering::SeqCst) + 2; if n > s_data.data_points.capacity() { s_data .data_points @@ -253,6 +245,11 @@ impl> Sum { }); } + let values = match self.value_map.values.write() { + Ok(v) => v, + Err(_) => return (0, None), + }; + // TODO: This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not @@ -316,14 +313,11 @@ impl> PrecomputedSum { let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); s_data.data_points.clear(); s_data.temporality = Temporality::Delta; - s_data.is_monotonic = self.monotonic; - - let mut values = match self.value_map.values.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; + s_data.is_monotonic = self.monotonic; - let n = values.len() + 1; + // Max number of data points need to account for the special casing + // of the no attribute value + overflow attribute. + let n = self.value_map.count.load(Ordering::SeqCst) + 2; if n > s_data.data_points.capacity() { s_data .data_points @@ -349,6 +343,11 @@ impl> PrecomputedSum { }); } + let mut values = match self.value_map.values.write() { + Ok(v) => v, + Err(_) => return (0, None), + }; + let default = T::default(); for (attrs, value) in values.drain() { let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default); @@ -399,14 +398,11 @@ impl> PrecomputedSum { let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); s_data.data_points.clear(); s_data.temporality = Temporality::Cumulative; - s_data.is_monotonic = self.monotonic; + s_data.is_monotonic = self.monotonic; - let values = match self.value_map.values.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let n = values.len() + 1; + // Max number of data points need to account for the special casing + // of the no attribute value + overflow attribute. + let n = self.value_map.count.load(Ordering::SeqCst) + 2; if n > s_data.data_points.capacity() { s_data .data_points @@ -432,6 +428,10 @@ impl> PrecomputedSum { }); } + let values = match self.value_map.values.write() { + Ok(v) => v, + Err(_) => return (0, None), + }; let default = T::default(); for (attrs, value) in values.iter() { let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default); diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 5c8dc1dd91..e3db437406 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -945,13 +945,13 @@ mod tests { ]; let mut rng = rngs::SmallRng::from_entropy(); - for _ in 0..1000000 { + for _ in 0..100000 { let mut rands: [usize; 4] = [0; 4]; + // 4X4X10X10 = 1600 time-series. rands[0] = rng.gen_range(0..4); rands[1] = rng.gen_range(0..4); rands[2] = rng.gen_range(0..10); - rands[3] = rng.gen_range(0..10); - + rands[3] = rng.gen_range(0..10); let index_first_attribute = rands[0]; let index_second_attribute = rands[1]; let index_third_attribute = rands[2]; @@ -971,14 +971,14 @@ mod tests { let sum = test_context.get_aggregation::>("my_counter", None); - // Expecting 1 time-series. + // Expecting 1600 time-series. assert_eq!(sum.data_points.len(), 1600); - // find and validate key1=value2 datapoint - let data_point1 = + // validate that overflow data point is not present. + let overflow_point = find_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true"); - assert!(data_point1.is_none()); + assert!(overflow_point.is_none()); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] From 9ee304c4d223e992602c31745a916afbc104487c Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 23:53:54 +0000 Subject: [PATCH 08/10] fmt --- opentelemetry-sdk/src/metrics/internal/sum.rs | 6 +++--- opentelemetry-sdk/src/metrics/mod.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 1e7f013d4f..ac39a2215e 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -218,7 +218,7 @@ impl> Sum { let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); s_data.temporality = Temporality::Cumulative; s_data.is_monotonic = self.monotonic; - s_data.data_points.clear(); + s_data.data_points.clear(); // Max number of data points need to account for the special casing // of the no attribute value + overflow attribute. @@ -313,7 +313,7 @@ impl> PrecomputedSum { let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); s_data.data_points.clear(); s_data.temporality = Temporality::Delta; - s_data.is_monotonic = self.monotonic; + s_data.is_monotonic = self.monotonic; // Max number of data points need to account for the special casing // of the no attribute value + overflow attribute. @@ -398,7 +398,7 @@ impl> PrecomputedSum { let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); s_data.data_points.clear(); s_data.temporality = Temporality::Cumulative; - s_data.is_monotonic = self.monotonic; + s_data.is_monotonic = self.monotonic; // Max number of data points need to account for the special casing // of the no attribute value + overflow attribute. diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index e3db437406..46904095e8 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -951,7 +951,7 @@ mod tests { rands[0] = rng.gen_range(0..4); rands[1] = rng.gen_range(0..4); rands[2] = rng.gen_range(0..10); - rands[3] = rng.gen_range(0..10); + rands[3] = rng.gen_range(0..10); let index_first_attribute = rands[0]; let index_second_attribute = rands[1]; let index_third_attribute = rands[2]; From dd320fd92112444ebde1e877c43d792a93246204 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 23:58:00 +0000 Subject: [PATCH 09/10] revert --- opentelemetry-sdk/benches/metric.rs | 82 ++++++++++++++++++++++++----- 1 file changed, 68 insertions(+), 14 deletions(-) diff --git a/opentelemetry-sdk/benches/metric.rs b/opentelemetry-sdk/benches/metric.rs index a32ee3ef43..d018634e04 100644 --- a/opentelemetry-sdk/benches/metric.rs +++ b/opentelemetry-sdk/benches/metric.rs @@ -72,20 +72,74 @@ impl TemporalitySelector for DeltaTemporalitySelector { } } -/* - The benchmark results: - criterion = "0.5.1" - OS: Ubuntu 22.04.3 LTS (5.15.146.1-microsoft-standard-WSL2) - Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs, - RAM: 64.0 GB - | Test | Average time| - |--------------------------------|-------------| - | Counter_Add_Sorted | 560 ns | - | Counter_Add_Unsorted | 565 ns | - | Counter_Overflow | 568 ns | - | ThreadLocal_Random_Generator_5 | 37 ns | -*/ - +// * Summary * + +// rustc 1.68.0 (2c8cc3432 2023-03-06) +// cargo 1.68.0 (115f34552 2023-02-26), OS=Windows 11 Enterprise +// Intel(R) Core(TM) i7-8850H CPU @ 2.60GHz 2.59 GHz +// 12 logical and 6 physical cores + +// Counter/AddNoAttrs time: [65.406 ns 65.535 ns 65.675 ns] +// Counter/AddNoAttrsDelta time: [65.553 ns 65.761 ns 65.981 ns] +// Counter/AddOneAttr time: [341.55 ns 344.40 ns 347.58 ns] +// Counter/AddOneAttrDelta time: [340.11 ns 342.42 ns 344.89 ns] +// Counter/AddThreeAttr time: [619.01 ns 624.16 ns 630.16 ns] +// Counter/AddThreeAttrDelta +// time: [606.71 ns 611.45 ns 616.66 ns] +// Counter/AddFiveAttr time: [3.7551 µs 3.7813 µs 3.8094 µs] +// Counter/AddFiveAttrDelta +// time: [3.7550 µs 3.7870 µs 3.8266 µs] +// Counter/AddTenAttr time: [4.7684 µs 4.7909 µs 4.8146 µs] +// Counter/AddTenAttrDelta time: [4.7682 µs 4.8152 µs 4.8722 µs] +// Counter/AddInvalidAttr time: [469.31 ns 472.97 ns 476.92 ns] +// Counter/AddSingleUseAttrs +// time: [749.15 ns 805.09 ns 868.03 ns] +// Counter/AddSingleUseInvalid +// time: [693.75 ns 702.65 ns 713.20 ns] +// Counter/AddSingleUseFiltered +// time: [677.00 ns 681.63 ns 686.88 ns] +// Counter/CollectOneAttr time: [659.29 ns 681.20 ns 708.04 ns] +// Counter/CollectTenAttrs time: [3.5048 µs 3.5384 µs 3.5777 µs] +// Histogram/Record0Attrs10bounds +// time: [75.790 ns 77.235 ns 78.825 ns] +// Histogram/Record3Attrs10bounds +// time: [580.88 ns 603.84 ns 628.71 ns] +// Histogram/Record5Attrs10bounds +// time: [3.8539 µs 3.8988 µs 3.9519 µs] +// Histogram/Record7Attrs10bounds +// time: [699.46 ns 720.17 ns 742.24 ns] +// Histogram/Record10Attrs10bounds +// time: [844.95 ns 861.92 ns 880.23 ns] +// Histogram/Record0Attrs49bounds +// time: [75.198 ns 77.081 ns 79.449 ns] +// Histogram/Record3Attrs49bounds +// time: [533.82 ns 540.44 ns 547.30 ns] +// Histogram/Record5Attrs49bounds +// time: [583.01 ns 588.27 ns 593.98 ns] +// Histogram/Record7Attrs49bounds +// time: [645.67 ns 652.03 ns 658.35 ns] +// Histogram/Record10Attrs49bounds +// time: [747.24 ns 755.42 ns 764.37 ns] +// Histogram/Record0Attrs50bounds +// time: [72.023 ns 72.218 ns 72.426 ns] +// Histogram/Record3Attrs50bounds +// time: [530.21 ns 534.23 ns 538.63 ns] +// Histogram/Record5Attrs50bounds +// time: [3.2934 µs 3.3069 µs 3.3228 µs] +// Histogram/Record7Attrs50bounds +// time: [633.88 ns 638.87 ns 644.52 ns] +// Histogram/Record10Attrs50bounds +// time: [759.69 ns 768.42 ns 778.12 ns] +// Histogram/Record0Attrs1000bounds +// time: [75.495 ns 75.942 ns 76.529 ns] +// Histogram/Record3Attrs1000bounds +// time: [542.06 ns 548.37 ns 555.31 ns] +// Histogram/Record5Attrs1000bounds +// time: [3.2935 µs 3.3058 µs 3.3215 µs] +// Histogram/Record7Attrs1000bounds +// time: [643.75 ns 649.05 ns 655.14 ns] +// Histogram/Record10Attrs1000bounds +// time: [726.87 ns 736.52 ns 747.09 ns] fn bench_counter(view: Option>, temporality: &str) -> (SharedReader, Counter) { let rdr = if temporality == "cumulative" { SharedReader(Arc::new(ManualReader::builder().build())) From 246ee175297ec8bfd32f1e6e943d9b21663de84b Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 7 Aug 2024 00:18:04 +0000 Subject: [PATCH 10/10] comment fix --- opentelemetry-sdk/src/metrics/internal/sum.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index ac39a2215e..912fbacd58 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -83,7 +83,7 @@ impl> ValueMap { self.count.fetch_add(1, Ordering::SeqCst); } else if let Some(overflow_value) = - values.get_mut(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) + values.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { overflow_value.add(measurement); } else { @@ -191,7 +191,7 @@ impl> Sum { if let Ok(mut start) = self.start.lock() { *start = t; } - self.value_map.count.store(0, Ordering::Release); + self.value_map.count.store(0, Ordering::SeqCst); ( s_data.data_points.len(), @@ -367,7 +367,7 @@ impl> PrecomputedSum { if let Ok(mut start) = self.start.lock() { *start = t; } - self.value_map.count.store(0, Ordering::Release); + self.value_map.count.store(0, Ordering::SeqCst); *reported = new_reported; drop(reported); // drop before values guard is dropped