
Commit

temp commit
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
tobz committed Oct 15, 2021
1 parent e29038a commit e55ee49
Showing 14 changed files with 273 additions and 235 deletions.
4 changes: 2 additions & 2 deletions lib/vector-core/src/event/metric.rs
@@ -84,7 +84,7 @@ impl ByteSizeOf for MetricName {
     }
 }
 
-#[derive(Clone, Debug, Deserialize, Getters, PartialEq, Serialize)]
+#[derive(Clone, Debug, Deserialize, Getters, MutGetters, PartialEq, Serialize)]
 pub struct MetricData {
     #[getset(get = "pub")]
     #[serde(skip_serializing_if = "Option::is_none")]
@@ -93,7 +93,7 @@ pub struct MetricData {
     #[getset(get = "pub")]
     pub kind: MetricKind,
 
-    #[getset(get = "pub")]
+    #[getset(get = "pub", get_mut = "pub")]
     #[serde(flatten)]
     pub value: MetricValue,
 }
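For orientation: the new `get_mut = "pub"` attribute makes the `getset` derive emit a public mutable accessor for `value`, which the normalizer change later in this commit relies on. A minimal sketch of the assumed shape of that generated accessor (illustrative only, not part of the diff):

// Roughly what #[getset(get_mut = "pub")] is assumed to expand to for `value`.
impl MetricData {
    pub fn value_mut(&mut self) -> &mut MetricValue {
        &mut self.value
    }
}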
65 changes: 64 additions & 1 deletion lib/vector-core/src/metrics/ddsketch.rs
@@ -1,8 +1,11 @@
 use std::{cmp, mem};
 
 use core_common::byte_size_of::ByteSizeOf;
+use ordered_float::OrderedFloat;
 use serde::{Deserialize, Serialize};
 
+use crate::event::metric::Bucket;
+
 const AGENT_DEFAULT_BIN_LIMIT: u16 = 4096;
 const AGENT_DEFAULT_EPS: f64 = 1.0 / 128.0;
 const AGENT_DEFAULT_MIN_VALUE: f64 = 1.0e-9;
@@ -294,6 +297,7 @@ impl AgentDDSketch {
         self.is_empty().then(|| self.avg)
     }
 
+    /// Clears the sketch, removing all bins and resetting all statistics.
     pub fn clear(&mut self) {
         self.count = 0;
         self.min = f64::MAX;
@@ -459,7 +463,7 @@ impl AgentDDSketch {
         self.insert_key_counts(vec![(key, n)]);
     }
 
-    pub fn insert_interpolate(&mut self, lower: f64, upper: f64, count: u32) {
+    fn insert_interpolate_bucket(&mut self, lower: f64, upper: f64, count: u32) {
         // Find the keys for the bins where the lower bound and upper bound would end up, and
         // collect all of the keys in between, inclusive.
         let lower_key = self.config.key(lower);
@@ -516,6 +520,29 @@ impl AgentDDSketch {
         self.insert_key_counts(key_counts);
     }
 
+    pub fn insert_interpolate_buckets(&mut self, mut buckets: Vec<Bucket>) {
+        // Buckets need to be sorted from lowest to highest so that we can properly calculate the
+        // rolling lower/upper bounds.
+        buckets.sort_by(|a, b| {
+            let oa = OrderedFloat(a.upper_limit);
+            let ob = OrderedFloat(b.upper_limit);
+
+            oa.cmp(&ob)
+        });
+
+        let mut lower = 0.0;
+
+        for bucket in buckets {
+            let mut upper = bucket.upper_limit;
+            if upper.is_sign_positive() && upper.is_infinite() {
+                upper = lower;
+            }
+
+            self.insert_interpolate_bucket(lower, upper, bucket.count);
+            lower = bucket.upper_limit;
+        }
+    }
+
     pub fn quantile(&self, q: f64) -> Option<f64> {
         if self.count == 0 {
             return None;
@@ -860,10 +887,24 @@ fn round_to_even(v: f64) -> f64 {
 
 #[cfg(test)]
 mod tests {
+    use crate::{event::metric::Bucket, metrics::handle::Histogram};
+
     use super::{round_to_even, AgentDDSketch, Config, AGENT_DEFAULT_EPS};
 
     const FLOATING_POINT_ACCEPTABLE_ERROR: f64 = 1.0e-10;
 
+    static HISTO_VALUES: &[u64] = &[
+        104221, 10206, 32436, 121686, 92848, 83685, 23739, 15122, 50491, 88507, 48318, 28004,
+        29576, 8735, 77693, 33965, 88047, 7592, 64138, 59966, 117956, 112525, 41743, 82790, 27084,
+        26967, 75008, 10752, 96636, 97150, 60768, 33411, 24746, 91872, 59057, 48329, 16756, 100459,
+        117640, 59244, 107584, 124303, 32368, 109940, 106353, 90452, 84471, 39086, 91119, 89680,
+        41339, 23329, 25629, 98156, 97002, 9538, 73671, 112586, 101616, 70719, 117291, 90043,
+        10713, 49195, 60656, 60887, 47332, 113675, 8371, 42619, 33489, 108629, 70501, 84355, 24576,
+        34468, 76756, 110706, 42854, 83841, 120751, 66494, 65210, 70244, 118529, 28021, 51603,
+        96315, 92364, 59120, 118968, 5484, 91790, 45171, 102756, 29673, 85303, 108322, 122793,
+        88373,
+    ];
+
     #[cfg(ddsketch_extended)]
     fn generate_pareto_distribution() -> Vec<OrderedFloat<f64>> {
         use ordered_float::OrderedFloat;
@@ -1093,6 +1134,28 @@ mod tests {
         test_relative_accuracy(config, AGENT_DEFAULT_EPS, min_value, max_value)
     }
 
+    #[test]
+    fn test_histogram_interpolation() {
+        let mut histo_sketch = AgentDDSketch::with_agent_defaults();
+        assert!(histo_sketch.is_empty());
+
+        let histo = Histogram::new();
+        for num in HISTO_VALUES {
+            histo.record((*num as f64) / 10_000.0);
+        }
+
+        let buckets = histo
+            .buckets()
+            .map(|(ub, n)| Bucket {
+                upper_limit: ub,
+                count: n,
+            })
+            .collect::<Vec<_>>();
+        histo_sketch.insert_interpolate_buckets(buckets);
+
+        assert!(!histo_sketch.is_empty());
+    }
+
     fn test_relative_accuracy(config: Config, rel_acc: f64, min_value: f32, max_value: f32) {
         let max_observed_rel_acc = check_max_relative_accuracy(config, min_value, max_value);
         assert!(
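As a quick orientation for the new API, here is a usage sketch (not part of the commit) of how aggregated-histogram buckets could be folded into a sketch and queried for a quantile. It assumes only what the diff above shows: the Bucket { upper_limit, count } fields, AgentDDSketch::with_agent_defaults(), insert_interpolate_buckets(), and quantile().

use vector_core::{event::metric::Bucket, metrics::AgentDDSketch};

fn sketch_from_buckets(buckets: Vec<Bucket>) -> AgentDDSketch {
    let mut sketch = AgentDDSketch::with_agent_defaults();
    // Each bucket's count is interpolated across its value range; a +Inf upper
    // bound is clamped to the previous bucket's limit, per the diff above.
    sketch.insert_interpolate_buckets(buckets);
    sketch
}

fn main() {
    let buckets = vec![
        Bucket { upper_limit: 1.0, count: 10 },
        Bucket { upper_limit: 5.0, count: 25 },
        Bucket { upper_limit: f64::INFINITY, count: 3 },
    ];
    let sketch = sketch_from_buckets(buckets);
    // quantile() returns None only when the sketch is empty.
    println!("p95 ≈ {:?}", sketch.quantile(0.95));
}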
177 changes: 0 additions & 177 deletions lib/vector-core/src/metrics/histogram.rs

This file was deleted.

2 changes: 0 additions & 2 deletions lib/vector-core/src/metrics/mod.rs
@@ -1,6 +1,5 @@
 mod ddsketch;
 mod handle;
-mod histogram;
 mod label_filter;
 mod recorder;

@@ -9,7 +8,6 @@ use std::sync::Arc;
 use crate::event::Metric;
 pub use crate::metrics::ddsketch::{AgentDDSketch, BinMap};
 pub use crate::metrics::handle::{Counter, Handle};
-pub use crate::metrics::histogram::AgentDDSketchHistogram;
 use crate::metrics::label_filter::VectorLabelFilter;
 use crate::metrics::recorder::VectorRecorder;
 use metrics::Key;
41 changes: 39 additions & 2 deletions src/sinks/datadog/metrics/normalizer.rs
@@ -1,11 +1,48 @@
-use vector_core::event::Metric;
+use std::mem;
+
+use vector_core::{event::{Metric, MetricKind, MetricValue, metric::MetricSketch}, metrics::AgentDDSketch};
 
 use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet};
 
 pub struct DatadogMetricsNormalizer;
 
 impl MetricNormalize for DatadogMetricsNormalizer {
     fn apply_state(state: &mut MetricSet, metric: Metric) -> Option<Metric> {
-        todo!()
+        // We primarily care about making sure that counters are incremental, and that gauges are
+        // always absolute. For other metric kinds, we want them to be incremental.
+        match &metric.value() {
+            MetricValue::Counter { .. } => state.make_incremental(metric),
+            MetricValue::Gauge { .. } => state.make_absolute(metric),
+            MetricValue::AggregatedHistogram { .. } => {
+                // Sketches should be sent to Datadog in an incremental fashion, so we need to
+                // incrementalize the aggregated histogram first and then generate a sketch from it.
+                state.make_incremental(metric)
+                    .map(|metric| {
+                        let (series, mut data, metadata) = metric.into_parts();
+
+                        let sketch = match data.value_mut() {
+                            MetricValue::AggregatedHistogram { buckets, .. } => {
+                                let delta_buckets = mem::replace(buckets, Vec::new());
+                                let mut sketch = AgentDDSketch::with_agent_defaults();
+                                sketch.insert_interpolate_buckets(delta_buckets);
+                                sketch
+                            },
+                            // We should never get back a different metric value simply from converting
+                            // between absolute and incremental.
+                            _ => unreachable!(),
+                        };
+
+                        let _ = mem::replace(data.value_mut(), MetricValue::Sketch {
+                            sketch: MetricSketch::AgentDDSketch(sketch),
+                        });
+
+                        Metric::from_parts(series, data, metadata)
+                    })
+            },
+            _ => match metric.kind() {
+                MetricKind::Absolute => state.make_incremental(metric),
+                MetricKind::Incremental => Some(metric),
+            }
+        }
+    }
+}
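To make the normalization policy above concrete, here is an illustrative sketch of the delta computation that converting an absolute counter to incremental performs. This is not the MetricSet implementation from this commit; the helper name and the HashMap-based state are assumptions for the example only.

use std::collections::HashMap;

// Hypothetical helper: turn an absolute counter reading into an incremental delta,
// keyed by metric name, mirroring the kind of state a normalizer would track.
fn absolute_to_incremental(state: &mut HashMap<String, f64>, name: &str, absolute: f64) -> Option<f64> {
    match state.insert(name.to_string(), absolute) {
        // First observation: nothing to diff against yet, so no value is emitted.
        None => None,
        // Later observations: emit the change since the previous absolute value.
        Some(previous) => Some(absolute - previous),
    }
}

fn main() {
    let mut state = HashMap::new();
    // Readings 10.0, 15.0, 18.0 become None, Some(5.0), Some(3.0).
    for reading in [10.0, 15.0, 18.0] {
        println!("{:?}", absolute_to_incremental(&mut state, "requests_total", reading));
    }
}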
