Skip to content

Commit

Permalink
Reduce cost of no attribute counters (#1519)
Browse files Browse the repository at this point in the history
Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
  • Loading branch information
KallDrexx and cijothomas committed Feb 15, 2024
1 parent b35bf38 commit 650904f
Show file tree
Hide file tree
Showing 4 changed files with 531 additions and 11 deletions.
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
- `LoggerProviderInner` is no longer `pub (crate)`
- `Logger.provider()` now returns `&LoggerProvider` instead of an `Option<LoggerProvider>`

- [1519](https://github.com/open-telemetry/opentelemetry-rust/pull/1519) Performance improvements
when calling `Counter::add()` and `UpDownCounter::add()` with an empty set of attributes
(e.g. `counter.Add(5, &[])`)

### Fixed

- [#1481](https://github.com/open-telemetry/opentelemetry-rust/pull/1481) Fix error message caused by race condition when using PeriodicReader
Expand Down
173 changes: 173 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,26 @@ mod sum;

use core::fmt;
use std::ops::{Add, AddAssign, Sub};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Mutex;

pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};

/// Marks a type that can have a value added and retrieved atomically. Required since
/// different types have different backing atomic mechanisms
pub(crate) trait AtomicTracker<T>: Sync + Send + 'static {
fn add(&self, value: T);
fn get_value(&self) -> T;
fn get_and_reset_value(&self) -> T;
}

/// Marks a type that can have an atomic tracker generated for it
pub(crate) trait AtomicallyUpdate<T> {
type AtomicTracker: AtomicTracker<T>;
fn new_atomic_tracker() -> Self::AtomicTracker;
}

pub(crate) trait Number<T>:
Add<Output = T>
+ AddAssign
Expand All @@ -23,6 +39,7 @@ pub(crate) trait Number<T>:
+ Send
+ Sync
+ 'static
+ AtomicallyUpdate<T>
{
fn min() -> Self;
fn max() -> Self;
Expand Down Expand Up @@ -71,3 +88,159 @@ impl Number<f64> for f64 {
self
}
}

impl AtomicTracker<u64> for AtomicU64 {
fn add(&self, value: u64) {
self.fetch_add(value, Ordering::Relaxed);
}

fn get_value(&self) -> u64 {
self.load(Ordering::Relaxed)
}

fn get_and_reset_value(&self) -> u64 {
self.swap(0, Ordering::Relaxed)
}
}

impl AtomicallyUpdate<u64> for u64 {
type AtomicTracker = AtomicU64;

fn new_atomic_tracker() -> Self::AtomicTracker {
AtomicU64::new(0)
}
}

impl AtomicTracker<i64> for AtomicI64 {
fn add(&self, value: i64) {
self.fetch_add(value, Ordering::Relaxed);
}

fn get_value(&self) -> i64 {
self.load(Ordering::Relaxed)
}

fn get_and_reset_value(&self) -> i64 {
self.swap(0, Ordering::Relaxed)
}
}

impl AtomicallyUpdate<i64> for i64 {
type AtomicTracker = AtomicI64;

fn new_atomic_tracker() -> Self::AtomicTracker {
AtomicI64::new(0)
}
}

pub(crate) struct F64AtomicTracker {
inner: Mutex<f64>, // Floating points don't have true atomics, so we need to use mutex for them
}

impl F64AtomicTracker {
fn new() -> Self {
F64AtomicTracker {
inner: Mutex::new(0.0),
}
}
}

impl AtomicTracker<f64> for F64AtomicTracker {
fn add(&self, value: f64) {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard += value;
}

fn get_value(&self) -> f64 {
let guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard
}

fn get_and_reset_value(&self) -> f64 {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
let value = *guard;
*guard = 0.0;

value
}
}

impl AtomicallyUpdate<f64> for f64 {
type AtomicTracker = F64AtomicTracker;

fn new_atomic_tracker() -> Self::AtomicTracker {
F64AtomicTracker::new()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn can_add_and_get_u64_atomic_value() {
let atomic = u64::new_atomic_tracker();
atomic.add(15);
atomic.add(10);

let value = atomic.get_value();
assert_eq!(value, 25);
}

#[test]
fn can_reset_u64_atomic_value() {
let atomic = u64::new_atomic_tracker();
atomic.add(15);

let value = atomic.get_and_reset_value();
let value2 = atomic.get_value();

assert_eq!(value, 15, "Incorrect first value");
assert_eq!(value2, 0, "Incorrect second value");
}

#[test]
fn can_add_and_get_i64_atomic_value() {
let atomic = i64::new_atomic_tracker();
atomic.add(15);
atomic.add(-10);

let value = atomic.get_value();
assert_eq!(value, 5);
}

#[test]
fn can_reset_i64_atomic_value() {
let atomic = i64::new_atomic_tracker();
atomic.add(15);

let value = atomic.get_and_reset_value();
let value2 = atomic.get_value();

assert_eq!(value, 15, "Incorrect first value");
assert_eq!(value2, 0, "Incorrect second value");
}

#[test]
fn can_add_and_get_f64_atomic_value() {
let atomic = f64::new_atomic_tracker();
atomic.add(15.3);
atomic.add(10.4);

let value = atomic.get_value();

assert!(f64::abs(25.7 - value) < 0.0001);
}

#[test]
fn can_reset_f64_atomic_value() {
let atomic = f64::new_atomic_tracker();
atomic.add(15.5);

let value = atomic.get_and_reset_value();
let value2 = atomic.get_value();

assert!(f64::abs(15.5 - value) < 0.0001, "Incorrect first value");
assert!(f64::abs(0.0 - value2) < 0.0001, "Incorrect second value");
}
}
Loading

0 comments on commit 650904f

Please sign in to comment.