From 5373f263a3b71cc750ba448c6055e51d65ed321c Mon Sep 17 00:00:00 2001
From: Philip O'Toole
Date: Tue, 8 Sep 2015 15:18:14 -0700
Subject: [PATCH 1/4] Add pending control to batcher

With this change, the generic batcher used by many inputs can now be
buffered. Testing shows that this improves the performance of the
Graphite input by 10-100%, with the biggest improvements at lower
numbers of connections.
---
 etc/config.sample.toml           |  3 +++
 services/collectd/config.go      |  6 +++++-
 services/collectd/service.go     |  2 +-
 services/graphite/config.go      |  7 +++++++
 services/graphite/config_test.go |  3 +++
 services/graphite/service.go     |  4 +++-
 services/udp/config.go           |  1 +
 services/udp/config_test.go      |  3 +++
 services/udp/service.go          |  2 +-
 tsdb/batcher.go                  |  9 ++++++---
 tsdb/batcher_test.go             | 31 +++++++++++++++++++++++++++----
 11 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/etc/config.sample.toml b/etc/config.sample.toml
index 14fb288f1aa..e37aae364b0 100644
--- a/etc/config.sample.toml
+++ b/etc/config.sample.toml
@@ -148,6 +148,7 @@ reporting-disabled = false
   # will buffer points in memory if you have many coming in.

   # batch-size = 1000 # will flush if this many points get buffered
+  # batch-pending = 5 # number of batches that may be pending in memory
   # batch-timeout = "1s" # will flush at least this often even if we haven't hit buffer limit

   ## "name-schema" configures tag names for parsing the metric name from graphite protocol;
@@ -187,6 +188,7 @@ reporting-disabled = false
   # will buffer points in memory if you have many coming in.

   # batch-size = 1000 # will flush if this many points get buffered
+  # batch-pending = 5 # number of batches that may be pending in memory
   # batch-timeout = "1s" # will flush at least this often even if we haven't hit buffer limit

 ###
@@ -224,6 +226,7 @@ reporting-disabled = false
   # will buffer points in memory if you have many coming in.

   # batch-size = 1000 # will flush if this many points get buffered
+  # batch-pending = 5 # number of batches that may be pending in memory
   # batch-timeout = "1s" # will flush at least this often even if we haven't hit buffer limit

 ###
diff --git a/services/collectd/config.go b/services/collectd/config.go
index 3129c3378d6..427598ca6dc 100644
--- a/services/collectd/config.go
+++ b/services/collectd/config.go
@@ -13,7 +13,9 @@ const (

 	DefaultRetentionPolicy = ""

-	DefaultBatchSize = 5000
+	DefaultBatchSize = 1000
+
+	DefaultBatchPending = 5

 	DefaultBatchDuration = toml.Duration(10 * time.Second)

@@ -27,6 +29,7 @@ type Config struct {
 	Database        string        `toml:"database"`
 	RetentionPolicy string        `toml:"retention-policy"`
 	BatchSize       int           `toml:"batch-size"`
+	BatchPending    int           `toml:"batch-pending"`
 	BatchDuration   toml.Duration `toml:"batch-timeout"`
 	TypesDB         string        `toml:"typesdb"`
 }
@@ -38,6 +41,7 @@ func NewConfig() Config {
 		Database:        DefaultDatabase,
 		RetentionPolicy: DefaultRetentionPolicy,
 		BatchSize:       DefaultBatchSize,
+		BatchPending:    DefaultBatchPending,
 		BatchDuration:   DefaultBatchDuration,
 		TypesDB:         DefaultTypesDB,
 	}
diff --git a/services/collectd/service.go b/services/collectd/service.go
index 7d8e65f820c..35a31049a0a 100644
--- a/services/collectd/service.go
+++ b/services/collectd/service.go
@@ -126,7 +126,7 @@ func (s *Service) Open() error {
 	s.Logger.Println("Listening on UDP: ", ln.LocalAddr().String())

 	// Start the points batcher.
-	s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, time.Duration(s.Config.BatchDuration))
+	s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchPending, time.Duration(s.Config.BatchDuration))
 	s.batcher.Start()

 	// Create channel and wait group for signalling goroutines to stop.
diff --git a/services/graphite/config.go b/services/graphite/config.go
index ba7381c1244..b7b5df1496a 100644
--- a/services/graphite/config.go
+++ b/services/graphite/config.go
@@ -29,6 +29,9 @@ const (
 	// DefaultBatchSize is the default Graphite batch size.
 	DefaultBatchSize = 1000

+	// DefaultBatchPending is the default number of pending Graphite batches.
+	DefaultBatchPending = 5
+
 	// DefaultBatchTimeout is the default Graphite batch timeout.
 	DefaultBatchTimeout = time.Second
 )
@@ -40,6 +43,7 @@ type Config struct {
 	Enabled          bool          `toml:"enabled"`
 	Protocol         string        `toml:"protocol"`
 	BatchSize        int           `toml:"batch-size"`
+	BatchPending     int           `toml:"batch-pending"`
 	BatchTimeout     toml.Duration `toml:"batch-timeout"`
 	ConsistencyLevel string        `toml:"consistency-level"`
 	Templates        []string      `toml:"templates"`
@@ -63,6 +67,9 @@ func (c *Config) WithDefaults() *Config {
 	if d.BatchSize == 0 {
 		d.BatchSize = DefaultBatchSize
 	}
+	if d.BatchPending == 0 {
+		d.BatchPending = DefaultBatchPending
+	}
 	if d.BatchTimeout == 0 {
 		d.BatchTimeout = toml.Duration(DefaultBatchTimeout)
 	}
diff --git a/services/graphite/config_test.go b/services/graphite/config_test.go
index 3738a927cf4..e9efc64668e 100644
--- a/services/graphite/config_test.go
+++ b/services/graphite/config_test.go
@@ -17,6 +17,7 @@ database = "mydb"
 enabled = true
 protocol = "tcp"
 batch-size=100
+batch-pending=77
 batch-timeout="1s"
 consistency-level="one"
 templates=["servers.* .host.measurement*"]
@@ -36,6 +37,8 @@ tags=["region=us-east"]
 		t.Fatalf("unexpected graphite protocol: %s", c.Protocol)
 	} else if c.BatchSize != 100 {
 		t.Fatalf("unexpected graphite batch size: %d", c.BatchSize)
+	} else if c.BatchPending != 77 {
+		t.Fatalf("unexpected graphite batch pending: %d", c.BatchPending)
 	} else if time.Duration(c.BatchTimeout) != time.Second {
 		t.Fatalf("unexpected graphite batch timeout: %v", c.BatchTimeout)
 	} else if c.ConsistencyLevel != "one" {
diff --git a/services/graphite/service.go b/services/graphite/service.go
index 998c28542ea..25db6f7bab5 100644
--- a/services/graphite/service.go
+++ b/services/graphite/service.go
@@ -87,6 +87,7 @@ type Service struct {
 	database         string
 	protocol         string
 	batchSize        int
+	batchPending     int
 	batchTimeout     time.Duration
 	consistencyLevel cluster.ConsistencyLevel

@@ -125,6 +126,7 @@ func NewService(c Config) (*Service, error) {
 		database:         d.Database,
 		protocol:         d.Protocol,
 		batchSize:        d.BatchSize,
+		batchPending:     d.BatchPending,
 		batchTimeout:     time.Duration(d.BatchTimeout),
 		logger:           log.New(os.Stderr, "[graphite] ", log.LstdFlags),
 		done:             make(chan struct{}),
@@ -178,7 +180,7 @@ func (s *Service) Open() error {
 		return err
 	}

-	s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchTimeout)
+	s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout)
 	s.batcher.Start()

 	// Start processing batches.
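
The wiring above is the pattern every input now follows: construct the batcher with a size, a pending-batch count, and a timeout, then feed In() and drain Out(). As a quick orientation for reviewers, here is a minimal, hypothetical sketch of that producer/consumer flow; it assumes only the NewPointBatcher, Start, In, and Out identifiers from the tsdb package as shown in this patch, and uses the Graphite defaults it introduces (1000-point batches, 5 pending, 1-second timeout):

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdb/influxdb/tsdb"
)

func main() {
	// Batch size, pending batches, and timeout -- the Graphite defaults
	// introduced by this patch.
	batcher := tsdb.NewPointBatcher(1000, 5, time.Second)
	batcher.Start()

	// Producers (e.g. one goroutine per client connection) send parsed
	// points; the input channel is buffered, so bursts are absorbed
	// without blocking the senders.
	go func() {
		var p tsdb.Point
		for i := 0; i < 1000; i++ {
			batcher.In() <- p
		}
	}()

	// A single consumer drains complete batches and hands them to storage.
	batch := <-batcher.Out()
	fmt.Printf("received a batch of %d points\n", len(batch))
}
```

Because the input channel is created with capacity bp*sz, producers can run ahead of a slow consumer by up to five full batches before a send on In() blocks, which is what the commit message's 10-100% Graphite improvement is attributed to.
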
diff --git a/services/udp/config.go b/services/udp/config.go
index 0e5a2b1b0f4..2d8367d5127 100644
--- a/services/udp/config.go
+++ b/services/udp/config.go
@@ -8,5 +8,6 @@ type Config struct {
 	Database     string        `toml:"database"`
 	BatchSize    int           `toml:"batch-size"`
+	BatchPending int           `toml:"batch-pending"`
 	BatchTimeout toml.Duration `toml:"batch-timeout"`
 }
diff --git a/services/udp/config_test.go b/services/udp/config_test.go
index d094c74c9af..cbec79f5a85 100644
--- a/services/udp/config_test.go
+++ b/services/udp/config_test.go
@@ -16,6 +16,7 @@ enabled = true
 bind-address = ":4444"
 database = "awesomedb"
 batch-size = 100
+batch-pending = 9
 batch-timeout = "10ms"
 `, &c); err != nil {
 		t.Fatal(err)
@@ -30,6 +31,8 @@ batch-timeout = "10ms"
 		t.Fatalf("unexpected database: %s", c.Database)
 	} else if c.BatchSize != 100 {
 		t.Fatalf("unexpected batch size: %d", c.BatchSize)
+	} else if c.BatchPending != 9 {
+		t.Fatalf("unexpected batch pending: %d", c.BatchPending)
 	} else if time.Duration(c.BatchTimeout) != (10 * time.Millisecond) {
 		t.Fatalf("unexpected batch timeout: %v", c.BatchTimeout)
 	}
diff --git a/services/udp/service.go b/services/udp/service.go
index 15c8631f799..0d30c0f415e 100644
--- a/services/udp/service.go
+++ b/services/udp/service.go
@@ -56,7 +56,7 @@ func NewService(c Config) *Service {
 	return &Service{
 		config:  c,
 		done:    make(chan struct{}),
-		batcher: tsdb.NewPointBatcher(c.BatchSize, time.Duration(c.BatchTimeout)),
+		batcher: tsdb.NewPointBatcher(c.BatchSize, c.BatchPending, time.Duration(c.BatchTimeout)),
 		Logger:  log.New(os.Stderr, "[udp] ", log.LstdFlags),
 	}
 }
diff --git a/tsdb/batcher.go b/tsdb/batcher.go
index aefbea723e1..8ce8d4e5196 100644
--- a/tsdb/batcher.go
+++ b/tsdb/batcher.go
@@ -22,13 +22,16 @@ type PointBatcher struct {
 	wg *sync.WaitGroup
 }

-// NewPointBatcher returns a new PointBatcher.
-func NewPointBatcher(sz int, d time.Duration) *PointBatcher {
+// NewPointBatcher returns a new PointBatcher. sz is the batching size,
+// bp is the maximum number of batches that may be pending. d is the time
+// after which a batch will be emitted after the first point is received
+// for the batch, regardless of its size.
+func NewPointBatcher(sz int, bp int, d time.Duration) *PointBatcher {
 	return &PointBatcher{
 		size:     sz,
 		duration: d,
 		stop:     make(chan struct{}),
-		in:       make(chan Point),
+		in:       make(chan Point, bp*sz),
 		out:      make(chan []Point),
 		flush:    make(chan struct{}),
 	}
diff --git a/tsdb/batcher_test.go b/tsdb/batcher_test.go
index f3652e6c88f..6757bd15961 100644
--- a/tsdb/batcher_test.go
+++ b/tsdb/batcher_test.go
@@ -10,7 +10,30 @@ import (
 // TestBatch_Size ensures that a batcher generates a batch when the size threshold is reached.
 func TestBatch_Size(t *testing.T) {
 	batchSize := 5
-	batcher := tsdb.NewPointBatcher(batchSize, time.Hour)
+	batcher := tsdb.NewPointBatcher(batchSize, 0, time.Hour)
+	if batcher == nil {
+		t.Fatal("failed to create batcher for size test")
+	}
+
+	batcher.Start()
+
+	var p tsdb.Point
+	go func() {
+		for i := 0; i < batchSize; i++ {
+			batcher.In() <- p
+		}
+	}()
+	batch := <-batcher.Out()
+	if len(batch) != batchSize {
+		t.Errorf("received batch has incorrect length exp %d, got %d", batchSize, len(batch))
+	}
+	checkPointBatcherStats(t, batcher, -1, batchSize, 1, 0)
+}
+
+// TestBatch_SizeBuffered ensures that a buffered batcher generates a batch when the size threshold is reached.
+func TestBatch_SizeBuffered(t *testing.T) {
+	batchSize := 5
+	batcher := tsdb.NewPointBatcher(batchSize, 5, time.Hour)
 	if batcher == nil {
 		t.Fatal("failed to create batcher for size test")
 	}
@@ -33,7 +56,7 @@ func TestBatch_Size(t *testing.T) {
 // TestBatch_Size ensures that a batcher generates a batch when the timeout triggers.
 func TestBatch_Timeout(t *testing.T) {
 	batchSize := 5
-	batcher := tsdb.NewPointBatcher(batchSize+1, 100*time.Millisecond)
+	batcher := tsdb.NewPointBatcher(batchSize+1, 0, 100*time.Millisecond)
 	if batcher == nil {
 		t.Fatal("failed to create batcher for timeout test")
 	}
@@ -56,7 +79,7 @@ func TestBatch_Timeout(t *testing.T) {
 // TestBatch_Flush ensures that a batcher generates a batch when flushed
 func TestBatch_Flush(t *testing.T) {
 	batchSize := 2
-	batcher := tsdb.NewPointBatcher(batchSize, time.Hour)
+	batcher := tsdb.NewPointBatcher(batchSize, 0, time.Hour)
 	if batcher == nil {
 		t.Fatal("failed to create batcher for flush test")
 	}
@@ -78,7 +101,7 @@ func TestBatch_Flush(t *testing.T) {
 // TestBatch_MultipleBatches ensures that a batcher correctly processes multiple batches.
 func TestBatch_MultipleBatches(t *testing.T) {
 	batchSize := 2
-	batcher := tsdb.NewPointBatcher(batchSize, 100*time.Millisecond)
+	batcher := tsdb.NewPointBatcher(batchSize, 0, 100*time.Millisecond)
 	if batcher == nil {
 		t.Fatal("failed to create batcher for size test")
 	}

From e4a332ea120170fc7c23011cf441430d935a6dfc Mon Sep 17 00:00:00 2001
From: Philip O'Toole
Date: Tue, 8 Sep 2015 15:22:03 -0700
Subject: [PATCH 2/4] Update CHANGELOG

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a560583b0cb..b8734e2c211 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,7 @@ With this release InfluxDB is moving to Go 1.5.
 - [#3996](https://github.com/influxdb/influxdb/pull/3996): Add statistics to httpd package
 - [#4003](https://github.com/influxdb/influxdb/pull/4033): Add logrotate configuration.
 - [#4043](https://github.com/influxdb/influxdb/pull/4043): Add stats and batching to openTSDB input
+- [#4042](https://github.com/influxdb/influxdb/pull/4042): Add pending batches control to batcher

 ### Bugfixes
 - [#3785](https://github.com/influxdb/influxdb/issues/3785): Invalid time stamp in graphite metric causes panic

From 95530e162325216e6fb9f0b00795cff5696e7a8b Mon Sep 17 00:00:00 2001
From: Philip O'Toole
Date: Tue, 8 Sep 2015 15:25:42 -0700
Subject: [PATCH 3/4] Set UDP input defaults if not set

---
 CHANGELOG.md            |  1 +
 services/udp/config.go  | 33 ++++++++++++++++++++++++++++++++-
 services/udp/service.go |  5 +++--
 3 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b8734e2c211..56fce89d91a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@ With this release InfluxDB is moving to Go 1.5.
 - [#4042](https://github.com/influxdb/influxdb/pull/4042): Add pending batches control to batcher

 ### Bugfixes
+- [#4042](https://github.com/influxdb/influxdb/pull/4042): Set UDP input batching defaults as needed.
 - [#3785](https://github.com/influxdb/influxdb/issues/3785): Invalid time stamp in graphite metric causes panic
 - [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803.
 - [#3823](https://github.com/influxdb/influxdb/pull/3823): Deterministic ordering for first() and last()
diff --git a/services/udp/config.go b/services/udp/config.go
index 2d8367d5127..2cb16ae6ca1 100644
--- a/services/udp/config.go
+++ b/services/udp/config.go
@@ -1,6 +1,21 @@
 package udp

-import "github.com/influxdb/influxdb/toml"
+import (
+	"time"
+
+	"github.com/influxdb/influxdb/toml"
+)
+
+const (
+	// DefaultBatchSize is the default UDP batch size.
+	DefaultBatchSize = 1000
+
+	// DefaultBatchPending is the default number of pending UDP batches.
+	DefaultBatchPending = 5
+
+	// DefaultBatchTimeout is the default UDP batch timeout.
+	DefaultBatchTimeout = time.Second
+)

 type Config struct {
 	Enabled     bool   `toml:"enabled"`
@@ -11,3 +26,19 @@ type Config struct {
 	BatchPending int           `toml:"batch-pending"`
 	BatchTimeout toml.Duration `toml:"batch-timeout"`
 }
+
+// WithDefaults takes the given config and returns a new config with any required
+// default values set.
+func (c *Config) WithDefaults() *Config {
+	d := *c
+	if d.BatchSize == 0 {
+		d.BatchSize = DefaultBatchSize
+	}
+	if d.BatchPending == 0 {
+		d.BatchPending = DefaultBatchPending
+	}
+	if d.BatchTimeout == 0 {
+		d.BatchTimeout = toml.Duration(DefaultBatchTimeout)
+	}
+	return &d
+}
diff --git a/services/udp/service.go b/services/udp/service.go
index 0d30c0f415e..cf0dfe94bf2 100644
--- a/services/udp/service.go
+++ b/services/udp/service.go
@@ -53,10 +53,11 @@ type Service struct {
 }

 func NewService(c Config) *Service {
+	d := *c.WithDefaults()
 	return &Service{
-		config:  c,
+		config:  d,
 		done:    make(chan struct{}),
-		batcher: tsdb.NewPointBatcher(c.BatchSize, c.BatchPending, time.Duration(c.BatchTimeout)),
+		batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)),
 		Logger:  log.New(os.Stderr, "[udp] ", log.LstdFlags),
 	}
 }

From 24aca5611aaa7dd0b5b951006c0502fd07fe2bc7 Mon Sep 17 00:00:00 2001
From: Philip O'Toole
Date: Tue, 8 Sep 2015 19:35:19 -0700
Subject: [PATCH 4/4] Add batch-pending control to openTSDB input

---
 etc/config.sample.toml       | 1 +
 services/opentsdb/config.go  | 5 +++++
 services/opentsdb/service.go | 4 +++-
 3 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/etc/config.sample.toml b/etc/config.sample.toml
index e37aae364b0..89020000353 100644
--- a/etc/config.sample.toml
+++ b/etc/config.sample.toml
@@ -208,6 +208,7 @@
   # metrics received over the telnet protocol undergo batching.

   # batch-size = 1000 # will flush if this many points get buffered
+  # batch-pending = 5 # number of batches that may be pending in memory
   # batch-timeout = "1s" # will flush at least this often even if we haven't hit buffer limit

 ###
diff --git a/services/opentsdb/config.go b/services/opentsdb/config.go
index 95d33caee7d..5c17ba6758b 100644
--- a/services/opentsdb/config.go
+++ b/services/opentsdb/config.go
@@ -24,6 +24,9 @@ const (

 	// DefaultBatchTimeout is the default Graphite batch timeout.
 	DefaultBatchTimeout = time.Second
+
+	// DefaultBatchPending is the default number of batches that can be in the queue.
+ DefaultBatchPending = 5 ) type Config struct { @@ -35,6 +38,7 @@ type Config struct { TLSEnabled bool `toml:"tls-enabled"` Certificate string `toml:"certificate"` BatchSize int `toml:"batch-size"` + BatchPending int `toml:"batch-pending"` BatchTimeout toml.Duration `toml:"batch-timeout"` } @@ -47,6 +51,7 @@ func NewConfig() Config { TLSEnabled: false, Certificate: "/etc/ssl/influxdb.pem", BatchSize: DefaultBatchSize, + BatchPending: DefaultBatchPending, BatchTimeout: toml.Duration(DefaultBatchTimeout), } } diff --git a/services/opentsdb/service.go b/services/opentsdb/service.go index 653d4003dab..7b7b952337e 100644 --- a/services/opentsdb/service.go +++ b/services/opentsdb/service.go @@ -64,6 +64,7 @@ type Service struct { // Points received over the telnet protocol are batched. batchSize int + batchPending int batchTimeout time.Duration batcher *tsdb.PointBatcher @@ -88,6 +89,7 @@ func NewService(c Config) (*Service, error) { RetentionPolicy: c.RetentionPolicy, ConsistencyLevel: consistencyLevel, batchSize: c.BatchSize, + batchPending: c.BatchPending, batchTimeout: time.Duration(c.BatchTimeout), Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags), } @@ -114,7 +116,7 @@ func (s *Service) Open() error { return err } - s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchTimeout) + s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout) s.batcher.Start() // Start processing batches.
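
With all four patches applied, every batching input (collectd, Graphite, UDP, and openTSDB) exposes the same batch-size, batch-pending, and batch-timeout knobs. To make the defaulting behavior from patch 3 concrete, here is a small, hypothetical snippet showing that zero values, as left by an empty [udp] section in the TOML file, are replaced by the package constants; it assumes only the udp.Config type and the WithDefaults method added above:

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdb/influxdb/services/udp"
)

func main() {
	// An empty config, as if batch-size, batch-pending, and
	// batch-timeout were all omitted from the [udp] TOML section.
	c := &udp.Config{}
	d := c.WithDefaults()

	fmt.Println(d.BatchSize)                   // 1000
	fmt.Println(d.BatchPending)                // 5
	fmt.Println(time.Duration(d.BatchTimeout)) // 1s
}
```
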