Skip to content

Commit

Permalink
feat(http): add configurable limit to points batch size on write endp…
Browse files Browse the repository at this point in the history
…oint (#16469)
  • Loading branch information
GeorgeMac committed Jan 10, 2020
1 parent 63b3a07 commit a0c18c9
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
1. [16430](https://github.com/influxdata/influxdb/pull/16430): Added toggle to table thresholds to allow users to choose between setting threshold colors to text or background
1. [16418](https://github.com/influxdata/influxdb/pull/16418): Add Developer Documentation
1. [16260](https://github.com/influxdata/influxdb/pull/16260): Capture User-Agent header as query source for logging purposes
1. [16469](https://github.com/influxdata/influxdb/pull/16469): Add support for configurable max batch size in points write handler

### Bug Fixes

Expand Down
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
ETooManyRequests = "too many requests"
EUnauthorized = "unauthorized"
EMethodNotAllowed = "method not allowed"
ETooLarge = "request too large"
)

// Error is the error struct of platform.
Expand Down
1 change: 1 addition & 0 deletions http/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,5 @@ var statusCodePlatformError = map[string]int{
platform.ETooManyRequests: http.StatusTooManyRequests,
platform.EUnauthorized: http.StatusUnauthorized,
platform.EMethodNotAllowed: http.StatusMethodNotAllowed,
platform.ETooLarge: http.StatusRequestEntityTooLarge,
}
150 changes: 106 additions & 44 deletions http/write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -21,6 +22,13 @@ import (
"go.uber.org/zap"
)

var (
// ErrMaxBatchSizeExceeded is returned when a points batch exceeds
// the defined upper limit in bytes. This pertains to the size of the
// batch after inflation from any compression (i.e. ungzipped).
ErrMaxBatchSizeExceeded = errors.New("points batch is too large")
)

// WriteBackend is all services and associated parameters required to construct
// the WriteHandler.
type WriteBackend struct {
Expand Down Expand Up @@ -58,6 +66,19 @@ type WriteHandler struct {
PointsWriter storage.PointsWriter

EventRecorder metric.EventRecorder

maxBatchSizeBytes int64
}

// WriteHandlerOption is a functional option for a *WriteHandler
type WriteHandlerOption func(*WriteHandler)

// WithMaxBatchSizeBytes configures the maximum size for a
// (decompressed) points batch allowed by the write handler
func WithMaxBatchSizeBytes(n int64) WriteHandlerOption {
return func(w *WriteHandler) {
w.maxBatchSizeBytes = n
}
}

// Prefix provides the route prefix.
Expand All @@ -72,7 +93,7 @@ const (
)

// NewWriteHandler creates a new handler at /api/v2/write to receive line protocol.
func NewWriteHandler(log *zap.Logger, b *WriteBackend) *WriteHandler {
func NewWriteHandler(log *zap.Logger, b *WriteBackend, opts ...WriteHandlerOption) *WriteHandler {
h := &WriteHandler{
Router: NewRouter(b.HTTPErrorHandler),
HTTPErrorHandler: b.HTTPErrorHandler,
Expand All @@ -84,6 +105,10 @@ func NewWriteHandler(log *zap.Logger, b *WriteBackend) *WriteHandler {
EventRecorder: b.WriteEventRecorder,
}

for _, opt := range opts {
opt(h)
}

h.HandlerFunc("POST", prefixWrite, h.handleWrite)
return h
}
Expand All @@ -97,9 +122,19 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {

// TODO(desa): I really don't like how we're recording the usage metrics here
// Ideally this will be moved when we solve https://github.com/influxdata/influxdb/issues/13403
var orgID influxdb.ID
var requestBytes int
sw := kithttp.NewStatusResponseWriter(w)
var (
orgID influxdb.ID
requestBytes int
sw = kithttp.NewStatusResponseWriter(w)
handleError = func(err error, code, message string) {
h.HandleHTTPError(ctx, &influxdb.Error{
Code: code,
Op: "http/handleWrite",
Msg: message,
Err: err,
}, w)
}
)
w = sw
defer func() {
h.EventRecorder.Record(ctx, metric.Event{
Expand All @@ -111,20 +146,22 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
})
}()

in := r.Body
var in io.ReadCloser = r.Body
defer in.Close()

if r.Header.Get("Content-Encoding") == "gzip" {
var err error
in, err = gzip.NewReader(r.Body)
if err != nil {
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInvalid,
Op: "http/handleWrite",
Msg: errInvalidGzipHeader,
Err: err,
}, w)
handleError(err, influxdb.EInvalid, errInvalidGzipHeader)
return
}
defer in.Close()
}

// given a limit is configured on the number of bytes in a
// batch then wrap the reader in a limited reader
if h.maxBatchSizeBytes > 0 {
in = newLimitedReadCloser(in, h.maxBatchSizeBytes)
}

a, err := pcontext.GetAuthorizer(ctx)
Expand Down Expand Up @@ -183,21 +220,12 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {

p, err := influxdb.NewPermissionAtID(bucket.ID, influxdb.WriteAction, influxdb.BucketsResourceType, org.ID)
if err != nil {
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInternal,
Op: "http/handleWrite",
Msg: fmt.Sprintf("unable to create permission for bucket: %v", err),
Err: err,
}, w)
handleError(err, influxdb.EInternal, fmt.Sprintf("unable to create permission for bucket: %v", err))
return
}

if !a.Allowed(*p) {
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EForbidden,
Op: "http/handleWrite",
Msg: "insufficient permissions for write",
}, w)
handleError(err, influxdb.EForbidden, "insufficient permissions for write")
return
}

Expand All @@ -210,22 +238,28 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
span.Finish()
if err != nil {
log.Error("Error reading body", zap.Error(err))
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInternal,
Op: "http/handleWrite",
Msg: fmt.Sprintf("unable to read data: %v", err),
Err: err,
}, w)
handleError(err, influxdb.EInternal, "unable to read data")
return
}

// close the reader now that all bytes have been consumed
// this will return non-nil in the case of a configured limit
// being exceeded
if err := in.Close(); err != nil {
log.Error("Error reading body", zap.Error(err))

code := influxdb.EInternal
if errors.Is(err, ErrMaxBatchSizeExceeded) {
code = influxdb.ETooLarge
}

handleError(err, code, "unable to read data")
return
}

requestBytes = len(data)
if requestBytes == 0 {
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInvalid,
Op: "http/handleWrite",
Msg: "writing requires points",
}, w)
handleError(err, influxdb.EInvalid, "writing requires points")
return
}

