Skip to content

Commit

Permalink
aggregate stored metrics
Browse files Browse the repository at this point in the history
since metrics will be generated for each cluster and each backend, we
will store a lot of different points. Since this data is only meant for
quick debugging, we do not need to store long term data, and we do not
need very precise data.
So, to preerve space, we will regularly agregate data:
- all the new measures get aggregated in a point with the timestamp set
at the last second
- every minute, we aggregate the 60 previous measures in a point set at
one minute ago
- every hour, we aggregate the 60 previous "minute points" in a point
set at one hour ago, and we delete all of the points older than 24h

This introduces a lot of bookkeeping, so there will be runtime commands
to reset the database to an empty file, and activate or deactivate the
collection
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent f079127 commit 9e6bd1d
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 35 deletions.
233 changes: 205 additions & 28 deletions lib/src/metrics/local_drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use time::OffsetDateTime;
use std::convert::TryInto;
use std::collections::BTreeMap;
use hdrhistogram::Histogram;
use nom::combinator::complete;
use sozu_command::proxy::{FilteredData,MetricsData,Percentiles,AppMetricsData};

use super::{MetricData,Subscriber};
Expand Down Expand Up @@ -252,41 +251,22 @@ impl LocalDrain {
};

self.metrics.insert(key_prefix.clone(), (meta, kind));
//let start = format!("{}\0", key);
let end = format!("{}\x7F", key_prefix);
//self.db.insert(start.as_bytes(), &0u64.to_le_bytes()).unwrap();
self.db.insert(end.as_bytes(), &0u64.to_le_bytes()).unwrap();
}

let now = time::OffsetDateTime::now();
// reduce to mucrosecond precision
let ts = now.timestamp_nanos() / 1000;

let metric_type = match metric {
MetricData::Gauge(_) => 'g',
MetricData::GaugeAdd(_) => 'a',
MetricData::Count(_) => 'c',
MetricData::Time(_) => 't',
};

let db_key = if let Some(bid) = backend_id {
format!("{}\t{}\t{}\t{}", key, id, bid, ts)
} else {
format!("{}\t{}\t{}", key, id, ts)
};

match metric {
MetricData::Gauge(i) => {
self.db.insert(db_key.as_bytes(), &i.to_le_bytes()).unwrap();
self.store_gauge(&key_prefix, i);
},
MetricData::GaugeAdd(i) => {
self.db.insert(db_key.as_bytes(), &i.to_le_bytes()).unwrap();
self.add_gauge(&key_prefix, i);
},
MetricData::Count(i) => {
self.db.insert(db_key.as_bytes(), &i.to_le_bytes()).unwrap();
self.store_count(&key_prefix, i);
},
MetricData::Time(i) => {
self.db.insert(db_key.as_bytes(), &i.to_le_bytes()).unwrap();
//self.db.insert(db_key.as_bytes(), &i.to_le_bytes()).unwrap();
},
}

Expand All @@ -298,10 +278,209 @@ impl LocalDrain {
}
}
info!("metrics: {:?}", self.metrics);
info!("db size (at {}): {:?}", self.data_dir.path().to_str().unwrap(), self.db.size_on_disk());
//info!("metrics: {:?}", self.metrics);
info!("db size: {:?}", self.db.size_on_disk());
*/
}

fn store_gauge(&mut self, key: &str, i: usize) {
let now = OffsetDateTime::now_utc();
let timestamp = now.unix_timestamp();
let complete_key = format!("{}\t{}", key, timestamp);

info!("store gauge at {} -> {}", complete_key, i);
self.db.insert(complete_key.as_bytes(), &i.to_le_bytes()).unwrap();

// we change the minute, aggregate the 60 measurements from the last minute
if now.second() == 0 {
self.aggregate_gauge(key, now);
}
}

