Skip to content

Commit

Permalink
Handle per-plugin flush jitter
Browse files Browse the repository at this point in the history
  • Loading branch information
dbutler-starry committed Oct 31, 2019
1 parent 9efc376 commit a46d969
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 3 deletions.
6 changes: 6 additions & 0 deletions agent/agent.go
Expand Up @@ -503,6 +503,12 @@ func (a *Agent) runOutputs(
interval = output.Config.FlushInterval
}

jitter := jitter
// Overwrite agent flush_jitter if this plugin has its own.
if pluginJitter, ok := output.Config.FlushJitter.(time.Duration); ok {
jitter = pluginJitter
}

wg.Add(1)
go func(output *models.RunningOutput) {
defer wg.Done()
Expand Down
11 changes: 8 additions & 3 deletions docs/CONFIGURATION.md
Expand Up @@ -127,9 +127,10 @@ The agent table configures Telegraf and the defaults used across all plugins.
flush_interval + flush_jitter

- **flush_jitter**:
Jitter the flush [interval][] by a random amount. This is primarily to avoid
large write spikes for users running a large number of telegraf instances.
ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
Default flushing jitter for all outputs. This jitters the flush [interval][]
by a random amount. This is primarily to avoid large write spikes for users
running a large number of telegraf instances. ie, a jitter of 5s and interval
10s means flushes will happen every 10-15s

- **precision**:
Collected metrics are rounded to the precision specified as an [interval][].
Expand Down Expand Up @@ -259,6 +260,8 @@ Parameters that can be used with any output plugin:

- **flush_interval**: The maximum time between flushes. Use this setting to
override the agent `flush_interval` on a per plugin basis.
- **flush_jitter**: The amount of time to jitter the flush interval. Use this
setting to override the agent `flush_jitter` on a per plugin basis.
- **metric_batch_size**: The maximum number of metrics to send at once. Use
this setting to override the agent `metric_batch_size` on a per plugin basis.
- **metric_buffer_limit**: The maximum number of unsent metrics to buffer.
Expand All @@ -274,6 +277,7 @@ Override flush parameters for a single output:
```toml
[agent]
flush_interval = "10s"
flush_jitter = "5s"
metric_batch_size = 1000

[[outputs.influxdb]]
Expand All @@ -283,6 +287,7 @@ Override flush parameters for a single output:
[[outputs.file]]
files = [ "stdout" ]
flush_interval = "1s"
flush_jitter = "1s"
metric_batch_size = 10
```

Expand Down
13 changes: 13 additions & 0 deletions internal/config/config.go
Expand Up @@ -2026,6 +2026,18 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
}
}

if node, ok := tbl.Fields["flush_jitter"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
oc.FlushJitter = dur
}
}
}

if node, ok := tbl.Fields["metric_buffer_limit"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if integer, ok := kv.Value.(*ast.Integer); ok {
Expand Down Expand Up @@ -2059,6 +2071,7 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
}

delete(tbl.Fields, "flush_interval")
delete(tbl.Fields, "flush_jitter")
delete(tbl.Fields, "metric_buffer_limit")
delete(tbl.Fields, "metric_batch_size")
delete(tbl.Fields, "alias")
Expand Down
1 change: 1 addition & 0 deletions internal/models/running_output.go
Expand Up @@ -24,6 +24,7 @@ type OutputConfig struct {
Filter Filter

FlushInterval time.Duration
FlushJitter interface{}
MetricBufferLimit int
MetricBatchSize int
}
Expand Down

0 comments on commit a46d969

Please sign in to comment.