use static vector at first level hash
lalitb committed Feb 23, 2024
1 parent 75c853e commit fd858f6
Showing 1 changed file with 71 additions and 62 deletions.
133 changes: 71 additions & 62 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
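The change replaces the first-level `HashMap<u8, Mutex<HashMap<...>>>` with a fixed-size array of 256 mutex-guarded, lazily initialized buckets, so a measurement indexes straight into an array slot instead of hashing into an outer map. Below is a minimal standalone sketch of that pattern, using stand-in types (`AttributeSet` as `Vec<(String, String)>`, values as `u64`) rather than the SDK's real definitions:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};

// Stand-ins for the SDK types (assumptions for illustration only).
type AttributeSet = Vec<(String, String)>;
type Buckets = Arc<[Mutex<Option<HashMap<AttributeSet, u64>>>; 256]>;

fn new_buckets() -> Buckets {
    // 256 lazily initialized buckets; the Option avoids allocating a
    // HashMap for buckets that never see a measurement.
    let buckets: [Mutex<Option<HashMap<AttributeSet, u64>>>; 256] =
        std::iter::repeat_with(|| Mutex::new(None))
            .take(256)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap_or_else(|_| panic!("incorrect length"));
    Arc::new(buckets)
}

fn hash_to_bucket(key: &AttributeSet) -> u8 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() as u8 // the low byte selects one of the 256 buckets
}

fn record(buckets: &Buckets, attrs: AttributeSet, value: u64) {
    let index = hash_to_bucket(&attrs) as usize;
    let mut guard = buckets[index].lock().unwrap();
    // Initialize the bucket on first use, then update the inner map.
    let values = guard.get_or_insert_with(HashMap::new);
    *values.entry(attrs).or_insert(0) += value;
}

fn main() {
    let buckets = new_buckets();
    let attrs = vec![("route".to_string(), "/home".to_string())];
    record(&buckets, attrs.clone(), 5);
    record(&buckets, attrs, 7);
    println!("recorded");
}
```

Because each bucket carries its own lock, writers that touch different attribute sets usually lock different slots and rarely contend.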
@@ -18,7 +18,7 @@ use super::{

/// The storage for sums.
struct ValueMap<T: Number<T>> {
buckets: Arc<HashMap<u8, Mutex<HashMap<AttributeSet, T>>>>,
buckets: Arc<[Mutex<Option<HashMap<AttributeSet, T>>>; 256]>,

Check failure on line 21 in opentelemetry-sdk/src/metrics/internal/sum.rs (GitHub Actions / lint): very complex type used. Consider factoring parts into `type` definitions
has_no_value_attribute_value: AtomicBool,
no_attribute_value: T::AtomicTracker,
total_count: AtomicUsize,
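The lint flagged above is clippy's `type_complexity`. One conventional way to satisfy it, shown here as a hypothetical sketch rather than anything this commit does, is to factor the nested generic into a `type` alias:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for the SDK's AttributeSet (assumption for illustration).
type AttributeSet = Vec<(String, String)>;

// Factoring the nested generic into an alias is the usual fix for
// clippy's type_complexity lint.
type Bucket<T> = Mutex<Option<HashMap<AttributeSet, T>>>;

struct ValueMap<T> {
    buckets: Arc<[Bucket<T>; 256]>,
}
```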
@@ -32,12 +32,14 @@ impl<T: Number<T>> Default for ValueMap<T> {

impl<T: Number<T>> ValueMap<T> {
fn new() -> Self {
let mut outer_map = HashMap::new();
for i in 0..=255 {
outer_map.insert(i, Mutex::new(HashMap::new()));
}
let buckets = std::iter::repeat_with(|| Mutex::new(None))
.take(256)
.collect::<Vec<_>>()
.try_into()
.unwrap_or_else(|_| panic!("Incorrect length"));

ValueMap {
buckets: Arc::new(outer_map),
buckets: Arc::new(buckets),
has_no_value_attribute_value: AtomicBool::new(false),
no_attribute_value: T::new_atomic_tracker(),
total_count: AtomicUsize::new(0),
@@ -48,7 +50,8 @@ impl<T: Number<T>> ValueMap<T> {
fn hash_to_bucket(key: &AttributeSet) -> u8 {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
(hasher.finish() % 256) as u8
// Use the 8 least significant bits directly, avoiding the modulus operation.
hasher.finish() as u8
}
}
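For an unsigned hash, casting with `as u8` keeps exactly the 8 least significant bits, so it selects the same bucket as the old `% 256` while skipping the division. A quick self-contained check of that equivalence:

```rust
fn main() {
    // Truncating an unsigned hash to u8 keeps the 8 least significant
    // bits, which is exactly `% 256` (and `& 0xFF`).
    for h in [0u64, 255, 256, 257, 12_345_678_901_234_567_890, u64::MAX] {
        assert_eq!(h as u8, (h % 256) as u8);
        assert_eq!(h as u8, (h & 0xFF) as u8);
    }
    println!("cast, modulus, and mask all agree");
}
```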

@@ -59,9 +62,15 @@ impl<T: Number<T>> ValueMap<T> {
self.has_no_value_attribute_value
.store(true, Ordering::Release);
} else {
let bucket_key = Self::hash_to_bucket(&attrs);
if let Some(bucket) = self.buckets.get(&bucket_key) {
let mut values = bucket.lock().unwrap();
let bucket_index = Self::hash_to_bucket(&attrs) as usize; // Ensure index is usize for array indexing
let bucket_mutex = &self.buckets[bucket_index];
let mut bucket_guard = bucket_mutex.lock().unwrap();

if bucket_guard.is_none() {
*bucket_guard = Some(HashMap::new()); // Initialize the bucket if it's None
}

if let Some(ref mut values) = *bucket_guard {
let size = values.len();
match values.entry(attrs) {
Entry::Occupied(mut occupied_entry) => {
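The view collapses the rest of this `match`; for reference, here is a minimal sketch of the `HashMap` entry-API update pattern it uses, with stand-in types and a hypothetical `update` helper, and without the SDK's cardinality handling (which is elided above):

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

type AttributeSet = Vec<(String, String)>;

fn update(values: &mut HashMap<AttributeSet, u64>, attrs: AttributeSet, measurement: u64) {
    match values.entry(attrs) {
        Entry::Occupied(mut occupied_entry) => {
            // Attribute set already tracked: fold the measurement in.
            *occupied_entry.get_mut() += measurement;
        }
        Entry::Vacant(vacant_entry) => {
            // First measurement for this attribute set.
            vacant_entry.insert(measurement);
        }
    }
}
```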
@@ -158,18 +167,19 @@ impl<T: Number<T>> Sum<T> {
});
}

for (_, bucket) in self.value_map.buckets.iter() {
let mut locked_bucket = bucket.lock().unwrap();
for (attrs, value) in locked_bucket.drain() {
s_data.data_points.push(DataPoint {
attributes: attrs,
start_time: Some(*self.start.lock().unwrap()),
time: Some(t),
value,
exemplars: vec![],
});
for bucket_mutex in self.value_map.buckets.iter() {
if let Some(ref mut locked_bucket) = *bucket_mutex.lock().unwrap() {
for (attrs, value) in locked_bucket.drain() {
s_data.data_points.push(DataPoint {
attributes: attrs,
start_time: Some(*self.start.lock().unwrap()),
time: Some(t),
value,
exemplars: vec![],
});
}
// The bucket is automatically cleared by the .drain() method
}
// The bucket is automatically cleared by the .drain() method
}

// The delta collection cycle resets.
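In the delta path above, each of the 256 buckets is locked once and drained: `HashMap::drain` yields owned `(attrs, value)` pairs and leaves the map empty but allocated for the next cycle, while buckets still at `None` are skipped. A condensed sketch of that loop with the same stand-in types as before:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

type AttributeSet = Vec<(String, String)>;

fn collect_delta(
    buckets: &[Mutex<Option<HashMap<AttributeSet, u64>>>],
) -> Vec<(AttributeSet, u64)> {
    let mut data_points = Vec::new();
    for bucket_mutex in buckets {
        // Untouched buckets are still None and are skipped entirely.
        if let Some(ref mut locked_bucket) = *bucket_mutex.lock().unwrap() {
            // drain() hands out owned pairs and clears the map in place.
            for (attrs, value) in locked_bucket.drain() {
                data_points.push((attrs, value));
            }
        }
    }
    data_points
}
```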
Expand Down Expand Up @@ -231,16 +241,17 @@ impl<T: Number<T>> Sum<T> {
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
for (_, bucket) in self.value_map.buckets.iter() {
let locked_bucket = bucket.lock().unwrap();
for (attrs, value) in locked_bucket.iter() {
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(*self.start.lock().unwrap()), // Consider last reset time
time: Some(t),
value: *value,
exemplars: vec![],
});
for bucket_mutex in self.value_map.buckets.iter() {
if let Some(ref locked_bucket) = *bucket_mutex.lock().unwrap() {
for (attrs, value) in locked_bucket.iter() {
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(*self.start.lock().unwrap()), // Consider last reset time
time: Some(t),
value: *value,
exemplars: vec![],
});
}
}
}
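The cumulative path, by contrast, borrows each bucket with `iter()` and clones the attribute sets, leaving the running totals in place for future collections. The same loop shape, again with stand-in types:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

type AttributeSet = Vec<(String, String)>;

fn collect_cumulative(
    buckets: &[Mutex<Option<HashMap<AttributeSet, u64>>>],
) -> Vec<(AttributeSet, u64)> {
    let mut data_points = Vec::new();
    for bucket_mutex in buckets {
        if let Some(ref locked_bucket) = *bucket_mutex.lock().unwrap() {
            // iter() only borrows: the running totals stay in place so
            // the next cumulative collection can report them again.
            for (attrs, value) in locked_bucket.iter() {
                data_points.push((attrs.clone(), *value));
            }
        }
    }
    data_points
}
```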

@@ -322,24 +333,22 @@ impl<T: Number<T>> PrecomputedSum<T> {
});
}

// Iterating through each bucket to aggregate and drain data points
for (_, bucket) in self.value_map.buckets.iter() {
let mut locked_bucket = bucket.lock().unwrap();

// Drain operation to move out values from the bucket
let default = T::default();
for (attrs, value) in locked_bucket.drain() {
let delta = value - *reported.get(&attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value);
for bucket_mutex in self.value_map.buckets.iter() {
if let Some(ref mut locked_bucket) = *bucket_mutex.lock().unwrap() {
let default = T::default();
for (attrs, value) in locked_bucket.drain() {
let delta = value - *reported.get(&attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value);
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
exemplars: vec![],
});
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
exemplars: vec![],
});
}
}
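A precomputed sum receives running totals from the instrument, so each delta collection subtracts the previously reported value and remembers the new total in `new_reported`. A tiny self-contained sketch of that bookkeeping, with string keys standing in for attribute sets:

```rust
use std::collections::HashMap;

fn precomputed_deltas(
    current: HashMap<String, i64>,
    reported: &HashMap<String, i64>,
) -> (Vec<(String, i64)>, HashMap<String, i64>) {
    let default = 0;
    let mut data_points = Vec::new();
    let mut new_reported = HashMap::new();
    for (attrs, value) in current {
        // Subtract last cycle's reported value to turn the running
        // total into a delta.
        let delta = value - reported.get(&attrs).unwrap_or(&default);
        if delta != default {
            new_reported.insert(attrs.clone(), value);
        }
        data_points.push((attrs, delta));
    }
    (data_points, new_reported)
}
```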

@@ -406,21 +415,21 @@ impl<T: Number<T>> PrecomputedSum<T> {
}

let default = T::default();
for (_, bucket) in self.value_map.buckets.iter() {
let locked_bucket = bucket.lock().unwrap();

for (attrs, value) in locked_bucket.iter() {
let delta = *value - *reported.get(attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), *value);
for bucket_mutex in self.value_map.buckets.iter() {
if let Some(ref locked_bucket) = *bucket_mutex.lock().unwrap() {
for (attrs, value) in locked_bucket.iter() {
let delta = *value - *reported.get(attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), *value);
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: *value, // For cumulative, we use the value directly without calculating delta
exemplars: vec![],
});
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: *value, // For cumulative, we use the value directly without calculating delta
exemplars: vec![],
});
}
}

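Taken together, the point of the 256-way split is that concurrent measurements on different attribute sets usually hash to different buckets and therefore take different locks. A self-contained usage sketch (simplified types, hypothetical worker attributes) that exercises the pattern from multiple threads:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};
use std::thread;

type Attrs = Vec<(String, String)>;

fn bucket_of(attrs: &Attrs) -> usize {
    let mut h = DefaultHasher::new();
    attrs.hash(&mut h);
    h.finish() as u8 as usize
}

fn main() {
    // 256 independent locks: threads touching different attribute sets
    // almost always lock different buckets, so they rarely contend.
    let buckets: Arc<[Mutex<Option<HashMap<Attrs, u64>>>; 256]> = Arc::new(
        std::iter::repeat_with(|| Mutex::new(None))
            .take(256)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap_or_else(|_| panic!("incorrect length")),
    );

    let handles: Vec<_> = (0..4)
        .map(|i| {
            let buckets = Arc::clone(&buckets);
            thread::spawn(move || {
                let attrs = vec![("worker".to_string(), i.to_string())];
                for _ in 0..1_000 {
                    let mut guard = buckets[bucket_of(&attrs)].lock().unwrap();
                    let map = guard.get_or_insert_with(HashMap::new);
                    *map.entry(attrs.clone()).or_insert(0) += 1;
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    // Only the buckets that saw measurements were ever initialized.
    let touched = buckets.iter().filter(|b| b.lock().unwrap().is_some()).count();
    println!("buckets touched: {touched}"); // at most 4 of 256
}
```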
