diff --git a/.circleci/config.yml b/.circleci/config.yml index 0e9396a9..3932dc98 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -42,4 +42,4 @@ jobs: - run: go get -v -t -d ./... - run: go vet ./... - run: go get honnef.co/go/tools/cmd/staticcheck && staticcheck ./... - - run: go test -v -e2e ./... + - run: go test -v -race -e2e ./... diff --git a/client.go b/client.go index e2b63b25..33078b1a 100644 --- a/client.go +++ b/client.go @@ -6,13 +6,9 @@ import ( "fmt" "io" "io/ioutil" - "math/rand" "net/http" "net/url" - "strconv" "sync" - "sync/atomic" - "time" "github.com/influxdata/influxdb-client-go/internal/gzip" ) @@ -20,8 +16,6 @@ import ( // TODO(docmerlin): change the generator so we don't have to hand edit the generated code //go:generate go run scripts/buildclient.go -const defaultMaxWait = 10 * time.Second - // Client is a client for writing to influx. type Client struct { httpClient *http.Client @@ -31,7 +25,6 @@ type Client struct { password string username string l sync.Mutex - maxRetries int errOnFieldErr bool userAgent string authorization string // the Authorization header @@ -112,32 +105,6 @@ func (c *Client) Ping(ctx context.Context) error { return nil } -// backoff is a helper method for backoff, triesPtr must not be nil. -func (c *Client) backoff(triesPtr *uint64, resp *http.Response, err error) error { - tries := atomic.LoadUint64(triesPtr) - if c.maxRetries >= 0 || int(tries) >= c.maxRetries { - return maxRetriesExceededError{ - err: err, - tries: c.maxRetries, - } - } - retry := 0 - if resp != nil { - retryAfter := resp.Header.Get("Retry-After") - retry, _ = strconv.Atoi(retryAfter) // we ignore the error here because an error already means retry is 0. 
- } - sleepFor := time.Duration(retry) * time.Second - if retry == 0 { // if we didn't get a Retry-After or it is zero, instead switch to exponential backoff - sleepFor = time.Duration(rand.Int63n(((1 << tries) - 1) * 10 * int64(time.Microsecond))) - } - if sleepFor > defaultMaxWait { - sleepFor = defaultMaxWait - } - time.Sleep(sleepFor) - atomic.AddUint64(triesPtr, 1) - return nil -} - func (c *Client) makeWriteRequest(bucket, org string, body io.Reader) (*http.Request, error) { var err error if c.contentEncoding == "gzip" { diff --git a/e2e_test.go b/e2e_test.go index 4bb9cf90..c612e9f6 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -43,7 +43,7 @@ func TestE2E(t *testing.T) { t.Fatalf("expected user to be %s, but was %s", "e2e-test-user", sRes.User.Name) } now := time.Now() - err = influx.Write(context.Background(), "e2e-test-bucket", "e2e-test-org", + _, err = influx.Write(context.Background(), "e2e-test-bucket", "e2e-test-org", &influxdb.RowMetric{ NameStr: "test", Tags: []*influxdb.Tag{ @@ -123,6 +123,7 @@ func TestE2E(t *testing.T) { } time.Sleep(5 * time.Second) + r, err := influx.QueryCSV( context.Background(), `from(bucket:bucket)|>range(start:-1000h)|>group()`, @@ -133,9 +134,11 @@ func TestE2E(t *testing.T) { if err != nil { t.Fatal(err) } + b, err := ioutil.ReadAll(r) if err != nil { t.Fatal(err) } + fmt.Println(string(b)) } diff --git a/errors.go b/errors.go index 7b025fc4..96cf9bf1 100644 --- a/errors.go +++ b/errors.go @@ -8,19 +8,6 @@ import ( // ErrUnimplemented is an error for when pieces of the client's functionality is unimplemented. 
var ErrUnimplemented = errors.New("unimplemented") -type maxRetriesExceededError struct { - tries int - err error -} - -func (err maxRetriesExceededError) Error() string { - return fmt.Sprintf("max retries of %d reached, and we recieved an error of %v", err.tries, err.err) -} - -func (err maxRetriesExceededError) Unwrap() error { - return err.err -} - type genericRespError struct { Code string Message string diff --git a/examples_test.go b/examples_test.go index 32d1fc68..2b9d96df 100644 --- a/examples_test.go +++ b/examples_test.go @@ -49,7 +49,7 @@ func setupHandler(w http.ResponseWriter, r *http.Request) { if err := json.NewDecoder(r.Body).Decode(&req); err != nil { log.Fatal(err) } - if err := resTemplate.Execute(w, map[string]string{"username": req.Username, "org": req.Org, "retentionSeconds": strconv.Itoa(req.RetentionPeriodHrs * 60 * 60)}); err != nil { + if err := resTemplate.Execute(w, map[string]string{"username": req.Username, "org": string(req.Org), "retentionSeconds": strconv.Itoa(req.RetentionPeriodHrs * 60 * 60)}); err != nil { log.Fatal(err) } } @@ -188,7 +188,7 @@ func ExampleClient_Write_basic() { } // The actual write..., this method can be called concurrently. - if err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { + if _, err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { log.Fatal(err) // as above use your own error handling here. } influx.Close() // closes the client. After this the client is useless. @@ -258,7 +258,7 @@ func ExampleClient_Write_tlsMutualAuthentication() { } // The actual write... - if err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { + if _, err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { log.Fatal(err) } influx.Close() // close the client after this the client is useless. 
@@ -292,7 +292,7 @@ func ExampleClient_Setup() { } // We can now do a write even though we didn't put a token in - if err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { + if _, err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { log.Fatal(err) } influx.Close() // close the client after this the client is useless. diff --git a/go.mod b/go.mod index 216ee9ee..d9d389b5 100644 --- a/go.mod +++ b/go.mod @@ -4,4 +4,6 @@ require ( github.com/google/go-cmp v0.2.0 // test dependency github.com/influxdata/flux v0.0.0-20190620184636-886e3c28388d // test dependency github.com/influxdata/line-protocol v0.0.0-20190509173118-5712a8124a9a + github.com/stretchr/testify v1.3.0 // test dependency + honnef.co/go/tools v0.0.1-2019.2.2 // indirect ) diff --git a/go.sum b/go.sum index 92686289..38b482a7 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,7 @@ github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4r github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/goreleaser/goreleaser v0.94.0 h1:2CFMxMTLODjYfNOx2sADNzpgCwH9ltMqvQYtj+ntK1Q= github.com/goreleaser/goreleaser v0.94.0/go.mod h1:OjbYR2NhOI6AEUWCowMSBzo9nP1aRif3sYtx+rhp+Zo= github.com/goreleaser/nfpm v0.9.7 h1:h8RQMDztu6cW7b0/s4PGbdeMYykAbJG0UMXaWG5uBMI= @@ -71,6 +72,7 @@ github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJS github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod 
h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/kevinburke/ssh_config v0.0.0-20180830205328-81db2a75821e/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -100,6 +102,7 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5/go.mod h1:eCbImbZ95eXtAUIbLAuAVnBnwf83mjf6QIVH8SHYwqQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/segmentio/kafka-go v0.1.0 h1:IXCHG+sXPNiIR5pC/vTEItZduPKu4cnpr85YgxpxlW0= @@ -111,10 +114,13 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/xanzy/ssh-agent v0.2.0/go.mod h1:0NyE30eGUDliuLEHJgYte/zncp2zdTStcOnWhgSqHD8= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -124,26 +130,39 @@ go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20181112044915-a3060d491354 h1:6UAgZ8309zQ9+1iWkHzfszFguqzOdHGyGkd1HmhJ+UE= golang.org/x/exp v0.0.0-20181112044915-a3060d491354/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 h1:x6rhz8Y9CjbgQkccRGmELH6K+LJj7tOoh3XWeC1yaQM= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4 h1:99CA0JJbUX4ozCnLon680Jc9e0T1i8HCaLVJMwtI8Hc= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180903190138-2b024373dcd9/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181030150119-7e31e0c00fa0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181221154417-3ad2d988d5e2 h1:M7NLB69gFpUH4s6SJLwXiVs45aZfVjqGKynfNFKSGcI= 
golang.org/x/tools v0.0.0-20181221154417-3ad2d988d5e2/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac h1:MQEvx39qSf8vyrx3XRaOe+j1UDIzKwkYOVObRgGPVqI= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca h1:PupagGYwj8+I4ubCxcmcBRk3VlUWtTg5huQpZR9flmE= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6 h1:4WsZyVtkthqrHTbDCJfiTs8IWNYE4uvsSDgaV6xpp+o= @@ -153,6 +172,7 @@ google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/src-d/go-billy.v4 v4.2.1/go.mod h1:tm33zBoOwxjYHZIE+OV8bxTWFMJLrconzFMd38aARFk= gopkg.in/src-d/go-git-fixtures.v3 v3.1.1/go.mod h1:dLBcvytrw/TYZsNTWCnkNF2DSIlzWYqTe3rJR56Ac7g= gopkg.in/src-d/go-git.v4 v4.8.1/go.mod h1:Vtut8izDyrM8BUVQnzJ+YvmNcem2J89EmfZYCkLokZk= @@ -161,3 +181,5 @@ gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20181108184350-ae8f1f9103cc h1:VdiEcF0DrrUbDdrLBceS0h7LE60ebD5yRYLLXi0ezIs= honnef.co/go/tools v0.0.0-20181108184350-ae8f1f9103cc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.1-2019.2.2 h1:TEgegKbBqByGUb1Coo1pc2qIdf2xw6v0mYyLSYtyopE= +honnef.co/go/tools v0.0.1-2019.2.2/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/write.go b/write.go 
index 5c1839d7..5bee93ce 100644 --- a/write.go +++ b/write.go @@ -17,49 +17,41 @@ import ( // Write writes metrics to a bucket, and org. It retries intelligently. // If the write is too big, it retries again, after breaking the payloads into two requests. -func (c *Client) Write(ctx context.Context, bucket, org string, m ...Metric) (err error) { - tries := uint64(0) - return c.write(ctx, bucket, org, &tries, m...) -} +func (c *Client) Write(ctx context.Context, bucket, org string, m ...Metric) (n int, err error) { + var ( + buf = &bytes.Buffer{} + e = lp.NewEncoder(buf) + ) -func parseWriteError(r io.Reader) (*genericRespError, error) { - werr := &genericRespError{} - if err := json.NewDecoder(r).Decode(&werr); err != nil { - return nil, err - } - return werr, nil -} + e.FailOnFieldErr(c.errOnFieldErr) -func (c *Client) write(ctx context.Context, bucket, org string, triesPtr *uint64, m ...Metric) error { - buf := &bytes.Buffer{} - e := lp.NewEncoder(buf) - cleanup := func() {} - defer func() { cleanup() }() -doRequest: select { case <-ctx.Done(): - return ctx.Err() + return 0, ctx.Err() default: } + for i := range m { if _, err := e.Encode(m[i]); err != nil { - return err + return 0, err } } + req, err := c.makeWriteRequest(bucket, org, buf) if err != nil { - return err + return 0, err } - resp, err := c.httpClient.Do(req) + + resp, err := c.httpClient.Do(req.WithContext(ctx)) if err != nil { - return err + return 0, err } - cleanup = func() { - r := io.LimitReader(resp.Body, 1<<16) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it. - // throw away the rest of the body so the connection can be reused even if there is still stuff on the wire. 
- ioutil.ReadAll(r) // we don't care about the error here, it is just to empty the tcp buffer + + defer func() { + // discard body so connection can be reused + _, _ = io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() - } + }() switch resp.StatusCode { case http.StatusOK, http.StatusNoContent: @@ -68,42 +60,21 @@ doRequest: Code: resp.Status, Message: "too many requests too fast", } - cleanup() - if err2 := c.backoff(triesPtr, resp, err); err2 != nil { - return err2 - } - cleanup = func() {} - goto doRequest case http.StatusServiceUnavailable: err = &genericRespError{ Code: resp.Status, Message: "service temporarily unavaliable", } - cleanup() - if err2 := c.backoff(triesPtr, resp, err); err2 != nil { - return err2 - } - cleanup = func() {} - goto doRequest default: gwerr, err := parseWriteError(resp.Body) if err != nil { - return err + return 0, err } - return gwerr + return 0, gwerr } - // we don't defer and close till here, because of the retries. - defer func() { - r := io.LimitReader(resp.Body, 1<<16) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it. - _, err := ioutil.ReadAll(r) // throw away the rest of the body so the connection gets reused. 
- err2 := resp.Body.Close() - if err == nil && err2 != nil { - err = err2 - } - }() - e.FailOnFieldErr(c.errOnFieldErr) - return err + + return len(m), err } func makeWriteURL(loc *url.URL, bucket, org string) (string, error) { @@ -128,3 +99,11 @@ func makeWriteURL(loc *url.URL, bucket, org string) (string, error) { u.RawQuery = params.Encode() return u.String(), nil } + +func parseWriteError(r io.Reader) (*genericRespError, error) { + werr := &genericRespError{} + if err := json.NewDecoder(r).Decode(&werr); err != nil { + return nil, err + } + return werr, nil +} diff --git a/writer.go b/writer.go deleted file mode 100644 index 62fd7ff6..00000000 --- a/writer.go +++ /dev/null @@ -1,272 +0,0 @@ -package influxdb - -import ( - "bytes" - "context" - "io" - "io/ioutil" - "math/rand" - "net/http" - "strconv" - "sync" - "sync/atomic" - "time" - - lp "github.com/influxdata/line-protocol" -) - -const maxPooledBuffer = 4 << 20 //8 megs - -// LPWriter is a type for writing line protocol in a buffered way. -// It allows you to set a flush interval and flush regularly or to call the Flush method to flush its internal buffer. -type LPWriter struct { - stopTicker func() - flushChan <-chan time.Time - flushInterval time.Duration - flushSize int - c *Client - buf switchableBuffer - lock sync.Mutex - enc *lp.Encoder - bucket, org string - tries uint64 - maxRetries int - errOnFieldErr bool - stop chan struct{} - once sync.Once - wg sync.WaitGroup - onError func(error) -} - -type switchableBuffer struct { - *bytes.Buffer -} - -// WriteMetrics writes Metrics to the LPWriter. -func (w *LPWriter) WriteMetrics(m ...Metric) (int, error) { - select { - case <-w.stop: - return 0, nil - default: - } - w.lock.Lock() - for i := range m { - j, err := w.enc.Encode(m[i]) - if err != nil { - return j, err - } - } - w.asyncFlush() - w.lock.Unlock() - return 0, nil -} - -// NewBufferingWriter creates a new BufferingWriter. 
-func (c *Client) NewBufferingWriter(bucket string, org string, flushInterval time.Duration, flushSize int, onError func(error)) *LPWriter { - // if onError is nil set to a noop - if onError == nil { - onError = func(_ error) {} - } - w := &LPWriter{c: c, buf: switchableBuffer{&bytes.Buffer{}}, flushSize: flushSize, flushInterval: flushInterval, stop: make(chan struct{}), onError: onError, bucket: bucket, org: org} - w.enc = lp.NewEncoder(&w.buf) - w.enc.FailOnFieldErr(w.errOnFieldErr) - return w -} - -// Write writes name, time stamp, tag keys, tag values, field keys, and field values to an LPWriter. -func (w *LPWriter) Write(name []byte, ts time.Time, tagKeys, tagVals, fieldKeys [][]byte, fieldVals []interface{}) (int, error) { - select { - case <-w.stop: - return 0, nil - default: - } - w.lock.Lock() - i, err := w.enc.Write(name, ts, tagKeys, tagVals, fieldKeys, fieldVals) - // asyncronously flush if the size of the buffer is too big. - if err != nil { - return i, err - } - w.asyncFlush() - w.lock.Unlock() - return i, err -} -func (w *LPWriter) asyncFlush() { - if w.flushSize > 0 && w.buf.Len() > w.flushSize { - w.wg.Add(1) - buf := w.buf.Buffer - w.buf.Buffer = bufferPool.Get().(*bytes.Buffer) - go func() { - w.flush(context.TODO(), buf) - if buf.Len() <= maxPooledBuffer { - buf.Reset() - bufferPool.Put(buf) - } - w.wg.Done() - }() - } -} - -// Start starts an LPWriter, so that the writer can flush it out to influxdb. 
-func (w *LPWriter) Start() { - w.lock.Lock() - w.once = sync.Once{} - if w.flushInterval != 0 { - t := time.NewTicker(w.flushInterval) - w.stopTicker = t.Stop - w.flushChan = t.C - w.wg.Add(1) - go func() { - for { - select { - case <-w.flushChan: - err := w.Flush(context.Background()) - if err != nil { - w.onError(err) - } - case <-w.stop: - w.wg.Done() - return - } - } - }() - } else { - w.stopTicker = func() {} - } - w.lock.Unlock() -} - -var bufferPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} }} - -// Flush writes out the internal buffer to the database. -func (w *LPWriter) Flush(ctx context.Context) error { - w.wg.Add(1) - defer w.wg.Done() - w.lock.Lock() - if w.buf.Len() == 0 { - w.lock.Unlock() - return nil - } - buf := w.buf.Buffer - w.buf.Buffer = bufferPool.Get().(*bytes.Buffer) - w.lock.Unlock() - err := w.flush(ctx, buf) - if err != nil { - return err - } - if buf.Len() <= maxPooledBuffer { - buf.Reset() - bufferPool.Put(buf) - } - return err -} - -func (w *LPWriter) flush(ctx context.Context, buf *bytes.Buffer) error { - - cleanup := func() {} - defer func() { cleanup() }() - // early exit so we don't send empty buffers -doRequest: - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - req, err := w.c.makeWriteRequest(w.bucket, w.org, buf) - if err != nil { - return err - } - resp, err := w.c.httpClient.Do(req) - if err != nil { - return err - } - cleanup = func() { - r := io.LimitReader(resp.Body, 1<<24) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it. - // throw away the rest of the body so the connection can be reused even if there is still stuff on the wire. 
- _, _ = ioutil.ReadAll(r) // we don't care about the error here, it is just to empty the tcp buffer - resp.Body.Close() - } - - switch resp.StatusCode { - case http.StatusOK, http.StatusNoContent: - case http.StatusTooManyRequests: - err = &genericRespError{ - Code: resp.Status, - Message: "too many requests too fast", - } - cleanup() - if err2 := w.backoff(&w.tries, resp, err); err2 != nil { - return err2 - } - cleanup = func() {} - goto doRequest - case http.StatusServiceUnavailable: - err = &genericRespError{ - Code: resp.Status, - Message: "service temporarily unavaliable", - } - cleanup() - if err2 := w.backoff(&w.tries, resp, err); err2 != nil { - return err2 - } - cleanup = func() { - w.lock.Unlock() - } - goto doRequest - default: - gwerr, err := parseWriteError(resp.Body) - if err != nil { - return err - } - - return gwerr - } - // we don't defer and close till here, because of the retries. - defer func() { - r := io.LimitReader(resp.Body, 1<<16) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it. - _, err := ioutil.ReadAll(r) // throw away the rest of the body so the connection gets reused. - err2 := resp.Body.Close() - if err == nil && err2 != nil { - err = err2 - } - }() - return err -} - -// backoff is a helper method for backoff, triesPtr must not be nil. -func (w *LPWriter) backoff(triesPtr *uint64, resp *http.Response, err error) error { - tries := atomic.LoadUint64(triesPtr) - if w.maxRetries >= 0 || int(tries) >= w.maxRetries { - return maxRetriesExceededError{ - err: err, - tries: w.maxRetries, - } - } - retry := 0 - if resp != nil { - retryAfter := resp.Header.Get("Retry-After") - retry, _ = strconv.Atoi(retryAfter) // we ignore the error here because an error already means retry is 0. 
- } - sleepFor := time.Duration(retry) * time.Second - if retry == 0 { // if we didn't get a Retry-After or it is zero, instead switch to exponential backoff - sleepFor = time.Duration(rand.Int63n(((1 << tries) - 1) * 10 * int64(time.Microsecond))) - } - if sleepFor > defaultMaxWait { - sleepFor = defaultMaxWait - } - time.Sleep(sleepFor) - atomic.AddUint64(triesPtr, 1) - return nil -} - -// Stop gracefully stops a started LPWriter. -func (w *LPWriter) Stop() { - w.lock.Lock() - w.once.Do(func() { - close(w.stop) - w.wg.Wait() - w.stopTicker() - w.stop = make(chan struct{}) - }) - w.lock.Unlock() - w.wg.Wait() -} diff --git a/writer/buffered.go b/writer/buffered.go new file mode 100644 index 00000000..8da96608 --- /dev/null +++ b/writer/buffered.go @@ -0,0 +1,112 @@ +package writer + +import ( + "io" + + "github.com/influxdata/influxdb-client-go" +) + +const defaultBufferSize = 100 + +// MetricsWriter is a type which metrics can be written to +type MetricsWriter interface { + Write(...influxdb.Metric) (int, error) +} + +// BufferedWriter is a buffered implementation of the MetricsWriter interface +// It is unashamedly derived from the bufio pkg https://golang.org/pkg/bufio +// Metrics are buffered up until the buffer size is met and then flushed to +// an underlying MetricsWriter +// The writer can also be flushed manually by calling Flush +// BufferedWriter is not safe to be called concurrently and therefore concurrency +// should be managed by the caller +type BufferedWriter struct { + wr MetricsWriter + buf []influxdb.Metric + n int + err error +} + +// NewBufferedWriter returns a new *BufferedWriter with the default +// buffer size +func NewBufferedWriter(w MetricsWriter) *BufferedWriter { + return NewBufferedWriterSize(w, defaultBufferSize) +} + +// NewBufferedWriterSize returns a new *BufferedWriter with a buffer +// allocated with the provided size +func NewBufferedWriterSize(w MetricsWriter, size int) *BufferedWriter { + if size <= 0 { + size = 
defaultBufferSize + } + + return &BufferedWriter{ + wr: w, + buf: make([]influxdb.Metric, size), + } +} + +// Available returns how many metric slots are unused in the buffer. +func (b *BufferedWriter) Available() int { return len(b.buf) - b.n } + +// Buffered returns the number of metrics that have been written into the current buffer. +func (b *BufferedWriter) Buffered() int { return b.n } + +// Write writes the provided metrics to the underlying buffer if there is available +// capacity. Otherwise it flushes the buffer and attempts to assign the remaining metrics to +// the buffer. This process repeats until all the metrics are either flushed or in the buffer +func (b *BufferedWriter) Write(m ...influxdb.Metric) (nn int, err error) { + for len(m) > b.Available() && b.err == nil { + var n int + if b.Buffered() == 0 { + // Large write, empty buffer. + // Write directly from m to avoid copy. + n, b.err = b.wr.Write(m...) + } else { + n = copy(b.buf[b.n:], m) + b.n += n + b.Flush() + } + + nn += n + m = m[n:] + } + + if b.err != nil { + return nn, b.err + } + + n := copy(b.buf[b.n:], m) + b.n += n + nn += n + return nn, nil +} + +// Flush writes any buffered data to the underlying MetricsWriter +func (b *BufferedWriter) Flush() error { + if b.err != nil { + return b.err + } + + if b.n == 0 { + return nil + } + + n, err := b.wr.Write(b.buf[0:b.n]...) 
+ if n < b.n && err == nil { + err = io.ErrShortWrite + } + + if err != nil { + if n > 0 && n < b.n { + copy(b.buf[0:b.n-n], b.buf[n:b.n]) + } + b.n -= n + b.err = err + return err + } + + b.n = 0 + + return nil +} diff --git a/writer/buffered_test.go b/writer/buffered_test.go new file mode 100644 index 00000000..45ed463c --- /dev/null +++ b/writer/buffered_test.go @@ -0,0 +1,72 @@ +package writer + +import ( + "io" + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_BufferedWriter(t *testing.T) { + var ( + // writer which asserts calls are made for org and bucket + underlyingWriter = newTestWriter() + writer = NewBufferedWriter(underlyingWriter) + // 100 rows of batch size 100 expected + expected = createNTestRowMetrics(t, 100, 100) + ) + + // write 10000 metrics in various batch sizes + for _, batchSize := range permuteCounts(t, 10000) { + n, err := writer.Write(createTestRowMetrics(t, batchSize)...) + require.NoError(t, err) + require.Equal(t, batchSize, n) + } + + // flush any remaining buffer to underlying writer + require.Zero(t, writer.Available()) + require.Equal(t, 100, writer.Buffered()) + require.NoError(t, writer.Flush()) + + // check batches written to underlying writer are 100 batches of 100 metrics + require.Equal(t, expected, underlyingWriter.writes) + require.Zero(t, writer.Buffered()) + require.Equal(t, 100, writer.Available()) +} + +func Test_BufferedWriter_LargeWriteEmptyBuffer(t *testing.T) { + var ( + // writer which asserts calls are made for org and bucket + underlyingWriter = newTestWriter() + writer = NewBufferedWriterSize(underlyingWriter, 100) + expected = createNTestRowMetrics(t, 1, 500) + ) + + n, err := writer.Write(createTestRowMetrics(t, 500)...) 
+ require.Nil(t, err) + require.Equal(t, 500, n) + + // expect one large batch of 500 metrics to skip buffer and + // go straight to underlying writer + require.Equal(t, expected, underlyingWriter.writes) +} + +func TestBufferedWriter_ShortWrite(t *testing.T) { + var ( + // writer which reports 9 metrics written + underlyingWriter = &metricsWriter{n: 9} + writer = NewBufferedWriterSize(underlyingWriter, 10) + metrics = createTestRowMetrics(t, 11) + ) + + // warm the buffer otherwise we skip it and go + // straight to client + n, err := writer.Write(metrics[0]) + require.Nil(t, err) + require.Equal(t, 1, n) + + // attempt write where underlying write only writes 9 metrics + n, err = writer.Write(metrics[1:]...) + require.Equal(t, io.ErrShortWrite, err) + require.Equal(t, 9, n) +} diff --git a/writer/doc.go b/writer/doc.go new file mode 100644 index 00000000..514bc6d6 --- /dev/null +++ b/writer/doc.go @@ -0,0 +1,64 @@ +// Package writer contains useful types for buffering, batching and periodically syncing +// writes onto a provided metric writing client. +// +// The following example demonstrates the usage of a *writer.PointWriter. This is designed to +// buffer calls to Write metrics and flush them in configurable batch sizes (see WithBufferSize). +// It is also designed to periodically flush the buffer if a configurable duration elapses between +// calls to Write. This is useful to ensure metrics are flushed to the client during a pause in their +// production. 
+// +// Example Usage +// +// import ( +// "github.com/influxdata/influxdb-client-go" +// "github.com/influxdata/influxdb-client-go/writer" +// ) +// +// func main() { +// var ( +// cli, _ = influxdb.New("http://localhost:9999", "some-token") +// bucket = "default" +// org = "influx" +// ) +// +// wr := writer.New(cli, bucket, org, writer.WithBufferSize(10)) +// +// wr.Write(influxdb.NewRowMetric( +// map[string]interface{}{ +// "value": 16, +// }, +// "temp_celsius", +// map[string]string{ +// "room": "living_room", +// }, +// time.Now(), +// ), +// influxdb.NewRowMetric( +// map[string]interface{}{ +// "value": 17, +// }, +// "temp_celsius", +// map[string]string{ +// "room": "living_room", +// }, +// time.Now(), +// )) +// +// wr.Close() +// } +// +// writer.New(...) returns a PointWriter which is composed of multiple other types available in this +// package. +// +// It first wraps the provided client in a *BucketWriter which takes care of ensuring all written metrics +// are called on the underlying client with a specific organisation and bucket. This is not safe for +// concurrent use. +// +// It then wraps this writer in a *BufferedWriter and configures its buffer size accordingly. This type +// implements the buffering of metrics and exposes a flush method. Once the buffer size is exceeded flush +// is called automatically. However, Flush() can be called manually on this type. This is also not safe +// for concurrent use. +// +// Finally, it wraps the buffered writer in a *PointWriter which takes care of ensuring Flush is called +// automatically when it hasn't been called for a configured duration. This final type is safe for concurrent use. 
+package writer
diff --git a/writer/options.go b/writer/options.go
new file mode 100644
index 00000000..f9b317ba
--- /dev/null
+++ b/writer/options.go
@@ -0,0 +1,51 @@
+package writer
+
+import "time"
+
+// Config is a structure used to configure a point writer
+type Config struct {
+	size int
+	flushInterval time.Duration
+}
+
+// Option is a functional option for configuring point writers
+type Option func(*Config)
+
+// Options is a slice of Option
+type Options []Option
+
+// Config constructs a default configuration and then
+// applies the provided options and returns the config
+func (o Options) Config() Config {
+	config := Config{
+		size: defaultBufferSize,
+		flushInterval: defaultFlushInterval,
+	}
+
+	o.Apply(&config)
+
+	return config
+}
+
+// Apply calls each option in the slice of options on the provided Config
+func (o Options) Apply(c *Config) {
+	for _, opt := range o {
+		opt(c)
+	}
+}
+
+// WithBufferSize sets the size of the underlying buffer on the point writer
+func WithBufferSize(size int) Option {
+	return func(c *Config) {
+		c.size = size
+	}
+}
+
+// WithFlushInterval sets the flush interval on the writer
+// The point writer will wait at least this long between flushes
+// of the underlying buffered writer
+func WithFlushInterval(interval time.Duration) Option {
+	return func(c *Config) {
+		c.flushInterval = interval
+	}
+}
diff --git a/writer/point.go b/writer/point.go
new file mode 100644
index 00000000..40682e13
--- /dev/null
+++ b/writer/point.go
@@ -0,0 +1,140 @@
+package writer
+
+import (
+	"io"
+	"sync"
+	"time"
+
+	"github.com/influxdata/influxdb-client-go"
+)
+
+const defaultFlushInterval = 1 * time.Second
+
+// MetricsWriteFlusher is a type of metrics writer which is
+// buffered and metrics can be flushed to
+type MetricsWriteFlusher interface {
+	Write(m ...influxdb.Metric) (int, error)
+	Available() int
+	Flush() error
+}
+
+// PointWriter delegates calls to Write to an underlying flushing writer
+// implementation.
It also periodically calls flush on the underlying writer and is safe +// to be called concurrently. As the flushing writer can also flush on calls to Write +// when the number of metrics being written exceeds the buffer capacity, it also ensures +// to reset its timer in this scenario as to avoid calling flush multiple times +type PointWriter struct { + w MetricsWriteFlusher + flushInterval time.Duration + resetTick chan struct{} + stopped chan struct{} + err error + mu sync.Mutex +} + +// NewPointWriter configures and returns a *PointWriter writer type +// The new writer will automatically begin scheduling periodic flushes based on the +// provided duration +func NewPointWriter(w MetricsWriteFlusher, flushInterval time.Duration) *PointWriter { + writer := &PointWriter{ + w: w, + flushInterval: flushInterval, + // buffer of one in order to not block writes + resetTick: make(chan struct{}, 1), + // stopped is closed once schedule has exited + stopped: make(chan struct{}), + } + + go writer.schedule() + + return writer +} + +func (p *PointWriter) schedule() { + defer close(p.stopped) + + ticker := time.NewTicker(p.flushInterval) + + for { + select { + case <-ticker.C: + if err := func() error { + p.mu.Lock() + defer p.mu.Unlock() + + // return if error is now not nil + if p.err != nil { + return p.err + } + + // between the recv on the ticker and the lock obtain + // the reset tick could've been triggered so we check + // and skip the flush if it did + select { + case <-p.resetTick: + return nil + default: + } + + p.err = p.w.Flush() + + return p.err + }(); err != nil { + return + } + case _, ok := <-p.resetTick: + if !ok { + return + } + + ticker.Stop() + ticker = time.NewTicker(p.flushInterval) + } + } +} + +// Write delegates to an underlying metrics writer +// If the delegating call is going to cause a flush, it signals +// to the schduled periodic flush to reset its timer +func (p *PointWriter) Write(m ...influxdb.Metric) (int, error) { + p.mu.Lock() + defer 
p.mu.Unlock()
+
+	if p.err != nil {
+		return 0, p.err
+	}
+
+	// check if the underlying flush will flush
+	if len(m) > p.w.Available() {
+		// tell the ticker to reset flush interval
+		select {
+		case p.resetTick <- struct{}{}:
+		default:
+		}
+	}
+
+	var n int
+	n, p.err = p.w.Write(m...)
+	return n, p.err
+}
+
+// Close signals to stop flushing metrics and causes subsequent
+// calls to Write to return a closed pipe error
+// Close returns once scheduled flushing has stopped
+func (p *PointWriter) Close() error {
+	p.mu.Lock()
+
+	// signal close
+	close(p.resetTick)
+
+	// make subsequent writes return io.ErrClosedPipe
+	p.err = io.ErrClosedPipe
+
+	// release lock so schedule may acknowledge and exit
+	p.mu.Unlock()
+
+	// wait until schedule exits
+	<-p.stopped
+
+	return nil
+}
diff --git a/writer/point_test.go b/writer/point_test.go
new file mode 100644
index 00000000..a1c2bd2d
--- /dev/null
+++ b/writer/point_test.go
@@ -0,0 +1,76 @@
+package writer
+
+import (
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+var deltaMsgFmt = "delta between flushes exceeds 105ms: %q"
+
+func Test_PointWriter_Write_Batches(t *testing.T) {
+	var (
+		underlyingWriter = newTestWriter()
+		writer = NewPointWriter(NewBufferedWriter(underlyingWriter), 50*time.Millisecond)
+		// 100 rows of batch size 100 expected
+		expected = createNTestRowMetrics(t, 100, 100)
+	)
+
+	// write 10000 metrics in various batch sizes
+	for _, batchSize := range permuteCounts(t, 10000) {
+		n, err := writer.Write(createTestRowMetrics(t, batchSize)...)
+ require.NoError(t, err) + require.Equal(t, batchSize, n) + + time.Sleep(10 * time.Millisecond) + } + + time.Sleep(100 * time.Millisecond) + + // close writer ensuring no more flushes occur + require.NoError(t, writer.Close()) + + // check batches written to underlying writer are 100 batches of 100 metrics + require.Equal(t, expected, underlyingWriter.writes) +} + +func Test_PointWriter_Write(t *testing.T) { + var ( + underlyingWriter = newTestWriter() + buffered = NewBufferedWriterSize(underlyingWriter, 10) + writer = NewPointWriter(buffered, 100*time.Millisecond) + ) + + // write between 1 and 4 metrics every 30ms ideally meaning + // some writes will flush for because of buffer size + // and some for periodic flush + for i := 0; i < 10; i++ { + var ( + count = rand.Intn(4) + 1 + n, err = writer.Write(createTestRowMetrics(t, count)...) + ) + + require.Nil(t, err) + require.Equal(t, count, n) + + time.Sleep(30 * time.Millisecond) + } + + // close writer to ensure scheduling has stopped + writer.Close() + + // ensure time between each write does not exceed roughly 100ms + // as per the flush interval of the point writer + for i := 1; i < len(underlyingWriter.when); i++ { + var ( + first, next = underlyingWriter.when[i-1], underlyingWriter.when[i] + delta = next.Sub(first) + ) + + // ensure writes are roughly 100 milliseconds apart + assert.Truef(t, delta <= 105*time.Millisecond, deltaMsgFmt, delta) + } +} diff --git a/writer/support_test.go b/writer/support_test.go new file mode 100644 index 00000000..039d9c5c --- /dev/null +++ b/writer/support_test.go @@ -0,0 +1,114 @@ +package writer + +import ( + "context" + "math" + "math/rand" + "testing" + "time" + + "github.com/influxdata/influxdb-client-go" +) + +type bucketWriter struct { + calls []bucketWriteCall +} + +type bucketWriteCall struct { + bkt string + org string + data []influxdb.Metric +} + +func (b *bucketWriter) Write(_ context.Context, bucket, org string, m ...influxdb.Metric) (int, error) { + b.calls = 
append(b.calls, bucketWriteCall{bucket, org, m}) + return len(m), nil +} + +type metricsWriter struct { + // length override + n int + // metrics written + when []time.Time + writes [][]influxdb.Metric +} + +func newTestWriter() *metricsWriter { + return &metricsWriter{n: -1} +} + +func (w *metricsWriter) Write(m ...influxdb.Metric) (int, error) { + w.when = append(w.when, time.Now()) + w.writes = append(w.writes, m) + + if w.n > -1 { + return w.n, nil + } + + return len(m), nil +} + +// permuteCounts returns a set of pseudo-random batch size counts +// which sum to the provided total +// E.g. for a sum total of 100 (permuteCounts(t, 100)) +// this function may produce the following [5 12 8 10 14 11 9 9 8 14] +// The sum of these values is == 100 and there are √100 buckets +func permuteCounts(t *testing.T, total int) (buckets []int) { + t.Helper() + + var accum int + + buckets = make([]int, int(math.Sqrt(float64(total)))) + for i := 0; i < len(buckets); i++ { + size := total / len(buckets) + if accum+size > total { + size = total - accum + } + + buckets[i] = size + + accum += size + + // shuffle some counts from previous bucket forward to current bucket + if i > 0 { + var ( + min = math.Min(float64(buckets[i]), float64(buckets[i-1])) + delta = rand.Intn(int(min / 2)) + ) + + buckets[i-1], buckets[i] = buckets[i-1]-delta, buckets[i]+delta + } + } + + return +} + +func createNTestRowMetrics(t *testing.T, rows, count int) (metrics [][]influxdb.Metric) { + metrics = make([][]influxdb.Metric, 0, rows) + + for i := 0; i < rows; i++ { + metrics = append(metrics, createTestRowMetrics(t, count)) + } + + return +} + +func createTestRowMetrics(t *testing.T, count int) (metrics []influxdb.Metric) { + t.Helper() + + metrics = make([]influxdb.Metric, 0, count) + for i := 0; i < count; i++ { + metrics = append(metrics, influxdb.NewRowMetric( + map[string]interface{}{ + "some_field": "some_value", + }, + "some_measurement", + map[string]string{ + "some_tag": "some_value", + }, + 
time.Date(2019, time.January, 1, 0, 0, 0, 0, time.UTC),
+		))
+	}
+
+	return
+}
diff --git a/writer/writer.go b/writer/writer.go
new file mode 100644
index 00000000..d4a8129f
--- /dev/null
+++ b/writer/writer.go
@@ -0,0 +1,49 @@
+package writer
+
+import (
+	"context"
+
+	"github.com/influxdata/influxdb-client-go"
+)
+
+// BucketMetricWriter is a type to which Metrics can be written for a particular bucket
+// in a particular organisation
+type BucketMetricWriter interface {
+	Write(context.Context, string, string, ...influxdb.Metric) (int, error)
+}
+
+// New constructs a point writer with an underlying buffer from the provided BucketMetricWriter
+// The writer will flush metrics to the underlying BucketMetricWriter when the buffer is full
+// or the configured flush interval elapses without a flush occurring
+func New(writer BucketMetricWriter, bkt, org string, opts ...Option) *PointWriter {
+	var (
+		config = Options(opts).Config()
+		bucket = NewBucketWriter(writer, bkt, org)
+		buffered = NewBufferedWriterSize(bucket, config.size)
+	)
+
+	return NewPointWriter(buffered, config.flushInterval)
+}
+
+// BucketWriter writes metrics to a particular bucket
+// within a particular organisation
+type BucketWriter struct {
+	w BucketMetricWriter
+
+	ctxt context.Context
+
+	bucket string
+	org string
+}
+
+// NewBucketWriter allocates, configures and returns a new BucketWriter for writing
+// metrics to a specific organisation's bucket
+func NewBucketWriter(w BucketMetricWriter, bucket, org string) *BucketWriter {
+	return &BucketWriter{w, context.Background(), bucket, org}
+}
+
+// Write writes the provided metrics to the underlying metrics writer
+// NOTE(review): org is passed before bucket here — confirm against the (ctx, bucket, org) order of BucketMetricWriter
+func (b *BucketWriter) Write(m ...influxdb.Metric) (int, error) {
+	return b.w.Write(b.ctxt, b.org, b.bucket, m...)
+} diff --git a/writer/writer_test.go b/writer/writer_test.go new file mode 100644 index 00000000..fef734bd --- /dev/null +++ b/writer/writer_test.go @@ -0,0 +1,49 @@ +package writer + +import ( + "testing" + "time" + + "github.com/influxdata/influxdb-client-go" + "github.com/stretchr/testify/require" +) + +func Test_New(t *testing.T) { + var ( + client = &influxdb.Client{} + bkt = "default" + org = "influx" + options = Options{WithBufferSize(12), WithFlushInterval(5 * time.Minute)} + wr = New(client, bkt, org, options...) + ) + + require.Equal(t, 5*time.Minute, wr.flushInterval) + require.Len(t, wr.w.(*BufferedWriter).buf, 12) + + require.Nil(t, wr.Close()) +} + +func Test_BucketWriter(t *testing.T) { + var ( + spy = &bucketWriter{} + bucket = "default" + org = "influx" + wr = NewBucketWriter(spy, bucket, org) + + expected = []bucketWriteCall{ + {org, bucket, createTestRowMetrics(t, 4)}, + {org, bucket, createTestRowMetrics(t, 8)}, + {org, bucket, createTestRowMetrics(t, 12)}, + {org, bucket, createTestRowMetrics(t, 16)}, + } + ) + + for _, count := range []int{4, 8, 12, 16} { + n, err := wr.Write(createTestRowMetrics(t, count)...) 
+ require.Nil(t, err) + require.Equal(t, count, n) + } + + // ensure underlying "client" is called as expected + require.Equal(t, expected, spy.calls) +} diff --git a/writer_test.go b/writer_test.go deleted file mode 100644 index 8f2c6ff3..00000000 --- a/writer_test.go +++ /dev/null @@ -1,145 +0,0 @@ -package influxdb_test - -import ( - "context" - "net/http" - "net/http/httptest" - "runtime" - "sync" - "sync/atomic" - "testing" - "time" - - influxdb "github.com/influxdata/influxdb-client-go" -) - -func TestWriterStartupAndShutdown(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) - cl, err := influxdb.New(server.URL, "foo", influxdb.WithHTTPClient(server.Client())) - if err != nil { - server.Close() - t.Fatal(err) - } - w := cl.NewBufferingWriter("my-bucket", "my-org", 10*time.Second, 1024*100, func(err error) { - t.Error(err) - }) - wg := sync.WaitGroup{} - w.Start() - for i := 0; i < 20; i++ { - wg.Add(1) - go func() { - runtime.Gosched() - w.Start() - wg.Done() - }() - } - for i := 0; i < 20; i++ { - wg.Add(1) - go func() { - runtime.Gosched() - w.Stop() - wg.Done() - }() - } - wg.Wait() -} - -func TestAutoFlush(t *testing.T) { - q := uint64(0) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - res := atomic.AddUint64(&q, 1) - if res > 3 { - t.Errorf("size based flush happened too often, expected 3 but got %d", res) - } - })) - cl, err := influxdb.New(server.URL, "foo", influxdb.WithHTTPClient(server.Client())) - if err != nil { - t.Error(e2e) - } - w := cl.NewBufferingWriter("my-bucket", "my-org", 0, 100*1024, func(err error) { - t.Error(err) - }) - w.Start() - ts := time.Time{} - for i := 0; i < 3000; i++ { - ts = ts.Add(1) - _, err = w.Write([]byte("TestWriterE2E"), - ts, - [][]byte{[]byte("test1"), []byte("test2")}, - [][]byte{[]byte("here"), []byte("alsohere")}, - [][]byte{[]byte("val1"), []byte("val2")}, - []interface{}{1, 99}) - if err != nil { - 
t.Error(err) - } - } - w.Flush(context.Background()) - tries := atomic.LoadUint64(&q) - w.Stop() - if tries < 3 { - t.Errorf("size based flush happened too infrequently expected 3 got %d", tries) - } -} - -func TestErrorFlush(t *testing.T) { - q := uint64(0) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - atomic.AddUint64(&q, 1) - w.WriteHeader(http.StatusInternalServerError) - })) - cl, err := influxdb.New(server.URL, "foo", influxdb.WithHTTPClient(server.Client())) - if err != nil { - t.Error(e2e) - } - { - w := cl.NewBufferingWriter("my-bucket", "my-org", 0, 100*1024, nil) // we are checking to make sure it won't panic if onError is nil - w.Start() - ts := time.Time{} - for i := 0; i < 3000; i++ { - ts = ts.Add(1) - _, err = w.Write([]byte("TestWriterE2E"), - ts, - [][]byte{[]byte("test1"), []byte("test2")}, - [][]byte{[]byte("here"), []byte("alsohere")}, - [][]byte{[]byte("val1"), []byte("val2")}, - []interface{}{1, 99}) - if err != nil { - t.Error(err) - } - } - w.Flush(context.Background()) - tries := atomic.LoadUint64(&q) - w.Stop() - if tries < 3 { - t.Errorf("size based flush happened too infrequently expected 3 got %d", tries) - } - } - { - w := cl.NewBufferingWriter("my-bucket", "my-org", 0, 100*1024, func(e error) { - if err == nil { - t.Error("expected non-nil error but got nil") - } - }) // we are checking to make sure it won't panic if onError is nil - w.Start() - ts := time.Time{} - for i := 0; i < 3000; i++ { - ts = ts.Add(1) - _, err = w.Write([]byte("TestWriterE2E"), - ts, - [][]byte{[]byte("test1"), []byte("test2")}, - [][]byte{[]byte("here"), []byte("alsohere")}, - [][]byte{[]byte("val1"), []byte("val2")}, - []interface{}{1, 99}) - if err != nil { - t.Error(err) - } - } - w.Flush(context.Background()) - tries := atomic.LoadUint64(&q) - w.Stop() - if tries < 3 { - t.Errorf("size based flush happened too infrequently expected 3 got %d", tries) - } - - } -}