diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml
index cabcc84476..e828937948 100644
--- a/opentelemetry-sdk/Cargo.toml
+++ b/opentelemetry-sdk/Cargo.toml
@@ -12,11 +12,13 @@ rust-version = "1.65"
 [dependencies]
 opentelemetry = { version = "0.22", path = "../opentelemetry/" }
 opentelemetry-http = { version = "0.11", path = "../opentelemetry-http", optional = true }
+ahash = { version = "0.8", optional = true }
 async-std = { workspace = true, features = ["unstable"], optional = true }
 async-trait = { workspace = true, optional = true }
 futures-channel = "0.3"
 futures-executor = { workspace = true }
 futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
+hashbrown = { version = "0.14", optional = true }
 once_cell = { workspace = true }
 ordered-float = { workspace = true }
 percent-encoding = { version = "2.0", optional = true }
@@ -53,6 +55,9 @@ testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std",
 rt-tokio = ["tokio", "tokio-stream"]
 rt-tokio-current-thread = ["tokio", "tokio-stream"]
 rt-async-std = ["async-std"]
+# Enable use_hashbrown to improve hashing performance in Metrics aggregation.
+# This feature should be used carefully, especially when the key/value pairs in measurement attributes are derived from external or untrusted sources.
+use_hashbrown = ["hashbrown", "ahash"]
 
 [[bench]]
 name = "context"
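Note: a downstream crate opts in through its own manifest. A hypothetical consumer entry (version and feature list illustrative, not taken from this patch) would look like:

```toml
[dependencies]
# use_hashbrown swaps the metrics aggregation maps over to hashbrown + ahash;
# note the caveat above about attributes built from untrusted input.
opentelemetry_sdk = { version = "0.22", features = ["metrics", "use_hashbrown"] }
```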
diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs
index 83a6b07858..d9cd48489f 100644
--- a/opentelemetry-sdk/src/metrics/internal/sum.rs
+++ b/opentelemetry-sdk/src/metrics/internal/sum.rs
@@ -1,24 +1,37 @@
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::{
-    collections::{hash_map::Entry, HashMap},
-    sync::Mutex,
+    sync::{Arc, Mutex},
     time::SystemTime,
 };
 
 use crate::attributes::AttributeSet;
 use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
 use opentelemetry::{global, metrics::MetricsError};
+use std::hash::{Hash, Hasher};
+
+#[cfg(feature = "use_hashbrown")]
+use ahash::AHasher;
+#[cfg(feature = "use_hashbrown")]
+use hashbrown::{hash_map::Entry, HashMap};
+
+#[cfg(not(feature = "use_hashbrown"))]
+use std::collections::{hash_map::DefaultHasher, hash_map::Entry, HashMap};
 
 use super::{
     aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
     AtomicTracker, Number,
 };
 
+const BUCKET_COUNT: usize = 256;
+const OVERFLOW_BUCKET_INDEX: usize = BUCKET_COUNT - 1; // Use the last bucket as the overflow bucket
+type BucketValue<T> = Mutex<Option<HashMap<AttributeSet, T>>>;
+type Buckets<T> = Arc<[BucketValue<T>; BUCKET_COUNT]>;
+
 /// The storage for sums.
 struct ValueMap<T: Number<T>> {
-    values: Mutex<HashMap<AttributeSet, T>>,
+    buckets: Buckets<T>,
     has_no_value_attribute_value: AtomicBool,
     no_attribute_value: T::AtomicTracker,
+    total_unique_entries: AtomicUsize,
 }
 
 impl<T: Number<T>> Default for ValueMap<T> {
@@ -29,10 +42,49 @@ impl<T: Number<T>> ValueMap<T> {
     fn new() -> Self {
+        let buckets = std::iter::repeat_with(|| Mutex::new(None))
+            .take(BUCKET_COUNT)
+            .collect::<Vec<_>>()
+            .try_into()
+            .unwrap(); // this will never fail, as the Vec length is fixed at BUCKET_COUNT
+
         ValueMap {
-            values: Mutex::new(HashMap::new()),
+            buckets: Arc::new(buckets),
             has_no_value_attribute_value: AtomicBool::new(false),
             no_attribute_value: T::new_atomic_tracker(),
+            total_unique_entries: AtomicUsize::new(0),
         }
     }
+
+    // Hash function to determine the bucket
+    fn hash_to_bucket(key: &AttributeSet) -> usize {
+        #[cfg(not(feature = "use_hashbrown"))]
+        let mut hasher = DefaultHasher::new();
+        #[cfg(feature = "use_hashbrown")]
+        let mut hasher = AHasher::default();
+
+        key.hash(&mut hasher);
+        // Use the 8 least significant bits directly, avoiding the modulus operation;
+        // this is equivalent to `% BUCKET_COUNT` since BUCKET_COUNT is 256.
+        hasher.finish() as u8 as usize
+    }
+
+    fn try_increment(&self) -> bool {
+        loop {
+            let current = self.total_unique_entries.load(Ordering::Acquire);
+            if is_under_cardinality_limit(current) {
+                // Attempt to increment atomically if the old value is still current, else retry
+                match self.total_unique_entries.compare_exchange(
+                    current,
+                    current + 1,
+                    Ordering::AcqRel,
+                    Ordering::Acquire,
+                ) {
+                    Ok(_) => return true, // Increment successful
+                    Err(_) => continue,   // Lost a race with a concurrent update, retry
+                }
+            } else {
+                return false, // unreachable marker removed
+            }
+        }
+    }
 }
@@ -40,28 +92,65 @@ impl<T: Number<T>> ValueMap<T> {
 impl<T: Number<T>> ValueMap<T> {
     fn measure(&self, measurement: T, attrs: AttributeSet) {
         if attrs.is_empty() {
+            // Directly store measurements with no attributes.
            self.no_attribute_value.add(measurement);
             self.has_no_value_attribute_value
                 .store(true, Ordering::Release);
-        } else if let Ok(mut values) = self.values.lock() {
-            let size = values.len();
-            match values.entry(attrs) {
-                Entry::Occupied(mut occupied_entry) => {
-                    let sum = occupied_entry.get_mut();
-                    *sum += measurement;
+            return;
+        }
+
+        // Hash the attributes to find the corresponding bucket.
+        let bucket_index = Self::hash_to_bucket(&attrs);
+        let mut bucket_guard = self.buckets[bucket_index].lock().unwrap();
+        if let Some(bucket) = &mut *bucket_guard {
+            // Fast path: if the attribute set already exists, just update the value.
+            if let Some(value) = bucket.get_mut(&attrs) {
+                *value += measurement;
+                return;
+            }
+        }
+
+        // Attribute set not present; release the lock before attempting to add it.
+        drop(bucket_guard);
+
+        // First try to claim a unique-entry slot, if still under the cardinality limit.
+        let under_limit = self.try_increment();
+
+        // Determine the bucket index for the attributes
+        let (bucket_index, attrs) = if under_limit {
+            (bucket_index, attrs) // the index remains the same
+        } else {
+            (OVERFLOW_BUCKET_INDEX, STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
+        };
+
+        if under_limit {
+            // Reacquire the lock to add the new attribute set or update an existing one.
+            let mut final_bucket_guard = self.buckets[bucket_index].lock().unwrap();
+            let bucket = final_bucket_guard.get_or_insert_with(HashMap::default);
+
+            // Double-check whether the attribute set is now present in the bucket.
+            // This handles the case where another thread inserted the same key
+            // while this thread was incrementing the global counter.
+            match bucket.entry(attrs) {
+                Entry::Vacant(e) => {
+                    // The key still does not exist; insert the new measurement
+                    e.insert(measurement);
                 }
-                Entry::Vacant(vacant_entry) => {
-                    if is_under_cardinality_limit(size) {
-                        vacant_entry.insert(measurement);
-                    } else {
-                        values
-                            .entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
-                            .and_modify(|val| *val += measurement)
-                            .or_insert(measurement);
-                        global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
-                    }
+                Entry::Occupied(mut e) => {
+                    // The key exists; update the measurement
+                    *e.get_mut() += measurement;
+                    // Give the slot back, as this thread's increment was redundant
+                    self.total_unique_entries.fetch_sub(1, Ordering::SeqCst);
                 }
             }
+        } else {
+            // Handle overflow cases quietly to avoid log flooding.
+            let mut overflow_bucket_guard = self.buckets[bucket_index].lock().unwrap();
+            let overflow_bucket = overflow_bucket_guard.get_or_insert_with(HashMap::default);
+
+            overflow_bucket
+                .entry(attrs)
+                .and_modify(|e| *e += measurement)
+                .or_insert(measurement);
         }
     }
 }
@@ -112,16 +201,10 @@ impl<T: Number<T>> Sum<T> {
         s_data.is_monotonic = self.monotonic;
         s_data.data_points.clear();
 
-        let mut values = match self.value_map.values.lock() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
-        if n > s_data.data_points.capacity() {
-            s_data
-                .data_points
-                .reserve_exact(n - s_data.data_points.capacity());
+        let total_len: usize = self.value_map.total_unique_entries.load(Ordering::Relaxed) + 1;
+        if total_len > s_data.data_points.capacity() {
+            let additional_space_needed = total_len - s_data.data_points.capacity();
+            s_data.data_points.reserve_exact(additional_space_needed);
         }
 
         let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
@@ -139,14 +222,37 @@ impl<T: Number<T>> Sum<T> {
             });
         }
 
-        for (attrs, value) in values.drain() {
-            s_data.data_points.push(DataPoint {
-                attributes: attrs,
-                start_time: Some(prev_start),
-                time: Some(t),
-                value,
-                exemplars: vec![],
-            });
+        for bucket_mutex in self.value_map.buckets.iter() {
+            match bucket_mutex.lock() {
+                Ok(mut locked_bucket) => {
+                    if let Some(ref mut bucket) = *locked_bucket {
+                        for (attrs, value) in bucket.drain() {
+                            // Read the start time, falling back to now if the lock is poisoned
+                            let start_time = self.start.lock().map_or_else(
+                                |_| SystemTime::now(), // on error, use SystemTime::now()
+                                |guard| *guard,        // on success, dereference the guard
+                            );
+
+                            s_data.data_points.push(DataPoint {
+                                attributes: attrs,
+                                start_time: Some(start_time),
+                                time: Some(t),
+                                value,
+                                exemplars: vec![],
+                            });
+                            self.value_map
+                                .total_unique_entries
+                                .fetch_sub(1, Ordering::Relaxed);
+                        }
+                    }
+                }
+                Err(e) => {
+                    global::handle_error(MetricsError::Other(format!(
+                        "Failed to acquire lock on bucket due to: {:?}",
+                        e
+                    )));
+                }
+            }
         }
 
         // The delta collection cycle resets.
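Note: the subtle part of `measure` is that the bucket lock is released before `try_increment` and reacquired afterwards, so a racing thread may insert the same attribute set in between; the `Entry::Occupied` arm then returns the redundantly claimed slot. A minimal standalone sketch of that double-checked pattern, with a plain `String` key and `u64` value standing in for `AttributeSet` and `T` (names hypothetical):

```rust
use std::collections::{hash_map::Entry, HashMap};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

fn insert_or_add(map: &Mutex<HashMap<String, u64>>, total: &AtomicUsize, key: &str, v: u64) {
    {
        // Fast path: update in place while holding the map lock.
        let mut guard = map.lock().unwrap();
        if let Some(existing) = guard.get_mut(key) {
            *existing += v;
            return;
        }
    } // lock dropped here; another thread may insert `key` now

    total.fetch_add(1, Ordering::AcqRel); // claim a slot (cardinality check elided)

    match map.lock().unwrap().entry(key.to_owned()) {
        Entry::Vacant(e) => {
            e.insert(v);
        }
        Entry::Occupied(mut e) => {
            *e.get_mut() += v; // someone else inserted first
            total.fetch_sub(1, Ordering::AcqRel); // return the redundant slot
        }
    }
}

fn main() {
    let map = Mutex::new(HashMap::new());
    let total = AtomicUsize::new(0);
    insert_or_add(&map, &total, "k", 1);
    insert_or_add(&map, &total, "k", 2);
    assert_eq!(*map.lock().unwrap().get("k").unwrap(), 3);
    assert_eq!(total.load(Ordering::Acquire), 1);
}
```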
@@ -181,16 +287,10 @@ impl<T: Number<T>> Sum<T> {
         s_data.is_monotonic = self.monotonic;
         s_data.data_points.clear();
 
-        let values = match self.value_map.values.lock() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
-        if n > s_data.data_points.capacity() {
-            s_data
-                .data_points
-                .reserve_exact(n - s_data.data_points.capacity());
+        let total_len: usize = self.value_map.total_unique_entries.load(Ordering::Relaxed) + 1;
+        if total_len > s_data.data_points.capacity() {
+            let additional_space_needed = total_len - s_data.data_points.capacity();
+            s_data.data_points.reserve_exact(additional_space_needed);
         }
 
         let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
@@ -213,14 +313,31 @@ impl<T: Number<T>> Sum<T> {
         // are unbounded number of attribute sets being aggregated. Attribute
         // sets that become "stale" need to be forgotten so this will not
         // overload the system.
-        for (attrs, value) in values.iter() {
-            s_data.data_points.push(DataPoint {
-                attributes: attrs.clone(),
-                start_time: Some(prev_start),
-                time: Some(t),
-                value: *value,
-                exemplars: vec![],
-            });
+        for bucket_mutex in self.value_map.buckets.iter() {
+            // Handle potential lock failure gracefully
+            if let Ok(locked_bucket) = bucket_mutex.lock() {
+                if let Some(locked_bucket) = &*locked_bucket {
+                    for (attrs, value) in locked_bucket.iter() {
+                        // Read the start time, falling back to now if the lock is poisoned
+                        let start_time = self.start.lock().map_or_else(
+                            |_| SystemTime::now(), // on error, use SystemTime::now()
+                            |guard| *guard,        // on success, dereference the guard
+                        );
+
+                        s_data.data_points.push(DataPoint {
+                            attributes: attrs.clone(),
+                            start_time: Some(start_time),
+                            time: Some(t),
+                            value: *value,
+                            exemplars: vec![],
+                        });
+                    }
+                }
+            } else {
+                global::handle_error(MetricsError::Other(
+                    "Failed to acquire lock on a bucket".into(),
+                ));
+            }
         }
 
         (
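Note: collection now walks all 256 buckets, locking each one independently, instead of holding a single map lock for the whole pass. A simplified sketch of the drain (delta) path over the same `Mutex<Option<HashMap<..>>>` bucket shape, outside the SDK types:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

type Bucket = Mutex<Option<HashMap<String, u64>>>;

// Lock, drain, and release each bucket in turn; a poisoned bucket is
// skipped so one panicked writer cannot abort the whole collect cycle.
fn drain_all(buckets: &[Bucket]) -> Vec<(String, u64)> {
    let mut out = Vec::new();
    for bucket_mutex in buckets {
        if let Ok(mut guard) = bucket_mutex.lock() {
            if let Some(bucket) = guard.as_mut() {
                out.extend(bucket.drain());
            }
        }
    }
    out
}

fn main() {
    let buckets: Vec<Bucket> = (0..4).map(|_| Mutex::new(None)).collect();
    *buckets[2].lock().unwrap() = Some(HashMap::from([("k".to_string(), 7)]));
    assert_eq!(drain_all(&buckets), vec![("k".to_string(), 7)]);
}
```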
@@ -274,18 +391,13 @@ impl<T: Number<T>> PrecomputedSum<T> {
         s_data.temporality = Temporality::Delta;
         s_data.is_monotonic = self.monotonic;
 
-        let mut values = match self.value_map.values.lock() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
-        if n > s_data.data_points.capacity() {
-            s_data
-                .data_points
-                .reserve_exact(n - s_data.data_points.capacity());
+        let total_len: usize = self.value_map.total_unique_entries.load(Ordering::Relaxed) + 1;
+        if total_len > s_data.data_points.capacity() {
+            let additional_space_needed = total_len - s_data.data_points.capacity();
+            s_data.data_points.reserve_exact(additional_space_needed);
         }
-        let mut new_reported = HashMap::with_capacity(n);
+
+        let mut new_reported = HashMap::with_capacity(total_len);
         let mut reported = match self.reported.lock() {
             Ok(r) => r,
             Err(_) => return (0, None),
@@ -305,19 +417,39 @@ impl<T: Number<T>> PrecomputedSum<T> {
             });
         }
 
-        let default = T::default();
-        for (attrs, value) in values.drain() {
-            let delta = value - *reported.get(&attrs).unwrap_or(&default);
-            if delta != default {
-                new_reported.insert(attrs.clone(), value);
+        for bucket_mutex in self.value_map.buckets.iter() {
+            match bucket_mutex.lock() {
+                Ok(mut locked_bucket) => {
+                    if let Some(locked_bucket) = &mut *locked_bucket {
+                        let default = T::default();
+                        for (attrs, value) in locked_bucket.drain() {
+                            let delta = value - *reported.get(&attrs).unwrap_or(&default);
+                            if delta != default {
+                                new_reported.insert(attrs.clone(), value);
+                            }
+                            s_data.data_points.push(DataPoint {
+                                attributes: attrs.clone(),
+                                start_time: Some(prev_start),
+                                time: Some(t),
+                                value: delta,
+                                exemplars: vec![],
+                            });
+                            self.value_map
+                                .total_unique_entries
+                                .fetch_sub(1, Ordering::Relaxed);
+                        }
+                    }
+                }
+                Err(e) => {
+                    // Log the lock acquisition failure and move on
+                    global::handle_error(MetricsError::Other(format!(
+                        "Failed to acquire lock on bucket due to: {:?}",
+                        e
+                    )));
+                    // Continue to the next bucket if the lock cannot be acquired
+                    continue;
+                }
             }
-            s_data.data_points.push(DataPoint {
-                attributes: attrs.clone(),
-                start_time: Some(prev_start),
-                time: Some(t),
-                value: delta,
-                exemplars: vec![],
-            });
         }
 
         // The delta collection cycle resets.
@@ -356,18 +488,13 @@ impl<T: Number<T>> PrecomputedSum<T> {
         s_data.temporality = Temporality::Cumulative;
         s_data.is_monotonic = self.monotonic;
 
-        let values = match self.value_map.values.lock() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
-        if n > s_data.data_points.capacity() {
-            s_data
-                .data_points
-                .reserve_exact(n - s_data.data_points.capacity());
+        let total_len: usize = self.value_map.total_unique_entries.load(Ordering::Relaxed) + 1;
+        if total_len > s_data.data_points.capacity() {
+            let additional_space_needed = total_len - s_data.data_points.capacity();
+            s_data.data_points.reserve_exact(additional_space_needed);
         }
-        let mut new_reported = HashMap::with_capacity(n);
+
+        let mut new_reported = HashMap::with_capacity(total_len);
         let mut reported = match self.reported.lock() {
             Ok(r) => r,
             Err(_) => return (0, None),
@@ -388,18 +515,37 @@ impl<T: Number<T>> PrecomputedSum<T> {
         }
 
         let default = T::default();
-        for (attrs, value) in values.iter() {
-            let delta = *value - *reported.get(attrs).unwrap_or(&default);
-            if delta != default {
-                new_reported.insert(attrs.clone(), *value);
+        for bucket_mutex in self.value_map.buckets.iter() {
+            // Attempt to acquire the lock, handling a poisoned bucket gracefully.
+            let locked_bucket = match bucket_mutex.lock() {
+                Ok(bucket) => bucket,
+                Err(e) => {
+                    global::handle_error(MetricsError::Other(format!(
+                        "Failed to acquire lock on bucket due to: {:?}",
+                        e
+                    )));
+                    continue; // Skip to the next bucket if the lock cannot be acquired.
+                }
+            };
+
+            // Proceed only if the bucket has been populated.
+            if let Some(locked_bucket) = &*locked_bucket {
+                for (attrs, value) in locked_bucket.iter() {
+                    let delta = *value - *reported.get(attrs).unwrap_or(&default);
+                    if delta != default {
+                        new_reported.insert(attrs.clone(), *value);
+                    }
+
+                    s_data.data_points.push(DataPoint {
+                        attributes: attrs.clone(),
+                        start_time: Some(prev_start),
+                        time: Some(t),
+                        value: *value, // For cumulative, report the value directly rather than the delta.
+                        exemplars: vec![],
+                    });
+                }
             }
-            s_data.data_points.push(DataPoint {
-                attributes: attrs.clone(),
-                start_time: Some(prev_start),
-                time: Some(t),
-                value: delta,
-                exemplars: vec![],
-            });
         }
 
         *reported = new_reported;
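Note: for precomputed sums, each delta cycle reports `value - last_reported` and then replaces the `reported` map wholesale, which is how stale attribute sets get forgotten. A condensed sketch of that bookkeeping, mirroring the diff's logic over plain types:

```rust
use std::collections::HashMap;

// One delta cycle: emit value - last_reported for every live series, then
// swap in the freshly built map so stale series are dropped.
fn delta_cycle(
    current: &HashMap<String, u64>,
    reported: &mut HashMap<String, u64>,
) -> Vec<(String, u64)> {
    let mut new_reported = HashMap::with_capacity(current.len());
    let mut points = Vec::new();
    for (attrs, &value) in current {
        let delta = value - reported.get(attrs).copied().unwrap_or(0);
        if delta != 0 {
            new_reported.insert(attrs.clone(), value);
        }
        points.push((attrs.clone(), delta));
    }
    *reported = new_reported;
    points
}

fn main() {
    let mut reported = HashMap::new();
    let current = HashMap::from([("k".to_string(), 5)]);
    assert_eq!(delta_cycle(&current, &mut reported), vec![("k".to_string(), 5)]);
    assert_eq!(delta_cycle(&current, &mut reported), vec![("k".to_string(), 0)]);
}
```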
diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs
index 80ff89469b..10a1898a9b 100644
--- a/opentelemetry-sdk/src/metrics/mod.rs
+++ b/opentelemetry-sdk/src/metrics/mod.rs
@@ -163,6 +163,189 @@ mod tests {
         );
     }
 
+    // "multi_thread" tokio flavor must be used else flush won't
+    // be able to make progress!
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn counter_aggregation_overflow() {
+        // Run this test with stdout enabled to see output:
+        // cargo test counter --features=metrics,testing -- --nocapture
+
+        // Arrange
+        let exporter = InMemoryMetricsExporter::default();
+        // PeriodicReader with a large interval to avoid auto-flush
+        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio)
+            .with_interval(std::time::Duration::from_secs(100000))
+            .build();
+        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
+
+        // Act
+        let meter = meter_provider.meter("test");
+        let counter = meter
+            .u64_counter("my_counter")
+            .with_unit(Unit::new("my_unit"))
+            .init();
+
+        // Sleep for ~5 millis to avoid recording during the first collect cycle
+        // (TBD: need to fix PeriodicReader to NOT collect data immediately after start)
+        std::thread::sleep(std::time::Duration::from_millis(5));
+        let unique_measurements = 1999;
+        let overflow_measurements = 4;
+        // Generate enough measurements to force overflow
+        for i in 0..unique_measurements + overflow_measurements {
+            let attribute_value = format!("value{}", i); // unique attribute value for each measurement
+            counter.add(1, &[KeyValue::new("key1", attribute_value)]);
+        }
+        meter_provider.force_flush().unwrap();
+
+        // Assert
+        let resource_metrics = exporter
+            .get_finished_metrics()
+            .expect("metrics are expected to be exported.");
+        // Every collect cycle produces a new ResourceMetrics (even if no data is collected).
+        // TBD: this needs to be fixed; the assert below should then validate a single entry
+        assert!(resource_metrics.len() == 2);
+        let metric = &resource_metrics[1].scope_metrics[0].metrics[0]; // second ResourceMetrics
+        assert_eq!(metric.name, "my_counter");
+        assert_eq!(metric.unit.as_str(), "my_unit");
+        let sum = metric
+            .data
+            .as_any()
+            .downcast_ref::<data::Sum<u64>>()
+            .expect("Sum aggregation expected for Counter instruments by default");
+
+        // Expecting 2000 time-series: 1999 unique plus 1 overflow.
+        assert_eq!(sum.data_points.len(), unique_measurements + 1); // all overflow measurements are merged into one
+        assert!(sum.is_monotonic, "Counter should produce monotonic sums.");
+        assert_eq!(
+            sum.temporality,
+            data::Temporality::Cumulative,
+            "Should produce cumulative by default."
+        );
+
+        // Ensure that the overflow attribute is present
+        for data_point in &sum.data_points {
+            let mut overflow_attribute_present = false;
+            for attribute in data_point.attributes.iter() {
+                if attribute.0 == &opentelemetry::Key::from("otel.metric.overflow") {
+                    overflow_attribute_present = true;
+                    break;
+                }
+            }
+            if overflow_attribute_present {
+                assert_eq!(data_point.value, overflow_measurements as u64);
+            } else {
+                assert_eq!(data_point.value, 1);
+            }
+        }
+    }
+
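Note: the flag-and-break scan in the assertion loop above could be collapsed with `Iterator::any`; a shape-only sketch over plain key/value tuples (the SDK's attribute iterator yields a different item type):

```rust
// True if any attribute key matches; replaces the mutable flag + break.
fn has_key(attrs: &[(&str, &str)], key: &str) -> bool {
    attrs.iter().any(|(k, _)| *k == key)
}

fn main() {
    let attrs = [("key1", "value7"), ("otel.metric.overflow", "true")];
    assert!(has_key(&attrs, "otel.metric.overflow"));
    assert!(!has_key(&attrs[..1], "otel.metric.overflow"));
}
```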
+    // "multi_thread" tokio flavor must be used else flush won't
+    // be able to make progress!
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn counter_aggregation_concurrent_overflow() {
+        // Run this test with stdout enabled to see output:
+        // cargo test counter --features=metrics,testing -- --nocapture
+
+        // Arrange
+        let exporter = InMemoryMetricsExporter::default();
+        // PeriodicReader with a large interval to avoid auto-flush
+        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio)
+            .with_interval(std::time::Duration::from_secs(100000))
+            .build();
+        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
+
+        // Act
+        let meter = meter_provider.meter("test");
+        let counter = meter
+            .u64_counter("my_counter")
+            .with_unit(Unit::new("my_unit"))
+            .init();
+
+        // Sleep for ~5 millis to avoid recording during the first collect cycle
+        // (TBD: need to fix PeriodicReader to NOT collect data immediately after start)
+        std::thread::sleep(std::time::Duration::from_millis(5));
+
+        let unique_measurements = 1999;
+        let overflow_measurements = 1000;
+        let total_measurements = unique_measurements + overflow_measurements;
+
+        let counter = std::sync::Arc::new(std::sync::Mutex::new(counter)); // Shared counter among threads
+
+        let num_threads = 4;
+        let measurements_per_thread = total_measurements / num_threads;
+        let remainder = total_measurements % num_threads; // Remainder to be handled by the last thread
+
+        let mut handles = vec![];
+
+        for thread_id in 0..num_threads {
+            let counter_clone = std::sync::Arc::clone(&counter);
+            let start_index = thread_id * measurements_per_thread;
+            let end_index = if thread_id == num_threads - 1 {
+                start_index + measurements_per_thread + remainder // Add the remainder to the last thread
+            } else {
+                start_index + measurements_per_thread
+            };
+
+            let handle = std::thread::spawn(move || {
+                for i in start_index..end_index {
+                    let attribute_value = format!("value{}", i);
+                    let kv = vec![KeyValue::new("key1", attribute_value)];
+
+                    let counter = counter_clone.lock().unwrap();
+                    counter.add(1, &kv);
+                }
+            });
+
+            handles.push(handle);
+        }
+
+        for handle in handles {
+            handle.join().unwrap();
+        }
+
+        meter_provider.force_flush().unwrap();
+
+        // Assert
+        let resource_metrics = exporter
+            .get_finished_metrics()
+            .expect("metrics are expected to be exported.");
+        // Every collect cycle produces a new ResourceMetrics (even if no data is collected).
+        // TBD: this needs to be fixed; the assert below should then validate a single entry
+        assert!(resource_metrics.len() == 2);
+        let metric = &resource_metrics[1].scope_metrics[0].metrics[0]; // second ResourceMetrics
+        assert_eq!(metric.name, "my_counter");
+        assert_eq!(metric.unit.as_str(), "my_unit");
+        let sum = metric
+            .data
+            .as_any()
+            .downcast_ref::<data::Sum<u64>>()
+            .expect("Sum aggregation expected for Counter instruments by default");
+
+        // Expecting 2000 time-series: 1999 unique plus 1 overflow.
+        assert_eq!(sum.data_points.len(), unique_measurements + 1); // all overflow measurements are merged into one
+        assert!(sum.is_monotonic, "Counter should produce monotonic sums.");
+        assert_eq!(
+            sum.temporality,
+            data::Temporality::Cumulative,
+            "Should produce cumulative by default."
+        );
+
+        // Ensure that the overflow attribute is present
+        for data_point in &sum.data_points {
+            let mut overflow_attribute_present = false;
+            for attribute in data_point.attributes.iter() {
+                if attribute.0 == &opentelemetry::Key::from("otel.metric.overflow") {
+                    overflow_attribute_present = true;
+                    break;
+                }
+            }
+            if overflow_attribute_present {
+                assert_eq!(data_point.value, overflow_measurements as u64);
+            } else {
+                assert_eq!(data_point.value, 1);
+            }
+        }
+    }
+
     // "multi_thread" tokio flavor must be used else flush won't
     // be able to make progress!
     #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
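Note: instruments in the opentelemetry API are internally synchronized and cheap to clone, so the `Arc<Mutex<...>>` wrapper in the concurrent test serializes the `add` calls and mutes the very contention the test aims to exercise. A self-contained sketch of the clone-per-thread shape, with an `AtomicU64` standing in for the counter so the snippet runs on its own:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // Stand-in for a Counter<u64>; a real instrument would just be cloned.
    let counter = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..1000 {
                    counter.fetch_add(1, Ordering::Relaxed); // no shared Mutex on the hot path
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(counter.load(Ordering::Relaxed), 4000);
}
```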
diff --git a/stress/Cargo.toml b/stress/Cargo.toml
index c693eef984..2c7ded5ed5 100644
--- a/stress/Cargo.toml
+++ b/stress/Cargo.toml
@@ -29,7 +29,7 @@ ctrlc = "3.2.5"
 lazy_static = "1.4.0"
 num_cpus = "1.15.0"
 opentelemetry = { path = "../opentelemetry", features = ["metrics", "logs", "trace", "logs_level_enabled"] }
-opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "logs_level_enabled"] }
+opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "use_hashbrown", "logs_level_enabled"] }
 opentelemetry-appender-tracing = { path = "../opentelemetry-appender-tracing"}
 rand = { version = "0.8.4", features = ["small_rng"] }
 tracing = { workspace = true, features = ["std"]}