Skip to content

Commit

Permalink
feat(adaptive_concurrency): support configuring the initial ARC limit (
Browse files Browse the repository at this point in the history
…#18175)

* feat(adaptive_concurrency): support configuring the initial ARC limit

* docs: update component docs for new arc configuration

* replace option with default value and add range validation

* update docs
  • Loading branch information
blake-mealey committed Aug 8, 2023
1 parent 923eb45 commit 42119dc
Show file tree
Hide file tree
Showing 42 changed files with 457 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/sinks/util/adaptive_concurrency/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<L> Controller<L> {
// current limit and the maximum, effectively bypassing all the
// mechanisms. Otherwise, the current limit is set to 1 and the
// maximum to MAX_CONCURRENCY.
let current_limit = concurrency.unwrap_or(1);
let current_limit = concurrency.unwrap_or(settings.initial_concurrency);
Self {
semaphore: Arc::new(ShrinkableSemaphore::new(current_limit)),
concurrency,
Expand Down
16 changes: 16 additions & 0 deletions src/sinks/util/adaptive_concurrency/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ mod service;
#[cfg(test)]
pub mod tests;

// Make sure to update the max range of the `AdaptiveConcurrencySettings::initial_concurrency` when changing
// this constant.
pub(super) const MAX_CONCURRENCY: usize = 200;

pub(crate) use layer::AdaptiveConcurrencyLimitLayer;
Expand All @@ -29,6 +31,15 @@ pub(self) fn instant_now() -> std::time::Instant {
#[derive(Clone, Copy, Debug)]
#[serde(deny_unknown_fields)]
pub struct AdaptiveConcurrencySettings {
/// The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
///
/// It is recommended to set this value to your service's average limit if you're seeing that it takes a
/// long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
/// `adaptive_concurrency_limit` metric.
#[configurable(validation(range(min = 1, max = 200)))]
#[serde(default = "default_initial_concurrency")]
pub(super) initial_concurrency: usize,

/// The fraction of the current value to set the new concurrency limit when decreasing the limit.
///
/// Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly
Expand Down Expand Up @@ -63,6 +74,10 @@ pub struct AdaptiveConcurrencySettings {
pub(super) rtt_deviation_scale: f64,
}

const fn default_initial_concurrency() -> usize {
1
}

const fn default_decrease_ratio() -> f64 {
0.9
}
Expand All @@ -84,6 +99,7 @@ impl AdaptiveConcurrencySettings {
impl Default for AdaptiveConcurrencySettings {
fn default() -> Self {
Self {
initial_concurrency: default_initial_concurrency(),
decrease_ratio: default_decrease_ratio(),
ewma_alpha: default_ewma_alpha(),
rtt_deviation_scale: default_rtt_deviation_scale(),
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/appsignal.cue
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ base: components: sinks: appsignal: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,17 @@ base: components: sinks: aws_cloudwatch_logs: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,17 @@ base: components: sinks: aws_cloudwatch_metrics: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,17 @@ base: components: sinks: aws_kinesis_firehose: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,17 @@ base: components: sinks: aws_kinesis_streams: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/aws_s3.cue
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,17 @@ base: components: sinks: aws_s3: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/aws_sqs.cue
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,17 @@ base: components: sinks: aws_sqs: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/axiom.cue
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,17 @@ base: components: sinks: axiom: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/azure_blob.cue
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,17 @@ base: components: sinks: azure_blob: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/azure_monitor_logs.cue
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@ base: components: sinks: azure_monitor_logs: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/clickhouse.cue
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,17 @@ base: components: sinks: clickhouse: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/databend.cue
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,17 @@ base: components: sinks: databend: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/datadog_events.cue
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ base: components: sinks: datadog_events: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/datadog_logs.cue
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,17 @@ base: components: sinks: datadog_logs: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/datadog_metrics.cue
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,17 @@ base: components: sinks: datadog_metrics: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/datadog_traces.cue
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,17 @@ base: components: sinks: datadog_traces: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/elasticsearch.cue
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,17 @@ base: components: sinks: elasticsearch: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,17 @@ base: components: sinks: gcp_chronicle_unstructured: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/gcp_cloud_storage.cue
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,17 @@ base: components: sinks: gcp_cloud_storage: configuration: {
required: false
type: float: default: 0.4
}
initial_concurrency: {
description: """
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service's average limit if you're seeing that it takes a
long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
`adaptive_concurrency_limit` metric.
"""
required: false
type: uint: default: 1
}
rtt_deviation_scale: {
description: """
Scale of RTT deviations which are not considered anomalous.
Expand Down

0 comments on commit 42119dc

Please sign in to comment.