[aggregator|tcpclient] Drop enqueued payloads for a flush after write fails #4116

Closed · wants to merge 2 commits
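For orientation: the change splits the single queue buffer into an incoming buffer and a processing buffer. Enqueue only touches the incoming buffer; Flush swaps the two under the lock, does IO outside the lock, and once a write fails it drops (and counts) whatever is left in the processing buffer. The sketch below is a minimal illustration of that pattern, with simplified names and per-payload writes instead of the real batched writes; the authoritative change is the diff that follows.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// miniQueue is a stripped-down stand-in for the aggregator client queue: it
// keeps an incoming buffer for Enqueue and a processing buffer for Flush.
type miniQueue struct {
	mtx           sync.Mutex // guards incoming
	processingMtx sync.Mutex // serializes Flush calls
	incoming      [][]byte
	processing    [][]byte
	write         func([]byte) error
}

func (q *miniQueue) Enqueue(payload []byte) {
	q.mtx.Lock()
	q.incoming = append(q.incoming, payload)
	q.mtx.Unlock()
}

func (q *miniQueue) Flush() (written, dropped int) {
	q.processingMtx.Lock()
	defer q.processingMtx.Unlock()

	// Swap buffers so Enqueue keeps working while IO happens outside the lock.
	q.mtx.Lock()
	q.incoming, q.processing = q.processing, q.incoming
	q.mtx.Unlock()

	for i, payload := range q.processing {
		if err := q.write(payload); err != nil {
			// A failed write drops everything not yet written in this flush.
			dropped = len(q.processing) - i
			break
		}
		written++
	}
	q.processing = q.processing[:0]
	return written, dropped
}

func main() {
	q := &miniQueue{write: func([]byte) error { return errors.New("conn down") }}
	q.Enqueue([]byte("a"))
	q.Enqueue([]byte("b"))
	w, d := q.Flush()
	fmt.Printf("written=%d dropped=%d\n", w, d) // written=0 dropped=2

	w, d = q.Flush() // nothing left: the failed payloads were dropped, not retried here
	fmt.Printf("written=%d dropped=%d\n", w, d) // written=0 dropped=0
}
```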
86 changes: 56 additions & 30 deletions src/aggregator/client/queue.go
@@ -121,15 +121,17 @@ type instanceQueue interface {
type writeFn func([]byte) error

type queue struct {
metrics queueMetrics
instance placement.Instance
conn *connection
log *zap.Logger
writeFn writeFn
buf qbuf
dropType DropType
closed atomic.Bool
mtx sync.Mutex
metrics queueMetrics
instance placement.Instance
conn *connection
log *zap.Logger
writeFn writeFn
bufIncoming qbuf
bufProcessing qbuf
dropType DropType
closed atomic.Bool
mtx sync.Mutex
processingMtx sync.Mutex
}

func newInstanceQueue(instance placement.Instance, opts Options) instanceQueue {
@@ -156,7 +158,10 @@ func newInstanceQueue(instance placement.Instance, opts Options) instanceQueue {
metrics: newQueueMetrics(iOpts.MetricsScope()),
instance: instance,
conn: conn,
buf: qbuf{
bufIncoming: qbuf{
b: make([]protobuf.Buffer, int(qsize)),
},
bufProcessing: qbuf{
b: make([]protobuf.Buffer, int(qsize)),
},
}
@@ -178,7 +183,7 @@ func (q *queue) Enqueue(buf protobuf.Buffer) error {
q.mtx.Lock()
defer q.mtx.Unlock()

if full := q.buf.full(); full {
if full := q.bufIncoming.full(); full {
switch q.dropType {
case DropCurrent:
// Close the current buffer so its resources are freed.
@@ -187,15 +192,15 @@ func (q *queue) Enqueue(buf protobuf.Buffer) error {
return errWriterQueueFull
case DropOldest:
// Consume oldest buffer instead.
oldest := q.buf.shift()
oldest := q.bufIncoming.shift()
oldest.Close()
q.metrics.enqueueOldestDropped.Inc(1)
default:
return errInvalidDropType
}
}

q.buf.push(buf)
q.bufIncoming.push(buf)
q.metrics.enqueueSuccesses.Inc(1)
return nil
}
@@ -214,15 +219,29 @@ func (q *queue) Flush() {
n int
err error
)
// ensure Flush is safe for concurrent use
q.processingMtx.Lock()
defer q.processingMtx.Unlock()

// swap the incoming buffer with the (empty) processing buffer, to snapshot the queue before flushing and to unblock new writes
q.mtx.Lock()
q.bufProcessing, q.bufIncoming = q.bufIncoming, q.bufProcessing
q.mtx.Unlock()

for err == nil {
// flush everything in batches, to make sure no single payload is too large,
// to prevent a) allocs and b) timeouts due to big buffer IO taking too long.
var processed int
processed, err = q.flush(buf)
processed, err = q.flush(&q.bufProcessing, buf)
n += processed
}

// drop any unconsumed messages
if drops := q.bufProcessing.size(); drops > 0 {
q.metrics.flushErrorDropped.Inc(int64(drops))
}
q.bufProcessing.reset()
Contributor:

Sanity checking:

  • Do we have/need retries before we drop the buffer here?
  • Are these buffered elsewhere? Afaict no, but sanity checking.

Collaborator (Author):

the processing buf is not used outside of Flush(), which is now thread-safe, and retries are handled by the "connection".


if err != nil && !errors.Is(err, io.EOF) {
q.log.Error("error writing data",
zap.String("target_instance_id", q.instance.ID()),
@@ -240,39 +259,32 @@ func (q *queue) Flush() {
}
}

func (q *queue) flush(tmpWriteBuf *[]byte) (int, error) {
func (q *queue) flush(buf *qbuf, tmpWriteBuf *[]byte) (int, error) {
var n int

q.mtx.Lock()

if q.buf.size() == 0 {
q.mtx.Unlock()
if buf.size() == 0 {
return n, io.EOF
}

*tmpWriteBuf = (*tmpWriteBuf)[:0]
for q.buf.size() > 0 {
protoBuffer := q.buf.peek()
for buf.size() > 0 {
protoBuffer := buf.peek()
bytes := protoBuffer.Bytes()

if n > 0 && len(bytes)+len(*tmpWriteBuf) >= _queueMaxWriteBufSize {
// only merge buffers that are smaller than _queueMaxWriteBufSize bytes
break
}
_ = q.buf.shift()

if len(bytes) == 0 {
continue
if len(bytes) > 0 {
*tmpWriteBuf = append(*tmpWriteBuf, bytes...)
n += len(bytes)
}

*tmpWriteBuf = append(*tmpWriteBuf, bytes...)
n += len(bytes)
protoBuffer.Close()
_ = buf.shift()
}

// mutex is not held while doing IO
q.mtx.Unlock()

if n == 0 {
return n, io.EOF
}
@@ -288,21 +300,26 @@ func (q *queue) flush(tmpWriteBuf *[]byte) (int, error) {
}

func (q *queue) Size() int {
return int(q.buf.size())
q.mtx.Lock()
defer q.mtx.Unlock()

return int(q.bufIncoming.size())
}

type queueMetrics struct {
enqueueSuccesses tally.Counter
enqueueOldestDropped tally.Counter
enqueueCurrentDropped tally.Counter
enqueueClosedErrors tally.Counter
flushErrorDropped tally.Counter
connWriteSuccesses tally.Counter
connWriteErrors tally.Counter
}

func newQueueMetrics(s tally.Scope) queueMetrics {
enqueueScope := s.Tagged(map[string]string{"action": "enqueue"})
connWriteScope := s.Tagged(map[string]string{"action": "conn-write"})
flushScope := s.Tagged(map[string]string{"action": "flush"})
return queueMetrics{
enqueueSuccesses: enqueueScope.Counter("successes"),
enqueueOldestDropped: enqueueScope.Tagged(map[string]string{"drop-type": "oldest"}).
@@ -311,6 +328,8 @@ func newQueueMetrics(s tally.Scope) queueMetrics {
Counter("dropped"),
enqueueClosedErrors: enqueueScope.Tagged(map[string]string{"error-type": "queue-closed"}).
Counter("errors"),
flushErrorDropped: flushScope.Tagged(map[string]string{"drop-type": "flush-write-error"}).
Counter("dropped"),
connWriteSuccesses: connWriteScope.Counter("successes"),
connWriteErrors: connWriteScope.Counter("errors"),
}
@@ -356,6 +375,13 @@ func (q *qbuf) peek() protobuf.Buffer {
return q.b[idx]
}

func (q *qbuf) reset() {
for q.size() > 0 {
val := q.shift()
val.Close()
}
}

func roundUpToPowerOfTwo(val int) int {
return int(math.Pow(2, math.Ceil(math.Log2(float64(val)))))
}
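A side note on sizing: the queue capacity is rounded up to a power of two via roundUpToPowerOfTwo, and TestInstanceQueueSizeIsPowerOfTwo below asserts exactly that for both buffers. A power-of-two capacity is what ring buffers like qbuf typically rely on for cheap index wrapping, though that indexing detail is not visible in this diff. A quick standalone check of the rounding behaviour:

```go
package main

import (
	"fmt"
	"math"
)

// roundUpToPowerOfTwo is copied from the diff above.
func roundUpToPowerOfTwo(val int) int {
	return int(math.Pow(2, math.Ceil(math.Log2(float64(val)))))
}

func main() {
	for _, v := range []int{1, 3, 42, 123, 128} {
		fmt.Printf("%d -> %d\n", v, roundUpToPowerOfTwo(v))
	}
	// Prints: 1 -> 1, 3 -> 4, 42 -> 64, 123 -> 128, 128 -> 128
}
```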
25 changes: 24 additions & 1 deletion src/aggregator/client/queue_test.go
@@ -21,10 +21,13 @@
package client

import (
"bytes"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"gopkg.in/yaml.v2"

"github.com/m3db/m3/src/metrics/encoding/protobuf"
@@ -183,6 +186,25 @@ func TestInstanceQueueEnqueueSuccessWriteError(t *testing.T) {
<-done
}

func TestInstanceQueueDropAfterWriteError(t *testing.T) {
var writeCounter atomic.Int32
opts := testOptions()
queue := newInstanceQueue(testPlacementInstance, opts).(*queue)
queue.writeFn = func(data []byte) error {
writeCounter.Inc()
return errors.New("dummy error")
}

// write two payloads that will not be merged
require.NoError(t, queue.Enqueue(testNewBuffer(bytes.Repeat([]byte{0x42}, _queueMaxWriteBufSize+1))))
require.NoError(t, queue.Enqueue(testNewBuffer(bytes.Repeat([]byte{0x42}, _queueMaxWriteBufSize+1))))
queue.Flush()
require.Equal(t, int32(1), writeCounter.Load())
// next flush should not perform any writes
queue.Flush()
require.Equal(t, int32(1), writeCounter.Load())
}

func TestInstanceQueueCloseAlreadyClosed(t *testing.T) {
opts := testOptions()
queue := newInstanceQueue(testPlacementInstance, opts).(*queue)
@@ -213,7 +235,8 @@ func TestInstanceQueueSizeIsPowerOfTwo(t *testing.T) {
} {
opts := testOptions().SetInstanceQueueSize(tt.size)
q := newInstanceQueue(testPlacementInstance, opts).(*queue)
require.Equal(t, tt.expected, cap(q.buf.b))
require.Equal(t, tt.expected, cap(q.bufIncoming.b))
require.Equal(t, tt.expected, cap(q.bufProcessing.b))
}
}
