enhancement(datadog_metrics sink): rewrite to the new model + add sketch support #9178

Merged: 32 commits, Oct 29, 2021

Commits (32)
13b9425
enhancement(datadog_metrics sink): add support for aggregated histogr…
tobz Sep 16, 2021
5173ea7
fix metric_to_log test and note BC in 0.17.0 upgrade guide
tobz Sep 16, 2021
2264c4b
temporary/broken commit
tobz Oct 4, 2021
8b39de7
forgot two files
tobz Oct 4, 2021
9e0bccd
more cleanup
tobz Oct 5, 2021
20e2f57
temp commit
tobz Oct 7, 2021
6849efa
wip
tobz Oct 8, 2021
9e11150
wip commit before i lose all of this
tobz Oct 12, 2021
b2220c9
another wip commit
tobz Oct 13, 2021
02eb571
anotha one
tobz Oct 13, 2021
3dff9eb
temp commit
tobz Oct 15, 2021
e5e738f
temporary commit because it worksssssssss
tobz Oct 19, 2021
d1cc201
it workkkkkkks
tobz Oct 19, 2021
d99d548
so much work trying to make fallible encoding work more correctly
tobz Oct 20, 2021
4ab250d
temporary commit because i will definitely cry if i lost all this pro…
tobz Oct 20, 2021
551f25b
anotha one
tobz Oct 25, 2021
370950b
remove some cruft, fix some spelling
tobz Oct 25, 2021
d102071
dont actually need that
tobz Oct 25, 2021
05288a6
remove more extraneous stuff
tobz Oct 25, 2021
2cbce96
fix glitches
tobz Oct 27, 2021
dabcdbe
intermediate commit to save clippy fix progress
tobz Oct 27, 2021
8461392
address PR self-feedback
tobz Oct 27, 2021
76a41d7
Merge branch 'master' into tobz/datadog-metrics-sink-electric-boogaloo
tobz Oct 27, 2021
e556c9c
more PR feedback and such
tobz Oct 27, 2021
69b2a33
fix more clippy issues and such
tobz Oct 28, 2021
c262e45
Merge branch 'master' into tobz/datadog-metrics-sink-electric-boogaloo
tobz Oct 28, 2021
d37c810
fix weirdness with unused type
tobz Oct 28, 2021
6a0daf4
more and more fixes
tobz Oct 29, 2021
96f77a2
Merge branch 'master' of github.com:vectordotdev/vector into tobz/dat…
tobz Oct 29, 2021
4da96fc
use the correct/specific feature flag for the datadog agent source
tobz Oct 29, 2021
15232a2
fmt
tobz Oct 29, 2021
7c13036
fix another test
tobz Oct 29, 2021
579 changes: 313 additions & 266 deletions Cargo.lock

Large diffs are not rendered by default.

25 changes: 16 additions & 9 deletions Cargo.toml
@@ -309,6 +309,7 @@ libz-sys = "1.1.3"
matches = "0.1.9"
pretty_assertions = "1.0.0"
reqwest = { version = "0.11.6", features = ["json"] }
proptest = "1.0"
tempfile = "3.2.0"
tokio = { version = "1.12.0", features = ["test-util"] }
tokio-test = "0.4.2"
@@ -400,6 +401,9 @@ api-client = [
"vector-api-client",
]

# Anything that requires Protocol Buffers.
protobuf-build = ["tonic-build", "prost-build"]

# Enrichment Tables
enrichment-tables = ["enrichment-tables-file"]
enrichment-tables-file = [ "csv", "seahash", "hash_hasher" ]
@@ -412,7 +416,7 @@ sources = ["sources-logs", "sources-metrics"]
sources-logs = [
"sources-aws_kinesis_firehose",
"sources-aws_s3",
"sources-datadog",
"sources-datadog_agent",
"sources-docker_logs",
"sources-exec",
"sources-file",
@@ -450,8 +454,8 @@ sources-apache_metrics = []
sources-aws_ecs_metrics = []
sources-aws_kinesis_firehose = ["base64", "infer", "sources-utils-tls", "warp", "codecs"]
sources-aws_s3 = ["rusoto", "rusoto_s3", "rusoto_sqs", "semver", "uuid", "codecs", "zstd"]
sources-datadog = ["snap", "sources-utils-tls", "warp", "sources-utils-http-error", "codecs"]
sources-dnstap = ["base64", "data-encoding", "trust-dns-proto", "dnsmsg-parser", "tonic-build", "prost-build"]
sources-datadog_agent = ["snap", "sources-utils-tls", "warp", "sources-utils-http-error", "protobuf-build", "codecs"]
Contributor Author (tobz):

Why add this? Because we'll need the proto to be compiled when we accept sketches from the agent, so we're just doing it now, knowing we're going to need it anyway.

sources-dnstap = ["base64", "data-encoding", "trust-dns-proto", "dnsmsg-parser", "protobuf-build"]
sources-docker_logs = ["docker"]
sources-eventstoredb_metrics = []
sources-exec = ["codecs"]
@@ -488,7 +492,7 @@ sources-utils-tcp-socket = []
sources-utils-tls = []
sources-utils-udp = []
sources-utils-unix = []
sources-vector = ["listenfd", "sources-utils-tcp-keepalive", "sources-utils-tcp-socket", "sources-utils-tls", "tonic", "tonic-build", "prost-build", "codecs"]
sources-vector = ["listenfd", "sources-utils-tcp-keepalive", "sources-utils-tcp-socket", "sources-utils-tls", "tonic", "protobuf-build", "codecs"]

# Transforms
transforms = ["transforms-logs", "transforms-metrics"]
@@ -584,8 +588,9 @@ sinks-logs = [
"sinks-blackhole",
"sinks-clickhouse",
"sinks-console",
"sinks-datadog",
"sinks-datadog_archives",
"sinks-datadog_events",
"sinks-datadog_logs",
"sinks-elasticsearch",
"sinks-file",
"sinks-gcp",
@@ -610,7 +615,7 @@ sinks-metrics = [
"sinks-aws_cloudwatch_metrics",
"sinks-blackhole",
"sinks-console",
"sinks-datadog",
"sinks-datadog_metrics",
"sinks-humio",
"sinks-influxdb",
"sinks-kafka",
@@ -632,8 +637,10 @@ sinks-azure_monitor_logs = []
sinks-blackhole = []
sinks-clickhouse = []
sinks-console = []
sinks-datadog = []
sinks-datadog_archives = ["sinks-aws_s3"]
sinks-datadog_events = []
sinks-datadog_logs = []
sinks-datadog_metrics = ["protobuf-build"]
sinks-elasticsearch = ["rusoto", "transforms-metric_to_log"]
sinks-file = []
sinks-gcp = ["base64", "goauth", "gouth", "smpl_jwt", "uuid"]
@@ -655,13 +662,13 @@ sinks-socket = ["sinks-utils-udp"]
sinks-splunk_hec = []
sinks-statsd = ["sinks-utils-udp", "tokio-util/net"]
sinks-utils-udp = []
sinks-vector = ["sinks-utils-udp", "tonic", "tonic-build", "prost-build"]
sinks-vector = ["sinks-utils-udp", "tonic", "protobuf-build"]

# Datadog integration
datadog-pipelines = [
"sources-host_metrics",
"sources-internal_metrics",
"sinks-datadog",
"sinks-datadog_metrics",
"sha2",
"hex"
]
8 changes: 3 additions & 5 deletions build.rs
@@ -97,14 +97,11 @@ fn main() {
// Always rerun if the build script itself changes.
println!("cargo:rerun-if-changed=build.rs");

#[cfg(any(
feature = "sources-vector",
feature = "sources-dnstap",
feature = "sinks-vector"
))]
#[cfg(feature = "protobuf-build")]
{
println!("cargo:rerun-if-changed=proto/vector.proto");
println!("cargo:rerun-if-changed=proto/dnstap.proto");
println!("cargo:rerun-if-changed=proto/ddsketch.proto");

let mut prost_build = prost_build::Config::new();
prost_build.btree_map(&["."]);
@@ -116,6 +113,7 @@ fn main() {
"lib/vector-core/proto/event.proto",
"proto/vector.proto",
"proto/dnstap.proto",
"proto/ddsketch.proto",
],
&["proto/", "lib/vector-core/proto/"],
)
8 changes: 6 additions & 2 deletions lib/vector-core/Cargo.toml
@@ -31,6 +31,7 @@ metrics-util = { version = "0.10.1", default-features = false, features = ["std"
mlua = { version = "0.6.6", default-features = false, features = ["lua54", "send", "vendored"], optional = true }
no-proxy = { version = "0.3.1", default-features = false, features = ["serialize"] }
once_cell = { version = "1.8", default-features = false }
ordered-float = { version = "2.8.0", default-features = false }
pest = { version = "2.1.3", default-features = false }
pest_derive = { version = "2.1.0", default-features = false }
pin-project = { version = "1.0.8", default-features = false }
@@ -66,8 +67,11 @@ quickcheck = "1.0.3"
proptest = "1.0"
pretty_assertions = "1.0.0"
tokio-test = "0.4.2"
rand = "0.8"
rand_distr = "0.4"
ndarray = "0.15.0"
ndarray-stats = "0.5.0"
noisy_float = "0.2.0"
rand = "0.8.4"
rand_distr = "0.4.2"

[features]
api = ["async-graphql"]
23 changes: 22 additions & 1 deletion lib/vector-core/proto/event.proto
@@ -59,6 +59,7 @@ message Metric {
Distribution2 distribution2 = 12;
AggregatedHistogram2 aggregated_histogram2 = 13;
AggregatedSummary2 aggregated_summary2 = 14;
Sketch sketch = 15;
}
string namespace = 11;
}
@@ -128,6 +129,26 @@ message AggregatedSummary2 {
}

message SummaryQuantile {
double upper_limit = 1;
double quantile = 1;
double value = 2;
}

message Sketch {
message AgentDDSketch {
// Summary statistics for the samples in this sketch.
uint32 count = 1;
double min = 2;
double max = 3;
double sum = 4;
double avg = 5;
// The bins (buckets) of this sketch, where `k` and `n` are unzipped pairs.
// `k` is the list of bin indexes that are populated, and `n` is the count of samples
// within the given bin.
repeated sint32 k = 6;
repeated uint32 n = 7;
}

oneof sketch {
AgentDDSketch agent_dd_sketch = 1;
}
}
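The new `Sketch` message carries the Agent's DDSketch bins as two parallel arrays rather than a list of pairs. As a rough, self-contained illustration of that layout (a hypothetical `bins_from_proto` helper, not vector-core code), zipping the arrays back together recovers the (bin index, sample count) pairs, and the per-bin counts sum to the sketch's `count`:

```rust
// Minimal illustration (not code from this PR) of how the unzipped `k`/`n`
// pairs fit together: `k[i]` is a populated bin index and `n[i]` is the number
// of samples in that bin.
fn bins_from_proto(k: &[i32], n: &[u32]) -> Option<Vec<(i32, u32)>> {
    // The arrays must line up one-to-one, otherwise the payload is malformed.
    if k.len() != n.len() {
        return None;
    }
    Some(k.iter().copied().zip(n.iter().copied()).collect())
}

fn main() {
    // Three populated bins; the per-bin counts should sum to the sketch's
    // `count` field (8 in this example).
    let k = [-1408, 0, 1433];
    let n = [2u32, 1, 5];

    let bins = bins_from_proto(&k, &n).expect("mismatched bin arrays");
    let total: u32 = bins.iter().map(|&(_, c)| c).sum();
    println!("{} samples across {} populated bins", total, bins.len());
}
```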
16 changes: 16 additions & 0 deletions lib/vector-core/src/event/finalization.rs
@@ -110,6 +110,12 @@ impl EventFinalizers {
}
}

impl Finalizable for EventFinalizers {
fn take_finalizers(&mut self) -> EventFinalizers {
mem::take(self)
}
}

/// An event finalizer is the shared data required to handle tracking
/// the status of an event, and updating the status of a batch with that
/// when the event is dropped.
@@ -337,6 +343,16 @@ pub trait Finalizable {
fn take_finalizers(&mut self) -> EventFinalizers;
}

impl<T: Finalizable> Finalizable for Vec<T> {
fn take_finalizers(&mut self) -> EventFinalizers {
self.iter_mut()
.fold(EventFinalizers::default(), |mut acc, x| {
acc.merge(x.take_finalizers());
acc
})
}
}

#[cfg(test)]
mod tests {
use super::*;
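The blanket `Finalizable` implementation for `Vec<T>` lets a whole batch hand over its finalizers as one merged set. A minimal, self-contained sketch of the same fold pattern, using toy stand-ins rather than vector-core's real types:

```rust
// Toy stand-in for vector-core's EventFinalizers, just to show the fold.
#[derive(Default, Debug)]
struct EventFinalizers(Vec<&'static str>);

impl EventFinalizers {
    fn merge(&mut self, other: EventFinalizers) {
        self.0.extend(other.0);
    }
}

trait Finalizable {
    fn take_finalizers(&mut self) -> EventFinalizers;
}

// Mirrors the new blanket impl: drain each element's finalizers and merge them.
impl<T: Finalizable> Finalizable for Vec<T> {
    fn take_finalizers(&mut self) -> EventFinalizers {
        self.iter_mut()
            .fold(EventFinalizers::default(), |mut acc, x| {
                acc.merge(x.take_finalizers());
                acc
            })
    }
}

struct Event(EventFinalizers);

impl Finalizable for Event {
    fn take_finalizers(&mut self) -> EventFinalizers {
        std::mem::take(&mut self.0)
    }
}

fn main() {
    let mut batch = vec![
        Event(EventFinalizers(vec!["a"])),
        Event(EventFinalizers(vec!["b"])),
    ];
    // One call takes every event's finalizers, merged; each event is left
    // with an empty finalizer set.
    let merged = batch.take_finalizers();
    println!("{:?}", merged);
}
```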
14 changes: 7 additions & 7 deletions lib/vector-core/src/event/log_event.rs
@@ -2,7 +2,7 @@ use super::{
finalization::{BatchNotifier, EventFinalizer},
legacy_lookup::Segment,
metadata::EventMetadata,
util, Finalizable, Lookup, PathComponent, Value,
util, EventFinalizers, Finalizable, Lookup, PathComponent, Value,
};
use crate::event::MaybeAsLogMut;
use crate::{config::log_schema, ByteSizeOf};
@@ -47,6 +47,12 @@ impl ByteSizeOf for LogEvent {
}
}

impl Finalizable for LogEvent {
fn take_finalizers(&mut self) -> EventFinalizers {
self.metadata.take_finalizers()
}
}

impl LogEvent {
#[must_use]
pub fn new_with_metadata(metadata: EventMetadata) -> Self {
@@ -446,12 +452,6 @@ impl From<&tracing::Event<'_>> for LogEvent {
}
}

impl Finalizable for LogEvent {
fn take_finalizers(&mut self) -> super::EventFinalizers {
self.metadata_mut().take_finalizers()
}
}

#[derive(Debug, Default)]
struct MakeLogEvent(LogEvent);

63 changes: 61 additions & 2 deletions lib/vector-core/src/event/lua/metric.rs
@@ -1,5 +1,11 @@
use super::util::{table_to_timestamp, timestamp_to_table};
use crate::event::{metric, Metric, MetricKind, MetricValue, StatisticKind};
use crate::{
event::{
metric::{self, MetricSketch},
Metric, MetricKind, MetricValue, StatisticKind,
},
metrics::AgentDDSketch,
};
use mlua::prelude::*;
use std::collections::BTreeMap;

@@ -119,20 +125,41 @@ impl<'a> ToLua<'a> for Metric {
} => {
let aggregated_summary = lua.create_table()?;
let values: Vec<_> = quantiles.iter().map(|q| q.value).collect();
let quantiles: Vec<_> = quantiles.into_iter().map(|q| q.upper_limit).collect();
let quantiles: Vec<_> = quantiles.into_iter().map(|q| q.quantile).collect();
aggregated_summary.raw_set("quantiles", quantiles)?;
aggregated_summary.raw_set("values", values)?;
aggregated_summary.raw_set("count", count)?;
aggregated_summary.raw_set("sum", sum)?;
tbl.raw_set("aggregated_summary", aggregated_summary)?;
}
MetricValue::Sketch { sketch } => {
let sketch_tbl = match sketch {
MetricSketch::AgentDDSketch(ddsketch) => {
let sketch_tbl = lua.create_table()?;
sketch_tbl.raw_set("type", "ddsketch")?;
sketch_tbl.raw_set("count", ddsketch.count())?;
sketch_tbl.raw_set("min", ddsketch.min())?;
sketch_tbl.raw_set("max", ddsketch.max())?;
sketch_tbl.raw_set("sum", ddsketch.sum())?;
sketch_tbl.raw_set("avg", ddsketch.avg())?;

let bin_map = ddsketch.bin_map();
sketch_tbl.raw_set("k", bin_map.keys)?;
sketch_tbl.raw_set("n", bin_map.counts)?;
sketch_tbl
}
};

tbl.raw_set("sketch", sketch_tbl)?;
}
}

Ok(LuaValue::Table(tbl))
}
}

impl<'a> FromLua<'a> for Metric {
#[allow(clippy::too_many_lines)]
fn from_lua(value: LuaValue<'a>, _: &'a Lua) -> LuaResult<Self> {
let table = match &value {
LuaValue::Table(table) => table,
@@ -196,6 +223,38 @@ impl<'a> FromLua<'a> for Metric {
count: aggregated_summary.raw_get("count")?,
sum: aggregated_summary.raw_get("sum")?,
}
} else if let Some(sketch) = table.raw_get::<_, Option<LuaTable>>("sketch")? {
let sketch_type: String = sketch.raw_get("type")?;
match sketch_type.as_str() {
"ddsketch" => {
let count: u32 = sketch.raw_get("count")?;
let min: f64 = sketch.raw_get("min")?;
let max: f64 = sketch.raw_get("max")?;
let sum: f64 = sketch.raw_get("sum")?;
let avg: f64 = sketch.raw_get("avg")?;
let k: Vec<i16> = sketch.raw_get("k")?;
let n: Vec<u16> = sketch.raw_get("n")?;

AgentDDSketch::from_raw(count, min, max, sum, avg, &k, &n)
.map(|sketch| MetricValue::Sketch {
sketch: MetricSketch::AgentDDSketch(sketch),
})
.ok_or(LuaError::FromLuaConversionError {
from: value.type_name(),
to: "Metric",
message: Some(
"Invalid structure for converting to AgentDDSketch".to_string(),
),
})?
}
x => {
return Err(LuaError::FromLuaConversionError {
from: value.type_name(),
to: "Metric",
message: Some(format!("Invalid sketch type '{}' given", x)),
})
}
}
} else {
return Err(LuaError::FromLuaConversionError {
from: value.type_name(),
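On the Lua side, a sketch round-trips as a table with `type = "ddsketch"`, the summary statistics, and the `k`/`n` bin arrays; converting back into a metric goes through `AgentDDSketch::from_raw`, which rejects inconsistent input. A hedged sketch of that conversion path, with module paths inferred from the diff's `crate::` imports and not verified outside this PR:

```rust
// Illustration only: wraps the same `AgentDDSketch::from_raw` call the Lua
// conversion uses. Paths and the `from_raw` signature are taken from this
// diff; treat them as assumptions.
use vector_core::event::metric::MetricSketch;
use vector_core::event::MetricValue;
use vector_core::metrics::AgentDDSketch;

fn sketch_value_from_parts(
    count: u32,
    min: f64,
    max: f64,
    sum: f64,
    avg: f64,
    k: &[i16],
    n: &[u16],
) -> Option<MetricValue> {
    // `from_raw` returns `None` when the summary statistics and bins do not
    // describe a coherent sketch; the Lua conversion surfaces that as a
    // `FromLuaConversionError`.
    AgentDDSketch::from_raw(count, min, max, sum, avg, k, n).map(|sketch| MetricValue::Sketch {
        sketch: MetricSketch::AgentDDSketch(sketch),
    })
}
```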
5 changes: 5 additions & 0 deletions lib/vector-core/src/event/metadata.rs
@@ -70,6 +70,11 @@ impl EventMetadata {
pub fn take_finalizers(&mut self) -> EventFinalizers {
std::mem::take(&mut self.finalizers)
}

/// Merges the given finalizers into the existing set of finalizers.
pub fn merge_finalizers(&mut self, finalizers: EventFinalizers) {
self.finalizers.merge(finalizers);
}
}

impl EventDataEq for EventMetadata {