Skip to content

Commit

Permalink
Fix aggregator window and shutdown of multiple aggregators (#5644)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson committed Mar 29, 2019
1 parent 3045ffb commit 4e3244c
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 94 deletions.
9 changes: 4 additions & 5 deletions accumulator.go
Expand Up @@ -41,11 +41,10 @@ type Accumulator interface {
// AddMetric adds an metric to the accumulator.
AddMetric(Metric)

// SetPrecision takes two time.Duration objects. If the first is non-zero,
// it sets that as the precision. Otherwise, it takes the second argument
// as the order of time that the metrics should be rounded to, with the
// maximum being 1s.
SetPrecision(precision, interval time.Duration)
// SetPrecision sets the timestamp rounding precision. All metrics addeds
// added to the accumulator will have their timestamp rounded to the
// nearest multiple of precision.
SetPrecision(precision time.Duration)

// Report an error.
AddError(err error)
Expand Down
17 changes: 2 additions & 15 deletions agent/accumulator.go
Expand Up @@ -114,21 +114,8 @@ func (ac *accumulator) AddError(err error) {
log.Printf("E! [%s]: Error in plugin: %v", ac.maker.Name(), err)
}

func (ac *accumulator) SetPrecision(precision, interval time.Duration) {
if precision > 0 {
ac.precision = precision
return
}
switch {
case interval >= time.Second:
ac.precision = time.Second
case interval >= time.Millisecond:
ac.precision = time.Millisecond
case interval >= time.Microsecond:
ac.precision = time.Microsecond
default:
ac.precision = time.Nanosecond
}
func (ac *accumulator) SetPrecision(precision time.Duration) {
ac.precision = precision
}

func (ac *accumulator) getTime(t []time.Time) time.Time {
Expand Down
7 changes: 3 additions & 4 deletions agent/accumulator_test.go
Expand Up @@ -74,7 +74,6 @@ func TestSetPrecision(t *testing.T) {
name string
unset bool
precision time.Duration
interval time.Duration
timestamp time.Time
expected time.Time
}{
Expand All @@ -86,13 +85,13 @@ func TestSetPrecision(t *testing.T) {
},
{
name: "second interval",
interval: time.Second,
precision: time.Second,
timestamp: time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC),
expected: time.Date(2006, time.February, 10, 12, 0, 0, 0, time.UTC),
},
{
name: "microsecond interval",
interval: time.Microsecond,
precision: time.Microsecond,
timestamp: time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC),
expected: time.Date(2006, time.February, 10, 12, 0, 0, 82913000, time.UTC),
},
Expand All @@ -109,7 +108,7 @@ func TestSetPrecision(t *testing.T) {

a := NewAccumulator(&TestMetricMaker{}, metrics)
if !tt.unset {
a.SetPrecision(tt.precision, tt.interval)
a.SetPrecision(tt.precision)
}

a.AddFields("acctest",
Expand Down
131 changes: 86 additions & 45 deletions agent/agent.go
Expand Up @@ -180,17 +180,15 @@ func (a *Agent) Test(ctx context.Context) error {
}

acc := NewAccumulator(input, metricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
acc.SetPrecision(a.Precision())
input.SetDefaultTags(a.Config.Tags)

// Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
switch input.Name() {
case "inputs.cpu", "inputs.mongodb", "inputs.procstat":
nulAcc := NewAccumulator(input, nulC)
nulAcc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
nulAcc.SetPrecision(a.Precision())
if err := input.Input.Gather(nulAcc); err != nil {
return err
}
Expand Down Expand Up @@ -222,7 +220,6 @@ func (a *Agent) runInputs(
var wg sync.WaitGroup
for _, input := range a.Config.Inputs {
interval := a.Config.Agent.Interval.Duration
precision := a.Config.Agent.Precision.Duration
jitter := a.Config.Agent.CollectionJitter.Duration

// Overwrite agent interval if this plugin has its own.
Expand All @@ -231,7 +228,7 @@ func (a *Agent) runInputs(
}

acc := NewAccumulator(input, dst)
acc.SetPrecision(precision, interval)
acc.SetPrecision(a.Precision())

wg.Add(1)
go func(input *models.RunningInput) {
Expand Down Expand Up @@ -339,17 +336,41 @@ func (a *Agent) applyProcessors(m telegraf.Metric) []telegraf.Metric {
return metrics
}

// runAggregators triggers the periodic push for Aggregators.
func updateWindow(start time.Time, roundInterval bool, period time.Duration) (time.Time, time.Time) {
var until time.Time
if roundInterval {
until = internal.AlignTime(start, period)
if until == start {
until = internal.AlignTime(start.Add(time.Nanosecond), period)
}
} else {
until = start.Add(period)
}

since := until.Add(-period)

return since, until
}

// runAggregators adds metrics to the aggregators and triggers their periodic
// push call.
//
// When the context is done a final push will occur and then this function
// will return.
// Runs until src is closed and all metrics have been processed. Will call
// push one final time before returning.
func (a *Agent) runAggregators(
startTime time.Time,
src <-chan telegraf.Metric,
dst chan<- telegraf.Metric,
) error {
ctx, cancel := context.WithCancel(context.Background())

// Before calling Add, initialize the aggregation window. This ensures
// that any metric created after start time will be aggregated.
for _, agg := range a.Config.Aggregators {
since, until := updateWindow(startTime, a.Config.Agent.RoundInterval, agg.Period())
agg.UpdateWindow(since, until)
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
Expand All @@ -371,73 +392,72 @@ func (a *Agent) runAggregators(
cancel()
}()

precision := a.Config.Agent.Precision.Duration
interval := a.Config.Agent.Interval.Duration
aggregations := make(chan telegraf.Metric, 100)
for _, agg := range a.Config.Aggregators {
wg.Add(1)
go func(agg *models.RunningAggregator) {
defer func() {
wg.Done()
close(aggregations)
}()

if a.Config.Agent.RoundInterval {
// Aggregators are aligned to the agent interval regardless of
// their period.
err := internal.SleepContext(ctx, internal.AlignDuration(startTime, interval))
if err != nil {
return
}
}
wg.Add(1)
go func() {
defer wg.Done()

agg.SetPeriodStart(startTime)
var aggWg sync.WaitGroup
for _, agg := range a.Config.Aggregators {
aggWg.Add(1)
go func(agg *models.RunningAggregator) {
defer aggWg.Done()

acc := NewAccumulator(agg, aggregations)
acc.SetPrecision(a.Precision())
fmt.Println(1)
a.push(ctx, agg, acc)
fmt.Println(2)
}(agg)
}

acc := NewAccumulator(agg, aggregations)
acc.SetPrecision(precision, interval)
a.push(ctx, agg, acc)
}(agg)
}
aggWg.Wait()
fmt.Println(3)
close(aggregations)
}()

for metric := range aggregations {
metrics := a.applyProcessors(metric)
for _, metric := range metrics {
dst <- metric
}
}
fmt.Println(4)

wg.Wait()
fmt.Println(5)
return nil
}

// push runs the push for a single aggregator every period. More simple than
// the output/input version as timeout should be less likely.... not really
// because the output channel can block for now.
// push runs the push for a single aggregator every period.
func (a *Agent) push(
ctx context.Context,
aggregator *models.RunningAggregator,
acc telegraf.Accumulator,
) {
ticker := time.NewTicker(aggregator.Period())
defer ticker.Stop()

for {
// Ensures that Push will be called for each period, even if it has
// already elapsed before this function is called. This is guaranteed
// because so long as only Push updates the EndPeriod. This method
// also avoids drift by not using a ticker.
until := time.Until(aggregator.EndPeriod())

select {
case <-ticker.C:
case <-time.After(until):
aggregator.Push(acc)
break
case <-ctx.Done():
aggregator.Push(acc)
return
}

aggregator.Push(acc)
}
}

// runOutputs triggers the periodic write for Outputs.
//
// When the context is done, outputs continue to run until their buffer is
// closed, afterwich they run flush once more.

// Runs until src is closed and all metrics have been processed. Will call
// Write one final time before returning.
func (a *Agent) runOutputs(
startTime time.Time,
src <-chan telegraf.Metric,
Expand Down Expand Up @@ -608,7 +628,7 @@ func (a *Agent) startServiceInputs(
// Gather() accumulator does apply rounding according to the
// precision agent setting.
acc := NewAccumulator(input, dst)
acc.SetPrecision(time.Nanosecond, 0)
acc.SetPrecision(time.Nanosecond)

err := si.Start(acc)
if err != nil {
Expand Down Expand Up @@ -638,6 +658,27 @@ func (a *Agent) stopServiceInputs() {
}
}

// Returns the rounding precision for metrics.
func (a *Agent) Precision() time.Duration {
precision := a.Config.Agent.Precision.Duration
interval := a.Config.Agent.Interval.Duration

if precision > 0 {
return precision
}

switch {
case interval >= time.Second:
return time.Second
case interval >= time.Millisecond:
return time.Millisecond
case interval >= time.Microsecond:
return time.Microsecond
default:
return time.Nanosecond
}
}

// panicRecover displays an error if an input panics.
func panicRecover(input *models.RunningInput) {
if err := recover(); err != nil {
Expand Down
65 changes: 61 additions & 4 deletions agent/agent_test.go
Expand Up @@ -2,15 +2,13 @@ package agent

import (
"testing"
"time"

"github.com/influxdata/telegraf/internal/config"

// needing to load the plugins
_ "github.com/influxdata/telegraf/plugins/inputs/all"
// needing to load the outputs
_ "github.com/influxdata/telegraf/plugins/outputs/all"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestAgent_OmitHostname(t *testing.T) {
Expand Down Expand Up @@ -109,3 +107,62 @@ func TestAgent_LoadOutput(t *testing.T) {
a, _ = NewAgent(c)
assert.Equal(t, 3, len(a.Config.Outputs))
}

func TestWindow(t *testing.T) {
parse := func(s string) time.Time {
tm, err := time.Parse(time.RFC3339, s)
if err != nil {
panic(err)
}
return tm
}

tests := []struct {
name string
start time.Time
roundInterval bool
period time.Duration
since time.Time
until time.Time
}{
{
name: "round with exact alignment",
start: parse("2018-03-27T00:00:00Z"),
roundInterval: true,
period: 30 * time.Second,
since: parse("2018-03-27T00:00:00Z"),
until: parse("2018-03-27T00:00:30Z"),
},
{
name: "round with alignment needed",
start: parse("2018-03-27T00:00:05Z"),
roundInterval: true,
period: 30 * time.Second,
since: parse("2018-03-27T00:00:00Z"),
until: parse("2018-03-27T00:00:30Z"),
},
{
name: "no round with exact alignment",
start: parse("2018-03-27T00:00:00Z"),
roundInterval: false,
period: 30 * time.Second,
since: parse("2018-03-27T00:00:00Z"),
until: parse("2018-03-27T00:00:30Z"),
},
{
name: "no found with alignment needed",
start: parse("2018-03-27T00:00:05Z"),
roundInterval: false,
period: 30 * time.Second,
since: parse("2018-03-27T00:00:05Z"),
until: parse("2018-03-27T00:00:35Z"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
since, until := updateWindow(tt.start, tt.roundInterval, tt.period)
require.Equal(t, tt.since, since, "since")
require.Equal(t, tt.until, until, "until")
})
}
}
2 changes: 2 additions & 0 deletions internal/internal.go
Expand Up @@ -288,11 +288,13 @@ func SleepContext(ctx context.Context, duration time.Duration) error {
}

// AlignDuration returns the duration until next aligned interval.
// If the current time is aligned a 0 duration is returned.
func AlignDuration(tm time.Time, interval time.Duration) time.Duration {
return AlignTime(tm, interval).Sub(tm)
}

// AlignTime returns the time of the next aligned interval.
// If the current time is aligned the current time is returned.
func AlignTime(tm time.Time, interval time.Duration) time.Time {
truncated := tm.Truncate(interval)
if truncated == tm {
Expand Down

0 comments on commit 4e3244c

Please sign in to comment.