Skip to content

Commit

Permalink
Writer (#21)
Browse files Browse the repository at this point in the history
A batching line-protocol writer.  
You just write to it and it flushes for you (or you can flush manually).
  • Loading branch information
docmerlin committed Jul 10, 2019
1 parent 14e633c commit de2584b
Show file tree
Hide file tree
Showing 2 changed files with 350 additions and 0 deletions.
268 changes: 268 additions & 0 deletions writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
package influxdb

import (
"bytes"
"context"
"io"
"io/ioutil"
"math/rand"
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"

lp "github.com/influxdata/line-protocol"
)

const maxPooledBuffer = 4 << 20 //8 megs

// LPWriter is a type for writing line protocol in a buffered way.
// It allows you to set a flush interval and flush regularly or to call the Flush method to flush its internal buffer.
type LPWriter struct {
stopTicker func()
flushChan <-chan time.Time
flushInterval time.Duration
flushSize int
c *Client
buf switchableBuffer
lock sync.Mutex
enc *lp.Encoder
bucket, org string
tries uint64
maxRetries int
errOnFieldErr bool
stop chan struct{}
once sync.Once
wg sync.WaitGroup
onError func(error)
}

type switchableBuffer struct {
*bytes.Buffer
}

// WriteMetrics writes Metrics to the LPWriter.
func (w *LPWriter) WriteMetrics(m ...Metric) (int, error) {
select {
case <-w.stop:
return 0, nil
default:
}
w.lock.Lock()
for i := range m {
j, err := w.enc.Encode(m[i])
if err != nil {
return j, err
}
}
w.asyncFlush()
w.lock.Unlock()
return 0, nil
}

// NewBufferingWriter creates a new BufferingWriter.
func (c *Client) NewBufferingWriter(bucket string, org string, flushInterval time.Duration, flushSize int, onError func(error)) *LPWriter {
w := &LPWriter{c: c, buf: switchableBuffer{&bytes.Buffer{}}, flushSize: flushSize, flushInterval: flushInterval, stop: make(chan struct{})}
w.enc = lp.NewEncoder(&w.buf)
w.enc.FailOnFieldErr(w.errOnFieldErr)
return w
}

// Write writes name, time stamp, tag keys, tag values, field keys, and field values to an LPWriter.
func (w *LPWriter) Write(name []byte, ts time.Time, tagKeys, tagVals, fieldKeys [][]byte, fieldVals []interface{}) (int, error) {
select {
case <-w.stop:
return 0, nil
default:
}
w.lock.Lock()
i, err := w.enc.Write(name, ts, tagKeys, tagVals, fieldKeys, fieldVals)
// asyncronously flush if the size of the buffer is too big.
if err != nil {
return i, err
}
w.asyncFlush()
w.lock.Unlock()
return i, err
}
func (w *LPWriter) asyncFlush() {
if w.flushSize > 0 && w.buf.Len() > w.flushSize {
w.wg.Add(1)
buf := w.buf.Buffer
w.buf.Buffer = bufferPool.Get().(*bytes.Buffer)
go func() {
w.flush(context.TODO(), buf)
if buf.Len() <= maxPooledBuffer {
buf.Reset()
bufferPool.Put(buf)
}
w.wg.Done()
}()
}
}

// Start starts an LPWriter, so that the writer can flush it out to influxdb.
func (w *LPWriter) Start() {
w.lock.Lock()
w.once = sync.Once{}
if w.flushInterval != 0 {
t := time.NewTicker(w.flushInterval)
w.stopTicker = t.Stop
w.flushChan = t.C
w.wg.Add(1)
go func() {
for {
select {
case <-w.flushChan:
err := w.Flush(context.Background())
if err != nil {
w.onError(err)
}
case <-w.stop:
w.wg.Done()
return
}
}
}()
} else {
w.stopTicker = func() {}
}
w.lock.Unlock()
}

var bufferPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} }}

// Flush writes out the internal buffer to the database.
func (w *LPWriter) Flush(ctx context.Context) error {
w.wg.Add(1)
w.lock.Lock()
if w.buf.Len() == 0 {
w.lock.Unlock()
return nil
}
buf := w.buf.Buffer
w.buf.Buffer = bufferPool.Get().(*bytes.Buffer)
w.lock.Unlock()
err := w.flush(ctx, buf)
if err != nil {
return err
}
if buf.Len() <= maxPooledBuffer {
buf.Reset()
bufferPool.Put(buf)
}
w.wg.Done()
return err
}

