Skip to content

Commit

Permalink
Merge pull request #4042 from influxdb/buffered_batcher
Browse files Browse the repository at this point in the history
Add pending control to batcher
  • Loading branch information
otoolep committed Sep 9, 2015
2 parents 78f4a69 + 24aca56 commit 02a54d0
Show file tree
Hide file tree
Showing 14 changed files with 105 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ With this release InfluxDB is moving to Go 1.5.
- [#3996](https://github.com/influxdb/influxdb/pull/3996): Add statistics to httpd package
- [#4033](https://github.com/influxdb/influxdb/pull/4033): Add logrotate configuration.
- [#4043](https://github.com/influxdb/influxdb/pull/4043): Add stats and batching to openTSDB input
- [#4042](https://github.com/influxdb/influxdb/pull/4042): Add pending batches control to batcher

### Bugfixes
- [#4042](https://github.com/influxdb/influxdb/pull/4042): Set UDP input batching defaults as needed.
- [#3785](https://github.com/influxdb/influxdb/issues/3785): Invalid time stamp in graphite metric causes panic
- [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803.
- [#3823](https://github.com/influxdb/influxdb/pull/3823): Deterministic ordering for first() and last()
Expand Down
4 changes: 4 additions & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ reporting-disabled = false
# will buffer points in memory if you have many coming in.

# batch-size = 1000 # will flush if this many points get buffered
# batch-pending = 5 # number of batches that may be pending in memory
# batch-timeout = "1s" # will flush at least this often even if we haven't hit buffer limit

## "name-schema" configures tag names for parsing the metric name from graphite protocol;
Expand Down Expand Up @@ -187,6 +188,7 @@ reporting-disabled = false
# will buffer points in memory if you have many coming in.

# batch-size = 1000 # will flush if this many points get buffered
# batch-pending = 5 # number of batches that may be pending in memory
# batch-timeout = "1s" # will flush at least this often even if we haven't hit buffer limit

###
Expand All @@ -206,6 +208,7 @@ reporting-disabled = false
# metrics received over the telnet protocol undergo batching.

# batch-size = 1000 # will flush if this many points get buffered
# batch-pending = 5 # number of batches that may be pending in memory
# batch-timeout = "1s" # will flush at least this often even if we haven't hit buffer limit

###
Expand All @@ -224,6 +227,7 @@ reporting-disabled = false
# will buffer points in memory if you have many coming in.

# batch-size = 1000 # will flush if this many points get buffered
# batch-pending = 5 # number of batches that may be pending in memory
# batch-timeout = "1s" # will flush at least this often even if we haven't hit buffer limit

###
Expand Down
6 changes: 5 additions & 1 deletion services/collectd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ const (

DefaultRetentionPolicy = ""

DefaultBatchSize = 5000
DefaultBatchSize = 1000

DefaultBatchPending = 5

DefaultBatchDuration = toml.Duration(10 * time.Second)

Expand All @@ -27,6 +29,7 @@ type Config struct {
Database string `toml:"database"`
RetentionPolicy string `toml:"retention-policy"`
BatchSize int `toml:"batch-size"`
BatchPending int `toml:"batch-pending"`
BatchDuration toml.Duration `toml:"batch-timeout"`
TypesDB string `toml:"typesdb"`
}
Expand All @@ -38,6 +41,7 @@ func NewConfig() Config {
Database: DefaultDatabase,
RetentionPolicy: DefaultRetentionPolicy,
BatchSize: DefaultBatchSize,
BatchPending: DefaultBatchPending,
BatchDuration: DefaultBatchDuration,
TypesDB: DefaultTypesDB,
}
Expand Down
2 changes: 1 addition & 1 deletion services/collectd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s *Service) Open() error {
s.Logger.Println("Listening on UDP: ", ln.LocalAddr().String())

// Start the points batcher.
s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, time.Duration(s.Config.BatchDuration))
s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchPending, time.Duration(s.Config.BatchDuration))
s.batcher.Start()

// Create channel and wait group for signalling goroutines to stop.
Expand Down
7 changes: 7 additions & 0 deletions services/graphite/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
// DefaultBatchSize is the default Graphite batch size.
DefaultBatchSize = 1000

// DefaultBatchPending is the default number of pending Graphite batches.
DefaultBatchPending = 5

// DefaultBatchTimeout is the default Graphite batch timeout.
DefaultBatchTimeout = time.Second
)
Expand All @@ -40,6 +43,7 @@ type Config struct {
Enabled bool `toml:"enabled"`
Protocol string `toml:"protocol"`
BatchSize int `toml:"batch-size"`
BatchPending int `toml:"batch-pending"`
BatchTimeout toml.Duration `toml:"batch-timeout"`
ConsistencyLevel string `toml:"consistency-level"`
Templates []string `toml:"templates"`
Expand All @@ -63,6 +67,9 @@ func (c *Config) WithDefaults() *Config {
if d.BatchSize == 0 {
d.BatchSize = DefaultBatchSize
}
if d.BatchPending == 0 {
d.BatchPending = DefaultBatchPending
}
if d.BatchTimeout == 0 {
d.BatchTimeout = toml.Duration(DefaultBatchTimeout)
}
Expand Down
3 changes: 3 additions & 0 deletions services/graphite/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ database = "mydb"
enabled = true
protocol = "tcp"
batch-size=100
batch-pending=77
batch-timeout="1s"
consistency-level="one"
templates=["servers.* .host.measurement*"]
Expand All @@ -36,6 +37,8 @@ tags=["region=us-east"]
t.Fatalf("unexpected graphite protocol: %s", c.Protocol)
} else if c.BatchSize != 100 {
t.Fatalf("unexpected graphite batch size: %d", c.BatchSize)
} else if c.BatchPending != 77 {
t.Fatalf("unexpected graphite batch pending: %d", c.BatchPending)
} else if time.Duration(c.BatchTimeout) != time.Second {
t.Fatalf("unexpected graphite batch timeout: %v", c.BatchTimeout)
} else if c.ConsistencyLevel != "one" {
Expand Down
4 changes: 3 additions & 1 deletion services/graphite/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type Service struct {
database string
protocol string
batchSize int
batchPending int
batchTimeout time.Duration
consistencyLevel cluster.ConsistencyLevel

Expand Down Expand Up @@ -125,6 +126,7 @@ func NewService(c Config) (*Service, error) {
database: d.Database,
protocol: d.Protocol,
batchSize: d.BatchSize,
batchPending: d.BatchPending,
batchTimeout: time.Duration(d.BatchTimeout),
logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags),
done: make(chan struct{}),
Expand Down Expand Up @@ -178,7 +180,7 @@ func (s *Service) Open() error {
return err
}

s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchTimeout)
s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout)
s.batcher.Start()

// Start processing batches.
Expand Down
5 changes: 5 additions & 0 deletions services/opentsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ const (

// DefaultBatchTimeout is the default OpenTSDB batch timeout.
DefaultBatchTimeout = time.Second

// DefaultBatchPending is the default number of batches that can be in the queue.
DefaultBatchPending = 5
)

type Config struct {
Expand All @@ -35,6 +38,7 @@ type Config struct {
TLSEnabled bool `toml:"tls-enabled"`
Certificate string `toml:"certificate"`
BatchSize int `toml:"batch-size"`
BatchPending int `toml:"batch-pending"`
BatchTimeout toml.Duration `toml:"batch-timeout"`
}

Expand All @@ -47,6 +51,7 @@ func NewConfig() Config {
TLSEnabled: false,
Certificate: "/etc/ssl/influxdb.pem",
BatchSize: DefaultBatchSize,
BatchPending: DefaultBatchPending,
BatchTimeout: toml.Duration(DefaultBatchTimeout),
}
}
4 changes: 3 additions & 1 deletion services/opentsdb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Service struct {

// Points received over the telnet protocol are batched.
batchSize int
batchPending int
batchTimeout time.Duration
batcher *tsdb.PointBatcher

Expand All @@ -88,6 +89,7 @@ func NewService(c Config) (*Service, error) {
RetentionPolicy: c.RetentionPolicy,
ConsistencyLevel: consistencyLevel,
batchSize: c.BatchSize,
batchPending: c.BatchPending,
batchTimeout: time.Duration(c.BatchTimeout),
Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags),
}
Expand All @@ -114,7 +116,7 @@ func (s *Service) Open() error {
return err
}

s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchTimeout)
s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout)
s.batcher.Start()

// Start processing batches.
Expand Down
34 changes: 33 additions & 1 deletion services/udp/config.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,44 @@
package udp

import "github.com/influxdb/influxdb/toml"
import (
"time"

"github.com/influxdb/influxdb/toml"
)

// Default batching values for the UDP input. WithDefaults substitutes them
// for any corresponding Config field that is still zero.
const (
// DefaultBatchSize is the default UDP batch size. Per the sample
// configuration, a batch is flushed once this many points are buffered.
DefaultBatchSize = 1000

// DefaultBatchPending is the default number of pending UDP batches, i.e.
// the number of batches that may be held in memory.
DefaultBatchPending = 5

// DefaultBatchTimeout is the default UDP batch timeout. Per the sample
// configuration, a flush happens at least this often even if the batch
// size has not been reached.
DefaultBatchTimeout = time.Second
)

// Config holds the settings of the UDP input service. Each field maps to the
// TOML key named in its struct tag.
type Config struct {
// NOTE(review): presumably Enabled toggles the service and BindAddress is
// the UDP listen address — confirm against the service implementation.
Enabled bool `toml:"enabled"`
BindAddress string `toml:"bind-address"`

// NOTE(review): presumably the database received points are written to —
// confirm against the service implementation.
Database string `toml:"database"`
// BatchSize, BatchPending and BatchTimeout are handed to
// tsdb.NewPointBatcher by NewService. A zero value in any of them is
// replaced by the matching Default* constant in WithDefaults.
BatchSize int `toml:"batch-size"`
BatchPending int `toml:"batch-pending"`
BatchTimeout toml.Duration `toml:"batch-timeout"`
}

// WithDefaults returns a copy of the config in which every batching option
// that is still at its zero value has been replaced by the package default.
// The receiver itself is never modified.
func (c *Config) WithDefaults() *Config {
	cfg := new(Config)
	*cfg = *c

	// A zero value means "not set", so it is swapped for the default.
	if cfg.BatchSize == 0 {
		cfg.BatchSize = DefaultBatchSize
	}
	if cfg.BatchPending == 0 {
		cfg.BatchPending = DefaultBatchPending
	}
	if cfg.BatchTimeout == 0 {
		cfg.BatchTimeout = toml.Duration(DefaultBatchTimeout)
	}

	return cfg
}
3 changes: 3 additions & 0 deletions services/udp/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ enabled = true
bind-address = ":4444"
database = "awesomedb"
batch-size = 100
batch-pending = 9
batch-timeout = "10ms"
`, &c); err != nil {
t.Fatal(err)
Expand All @@ -30,6 +31,8 @@ batch-timeout = "10ms"
t.Fatalf("unexpected database: %s", c.Database)
} else if c.BatchSize != 100 {
t.Fatalf("unexpected batch size: %d", c.BatchSize)
} else if c.BatchPending != 9 {
t.Fatalf("unexpected batch pending: %d", c.BatchPending)
} else if time.Duration(c.BatchTimeout) != (10 * time.Millisecond) {
t.Fatalf("unexpected batch timeout: %v", c.BatchTimeout)
}
Expand Down
5 changes: 3 additions & 2 deletions services/udp/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ type Service struct {
}

func NewService(c Config) *Service {
d := *c.WithDefaults()
return &Service{
config: c,
config: d,
done: make(chan struct{}),
batcher: tsdb.NewPointBatcher(c.BatchSize, time.Duration(c.BatchTimeout)),
batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)),
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
}
}
Expand Down
9 changes: 6 additions & 3 deletions tsdb/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ type PointBatcher struct {
wg *sync.WaitGroup
}

// NewPointBatcher returns a new PointBatcher.
func NewPointBatcher(sz int, d time.Duration) *PointBatcher {
// NewPointBatcher returns a new PointBatcher. sz is the batching size,
// bp is the maximum number of batches that may be pending. d is the time
// after which a batch will be emitted after the first point is received
// for the batch, regardless of its size.
func NewPointBatcher(sz int, bp int, d time.Duration) *PointBatcher {
return &PointBatcher{
size: sz,
duration: d,
stop: make(chan struct{}),
in: make(chan Point),
in: make(chan Point, bp*sz),
out: make(chan []Point),
flush: make(chan struct{}),
}
Expand Down
31 changes: 27 additions & 4 deletions tsdb/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,30 @@ import (
// TestBatch_Size ensures that a batcher generates a batch when the size threshold is reached.
func TestBatch_Size(t *testing.T) {
batchSize := 5
batcher := tsdb.NewPointBatcher(batchSize, time.Hour)
batcher := tsdb.NewPointBatcher(batchSize, 0, time.Hour)
if batcher == nil {
t.Fatal("failed to create batcher for size test")
}

batcher.Start()

var p tsdb.Point
go func() {
for i := 0; i < batchSize; i++ {
batcher.In() <- p
}
}()
batch := <-batcher.Out()
if len(batch) != batchSize {
t.Errorf("received batch has incorrect length exp %d, got %d", batchSize, len(batch))
}
checkPointBatcherStats(t, batcher, -1, batchSize, 1, 0)
}

// TestBatch_SizeBuffered ensures that a buffered batcher generates a batch when the size threshold is reached.
func TestBatch_SizeBuffered(t *testing.T) {
batchSize := 5
batcher := tsdb.NewPointBatcher(batchSize, 5, time.Hour)
if batcher == nil {
t.Fatal("failed to create batcher for size test")
}
Expand All @@ -33,7 +56,7 @@ func TestBatch_Size(t *testing.T) {
// TestBatch_Timeout ensures that a batcher generates a batch when the timeout triggers.
func TestBatch_Timeout(t *testing.T) {
batchSize := 5
batcher := tsdb.NewPointBatcher(batchSize+1, 100*time.Millisecond)
batcher := tsdb.NewPointBatcher(batchSize+1, 0, 100*time.Millisecond)
if batcher == nil {
t.Fatal("failed to create batcher for timeout test")
}
Expand All @@ -56,7 +79,7 @@ func TestBatch_Timeout(t *testing.T) {
// TestBatch_Flush ensures that a batcher generates a batch when flushed
func TestBatch_Flush(t *testing.T) {
batchSize := 2
batcher := tsdb.NewPointBatcher(batchSize, time.Hour)
batcher := tsdb.NewPointBatcher(batchSize, 0, time.Hour)
if batcher == nil {
t.Fatal("failed to create batcher for flush test")
}
Expand All @@ -78,7 +101,7 @@ func TestBatch_Flush(t *testing.T) {
// TestBatch_MultipleBatches ensures that a batcher correctly processes multiple batches.
func TestBatch_MultipleBatches(t *testing.T) {
batchSize := 2
batcher := tsdb.NewPointBatcher(batchSize, 100*time.Millisecond)
batcher := tsdb.NewPointBatcher(batchSize, 0, 100*time.Millisecond)
if batcher == nil {
t.Fatal("failed to create batcher for size test")
}
Expand Down

0 comments on commit 02a54d0

Please sign in to comment.