46 changes: 30 additions & 16 deletions opentelemetry-sdk/benches/metric.rs
@@ -3,6 +3,7 @@ use std::sync::{Arc, Weak};

use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use opentelemetry::{
global,
metrics::{Counter, Histogram, MeterProvider as _, Result},
Key, KeyValue,
};
@@ -344,7 +345,7 @@ fn counters(c: &mut Criterion) {
const MAX_BOUND: usize = 100000;

fn bench_histogram(bound_count: usize) -> (SharedReader, Histogram<u64>) {
let mut bounds = vec![0; bound_count];
let mut bounds: Vec<usize> = vec![0; bound_count];
#[allow(clippy::needless_range_loop)]
for i in 0..bounds.len() {
bounds[i] = i * MAX_BOUND / bound_count
@@ -394,22 +395,35 @@ fn histograms(c: &mut Criterion) {
);
}
}
group.bench_function("CollectOne", |b| benchmark_collect_histogram(b, 1));
group.bench_function("CollectFive", |b| benchmark_collect_histogram(b, 5));
group.bench_function("CollectTen", |b| benchmark_collect_histogram(b, 10));
group.bench_function("CollectTwentyFive", |b| benchmark_collect_histogram(b, 25));
for metrics_size in [1, 5, 20] {
for attr_sets in [1, 5, 20, 200] {
group.bench_function(
format!("Collect{metrics_size}Metric{attr_sets}AttrSets"),
|b| benchmark_collect_histogram(b, metrics_size, attr_sets),
);
}
}
}

fn benchmark_collect_histogram(b: &mut Bencher, n: usize) {
fn benchmark_collect_histogram(b: &mut Bencher, metrics_size: usize, attr_sets: usize) {
let r = SharedReader(Arc::new(ManualReader::default()));
let mtr = SdkMeterProvider::builder()
.with_reader(r.clone())
.build()
.meter("sdk/metric/bench/histogram");

for i in 0..n {
let h = mtr.u64_histogram(format!("fake_data_{i}")).init();
h.record(1, &[]);
let provider = SdkMeterProvider::builder().with_reader(r.clone()).build();
let mtr = provider.meter("sdk/metric/bench/histogram");
global::set_meter_provider(provider);

let mut rng = rand::thread_rng();
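// Each attribute set draws 0..=2 random key-value pairs, so duplicate sets
// (including the empty set) are possible and `attr_sets` is an upper bound
// on the number of distinct series per metric.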
for m in 0..metrics_size {
let h = mtr.u64_histogram(format!("fake_data_{m}")).init();
for _att in 0..attr_sets {
let mut attributes: Vec<KeyValue> = Vec::new();
for _i in 0..rng.gen_range(0..3) {
attributes.push(KeyValue::new(
format!("K{}", rng.gen_range::<i32, _>(0..10)),
format!("V{}", rng.gen_range::<i32, _>(0..10)),
))
}
h.record(1, &attributes)
}
}

let mut rm = ResourceMetrics {
@@ -418,8 +432,8 @@ fn benchmark_collect_histogram(b: &mut Bencher, n: usize) {
};

b.iter(|| {
let _ = r.collect(&mut rm);
assert_eq!(rm.scope_metrics[0].metrics.len(), n);
r.collect(&mut rm).unwrap();
assert_eq!(rm.scope_metrics[0].metrics.len(), metrics_size);
})
}
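Assuming the bench target keeps the name `metric` from `Cargo.toml`, the new benchmark matrix can be exercised through Criterion's usual name filter:

cargo bench --bench metric -- Collect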

183 changes: 183 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs
@@ -0,0 +1,183 @@
use std::{
collections::HashMap,
fmt::Debug,
ops::Deref,
sync::{Arc, Mutex, RwLock},
};

use opentelemetry::{global, metrics::MetricsError, KeyValue};

use crate::metrics::AttributeSet;

use super::{
aggregate::is_under_cardinality_limit, Number, STREAM_OVERFLOW_ATTRIBUTES,
STREAM_OVERFLOW_ATTRIBUTES_ERR,
};

/// Aggregator interface
pub(crate) trait Aggregator<T>: Debug + Clone
where
T: Number,
{
/// Static configuration needed by the aggregator,
/// e.g. `bucket_size` at creation time and the list of buckets on each update.
type Config;

/// Called every time a new attribute set is stored.
fn create(init: &Self::Config) -> Self;

/// Called for each measurement.
fn update(&mut self, config: &Self::Config, measurement: T);
}
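
// Reviewer sketch, not part of this PR: a minimal [`Aggregator`] implementation
// for `u64` measurements (this crate implements `Number` for `u64`). A plain
// running sum needs no static configuration, so `Config` is `()`.
#[derive(Debug, Clone, Default)]
struct SumAggregator {
    total: u64,
}

impl Aggregator<u64> for SumAggregator {
    type Config = ();

    fn create(_init: &Self::Config) -> Self {
        SumAggregator::default()
    }

    fn update(&mut self, _config: &Self::Config, measurement: u64) {
        // Accumulate every measurement into the running total.
        self.total += measurement;
    }
}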

/// Hashing and sorting are expensive, so we keep two maps;
/// the sorted map is mainly needed for a fast collection phase.
struct WithAttribsAggregators<A> {
// every attribute combination, keyed in user-provided order
all: HashMap<Vec<KeyValue>, Arc<Mutex<A>>>,
sorted: HashMap<Vec<KeyValue>, Arc<Mutex<A>>>,
}

/// This struct is responsible for two things:
/// * sending measurement information to the aggregator for a specific attribute set
/// * collecting all attribute sets and their aggregators (either read-only or with a reset)
///
/// Even though its responsibility is simple to understand,
/// the implementation is considerably more complex in order to stay performant.
pub(crate) struct AttributeSetAggregation<T, A>
where
T: Number,
A: Aggregator<T>,
{
/// Aggregator for values with no attributes attached.
no_attribs: Mutex<Option<A>>,
list: RwLock<WithAttribsAggregators<A>>,
/// Configuration required to create and update the [`Aggregator`]
config: A::Config,
}

impl<T, A> AttributeSetAggregation<T, A>
where
T: Number,
A: Aggregator<T>,
{
/// Initialize the aggregation by specifying the [`Aggregator`] configuration.
pub(crate) fn new(init_data: A::Config) -> Self {
Self {
no_attribs: Mutex::new(None),
list: RwLock::new(WithAttribsAggregators {
all: Default::default(),
sorted: Default::default(),
}),
config: init_data,
}
}

/// Update the aggregator that corresponds to the provided attribute set.
pub(crate) fn measure(&self, attrs: &[KeyValue], measurement: T) {
if attrs.is_empty() {
if let Ok(mut aggr) = self.no_attribs.lock() {
aggr.get_or_insert_with(|| A::create(&self.config))
.update(&self.config, measurement);
}
return;
}
let Ok(list) = self.list.read() else {
return;
};
if let Some(aggr) = list.all.get(attrs) {
if let Ok(mut aggr) = aggr.lock() {
aggr.update(&self.config, measurement);
}
return;
}
drop(list);
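// A std `RwLock` cannot be upgraded in place, so the read guard is dropped
// before acquiring the write lock.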
let Ok(mut list) = self.list.write() else {
return;
};

// Re-check in case another thread inserted this attribute set while we
// were waiting for the write lock
if let Some(aggr) = list.all.get(attrs) {
if let Ok(mut aggr) = aggr.lock() {
aggr.update(&self.config, measurement);
}
} else if is_under_cardinality_limit(list.all.len()) {
let mut aggr = A::create(&self.config);
aggr.update(&self.config, measurement);
let aggr = Arc::new(Mutex::new(aggr));
list.all.insert(attrs.into(), aggr.clone());
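// Register the same aggregator under the sorted attribute order as well,
// so the collection phase only has to walk the `sorted` map.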
let sorted_attribs = AttributeSet::from(attrs).into_vec();
list.sorted.insert(sorted_attribs, aggr);
} else if let Some(aggr) = list.sorted.get(STREAM_OVERFLOW_ATTRIBUTES.deref()) {
if let Ok(mut aggr) = aggr.lock() {
aggr.update(&self.config, measurement);
}
} else {
let mut aggr = A::create(&self.config);
aggr.update(&self.config, measurement);
list.sorted.insert(
STREAM_OVERFLOW_ATTRIBUTES.clone(),
Arc::new(Mutex::new(aggr)),
);
global::handle_error(MetricsError::Other(STREAM_OVERFLOW_ATTRIBUTES_ERR.into()));
}
}

/// Iterate through all attribute sets and populate `DataPoints` in read-only mode.
pub(crate) fn collect_readonly<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, A) -> Res,
{
let Ok(list) = self.list.read() else {
return;
};
prepare_data(dest, list.sorted.len());
if let Ok(aggr) = self.no_attribs.lock() {
if let Some(aggr) = aggr.deref() {
dest.push(map_fn(Default::default(), aggr.clone()));
}
};
dest.extend(
list.sorted
.iter()
.filter_map(|(k, v)| v.lock().ok().map(|v| map_fn(k.clone(), v.clone()))),
)
}

/// Iterate through all attribute sets and populate `DataPoints`, while also consuming (resetting) the aggregators.
pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, A) -> Res,
{
let Ok(mut list) = self.list.write() else {
return;
};
prepare_data(dest, list.sorted.len());
if let Ok(mut aggr) = self.no_attribs.lock() {
if let Some(aggr) = aggr.take() {
dest.push(map_fn(Default::default(), aggr));
}
};
list.all.clear();
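// Clearing `all` drops its clones of the Arcs, and no `measure()` call can
// hold another while we own the write lock, so `Arc::try_unwrap` below
// cannot fail.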
dest.extend(list.sorted.drain().filter_map(|(k, v)| {
Arc::try_unwrap(v)
.expect("this is the last instance, so we cannot fail to get it")
.into_inner()
.ok()
.map(|v| map_fn(k, v))
}));
}

pub(crate) fn config(&self) -> &A::Config {
&self.config
}
}

/// Clear the buffer and reserve exactly the capacity required for all attribute sets.
fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
data.clear();
let total_len = list_len + 1; // to account for no_attributes case
if total_len > data.capacity() {
data.reserve_exact(total_len - data.capacity());
}
}
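
Taken together, a hypothetical end-to-end use of this crate-private type might look as follows. It reuses the `SumAggregator` sketch from above and is only meant to illustrate the flow, not the SDK's public API:

let aggregation: AttributeSetAggregation<u64, SumAggregator> =
    AttributeSetAggregation::new(());

// Measurements are routed to one aggregator per attribute set; the empty
// set goes to the dedicated `no_attribs` slot.
aggregation.measure(&[KeyValue::new("region", "eu")], 5);
aggregation.measure(&[KeyValue::new("region", "eu")], 7);
aggregation.measure(&[], 2);

// Collection drains every aggregator and maps it to an output value.
let mut points: Vec<(Vec<KeyValue>, u64)> = Vec::new();
aggregation.collect_and_reset(&mut points, |attrs, aggr| (attrs, aggr.total));
assert_eq!(points.len(), 2); // one point for `region=eu`, one for no attributes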