Skip to content

Commit

Permalink
fix count and gauge aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 5427107 commit c271481
Showing 1 changed file with 117 additions and 23 deletions.
140 changes: 117 additions & 23 deletions lib/src/metrics/local_drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ impl LocalDrain {
}
}

fn tree(&mut self, is_backend: bool) -> &mut sled::Tree {
if is_backend {
&mut self.backend_tree
} else {
&mut self.cluster_tree
}
}

pub fn dump_metrics_data(&mut self) -> MetricsData {
MetricsData {
proxy: self.dump_process_data(),
Expand Down Expand Up @@ -546,9 +554,28 @@ impl LocalDrain {
self.cluster_tree.insert(complete_key.as_bytes(), &i.to_le_bytes())?;
}

// we change the minute, aggregate the 60 measurements from the last minute
if now.second() == 0 {
self.aggregate_gauge(key, now, is_backend)?;
// aggregate at the last hour
let second = now.second();
if second != 0 {
let previous_minute = now - time::Duration::seconds(second as i64);
let timestamp = previous_minute.unix_timestamp();

let complete_key = format!("{}\t{}", key, timestamp);

if !self.tree(is_backend).contains_key(complete_key.as_bytes())? {
self.tree(is_backend).insert(complete_key.as_bytes(), &i.to_le_bytes())?;
}

let minute = previous_minute.minute();
if minute != 0 {
let previous_hour = now - time::Duration::minutes(minute as i64);
let timestamp = previous_hour.unix_timestamp();

let complete_key = format!("{}\t{}", key, timestamp);
if !self.tree(is_backend).contains_key(complete_key.as_bytes())? {
self.tree(is_backend).insert(complete_key.as_bytes(), &i.to_le_bytes())?;
}
}
}

Ok(())
Expand All @@ -558,28 +585,65 @@ impl LocalDrain {
let now = OffsetDateTime::now_utc();
let timestamp = now.unix_timestamp();
let complete_key = format!("{}\t{}", key, timestamp);
let one_minute_ago = format!("{}\t{}", key, timestamp - 60);

let tree = if is_backend {
&mut self.backend_tree
} else {
&mut self.cluster_tree
};
info!("add gauge at {} -> {}", complete_key, i);
match tree.range(one_minute_ago.as_bytes()..=complete_key.as_bytes()).rev().next() {
None => {
tree.insert(complete_key.as_bytes(), &i.to_le_bytes())?;
let value = match self.tree(is_backend).get(complete_key.as_bytes())? {
Some(v) => i64::from_le_bytes((*v).try_into().unwrap()),
// start from the last known value, or zero
None => match self.get_last_before(key, &complete_key, is_backend)? {
None => 0i64,
Some(v) => {
i64::from_le_bytes((*v).try_into().unwrap())
}
},
Some(Err(e)) => return Err(e),
Some(Ok((_, v))) => {
let i2 = i64::from_le_bytes((*v).try_into().unwrap());
tree.insert(complete_key.as_bytes(), &(i+i2).to_le_bytes())?;
}
};

// we change the minute, aggregate the 60 measurements from the last minute
if now.second() == 0 {
self.aggregate_gauge(key, now, is_backend)?;
let new_value = value + i;
self.tree(is_backend).insert(complete_key.as_bytes(), &new_value.to_le_bytes())?;

// aggregate at the last hour
let second = now.second();
if second != 0 {
let previous_minute = now - time::Duration::seconds(second as i64);
let timestamp = previous_minute.unix_timestamp();

let complete_key = format!("{}\t{}", key, timestamp);

let value = match self.tree(is_backend).get(complete_key.as_bytes())? {
Some(v) => i64::from_le_bytes((*v).try_into().unwrap()),
// start from the last known value, or zero
None => match self.get_last_before(key, &complete_key, is_backend)? {
None => 0i64,
Some(v) => {
i64::from_le_bytes((*v).try_into().unwrap())
}
},
};

let new_value = value + i;
self.tree(is_backend).insert(complete_key.as_bytes(), &new_value.to_le_bytes())?;

let minute = previous_minute.minute();
if minute != 0 {
let previous_hour = now - time::Duration::minutes(minute as i64);
let timestamp = previous_hour.unix_timestamp();

let complete_key = format!("{}\t{}", key, timestamp);

let value = match self.tree(is_backend).get(complete_key.as_bytes())? {
Some(v) => i64::from_le_bytes((*v).try_into().unwrap()),
// start from the last known value, or zero
None => match self.get_last_before(key, &complete_key, is_backend)? {
None => 0i64,
Some(v) => {
i64::from_le_bytes((*v).try_into().unwrap())
}
},
};

let new_value = value + i;
self.tree(is_backend).insert(complete_key.as_bytes(), &new_value.to_le_bytes())?;
}
}

Ok(())
Expand All @@ -606,9 +670,39 @@ impl LocalDrain {
}
};

// we change the minute, aggregate the 60 measurements from the last minute
if now.second() == 0 {
self.aggregate_count(key, now, is_backend)?;
// aggregate at the last hour
let second = now.second();
if second != 0 {
let previous_minute = now - time::Duration::seconds(second as i64);
let timestamp = previous_minute.unix_timestamp();

let complete_key = format!("{}\t{}", key, timestamp);
match tree.get(complete_key.as_bytes())? {
None => {
tree.insert(complete_key.as_bytes(), &i.to_le_bytes())?;
},
Some(v) => {
let i2 = i64::from_le_bytes((*v).try_into().unwrap());
tree.insert(complete_key.as_bytes(), &(i+i2).to_le_bytes())?;
}
};

let minute = previous_minute.minute();
if minute != 0 {
let previous_hour = now - time::Duration::minutes(minute as i64);
let timestamp = previous_hour.unix_timestamp();

let complete_key = format!("{}\t{}", key, timestamp);
match tree.get(complete_key.as_bytes())? {
None => {
tree.insert(complete_key.as_bytes(), &i.to_le_bytes())?;
},
Some(v) => {
let i2 = i64::from_le_bytes((*v).try_into().unwrap());
tree.insert(complete_key.as_bytes(), &(i+i2).to_le_bytes())?;
}
};
}
}

Ok(())
Expand Down

0 comments on commit c271481

Please sign in to comment.