Skip to content

Commit

Permalink
no need to aggregate data when clearing
Browse files Browse the repository at this point in the history
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent c271481 commit 7af7a98
Showing 1 changed file with 6 additions and 41 deletions.
47 changes: 6 additions & 41 deletions lib/src/metrics/local_drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ impl LocalDrain {
Ok(())
}

fn aggregate_gauge(&mut self, key: &str, now: OffsetDateTime, is_backend: bool) -> Result<(), sled::Error> {
fn clear_gauge(&mut self, key: &str, now: OffsetDateTime, is_backend: bool) -> Result<(), sled::Error> {
let timestamp = now.unix_timestamp();
let one_hour_ago = format!("{}\t{}", key, timestamp - 3600);
let one_minute_ago = format!("{}\t{}", key, timestamp - 60);
Expand All @@ -721,48 +721,33 @@ impl LocalDrain {
};

// aggregate 60 measures in a point at the last minute
let mut value = None;
for res in tree.range(one_minute_ago.as_bytes()..now_key.as_bytes()) {
let (k, v) = res?;
value = Some(usize::from_le_bytes((*v).try_into().unwrap()));
info!("removing {} -> {:?}", unsafe { std::str::from_utf8_unchecked(&k) }, u64::from_le_bytes((*v).try_into().unwrap()));
tree.remove(k)?;
}

if let Some(v) = value {
info!("reinserting {} -> {:?}", one_minute_ago, v);
tree.insert(one_minute_ago.as_bytes(), &v.to_le_bytes())?;
}

// aggregate 60 measures in a point at the last hour
if now.minute() == 0 {
let mut value = None;
for res in tree.range(one_hour_ago.as_bytes()..one_minute_ago.as_bytes()) {
let (k, v) = res?;
value = Some(usize::from_le_bytes((*v).try_into().unwrap()));
info!("removing {} -> {:?}", unsafe { std::str::from_utf8_unchecked(&k) }, u64::from_le_bytes((*v).try_into().unwrap()));
tree.remove(k)?;
}

if let Some(v) = value {
info!("reinserting {} -> {:?}", one_hour_ago, v);
tree.insert(one_minute_ago.as_bytes(), &v.to_le_bytes())?;
}

// remove all measures older than 24h
let one_day_ago = format!("{}\t{}", key, timestamp - 3600 * 24);
for res in tree.range(key.as_bytes()..one_day_ago.as_bytes()) {
let (k, v) = res?;
value = Some(usize::from_le_bytes((*v).try_into().unwrap()));
info!("removing {} -> {:?} (more than 24h)", unsafe { std::str::from_utf8_unchecked(&k) }, value);
info!("removing {} -> {:?} (more than 24h)", unsafe { std::str::from_utf8_unchecked(&k) }, u64::from_le_bytes((*v).try_into().unwrap()));
tree.remove(k)?;
}
}

Ok(())
}

fn aggregate_count(&mut self, key: &str, now: OffsetDateTime, is_backend: bool) -> Result<(), sled::Error> {
fn clear_count(&mut self, key: &str, now: OffsetDateTime, is_backend: bool) -> Result<(), sled::Error> {
let timestamp = now.unix_timestamp();
let one_hour_ago = format!("{}\t{}", key, timestamp - 3600);
let one_minute_ago = format!("{}\t{}", key, timestamp - 60);
Expand All @@ -775,44 +760,25 @@ impl LocalDrain {
};

// aggregate 60 measures in a point at the last hour
let mut value = 0i64;
let mut found = false;
for res in tree.range(one_minute_ago.as_bytes()..now_key.as_bytes()) {
found = true;
let (k, v) = res?;
value += i64::from_le_bytes((*v).try_into().unwrap());
info!("removing {} -> {:?}", unsafe { std::str::from_utf8_unchecked(&k) }, u64::from_le_bytes((*v).try_into().unwrap()));
tree.remove(k)?;
}

if found {
info!("reinserting {} -> {:?}", one_minute_ago, value);
tree.insert(one_minute_ago.as_bytes(), &value.to_le_bytes())?;
}

// remove all measures older than 24h
if now.minute() == 0 {
let mut value = 0i64;
let mut found = false;
for res in tree.range(one_hour_ago.as_bytes()..one_minute_ago.as_bytes()) {
found = true;
let (k, v) = res?;
value += i64::from_le_bytes((*v).try_into().unwrap());
info!("removing {} -> {:?}", unsafe { std::str::from_utf8_unchecked(&k) }, u64::from_le_bytes((*v).try_into().unwrap()));
tree.remove(k)?;
}

if found {
info!("reinserting {} -> {:?}", one_hour_ago, value);
tree.insert(one_hour_ago.as_bytes(), &value.to_le_bytes())?;
}

// remove all measures older than 24h
let one_day_ago = format!("{}\t{}", key, timestamp - 3600 * 24);
for res in tree.range(key.as_bytes()..one_day_ago.as_bytes()) {
let (k, v) = res?;
value = i64::from_le_bytes((*v).try_into().unwrap());
info!("removing {} -> {:?} (more than 24h)", unsafe { std::str::from_utf8_unchecked(&k) }, value);
info!("removing {} -> {:?} (more than 24h)", unsafe { std::str::from_utf8_unchecked(&k) }, i64::from_le_bytes((*v).try_into().unwrap()));
tree.remove(k)?;
}
}
Expand All @@ -832,7 +798,6 @@ impl LocalDrain {
let timestamp = previous_minute.unix_timestamp();
info!("WILL REWRITE TIME METRIC AT {}", timestamp);
let _res = self.store_time_metric_at(key, cluster_id, backend_id, timestamp, t)?;
//self.aggregate_count(key, now, is_backend)?;
} else {
//FIXME: here we should delete all the measurements for the previous 60 seconds
}
Expand Down Expand Up @@ -1045,10 +1010,10 @@ impl LocalDrain {
let is_backend = *meta == MetricMeta::ClusterBackend;
match kind {
MetricKind::Gauge => {
self.aggregate_gauge(key, now, is_backend)?;
self.clear_gauge(key, now, is_backend)?;
},
MetricKind::Count => {
self.aggregate_count(key, now, is_backend)?;
self.clear_count(key, now, is_backend)?;
},
MetricKind::Time => {
}
Expand Down

0 comments on commit 7af7a98

Please sign in to comment.