Expand All @@ -237,21 +271,13 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
span.Finish()
if err != nil {
log.Error("Error parsing points", zap.Error(err))
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: err.Error(),
}, w)
handleError(err, influxdb.EInvalid, "")
return
}

if err := h.PointsWriter.WritePoints(ctx, points); err != nil {
log.Error("Error writing points", zap.Error(err))
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInternal,
Op: "http/handleWrite",
Msg: "unexpected error writing points to database",
Err: err,
}, w)
handleError(err, influxdb.EInternal, "unexpected error writing points to database")
return
}

Expand Down Expand Up @@ -369,3 +395,39 @@ func compressWithGzip(data io.Reader) (io.Reader, error) {

return pr, err
}

type limitedReader struct {
*io.LimitedReader
err error
close func() error
}

func newLimitedReadCloser(r io.ReadCloser, n int64) *limitedReader {
// read up to max + 1 as limited reader just returns EOF when the limit is reached
// or when there is nothing left to read. If we exceed the max batch size by one
// then we know the limit has been passed.
return &limitedReader{
LimitedReader: &io.LimitedReader{R: r, N: n + 1},
close: r.Close,
}
}

// Close returns an ErrMaxBatchSizeExceeded when the wrapped reader
// exceeds the set limit for number of bytes.
// This is safe to call more than once but not concurrently.
func (l *limitedReader) Close() (err error) {
defer func() {
if cerr := l.close(); cerr != nil && err == nil {
err = cerr
}

// only call close once
l.close = func() error { return nil }
}()

if l.N < 1 {
l.err = ErrMaxBatchSizeExceeded
}

return l.err
}
21 changes: 20 additions & 1 deletion http/write_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestWriteHandler_handleWrite(t *testing.T) {
bucket *influxdb.Bucket // bucket to return in bucket service
bucketErr error // err to return in bucket service
writeErr error // err to return from the points writer
opts []WriteHandlerOption // write handle configured options
}

// want is the expected output of the HTTP endpoint
Expand Down Expand Up @@ -255,6 +256,24 @@ func TestWriteHandler_handleWrite(t *testing.T) {
body: `{"code":"internal error","message":"authorizer not found on context"}`,
},
},
{
name: "large requests rejected",
request: request{
org: "043e0780ee2b1000",
bucket: "04504b356e23b000",
body: "m1,t1=v1 f1=1",
auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"),
},
state: state{
org: testOrg("043e0780ee2b1000"),
bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
opts: []WriteHandlerOption{WithMaxBatchSizeBytes(5)},
},
wants: wants{
code: 413,
body: `{"code":"request too large","message":"unable to read data: points batch is too large"}`,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -275,7 +294,7 @@ func TestWriteHandler_handleWrite(t *testing.T) {
PointsWriter: &mock.PointsWriter{Err: tt.state.writeErr},
WriteEventRecorder: &metric.NopEventRecorder{},
}
writeHandler := NewWriteHandler(zaptest.NewLogger(t), NewWriteBackend(zaptest.NewLogger(t), b))
writeHandler := NewWriteHandler(zaptest.NewLogger(t), NewWriteBackend(zaptest.NewLogger(t), b), tt.state.opts...)
handler := httpmock.NewAuthMiddlewareHandler(writeHandler, tt.request.auth)

r := httptest.NewRequest(
Expand Down

0 comments on commit a0c18c9

Please sign in to comment.