Skip to content

Commit

Permalink
address some review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
DStrand1 committed Apr 30, 2024
1 parent 61de4ec commit 0f93b91
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 35 deletions.
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ type AgentConfig struct {
// this setting to true will skip the second run of processors.
SkipProcessorsAfterAggregators bool `toml:"skip_processors_after_aggregators"`

BufferStrategy string `toml:"buffer_strategy"`
BufferFileDirectory string `toml:"buffer_file_directory"`
BufferStrategy string `toml:"buffer_strategy"`
BufferDirectory string `toml:"buffer_directory"`
}

// InputNames returns a list of strings of the configured inputs.
Expand Down Expand Up @@ -1492,7 +1492,7 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,
c.getFieldString(tbl, "name_prefix", &oc.NamePrefix)
c.getFieldString(tbl, "startup_error_behavior", &oc.StartupErrorBehavior)
c.getFieldString(tbl, "buffer_strategy", &oc.BufferStrategy)
c.getFieldString(tbl, "buffer_file_directory", &oc.BufferFileDirectory)
c.getFieldString(tbl, "buffer_directory", &oc.BufferDirectory)

if c.hasErrs() {
return nil, c.firstErr()
Expand Down
30 changes: 13 additions & 17 deletions models/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ var (
AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{})
)

const (
StrategyMemory = "memory"
StrategyDisk = "disk"
StrategyOverflow = "overflow"
)

type Buffer interface {

// Len returns the number of metrics currently in the buffer.
Expand All @@ -37,9 +31,9 @@ type Buffer interface {
Reject([]telegraf.Metric)
}

// BufferMetrics holds common metrics used for buffer implementations.
// BufferStats holds common metrics used for buffer implementations.
// Implementations of Buffer should embed this struct in them.
type BufferMetrics struct {
type BufferStats struct {
MetricsAdded selfstat.Stat
MetricsWritten selfstat.Stat
MetricsDropped selfstat.Stat
Expand All @@ -51,26 +45,28 @@ type BufferMetrics struct {
func NewBuffer(name string, alias string, capacity int, strategy string, path string) Buffer {
bm := NewBufferMetrics(name, alias, capacity)

if strategy == StrategyDisk {
switch strategy {
case "", "memory":
return NewMemoryBuffer(capacity, bm)
case "disk":
return NewDiskBuffer(name, capacity, path, bm)
} else if strategy == StrategyOverflow {
case "overflow":
// todo implementme
// todo log currently unimplemented
return NewMemoryBuffer(capacity, bm)
} else if strategy == StrategyMemory || strategy == "" {
return NewMemoryBuffer(capacity, bm)
}

// todo log invalid buffer strategy configuration provided, falling back to memory
return NewMemoryBuffer(capacity, bm)
}

func NewBufferMetrics(name string, alias string, capacity int) BufferMetrics {
func NewBufferMetrics(name string, alias string, capacity int) BufferStats {
tags := map[string]string{"output": name}
if alias != "" {
tags["alias"] = alias
}

bm := BufferMetrics{
bm := BufferStats{
MetricsAdded: selfstat.Register(
"write",
"metrics_added",
Expand Down Expand Up @@ -102,17 +98,17 @@ func NewBufferMetrics(name string, alias string, capacity int) BufferMetrics {
return bm
}

func (b *BufferMetrics) metricAdded() {
func (b *BufferStats) metricAdded() {
b.MetricsAdded.Incr(1)
}

func (b *BufferMetrics) metricWritten(metric telegraf.Metric) {
func (b *BufferStats) metricWritten(metric telegraf.Metric) {
AgentMetricsWritten.Incr(1)
b.MetricsWritten.Incr(1)
metric.Accept()
}

func (b *BufferMetrics) metricDropped(metric telegraf.Metric) {
func (b *BufferStats) metricDropped(metric telegraf.Metric) {
AgentMetricsDropped.Incr(1)
b.MetricsDropped.Incr(1)
metric.Reject()
Expand Down
8 changes: 4 additions & 4 deletions models/buffer_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@ import (
)

type DiskBuffer struct {
BufferMetrics
BufferStats

walFile *wal.Log
}

func NewDiskBuffer(name string, capacity int, path string, metrics BufferMetrics) *DiskBuffer {
func NewDiskBuffer(name string, capacity int, path string, stats BufferStats) *DiskBuffer {
// todo capacity
walFile, err := wal.Open(path+"/"+name, nil)
if err != nil {
return nil // todo error handling
}
return &DiskBuffer{
BufferMetrics: metrics,
walFile: walFile,
BufferStats: stats,
walFile: walFile,
}
}

Expand Down
16 changes: 8 additions & 8 deletions models/buffer_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// MemoryBuffer stores metrics in a circular buffer.
type MemoryBuffer struct {
sync.Mutex
BufferMetrics
BufferStats

buf []telegraf.Metric
first int // index of the first/oldest metric
Expand All @@ -21,14 +21,14 @@ type MemoryBuffer struct {
batchSize int // number of metrics currently in the batch
}

func NewMemoryBuffer(capacity int, metrics BufferMetrics) *MemoryBuffer {
func NewMemoryBuffer(capacity int, stats BufferStats) *MemoryBuffer {
return &MemoryBuffer{
BufferMetrics: metrics,
buf: make([]telegraf.Metric, capacity),
first: 0,
last: 0,
size: 0,
cap: capacity,
BufferStats: stats,
buf: make([]telegraf.Metric, capacity),
first: 0,
last: 0,
size: 0,
cap: capacity,
}
}

Expand Down
6 changes: 3 additions & 3 deletions models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type OutputConfig struct {
NamePrefix string
NameSuffix string

BufferStrategy string
BufferFileDirectory string
BufferStrategy string
BufferDirectory string
}

// RunningOutput contains the output configuration
Expand Down Expand Up @@ -99,7 +99,7 @@ func NewRunningOutput(
}

ro := &RunningOutput{
buffer: NewBuffer(config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferFileDirectory),
buffer: NewBuffer(config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory),
BatchReady: make(chan time.Time, 1),
Output: output,
Config: config,
Expand Down

0 comments on commit 0f93b91

Please sign in to comment.