fn add_gauge(&mut self, key: &str, i: i64) {
let now = OffsetDateTime::now_utc();
let timestamp = now.unix_timestamp();
let complete_key = format!("{}\t{}", key, timestamp);
let one_minute_ago = format!("{}\t{}", key, timestamp - 60);

info!("add gauge at {} -> {}", complete_key, i);
match self.db.range(one_minute_ago.as_bytes()..=complete_key.as_bytes()).rev().next() {
None | Some(Err(_)) => {
self.db.insert(complete_key.as_bytes(), &i.to_le_bytes()).unwrap();
},
Some(Ok((_, v))) => {
let i2 = i64::from_le_bytes((*v).try_into().unwrap());
self.db.insert(complete_key.as_bytes(), &(i+i2).to_le_bytes()).unwrap();
}
};

// we change the minute, aggregate the 60 measurements from the last minute
if now.second() == 0 {
self.aggregate_gauge(key, now);
}
}

fn store_count(&mut self, key: &str, i: i64) {
let now = OffsetDateTime::now_utc();
let timestamp = now.unix_timestamp();
let complete_key = format!("{}\t{}", key, timestamp);

info!("store count at {} -> {}", complete_key, i);
match self.db.get(complete_key.as_bytes()).unwrap() {
None => {
self.db.insert(complete_key.as_bytes(), &i.to_le_bytes()).unwrap();
},
Some(v) => {
let i2 = i64::from_le_bytes((*v).try_into().unwrap());
self.db.insert(complete_key.as_bytes(), &(i+i2).to_le_bytes()).unwrap();
}
};

// we change the minute, aggregate the 60 measurements from the last minute
if now.second() == 0 {
self.aggregate_count(key, now);
}
}

fn aggregate_gauge(&mut self, key: &str, now: OffsetDateTime) {
let timestamp = now.unix_timestamp();
let one_hour_ago = format!("{}\t{}", key, timestamp - 3600);
let one_minute_ago = format!("{}\t{}", key, timestamp - 60);
let now_key = format!("{}\t{}", key, timestamp);

// aggregate 60 measures in a point at the last minute
let mut value = None;
for res in self.db.range(one_minute_ago.as_bytes()..now_key.as_bytes()) {
let (k, v) = res.unwrap();
value = Some(usize::from_le_bytes((*v).try_into().unwrap()));
info!("removing {} -> {:?}", unsafe { std::str::from_utf8_unchecked(&k) }, u64::from_le_bytes((*v).try_into().unwrap()));
self.db.remove(k).unwrap();
}

if let Some(v) = value {
info!("reinserting {} -> {:?}", one_minute_ago, v);
self.db.insert(one_minute_ago.as_bytes(), &v.to_le_bytes()).unwrap();
}

// aggregate 60 measures in a point at the last hour
if now.minute() == 0 {
let mut value = None;
for res in self.db.range(one_hour_ago.as_bytes()..one_minute_ago.as_bytes()) {
let (k, v) = res.unwrap();
value = Some(usize::from_le_bytes((*v).try_into().unwrap()));
info!("removing {} -> {:?}", unsafe { std::str::from_utf8_unchecked(&k) }, u64::from_le_bytes((*v).try_into().unwrap()));
self.db.remove(k).unwrap();
}

if let Some(v) = value {
info!("reinserting {} -> {:?}", one_hour_ago, v);
self.db.insert(one_minute_ago.as_bytes(), &v.to_le_bytes()).unwrap();
}

// remove all measures older than 24h
let one_day_ago = format!("{}\t{}", key, timestamp - 3600 * 24);
for res in self.db.range(key.as_bytes()..one_day_ago.as_bytes()) {
let (k, v) = res.unwrap();
value = Some(usize::from_le_bytes((*v).try_into().unwrap()));
info!("removing {} -> {:?} (more than 24h)", unsafe { std::str::from_utf8_unchecked(&k) }, value);
self.db.remove(k).unwrap();
}
}

}

