Skip to content

Commit

Permalink
Performance refactor of running_output buffers
Browse files Browse the repository at this point in the history
closes #914
closes #967
  • Loading branch information
sparrc committed Apr 26, 2016
1 parent 44c945b commit 72be04f
Show file tree
Hide file tree
Showing 5 changed files with 369 additions and 194 deletions.
64 changes: 64 additions & 0 deletions internal/buffer/buffer.go
@@ -0,0 +1,64 @@
package buffer

import (
"github.com/influxdata/telegraf"
)

type Buffer struct {
buf chan telegraf.Metric
// total dropped metrics
drops int
// total metrics added
total int
}

func NewBuffer(size int) *Buffer {
return &Buffer{
buf: make(chan telegraf.Metric, size),
}
}

func (b *Buffer) IsEmpty() bool {
return len(b.buf) == 0
}

func (b *Buffer) Len() int {
return len(b.buf)
}

func (b *Buffer) Drops() int {
return b.drops
}

func (b *Buffer) Total() int {
return b.total
}

func (b *Buffer) Add(metrics ...telegraf.Metric) {
for i, _ := range metrics {
b.total++
select {
case b.buf <- metrics[i]:
default:
b.drops++
<-b.buf
b.buf <- metrics[i]
}
}
}

func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
n := min(len(b.buf), batchSize)
out := make([]telegraf.Metric, n)
for i := 0; i < n; i++ {
out[i] = <-b.buf
}
return out
}

func min(a, b int) int {
if b < a {
return b
}
return a
}
95 changes: 95 additions & 0 deletions internal/buffer/buffer_test.go
@@ -0,0 +1,95 @@
package buffer

import (
"testing"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"

"github.com/stretchr/testify/assert"
//"github.com/stretchr/testify/require"
)

var metricList = []telegraf.Metric{
testutil.TestMetric(2, "mymetric1"),
testutil.TestMetric(1, "mymetric2"),
testutil.TestMetric(11, "mymetric3"),
testutil.TestMetric(15, "mymetric4"),
testutil.TestMetric(8, "mymetric5"),
}

func BenchmarkAddMetrics(b *testing.B) {
buf := NewBuffer(10000)
m := testutil.TestMetric(1, "mymetric")
for n := 0; n < b.N; n++ {
buf.Add(m)
}
}

func TestNewBufferBasicFuncs(t *testing.T) {
b := NewBuffer(10)

assert.True(t, b.IsEmpty())
assert.Zero(t, b.Len())
assert.Zero(t, b.Drops())
assert.Zero(t, b.Total())

m := testutil.TestMetric(1, "mymetric")
b.Add(m)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 1)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 1)

b.Add(metricList...)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 6)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 6)
}

func TestDroppingMetrics(t *testing.T) {
b := NewBuffer(10)

// Add up to the size of the buffer
b.Add(metricList...)
b.Add(metricList...)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 10)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 10)

// Add 5 more and verify they were dropped
b.Add(metricList...)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 10)
assert.Equal(t, b.Drops(), 5)
assert.Equal(t, b.Total(), 15)
}

func TestGettingBatches(t *testing.T) {
b := NewBuffer(20)

// Verify that the buffer returned is smaller than requested when there are
// not as many items as requested.
b.Add(metricList...)
batch := b.Batch(10)
assert.Len(t, batch, 5)

// Verify that the buffer is now empty
assert.True(t, b.IsEmpty())
assert.Zero(t, b.Len())
assert.Zero(t, b.Drops())
assert.Equal(t, b.Total(), 5)

// Verify that the buffer returned is not more than the size requested
b.Add(metricList...)
batch = b.Batch(3)
assert.Len(t, batch, 3)

// Verify that buffer is not empty
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 2)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 10)
}
16 changes: 4 additions & 12 deletions internal/config/config.go
Expand Up @@ -188,15 +188,13 @@ var header = `# Telegraf Configuration
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will send metrics to output in batch of at
## Telegraf will send metrics to outputs in batches of at
## most metric_batch_size metrics.
metric_batch_size = 1000
## Telegraf will cache metric_buffer_limit metrics for each output, and will
## flush this buffer on a successful write. This should be a multiple of
## metric_batch_size and could not be less than 2 times metric_batch_size
## metric_batch_size and can not be less than 2 times metric_batch_size.
metric_buffer_limit = 10000
## Flush the buffer whenever full, regardless of flush_interval.
flush_buffer_when_full = true
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
Expand Down Expand Up @@ -535,14 +533,8 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
return err
}

ro := internal_models.NewRunningOutput(name, output, outputConfig)
if c.Agent.MetricBatchSize > 0 {
ro.MetricBatchSize = c.Agent.MetricBatchSize
}
if c.Agent.MetricBufferLimit > 0 {
ro.MetricBufferLimit = c.Agent.MetricBufferLimit
}
ro.FlushBufferWhenFull = c.Agent.FlushBufferWhenFull
ro := internal_models.NewRunningOutput(name, output, outputConfig,
c.Agent.MetricBatchSize, c.Agent.MetricBufferLimit)
c.Outputs = append(c.Outputs, ro)
return nil
}
Expand Down

0 comments on commit 72be04f

Please sign in to comment.