func (w *LPWriter) flush(ctx context.Context, buf *bytes.Buffer) error {

cleanup := func() {}
defer func() { cleanup() }()
// early exit so we don't send empty buffers
doRequest:
select {
case <-ctx.Done():
return ctx.Err()
default:
}
req, err := w.c.makeWriteRequest(w.bucket, w.org, buf)
if err != nil {
return err
}
resp, err := w.c.httpClient.Do(req)
if err != nil {
return err
}
cleanup = func() {
r := io.LimitReader(resp.Body, 1<<24) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it.
// throw away the rest of the body so the connection can be reused even if there is still stuff on the wire.
_, _ = ioutil.ReadAll(r) // we don't care about the error here, it is just to empty the tcp buffer
resp.Body.Close()
}

switch resp.StatusCode {
case http.StatusOK, http.StatusNoContent:
case http.StatusTooManyRequests:
err = &genericRespError{
Code: resp.Status,
Message: "too many requests too fast",
}
cleanup()
if err2 := w.backoff(&w.tries, resp, err); err2 != nil {
return err2
}
cleanup = func() {}
goto doRequest
case http.StatusServiceUnavailable:
err = &genericRespError{
Code: resp.Status,
Message: "service temporarily unavaliable",
}
cleanup()
if err2 := w.backoff(&w.tries, resp, err); err2 != nil {
return err2
}
cleanup = func() {
w.lock.Unlock()
}
goto doRequest
default:
gwerr, err := parseWriteError(resp.Body)
if err != nil {
return err
}

return gwerr
}
// we don't defer and close till here, because of the retries.
defer func() {
r := io.LimitReader(resp.Body, 1<<16) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it.
_, err := ioutil.ReadAll(r) // throw away the rest of the body so the connection gets reused.
err2 := resp.Body.Close()
if err == nil && err2 != nil {
err = err2
}
}()
return err
}

// backoff is a helper method for backoff, triesPtr must not be nil.
func (w *LPWriter) backoff(triesPtr *uint64, resp *http.Response, err error) error {
tries := atomic.LoadUint64(triesPtr)
if w.maxRetries >= 0 || int(tries) >= w.maxRetries {
return maxRetriesExceededError{
err: err,
tries: w.maxRetries,
}
}
retry := 0
if resp != nil {
retryAfter := resp.Header.Get("Retry-After")
retry, _ = strconv.Atoi(retryAfter) // we ignore the error here because an error already means retry is 0.
}
sleepFor := time.Duration(retry) * time.Second
if retry == 0 { // if we didn't get a Retry-After or it is zero, instead switch to exponential backoff
sleepFor = time.Duration(rand.Int63n(((1 << tries) - 1) * 10 * int64(time.Microsecond)))
}
if sleepFor > defaultMaxWait {
sleepFor = defaultMaxWait
}
time.Sleep(sleepFor)
atomic.AddUint64(triesPtr, 1)
return nil
}

// Stop gracefully stops a started LPWriter.
func (w *LPWriter) Stop() {
w.lock.Lock()
w.once.Do(func() {
close(w.stop)
w.wg.Wait()
w.stopTicker()
w.stop = make(chan struct{})
})
w.lock.Unlock()
w.wg.Wait()
}
82 changes: 82 additions & 0 deletions writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package influxdb_test

import (
"context"
"net/http"
"net/http/httptest"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

influxdb "github.com/influxdata/influxdb-client-go"
)

func TestWriterStartupAndShutdown(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
cl, err := influxdb.New(server.URL, "foo", influxdb.WithHTTPClient(server.Client()))
if err != nil {
server.Close()
t.Fatal(err)
}
w := cl.NewBufferingWriter("my-bucket", "my-org", 10*time.Second, 1024*100, func(err error) {
t.Error(err)
})
wg := sync.WaitGroup{}
w.Start()
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
runtime.Gosched()
w.Start()
wg.Done()
}()
}
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
runtime.Gosched()
w.Stop()
wg.Done()
}()
}
wg.Wait()
}

func TestAutoFlush(t *testing.T) {
q := uint64(0)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
res := atomic.AddUint64(&q, 1)
if res > 3 {
t.Errorf("size based flush happened too often, expected 3 but got %d", res)
}
}))
cl, err := influxdb.New(server.URL, "foo", influxdb.WithHTTPClient(server.Client()))
if err != nil {
t.Error(e2e)
}
w := cl.NewBufferingWriter("my-bucket", "my-org", 0, 100*1024, func(err error) {
t.Error(err)
})
w.Start()
ts := time.Time{}
for i := 0; i < 3000; i++ {
ts = ts.Add(1)
_, err = w.Write([]byte("TestWriterE2E"),
ts,
[][]byte{[]byte("test1"), []byte("test2")},
[][]byte{[]byte("here"), []byte("alsohere")},
[][]byte{[]byte("val1"), []byte("val2")},
[]interface{}{1, 99})
if err != nil {
t.Error(err)
}
}
w.Flush(context.Background())
tries := atomic.LoadUint64(&q)
w.Stop()
if tries < 3 {
t.Errorf("size based flush happened too infrequently expected 3 got %d", tries)
}
}

0 comments on commit de2584b

Please sign in to comment.