Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aggregators: allow historical data in aggregators #6119

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/AGGREGATORS.md
Expand Up @@ -50,6 +50,8 @@ var sampleConfig = `
## If true drop_original will drop the original metrics and
## only send aggregates.
drop_original = false
## If true, metrics that fall outside the period won't be ignored.
# allow_historical = false
`

func (m *Min) Init() error {
Expand Down
11 changes: 11 additions & 0 deletions etc/telegraf.conf
Expand Up @@ -1493,6 +1493,8 @@
# ## If true, the original metric will be dropped by the
# ## aggregator and will not get sent to the output plugins.
# drop_original = false
# ## If true, metrics that fall outside the period won't be ignored.
# allow_historical = false
#
# ## Configures which basic stats to push as fields
# # stats = ["count", "min", "max", "mean", "stdev", "s2", "sum"]
Expand All @@ -1505,6 +1507,8 @@
# ## If true, the original metric will be dropped by the
# ## aggregator and will not get sent to the output plugins.
# drop_original = false
# ## If true, metrics that fall outside the period won't be ignored.
# allow_historical = false
#
# ## The time that a series is not updated until considering it final.
# series_timeout = "5m"
Expand All @@ -1519,6 +1523,9 @@
# ## aggregator and will not get sent to the output plugins.
# drop_original = false
#
# ## If true, metrics that fall outside the period won't be ignored.
# allow_historical = false
#
# ## If true, the histogram will be reset on flush instead
# ## of accumulating the results.
# reset = false
Expand Down Expand Up @@ -1548,6 +1555,8 @@
# ## If true, the original metric will be dropped by the
# ## aggregator and will not get sent to the output plugins.
# drop_original = false
# ## If true, metrics that fall outside the period won't be ignored.
# allow_historical = false


# # Count the occurrence of values in fields.
Expand All @@ -1558,6 +1567,8 @@
# ## If true, the original metric will be dropped by the
# ## aggregator and will not get sent to the output plugins.
# drop_original = false
# ## If true, metrics that fall outside the period won't be ignored.
# allow_historical = false
# ## The fields for which the values will be counted
# fields = []

Expand Down
13 changes: 13 additions & 0 deletions internal/config/config.go
Expand Up @@ -1065,6 +1065,18 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
}
}

if node, ok := tbl.Fields["allow_historical"]; ok {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

am I correctly inferring that this is internal to telegraf? would this work for all plugins?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would work for all aggregator type plugins. Which means it extends all existing aggregator plugins with added functionality, but because the default value is false, it does not have adverse affects to existing deployments.

if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
var err error
conf.AllowHistorical, err = strconv.ParseBool(b.Value)
if err != nil {
log.Printf("Error parsing boolean value for %s: %s\n", name, err)
}
}
}
}

if node, ok := tbl.Fields["name_prefix"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
Expand Down Expand Up @@ -1101,6 +1113,7 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
delete(tbl.Fields, "period")
delete(tbl.Fields, "delay")
delete(tbl.Fields, "drop_original")
delete(tbl.Fields, "allow_historical")
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
delete(tbl.Fields, "name_override")
Expand Down
21 changes: 12 additions & 9 deletions internal/models/running_aggregator.go
Expand Up @@ -55,10 +55,11 @@ func NewRunningAggregator(

// AggregatorConfig is the common config for all aggregators.
type AggregatorConfig struct {
Name string
DropOriginal bool
Period time.Duration
Delay time.Duration
Name string
DropOriginal bool
AllowHistorical bool
Period time.Duration
Delay time.Duration

NameOverride string
MeasurementPrefix string
Expand Down Expand Up @@ -135,11 +136,13 @@ func (r *RunningAggregator) Add(m telegraf.Metric) bool {
r.Lock()
defer r.Unlock()

if m.Time().Before(r.periodStart) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) {
log.Printf("D! [%s] metric is outside aggregation window; discarding. %s: m: %s e: %s",
r.Name(), m.Time(), r.periodStart, r.periodEnd)
r.MetricsDropped.Incr(1)
return r.Config.DropOriginal
if !r.Config.AllowHistorical {
if m.Time().Before(r.periodStart) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) {
log.Printf("D! [%s] metric is outside aggregation window; discarding. %s: m: %s e: %s",
r.Name(), m.Time(), r.periodStart, r.periodEnd)
r.MetricsDropped.Incr(1)
return r.Config.DropOriginal
}
}

r.Aggregator.Add(m)
Expand Down
51 changes: 51 additions & 0 deletions internal/models/running_aggregator_test.go
Expand Up @@ -89,6 +89,57 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
require.Equal(t, int64(101), acc.Metrics[0].Fields["sum"])
}

func TestAllowHistorical(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"*"},
},
Period: time.Millisecond * 500,
AllowHistorical: true,
})
require.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
now := time.Now()
ra.UpdateWindow(now, now.Add(ra.Config.Period))

m := testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now.Add(-time.Hour),
telegraf.Untyped,
)
require.False(t, ra.Add(m))

// metric after current period
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now.Add(time.Hour),
telegraf.Untyped,
)
require.False(t, ra.Add(m))

// "now" metric
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
time.Now().Add(time.Millisecond*50),
telegraf.Untyped)
require.False(t, ra.Add(m))

ra.Push(&acc)
require.Equal(t, 1, len(acc.Metrics))
require.Equal(t, int64(303), acc.Metrics[0].Fields["sum"])
}

func TestAddAndPushOnePeriod(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Expand Down
2 changes: 2 additions & 0 deletions plugins/aggregators/basicstats/basicstats.go
Expand Up @@ -52,6 +52,8 @@ var sampleConfig = `
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
## If true, metrics that fall outside the period won't be ignored.
# allow_historical = false

## Configures which basic stats to push as fields
# stats = ["count", "min", "max", "mean", "stdev", "s2", "sum"]
Expand Down
2 changes: 2 additions & 0 deletions plugins/aggregators/final/final.go
Expand Up @@ -14,6 +14,8 @@ var sampleConfig = `
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
## If true, metrics that fall outside the period won't be ignored.
# allow_historical = false

## The time that a series is not updated until considering it final.
series_timeout = "5m"
Expand Down
3 changes: 3 additions & 0 deletions plugins/aggregators/histogram/histogram.go
Expand Up @@ -73,6 +73,9 @@ var sampleConfig = `
## aggregator and will not get sent to the output plugins.
drop_original = false

## If true, metrics that fall outside the period won't be ignored.
# allow_historical = false

## If true, the histogram will be reset on flush instead
## of accumulating the results.
reset = false
Expand Down
2 changes: 2 additions & 0 deletions plugins/aggregators/minmax/minmax.go
Expand Up @@ -33,6 +33,8 @@ var sampleConfig = `
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
## If true, metrics that fall outside the period won't be ignored.
# allow_historical = false
`

func (m *MinMax) SampleConfig() string {
Expand Down
2 changes: 2 additions & 0 deletions plugins/aggregators/valuecounter/valuecounter.go
Expand Up @@ -36,6 +36,8 @@ var sampleConfig = `
drop_original = false
## The fields for which the values will be counted
fields = []
## If true, metrics that fall outside the period won't be ignored.
# allow_historical = false
`

// SampleConfig generates a sample config for the ValueCounter plugin
Expand Down