Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
  • Loading branch information
tobz committed Oct 8, 2021
1 parent 2bd083e commit e5a18c6
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 168 deletions.
35 changes: 23 additions & 12 deletions lib/vector-core/src/metrics/ddsketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,19 @@ impl Bin {
}

/// An implementation of [`DDSketch`][ddsketch] that mirrors the implementation from the Datadog agent.
///
///
/// This implementation is subtly different from the open-source implementations of DDSketch, as
/// Datadog made some slight tweaks to configuration values and in-memory layout to optimize it for
/// insertion performance within the agent.
///
/// We've mimiced the agent version of DDSketch here in order to support a future where we can take
/// sketches shipped by the agent, handle them internally, merge them, and so on, without any loss
/// of accuracy, eventually forwarding them to Datadog ourselves.
///
///
/// As such, this implementation is constrained in the same ways: the configuration parameters
/// cannot be changed, the collapsing strategy is fixed, and we support a limited number of methods
/// for inserting into the sketch.
///
///
/// Importantly, we have a special function, again taken from the agent version, to allow us to
/// interpolate histograms, specifically our own aggregated histograms, into a sketch so that we can
/// emit useful default quantiles, rather than having to ship the buckets -- upper bound and count
Expand Down Expand Up @@ -365,12 +365,11 @@ impl AgentDDSketch {
let mut keys = Vec::with_capacity(vs.len());
for v in vs {
self.adjust_basic_stats(*v, 1);
keys.push(self.config.key(*v));
keys.push(self.config.key(*v));
}
self.insert_keys(keys);
}


