Skip to content

Commit

Permalink
fix(datadog_metrics sink): Revert to using v1 endpoint by default (#1…
Browse files Browse the repository at this point in the history
…9138)

* fix payload limits

* adjust batch and concurrency defaults

* use the v1 default batch/concurrency if env var specified

* reset cue

* adjust to v1

* remove unused impl

* feedback bruce
  • Loading branch information
neuronull authored and jszwedko committed Nov 15, 2023
1 parent 7b292ce commit 3158f46
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 31 deletions.
1 change: 0 additions & 1 deletion docs/DEPRECATIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,4 @@ For example:

## To be removed

- datadog_v1_metrics v0.35.0 Support for `v1` series endpoint in the `datadog_metrics` sink should be removed.
- http_internal_metrics v0.35.0 `requests_completed_total`, `request_duration_seconds`, and `requests_received_total` internal metrics should be removed.
62 changes: 39 additions & 23 deletions src/sinks/datadog/metrics/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,27 @@ use crate::{
tls::{MaybeTlsSettings, TlsEnableableConfig},
};

const DEFAULT_REQUEST_RETRY_ATTEMPTS: usize = 5;

#[derive(Clone, Copy, Debug, Default)]
pub struct DatadogMetricsDefaultBatchSettings;

// This default is centered around "series" data, which should be the lion's share of what we
// process. Given that a single series, when encoded, is in the 150-300 byte range, we can fit a
// lot of these into a single request, something like 150-200K series. Simply to be a little more
// conservative, though, we use 100K here. This will also get a little more tricky when it comes to
// distributions and sketches, but we're going to have to implement incremental encoding to handle
// "we've exceeded our maximum payload size, split this batch" scenarios anyways.
pub const MAXIMUM_PAYLOAD_COMPRESSED_SIZE: usize = 3_200_000;
pub const MAXIMUM_PAYLOAD_SIZE: usize = 62_914_560;

// TODO: revisit our concurrency and batching defaults
const DEFAULT_REQUEST_RETRY_ATTEMPTS: usize = 5;

#[derive(Clone, Copy, Debug, Default)]
pub struct DatadogMetricsDefaultBatchSettings;

impl SinkBatchSettings for DatadogMetricsDefaultBatchSettings {
const MAX_EVENTS: Option<usize> = Some(100_000);
const MAX_BYTES: Option<usize> = None;
const TIMEOUT_SECS: f64 = 2.0;
}

pub(super) const SERIES_V1_PATH: &str = "/api/v1/series";
pub(super) const SERIES_V2_PATH: &str = "/api/v2/series";
pub(super) const SKETCHES_PATH: &str = "/api/beta/sketches";

// TODO: the series V1 endpoint support is considered deprecated and should be removed in a future release.
// At that time when the V1 support is removed, the SeriesApiVersion stops being useful and can be removed.

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SeriesApiVersion {
V1,
Expand All @@ -63,12 +56,12 @@ impl SeriesApiVersion {
Self::V2 => SERIES_V2_PATH,
}
}
fn get_api_version_backwards_compatible() -> Self {
fn get_api_version() -> Self {
static API_VERSION: OnceLock<SeriesApiVersion> = OnceLock::new();
*API_VERSION.get_or_init(|| {
match std::env::var("VECTOR_TEMP_USE_DD_METRICS_SERIES_V1_API") {
Ok(_) => Self::V1,
Err(_) => Self::V2,
match std::env::var("VECTOR_TEMP_USE_DD_METRICS_SERIES_V2_API") {
Ok(_) => Self::V2,
Err(_) => Self::V1,
}
})
}
Expand All @@ -83,6 +76,12 @@ pub enum DatadogMetricsEndpoint {
Sketches,
}

/// Payload limits for metrics are endpoint-dependent.
pub(super) struct DatadogMetricsPayloadLimits {
pub(super) uncompressed: usize,
pub(super) compressed: usize,
}

impl DatadogMetricsEndpoint {
/// Gets the content type associated with the specific encoder for a given metric endpoint.
pub const fn content_type(self) -> &'static str {
Expand All @@ -99,7 +98,29 @@ impl DatadogMetricsEndpoint {

// Creates an instance of the `Series` variant with the default API version.
pub fn series() -> Self {
Self::Series(SeriesApiVersion::get_api_version_backwards_compatible())
Self::Series(SeriesApiVersion::get_api_version())
}

pub(super) const fn payload_limits(self) -> DatadogMetricsPayloadLimits {
// from https://docs.datadoghq.com/api/latest/metrics/#submit-metrics

let (uncompressed, compressed) = match self {
// Sketches use the same payload size limits as v1 series
DatadogMetricsEndpoint::Series(SeriesApiVersion::V1)
| DatadogMetricsEndpoint::Sketches => (
62_914_560, // 60 MiB
3_200_000, // 3.2 MB
),
DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) => (
5_242_880, // 5 MiB
512_000, // 512 KB
),
};

DatadogMetricsPayloadLimits {
uncompressed,
compressed,
}
}
}

Expand Down Expand Up @@ -200,12 +221,7 @@ impl DatadogMetricsConfig {
) -> crate::Result<DatadogMetricsEndpointConfiguration> {
let base_uri = self.get_base_agent_endpoint();

// TODO: the V1 endpoint support is considered deprecated and should be removed in a future release.
// At that time, the get_api_version_backwards_compatible() should be replaced with statically using the v2.
let series_endpoint = build_uri(
&base_uri,
SeriesApiVersion::get_api_version_backwards_compatible().get_path(),
)?;
let series_endpoint = build_uri(&base_uri, SeriesApiVersion::get_api_version().get_path())?;
let sketches_endpoint = build_uri(&base_uri, SKETCHES_PATH)?;

Ok(DatadogMetricsEndpointConfiguration::new(
Expand Down
11 changes: 4 additions & 7 deletions src/sinks/datadog/metrics/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use vector_lib::{
EstimatedJsonEncodedSizeOf,
};

use super::config::{
DatadogMetricsEndpoint, SeriesApiVersion, MAXIMUM_PAYLOAD_COMPRESSED_SIZE, MAXIMUM_PAYLOAD_SIZE,
};
use super::config::{DatadogMetricsEndpoint, SeriesApiVersion};
use crate::{
common::datadog::{
DatadogMetricType, DatadogPoint, DatadogSeriesMetric, DatadogSeriesMetricMetadata,
Expand Down Expand Up @@ -171,13 +169,12 @@ impl DatadogMetricsEncoder {
endpoint: DatadogMetricsEndpoint,
default_namespace: Option<String>,
) -> Result<Self, CreateError> {
// According to the datadog-agent code, sketches use the same payload size limits as series
// data. We're just gonna go with that for now.
let payload_limits = endpoint.payload_limits();
Self::with_payload_limits(
endpoint,
default_namespace,
MAXIMUM_PAYLOAD_SIZE,
MAXIMUM_PAYLOAD_COMPRESSED_SIZE,
payload_limits.uncompressed,
payload_limits.compressed,
)
}

Expand Down

0 comments on commit 3158f46

Please sign in to comment.