fn aggregate_count(&mut self, key: &str, now: OffsetDateTime) {
let timestamp = now.unix_timestamp();
let one_hour_ago = format!("{}\t{}", key, timestamp - 3600);
let one_minute_ago = format!("{}\t{}", key, timestamp - 60);
let now_key = format!("{}\t{}", key, timestamp);

// aggregate 60 measures in a point at the last hour
let mut value = 0i64;
let mut found = false;
for res in self.db.range(one_minute_ago.as_bytes()..now_key.as_bytes()) {
found = true;
let (k, v) = res.unwrap();
value += i64::from_le_bytes((*v).try_into().unwrap());
info!("removing {} -> {:?}", unsafe { std::str::from_utf8_unchecked(&k) }, u64::from_le_bytes((*v).try_into().unwrap()));
self.db.remove(k).unwrap();
}

if found {
info!("reinserting {} -> {:?}", one_minute_ago, value);
self.db.insert(one_minute_ago.as_bytes(), &value.to_le_bytes()).unwrap();
}

// remove all measures older than 24h
if now.minute() == 0 {
let mut value = 0i64;
let mut found = false;
for res in self.db.range(one_hour_ago.as_bytes()..one_minute_ago.as_bytes()) {
found = true;
let (k, v) = res.unwrap();
value += i64::from_le_bytes((*v).try_into().unwrap());
info!("removing {} -> {:?}", unsafe { std::str::from_utf8_unchecked(&k) }, u64::from_le_bytes((*v).try_into().unwrap()));
self.db.remove(k).unwrap();
}

if found {
info!("reinserting {} -> {:?}", one_hour_ago, value);
self.db.insert(one_hour_ago.as_bytes(), &value.to_le_bytes()).unwrap();
}

// remove all measures older than 24h
let one_day_ago = format!("{}\t{}", key, timestamp - 3600 * 24);
for res in self.db.range(key.as_bytes()..one_day_ago.as_bytes()) {
let (k, v) = res.unwrap();
value = i64::from_le_bytes((*v).try_into().unwrap());
info!("removing {} -> {:?} (more than 24h)", unsafe { std::str::from_utf8_unchecked(&k) }, value);
self.db.remove(k).unwrap();
}
}
}

pub fn clear(&mut self, now: OffsetDateTime) {
info!("will clear old data from the metrics database");
//self.db.clear();
//

let metrics = self.metrics.clone();
for (key, (_, kind)) in metrics.iter() {
info!("will aggregate metrics for key '{}'", key);

match kind {
MetricKind::Gauge => {
self.aggregate_gauge(key, now);
},
MetricKind::Count => {
self.aggregate_count(key, now);
},
MetricKind::Time => {
}
}

// check if we removed all the points for this metric
let end = format!("{}\x7F", key);
if let Some((k, _)) = self.db.get_gt(key.as_bytes()).unwrap() {
if &k == end.as_bytes() {
info!("removing key {} from metrics", key);
self.db.remove(k).unwrap();
self.metrics.remove(key);
}
}
}

if let (Some(first), Some(second)) = (self.db.first().unwrap(), self.db.last().unwrap()) {
info!("remaining keys:");
for res in self.db.range(first.0..second.0) {
let (k, v) = res.unwrap();
info!("{} -> {:?}", unsafe { std::str::from_utf8_unchecked(&k) }, u64::from_le_bytes((*v).try_into().unwrap()));

}
}
info!("db size: {:?}", self.db.size_on_disk());
}

}


Expand All @@ -320,6 +499,4 @@ impl Subscriber for LocalDrain {
});
}
}

}

4 changes: 2 additions & 2 deletions lib/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ impl Aggregator {
self.local.dump_process_data()
}

pub fn clear_local(&mut self) {
self.local.clear();
pub fn clear_local(&mut self, now: time::OffsetDateTime) {
self.local.clear(now);
}
}

Expand Down
22 changes: 17 additions & 5 deletions lib/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,15 @@ impl Server {
Some(i) => if *i <= now {
poll_timeout
} else {
Some(*i - now)
let dur = *i - now;
match poll_timeout {
None => Some(dur),
Some(t) => if t < dur {
Some(t)
} else {
Some(dur)
}
}
},
};

Expand Down Expand Up @@ -488,11 +496,15 @@ impl Server {
}
info!("removing {} zombies ({} remaining tokens after close)", count, remaining);
}
}

// regularly clear local metrics to prevent them from taking too much memory
METRICS.with(|metrics| {
(*metrics.borrow_mut()).clear_local();
});
//FIXME: what if we got no event in a while
let now = time::OffsetDateTime::now_utc();
if now.second() == 0 {
// regularly clear local metrics to prevent them from taking too much memory
METRICS.with(|metrics| {
(*metrics.borrow_mut()).clear_local(now);
});
}

gauge!("client.connections", self.nb_connections);
Expand Down

0 comments on commit 9e6bd1d

Please sign in to comment.