[exporterhelper] Remove re-enqueue dependency on retry sender
This change decouples the queue sender's re-enqueue capability from the retry sender and introduces a separate configuration option for it.
dmitryax committed Nov 22, 2023
1 parent 73fa163 commit 2b3035d
Showing 12 changed files with 151 additions and 145 deletions.
28 changes: 28 additions & 0 deletions .chloggen/introduce-reenque-option.yaml
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make the re-enqueue behavior configurable.

# One or more tracking issues or pull requests related to the change
issues: [8122]

subtext: |
  Instead of relying on the `retry_on_failure` option being enabled, there is now a separate option
  to control re-enqueuing independently of the retry sender. This can be useful for users who do not
  want the blocking exponential backoff retry and just want to put the failed request back at the
  beginning of the queue. This option can also be enabled with the in-memory queue, which means
  that data will never be dropped after reaching the queue as long as the collector is up and running.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
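
To make the behavior described in this changelog entry concrete, here is a minimal sketch of the idea, not the actual exporterhelper code: on a failed send, the queue sender puts the request back into its own queue instead of relying on a hook installed by the retry sender. The names below (offerQueue, queueSenderSketch, reenqueueOnFailure, sendFunc) are hypothetical stand-ins for the real wiring.

package sketch

import (
	"context"
	"log"
)

type offerQueue[T any] interface {
	Offer(ctx context.Context, req T) error
}

type queueSenderSketch[T any] struct {
	queue              offerQueue[T]
	sendFunc           func(context.Context, T) error
	reenqueueOnFailure bool // configured independently of retry_on_failure
}

// consume is invoked by a queue consumer for every dequeued request.
func (qs *queueSenderSketch[T]) consume(ctx context.Context, req T) {
	err := qs.sendFunc(ctx, req)
	if err == nil {
		return
	}
	if !qs.reenqueueOnFailure {
		log.Printf("dropping failed request: %v", err)
		return
	}
	// Put the failed request back into the queue. With re-enqueuing enabled, data that
	// reached the queue is not dropped while the collector keeps running.
	if offerErr := qs.queue.Offer(ctx, req); offerErr != nil {
		log.Printf("re-enqueue failed, dropping request: %v", offerErr)
	}
}
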
16 changes: 2 additions & 14 deletions exporter/exporterhelper/common.go
@@ -91,7 +91,7 @@ func WithRetry(config RetrySettings) Option {
}
return
}
o.retrySender = newRetrySender(config, o.set, o.onTemporaryFailure)
o.retrySender = newRetrySender(config, o.set)
}
}

@@ -110,9 +110,7 @@ func WithQueue(config QueueSettings) Option {
}
return
}
qs := newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
o.queueSender = qs
o.setOnTemporaryFailure(qs.onTemporaryFailure)
o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
}
}

@@ -146,9 +144,6 @@ type baseExporter struct {
retrySender requestSender
timeoutSender *timeoutSender // timeoutSender is always initialized.

// onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer.
onTemporaryFailure onRequestHandlingFinishedFunc

consumerOptions []consumer.Option
}

@@ -215,10 +210,3 @@ func (be *baseExporter) Shutdown(ctx context.Context) error {
// Last shutdown the wrapped exporter itself.
be.ShutdownFunc.Shutdown(ctx))
}

func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) {
be.onTemporaryFailure = onTemporaryFailure
if rs, ok := be.retrySender.(*retrySender); ok {
rs.onTemporaryFailure = onTemporaryFailure
}
}
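
For context, a hedged sketch of how an exporter factory typically composes the WithRetry and WithQueue options touched by this diff; the push function and config wiring are placeholders and this snippet is not part of the commit.

package sketch

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

func createTracesExporter(ctx context.Context, set exporter.CreateSettings, cfg component.Config) (exporter.Traces, error) {
	// Placeholder push function standing in for a real exporter's send logic.
	pushTraces := func(ctx context.Context, td ptrace.Traces) error { return nil }

	return exporterhelper.NewTracesExporter(ctx, set, cfg, pushTraces,
		// Each option wires its own sender; after this change the queue sender no longer
		// needs the onTemporaryFailure hook from the retry sender to re-enqueue items.
		exporterhelper.WithRetry(exporterhelper.NewDefaultRetrySettings()),
		exporterhelper.WithQueue(exporterhelper.NewDefaultQueueSettings()),
	)
}
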
24 changes: 18 additions & 6 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
@@ -18,14 +18,14 @@ import (
type boundedMemoryQueue[T any] struct {
component.StartFunc
stopped *atomic.Bool
items chan QueueRequest[T]
items chan queueRequest[T]
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue[T any](capacity int) Queue[T] {
return &boundedMemoryQueue[T]{
items: make(chan QueueRequest[T], capacity),
items: make(chan queueRequest[T], capacity),
stopped: &atomic.Bool{},
}
}
Expand All @@ -37,17 +37,24 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
}

select {
case q.items <- newQueueRequest(ctx, req):
case q.items <- queueRequest[T]{
req: req,
ctx: ctx,
}:
return nil
default:
return ErrQueueIsFull
}
}

// Poll returns a request from the queue once it's available. It returns false if the queue is stopped.
func (q *boundedMemoryQueue[T]) Poll() (QueueRequest[T], bool) {
// Consume consumes an item from the queue once it's available. It returns false if the queue is stopped.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T)) bool {
item, ok := <-q.items
return item, ok
if !ok {
return false
}
consumeFunc(item.ctx, item.req)
return true
}

// Shutdown stops accepting items, and stops all consumers. It blocks until all consumers have stopped.
@@ -65,3 +72,8 @@ func (q *boundedMemoryQueue[T]) Size() int {
func (q *boundedMemoryQueue[T]) Capacity() int {
return cap(q.items)
}

type queueRequest[T any] struct {
req T
ctx context.Context
}
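
A short, hedged usage sketch of the callback-based API shown above, assuming it sits next to the queue in the same internal package; only NewBoundedMemoryQueue, Offer, and Consume are taken from the diff, the rest is illustrative.

package internal

import (
	"context"
	"fmt"
)

func exampleBoundedMemoryQueueUsage() {
	q := NewBoundedMemoryQueue[string](10)

	// The context passed to Offer travels with the item and is handed back to the
	// consume callback, so request-scoped data survives the hop through the queue.
	if err := q.Offer(context.Background(), "payload-1"); err != nil {
		fmt.Println("queue full:", err)
		return
	}

	// Consume blocks until an item is available, invokes the callback, and reports
	// whether the queue is still running.
	running := q.Consume(func(ctx context.Context, item string) {
		fmt.Println("consumed:", item)
	})
	fmt.Println("queue still running:", running)
}
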
12 changes: 5 additions & 7 deletions exporter/exporterhelper/internal/consumers.go
@@ -13,15 +13,15 @@ import (
type QueueConsumers[T any] struct {
queue Queue[T]
numConsumers int
callback func(context.Context, T)
consumeFunc func(context.Context, T)
stopWG sync.WaitGroup
}

func NewQueueConsumers[T any](q Queue[T], numConsumers int, callback func(context.Context, T)) *QueueConsumers[T] {
func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T)) *QueueConsumers[T] {
return &QueueConsumers[T]{
queue: q,
numConsumers: numConsumers,
callback: callback,
consumeFunc: consumeFunc,
stopWG: sync.WaitGroup{},
}
}
@@ -40,12 +40,10 @@ func (qc *QueueConsumers[T]) Start(ctx context.Context, host component.Host) err
startWG.Done()
defer qc.stopWG.Done()
for {
item, success := qc.queue.Poll()
if !success {
ok := qc.queue.Consume(qc.consumeFunc)
if !ok {
return
}
qc.callback(item.Context, item.Request)
item.OnProcessingFinished()
}
}()
}
52 changes: 31 additions & 21 deletions exporter/exporterhelper/internal/persistent_queue.go
@@ -138,18 +138,27 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
}
}

// Poll returns the next available item from the queue, or blocks until one is available.
// If the queue is stopped, returns (QueueRequest{}, false)
func (pq *persistentQueue[T]) Poll() (QueueRequest[T], bool) {
// Consume consumes the next available item from the queue, or blocks until one is available.
// If the queue is stopped, returns false
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T)) bool {
var (
req T
consumed bool
onProcessingFinished func()
)

for {
select {
case <-pq.stopChan:
return QueueRequest[T]{}, false
return false
case <-pq.putChan:
req, found := pq.getNextItem(context.Background())
if found {
return req, true
}
req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
}

if consumed {
consumeFunc(context.Background(), req)
onProcessingFinished()
return true
}
}
}
@@ -221,18 +230,21 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
return err
}

// getNextItem pulls the next available item from the persistent storage; if none is found, returns (nil, false)
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (QueueRequest[T], bool) {
// getNextItem pulls the next available item from the persistent storage along with a callback function that should be
// called after the item is processed to clean up the storage. If no new item is available, returns false.
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(), bool) {
pq.mu.Lock()
defer pq.mu.Unlock()

var request T

// If called in the same time with Shutdown, make sure client is not closed.
if pq.refClient <= 0 {
return QueueRequest[T]{}, false
return request, nil, false
}

if pq.readIndex == pq.writeIndex {
return QueueRequest[T]{}, false
return request, nil, false

}
index := pq.readIndex
// Increase here, so even if errors happen below, it always iterates
@@ -245,7 +257,6 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (QueueRequest[T],
storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.currentlyDispatchedItems)),
getOp)

var request T
if err == nil {
request, err = pq.unmarshaler(getOp.Value)
}
@@ -257,23 +268,22 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (QueueRequest[T],
pq.set.Logger.Error("Error deleting item from queue", zap.Error(err))
}

return QueueRequest[T]{}, false
return request, nil, false
}

req := newQueueRequest[T](context.Background(), request)
// If all went well so far, cleanup will be handled by callback
// Increase the reference count, so the client is not closed while the request is being processed.
pq.refClient++
req.onProcessingFinishedFunc = func() {
return request, func() {
// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
defer pq.mu.Unlock()
if err = pq.itemDispatchingFinish(ctx, index); err != nil {
if err = pq.itemDispatchingFinish(context.Background(), index); err != nil {
pq.set.Logger.Error("Error deleting item from queue", zap.Error(err))
}
if err = pq.unrefClient(ctx); err != nil {
if err = pq.unrefClient(context.Background()); err != nil {
pq.set.Logger.Error("Error closing the storage client", zap.Error(err))
}
}
return req, true
}, true
}

// retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
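
An illustrative sketch (not repository code) of how the new getNextItem contract is consumed, mirroring persistentQueue.Consume above; processNext is a hypothetical helper used only to show the intended ordering of the cleanup callback.

package sketch

import "context"

// processNext shows the contract of getNextItem: it hands back the request, a cleanup
// callback, and a flag. The callback must be called only after the request has been
// processed, so the item is then removed from persistent storage and the storage
// client reference is released.
func processNext[T any](
	getNextItem func(ctx context.Context) (T, func(), bool),
	consumeFunc func(context.Context, T),
) bool {
	req, onProcessingFinished, ok := getNextItem(context.Background())
	if !ok {
		return false // queue empty or shutting down
	}
	consumeFunc(context.Background(), req)
	onProcessingFinished() // safe to delete the item from storage now
	return true
}
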
42 changes: 21 additions & 21 deletions exporter/exporterhelper/internal/persistent_queue_test.go
@@ -83,6 +83,7 @@ func TestPersistentQueue_FullCapacity(t *testing.T) {

// First request is picked by the consumer. Wait until the consumer is blocked on done.
assert.NoError(t, pq.Offer(context.Background(), req))
start <- struct{}{}
close(start)

for i := 0; i < 10; i++ {
@@ -339,7 +340,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) {
require.NoError(t, err)
}
assert.Equal(t, 3, ps.Size())
_, _ = ps.Poll()
_, _, _ = ps.getNextItem(context.Background())
assert.Equal(t, 2, ps.Size())
assert.NoError(t, ps.Shutdown(context.Background()))

@@ -386,18 +387,19 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{})

// Takes index 0 in process.
readReq, found := ps.Poll()
readReq, _, found := ps.getNextItem(context.Background())
require.True(t, found)
assert.Equal(t, req, readReq.Request)
assert.Equal(t, req, readReq)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

// This takes item 1 to process.
secondReadReq, found := ps.Poll()
secondReadReq, onProcessingFinished, found := ps.getNextItem(context.Background())
require.True(t, found)
assert.Equal(t, req, secondReadReq)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0, 1})

// Lets mark item 1 as finished, it will remove it from the currently dispatched items list.
secondReadReq.OnProcessingFinished()
onProcessingFinished()
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

// Reload the storage. Since items 0 was not finished, this should be re-enqueued at the end.
Expand All @@ -408,9 +410,10 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {

// We should be able to pull all remaining items now
for i := 0; i < 4; i++ {
qReq, found := newPs.Poll()
r, onProcessingFinished, found := newPs.getNextItem(context.Background())
require.True(t, found)
qReq.OnProcessingFinished()
assert.Equal(t, req, r)
onProcessingFinished()
}

// The queue should be now empty
@@ -443,7 +446,8 @@ func TestPersistentQueue_StartWithNonDispatched(t *testing.T) {
}

// get one item out, but don't mark it as processed
_, _ = ps.Poll()
<-ps.putChan
_, _, _ = ps.getNextItem(context.Background())
// put one more item in
require.NoError(t, ps.Offer(context.Background(), req))

@@ -466,20 +470,20 @@ func TestPersistentQueue_PutCloseReadClose(t *testing.T) {
assert.NoError(t, ps.Offer(context.Background(), req))
assert.Equal(t, 2, ps.Size())
// TODO: Remove this, after the initialization writes the readIndex.
_, _ = ps.Poll()
_, _, _ = ps.getNextItem(context.Background())
assert.NoError(t, ps.Shutdown(context.Background()))

newPs := createTestPersistentQueue(createTestClient(t, ext))
require.Equal(t, 2, newPs.Size())

// Lets read both of the elements we put
readReq, found := newPs.Poll()
readReq, _, found := newPs.getNextItem(context.Background())
require.True(t, found)
require.Equal(t, req, readReq.Request)
require.Equal(t, req, readReq)

readReq, found = newPs.Poll()
readReq, _, found = newPs.getNextItem(context.Background())
require.True(t, found)
require.Equal(t, req, readReq.Request)
require.Equal(t, req, readReq)
require.Equal(t, 0, newPs.Size())
assert.NoError(t, newPs.Shutdown(context.Background()))
}
@@ -518,9 +522,7 @@ func BenchmarkPersistentQueue_TraceSpans(b *testing.B) {
}

for i := 0; i < bb.N; i++ {
req, found := ps.Poll()
require.True(bb, found)
require.NotNil(bb, req)
require.True(bb, ps.Consume(func(context.Context, ptrace.Traces) {}))
}
require.NoError(b, ext.Shutdown(context.Background()))
})
@@ -594,12 +596,12 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) {

assert.NoError(t, ps.Offer(context.Background(), newTraces(5, 10)))

req, ok := ps.Poll()
_, onProcessingFinished, ok := ps.getNextItem(context.Background())
require.True(t, ok)
assert.False(t, client.(*mockStorageClient).isClosed())
assert.NoError(t, ps.Shutdown(context.Background()))
assert.False(t, client.(*mockStorageClient).isClosed())
req.OnProcessingFinished()
onProcessingFinished()
assert.True(t, client.(*mockStorageClient).isClosed())
}

@@ -633,9 +635,7 @@ func TestPersistentQueue_StorageFull(t *testing.T) {

// Take out all the items
for i := reqCount; i > 0; i-- {
request, found := ps.Poll()
require.True(t, found)
request.OnProcessingFinished()
require.True(t, ps.Consume(func(context.Context, ptrace.Traces) {}))
}

// We should be able to put a new item in
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/queue.go
@@ -27,9 +27,9 @@ type Queue[T any] interface {
// without violating capacity restrictions. If success returns no error.
// It returns ErrQueueIsFull if no space is currently available.
Offer(ctx context.Context, item T) error
// Poll returns the head of this queue. The call blocks until there is an item available.
// Consume applies the provided function to the head of the queue. The call blocks until there is an item available.
// It returns false if the queue is stopped.
Poll() (QueueRequest[T], bool)
Consume(func(ctx context.Context, item T)) bool
// Size returns the current Size of the queue
Size() int
// Capacity returns the capacity of the queue.
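
A minimal consumer-loop sketch against the updated Queue[T] contract, close to what QueueConsumers.Start does in consumers.go; runConsumer itself is illustrative only and assumes it lives in the same internal package.

package internal

import "context"

func runConsumer[T any](q Queue[T], consumeFunc func(context.Context, T)) {
	for {
		// Consume blocks until an item is available and applies consumeFunc to it.
		// A false return means the queue was stopped, so the consumer goroutine exits.
		if !q.Consume(consumeFunc) {
			return
		}
	}
}
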