[chore] [exporterhelper] Remove retry sender -> queue sender callback
Use the returned error instead to simplify the senders' feedback loop
dmitryax committed Nov 18, 2023
1 parent 639223a commit 94a70c0
Showing 12 changed files with 252 additions and 251 deletions.
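The gist of the change: the queue sender used to learn about delivery failures through an onTemporaryFailure callback wired into the retry sender at construction time; after this commit the consume function simply returns an error and the queue reacts to it directly. Below is a minimal sketch of the two feedback styles, not the actual exporterhelper code; callbackSender, doExport, and the string item type are illustrative only.

package feedbackdemo

import (
	"context"
	"errors"
)

// Before: failures flowed back through a callback that had to be wired in
// when the senders were constructed (cf. the removed setOnTemporaryFailure).
type callbackSender struct {
	onTemporaryFailure func(ctx context.Context, item string, err error)
}

func (s *callbackSender) send(ctx context.Context, item string) {
	if err := doExport(ctx, item); err != nil && s.onTemporaryFailure != nil {
		s.onTemporaryFailure(ctx, item, err) // the queue re-enqueues or drops the item
	}
}

// After: the consume function returns the error, and the caller (the queue)
// decides what to do with the item; no callback wiring between senders.
func consume(ctx context.Context, item string) error {
	return doExport(ctx, item)
}

// doExport stands in for the real export path.
func doExport(_ context.Context, _ string) error {
	return errors.New("temporary failure")
}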
16 changes: 2 additions & 14 deletions exporter/exporterhelper/common.go
@@ -91,7 +91,7 @@ func WithRetry(config RetrySettings) Option {
}
return
}
-o.retrySender = newRetrySender(config, o.set, o.onTemporaryFailure)
+o.retrySender = newRetrySender(config, o.set)
}
}

@@ -110,9 +110,7 @@ func WithQueue(config QueueSettings) Option {
}
return
}
-qs := newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
-o.queueSender = qs
-o.setOnTemporaryFailure(qs.onTemporaryFailure)
+o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
}
}

@@ -146,9 +144,6 @@ type baseExporter struct {
retrySender requestSender
timeoutSender *timeoutSender // timeoutSender is always initialized.

-// onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer.
-onTemporaryFailure onRequestHandlingFinishedFunc
-
consumerOptions []consumer.Option
}

@@ -215,10 +210,3 @@ func (be *baseExporter) Shutdown(ctx context.Context) error {
// Last shutdown the wrapped exporter itself.
be.ShutdownFunc.Shutdown(ctx))
}

-func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) {
-be.onTemporaryFailure = onTemporaryFailure
-if rs, ok := be.retrySender.(*retrySender); ok {
-rs.onTemporaryFailure = onTemporaryFailure
-}
-}
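Assembled from the hunks above, the two options now read roughly as follows. This is only the post-change state restated for readability, with the config.Enabled validation and surrounding fields elided; it adds no API beyond what the diff shows.

// After this commit, WithRetry and WithQueue only construct their senders;
// there is no callback wiring between them anymore.
func WithRetry(config RetrySettings) Option {
	return func(o *baseExporter) {
		o.retrySender = newRetrySender(config, o.set)
	}
}

func WithQueue(config QueueSettings) Option {
	return func(o *baseExporter) {
		o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
	}
}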
37 changes: 29 additions & 8 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
@@ -9,23 +9,28 @@ import (
"context"
"sync/atomic"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
)

// boundedMemoryQueue implements a producer-consumer exchange similar to a ring buffer queue,
// where the queue is bounded and if it fills up due to slow consumers, the new items written by
// the producer are dropped.
-type boundedMemoryQueue[T any] struct {
+type boundedMemoryQueue[T itemsCounter] struct {
component.StartFunc
+logger *zap.Logger
stopped *atomic.Bool
-items chan QueueRequest[T]
+items chan queueRequest[T]
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
-func NewBoundedMemoryQueue[T any](capacity int) Queue[T] {
+func NewBoundedMemoryQueue[T itemsCounter](logger *zap.Logger, capacity int) Queue[T] {
return &boundedMemoryQueue[T]{
-items: make(chan QueueRequest[T], capacity),
+logger: logger,
+items: make(chan queueRequest[T], capacity),
stopped: &atomic.Bool{},
}
}
@@ -37,17 +42,28 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
}

select {
-case q.items <- newQueueRequest(ctx, req):
+case q.items <- queueRequest[T]{
+req: req,
+ctx: ctx,
+}:
return nil
default:
return ErrQueueIsFull
}
}

-// Poll returns a request from the queue once it's available. It returns false if the queue is stopped.
-func (q *boundedMemoryQueue[T]) Poll() (QueueRequest[T], bool) {
+// Consume consumes an item from the queue once it's available. It returns false if the queue is stopped.
+func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
item, ok := <-q.items
-return item, ok
+if !ok {
+return false
+}
+err := consumeFunc(item.ctx, item.req)
+if err != nil && !consumererror.IsPermanent(err) {
+q.logger.Error("Exporting failed. Dropping data.", zap.Error(err),
+zap.Int("dropped_items", item.req.ItemsCount()))
+}
+return true
}

// Shutdown stops accepting items, and stops all consumers. It blocks until all consumers have stopped.
@@ -65,3 +81,8 @@ func (q *boundedMemoryQueue[T]) Size() int {
func (q *boundedMemoryQueue[T]) Capacity() int {
return cap(q.items)
}

+type queueRequest[T itemsCounter] struct {
+req T
+ctx context.Context
+}
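With this change the queue owns the failure handling: Consume blocks until an item is available, hands it to the supplied function, and logs and drops the item based on the returned error. A sketch of how a caller might drive the new contract; the drain helper below is illustrative, not part of the package.

// drain runs a single consumer loop against the new Consume API.
// Consume returns false only once the queue has been shut down.
func drain[T itemsCounter](q Queue[T], export func(context.Context, T) error) {
	for {
		if !q.Consume(export) {
			return // queue stopped; exit the consumer goroutine
		}
	}
}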
54 changes: 32 additions & 22 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
@@ -16,28 +16,36 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component/componenttest"
)

+type stringReq string
+
+func (r stringReq) ItemsCount() int {
+return 1
+}
+
// In this test we run a queue with capacity 1 and a single consumer.
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func TestBoundedQueue(t *testing.T) {
-q := NewBoundedMemoryQueue[string](1)
+q := NewBoundedMemoryQueue[stringReq](zap.NewNop(), 1)

var startLock sync.Mutex

startLock.Lock() // block consumers
consumerState := newConsumerState(t)

-consumers := NewQueueConsumers(q, 1, func(_ context.Context, item string) {
+consumers := NewQueueConsumers(q, 1, func(_ context.Context, item stringReq) error {
consumerState.record(item)

// block further processing until startLock is released
startLock.Lock()
//nolint:staticcheck // SA2001 ignore this!
startLock.Unlock()
+return nil
})
assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost()))

@@ -49,7 +57,7 @@ func TestBoundedQueue(t *testing.T) {

// at this point the item must have been read off the queue, but the consumer is blocked
assert.Equal(t, 0, q.Size())
-consumerState.assertConsumed(map[string]bool{
+consumerState.assertConsumed(map[stringReq]bool{
"a": true,
})

@@ -62,17 +70,17 @@ func TestBoundedQueue(t *testing.T) {

startLock.Unlock() // unblock consumer

-consumerState.assertConsumed(map[string]bool{
+consumerState.assertConsumed(map[stringReq]bool{
"a": true,
"b": true,
})

// now that consumers are unblocked, we can add more items
-expected := map[string]bool{
+expected := map[stringReq]bool{
"a": true,
"b": true,
}
-for _, item := range []string{"d", "e", "f"} {
+for _, item := range []stringReq{"d", "e", "f"} {
assert.NoError(t, q.Offer(context.Background(), item))
expected[item] = true
consumerState.assertConsumed(expected)
@@ -89,14 +97,15 @@ func TestBoundedQueue(t *testing.T) {
// only after Stop will mean the consumers are still locked while
// trying to perform the final consumptions.
func TestShutdownWhileNotEmpty(t *testing.T) {
-q := NewBoundedMemoryQueue[string](1000)
+q := NewBoundedMemoryQueue[stringReq](zap.NewNop(), 1000)

consumerState := newConsumerState(t)

waitChan := make(chan struct{})
-consumers := NewQueueConsumers(q, 5, func(_ context.Context, item string) {
+consumers := NewQueueConsumers(q, 5, func(_ context.Context, item stringReq) error {
<-waitChan
consumerState.record(item)
+return nil
})
assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost()))

@@ -123,7 +132,7 @@ func TestShutdownWhileNotEmpty(t *testing.T) {

assert.NoError(t, consumers.Shutdown(context.Background()))

-consumerState.assertConsumed(map[string]bool{
+consumerState.assertConsumed(map[stringReq]bool{
"a": true,
"b": true,
"c": true,
@@ -139,7 +148,7 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
}

func TestZeroSize(t *testing.T) {
-q := NewBoundedMemoryQueue[string](0)
+q := NewBoundedMemoryQueue[stringReq](zap.NewNop(), 0)
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
assert.ErrorIs(t, q.Offer(context.Background(), "a"), ErrQueueIsFull)
assert.NoError(t, q.Shutdown(context.Background()))
@@ -190,13 +199,14 @@ func Benchmark_QueueUsage_10000_10_250000(b *testing.B) {
func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
-q := NewBoundedMemoryQueue[string](capacity)
-consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) {
+q := NewBoundedMemoryQueue[stringReq](zap.NewNop(), capacity)
+consumers := NewQueueConsumers(q, numConsumers, func(context.Context, stringReq) error {
time.Sleep(1 * time.Millisecond)
+return nil
})
require.NoError(b, consumers.Start(context.Background(), componenttest.NewNopHost()))
for j := 0; j < numberOfItems; j++ {
-_ = q.Offer(context.Background(), fmt.Sprintf("%d", j))
+_ = q.Offer(context.Background(), stringReq(fmt.Sprintf("%d", j)))
}
assert.NoError(b, consumers.Shutdown(context.Background()))
}
@@ -205,29 +215,29 @@ func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int)
type consumerState struct {
sync.Mutex
t *testing.T
-consumed map[string]bool
+consumed map[stringReq]bool
consumedOnce *atomic.Bool
}

func newConsumerState(t *testing.T) *consumerState {
return &consumerState{
t: t,
-consumed: make(map[string]bool),
+consumed: make(map[stringReq]bool),
consumedOnce: &atomic.Bool{},
}
}

-func (s *consumerState) record(val string) {
+func (s *consumerState) record(val stringReq) {
s.Lock()
defer s.Unlock()
s.consumed[val] = true
s.consumedOnce.Store(true)
}

-func (s *consumerState) snapshot() map[string]bool {
+func (s *consumerState) snapshot() map[stringReq]bool {
s.Lock()
defer s.Unlock()
-out := make(map[string]bool)
+out := make(map[stringReq]bool)
for k, v := range s.consumed {
out[k] = v
}
@@ -238,7 +248,7 @@ func (s *consumerState) waitToConsumeOnce() {
require.Eventually(s.t, s.consumedOnce.Load, 2*time.Second, 10*time.Millisecond, "expected to consumer once")
}

-func (s *consumerState) assertConsumed(expected map[string]bool) {
+func (s *consumerState) assertConsumed(expected map[stringReq]bool) {
for i := 0; i < 1000; i++ {
if snapshot := s.snapshot(); !reflect.DeepEqual(snapshot, expected) {
time.Sleep(time.Millisecond)
@@ -248,9 +258,9 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
}

func TestZeroSizeWithConsumers(t *testing.T) {
-q := NewBoundedMemoryQueue[string](0)
+q := NewBoundedMemoryQueue[stringReq](zap.NewNop(), 0)

-consumers := NewQueueConsumers(q, 1, func(context.Context, string) {})
+consumers := NewQueueConsumers(q, 1, func(context.Context, stringReq) error { return nil })
assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost()))

assert.NoError(t, q.Offer(context.Background(), "a")) // in process
@@ -259,7 +269,7 @@ func TestZeroSizeWithConsumers(t *testing.T) {
}

func TestZeroSizeNoConsumers(t *testing.T) {
q := NewBoundedMemoryQueue[string](0)
q := NewBoundedMemoryQueue[stringReq](zap.NewNop(), 0)

err := q.Start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)
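The tests now use stringReq instead of plain strings because the queue's element type is constrained to itemsCounter, which lets the queue report how many items it drops. The constraint itself is not part of this diff; based on the ItemsCount calls above it presumably looks like the sketch below, and batchReq is a hypothetical example of a more realistic request type.

// Presumed shape of the generic bound (not shown in this commit's hunks;
// inferred from item.req.ItemsCount() in bounded_memory_queue.go above).
type itemsCounter interface {
	ItemsCount() int
}

// Hypothetical request type: a batch that reports how many telemetry items
// it carries, so a dropped batch can be logged with an accurate count.
type batchReq struct {
	spans []string
}

func (b batchReq) ItemsCount() int {
	return len(b.spans)
}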
14 changes: 6 additions & 8 deletions exporter/exporterhelper/internal/consumers.go
@@ -10,18 +10,18 @@ import (
"go.opentelemetry.io/collector/component"
)

-type QueueConsumers[T any] struct {
+type QueueConsumers[T itemsCounter] struct {
queue Queue[T]
numConsumers int
-callback func(context.Context, T)
+consumeFunc func(context.Context, T) error
stopWG sync.WaitGroup
}

-func NewQueueConsumers[T any](q Queue[T], numConsumers int, callback func(context.Context, T)) *QueueConsumers[T] {
+func NewQueueConsumers[T itemsCounter](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *QueueConsumers[T] {
return &QueueConsumers[T]{
queue: q,
numConsumers: numConsumers,
-callback: callback,
+consumeFunc: consumeFunc,
stopWG: sync.WaitGroup{},
}
}
@@ -40,12 +40,12 @@ func (qc *QueueConsumers[T]) Start(ctx context.Context, host component.Host) err
startWG.Done()
defer qc.stopWG.Done()
for {
-item, success := qc.queue.Poll()
-if !success {
+ok := qc.queue.Consume(qc.consumeFunc)
+if !ok {
return
}
-qc.callback(item.Context, item.Request)
-item.OnProcessingFinished()
}
}()
}
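Putting it together, wiring consumers against the new API looks roughly like the test code above: the consume function returns an error instead of being a void callback, and there is no OnProcessingFinished step left in the consumer loop. A hedged usage sketch follows; exportItem and the capacity and consumer counts are made up for illustration.

func runConsumers() error {
	q := NewBoundedMemoryQueue[stringReq](zap.NewNop(), 100)
	consumers := NewQueueConsumers(q, 4, func(ctx context.Context, item stringReq) error {
		// A non-permanent error returned here makes the queue log and drop the item.
		return exportItem(ctx, item) // exportItem is hypothetical
	})
	if err := consumers.Start(context.Background(), componenttest.NewNopHost()); err != nil {
		return err
	}
	defer func() { _ = consumers.Shutdown(context.Background()) }()
	return q.Offer(context.Background(), stringReq("payload"))
}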
(Diffs for the remaining 8 changed files in this commit are not loaded in this capture.)
