From 0f93b91b4897a5d93b595c1374e208f7ec196969 Mon Sep 17 00:00:00 2001 From: Dane Strandboge <136023093+DStrand1@users.noreply.github.com> Date: Tue, 30 Apr 2024 11:01:24 -0500 Subject: [PATCH] address some review comments --- config/config.go | 6 +++--- models/buffer.go | 30 +++++++++++++----------------- models/buffer_disk.go | 8 ++++---- models/buffer_mem.go | 16 ++++++++-------- models/running_output.go | 6 +++--- 5 files changed, 31 insertions(+), 35 deletions(-) diff --git a/config/config.go b/config/config.go index 037719577e9d4..00e00f2dea720 100644 --- a/config/config.go +++ b/config/config.go @@ -271,8 +271,8 @@ type AgentConfig struct { // this setting to true will skip the second run of processors. SkipProcessorsAfterAggregators bool `toml:"skip_processors_after_aggregators"` - BufferStrategy string `toml:"buffer_strategy"` - BufferFileDirectory string `toml:"buffer_file_directory"` + BufferStrategy string `toml:"buffer_strategy"` + BufferDirectory string `toml:"buffer_directory"` } // InputNames returns a list of strings of the configured inputs. @@ -1492,7 +1492,7 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, c.getFieldString(tbl, "name_prefix", &oc.NamePrefix) c.getFieldString(tbl, "startup_error_behavior", &oc.StartupErrorBehavior) c.getFieldString(tbl, "buffer_strategy", &oc.BufferStrategy) - c.getFieldString(tbl, "buffer_file_directory", &oc.BufferFileDirectory) + c.getFieldString(tbl, "buffer_directory", &oc.BufferDirectory) if c.hasErrs() { return nil, c.firstErr() diff --git a/models/buffer.go b/models/buffer.go index d14a85672d912..dd87c00dbd475 100644 --- a/models/buffer.go +++ b/models/buffer.go @@ -10,12 +10,6 @@ var ( AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{}) ) -const ( - StrategyMemory = "memory" - StrategyDisk = "disk" - StrategyOverflow = "overflow" -) - type Buffer interface { // Len returns the number of metrics currently in the buffer. @@ -37,9 +31,9 @@ type Buffer interface { Reject([]telegraf.Metric) } -// BufferMetrics holds common metrics used for buffer implementations. +// BufferStats holds common metrics used for buffer implementations. // Implementations of Buffer should embed this struct in them. -type BufferMetrics struct { +type BufferStats struct { MetricsAdded selfstat.Stat MetricsWritten selfstat.Stat MetricsDropped selfstat.Stat @@ -51,26 +45,28 @@ type BufferMetrics struct { func NewBuffer(name string, alias string, capacity int, strategy string, path string) Buffer { bm := NewBufferMetrics(name, alias, capacity) - if strategy == StrategyDisk { + switch strategy { + case "", "memory": + return NewMemoryBuffer(capacity, bm) + case "disk": return NewDiskBuffer(name, capacity, path, bm) - } else if strategy == StrategyOverflow { + case "overflow": // todo implementme // todo log currently unimplemented return NewMemoryBuffer(capacity, bm) - } else if strategy == StrategyMemory || strategy == "" { - return NewMemoryBuffer(capacity, bm) } + // todo log invalid buffer strategy configuration provided, falling back to memory return NewMemoryBuffer(capacity, bm) } -func NewBufferMetrics(name string, alias string, capacity int) BufferMetrics { +func NewBufferMetrics(name string, alias string, capacity int) BufferStats { tags := map[string]string{"output": name} if alias != "" { tags["alias"] = alias } - bm := BufferMetrics{ + bm := BufferStats{ MetricsAdded: selfstat.Register( "write", "metrics_added", @@ -102,17 +98,17 @@ func NewBufferMetrics(name string, alias string, capacity int) BufferMetrics { return bm } -func (b *BufferMetrics) metricAdded() { +func (b *BufferStats) metricAdded() { b.MetricsAdded.Incr(1) } -func (b *BufferMetrics) metricWritten(metric telegraf.Metric) { +func (b *BufferStats) metricWritten(metric telegraf.Metric) { AgentMetricsWritten.Incr(1) b.MetricsWritten.Incr(1) metric.Accept() } -func (b *BufferMetrics) metricDropped(metric telegraf.Metric) { +func (b *BufferStats) metricDropped(metric telegraf.Metric) { AgentMetricsDropped.Incr(1) b.MetricsDropped.Incr(1) metric.Reject() diff --git a/models/buffer_disk.go b/models/buffer_disk.go index e06358dbd7730..5e0a3fda7e00d 100644 --- a/models/buffer_disk.go +++ b/models/buffer_disk.go @@ -10,20 +10,20 @@ import ( ) type DiskBuffer struct { - BufferMetrics + BufferStats walFile *wal.Log } -func NewDiskBuffer(name string, capacity int, path string, metrics BufferMetrics) *DiskBuffer { +func NewDiskBuffer(name string, capacity int, path string, stats BufferStats) *DiskBuffer { // todo capacity walFile, err := wal.Open(path+"/"+name, nil) if err != nil { return nil // todo error handling } return &DiskBuffer{ - BufferMetrics: metrics, - walFile: walFile, + BufferStats: stats, + walFile: walFile, } } diff --git a/models/buffer_mem.go b/models/buffer_mem.go index f319f568c6180..a58cef61b9474 100644 --- a/models/buffer_mem.go +++ b/models/buffer_mem.go @@ -9,7 +9,7 @@ import ( // MemoryBuffer stores metrics in a circular buffer. type MemoryBuffer struct { sync.Mutex - BufferMetrics + BufferStats buf []telegraf.Metric first int // index of the first/oldest metric @@ -21,14 +21,14 @@ type MemoryBuffer struct { batchSize int // number of metrics currently in the batch } -func NewMemoryBuffer(capacity int, metrics BufferMetrics) *MemoryBuffer { +func NewMemoryBuffer(capacity int, stats BufferStats) *MemoryBuffer { return &MemoryBuffer{ - BufferMetrics: metrics, - buf: make([]telegraf.Metric, capacity), - first: 0, - last: 0, - size: 0, - cap: capacity, + BufferStats: stats, + buf: make([]telegraf.Metric, capacity), + first: 0, + last: 0, + size: 0, + cap: capacity, } } diff --git a/models/running_output.go b/models/running_output.go index d8be190a3f1af..4cc7864e45faa 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -37,8 +37,8 @@ type OutputConfig struct { NamePrefix string NameSuffix string - BufferStrategy string - BufferFileDirectory string + BufferStrategy string + BufferDirectory string } // RunningOutput contains the output configuration @@ -99,7 +99,7 @@ func NewRunningOutput( } ro := &RunningOutput{ - buffer: NewBuffer(config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferFileDirectory), + buffer: NewBuffer(config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory), BatchReady: make(chan time.Time, 1), Output: output, Config: config,