Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dtoa = "1.0"
itoa = "1.0"
owning_ref = "0.4"
prometheus-client-derive-text-encode = { version = "0.3.0", path = "derive-text-encode" }
quantiles = "0.7.1"

[dev-dependencies]
async-std = { version = "1", features = ["attributes"] }
Expand Down
2 changes: 2 additions & 0 deletions src/encoding/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ impl Encode for MetricType {
MetricType::Histogram => "histogram",
MetricType::Info => "info",
MetricType::Unknown => "unknown",
MetricType::Summary => "summary",
};

writer.write_all(t.as_bytes())?;
Expand Down Expand Up @@ -603,6 +604,7 @@ where
}
}


#[cfg(test)]
mod tests {
use super::*;
Expand Down
3 changes: 2 additions & 1 deletion src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod exemplar;
pub mod family;
pub mod gauge;
pub mod histogram;
pub mod summary;
pub mod info;

/// A metric that is aware of its Open Metrics metric type.
Expand All @@ -19,9 +20,9 @@ pub enum MetricType {
Histogram,
Info,
Unknown,
Summary,
// Not (yet) supported metric types.
//
// GaugeHistogram,
// StateSet,
// Summary
}
2 changes: 1 addition & 1 deletion src/metrics/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ mod tests {
// Map infinite, subnormal and NaN to 0.0.
.map(|f| if f.is_normal() { f } else { 0.0 })
.collect();
let sum = fs.iter().sum();
let sum: f64 = fs.iter().sum();
let counter = Counter::<f64, AtomicU64>::default();
for f in fs {
counter.inc_by(f);
Expand Down
117 changes: 117 additions & 0 deletions src/metrics/summary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
//! Module implementing an Open Metrics histogram.
//!
//! See [`Summary`] for details.

use super::{MetricType, TypedMetric};
//use owning_ref::OwningRef;
//use std::iter::{self, once};
use std::sync::{Arc, Mutex};

use quantiles::ckms::CKMS;

/// Open Metrics [`Summary`] to measure distributions of discrete events.
pub struct Summary {
target_quantile: Vec<f64>,
target_error: f64,
max_age_buckets: u64,
max_age_seconds: u64,
inner: Arc<Mutex<InnerSummary>>,
}

impl Clone for Summary {
fn clone(&self) -> Self {
Summary {
target_quantile: self.target_quantile.clone(),
target_error: self.target_error,
max_age_buckets: self.max_age_buckets,
max_age_seconds: self.max_age_seconds,
inner: self.inner.clone(),
}
}
}

pub(crate) struct InnerSummary {
sum: f64,
count: u64,
quantile_streams: Vec<CKMS<f64>>,
// head_stream is like a cursor which carries the index
// of the stream in the quantile_streams that we want to query
head_stream: u64,
}

impl Summary {
pub fn new(max_age_buckets: u64, max_age_seconds: u64, target_quantile: Vec<f64>, target_error: f64) -> Self {
let mut streams: Vec<CKMS<f64>> = Vec::new();
for _ in 0..max_age_buckets {
streams.push(CKMS::new(target_error));
}

Summary{
max_age_buckets,
max_age_seconds,
target_quantile,
target_error,
inner: Arc::new(Mutex::new(InnerSummary {
sum: Default::default(),
count: Default::default(),
quantile_streams: streams,
head_stream: 0,
}))
}
}

pub fn observe(&mut self, v: f64) {
let mut inner = self.inner.lock().unwrap();
inner.sum += v;
inner.count += 1;

// insert quantiles into all streams/buckets.
for stream in inner.quantile_streams.iter_mut() {
stream.insert(v);
}
}

pub fn get(&self) -> (f64, u64, Vec<(f64, f64)>) {
let inner = self.inner.lock().unwrap();
let sum = inner.sum;
let count = inner.count;
let head = inner.head_stream;
let mut quantile_values: Vec<(f64, f64)> = Vec::new();

// TODO: add stream rotation
for q in self.target_quantile.iter() {
match inner.quantile_streams[head as usize].query(*q) {
Some((_, v)) => quantile_values.push((*q, v)),
None => continue, // TODO fix this
};
}
(sum, count, quantile_values)
}
}

// TODO: should this type impl Default like Counter?

impl TypedMetric for Summary {
const TYPE: MetricType = MetricType::Summary;
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn basic() {
let mut summary = Summary::new(5, 10, vec![0.5, 0.9, 0.99], 0.01);
summary.observe(5.0);
summary.observe(15.0);
summary.observe(25.0);

let (s, c, q) = summary.get();
assert_eq!(45.0, s);
assert_eq!(3, c);

for elem in q.iter() {
println!("Vec<{}, {}>", elem.0, elem.1);
}
}
}