128 changes: 126 additions & 2 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -5,12 +5,136 @@ mod last_value;
mod sum;

use core::fmt;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::{Add, AddAssign, Sub};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};

use aggregate::is_under_cardinality_limit;
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use once_cell::sync::Lazy;
use opentelemetry::metrics::MetricsError;
use opentelemetry::{global, KeyValue};

use crate::metrics::AttributeSet;

pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
    Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);

/// Abstracts the update operation for a measurement.
pub(crate) trait Operation {
    fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T);
}

struct Increment;

impl Operation for Increment {
    fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T) {
        tracker.add(value);
    }
}

struct Assign;

impl Operation for Assign {
    fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T) {
        tracker.store(value);
    }
}

/// The storage for sums.
///
/// This structure is parametrized by an `Operation` that indicates how
/// updates to the underlying value trackers should be performed.
pub(crate) struct ValueMap<T: Number<T>, O> {
    /// Trackers store the values associated with different attribute sets.
    trackers: RwLock<HashMap<Vec<KeyValue>, Arc<T::AtomicTracker>>>,
    /// Number of different attribute set stored in the `trackers` map.
    count: AtomicUsize,
    /// Indicates whether a value with no attributes has been stored.
    has_no_attribute_value: AtomicBool,
    /// Tracker for values with no attributes attached.
    no_attribute_tracker: T::AtomicTracker,
    phantom: PhantomData<O>,
}

impl<T: Number<T>, O> Default for ValueMap<T, O> {
    fn default() -> Self {
        ValueMap::new()
    }
}

impl<T: Number<T>, O> ValueMap<T, O> {
    fn new() -> Self {
        ValueMap {
            trackers: RwLock::new(HashMap::new()),
            has_no_attribute_value: AtomicBool::new(false),
            no_attribute_tracker: T::new_atomic_tracker(),
            count: AtomicUsize::new(0),
            phantom: PhantomData,
        }
    }
}

impl<T: Number<T>, O: Operation> ValueMap<T, O> {
    fn measure(&self, measurement: T, attributes: &[KeyValue]) {
        if attributes.is_empty() {
            O::update_tracker(&self.no_attribute_tracker, measurement);
            self.has_no_attribute_value.store(true, Ordering::Release);
            return;
        }

        let Ok(trackers) = self.trackers.read() else {
            return;
        };

        // Try to retrieve and update the tracker with the attributes in the provided order first
        if let Some(tracker) = trackers.get(attributes) {
            O::update_tracker(&**tracker, measurement);
            return;
        }

        // Try to retrieve and update the tracker with the attributes sorted.
        let sorted_attrs = AttributeSet::from(attributes).into_vec();
        if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
            O::update_tracker(&**tracker, measurement);
            return;
        }

        // Give up the read lock before acquiring the write lock.
        drop(trackers);

        let Ok(mut trackers) = self.trackers.write() else {
            return;
        };

        // Recheck both the provided and sorted orders after acquiring the write lock
        // in case another thread has pushed an update in the meantime.
        if let Some(tracker) = trackers.get(attributes) {
            O::update_tracker(&**tracker, measurement);
        } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
            O::update_tracker(&**tracker, measurement);
        } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
            let new_tracker = Arc::new(T::new_atomic_tracker());
            O::update_tracker(&*new_tracker, measurement);

            // Insert tracker with the attributes in the provided and sorted orders
            trackers.insert(attributes.to_vec(), new_tracker.clone());
            trackers.insert(sorted_attrs, new_tracker);

            self.count.fetch_add(1, Ordering::SeqCst);
        } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
            O::update_tracker(&**overflow_value, measurement);
        } else {
            let new_tracker = T::new_atomic_tracker();
            O::update_tracker(&new_tracker, measurement);
            trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
            global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
        }
    }
}

/// Marks a type that can have a value added and retrieved atomically. Required since
/// different types have different backing atomic mechanisms
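Aside (not part of the diff): the Operation trait introduced above is a zero-sized strategy type that lets a single ValueMap implementation serve both counter-style updates (Increment) and gauge-style updates (Assign). The sketch below shows the same pattern reduced to a plain AtomicU64 so it compiles on its own; the record helper, the Relaxed ordering, and the main driver are illustrative stand-ins for the SDK's AtomicTracker/Number machinery, not its actual API.

// Standalone sketch of the Operation strategy pattern, simplified to AtomicU64.
use std::sync::atomic::{AtomicU64, Ordering};

trait Operation {
    fn update_tracker(tracker: &AtomicU64, value: u64);
}

struct Increment;
impl Operation for Increment {
    // Counter-style update: each measurement is added to the running total.
    fn update_tracker(tracker: &AtomicU64, value: u64) {
        tracker.fetch_add(value, Ordering::Relaxed);
    }
}

struct Assign;
impl Operation for Assign {
    // Gauge-style update: only the latest measurement is kept.
    fn update_tracker(tracker: &AtomicU64, value: u64) {
        tracker.store(value, Ordering::Relaxed);
    }
}

// The caller picks the behaviour at compile time via the type parameter,
// mirroring how ValueMap is parametrized by Increment or Assign above.
fn record<O: Operation>(tracker: &AtomicU64, value: u64) {
    O::update_tracker(tracker, value);
}

fn main() {
    let sum = AtomicU64::new(0);
    record::<Increment>(&sum, 5);
    record::<Increment>(&sum, 7);
    assert_eq!(sum.load(Ordering::Relaxed), 12);

    let gauge = AtomicU64::new(0);
    record::<Assign>(&gauge, 5);
    record::<Assign>(&gauge, 7);
    assert_eq!(gauge.load(Ordering::Relaxed), 7);
}

Selecting the operation through a type parameter (held via PhantomData<O>) means the choice is made at compile time, so measure() itself carries no per-call branching on the kind of instrument.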
131 changes: 4 additions & 127 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
@@ -1,137 +1,14 @@
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::vec;
use std::{
    collections::HashMap,
    sync::{Mutex, RwLock},
    time::SystemTime,
};
use std::{collections::HashMap, sync::Mutex, time::SystemTime};

use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
use crate::metrics::AttributeSet;
use once_cell::sync::Lazy;
use opentelemetry::KeyValue;
use opentelemetry::{global, metrics::MetricsError};

use super::{aggregate::is_under_cardinality_limit, AtomicTracker, Number};

pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
    Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);

/// Abstracts the update operation for a measurement.
trait Operation {
    fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T);
}

struct Increment;

impl Operation for Increment {
    fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T) {
        tracker.add(value);
    }
}

struct Assign;

impl Operation for Assign {
    fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T) {
        tracker.store(value);
    }
}

/// The storage for sums.
///
/// This structure is parametrized by an `Operation` that indicates how
/// updates to the underlying value trackers should be performed.
struct ValueMap<T: Number<T>, O> {
    /// Trackers store the values associated with different attribute sets.
    trackers: RwLock<HashMap<Vec<KeyValue>, Arc<T::AtomicTracker>>>,
    /// Number of different attribute set stored in the `trackers` map.
    count: AtomicUsize,
    /// Indicates whether a value with no attributes has been stored.
    has_no_attribute_value: AtomicBool,
    /// Tracker for values with no attributes attached.
    no_attribute_tracker: T::AtomicTracker,
    phantom: PhantomData<O>,
}

impl<T: Number<T>, O> Default for ValueMap<T, O> {
    fn default() -> Self {
        ValueMap::new()
    }
}

impl<T: Number<T>, O> ValueMap<T, O> {
    fn new() -> Self {
        ValueMap {
            trackers: RwLock::new(HashMap::new()),
            has_no_attribute_value: AtomicBool::new(false),
            no_attribute_tracker: T::new_atomic_tracker(),
            count: AtomicUsize::new(0),
            phantom: PhantomData,
        }
    }
}

impl<T: Number<T>, O: Operation> ValueMap<T, O> {
    fn measure(&self, measurement: T, attributes: &[KeyValue]) {
        if attributes.is_empty() {
            O::update_tracker(&self.no_attribute_tracker, measurement);
            self.has_no_attribute_value.store(true, Ordering::Release);
            return;
        }

        let Ok(trackers) = self.trackers.read() else {
            return;
        };

        // Try to retrieve and update the tracker with the attributes in the provided order first
        if let Some(tracker) = trackers.get(attributes) {
            O::update_tracker(&**tracker, measurement);
            return;
        }

        // Try to retrieve and update the tracker with the attributes sorted.
        let sorted_attrs = AttributeSet::from(attributes).into_vec();
        if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
            O::update_tracker(&**tracker, measurement);
            return;
        }

        // Give up the read lock before acquiring the write lock.
        drop(trackers);

        let Ok(mut trackers) = self.trackers.write() else {
            return;
        };

        // Recheck both the provided and sorted orders after acquiring the write lock
        // in case another thread has pushed an update in the meantime.
        if let Some(tracker) = trackers.get(attributes) {
            O::update_tracker(&**tracker, measurement);
        } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
            O::update_tracker(&**tracker, measurement);
        } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
            let new_tracker = Arc::new(T::new_atomic_tracker());
            O::update_tracker(&*new_tracker, measurement);

            // Insert tracker with the attributes in the provided and sorted orders
            trackers.insert(attributes.to_vec(), new_tracker.clone());
            trackers.insert(sorted_attrs, new_tracker);

            self.count.fetch_add(1, Ordering::SeqCst);
        } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
            O::update_tracker(&**overflow_value, measurement);
        } else {
            let new_tracker = T::new_atomic_tracker();
            O::update_tracker(&new_tracker, measurement);
            trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
            global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
        }
    }
}
use super::{Assign, Increment, ValueMap};
use super::{AtomicTracker, Number};

/// Summarizes a set of measurements made as their arithmetic sum.
pub(crate) struct Sum<T: Number<T>> {
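Aside (not part of the diff): the measure() path removed from sum.rs, and now shared in internal/mod.rs, follows a read-lock fast path, then a write-lock slow path that re-checks the map, enforces a cardinality limit, and folds excess attribute sets into a single otel.metric.overflow entry. The sketch below reproduces that locking shape in a self-contained form, assuming plain String keys, AtomicU64 values, and a placeholder CARDINALITY_LIMIT in place of the SDK's is_under_cardinality_limit; it is a simplified illustration, not the crate's implementation.

// Standalone sketch of the read-then-write locking used by ValueMap::measure.
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

const CARDINALITY_LIMIT: usize = 2000; // placeholder for is_under_cardinality_limit
const OVERFLOW_KEY: &str = "otel.metric.overflow";

struct SimpleValueMap {
    trackers: RwLock<HashMap<String, Arc<AtomicU64>>>,
}

impl SimpleValueMap {
    fn measure(&self, key: &str, value: u64) {
        // Fast path: a shared read lock suffices when the key already exists.
        if let Ok(trackers) = self.trackers.read() {
            if let Some(tracker) = trackers.get(key) {
                tracker.fetch_add(value, Ordering::Relaxed);
                return;
            }
        }

        // Slow path: take the write lock, then re-check in case another thread
        // inserted the key between releasing the read lock and acquiring this one.
        let Ok(mut trackers) = self.trackers.write() else {
            return;
        };
        if let Some(tracker) = trackers.get(key) {
            tracker.fetch_add(value, Ordering::Relaxed);
        } else if trackers.len() < CARDINALITY_LIMIT {
            trackers.insert(key.to_owned(), Arc::new(AtomicU64::new(value)));
        } else {
            // Past the limit, every new attribute set is folded into one overflow entry.
            trackers
                .entry(OVERFLOW_KEY.to_owned())
                .or_insert_with(|| Arc::new(AtomicU64::new(0)))
                .fetch_add(value, Ordering::Relaxed);
        }
    }
}

fn main() {
    let map = SimpleValueMap {
        trackers: RwLock::new(HashMap::new()),
    };
    map.measure("route=/home", 1);
    map.measure("route=/home", 1);
    let trackers = map.trackers.read().unwrap();
    assert_eq!(trackers["route=/home"].load(Ordering::Relaxed), 2);
}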