Skip to content

Commit

Permalink
refactor: add writer package for buffered writes
Browse files Browse the repository at this point in the history
  • Loading branch information
GeorgeMac committed Aug 1, 2019
1 parent de2584b commit aa324b5
Show file tree
Hide file tree
Showing 16 changed files with 675 additions and 397 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ jobs:
- run: go get -v -t -d ./...
- run: go vet ./...
- run: go get honnef.co/go/tools/cmd/staticcheck && staticcheck ./...
- run: go test -v -e2e ./...
- run: go test -v -race -e2e ./...
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ require (
github.com/google/go-cmp v0.2.0 // test dependency
github.com/influxdata/flux v0.0.0-20190620184636-886e3c28388d // test dependency
github.com/influxdata/line-protocol v0.0.0-20190509173118-5712a8124a9a
github.com/stretchr/testify v1.3.0
)
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,13 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/xanzy/ssh-agent v0.2.0/go.mod h1:0NyE30eGUDliuLEHJgYte/zncp2zdTStcOnWhgSqHD8=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down
8 changes: 8 additions & 0 deletions models.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ import (
lp "github.com/influxdata/line-protocol"
)

// Organisation is the name of the organisation under which
// metrics will be published
type Organisation string

// Bucket is the name of the bucket to which metrics will be
// published
type Bucket string

// Metric is just a github.com/influxdata/line-protocol.Metric.
// We alias here to keep abstractions from leaking.
type Metric = lp.Metric
Expand Down
71 changes: 25 additions & 46 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,49 +17,41 @@ import (

// Write writes metrics to a bucket, and org. It retries intelligently.
// If the write is too big, it retries again, after breaking the payloads into two requests.
func (c *Client) Write(ctx context.Context, bucket, org string, m ...Metric) (err error) {
tries := uint64(0)
return c.write(ctx, bucket, org, &tries, m...)
}
func (c *Client) Write(ctx context.Context, org Organisation, bucket Bucket, m ...Metric) (err error) {
var (
buf = &bytes.Buffer{}
e = lp.NewEncoder(buf)
)

func parseWriteError(r io.Reader) (*genericRespError, error) {
werr := &genericRespError{}
if err := json.NewDecoder(r).Decode(&werr); err != nil {
return nil, err
}
return werr, nil
}
e.FailOnFieldErr(c.errOnFieldErr)

func (c *Client) write(ctx context.Context, bucket, org string, triesPtr *uint64, m ...Metric) error {
buf := &bytes.Buffer{}
e := lp.NewEncoder(buf)
cleanup := func() {}
defer func() { cleanup() }()
doRequest:
select {
case <-ctx.Done():
return ctx.Err()
default:
}

for i := range m {
if _, err := e.Encode(m[i]); err != nil {
return err
}
}
req, err := c.makeWriteRequest(bucket, org, buf)

req, err := c.makeWriteRequest(string(bucket), string(org), buf)
if err != nil {
return err
}
resp, err := c.httpClient.Do(req)

resp, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return err
}
cleanup = func() {
r := io.LimitReader(resp.Body, 1<<16) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it.
// throw away the rest of the body so the connection can be reused even if there is still stuff on the wire.
ioutil.ReadAll(r) // we don't care about the error here, it is just to empty the tcp buffer

defer func() {
// discard body so connection can be reused
_, _ = io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}
}()

switch resp.StatusCode {
case http.StatusOK, http.StatusNoContent:
Expand All @@ -68,23 +60,11 @@ doRequest:
Code: resp.Status,
Message: "too many requests too fast",
}
cleanup()
if err2 := c.backoff(triesPtr, resp, err); err2 != nil {
return err2
}
cleanup = func() {}
goto doRequest
case http.StatusServiceUnavailable:
err = &genericRespError{
Code: resp.Status,
Message: "service temporarily unavaliable",
}
cleanup()
if err2 := c.backoff(triesPtr, resp, err); err2 != nil {
return err2
}
cleanup = func() {}
goto doRequest
default:
gwerr, err := parseWriteError(resp.Body)
if err != nil {
Expand All @@ -93,16 +73,7 @@ doRequest:

return gwerr
}
// we don't defer and close till here, because of the retries.
defer func() {
r := io.LimitReader(resp.Body, 1<<16) // we limit it because it is usually better to just reuse the body, but sometimes it isn't worth it.
_, err := ioutil.ReadAll(r) // throw away the rest of the body so the connection gets reused.
err2 := resp.Body.Close()
if err == nil && err2 != nil {
err = err2
}
}()
e.FailOnFieldErr(c.errOnFieldErr)

return err
}

Expand All @@ -128,3 +99,11 @@ func makeWriteURL(loc *url.URL, bucket, org string) (string, error) {
u.RawQuery = params.Encode()
return u.String(), nil
}

func parseWriteError(r io.Reader) (*genericRespError, error) {
werr := &genericRespError{}
if err := json.NewDecoder(r).Decode(&werr); err != nil {
return nil, err
}
return werr, nil
}

0 comments on commit aa324b5

Please sign in to comment.