pub fn insert_n(&mut self, v: f64, n: u32) {
// TODO: this should return a result that makes sure we have enough room to actually add N
// more samples without hitting `self.config.max_count()`
Expand Down Expand Up @@ -773,8 +772,12 @@ mod tests {
assert_eq!(all_values.bin_count(), values.len());

// Values at both ends of the quantile range should be equal.
let low_end = all_values.quantile(0.01).expect("should have estimated value");
let high_end = all_values.quantile(0.99).expect("should have estimated value");
let low_end = all_values
.quantile(0.01)
.expect("should have estimated value");
let high_end = all_values
.quantile(0.99)
.expect("should have estimated value");
assert_eq!(high_end, -low_end);

let target_bin_count = all_values.bin_count();
Expand All @@ -785,11 +788,19 @@ mod tests {

for p in 0..50 {
let q = p as f64 / 100.0;
let positive = sketch.quantile(q + 0.5).expect("should have estimated value");
let negative = -sketch.quantile(0.5 - q).expect("should have estimated value");

assert!((positive - negative).abs() <= 1.0e-6,
"positive vs negative difference too great ({} vs {})", positive, negative);
let positive = sketch
.quantile(q + 0.5)
.expect("should have estimated value");
let negative = -sketch
.quantile(0.5 - q)
.expect("should have estimated value");

assert!(
(positive - negative).abs() <= 1.0e-6,
"positive vs negative difference too great ({} vs {})",
positive,
negative
);
}

assert_eq!(target_bin_count, sketch.bin_count());
Expand Down
4 changes: 2 additions & 2 deletions src/config/datadog.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{ComponentKey, Config, SinkOuter, SourceOuter};
use crate::{
sinks::datadog::metrics::DatadogConfig, sources::internal_metrics::InternalMetricsConfig,
sinks::datadog::metrics::DatadogMetricsConfig, sources::internal_metrics::InternalMetricsConfig,
};
use serde::{Deserialize, Serialize};
use std::env;
Expand Down Expand Up @@ -75,7 +75,7 @@ pub fn try_attach(config: &mut Config) -> bool {
);

// Create a Datadog metrics sink to consume and emit internal + host metrics.
let datadog_metrics = DatadogConfig::from_api_key(api_key);
let datadog_metrics = DatadogMetricsConfig::from_api_key(api_key);

config.sinks.insert(
datadog_metrics_id,
Expand Down
177 changes: 85 additions & 92 deletions src/sinks/datadog/metrics/config.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
use crate::{config::{DataType, SinkConfig, SinkContext}, event::Event, http::HttpClient, sinks::{Healthcheck, UriParseError, VectorSink, datadog::{Region, healthcheck}, util::{Compression, Concurrency, TowerRequestConfig, batch::{BatchConfig, BatchSettings}, retries::RetryLogic}}};
use chrono:: Utc;
use futures::{stream, FutureExt, SinkExt};
use crate::{
config::{DataType, SinkConfig, SinkContext},
http::HttpClient,
sinks::{
datadog::{healthcheck, Region},
util::{
batch::{BatchConfig, BatchSettings},
retries::RetryLogic,
Compression, Concurrency, ServiceBuilderExt, TowerRequestConfig,
},
Healthcheck, UriParseError, VectorSink,
},
};
use futures::FutureExt;
use http::{uri::InvalidUri, Uri};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use tower::ServiceBuilder;
use std::{
future::ready,
sync::atomic::AtomicI64,
use vector_core::config::proxy::ProxyConfig;

use super::{
service::{DatadogMetricsRetryLogic, DatadogMetricsService},
sink::DatadogMetricsSink,
};

// TODO: revisit our concurrency and batching defaults
const DEFAULT_REQUEST_LIMITS: TowerRequestConfig =
TowerRequestConfig::const_new(Concurrency::None, Concurrency::None).retry_attempts(5);
TowerRequestConfig::const_new(Concurrency::None, Concurrency::None).retry_attempts(5);

const DEFAULT_BATCH_SETTINGS: BatchSettings<()> =
BatchSettings::const_default().events(20).timeout(1);
BatchSettings::const_default().events(20).timeout(1);

const MAXIMUM_SERIES_PAYLOAD_COMPRESSED_SIZE: usize = 3_200_000;
const MAXIMUM_SERIES_PAYLOAD_SIZE: usize = 62_914_560;
Expand All @@ -27,29 +40,14 @@ enum BuildError {
}

/// Various metric type-specific API types.
///
///
/// Each of these corresponds to a specific request path when making a request to the agent API.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum DatadogMetricsEndpoint {
pub enum DatadogMetricsEndpoint {
Series,
Distribution,
}

pub struct DatadogMetricsRetryLogic;

impl RetryLogic for DatadogMetricsRetryLogic {
type Error = HttpError;
type Response = DatadogMetricsResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
todo!()
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
todo!()
}
}

#[derive(Deserialize, Serialize, Debug, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct DatadogMetricsConfig {
Expand All @@ -66,8 +64,8 @@ pub struct DatadogMetricsConfig {
pub batch: BatchConfig,
#[serde(default)]
pub request: TowerRequestConfig,
#[serde(default = "Compression::gzip_default")]
pub compression: Compression,
#[serde(default = "Compression::gzip_default")]
pub compression: Compression,
}

impl_generate_config_from_default!(DatadogMetricsConfig);
Expand All @@ -78,12 +76,12 @@ impl SinkConfig for DatadogMetricsConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let client = HttpClient::new(None, cx.proxy())?;

let client = self.build_client(&cx.proxy)?;
let healthcheck = self.build_healthcheck(client.clone());
let sink = self.build_sink(client, cx)?;
let client = self.build_client(&cx.proxy)?;
let healthcheck = self.build_healthcheck(client.clone());
let sink = self.build_sink(client, cx)?;

Ok((sink, healthcheck))
}
Ok((sink, healthcheck))
}

fn input_type(&self) -> DataType {
DataType::Metric
Expand All @@ -95,103 +93,98 @@ impl SinkConfig for DatadogMetricsConfig {
}

impl DatadogMetricsConfig {
/// Copies this `DatadogMetricsConfig` with the API key set to the given value.
pub fn with_api_key<T: Into<String>>(api_key: T) -> Self {
/// Creates a default [`DatadogMetricsConfig`] with the given API key.
pub fn from_api_key<T: Into<String>>(api_key: T) -> Self {
Self {
api_key: api_key.into(),
..Self::default()
}
}

/// Gets the base URI of the Datadog agent API.
///
/// Per the Datadog agent convention, we should include a unique identifier as part of the
/// domain to indicate that these metrics are being submitted by Vector, including the version,
/// likely useful for detecting if a specific version of the agent (Vector, in this case) is
/// doing something wrong, for understanding issues from the API side.
///
/// The `endpoint` configuration field will be used here if it is present.
/// Gets the base URI of the Datadog agent API.
///
/// Per the Datadog agent convention, we should include a unique identifier as part of the
/// domain to indicate that these metrics are being submitted by Vector, including the version,
/// likely useful for detecting if a specific version of the agent (Vector, in this case) is
/// doing something wrong, for understanding issues from the API side.
///
/// The `endpoint` configuration field will be used here if it is present.
fn get_base_agent_endpoint(&self) -> String {
self.endpoint.clone().unwrap_or_else(|| {
let version = str::replace(crate::built_info::PKG_VERSION, ".", "-");
format!("https://{}-vector.agent.{}", version, self.get_site())
})
}

/// Generates the full URIs to use for the various type-specific metrics endpoints.
fn generate_metric_endpoints(&self) -> crate::Result<Vec<(DatadogMetricsEndpoint, Uri)>> {
let base_uri = self.get_base_metric_endpoint();
let series_endpoint = build_uri(&base_uri, "/api/v1/series")?;
let distribution_endpoint = build_uri(&base_uri, "/api/v1/distribution_points")?;

Ok(vec![
(DatadogMetricsEndpoint::Series, series_endpoint),
(DatadogMetricsEndpoint::Distribution, distribution_endpoint),
])
}

/// Gets the base URI of the Datadog API.
///
/// The `endpoint` configuration field will be used here if it is present.
/// Generates the full URIs to use for the various type-specific metrics endpoints.
fn generate_metric_endpoints(&self) -> crate::Result<Vec<(DatadogMetricsEndpoint, Uri)>> {
let base_uri = self.get_base_agent_endpoint();
let series_endpoint = build_uri(&base_uri, "/api/v1/series")?;
let distribution_endpoint = build_uri(&base_uri, "/api/v1/distribution_points")?;

Ok(vec![
(DatadogMetricsEndpoint::Series, series_endpoint),
(DatadogMetricsEndpoint::Distribution, distribution_endpoint),
])
}

/// Gets the base URI of the Datadog API.
///
/// The `endpoint` configuration field will be used here if it is present.
fn get_api_endpoint(&self) -> String {
self.endpoint
.clone()
.unwrap_or_else(|| format!("https://api.{}", self.get_site()))
}

/// Gets the base domain to use for any calls to Datadog.
///
/// If `site` is not specified, we fallback to `region`, and if that is not specified, we
/// fallback to the Datadog US domain.
/// Gets the base domain to use for any calls to Datadog.
///
/// If `site` is not specified, we fallback to `region`, and if that is not specified, we
/// fallback to the Datadog US domain.
fn get_site(&self) -> &str {
self.site.as_deref().unwrap_or_else(|| match self.region {
Some(Region::Eu) => "datadoghq.eu",
None | Some(Region::Us) => "datadoghq.com",
})
}

fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
HttpClient::new(None, proxy)
}

fn build_healthcheck(&self, client: HttpClient) -> Healthcheck {
healthcheck(self.get_api_endpoint(), self.api_key.clone(), client).boxed()
}

fn build_sink(&self, client: HttpClient, cx: SinkContext) -> crate::Result<VectorSink> {
let batch = DEFAULT_BATCH_SETTINGS
.parse_config(self.batch)?;

let request_limits = self.request.unwrap_with(&DEFAULT_REQUEST_LIMITS);
let metric_endpoints = self.generate_metric_endpoints()?;
let service = ServiceBuilder::new()
.settings(request_limits, DatadogMetricsRetryLogic)
.service(DatadogMetricsService::new(client));

let sink = DatadogMetricsSink::new(
cx,
service,
metric_endpoints,
compression: self.compression,
);

Ok(VectorSink::Sink(Box::new(sink)))
}
fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
let client = HttpClient::new(None, proxy)?;
Ok(client)
}

fn build_healthcheck(&self, client: HttpClient) -> Healthcheck {
healthcheck(self.get_api_endpoint(), self.api_key.clone(), client).boxed()
}

fn build_sink(&self, client: HttpClient, cx: SinkContext) -> crate::Result<VectorSink> {
let batch = DEFAULT_BATCH_SETTINGS.parse_config(self.batch)?;

let request_limits = self.request.unwrap_with(&DEFAULT_REQUEST_LIMITS);
let metric_endpoints = self.generate_metric_endpoints()?;
let service = ServiceBuilder::new()
.settings(request_limits, DatadogMetricsRetryLogic)
.service(DatadogMetricsService::new(client));

let sink = DatadogMetricsSink::new(cx, service, metric_endpoints, self.compression);

Ok(VectorSink::Stream(Box::new(sink)))
}
}

fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
format!("{}{}", host, endpoint)
let result = format!("{}{}", host, endpoint)
.parse::<Uri>()
.context(UriParseError)
.context(UriParseError)?;
Ok(result)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{event::metric::Sample, sinks::util::test::load_sink};

#[test]
fn generate_config() {
crate::test_util::test_generate_config::<DatadogConfig>();
crate::test_util::test_generate_config::<DatadogMetricsConfig>();
}
}
4 changes: 1 addition & 3 deletions src/sinks/datadog/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ mod sink;

use crate::config::SinkDescription;

use self::config::DatadogMetricsConfig;
pub use self::config::DatadogMetricsConfig;

inventory::submit! {
SinkDescription::new::<DatadogMetricsConfig>("datadog_metrics")
}

impl_generate_config_from_default!(DatadogMetricsConfig);
1 change: 1 addition & 0 deletions src/sinks/datadog/metrics/request_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Loading

0 comments on commit e5a18c6

Please sign in to comment.