From 5479df2eb5e8401773d604a83590d789a158c735 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Tue, 15 Feb 2022 18:39:12 +0100 Subject: [PATCH] feat: collection offset implementation (#10545) --- agent/agent.go | 10 +- agent/tick.go | 81 ++++++++------- agent/tick_test.go | 224 ++++++++++++++++++++++++++++++++-------- config/config.go | 13 +++ docs/CONFIGURATION.md | 10 ++ models/running_input.go | 1 + 6 files changed, 259 insertions(+), 80 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index b5e44c03a6164..5575e8af374a7 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -298,11 +298,17 @@ func (a *Agent) runInputs( jitter = input.Config.CollectionJitter } + // Overwrite agent collection_offset if this plugin has its own. + offset := time.Duration(a.Config.Agent.CollectionOffset) + if input.Config.CollectionOffset != 0 { + offset = input.Config.CollectionOffset + } + var ticker Ticker if a.Config.Agent.RoundInterval { - ticker = NewAlignedTicker(startTime, interval, jitter) + ticker = NewAlignedTicker(startTime, interval, jitter, offset) } else { - ticker = NewUnalignedTicker(interval, jitter) + ticker = NewUnalignedTicker(interval, jitter, offset) } defer ticker.Stop() diff --git a/agent/tick.go b/agent/tick.go index 16233ba6d4adb..ee4cc41223f80 100644 --- a/agent/tick.go +++ b/agent/tick.go @@ -31,36 +31,38 @@ type Ticker interface { type AlignedTicker struct { interval time.Duration jitter time.Duration + offset time.Duration minInterval time.Duration ch chan time.Time cancel context.CancelFunc wg sync.WaitGroup } -func NewAlignedTicker(now time.Time, interval, jitter time.Duration) *AlignedTicker { - return newAlignedTicker(now, interval, jitter, clock.New()) -} - -func newAlignedTicker(now time.Time, interval, jitter time.Duration, clock clock.Clock) *AlignedTicker { - ctx, cancel := context.WithCancel(context.Background()) +func NewAlignedTicker(now time.Time, interval, jitter, offset time.Duration) *AlignedTicker { t := &AlignedTicker{ interval: interval, jitter: jitter, + offset: offset, minInterval: interval / 100, - ch: make(chan time.Time, 1), - cancel: cancel, } + t.start(now, clock.New()) + return t +} + +func (t *AlignedTicker) start(now time.Time, clk clock.Clock) { + t.ch = make(chan time.Time, 1) + + ctx, cancel := context.WithCancel(context.Background()) + t.cancel = cancel d := t.next(now) - timer := clock.Timer(d) + timer := clk.Timer(d) t.wg.Add(1) go func() { defer t.wg.Done() t.run(ctx, timer) }() - - return t } func (t *AlignedTicker) next(now time.Time) time.Duration { @@ -74,6 +76,7 @@ func (t *AlignedTicker) next(now time.Time) time.Duration { if d == 0 { d = t.interval } + d += t.offset d += internal.RandomDuration(t.jitter) return d } @@ -118,42 +121,48 @@ func (t *AlignedTicker) Stop() { type UnalignedTicker struct { interval time.Duration jitter time.Duration + offset time.Duration ch chan time.Time cancel context.CancelFunc wg sync.WaitGroup } -func NewUnalignedTicker(interval, jitter time.Duration) *UnalignedTicker { - return newUnalignedTicker(interval, jitter, clock.New()) -} - -func newUnalignedTicker(interval, jitter time.Duration, clock clock.Clock) *UnalignedTicker { - ctx, cancel := context.WithCancel(context.Background()) +func NewUnalignedTicker(interval, jitter, offset time.Duration) *UnalignedTicker { t := &UnalignedTicker{ interval: interval, jitter: jitter, - ch: make(chan time.Time, 1), - cancel: cancel, + offset: offset, } + t.start(clock.New()) + return t +} - ticker := clock.Ticker(t.interval) - t.ch <- clock.Now() +func (t *UnalignedTicker) start(clk clock.Clock) *UnalignedTicker { + t.ch = make(chan time.Time, 1) + ctx, cancel := context.WithCancel(context.Background()) + t.cancel = cancel + + ticker := clk.Ticker(t.interval) + if t.offset == 0 { + // Perform initial trigger to stay backward compatible + t.ch <- clk.Now() + } t.wg.Add(1) go func() { defer t.wg.Done() - t.run(ctx, ticker, clock) + t.run(ctx, ticker, clk) }() return t } -func sleep(ctx context.Context, duration time.Duration, clock clock.Clock) error { +func sleep(ctx context.Context, duration time.Duration, clk clock.Clock) error { if duration == 0 { return nil } - t := clock.Timer(duration) + t := clk.Timer(duration) select { case <-t.C: return nil @@ -163,7 +172,7 @@ func sleep(ctx context.Context, duration time.Duration, clock clock.Clock) error } } -func (t *UnalignedTicker) run(ctx context.Context, ticker *clock.Ticker, clock clock.Clock) { +func (t *UnalignedTicker) run(ctx context.Context, ticker *clock.Ticker, clk clock.Clock) { for { select { case <-ctx.Done(): @@ -171,13 +180,13 @@ func (t *UnalignedTicker) run(ctx context.Context, ticker *clock.Ticker, clock c return case <-ticker.C: jitter := internal.RandomDuration(t.jitter) - err := sleep(ctx, jitter, clock) + err := sleep(ctx, t.offset+jitter, clk) if err != nil { ticker.Stop() return } select { - case t.ch <- clock.Now(): + case t.ch <- clk.Now(): default: } } @@ -217,20 +226,22 @@ type RollingTicker struct { } func NewRollingTicker(interval, jitter time.Duration) *RollingTicker { - return newRollingTicker(interval, jitter, clock.New()) -} - -func newRollingTicker(interval, jitter time.Duration, clock clock.Clock) *RollingTicker { - ctx, cancel := context.WithCancel(context.Background()) t := &RollingTicker{ interval: interval, jitter: jitter, - ch: make(chan time.Time, 1), - cancel: cancel, } + t.start(clock.New()) + return t +} + +func (t *RollingTicker) start(clk clock.Clock) *RollingTicker { + t.ch = make(chan time.Time, 1) + + ctx, cancel := context.WithCancel(context.Background()) + t.cancel = cancel d := t.next() - timer := clock.Timer(d) + timer := clk.Timer(d) t.wg.Add(1) go func() { diff --git a/agent/tick_test.go b/agent/tick_test.go index 69bf0c2affa39..397a56ed8bd62 100644 --- a/agent/tick_test.go +++ b/agent/tick_test.go @@ -13,12 +13,19 @@ import ( func TestAlignedTicker(t *testing.T) { interval := 10 * time.Second jitter := 0 * time.Second + offset := 0 * time.Second - clock := clock.NewMock() - since := clock.Now() + clk := clock.NewMock() + since := clk.Now() until := since.Add(60 * time.Second) - ticker := newAlignedTicker(since, interval, jitter, clock) + ticker := &AlignedTicker{ + interval: interval, + jitter: jitter, + offset: offset, + minInterval: interval / 100, + } + ticker.start(since, clk) defer ticker.Stop() expected := []time.Time{ @@ -32,13 +39,13 @@ func TestAlignedTicker(t *testing.T) { actual := []time.Time{} - clock.Add(10 * time.Second) - for !clock.Now().After(until) { + clk.Add(10 * time.Second) + for !clk.Now().After(until) { select { case tm := <-ticker.Elapsed(): actual = append(actual, tm.UTC()) } - clock.Add(10 * time.Second) + clk.Add(10 * time.Second) } require.Equal(t, expected, actual) @@ -47,16 +54,23 @@ func TestAlignedTicker(t *testing.T) { func TestAlignedTickerJitter(t *testing.T) { interval := 10 * time.Second jitter := 5 * time.Second + offset := 0 * time.Second - clock := clock.NewMock() - since := clock.Now() + clk := clock.NewMock() + since := clk.Now() until := since.Add(61 * time.Second) - ticker := newAlignedTicker(since, interval, jitter, clock) + ticker := &AlignedTicker{ + interval: interval, + jitter: jitter, + offset: offset, + minInterval: interval / 100, + } + ticker.start(since, clk) defer ticker.Stop() last := since - for !clock.Now().After(until) { + for !clk.Now().After(until) { select { case tm := <-ticker.Elapsed(): dur := tm.Sub(last) @@ -66,24 +80,69 @@ func TestAlignedTickerJitter(t *testing.T) { last = last.Add(interval) default: } - clock.Add(1 * time.Second) + clk.Add(1 * time.Second) + } +} + +func TestAlignedTickerOffset(t *testing.T) { + interval := 10 * time.Second + jitter := 0 * time.Second + offset := 3 * time.Second + + clk := clock.NewMock() + since := clk.Now() + until := since.Add(61 * time.Second) + + ticker := &AlignedTicker{ + interval: interval, + jitter: jitter, + offset: offset, + minInterval: interval / 100, + } + ticker.start(since, clk) + defer ticker.Stop() + + expected := []time.Time{ + time.Unix(13, 0).UTC(), + time.Unix(23, 0).UTC(), + time.Unix(33, 0).UTC(), + time.Unix(43, 0).UTC(), + time.Unix(53, 0).UTC(), } + + actual := []time.Time{} + + clk.Add(10*time.Second + offset) + for !clk.Now().After(until) { + tm := <-ticker.Elapsed() + actual = append(actual, tm.UTC()) + clk.Add(10 * time.Second) + } + + require.Equal(t, expected, actual) } func TestAlignedTickerMissedTick(t *testing.T) { interval := 10 * time.Second jitter := 0 * time.Second + offset := 0 * time.Second - clock := clock.NewMock() - since := clock.Now() + clk := clock.NewMock() + since := clk.Now() - ticker := newAlignedTicker(since, interval, jitter, clock) + ticker := &AlignedTicker{ + interval: interval, + jitter: jitter, + offset: offset, + minInterval: interval / 100, + } + ticker.start(since, clk) defer ticker.Stop() - clock.Add(25 * time.Second) + clk.Add(25 * time.Second) tm := <-ticker.Elapsed() require.Equal(t, time.Unix(10, 0).UTC(), tm.UTC()) - clock.Add(5 * time.Second) + clk.Add(5 * time.Second) tm = <-ticker.Elapsed() require.Equal(t, time.Unix(30, 0).UTC(), tm.UTC()) } @@ -91,13 +150,19 @@ func TestAlignedTickerMissedTick(t *testing.T) { func TestUnalignedTicker(t *testing.T) { interval := 10 * time.Second jitter := 0 * time.Second + offset := 0 * time.Second - clock := clock.NewMock() - clock.Add(1 * time.Second) - since := clock.Now() + clk := clock.NewMock() + clk.Add(1 * time.Second) + since := clk.Now() until := since.Add(60 * time.Second) - ticker := newUnalignedTicker(interval, jitter, clock) + ticker := &UnalignedTicker{ + interval: interval, + jitter: jitter, + offset: offset, + } + ticker.start(clk) defer ticker.Stop() expected := []time.Time{ @@ -111,13 +176,13 @@ func TestUnalignedTicker(t *testing.T) { } actual := []time.Time{} - for !clock.Now().After(until) { + for !clk.Now().After(until) { select { case tm := <-ticker.Elapsed(): actual = append(actual, tm.UTC()) default: } - clock.Add(10 * time.Second) + clk.Add(10 * time.Second) } require.Equal(t, expected, actual) @@ -126,13 +191,19 @@ func TestUnalignedTicker(t *testing.T) { func TestRollingTicker(t *testing.T) { interval := 10 * time.Second jitter := 0 * time.Second + offset := 0 * time.Second - clock := clock.NewMock() - clock.Add(1 * time.Second) - since := clock.Now() + clk := clock.NewMock() + clk.Add(1 * time.Second) + since := clk.Now() until := since.Add(60 * time.Second) - ticker := newUnalignedTicker(interval, jitter, clock) + ticker := &UnalignedTicker{ + interval: interval, + jitter: jitter, + offset: offset, + } + ticker.start(clk) defer ticker.Stop() expected := []time.Time{ @@ -146,13 +217,13 @@ func TestRollingTicker(t *testing.T) { } actual := []time.Time{} - for !clock.Now().After(until) { + for !clk.Now().After(until) { select { case tm := <-ticker.Elapsed(): actual = append(actual, tm.UTC()) default: } - clock.Add(10 * time.Second) + clk.Add(10 * time.Second) } require.Equal(t, expected, actual) @@ -167,13 +238,46 @@ func TestAlignedTickerDistribution(t *testing.T) { interval := 10 * time.Second jitter := 5 * time.Second + offset := 0 * time.Second + + clk := clock.NewMock() + since := clk.Now() + + ticker := &AlignedTicker{ + interval: interval, + jitter: jitter, + offset: offset, + minInterval: interval / 100, + } + ticker.start(since, clk) + defer ticker.Stop() + dist := simulatedDist(ticker, clk) + printDist(dist) + require.True(t, 350 < dist.Count) + require.True(t, 9 < dist.Mean() && dist.Mean() < 11) +} + +func TestAlignedTickerDistributionWithOffset(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + interval := 10 * time.Second + jitter := 5 * time.Second + offset := 3 * time.Second - clock := clock.NewMock() - since := clock.Now() + clk := clock.NewMock() + since := clk.Now() - ticker := newAlignedTicker(since, interval, jitter, clock) + ticker := &AlignedTicker{ + interval: interval, + jitter: jitter, + offset: offset, + minInterval: interval / 100, + } + ticker.start(since, clk) defer ticker.Stop() - dist := simulatedDist(ticker, clock) + dist := simulatedDist(ticker, clk) printDist(dist) require.True(t, 350 < dist.Count) require.True(t, 9 < dist.Mean() && dist.Mean() < 11) @@ -188,12 +292,42 @@ func TestUnalignedTickerDistribution(t *testing.T) { interval := 10 * time.Second jitter := 5 * time.Second + offset := 0 * time.Second + + clk := clock.NewMock() + + ticker := &UnalignedTicker{ + interval: interval, + jitter: jitter, + offset: offset, + } + ticker.start(clk) + defer ticker.Stop() + dist := simulatedDist(ticker, clk) + printDist(dist) + require.True(t, 350 < dist.Count) + require.True(t, 9 < dist.Mean() && dist.Mean() < 11) +} + +func TestUnalignedTickerDistributionWithOffset(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + interval := 10 * time.Second + jitter := 5 * time.Second + offset := 3 * time.Second - clock := clock.NewMock() + clk := clock.NewMock() - ticker := newUnalignedTicker(interval, jitter, clock) + ticker := &UnalignedTicker{ + interval: interval, + jitter: jitter, + offset: offset, + } + ticker.start(clk) defer ticker.Stop() - dist := simulatedDist(ticker, clock) + dist := simulatedDist(ticker, clk) printDist(dist) require.True(t, 350 < dist.Count) require.True(t, 9 < dist.Mean() && dist.Mean() < 11) @@ -209,11 +343,15 @@ func TestRollingTickerDistribution(t *testing.T) { interval := 10 * time.Second jitter := 5 * time.Second - clock := clock.NewMock() + clk := clock.NewMock() - ticker := newRollingTicker(interval, jitter, clock) + ticker := &RollingTicker{ + interval: interval, + jitter: jitter, + } + ticker.start(clk) defer ticker.Stop() - dist := simulatedDist(ticker, clock) + dist := simulatedDist(ticker, clk) printDist(dist) require.True(t, 275 < dist.Count) require.True(t, 12 < dist.Mean() && 13 > dist.Mean()) @@ -237,14 +375,14 @@ func printDist(dist Distribution) { fmt.Printf("Count: %d\n", dist.Count) } -func simulatedDist(ticker Ticker, clock *clock.Mock) Distribution { - since := clock.Now() +func simulatedDist(ticker Ticker, clk *clock.Mock) Distribution { + since := clk.Now() until := since.Add(1 * time.Hour) var dist Distribution - last := clock.Now() - for !clock.Now().After(until) { + last := clk.Now() + for !clk.Now().After(until) { select { case tm := <-ticker.Elapsed(): dist.Buckets[tm.Second()]++ @@ -252,7 +390,7 @@ func simulatedDist(ticker Ticker, clock *clock.Mock) Distribution { dist.Waittime += tm.Sub(last).Seconds() last = tm default: - clock.Add(1 * time.Second) + clk.Add(1 * time.Second) } } diff --git a/config/config.go b/config/config.go index 43dfc3520ea88..afa0866cbe27a 100644 --- a/config/config.go +++ b/config/config.go @@ -153,6 +153,11 @@ type AgentConfig struct { // same time, which can have a measurable effect on the system. CollectionJitter Duration + // CollectionOffset is used to shift the collection by the given amount. + // This can be be used to avoid many plugins querying constraint devices + // at the same time by manually scheduling them in time. + CollectionOffset Duration + // FlushInterval is the Interval at which to flush data FlushInterval Duration @@ -322,6 +327,7 @@ var globalTagsConfig = ` # user = "$USER" ` + var agentConfig = ` # Configuration for telegraf agent [agent] @@ -347,6 +353,11 @@ var agentConfig = ` ## same time, which can have a measurable effect on the system. collection_jitter = "0s" + ## Collection offset is used to shift the collection by the given amount. + ## This can be be used to avoid many plugins querying constraint devices + ## at the same time by manually scheduling them in time. + # collection_offset = "0s" + ## Default flushing interval for all outputs. Maximum flush_interval will be ## flush_interval + flush_jitter flush_interval = "10s" @@ -1488,6 +1499,7 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e c.getFieldDuration(tbl, "interval", &cp.Interval) c.getFieldDuration(tbl, "precision", &cp.Precision) c.getFieldDuration(tbl, "collection_jitter", &cp.CollectionJitter) + c.getFieldDuration(tbl, "collection_offset", &cp.CollectionOffset) c.getFieldString(tbl, "name_prefix", &cp.MeasurementPrefix) c.getFieldString(tbl, "name_suffix", &cp.MeasurementSuffix) c.getFieldString(tbl, "name_override", &cp.NameOverride) @@ -1793,6 +1805,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { switch key { case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file", "collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter", + "collection_offset", "data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path", "dropwizard_tag_paths", "dropwizard_tags_path", "dropwizard_time_format", "dropwizard_time_path", "fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys", diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 25d10a90b1340..0eacdb5865a46 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -188,6 +188,11 @@ The agent table configures Telegraf and the defaults used across all plugins. This can be used to avoid many plugins querying things like sysfs at the same time, which can have a measurable effect on the system. +- **collection_offset**: + Collection offset is used to shift the collection by the given [interval][]. + This can be be used to avoid many plugins querying constraint devices + at the same time by manually scheduling them in time. + - **flush_interval**: Default flushing [interval][] for all outputs. Maximum flush_interval will be flush_interval + flush_jitter. @@ -281,6 +286,11 @@ Parameters that can be used with any input plugin: plugin. Collection jitter is used to jitter the collection by a random [interval][]. +- **collection_offset**: + Overrides the `collection_offset` setting of the [agent][Agent] for the + plugin. Collection offset is used to shift the collection by the given + [interval][]. + - **name_override**: Override the base name of the measurement. (Default is the name of the input). diff --git a/models/running_input.go b/models/running_input.go index 70a4c2ee3a70f..16f4bd10bc11e 100644 --- a/models/running_input.go +++ b/models/running_input.go @@ -60,6 +60,7 @@ type InputConfig struct { Alias string Interval time.Duration CollectionJitter time.Duration + CollectionOffset time.Duration Precision time.Duration NameOverride string