From 67c751f15ef075b97967dd0d1a8391c968a2fd8e Mon Sep 17 00:00:00 2001 From: George MacRorie Date: Fri, 12 Jul 2019 15:37:36 +0100 Subject: [PATCH 1/4] refactor: add writer package for buffered writes --- .circleci/config.yml | 2 +- client.go | 33 ----- e2e_test.go | 2 +- errors.go | 13 -- examples_test.go | 6 +- go.mod | 2 + go.sum | 22 ++++ models.go | 8 ++ write.go | 85 +++++-------- writer.go | 272 ---------------------------------------- writer/buffered.go | 112 +++++++++++++++++ writer/buffered_test.go | 72 +++++++++++ writer/doc.go | 60 +++++++++ writer/options.go | 51 ++++++++ writer/point.go | 140 +++++++++++++++++++++ writer/point_test.go | 76 +++++++++++ writer/support_test.go | 114 +++++++++++++++++ writer/writer.go | 49 ++++++++ writer/writer_test.go | 49 ++++++++ writer_test.go | 145 --------------------- 20 files changed, 792 insertions(+), 521 deletions(-) delete mode 100644 writer.go create mode 100644 writer/buffered.go create mode 100644 writer/buffered_test.go create mode 100644 writer/doc.go create mode 100644 writer/options.go create mode 100644 writer/point.go create mode 100644 writer/point_test.go create mode 100644 writer/support_test.go create mode 100644 writer/writer.go create mode 100644 writer/writer_test.go delete mode 100644 writer_test.go diff --git a/.circleci/config.yml b/.circleci/config.yml index 0e9396a9..3932dc98 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -42,4 +42,4 @@ jobs: - run: go get -v -t -d ./... - run: go vet ./... - run: go get honnef.co/go/tools/cmd/staticcheck && staticcheck ./... - - run: go test -v -e2e ./... + - run: go test -v -race -e2e ./... diff --git a/client.go b/client.go index e2b63b25..33078b1a 100644 --- a/client.go +++ b/client.go @@ -6,13 +6,9 @@ import ( "fmt" "io" "io/ioutil" - "math/rand" "net/http" "net/url" - "strconv" "sync" - "sync/atomic" - "time" "github.com/influxdata/influxdb-client-go/internal/gzip" ) @@ -20,8 +16,6 @@ import ( // TODO(docmerlin): change the generator so we don't have to hand edit the generated code //go:generate go run scripts/buildclient.go -const defaultMaxWait = 10 * time.Second - // Client is a client for writing to influx. type Client struct { httpClient *http.Client @@ -31,7 +25,6 @@ type Client struct { password string username string l sync.Mutex - maxRetries int errOnFieldErr bool userAgent string authorization string // the Authorization header @@ -112,32 +105,6 @@ func (c *Client) Ping(ctx context.Context) error { return nil } -// backoff is a helper method for backoff, triesPtr must not be nil. -func (c *Client) backoff(triesPtr *uint64, resp *http.Response, err error) error { - tries := atomic.LoadUint64(triesPtr) - if c.maxRetries >= 0 || int(tries) >= c.maxRetries { - return maxRetriesExceededError{ - err: err, - tries: c.maxRetries, - } - } - retry := 0 - if resp != nil { - retryAfter := resp.Header.Get("Retry-After") - retry, _ = strconv.Atoi(retryAfter) // we ignore the error here because an error already means retry is 0. - } - sleepFor := time.Duration(retry) * time.Second - if retry == 0 { // if we didn't get a Retry-After or it is zero, instead switch to exponential backoff - sleepFor = time.Duration(rand.Int63n(((1 << tries) - 1) * 10 * int64(time.Microsecond))) - } - if sleepFor > defaultMaxWait { - sleepFor = defaultMaxWait - } - time.Sleep(sleepFor) - atomic.AddUint64(triesPtr, 1) - return nil -} - func (c *Client) makeWriteRequest(bucket, org string, body io.Reader) (*http.Request, error) { var err error if c.contentEncoding == "gzip" { diff --git a/e2e_test.go b/e2e_test.go index 4bb9cf90..41c9bdb9 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -43,7 +43,7 @@ func TestE2E(t *testing.T) { t.Fatalf("expected user to be %s, but was %s", "e2e-test-user", sRes.User.Name) } now := time.Now() - err = influx.Write(context.Background(), "e2e-test-bucket", "e2e-test-org", + _, err = influx.Write(context.Background(), influxdb.Organisation("e2e-test-org"), influxdb.Bucket("e2e-test-bucket"), &influxdb.RowMetric{ NameStr: "test", Tags: []*influxdb.Tag{ diff --git a/errors.go b/errors.go index 7b025fc4..96cf9bf1 100644 --- a/errors.go +++ b/errors.go @@ -8,19 +8,6 @@ import ( // ErrUnimplemented is an error for when pieces of the client's functionality is unimplemented. var ErrUnimplemented = errors.New("unimplemented") -type maxRetriesExceededError struct { - tries int - err error -} - -func (err maxRetriesExceededError) Error() string { - return fmt.Sprintf("max retries of %d reached, and we recieved an error of %v", err.tries, err.err) -} - -func (err maxRetriesExceededError) Unwrap() error { - return err.err -} - type genericRespError struct { Code string Message string diff --git a/examples_test.go b/examples_test.go index 32d1fc68..8005aee6 100644 --- a/examples_test.go +++ b/examples_test.go @@ -188,7 +188,7 @@ func ExampleClient_Write_basic() { } // The actual write..., this method can be called concurrently. - if err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { + if _, err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { log.Fatal(err) // as above use your own error handling here. } influx.Close() // closes the client. After this the client is useless. @@ -258,7 +258,7 @@ func ExampleClient_Write_tlsMutualAuthentication() { } // The actual write... - if err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { + if _, err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { log.Fatal(err) } influx.Close() // close the client after this the client is useless. @@ -292,7 +292,7 @@ func ExampleClient_Setup() { } // We can now do a write even though we didn't put a token in - if err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { + if _, err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { log.Fatal(err) } influx.Close() // close the client after this the client is useless. diff --git a/go.mod b/go.mod index 216ee9ee..fec6cf50 100644 --- a/go.mod +++ b/go.mod @@ -4,4 +4,6 @@ require ( github.com/google/go-cmp v0.2.0 // test dependency github.com/influxdata/flux v0.0.0-20190620184636-886e3c28388d // test dependency github.com/influxdata/line-protocol v0.0.0-20190509173118-5712a8124a9a + github.com/stretchr/testify v1.3.0 + honnef.co/go/tools v0.0.1-2019.2.2 // indirect ) diff --git a/go.sum b/go.sum index 92686289..38b482a7 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,7 @@ github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4r github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/goreleaser/goreleaser v0.94.0 h1:2CFMxMTLODjYfNOx2sADNzpgCwH9ltMqvQYtj+ntK1Q= github.com/goreleaser/goreleaser v0.94.0/go.mod h1:OjbYR2NhOI6AEUWCowMSBzo9nP1aRif3sYtx+rhp+Zo= github.com/goreleaser/nfpm v0.9.7 h1:h8RQMDztu6cW7b0/s4PGbdeMYykAbJG0UMXaWG5uBMI= @@ -71,6 +72,7 @@ github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJS github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/kevinburke/ssh_config v0.0.0-20180830205328-81db2a75821e/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -100,6 +102,7 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5/go.mod h1:eCbImbZ95eXtAUIbLAuAVnBnwf83mjf6QIVH8SHYwqQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/segmentio/kafka-go v0.1.0 h1:IXCHG+sXPNiIR5pC/vTEItZduPKu4cnpr85YgxpxlW0= @@ -111,10 +114,13 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/xanzy/ssh-agent v0.2.0/go.mod h1:0NyE30eGUDliuLEHJgYte/zncp2zdTStcOnWhgSqHD8= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -124,26 +130,39 @@ go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20181112044915-a3060d491354 h1:6UAgZ8309zQ9+1iWkHzfszFguqzOdHGyGkd1HmhJ+UE= golang.org/x/exp v0.0.0-20181112044915-a3060d491354/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 h1:x6rhz8Y9CjbgQkccRGmELH6K+LJj7tOoh3XWeC1yaQM= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4 h1:99CA0JJbUX4ozCnLon680Jc9e0T1i8HCaLVJMwtI8Hc= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180903190138-2b024373dcd9/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181030150119-7e31e0c00fa0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181221154417-3ad2d988d5e2 h1:M7NLB69gFpUH4s6SJLwXiVs45aZfVjqGKynfNFKSGcI= golang.org/x/tools v0.0.0-20181221154417-3ad2d988d5e2/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac h1:MQEvx39qSf8vyrx3XRaOe+j1UDIzKwkYOVObRgGPVqI= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca h1:PupagGYwj8+I4ubCxcmcBRk3VlUWtTg5huQpZR9flmE= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6 h1:4WsZyVtkthqrHTbDCJfiTs8IWNYE4uvsSDgaV6xpp+o= @@ -153,6 +172,7 @@ google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/src-d/go-billy.v4 v4.2.1/go.mod h1:tm33zBoOwxjYHZIE+OV8bxTWFMJLrconzFMd38aARFk= gopkg.in/src-d/go-git-fixtures.v3 v3.1.1/go.mod h1:dLBcvytrw/TYZsNTWCnkNF2DSIlzWYqTe3rJR56Ac7g= gopkg.in/src-d/go-git.v4 v4.8.1/go.mod h1:Vtut8izDyrM8BUVQnzJ+YvmNcem2J89EmfZYCkLokZk= @@ -161,3 +181,5 @@ gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20181108184350-ae8f1f9103cc h1:VdiEcF0DrrUbDdrLBceS0h7LE60ebD5yRYLLXi0ezIs= honnef.co/go/tools v0.0.0-20181108184350-ae8f1f9103cc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.1-2019.2.2 h1:TEgegKbBqByGUb1Coo1pc2qIdf2xw6v0mYyLSYtyopE= +honnef.co/go/tools v0.0.1-2019.2.2/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/models.go b/models.go index 92536242..feb8f19a 100644 --- a/models.go +++ b/models.go @@ -7,6 +7,14 @@ import ( lp "github.com/influxdata/line-protocol" ) +// Organisation is the name of the organisation under which +// metrics will be published +type Organisation string + +// Bucket is the name of the bucket to which metrics will be +// published +type Bucket string + // Metric is just a github.com/influxdata/line-protocol.Metric. // We alias here to keep abstractions from leaking. type Metric = lp.Metric diff --git a/write.go b/write.go index 5c1839d7..2bfb9bad 100644 --- a/write.go +++ b/write.go @@ -17,49 +17,41 @@ import ( // Write writes metrics to a bucket, and org. It retries intelligently. // If the write is too big, it retries again, after breaking the payloads into two requests. -func (c *Client) Write(ctx context.Context, bucket, org string, m ...Metric) (err error) { - tries := uint64(0) - return c.write(ctx, bucket, org, &tries, m...) -} +func (c *Client) Write(ctx context.Context, org Organisation, bucket Bucket, m ...Metric) (n int, err error) { + var ( + buf = &bytes.Buffer{} + e = lp.NewEncoder(buf) + ) -func parseWriteError(r io.Reader) (*genericRespError, error) { - werr := &genericRespError{} - if err := json.NewDecoder(r).Decode(&werr); err != nil { - return nil, err - } - return werr, nil -} + e.FailOnFieldErr(c.errOnFieldErr) -func (c *Client) write(ctx context.Context, bucket, org string, triesPtr *uint64, m ...Metric) error { - buf := &bytes.Buffer{} - e := lp.NewEncoder(buf) - cleanup := func() {} - defer func() { cleanup() }() -doRequest: select { case <-ctx.Done(): - return ctx.Err() + return 0, ctx.Err() default: } + for i := range m { if _, err := e.Encode(m[i]); err != nil { - return err + return 0, err } } - req, err := c.makeWriteRequest(bucket, org, buf) + + req, err := c.makeWriteRequest(string(bucket), string(org), buf) if err != nil { - return err + return 0, err } - resp, err := c.httpClient.Do(req) + + resp, err := c.httpClient.Do(req.WithContext(ctx)) if err != nil { - return err + return 0, err } - cleanup = func() { - r := io.LimitReader(resp.Body, 1<<16) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it. - // throw away the rest of the body so the connection can be reused even if there is still stuff on the wire. - ioutil.ReadAll(r) // we don't care about the error here, it is just to empty the tcp buffer + + defer func() { + // discard body so connection can be reused + _, _ = io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() - } + }() switch resp.StatusCode { case http.StatusOK, http.StatusNoContent: @@ -68,42 +60,21 @@ doRequest: Code: resp.Status, Message: "too many requests too fast", } - cleanup() - if err2 := c.backoff(triesPtr, resp, err); err2 != nil { - return err2 - } - cleanup = func() {} - goto doRequest case http.StatusServiceUnavailable: err = &genericRespError{ Code: resp.Status, Message: "service temporarily unavaliable", } - cleanup() - if err2 := c.backoff(triesPtr, resp, err); err2 != nil { - return err2 - } - cleanup = func() {} - goto doRequest default: gwerr, err := parseWriteError(resp.Body) if err != nil { - return err + return 0, err } - return gwerr + return 0, gwerr } - // we don't defer and close till here, because of the retries. - defer func() { - r := io.LimitReader(resp.Body, 1<<16) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it. - _, err := ioutil.ReadAll(r) // throw away the rest of the body so the connection gets reused. - err2 := resp.Body.Close() - if err == nil && err2 != nil { - err = err2 - } - }() - e.FailOnFieldErr(c.errOnFieldErr) - return err + + return len(m), err } func makeWriteURL(loc *url.URL, bucket, org string) (string, error) { @@ -128,3 +99,11 @@ func makeWriteURL(loc *url.URL, bucket, org string) (string, error) { u.RawQuery = params.Encode() return u.String(), nil } + +func parseWriteError(r io.Reader) (*genericRespError, error) { + werr := &genericRespError{} + if err := json.NewDecoder(r).Decode(&werr); err != nil { + return nil, err + } + return werr, nil +} diff --git a/writer.go b/writer.go deleted file mode 100644 index 62fd7ff6..00000000 --- a/writer.go +++ /dev/null @@ -1,272 +0,0 @@ -package influxdb - -import ( - "bytes" - "context" - "io" - "io/ioutil" - "math/rand" - "net/http" - "strconv" - "sync" - "sync/atomic" - "time" - - lp "github.com/influxdata/line-protocol" -) - -const maxPooledBuffer = 4 << 20 //8 megs - -// LPWriter is a type for writing line protocol in a buffered way. -// It allows you to set a flush interval and flush regularly or to call the Flush method to flush its internal buffer. -type LPWriter struct { - stopTicker func() - flushChan <-chan time.Time - flushInterval time.Duration - flushSize int - c *Client - buf switchableBuffer - lock sync.Mutex - enc *lp.Encoder - bucket, org string - tries uint64 - maxRetries int - errOnFieldErr bool - stop chan struct{} - once sync.Once - wg sync.WaitGroup - onError func(error) -} - -type switchableBuffer struct { - *bytes.Buffer -} - -// WriteMetrics writes Metrics to the LPWriter. -func (w *LPWriter) WriteMetrics(m ...Metric) (int, error) { - select { - case <-w.stop: - return 0, nil - default: - } - w.lock.Lock() - for i := range m { - j, err := w.enc.Encode(m[i]) - if err != nil { - return j, err - } - } - w.asyncFlush() - w.lock.Unlock() - return 0, nil -} - -// NewBufferingWriter creates a new BufferingWriter. -func (c *Client) NewBufferingWriter(bucket string, org string, flushInterval time.Duration, flushSize int, onError func(error)) *LPWriter { - // if onError is nil set to a noop - if onError == nil { - onError = func(_ error) {} - } - w := &LPWriter{c: c, buf: switchableBuffer{&bytes.Buffer{}}, flushSize: flushSize, flushInterval: flushInterval, stop: make(chan struct{}), onError: onError, bucket: bucket, org: org} - w.enc = lp.NewEncoder(&w.buf) - w.enc.FailOnFieldErr(w.errOnFieldErr) - return w -} - -// Write writes name, time stamp, tag keys, tag values, field keys, and field values to an LPWriter. -func (w *LPWriter) Write(name []byte, ts time.Time, tagKeys, tagVals, fieldKeys [][]byte, fieldVals []interface{}) (int, error) { - select { - case <-w.stop: - return 0, nil - default: - } - w.lock.Lock() - i, err := w.enc.Write(name, ts, tagKeys, tagVals, fieldKeys, fieldVals) - // asyncronously flush if the size of the buffer is too big. - if err != nil { - return i, err - } - w.asyncFlush() - w.lock.Unlock() - return i, err -} -func (w *LPWriter) asyncFlush() { - if w.flushSize > 0 && w.buf.Len() > w.flushSize { - w.wg.Add(1) - buf := w.buf.Buffer - w.buf.Buffer = bufferPool.Get().(*bytes.Buffer) - go func() { - w.flush(context.TODO(), buf) - if buf.Len() <= maxPooledBuffer { - buf.Reset() - bufferPool.Put(buf) - } - w.wg.Done() - }() - } -} - -// Start starts an LPWriter, so that the writer can flush it out to influxdb. -func (w *LPWriter) Start() { - w.lock.Lock() - w.once = sync.Once{} - if w.flushInterval != 0 { - t := time.NewTicker(w.flushInterval) - w.stopTicker = t.Stop - w.flushChan = t.C - w.wg.Add(1) - go func() { - for { - select { - case <-w.flushChan: - err := w.Flush(context.Background()) - if err != nil { - w.onError(err) - } - case <-w.stop: - w.wg.Done() - return - } - } - }() - } else { - w.stopTicker = func() {} - } - w.lock.Unlock() -} - -var bufferPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} }} - -// Flush writes out the internal buffer to the database. -func (w *LPWriter) Flush(ctx context.Context) error { - w.wg.Add(1) - defer w.wg.Done() - w.lock.Lock() - if w.buf.Len() == 0 { - w.lock.Unlock() - return nil - } - buf := w.buf.Buffer - w.buf.Buffer = bufferPool.Get().(*bytes.Buffer) - w.lock.Unlock() - err := w.flush(ctx, buf) - if err != nil { - return err - } - if buf.Len() <= maxPooledBuffer { - buf.Reset() - bufferPool.Put(buf) - } - return err -} - -func (w *LPWriter) flush(ctx context.Context, buf *bytes.Buffer) error { - - cleanup := func() {} - defer func() { cleanup() }() - // early exit so we don't send empty buffers -doRequest: - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - req, err := w.c.makeWriteRequest(w.bucket, w.org, buf) - if err != nil { - return err - } - resp, err := w.c.httpClient.Do(req) - if err != nil { - return err - } - cleanup = func() { - r := io.LimitReader(resp.Body, 1<<24) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it. - // throw away the rest of the body so the connection can be reused even if there is still stuff on the wire. - _, _ = ioutil.ReadAll(r) // we don't care about the error here, it is just to empty the tcp buffer - resp.Body.Close() - } - - switch resp.StatusCode { - case http.StatusOK, http.StatusNoContent: - case http.StatusTooManyRequests: - err = &genericRespError{ - Code: resp.Status, - Message: "too many requests too fast", - } - cleanup() - if err2 := w.backoff(&w.tries, resp, err); err2 != nil { - return err2 - } - cleanup = func() {} - goto doRequest - case http.StatusServiceUnavailable: - err = &genericRespError{ - Code: resp.Status, - Message: "service temporarily unavaliable", - } - cleanup() - if err2 := w.backoff(&w.tries, resp, err); err2 != nil { - return err2 - } - cleanup = func() { - w.lock.Unlock() - } - goto doRequest - default: - gwerr, err := parseWriteError(resp.Body) - if err != nil { - return err - } - - return gwerr - } - // we don't defer and close till here, because of the retries. - defer func() { - r := io.LimitReader(resp.Body, 1<<16) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it. - _, err := ioutil.ReadAll(r) // throw away the rest of the body so the connection gets reused. - err2 := resp.Body.Close() - if err == nil && err2 != nil { - err = err2 - } - }() - return err -} - -// backoff is a helper method for backoff, triesPtr must not be nil. -func (w *LPWriter) backoff(triesPtr *uint64, resp *http.Response, err error) error { - tries := atomic.LoadUint64(triesPtr) - if w.maxRetries >= 0 || int(tries) >= w.maxRetries { - return maxRetriesExceededError{ - err: err, - tries: w.maxRetries, - } - } - retry := 0 - if resp != nil { - retryAfter := resp.Header.Get("Retry-After") - retry, _ = strconv.Atoi(retryAfter) // we ignore the error here because an error already means retry is 0. - } - sleepFor := time.Duration(retry) * time.Second - if retry == 0 { // if we didn't get a Retry-After or it is zero, instead switch to exponential backoff - sleepFor = time.Duration(rand.Int63n(((1 << tries) - 1) * 10 * int64(time.Microsecond))) - } - if sleepFor > defaultMaxWait { - sleepFor = defaultMaxWait - } - time.Sleep(sleepFor) - atomic.AddUint64(triesPtr, 1) - return nil -} - -// Stop gracefully stops a started LPWriter. -func (w *LPWriter) Stop() { - w.lock.Lock() - w.once.Do(func() { - close(w.stop) - w.wg.Wait() - w.stopTicker() - w.stop = make(chan struct{}) - }) - w.lock.Unlock() - w.wg.Wait() -} diff --git a/writer/buffered.go b/writer/buffered.go new file mode 100644 index 00000000..8da96608 --- /dev/null +++ b/writer/buffered.go @@ -0,0 +1,112 @@ +package writer + +import ( + "io" + + "github.com/influxdata/influxdb-client-go" +) + +const defaultBufferSize = 100 + +// MetricsWriter is a type which metrics can be written to +type MetricsWriter interface { + Write(...influxdb.Metric) (int, error) +} + +// BufferedWriter is a buffered implementation of the MetricsWriter interface +// It is unashamedly derived from the bufio pkg https://golang.org/pkg/bufio +// Metrics are buffered up until the buffer size is met and then flushed to +// an underlying MetricsWriter +// The writer can also be flushed manually by calling Flush +// BufferedWriter is not safe to be called concurrently and therefore concurrency +// should be managed by the caller +type BufferedWriter struct { + wr MetricsWriter + buf []influxdb.Metric + n int + err error +} + +// NewBufferedWriter returns a new *BufferedWriter with the default +// buffer size +func NewBufferedWriter(w MetricsWriter) *BufferedWriter { + return NewBufferedWriterSize(w, defaultBufferSize) +} + +// NewBufferedWriterSize returns a new *BufferedWriter with a buffer +// allocated with the provided size +func NewBufferedWriterSize(w MetricsWriter, size int) *BufferedWriter { + if size <= 0 { + size = defaultBufferSize + } + + return &BufferedWriter{ + wr: w, + buf: make([]influxdb.Metric, size), + } +} + +// Available returns how many bytes are unused in the buffer. +func (b *BufferedWriter) Available() int { return len(b.buf) - b.n } + +// Buffered returns the number of bytes that have been written into the current buffer. +func (b *BufferedWriter) Buffered() int { return b.n } + +// Write writes the provided metrics to the underlying buffer if there is available +// capacity. Otherwise it flushes the buffer and attempts to assign the remain metrics to +// the buffer. This process repeats until all the metrics are either flushed or in the buffer +func (b *BufferedWriter) Write(m ...influxdb.Metric) (nn int, err error) { + for len(m) > b.Available() && b.err == nil { + var n int + if b.Buffered() == 0 { + // Large write, empty buffer. + // Write directly from m to avoid copy. + n, b.err = b.wr.Write(m...) + } else { + n = copy(b.buf[b.n:], m) + b.n += n + b.Flush() + } + + nn += n + m = m[n:] + } + + if b.err != nil { + return nn, b.err + } + + n := copy(b.buf[b.n:], m) + b.n += n + nn += n + return nn, nil +} + +// Flush writes any buffered data to the underlying MetricsWriter +func (b *BufferedWriter) Flush() error { + if b.err != nil { + return b.err + } + + if b.n == 0 { + return nil + } + + n, err := b.wr.Write(b.buf[0:b.n]...) + if n < b.n && err == nil { + err = io.ErrShortWrite + } + + if err != nil { + if n > 0 && n < b.n { + copy(b.buf[0:b.n-n], b.buf[n:b.n]) + } + b.n -= n + b.err = err + return err + } + + b.n = 0 + + return nil +} diff --git a/writer/buffered_test.go b/writer/buffered_test.go new file mode 100644 index 00000000..45ed463c --- /dev/null +++ b/writer/buffered_test.go @@ -0,0 +1,72 @@ +package writer + +import ( + "io" + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_BufferedWriter(t *testing.T) { + var ( + // writer which asserts calls are made for org and bucket + underlyingWriter = newTestWriter() + writer = NewBufferedWriter(underlyingWriter) + // 100 rows of batch size 100 expected + expected = createNTestRowMetrics(t, 100, 100) + ) + + // write 10000 metrics in various batch sizes + for _, batchSize := range permuteCounts(t, 10000) { + n, err := writer.Write(createTestRowMetrics(t, batchSize)...) + require.NoError(t, err) + require.Equal(t, batchSize, n) + } + + // flush any remaining buffer to underlying writer + require.Zero(t, writer.Available()) + require.Equal(t, 100, writer.Buffered()) + require.NoError(t, writer.Flush()) + + // check batches written to underlying writer are 100 batches of 100 metrics + require.Equal(t, expected, underlyingWriter.writes) + require.Zero(t, writer.Buffered()) + require.Equal(t, 100, writer.Available()) +} + +func Test_BufferedWriter_LargeWriteEmptyBuffer(t *testing.T) { + var ( + // writer which asserts calls are made for org and bucket + underlyingWriter = newTestWriter() + writer = NewBufferedWriterSize(underlyingWriter, 100) + expected = createNTestRowMetrics(t, 1, 500) + ) + + n, err := writer.Write(createTestRowMetrics(t, 500)...) + require.Nil(t, err) + require.Equal(t, 500, n) + + // expect one large batch of 500 metrics to skip buffer and + // go straight to underyling writer + require.Equal(t, expected, underlyingWriter.writes) +} + +func TestBufferedWriter_ShortWrite(t *testing.T) { + var ( + // writer which reports 9 bytes written + underlyingWriter = &metricsWriter{n: 9} + writer = NewBufferedWriterSize(underlyingWriter, 10) + metrics = createTestRowMetrics(t, 11) + ) + + // warm the buffer otherwise we skip it and go + // straight to client + n, err := writer.Write(metrics[0]) + require.Nil(t, err) + require.Equal(t, 1, n) + + // attempt write where underyling write only writes 6 metrics + n, err = writer.Write(metrics[1:]...) + require.Equal(t, io.ErrShortWrite, err) + require.Equal(t, 9, n) +} diff --git a/writer/doc.go b/writer/doc.go new file mode 100644 index 00000000..b7e224f5 --- /dev/null +++ b/writer/doc.go @@ -0,0 +1,60 @@ +// Package writer contains useful types for buffering, batching and periodically syncing +// writes onto a provided metric writing client. +// +// The following example demonstrate the usage of a *writer.PointWriter. This is designed to +// buffer calls to Write metrics and flush them in configurable batch sizes (see WithBufferSize). +// It is also designed to periodically flush the buffer if a configurable duration ellapses between +// calls to Write. This is useful to ensure metrics are flushed to the client during a pause in their +// production. +// +// Example Usage +// +// import ( +// "github.com/influxdata/influxdb-client-go" +// "github.com/influxdata/influxdb-client-go/writer" +// ) +// +// func main() { +// cli, _ := influxdb.New("http://localhost:9999", "some-token") +// +// wr := writer.New(cli, influxdb.Organisation("influx"), influxdb.Bucket("default"), writer.WithBufferSize(10)) +// +// wr.Write(influxdb.NewRowMetric( +// map[string]interface{}{ +// "value": 16, +// }, +// "temp_celsius", +// map[string]string{ +// "room": "living_room", +// }, +// time.Now(), +// ), +// influxdb.NewRowMetric( +// map[string]interface{}{ +// "value": 17, +// }, +// "temp_celsius", +// map[string]string{ +// "room": "living_room", +// }, +// time.Now(), +// )) +// +// wr.Close() +// } +// +// writer.New(...) return a PointerWriter which is composed of multiple other types available in this +// package. +// +// It first wraps the provided client in a *BucketWriter which takes care of ensuring all written metrics +// are called on the underyling client with a specific organisation and bucket. This is not safe for +// concurrent use. +// +// It then wraps this writer in a *BufferedWriter and configures its buffer size accordingly. This type +// implements the buffering of metrics and exposes a flush method. Once the buffer size is exceed flush +// is called automatically. However, Flush() can be called manually on this type. This is also not safe +// for concurrent use. +// +// Finally, it wraps the buffered writer in a *PointsWriter which takes care of ensuring Flush is called +// automatically when it hasn't been called for a configured duration. This final type is safe for concurrent use. +package writer diff --git a/writer/options.go b/writer/options.go new file mode 100644 index 00000000..f9b317ba --- /dev/null +++ b/writer/options.go @@ -0,0 +1,51 @@ +package writer + +import "time" + +// Config is a structure used to configure a point writer +type Config struct { + size int + flushInterval time.Duration +} + +// Option is a functional option for Configuring point writers +type Option func(*Config) + +// Options is a slice of Option +type Options []Option + +// Config constructs a default configuration and then +// applies the callee options and returns the config +func (o Options) Config() Config { + config := Config{ + size: defaultBufferSize, + flushInterval: defaultFlushInterval, + } + + o.Apply(&config) + + return config +} + +// Apply calls each option in the slice on options on the provided Config +func (o Options) Apply(c *Config) { + for _, opt := range o { + opt(c) + } +} + +// WithBufferSize sets the size of the underlying buffer on the point writer +func WithBufferSize(size int) Option { + return func(c *Config) { + c.size = size + } +} + +// WithFlushInterval sets the flush interval on the writer +// The point writer will wait at least this long between flushes +// of the undeyling buffered writer +func WithFlushInterval(interval time.Duration) Option { + return func(c *Config) { + c.flushInterval = interval + } +} diff --git a/writer/point.go b/writer/point.go new file mode 100644 index 00000000..40682e13 --- /dev/null +++ b/writer/point.go @@ -0,0 +1,140 @@ +package writer + +import ( + "io" + "sync" + "time" + + "github.com/influxdata/influxdb-client-go" +) + +const defaultFlushInterval = 1 * time.Second + +// MetricsWriteFlush is a type of metrics writer which is +// buffered and metrics can be flushed to +type MetricsWriteFlusher interface { + Write(m ...influxdb.Metric) (int, error) + Available() int + Flush() error +} + +// PointWriter delegates calls to Write to an underlying flushing writer +// implementation. It also periodically calls flush on the underlying writer and is safe +// to be called concurrently. As the flushing writer can also flush on calls to Write +// when the number of metrics being written exceeds the buffer capacity, it also ensures +// to reset its timer in this scenario as to avoid calling flush multiple times +type PointWriter struct { + w MetricsWriteFlusher + flushInterval time.Duration + resetTick chan struct{} + stopped chan struct{} + err error + mu sync.Mutex +} + +// NewPointWriter configures and returns a *PointWriter writer type +// The new writer will automatically begin scheduling periodic flushes based on the +// provided duration +func NewPointWriter(w MetricsWriteFlusher, flushInterval time.Duration) *PointWriter { + writer := &PointWriter{ + w: w, + flushInterval: flushInterval, + // buffer of one in order to not block writes + resetTick: make(chan struct{}, 1), + // stopped is closed once schedule has exited + stopped: make(chan struct{}), + } + + go writer.schedule() + + return writer +} + +func (p *PointWriter) schedule() { + defer close(p.stopped) + + ticker := time.NewTicker(p.flushInterval) + + for { + select { + case <-ticker.C: + if err := func() error { + p.mu.Lock() + defer p.mu.Unlock() + + // return if error is now not nil + if p.err != nil { + return p.err + } + + // between the recv on the ticker and the lock obtain + // the reset tick could've been triggered so we check + // and skip the flush if it did + select { + case <-p.resetTick: + return nil + default: + } + + p.err = p.w.Flush() + + return p.err + }(); err != nil { + return + } + case _, ok := <-p.resetTick: + if !ok { + return + } + + ticker.Stop() + ticker = time.NewTicker(p.flushInterval) + } + } +} + +// Write delegates to an underlying metrics writer +// If the delegating call is going to cause a flush, it signals +// to the schduled periodic flush to reset its timer +func (p *PointWriter) Write(m ...influxdb.Metric) (int, error) { + p.mu.Lock() + defer p.mu.Unlock() + + if p.err != nil { + return 0, p.err + } + + // check if the underlying flush will flush + if len(m) > p.w.Available() { + // tell the ticker to reset flush interval + select { + case p.resetTick <- struct{}{}: + default: + } + } + + var n int + n, p.err = p.w.Write(m...) + return n, p.err +} + +// Close signals to stop flushing metrics and causes subsequent +// calls to Write to return a closed pipe error +// Close returns once scheduledge flushing has stopped +func (p *PointWriter) Close() error { + p.mu.Lock() + + // signal close + close(p.resetTick) + + // return err io closed pipe for subsequent writes + p.err = io.ErrClosedPipe + + // release lock so scheduled may acknowledge and exit + p.mu.Unlock() + + // wait until schedule exits + <-p.stopped + + return nil +} diff --git a/writer/point_test.go b/writer/point_test.go new file mode 100644 index 00000000..a1c2bd2d --- /dev/null +++ b/writer/point_test.go @@ -0,0 +1,76 @@ +package writer + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var deltaMsgFmt = "delta between flushes exceeds 105ms: %q" + +func Test_PointWriter_Write_Batches(t *testing.T) { + var ( + underlyingWriter = newTestWriter() + writer = NewPointWriter(NewBufferedWriter(underlyingWriter), 50*time.Millisecond) + // 100 rows of batch size 100 expected + expected = createNTestRowMetrics(t, 100, 100) + ) + + // write 10000 metrics in various batch sizes + for _, batchSize := range permuteCounts(t, 10000) { + n, err := writer.Write(createTestRowMetrics(t, batchSize)...) + require.NoError(t, err) + require.Equal(t, batchSize, n) + + time.Sleep(10 * time.Millisecond) + } + + time.Sleep(100 * time.Millisecond) + + // close writer ensuring no more flushes occur + require.NoError(t, writer.Close()) + + // check batches written to underlying writer are 100 batches of 100 metrics + require.Equal(t, expected, underlyingWriter.writes) +} + +func Test_PointWriter_Write(t *testing.T) { + var ( + underlyingWriter = newTestWriter() + buffered = NewBufferedWriterSize(underlyingWriter, 10) + writer = NewPointWriter(buffered, 100*time.Millisecond) + ) + + // write between 1 and 4 metrics every 30ms ideally meaning + // some writes will flush for because of buffer size + // and some for periodic flush + for i := 0; i < 10; i++ { + var ( + count = rand.Intn(4) + 1 + n, err = writer.Write(createTestRowMetrics(t, count)...) + ) + + require.Nil(t, err) + require.Equal(t, count, n) + + time.Sleep(30 * time.Millisecond) + } + + // close writer to ensure scheduling has stopped + writer.Close() + + // ensure time between each write does not exceed roughly 100ms + // as per the flush interval of the point writer + for i := 1; i < len(underlyingWriter.when); i++ { + var ( + first, next = underlyingWriter.when[i-1], underlyingWriter.when[i] + delta = next.Sub(first) + ) + + // ensure writes are roughly 100 milliseconds apart + assert.Truef(t, delta <= 105*time.Millisecond, deltaMsgFmt, delta) + } +} diff --git a/writer/support_test.go b/writer/support_test.go new file mode 100644 index 00000000..b895b7c1 --- /dev/null +++ b/writer/support_test.go @@ -0,0 +1,114 @@ +package writer + +import ( + "context" + "math" + "math/rand" + "testing" + "time" + + "github.com/influxdata/influxdb-client-go" +) + +type bucketWriter struct { + calls []bucketWriteCall +} + +type bucketWriteCall struct { + org influxdb.Organisation + bkt influxdb.Bucket + data []influxdb.Metric +} + +func (b *bucketWriter) Write(_ context.Context, org influxdb.Organisation, bkt influxdb.Bucket, m ...influxdb.Metric) (int, error) { + b.calls = append(b.calls, bucketWriteCall{org, bkt, m}) + return len(m), nil +} + +type metricsWriter struct { + // length override + n int + // metrics written + when []time.Time + writes [][]influxdb.Metric +} + +func newTestWriter() *metricsWriter { + return &metricsWriter{n: -1} +} + +func (w *metricsWriter) Write(m ...influxdb.Metric) (int, error) { + w.when = append(w.when, time.Now()) + w.writes = append(w.writes, m) + + if w.n > -1 { + return w.n, nil + } + + return len(m), nil +} + +// permuteCounts returns a set of pseudo-random batch size counts +// which sum to the provided total +// E.g. for a sum total of 100 (permuteCounts(t, 100)) +// this function may produce the following [5 12 8 10 14 11 9 9 8 14] +// The sum of these values is == 100 and there are √100 buckets +func permuteCounts(t *testing.T, total int) (buckets []int) { + t.Helper() + + var accum int + + buckets = make([]int, int(math.Sqrt(float64(total)))) + for i := 0; i < len(buckets); i++ { + size := total / len(buckets) + if accum+size > total { + size = total - accum + } + + buckets[i] = size + + accum += size + + // shuffle some counts from previous bucket forward to current bucket + if i > 0 { + var ( + min = math.Min(float64(buckets[i]), float64(buckets[i-1])) + delta = rand.Intn(int(min / 2)) + ) + + buckets[i-1], buckets[i] = buckets[i-1]-delta, buckets[i]+delta + } + } + + return +} + +func createNTestRowMetrics(t *testing.T, rows, count int) (metrics [][]influxdb.Metric) { + metrics = make([][]influxdb.Metric, 0, rows) + + for i := 0; i < rows; i++ { + metrics = append(metrics, createTestRowMetrics(t, count)) + } + + return +} + +func createTestRowMetrics(t *testing.T, count int) (metrics []influxdb.Metric) { + t.Helper() + + metrics = make([]influxdb.Metric, 0, count) + for i := 0; i < count; i++ { + metrics = append(metrics, influxdb.NewRowMetric( + map[string]interface{}{ + "some_field": "some_value", + }, + "some_measurement", + map[string]string{ + "some_tag": "some_value", + }, + time.Date(2019, time.January, 1, 0, 0, 0, 0, time.UTC), + )) + } + + return +} diff --git a/writer/writer.go b/writer/writer.go new file mode 100644 index 00000000..4b6e5f9f --- /dev/null +++ b/writer/writer.go @@ -0,0 +1,49 @@ +package writer + +import ( + "context" + + "github.com/influxdata/influxdb-client-go" +) + +// BucketMetricWriter is a type which Metrics can be written to a particular bucket +// in a particular organisation +type BucketMetricWriter interface { + Write(context.Context, influxdb.Organisation, influxdb.Bucket, ...influxdb.Metric) (int, error) +} + +// New constructs a point writer with an underlying buffer from the provided BucketMetricWriter +// The writer will flushed metrics to the underlying BucketMetricWriter when the buffer is full +// or the configured flush interval ellapses without a flush occuring +func New(writer BucketMetricWriter, org influxdb.Organisation, bkt influxdb.Bucket, opts ...Option) *PointWriter { + var ( + config = Options(opts).Config() + bucket = NewBucketWriter(writer, org, bkt) + buffered = NewBufferedWriterSize(bucket, config.size) + ) + + return NewPointWriter(buffered, config.flushInterval) +} + +// BucketWriter writes metrics to a particular bucket +// within a particular organisation +type BucketWriter struct { + w BucketMetricWriter + + ctxt context.Context + + org influxdb.Organisation + bucket influxdb.Bucket +} + +// NewBucketWriter allocates, configures and returned a new BucketWriter for writing +// metrics to a specific organisations bucket +func NewBucketWriter(w BucketMetricWriter, org influxdb.Organisation, bkt influxdb.Bucket) *BucketWriter { + return &BucketWriter{w, context.Background(), org, bkt} +} + +// Write writes the provided metrics to the underlying metrics writer +// using the org and bucket configured on the bucket writer +func (b *BucketWriter) Write(m ...influxdb.Metric) (int, error) { + return b.w.Write(b.ctxt, b.org, b.bucket, m...) +} diff --git a/writer/writer_test.go b/writer/writer_test.go new file mode 100644 index 00000000..d0476b76 --- /dev/null +++ b/writer/writer_test.go @@ -0,0 +1,49 @@ +package writer + +import ( + "testing" + "time" + + "github.com/influxdata/influxdb-client-go" + "github.com/stretchr/testify/require" +) + +func Test_New(t *testing.T) { + var ( + client = &influxdb.Client{} + org = influxdb.Organisation("influx") + bkt = influxdb.Bucket("default") + options = Options{WithBufferSize(12), WithFlushInterval(5 * time.Minute)} + wr = New(client, org, bkt, options...) + ) + + require.Equal(t, 5*time.Minute, wr.flushInterval) + require.Len(t, wr.w.(*BufferedWriter).buf, 12) + + require.Nil(t, wr.Close()) +} + +func Test_BucketWriter(t *testing.T) { + var ( + spy = &bucketWriter{} + org = influxdb.Organisation("influx") + bucket = influxdb.Bucket("default") + wr = NewBucketWriter(spy, org, bucket) + + expected = []bucketWriteCall{ + {org, bucket, createTestRowMetrics(t, 4)}, + {org, bucket, createTestRowMetrics(t, 8)}, + {org, bucket, createTestRowMetrics(t, 12)}, + {org, bucket, createTestRowMetrics(t, 16)}, + } + ) + + for _, count := range []int{4, 8, 12, 16} { + n, err := wr.Write(createTestRowMetrics(t, count)...) + require.Nil(t, err) + require.Equal(t, count, n) + } + + // ensure underlying "client" is called as expected + require.Equal(t, expected, spy.calls) +} diff --git a/writer_test.go b/writer_test.go deleted file mode 100644 index 8f2c6ff3..00000000 --- a/writer_test.go +++ /dev/null @@ -1,145 +0,0 @@ -package influxdb_test - -import ( - "context" - "net/http" - "net/http/httptest" - "runtime" - "sync" - "sync/atomic" - "testing" - "time" - - influxdb "github.com/influxdata/influxdb-client-go" -) - -func TestWriterStartupAndShutdown(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) - cl, err := influxdb.New(server.URL, "foo", influxdb.WithHTTPClient(server.Client())) - if err != nil { - server.Close() - t.Fatal(err) - } - w := cl.NewBufferingWriter("my-bucket", "my-org", 10*time.Second, 1024*100, func(err error) { - t.Error(err) - }) - wg := sync.WaitGroup{} - w.Start() - for i := 0; i < 20; i++ { - wg.Add(1) - go func() { - runtime.Gosched() - w.Start() - wg.Done() - }() - } - for i := 0; i < 20; i++ { - wg.Add(1) - go func() { - runtime.Gosched() - w.Stop() - wg.Done() - }() - } - wg.Wait() -} - -func TestAutoFlush(t *testing.T) { - q := uint64(0) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - res := atomic.AddUint64(&q, 1) - if res > 3 { - t.Errorf("size based flush happened too often, expected 3 but got %d", res) - } - })) - cl, err := influxdb.New(server.URL, "foo", influxdb.WithHTTPClient(server.Client())) - if err != nil { - t.Error(e2e) - } - w := cl.NewBufferingWriter("my-bucket", "my-org", 0, 100*1024, func(err error) { - t.Error(err) - }) - w.Start() - ts := time.Time{} - for i := 0; i < 3000; i++ { - ts = ts.Add(1) - _, err = w.Write([]byte("TestWriterE2E"), - ts, - [][]byte{[]byte("test1"), []byte("test2")}, - [][]byte{[]byte("here"), []byte("alsohere")}, - [][]byte{[]byte("val1"), []byte("val2")}, - []interface{}{1, 99}) - if err != nil { - t.Error(err) - } - } - w.Flush(context.Background()) - tries := atomic.LoadUint64(&q) - w.Stop() - if tries < 3 { - t.Errorf("size based flush happened too infrequently expected 3 got %d", tries) - } -} - -func TestErrorFlush(t *testing.T) { - q := uint64(0) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - atomic.AddUint64(&q, 1) - w.WriteHeader(http.StatusInternalServerError) - })) - cl, err := influxdb.New(server.URL, "foo", influxdb.WithHTTPClient(server.Client())) - if err != nil { - t.Error(e2e) - } - { - w := cl.NewBufferingWriter("my-bucket", "my-org", 0, 100*1024, nil) // we are checking to make sure it won't panic if onError is nil - w.Start() - ts := time.Time{} - for i := 0; i < 3000; i++ { - ts = ts.Add(1) - _, err = w.Write([]byte("TestWriterE2E"), - ts, - [][]byte{[]byte("test1"), []byte("test2")}, - [][]byte{[]byte("here"), []byte("alsohere")}, - [][]byte{[]byte("val1"), []byte("val2")}, - []interface{}{1, 99}) - if err != nil { - t.Error(err) - } - } - w.Flush(context.Background()) - tries := atomic.LoadUint64(&q) - w.Stop() - if tries < 3 { - t.Errorf("size based flush happened too infrequently expected 3 got %d", tries) - } - } - { - w := cl.NewBufferingWriter("my-bucket", "my-org", 0, 100*1024, func(e error) { - if err == nil { - t.Error("expected non-nil error but got nil") - } - }) // we are checking to make sure it won't panic if onError is nil - w.Start() - ts := time.Time{} - for i := 0; i < 3000; i++ { - ts = ts.Add(1) - _, err = w.Write([]byte("TestWriterE2E"), - ts, - [][]byte{[]byte("test1"), []byte("test2")}, - [][]byte{[]byte("here"), []byte("alsohere")}, - [][]byte{[]byte("val1"), []byte("val2")}, - []interface{}{1, 99}) - if err != nil { - t.Error(err) - } - } - w.Flush(context.Background()) - tries := atomic.LoadUint64(&q) - w.Stop() - if tries < 3 { - t.Errorf("size based flush happened too infrequently expected 3 got %d", tries) - } - - } -} From bf294d42a556b177c771345c3a141939f1394379 Mon Sep 17 00:00:00 2001 From: George MacRorie Date: Fri, 2 Aug 2019 12:48:08 +0200 Subject: [PATCH 2/4] Reorder Bucket and Organisation parametes everywhere --- e2e_test.go | 2 +- examples_test.go | 10 +++++----- query.go | 6 +++--- setup.go | 12 ++++++------ 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/e2e_test.go b/e2e_test.go index 41c9bdb9..a4d7895b 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -29,7 +29,7 @@ func TestE2E(t *testing.T) { t.Fatal(err) } // set up the bucket and org and get the token - sRes, err := influx.Setup(context.Background(), "e2e-test-bucket", "e2e-test-org", 0) + sRes, err := influx.Setup(context.Background(), influxdb.Organisation("e2e-test-org"), influxdb.Bucket("e2e-test-bucket"), 0) if err != nil { t.Fatal(err) } diff --git a/examples_test.go b/examples_test.go index 8005aee6..cea7c5cd 100644 --- a/examples_test.go +++ b/examples_test.go @@ -49,7 +49,7 @@ func setupHandler(w http.ResponseWriter, r *http.Request) { if err := json.NewDecoder(r.Body).Decode(&req); err != nil { log.Fatal(err) } - if err := resTemplate.Execute(w, map[string]string{"username": req.Username, "org": req.Org, "retentionSeconds": strconv.Itoa(req.RetentionPeriodHrs * 60 * 60)}); err != nil { + if err := resTemplate.Execute(w, map[string]string{"username": req.Username, "org": string(req.Org), "retentionSeconds": strconv.Itoa(req.RetentionPeriodHrs * 60 * 60)}); err != nil { log.Fatal(err) } } @@ -188,7 +188,7 @@ func ExampleClient_Write_basic() { } // The actual write..., this method can be called concurrently. - if _, err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { + if _, err := influx.Write(context.Background(), influxdb.Organisation("my-very-awesome-org"), influxdb.Bucket("my-awesome-bucket"), myMetrics...); err != nil { log.Fatal(err) // as above use your own error handling here. } influx.Close() // closes the client. After this the client is useless. @@ -258,7 +258,7 @@ func ExampleClient_Write_tlsMutualAuthentication() { } // The actual write... - if _, err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { + if _, err := influx.Write(context.Background(), influxdb.Organisation("my-very-awesome-org"), influxdb.Bucket("my-awesome-bucket"), myMetrics...); err != nil { log.Fatal(err) } influx.Close() // close the client after this the client is useless. @@ -277,7 +277,7 @@ func ExampleClient_Setup() { if err != nil { panic(err) // error handling here, normally we wouldn't use fmt, but it works for the example } - resp, err := influx.Setup(context.Background(), "my-bucket", "my-org", 32) + resp, err := influx.Setup(context.Background(), influxdb.Organisation("my-org"), influxdb.Bucket("my-bucket"), 32) if err != nil { log.Fatal(err) } @@ -292,7 +292,7 @@ func ExampleClient_Setup() { } // We can now do a write even though we didn't put a token in - if _, err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { + if _, err := influx.Write(context.Background(), influxdb.Organisation("my-very-awesome-org"), influxdb.Bucket("my-awesome-bucket"), myMetrics...); err != nil { log.Fatal(err) } influx.Close() // close the client after this the client is useless. diff --git a/query.go b/query.go index a04d5d8f..a3c876bf 100644 --- a/query.go +++ b/query.go @@ -48,7 +48,7 @@ type dialect struct { // QueryCSV returns the result of a flux query. // TODO: annotations -func (c *Client) QueryCSV(ctx context.Context, flux string, org string, extern ...interface{}) (*QueryCSVResult, error) { +func (c *Client) QueryCSV(ctx context.Context, flux string, org Organisation, extern ...interface{}) (*QueryCSVResult, error) { qURL, err := c.makeQueryURL(org) if err != nil { return nil, err @@ -108,7 +108,7 @@ func (c *Client) QueryCSV(ctx context.Context, flux string, org string, extern . return &QueryCSVResult{ReadCloser: resp.Body, csvReader: csv.NewReader(resp.Body)}, nil } -func (c *Client) makeQueryURL(org string) (string, error) { +func (c *Client) makeQueryURL(org Organisation) (string, error) { qu, err := url.Parse(c.url.String()) if err != nil { return "", err @@ -116,7 +116,7 @@ func (c *Client) makeQueryURL(org string) (string, error) { qu.Path = path.Join(qu.Path, "query") params := qu.Query() - params.Set("org", org) + params.Set("org", string(org)) qu.RawQuery = params.Encode() return qu.String(), nil } diff --git a/setup.go b/setup.go index c15928d8..ab5c3dee 100644 --- a/setup.go +++ b/setup.go @@ -12,7 +12,7 @@ import ( // It requires a client be set up with a username and password. // If successful will add a token to the client. // RetentionPeriodHrs of zero will result in infinite retention. -func (c *Client) Setup(ctx context.Context, bucket, org string, retentionPeriodHrs int) (*SetupResult, error) { +func (c *Client) Setup(ctx context.Context, org Organisation, bucket Bucket, retentionPeriodHrs int) (*SetupResult, error) { if c.username == "" || c.password == "" { return nil, errors.New("a username and password is requred for a setup") } @@ -54,11 +54,11 @@ func (c *Client) Setup(ctx context.Context, bucket, org string, retentionPeriodH // SetupRequest is a request to setup a new influx instance. type SetupRequest struct { - Username string `json:"username"` - Password string `json:"password"` - Org string `json:"org"` - Bucket string `json:"bucket"` - RetentionPeriodHrs int `json:"retentionPeriodHrs"` + Username string `json:"username"` + Password string `json:"password"` + Org Organisation `json:"org"` + Bucket Bucket `json:"bucket"` + RetentionPeriodHrs int `json:"retentionPeriodHrs"` } // SetupResult is the result of setting up a new influx instance. From d52593159fbb0e3e2203c3a3cde483139a796ec8 Mon Sep 17 00:00:00 2001 From: George MacRorie Date: Mon, 5 Aug 2019 17:30:10 +0100 Subject: [PATCH 3/4] Swap bucket and organisation arguments back --- e2e_test.go | 7 +++++-- examples_test.go | 8 ++++---- models.go | 8 -------- query.go | 6 +++--- setup.go | 12 ++++++------ write.go | 4 ++-- writer/doc.go | 8 ++++++-- writer/support_test.go | 8 ++++---- writer/writer.go | 14 +++++++------- writer/writer_test.go | 12 ++++++------ 10 files changed, 43 insertions(+), 44 deletions(-) diff --git a/e2e_test.go b/e2e_test.go index a4d7895b..c612e9f6 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -29,7 +29,7 @@ func TestE2E(t *testing.T) { t.Fatal(err) } // set up the bucket and org and get the token - sRes, err := influx.Setup(context.Background(), influxdb.Organisation("e2e-test-org"), influxdb.Bucket("e2e-test-bucket"), 0) + sRes, err := influx.Setup(context.Background(), "e2e-test-bucket", "e2e-test-org", 0) if err != nil { t.Fatal(err) } @@ -43,7 +43,7 @@ func TestE2E(t *testing.T) { t.Fatalf("expected user to be %s, but was %s", "e2e-test-user", sRes.User.Name) } now := time.Now() - _, err = influx.Write(context.Background(), influxdb.Organisation("e2e-test-org"), influxdb.Bucket("e2e-test-bucket"), + _, err = influx.Write(context.Background(), "e2e-test-bucket", "e2e-test-org", &influxdb.RowMetric{ NameStr: "test", Tags: []*influxdb.Tag{ @@ -123,6 +123,7 @@ func TestE2E(t *testing.T) { } time.Sleep(5 * time.Second) + r, err := influx.QueryCSV( context.Background(), `from(bucket:bucket)|>range(start:-1000h)|>group()`, @@ -133,9 +134,11 @@ func TestE2E(t *testing.T) { if err != nil { t.Fatal(err) } + b, err := ioutil.ReadAll(r) if err != nil { t.Fatal(err) } + fmt.Println(string(b)) } diff --git a/examples_test.go b/examples_test.go index cea7c5cd..2b9d96df 100644 --- a/examples_test.go +++ b/examples_test.go @@ -188,7 +188,7 @@ func ExampleClient_Write_basic() { } // The actual write..., this method can be called concurrently. - if _, err := influx.Write(context.Background(), influxdb.Organisation("my-very-awesome-org"), influxdb.Bucket("my-awesome-bucket"), myMetrics...); err != nil { + if _, err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { log.Fatal(err) // as above use your own error handling here. } influx.Close() // closes the client. After this the client is useless. @@ -258,7 +258,7 @@ func ExampleClient_Write_tlsMutualAuthentication() { } // The actual write... - if _, err := influx.Write(context.Background(), influxdb.Organisation("my-very-awesome-org"), influxdb.Bucket("my-awesome-bucket"), myMetrics...); err != nil { + if _, err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { log.Fatal(err) } influx.Close() // close the client after this the client is useless. @@ -277,7 +277,7 @@ func ExampleClient_Setup() { if err != nil { panic(err) // error handling here, normally we wouldn't use fmt, but it works for the example } - resp, err := influx.Setup(context.Background(), influxdb.Organisation("my-org"), influxdb.Bucket("my-bucket"), 32) + resp, err := influx.Setup(context.Background(), "my-bucket", "my-org", 32) if err != nil { log.Fatal(err) } @@ -292,7 +292,7 @@ func ExampleClient_Setup() { } // We can now do a write even though we didn't put a token in - if _, err := influx.Write(context.Background(), influxdb.Organisation("my-very-awesome-org"), influxdb.Bucket("my-awesome-bucket"), myMetrics...); err != nil { + if _, err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil { log.Fatal(err) } influx.Close() // close the client after this the client is useless. diff --git a/models.go b/models.go index feb8f19a..92536242 100644 --- a/models.go +++ b/models.go @@ -7,14 +7,6 @@ import ( lp "github.com/influxdata/line-protocol" ) -// Organisation is the name of the organisation under which -// metrics will be published -type Organisation string - -// Bucket is the name of the bucket to which metrics will be -// published -type Bucket string - // Metric is just a github.com/influxdata/line-protocol.Metric. // We alias here to keep abstractions from leaking. type Metric = lp.Metric diff --git a/query.go b/query.go index a3c876bf..a04d5d8f 100644 --- a/query.go +++ b/query.go @@ -48,7 +48,7 @@ type dialect struct { // QueryCSV returns the result of a flux query. // TODO: annotations -func (c *Client) QueryCSV(ctx context.Context, flux string, org Organisation, extern ...interface{}) (*QueryCSVResult, error) { +func (c *Client) QueryCSV(ctx context.Context, flux string, org string, extern ...interface{}) (*QueryCSVResult, error) { qURL, err := c.makeQueryURL(org) if err != nil { return nil, err @@ -108,7 +108,7 @@ func (c *Client) QueryCSV(ctx context.Context, flux string, org Organisation, ex return &QueryCSVResult{ReadCloser: resp.Body, csvReader: csv.NewReader(resp.Body)}, nil } -func (c *Client) makeQueryURL(org Organisation) (string, error) { +func (c *Client) makeQueryURL(org string) (string, error) { qu, err := url.Parse(c.url.String()) if err != nil { return "", err @@ -116,7 +116,7 @@ func (c *Client) makeQueryURL(org Organisation) (string, error) { qu.Path = path.Join(qu.Path, "query") params := qu.Query() - params.Set("org", string(org)) + params.Set("org", org) qu.RawQuery = params.Encode() return qu.String(), nil } diff --git a/setup.go b/setup.go index ab5c3dee..c15928d8 100644 --- a/setup.go +++ b/setup.go @@ -12,7 +12,7 @@ import ( // It requires a client be set up with a username and password. // If successful will add a token to the client. // RetentionPeriodHrs of zero will result in infinite retention. -func (c *Client) Setup(ctx context.Context, org Organisation, bucket Bucket, retentionPeriodHrs int) (*SetupResult, error) { +func (c *Client) Setup(ctx context.Context, bucket, org string, retentionPeriodHrs int) (*SetupResult, error) { if c.username == "" || c.password == "" { return nil, errors.New("a username and password is requred for a setup") } @@ -54,11 +54,11 @@ func (c *Client) Setup(ctx context.Context, org Organisation, bucket Bucket, ret // SetupRequest is a request to setup a new influx instance. type SetupRequest struct { - Username string `json:"username"` - Password string `json:"password"` - Org Organisation `json:"org"` - Bucket Bucket `json:"bucket"` - RetentionPeriodHrs int `json:"retentionPeriodHrs"` + Username string `json:"username"` + Password string `json:"password"` + Org string `json:"org"` + Bucket string `json:"bucket"` + RetentionPeriodHrs int `json:"retentionPeriodHrs"` } // SetupResult is the result of setting up a new influx instance. diff --git a/write.go b/write.go index 2bfb9bad..5bee93ce 100644 --- a/write.go +++ b/write.go @@ -17,7 +17,7 @@ import ( // Write writes metrics to a bucket, and org. It retries intelligently. // If the write is too big, it retries again, after breaking the payloads into two requests. -func (c *Client) Write(ctx context.Context, org Organisation, bucket Bucket, m ...Metric) (n int, err error) { +func (c *Client) Write(ctx context.Context, bucket, org string, m ...Metric) (n int, err error) { var ( buf = &bytes.Buffer{} e = lp.NewEncoder(buf) @@ -37,7 +37,7 @@ func (c *Client) Write(ctx context.Context, org Organisation, bucket Bucket, m . } } - req, err := c.makeWriteRequest(string(bucket), string(org), buf) + req, err := c.makeWriteRequest(bucket, org, buf) if err != nil { return 0, err } diff --git a/writer/doc.go b/writer/doc.go index b7e224f5..514bc6d6 100644 --- a/writer/doc.go +++ b/writer/doc.go @@ -15,9 +15,13 @@ // ) // // func main() { -// cli, _ := influxdb.New("http://localhost:9999", "some-token") +// var ( +// cli, _ = influxdb.New("http://localhost:9999", "some-token") +// bucket = "default" +// org = "influx" +// ) // -// wr := writer.New(cli, influxdb.Organisation("influx"), influxdb.Bucket("default"), writer.WithBufferSize(10)) +// wr := writer.New(cli, bucket, org, writer.WithBufferSize(10)) // // wr.Write(influxdb.NewRowMetric( // map[string]interface{}{ diff --git a/writer/support_test.go b/writer/support_test.go index b895b7c1..039d9c5c 100644 --- a/writer/support_test.go +++ b/writer/support_test.go @@ -15,13 +15,13 @@ type bucketWriter struct { } type bucketWriteCall struct { - org influxdb.Organisation - bkt influxdb.Bucket + bkt string + org string data []influxdb.Metric } -func (b *bucketWriter) Write(_ context.Context, org influxdb.Organisation, bkt influxdb.Bucket, m ...influxdb.Metric) (int, error) { - b.calls = append(b.calls, bucketWriteCall{org, bkt, m}) +func (b *bucketWriter) Write(_ context.Context, bucket, org string, m ...influxdb.Metric) (int, error) { + b.calls = append(b.calls, bucketWriteCall{bucket, org, m}) return len(m), nil } diff --git a/writer/writer.go b/writer/writer.go index 4b6e5f9f..d4a8129f 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -9,16 +9,16 @@ import ( // BucketMetricWriter is a type which Metrics can be written to a particular bucket // in a particular organisation type BucketMetricWriter interface { - Write(context.Context, influxdb.Organisation, influxdb.Bucket, ...influxdb.Metric) (int, error) + Write(context.Context, string, string, ...influxdb.Metric) (int, error) } // New constructs a point writer with an underlying buffer from the provided BucketMetricWriter // The writer will flushed metrics to the underlying BucketMetricWriter when the buffer is full // or the configured flush interval ellapses without a flush occuring -func New(writer BucketMetricWriter, org influxdb.Organisation, bkt influxdb.Bucket, opts ...Option) *PointWriter { +func New(writer BucketMetricWriter, bkt, org string, opts ...Option) *PointWriter { var ( config = Options(opts).Config() - bucket = NewBucketWriter(writer, org, bkt) + bucket = NewBucketWriter(writer, bkt, org) buffered = NewBufferedWriterSize(bucket, config.size) ) @@ -32,14 +32,14 @@ type BucketWriter struct { ctxt context.Context - org influxdb.Organisation - bucket influxdb.Bucket + bucket string + org string } // NewBucketWriter allocates, configures and returned a new BucketWriter for writing // metrics to a specific organisations bucket -func NewBucketWriter(w BucketMetricWriter, org influxdb.Organisation, bkt influxdb.Bucket) *BucketWriter { - return &BucketWriter{w, context.Background(), org, bkt} +func NewBucketWriter(w BucketMetricWriter, bucket, org string) *BucketWriter { + return &BucketWriter{w, context.Background(), bucket, org} } // Write writes the provided metrics to the underlying metrics writer diff --git a/writer/writer_test.go b/writer/writer_test.go index d0476b76..fef734bd 100644 --- a/writer/writer_test.go +++ b/writer/writer_test.go @@ -11,10 +11,10 @@ import ( func Test_New(t *testing.T) { var ( client = &influxdb.Client{} - org = influxdb.Organisation("influx") - bkt = influxdb.Bucket("default") + bkt = "default" + org = "influx" options = Options{WithBufferSize(12), WithFlushInterval(5 * time.Minute)} - wr = New(client, org, bkt, options...) + wr = New(client, bkt, org, options...) ) require.Equal(t, 5*time.Minute, wr.flushInterval) @@ -26,9 +26,9 @@ func Test_New(t *testing.T) { func Test_BucketWriter(t *testing.T) { var ( spy = &bucketWriter{} - org = influxdb.Organisation("influx") - bucket = influxdb.Bucket("default") - wr = NewBucketWriter(spy, org, bucket) + bucket = "default" + org = "influx" + wr = NewBucketWriter(spy, bucket, org) expected = []bucketWriteCall{ {org, bucket, createTestRowMetrics(t, 4)}, From 107cc54c9791f49621013af57f7c4b2ce1f2d8ce Mon Sep 17 00:00:00 2001 From: George MacRorie Date: Mon, 5 Aug 2019 17:49:02 +0100 Subject: [PATCH 4/4] Add comment to go.mod testify entry to state it is a test dependency --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index fec6cf50..d9d389b5 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,6 @@ require ( github.com/google/go-cmp v0.2.0 // test dependency github.com/influxdata/flux v0.0.0-20190620184636-886e3c28388d // test dependency github.com/influxdata/line-protocol v0.0.0-20190509173118-5712a8124a9a - github.com/stretchr/testify v1.3.0 + github.com/stretchr/testify v1.3.0 // test dependency honnef.co/go/tools v0.0.1-2019.2.2 // indirect )