diff --git a/.circleci/config.yml b/.circleci/config.yml index 1643aa0d..a49f00eb 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -35,4 +35,4 @@ jobs: - run: go get -v -t -d ./... - run: go vet ./... - run: go get honnef.co/go/tools/cmd/staticcheck && staticcheck ./... - - run: go test -v -e2e ./... + - run: go test -v -race -e2e ./... diff --git a/go.mod b/go.mod index 216ee9ee..4c76c6c8 100644 --- a/go.mod +++ b/go.mod @@ -4,4 +4,5 @@ require ( github.com/google/go-cmp v0.2.0 // test dependency github.com/influxdata/flux v0.0.0-20190620184636-886e3c28388d // test dependency github.com/influxdata/line-protocol v0.0.0-20190509173118-5712a8124a9a + github.com/stretchr/testify v1.3.0 ) diff --git a/go.sum b/go.sum index 92686289..838a38eb 100644 --- a/go.sum +++ b/go.sum @@ -111,10 +111,13 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/xanzy/ssh-agent v0.2.0/go.mod h1:0NyE30eGUDliuLEHJgYte/zncp2zdTStcOnWhgSqHD8= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= diff --git a/models.go b/models.go index 92536242..feb8f19a 100644 --- a/models.go +++ b/models.go @@ -7,6 +7,14 @@ import ( lp "github.com/influxdata/line-protocol" ) +// Organisation is the name of the organisation under which +// metrics will be published +type Organisation string + +// Bucket is the name of the bucket to which metrics will be +// published +type Bucket string + // Metric is just a github.com/influxdata/line-protocol.Metric. // We alias here to keep abstractions from leaking. type Metric = lp.Metric diff --git a/write.go b/write.go index 5c1839d7..fd241e8b 100644 --- a/write.go +++ b/write.go @@ -17,49 +17,41 @@ import ( // Write writes metrics to a bucket, and org. It retries intelligently. // If the write is too big, it retries again, after breaking the payloads into two requests. -func (c *Client) Write(ctx context.Context, bucket, org string, m ...Metric) (err error) { - tries := uint64(0) - return c.write(ctx, bucket, org, &tries, m...) -} +func (c *Client) Write(ctx context.Context, org Organisation, bucket Bucket, m ...Metric) (err error) { + var ( + buf = &bytes.Buffer{} + e = lp.NewEncoder(buf) + ) -func parseWriteError(r io.Reader) (*genericRespError, error) { - werr := &genericRespError{} - if err := json.NewDecoder(r).Decode(&werr); err != nil { - return nil, err - } - return werr, nil -} + e.FailOnFieldErr(c.errOnFieldErr) -func (c *Client) write(ctx context.Context, bucket, org string, triesPtr *uint64, m ...Metric) error { - buf := &bytes.Buffer{} - e := lp.NewEncoder(buf) - cleanup := func() {} - defer func() { cleanup() }() -doRequest: select { case <-ctx.Done(): return ctx.Err() default: } + for i := range m { if _, err := e.Encode(m[i]); err != nil { return err } } - req, err := c.makeWriteRequest(bucket, org, buf) + + req, err := c.makeWriteRequest(string(bucket), string(org), buf) if err != nil { return err } - resp, err := c.httpClient.Do(req) + + resp, err := c.httpClient.Do(req.WithContext(ctx)) if err != nil { return err } - cleanup = func() { - r := io.LimitReader(resp.Body, 1<<16) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it. - // throw away the rest of the body so the connection can be reused even if there is still stuff on the wire. - ioutil.ReadAll(r) // we don't care about the error here, it is just to empty the tcp buffer + + defer func() { + // discard body so connection can be reused + _, _ = io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() - } + }() switch resp.StatusCode { case http.StatusOK, http.StatusNoContent: @@ -68,23 +60,11 @@ doRequest: Code: resp.Status, Message: "too many requests too fast", } - cleanup() - if err2 := c.backoff(triesPtr, resp, err); err2 != nil { - return err2 - } - cleanup = func() {} - goto doRequest case http.StatusServiceUnavailable: err = &genericRespError{ Code: resp.Status, Message: "service temporarily unavaliable", } - cleanup() - if err2 := c.backoff(triesPtr, resp, err); err2 != nil { - return err2 - } - cleanup = func() {} - goto doRequest default: gwerr, err := parseWriteError(resp.Body) if err != nil { @@ -93,16 +73,7 @@ doRequest: return gwerr } - // we don't defer and close till here, because of the retries. - defer func() { - r := io.LimitReader(resp.Body, 1<<16) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it. - _, err := ioutil.ReadAll(r) // throw away the rest of the body so the connection gets reused. - err2 := resp.Body.Close() - if err == nil && err2 != nil { - err = err2 - } - }() - e.FailOnFieldErr(c.errOnFieldErr) + return err } @@ -128,3 +99,11 @@ func makeWriteURL(loc *url.URL, bucket, org string) (string, error) { u.RawQuery = params.Encode() return u.String(), nil } + +func parseWriteError(r io.Reader) (*genericRespError, error) { + werr := &genericRespError{} + if err := json.NewDecoder(r).Decode(&werr); err != nil { + return nil, err + } + return werr, nil +} diff --git a/writer.go b/writer.go deleted file mode 100644 index c59817b6..00000000 --- a/writer.go +++ /dev/null @@ -1,268 +0,0 @@ -package influxdb - -import ( - "bytes" - "context" - "io" - "io/ioutil" - "math/rand" - "net/http" - "strconv" - "sync" - "sync/atomic" - "time" - - lp "github.com/influxdata/line-protocol" -) - -const maxPooledBuffer = 4 << 20 //8 megs - -// LPWriter is a type for writing line protocol in a buffered way. -// It allows you to set a flush interval and flush regularly or to call the Flush method to flush its internal buffer. -type LPWriter struct { - stopTicker func() - flushChan <-chan time.Time - flushInterval time.Duration - flushSize int - c *Client - buf switchableBuffer - lock sync.Mutex - enc *lp.Encoder - bucket, org string - tries uint64 - maxRetries int - errOnFieldErr bool - stop chan struct{} - once sync.Once - wg sync.WaitGroup - onError func(error) -} - -type switchableBuffer struct { - *bytes.Buffer -} - -// WriteMetrics writes Metrics to the LPWriter. -func (w *LPWriter) WriteMetrics(m ...Metric) (int, error) { - select { - case <-w.stop: - return 0, nil - default: - } - w.lock.Lock() - for i := range m { - j, err := w.enc.Encode(m[i]) - if err != nil { - return j, err - } - } - w.asyncFlush() - w.lock.Unlock() - return 0, nil -} - -// NewBufferingWriter creates a new BufferingWriter. -func (c *Client) NewBufferingWriter(bucket string, org string, flushInterval time.Duration, flushSize int, onError func(error)) *LPWriter { - w := &LPWriter{c: c, buf: switchableBuffer{&bytes.Buffer{}}, flushSize: flushSize, flushInterval: flushInterval, stop: make(chan struct{})} - w.enc = lp.NewEncoder(&w.buf) - w.enc.FailOnFieldErr(w.errOnFieldErr) - return w -} - -// Write writes name, time stamp, tag keys, tag values, field keys, and field values to an LPWriter. -func (w *LPWriter) Write(name []byte, ts time.Time, tagKeys, tagVals, fieldKeys [][]byte, fieldVals []interface{}) (int, error) { - select { - case <-w.stop: - return 0, nil - default: - } - w.lock.Lock() - i, err := w.enc.Write(name, ts, tagKeys, tagVals, fieldKeys, fieldVals) - // asyncronously flush if the size of the buffer is too big. - if err != nil { - return i, err - } - w.asyncFlush() - w.lock.Unlock() - return i, err -} -func (w *LPWriter) asyncFlush() { - if w.flushSize > 0 && w.buf.Len() > w.flushSize { - w.wg.Add(1) - buf := w.buf.Buffer - w.buf.Buffer = bufferPool.Get().(*bytes.Buffer) - go func() { - w.flush(context.TODO(), buf) - if buf.Len() <= maxPooledBuffer { - buf.Reset() - bufferPool.Put(buf) - } - w.wg.Done() - }() - } -} - -// Start starts an LPWriter, so that the writer can flush it out to influxdb. -func (w *LPWriter) Start() { - w.lock.Lock() - w.once = sync.Once{} - if w.flushInterval != 0 { - t := time.NewTicker(w.flushInterval) - w.stopTicker = t.Stop - w.flushChan = t.C - w.wg.Add(1) - go func() { - for { - select { - case <-w.flushChan: - err := w.Flush(context.Background()) - if err != nil { - w.onError(err) - } - case <-w.stop: - w.wg.Done() - return - } - } - }() - } else { - w.stopTicker = func() {} - } - w.lock.Unlock() -} - -var bufferPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} }} - -// Flush writes out the internal buffer to the database. -func (w *LPWriter) Flush(ctx context.Context) error { - w.wg.Add(1) - w.lock.Lock() - if w.buf.Len() == 0 { - w.lock.Unlock() - return nil - } - buf := w.buf.Buffer - w.buf.Buffer = bufferPool.Get().(*bytes.Buffer) - w.lock.Unlock() - err := w.flush(ctx, buf) - if err != nil { - return err - } - if buf.Len() <= maxPooledBuffer { - buf.Reset() - bufferPool.Put(buf) - } - w.wg.Done() - return err -} - -func (w *LPWriter) flush(ctx context.Context, buf *bytes.Buffer) error { - - cleanup := func() {} - defer func() { cleanup() }() - // early exit so we don't send empty buffers -doRequest: - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - req, err := w.c.makeWriteRequest(w.bucket, w.org, buf) - if err != nil { - return err - } - resp, err := w.c.httpClient.Do(req) - if err != nil { - return err - } - cleanup = func() { - r := io.LimitReader(resp.Body, 1<<24) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it. - // throw away the rest of the body so the connection can be reused even if there is still stuff on the wire. - _, _ = ioutil.ReadAll(r) // we don't care about the error here, it is just to empty the tcp buffer - resp.Body.Close() - } - - switch resp.StatusCode { - case http.StatusOK, http.StatusNoContent: - case http.StatusTooManyRequests: - err = &genericRespError{ - Code: resp.Status, - Message: "too many requests too fast", - } - cleanup() - if err2 := w.backoff(&w.tries, resp, err); err2 != nil { - return err2 - } - cleanup = func() {} - goto doRequest - case http.StatusServiceUnavailable: - err = &genericRespError{ - Code: resp.Status, - Message: "service temporarily unavaliable", - } - cleanup() - if err2 := w.backoff(&w.tries, resp, err); err2 != nil { - return err2 - } - cleanup = func() { - w.lock.Unlock() - } - goto doRequest - default: - gwerr, err := parseWriteError(resp.Body) - if err != nil { - return err - } - - return gwerr - } - // we don't defer and close till here, because of the retries. - defer func() { - r := io.LimitReader(resp.Body, 1<<16) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it. - _, err := ioutil.ReadAll(r) // throw away the rest of the body so the connection gets reused. - err2 := resp.Body.Close() - if err == nil && err2 != nil { - err = err2 - } - }() - return err -} - -// backoff is a helper method for backoff, triesPtr must not be nil. -func (w *LPWriter) backoff(triesPtr *uint64, resp *http.Response, err error) error { - tries := atomic.LoadUint64(triesPtr) - if w.maxRetries >= 0 || int(tries) >= w.maxRetries { - return maxRetriesExceededError{ - err: err, - tries: w.maxRetries, - } - } - retry := 0 - if resp != nil { - retryAfter := resp.Header.Get("Retry-After") - retry, _ = strconv.Atoi(retryAfter) // we ignore the error here because an error already means retry is 0. - } - sleepFor := time.Duration(retry) * time.Second - if retry == 0 { // if we didn't get a Retry-After or it is zero, instead switch to exponential backoff - sleepFor = time.Duration(rand.Int63n(((1 << tries) - 1) * 10 * int64(time.Microsecond))) - } - if sleepFor > defaultMaxWait { - sleepFor = defaultMaxWait - } - time.Sleep(sleepFor) - atomic.AddUint64(triesPtr, 1) - return nil -} - -// Stop gracefully stops a started LPWriter. -func (w *LPWriter) Stop() { - w.lock.Lock() - w.once.Do(func() { - close(w.stop) - w.wg.Wait() - w.stopTicker() - w.stop = make(chan struct{}) - }) - w.lock.Unlock() - w.wg.Wait() -} diff --git a/writer/bucket.go b/writer/bucket.go new file mode 100644 index 00000000..41a42614 --- /dev/null +++ b/writer/bucket.go @@ -0,0 +1 @@ +package writer diff --git a/writer/buffered.go b/writer/buffered.go new file mode 100644 index 00000000..7a37ac3d --- /dev/null +++ b/writer/buffered.go @@ -0,0 +1,119 @@ +package writer + +import ( + "io" + + "github.com/influxdata/influxdb-client-go" +) + +const defaultBufferSize = 100 + +// MetricsWriter is a type which metrics can be written to +type MetricsWriter interface { + Write(...influxdb.Metric) (int, error) +} + +// BufferedWriter is a buffered implementation of the MetricsWriter interface +// It is unashamedly derived from the bufio pkg https://golang.org/pkg/bufio +// Metrics are buffered up until the buffer size is met and then flushed to +// an underlying MetricsWriter +// The writer can also be flushed manually by calling Flush +// BufferedWriter is not safe to be called concurrently and therefore concurrency +// should be managed by the caller +type BufferedWriter struct { + wr MetricsWriter + buf []influxdb.Metric + n int + err error +} + +// NewBufferedWriter returns a new *BufferedWriter with the default +// buffer size +func NewBufferedWriter(w MetricsWriter) *BufferedWriter { + return NewBufferedWriterSize(w, defaultBufferSize) +} + +// NewBufferedWriterSize returns a new *BufferedWriter with a buffer +// allocated with the provided size +func NewBufferedWriterSize(w MetricsWriter, size int) *BufferedWriter { + if size <= 0 { + size = defaultBufferSize + } + + return &BufferedWriter{ + wr: w, + buf: make([]influxdb.Metric, size), + } +} + +// Available returns how many bytes are unused in the buffer. +func (b *BufferedWriter) Available() int { return len(b.buf) - b.n } + +// BufferedWriter returns the number of bytes that have been written into the current buffer. +func (b *BufferedWriter) BufferedWriter() int { return b.n } + +// Write writes the provided metrics to the underlying buffer if there is available +// capacity. Otherwise it flushes the buffer and attempts to assign the remain metrics to +// the buffer. This process repeats until all the metrics are either flushed or in the buffer +func (b *BufferedWriter) Write(m ...influxdb.Metric) (nn int, err error) { + for len(m) > b.Available() && b.err == nil { + var n int + if b.BufferedWriter() == 0 { + // Large write, empty buffer. + // Write directly from m to avoid copy. + n, b.err = b.wr.Write(m...) + } else { + n = copy(b.buf[b.n:], m) + b.n += n + b.Flush() + } + + nn += n + m = m[n:] + } + + if b.err != nil { + return nn, b.err + } + + n := copy(b.buf[b.n:], m) + b.n += n + nn += n + return nn, nil +} + +// Reset replaces the underlying writer, clears the buffer and any error +func (b *BufferedWriter) Reset(wr MetricsWriter) { + b.wr = wr + b.n = 0 + b.err = nil +} + +// Flush writes any buffered data to the underlying MetricsWriter +func (b *BufferedWriter) Flush() error { + if b.err != nil { + return b.err + } + + if b.n == 0 { + return nil + } + + n, err := b.wr.Write(b.buf[0:b.n]...) + if n < b.n && err == nil { + err = io.ErrShortWrite + } + + if err != nil { + if n > 0 && n < b.n { + copy(b.buf[0:b.n-n], b.buf[n:b.n]) + } + b.n -= n + b.err = err + return err + } + + b.n = 0 + + return nil +} diff --git a/writer/buffered_test.go b/writer/buffered_test.go new file mode 100644 index 00000000..ff897511 --- /dev/null +++ b/writer/buffered_test.go @@ -0,0 +1,34 @@ +package writer + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_BufferedWriter(t *testing.T) { + var ( + // writer which asserts calls are made for org and bucket + underlyingWriter = newTestWriter() + writer = NewBufferedWriter(underlyingWriter) + // 100 rows of batch size 100 expected + expected = createNTestRowMetrics(t, 100, 100) + ) + + // write 10000 metrics in various batch sizes + for _, batchSize := range permuteCounts(t, 10000) { + n, err := writer.Write(createTestRowMetrics(t, batchSize)...) + require.NoError(t, err) + require.Equal(t, batchSize, n) + } + + // flush any remaining buffer to underlying writer + require.Zero(t, writer.Available()) + require.Equal(t, 100, writer.BufferedWriter()) + require.NoError(t, writer.Flush()) + + // check batches written to underlying writer are 100 batches of 100 metrics + require.Equal(t, expected, underlyingWriter.writes) + require.Zero(t, writer.BufferedWriter()) + require.Equal(t, 100, writer.Available()) +} diff --git a/writer/doc.go b/writer/doc.go new file mode 100644 index 00000000..b7e224f5 --- /dev/null +++ b/writer/doc.go @@ -0,0 +1,60 @@ +// Package writer contains useful types for buffering, batching and periodically syncing +// writes onto a provided metric writing client. +// +// The following example demonstrate the usage of a *writer.PointWriter. This is designed to +// buffer calls to Write metrics and flush them in configurable batch sizes (see WithBufferSize). +// It is also designed to periodically flush the buffer if a configurable duration ellapses between +// calls to Write. This is useful to ensure metrics are flushed to the client during a pause in their +// production. +// +// Example Usage +// +// import ( +// "github.com/influxdata/influxdb-client-go" +// "github.com/influxdata/influxdb-client-go/writer" +// ) +// +// func main() { +// cli, _ := influxdb.New("http://localhost:9999", "some-token") +// +// wr := writer.New(cli, influxdb.Organisation("influx"), influxdb.Bucket("default"), writer.WithBufferSize(10)) +// +// wr.Write(influxdb.NewRowMetric( +// map[string]interface{}{ +// "value": 16, +// }, +// "temp_celsius", +// map[string]string{ +// "room": "living_room", +// }, +// time.Now(), +// ), +// influxdb.NewRowMetric( +// map[string]interface{}{ +// "value": 17, +// }, +// "temp_celsius", +// map[string]string{ +// "room": "living_room", +// }, +// time.Now(), +// )) +// +// wr.Close() +// } +// +// writer.New(...) return a PointerWriter which is composed of multiple other types available in this +// package. +// +// It first wraps the provided client in a *BucketWriter which takes care of ensuring all written metrics +// are called on the underyling client with a specific organisation and bucket. This is not safe for +// concurrent use. +// +// It then wraps this writer in a *BufferedWriter and configures its buffer size accordingly. This type +// implements the buffering of metrics and exposes a flush method. Once the buffer size is exceed flush +// is called automatically. However, Flush() can be called manually on this type. This is also not safe +// for concurrent use. +// +// Finally, it wraps the buffered writer in a *PointsWriter which takes care of ensuring Flush is called +// automatically when it hasn't been called for a configured duration. This final type is safe for concurrent use. +package writer diff --git a/writer/options.go b/writer/options.go new file mode 100644 index 00000000..f9b317ba --- /dev/null +++ b/writer/options.go @@ -0,0 +1,51 @@ +package writer + +import "time" + +// Config is a structure used to configure a point writer +type Config struct { + size int + flushInterval time.Duration +} + +// Option is a functional option for Configuring point writers +type Option func(*Config) + +// Options is a slice of Option +type Options []Option + +// Config constructs a default configuration and then +// applies the callee options and returns the config +func (o Options) Config() Config { + config := Config{ + size: defaultBufferSize, + flushInterval: defaultFlushInterval, + } + + o.Apply(&config) + + return config +} + +// Apply calls each option in the slice on options on the provided Config +func (o Options) Apply(c *Config) { + for _, opt := range o { + opt(c) + } +} + +// WithBufferSize sets the size of the underlying buffer on the point writer +func WithBufferSize(size int) Option { + return func(c *Config) { + c.size = size + } +} + +// WithFlushInterval sets the flush interval on the writer +// The point writer will wait at least this long between flushes +// of the undeyling buffered writer +func WithFlushInterval(interval time.Duration) Option { + return func(c *Config) { + c.flushInterval = interval + } +} diff --git a/writer/point.go b/writer/point.go new file mode 100644 index 00000000..40682e13 --- /dev/null +++ b/writer/point.go @@ -0,0 +1,140 @@ +package writer + +import ( + "io" + "sync" + "time" + + "github.com/influxdata/influxdb-client-go" +) + +const defaultFlushInterval = 1 * time.Second + +// MetricsWriteFlush is a type of metrics writer which is +// buffered and metrics can be flushed to +type MetricsWriteFlusher interface { + Write(m ...influxdb.Metric) (int, error) + Available() int + Flush() error +} + +// PointWriter delegates calls to Write to an underlying flushing writer +// implementation. It also periodically calls flush on the underlying writer and is safe +// to be called concurrently. As the flushing writer can also flush on calls to Write +// when the number of metrics being written exceeds the buffer capacity, it also ensures +// to reset its timer in this scenario as to avoid calling flush multiple times +type PointWriter struct { + w MetricsWriteFlusher + flushInterval time.Duration + resetTick chan struct{} + stopped chan struct{} + err error + mu sync.Mutex +} + +// NewPointWriter configures and returns a *PointWriter writer type +// The new writer will automatically begin scheduling periodic flushes based on the +// provided duration +func NewPointWriter(w MetricsWriteFlusher, flushInterval time.Duration) *PointWriter { + writer := &PointWriter{ + w: w, + flushInterval: flushInterval, + // buffer of one in order to not block writes + resetTick: make(chan struct{}, 1), + // stopped is closed once schedule has exited + stopped: make(chan struct{}), + } + + go writer.schedule() + + return writer +} + +func (p *PointWriter) schedule() { + defer close(p.stopped) + + ticker := time.NewTicker(p.flushInterval) + + for { + select { + case <-ticker.C: + if err := func() error { + p.mu.Lock() + defer p.mu.Unlock() + + // return if error is now not nil + if p.err != nil { + return p.err + } + + // between the recv on the ticker and the lock obtain + // the reset tick could've been triggered so we check + // and skip the flush if it did + select { + case <-p.resetTick: + return nil + default: + } + + p.err = p.w.Flush() + + return p.err + }(); err != nil { + return + } + case _, ok := <-p.resetTick: + if !ok { + return + } + + ticker.Stop() + ticker = time.NewTicker(p.flushInterval) + } + } +} + +// Write delegates to an underlying metrics writer +// If the delegating call is going to cause a flush, it signals +// to the schduled periodic flush to reset its timer +func (p *PointWriter) Write(m ...influxdb.Metric) (int, error) { + p.mu.Lock() + defer p.mu.Unlock() + + if p.err != nil { + return 0, p.err + } + + // check if the underlying flush will flush + if len(m) > p.w.Available() { + // tell the ticker to reset flush interval + select { + case p.resetTick <- struct{}{}: + default: + } + } + + var n int + n, p.err = p.w.Write(m...) + return n, p.err +} + +// Close signals to stop flushing metrics and causes subsequent +// calls to Write to return a closed pipe error +// Close returns once scheduledge flushing has stopped +func (p *PointWriter) Close() error { + p.mu.Lock() + + // signal close + close(p.resetTick) + + // return err io closed pipe for subsequent writes + p.err = io.ErrClosedPipe + + // release lock so scheduled may acknowledge and exit + p.mu.Unlock() + + // wait until schedule exits + <-p.stopped + + return nil +} diff --git a/writer/point_test.go b/writer/point_test.go new file mode 100644 index 00000000..a1c2bd2d --- /dev/null +++ b/writer/point_test.go @@ -0,0 +1,76 @@ +package writer + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var deltaMsgFmt = "delta between flushes exceeds 105ms: %q" + +func Test_PointWriter_Write_Batches(t *testing.T) { + var ( + underlyingWriter = newTestWriter() + writer = NewPointWriter(NewBufferedWriter(underlyingWriter), 50*time.Millisecond) + // 100 rows of batch size 100 expected + expected = createNTestRowMetrics(t, 100, 100) + ) + + // write 10000 metrics in various batch sizes + for _, batchSize := range permuteCounts(t, 10000) { + n, err := writer.Write(createTestRowMetrics(t, batchSize)...) + require.NoError(t, err) + require.Equal(t, batchSize, n) + + time.Sleep(10 * time.Millisecond) + } + + time.Sleep(100 * time.Millisecond) + + // close writer ensuring no more flushes occur + require.NoError(t, writer.Close()) + + // check batches written to underlying writer are 100 batches of 100 metrics + require.Equal(t, expected, underlyingWriter.writes) +} + +func Test_PointWriter_Write(t *testing.T) { + var ( + underlyingWriter = newTestWriter() + buffered = NewBufferedWriterSize(underlyingWriter, 10) + writer = NewPointWriter(buffered, 100*time.Millisecond) + ) + + // write between 1 and 4 metrics every 30ms ideally meaning + // some writes will flush for because of buffer size + // and some for periodic flush + for i := 0; i < 10; i++ { + var ( + count = rand.Intn(4) + 1 + n, err = writer.Write(createTestRowMetrics(t, count)...) + ) + + require.Nil(t, err) + require.Equal(t, count, n) + + time.Sleep(30 * time.Millisecond) + } + + // close writer to ensure scheduling has stopped + writer.Close() + + // ensure time between each write does not exceed roughly 100ms + // as per the flush interval of the point writer + for i := 1; i < len(underlyingWriter.when); i++ { + var ( + first, next = underlyingWriter.when[i-1], underlyingWriter.when[i] + delta = next.Sub(first) + ) + + // ensure writes are roughly 100 milliseconds apart + assert.Truef(t, delta <= 105*time.Millisecond, deltaMsgFmt, delta) + } +} diff --git a/writer/support_test.go b/writer/support_test.go new file mode 100644 index 00000000..d713b9ad --- /dev/null +++ b/writer/support_test.go @@ -0,0 +1,107 @@ +package writer + +import ( + "context" + "math" + "math/rand" + "testing" + "time" + + "github.com/influxdata/influxdb-client-go" +) + +type bucketWriter struct { + calls []bucketWriteCall +} + +type bucketWriteCall struct { + org influxdb.Organisation + bkt influxdb.Bucket + data []influxdb.Metric +} + +func (b *bucketWriter) Write(_ context.Context, org influxdb.Organisation, bkt influxdb.Bucket, m ...influxdb.Metric) (int, error) { + b.calls = append(b.calls, bucketWriteCall{org, bkt, m}) + return len(m), nil +} + +type metricsWriter struct { + // metrics written + when []time.Time + writes [][]influxdb.Metric +} + +func newTestWriter() *metricsWriter { + return &metricsWriter{} +} + +func (w *metricsWriter) Write(m ...influxdb.Metric) (int, error) { + w.when = append(w.when, time.Now()) + w.writes = append(w.writes, m) + return len(m), nil +} + +// permuteCounts returns a set of pseudo-random batch size counts +// which sum to the provided total +// E.g. for a sum total of 100 (permuteCounts(t, 100)) +// this function may produce the following [5 12 8 10 14 11 9 9 8 14] +// The sum of these values is == 100 and there are √100 buckets +func permuteCounts(t *testing.T, total int) (buckets []int) { + t.Helper() + + var accum int + + buckets = make([]int, int(math.Sqrt(float64(total)))) + for i := 0; i < len(buckets); i++ { + size := total / len(buckets) + if accum+size > total { + size = total - accum + } + + buckets[i] = size + + accum += size + + // shuffle some counts from previous bucket forward to current bucket + if i > 0 { + var ( + min = math.Min(float64(buckets[i]), float64(buckets[i-1])) + delta = rand.Intn(int(min / 2)) + ) + + buckets[i-1], buckets[i] = buckets[i-1]-delta, buckets[i]+delta + } + } + + return +} + +func createNTestRowMetrics(t *testing.T, rows, count int) (metrics [][]influxdb.Metric) { + metrics = make([][]influxdb.Metric, 0, rows) + + for i := 0; i < rows; i++ { + metrics = append(metrics, createTestRowMetrics(t, count)) + } + + return +} + +func createTestRowMetrics(t *testing.T, count int) (metrics []influxdb.Metric) { + t.Helper() + + metrics = make([]influxdb.Metric, 0, count) + for i := 0; i < count; i++ { + metrics = append(metrics, influxdb.NewRowMetric( + map[string]interface{}{ + "some_field": "some_value", + }, + "some_measurement", + map[string]string{ + "some_tag": "some_value", + }, + time.Date(2019, time.January, 1, 0, 0, 0, 0, time.UTC), + )) + } + + return +} diff --git a/writer/writer.go b/writer/writer.go new file mode 100644 index 00000000..4b6e5f9f --- /dev/null +++ b/writer/writer.go @@ -0,0 +1,49 @@ +package writer + +import ( + "context" + + "github.com/influxdata/influxdb-client-go" +) + +// BucketMetricWriter is a type which Metrics can be written to a particular bucket +// in a particular organisation +type BucketMetricWriter interface { + Write(context.Context, influxdb.Organisation, influxdb.Bucket, ...influxdb.Metric) (int, error) +} + +// New constructs a point writer with an underlying buffer from the provided BucketMetricWriter +// The writer will flushed metrics to the underlying BucketMetricWriter when the buffer is full +// or the configured flush interval ellapses without a flush occuring +func New(writer BucketMetricWriter, org influxdb.Organisation, bkt influxdb.Bucket, opts ...Option) *PointWriter { + var ( + config = Options(opts).Config() + bucket = NewBucketWriter(writer, org, bkt) + buffered = NewBufferedWriterSize(bucket, config.size) + ) + + return NewPointWriter(buffered, config.flushInterval) +} + +// BucketWriter writes metrics to a particular bucket +// within a particular organisation +type BucketWriter struct { + w BucketMetricWriter + + ctxt context.Context + + org influxdb.Organisation + bucket influxdb.Bucket +} + +// NewBucketWriter allocates, configures and returned a new BucketWriter for writing +// metrics to a specific organisations bucket +func NewBucketWriter(w BucketMetricWriter, org influxdb.Organisation, bkt influxdb.Bucket) *BucketWriter { + return &BucketWriter{w, context.Background(), org, bkt} +} + +// Write writes the provided metrics to the underlying metrics writer +// using the org and bucket configured on the bucket writer +func (b *BucketWriter) Write(m ...influxdb.Metric) (int, error) { + return b.w.Write(b.ctxt, b.org, b.bucket, m...) +} diff --git a/writer_test.go b/writer_test.go deleted file mode 100644 index c839bb21..00000000 --- a/writer_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package influxdb_test - -import ( - "context" - "net/http" - "net/http/httptest" - "runtime" - "sync" - "sync/atomic" - "testing" - "time" - - influxdb "github.com/influxdata/influxdb-client-go" -) - -func TestWriterStartupAndShutdown(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) - cl, err := influxdb.New(server.URL, "foo", influxdb.WithHTTPClient(server.Client())) - if err != nil { - server.Close() - t.Fatal(err) - } - w := cl.NewBufferingWriter("my-bucket", "my-org", 10*time.Second, 1024*100, func(err error) { - t.Error(err) - }) - wg := sync.WaitGroup{} - w.Start() - for i := 0; i < 20; i++ { - wg.Add(1) - go func() { - runtime.Gosched() - w.Start() - wg.Done() - }() - } - for i := 0; i < 20; i++ { - wg.Add(1) - go func() { - runtime.Gosched() - w.Stop() - wg.Done() - }() - } - wg.Wait() -} - -func TestAutoFlush(t *testing.T) { - q := uint64(0) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - res := atomic.AddUint64(&q, 1) - if res > 3 { - t.Errorf("size based flush happened too often, expected 3 but got %d", res) - } - })) - cl, err := influxdb.New(server.URL, "foo", influxdb.WithHTTPClient(server.Client())) - if err != nil { - t.Error(e2e) - } - w := cl.NewBufferingWriter("my-bucket", "my-org", 0, 100*1024, func(err error) { - t.Error(err) - }) - w.Start() - ts := time.Time{} - for i := 0; i < 3000; i++ { - ts = ts.Add(1) - _, err = w.Write([]byte("TestWriterE2E"), - ts, - [][]byte{[]byte("test1"), []byte("test2")}, - [][]byte{[]byte("here"), []byte("alsohere")}, - [][]byte{[]byte("val1"), []byte("val2")}, - []interface{}{1, 99}) - if err != nil { - t.Error(err) - } - } - w.Flush(context.Background()) - tries := atomic.LoadUint64(&q) - w.Stop() - if tries < 3 { - t.Errorf("size based flush happened too infrequently expected 3 got %d", tries) - } -}