[exporterhelper] Remove re-enqueue dependency on retry sender
This change decouples the queue sender's re-enqueue capability from the retry sender and adds a separate configuration option to control it.
dmitryax committed Nov 22, 2023
1 parent 7e3e725 commit d84c40a
Showing 7 changed files with 75 additions and 62 deletions.
28 changes: 28 additions & 0 deletions .chloggen/introduce-reenque-option.yaml
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make the re-enqueue behavior configurable.

# One or more tracking issues or pull requests related to the change
issues: [8122]

subtext: |
  Instead of relying on the enabled `retry_on_failure` option, there is now a new option
  to control re-enqueuing independently of the retry sender. This can be useful for users
  who don't want the blocking exponential retry and just want to put the failed request
  back in the queue to be retried later. This option can also be enabled with the memory
  queue, which means that data will never be dropped after getting into the queue as long
  as the collector is up and running.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
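
For illustration, here is a minimal sketch of how an exporter built on exporterhelper could enable the new behavior in code. It uses the `ReenqueueOnFailure` field, `NewDefaultQueueSettings`, `WithQueue`, and `WithRetry` from this change; the `newTracesExporter` wrapper and its push callback are hypothetical.

package example

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// newTracesExporter is a hypothetical constructor showing re-enqueue enabled
// without enabling the retry sender.
func newTracesExporter(ctx context.Context, set exporter.CreateSettings, cfg component.Config, push consumer.ConsumeTracesFunc) (exporter.Traces, error) {
	qCfg := exporterhelper.NewDefaultQueueSettings()
	qCfg.ReenqueueOnFailure = true // re-enqueue failed requests instead of dropping them

	rCfg := exporterhelper.NewDefaultRetrySettings()
	rCfg.Enabled = false // re-enqueue no longer depends on retry_on_failure

	return exporterhelper.NewTracesExporter(ctx, set, cfg, push,
		exporterhelper.WithQueue(qCfg),
		exporterhelper.WithRetry(rCfg),
	)
}

With this wiring, a failed request is put back into the queue and retried once the consumers reach it again, without the blocking exponential backoff of the retry sender.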
16 changes: 2 additions & 14 deletions exporter/exporterhelper/common.go
@@ -91,7 +91,7 @@ func WithRetry(config RetrySettings) Option {
}
return
}
o.retrySender = newRetrySender(config, o.set, o.onTemporaryFailure)
o.retrySender = newRetrySender(config, o.set)
}
}

@@ -110,9 +110,7 @@ func WithQueue(config QueueSettings) Option {
}
return
}
qs := newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
o.queueSender = qs
o.setOnTemporaryFailure(qs.onTemporaryFailure)
o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
}
}

@@ -146,9 +144,6 @@ type baseExporter struct {
retrySender requestSender
timeoutSender *timeoutSender // timeoutSender is always initialized.

// onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer.
onTemporaryFailure onRequestHandlingFinishedFunc

consumerOptions []consumer.Option
}

@@ -215,10 +210,3 @@ func (be *baseExporter) Shutdown(ctx context.Context) error {
// Last shutdown the wrapped exporter itself.
be.ShutdownFunc.Shutdown(ctx))
}

func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) {
be.onTemporaryFailure = onTemporaryFailure
if rs, ok := be.retrySender.(*retrySender); ok {
rs.onTemporaryFailure = onTemporaryFailure
}
}
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/persistent_queue.go
@@ -278,10 +278,10 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(), bool)
// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
defer pq.mu.Unlock()
if err = pq.itemDispatchingFinish(ctx, index); err != nil {
if err = pq.itemDispatchingFinish(context.Background(), index); err != nil {
pq.set.Logger.Error("Error deleting item from queue", zap.Error(err))
}
if err = pq.unrefClient(ctx); err != nil {
if err = pq.unrefClient(context.Background()); err != nil {
pq.set.Logger.Error("Error closing the storage client", zap.Error(err))
}
}, true
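
The switch from the request ctx to context.Background() above follows a common pattern: by the time the deferred cleanup runs, the caller's context may already be canceled, and deleting the dispatched item from storage should not be skipped because of that. A small, generic sketch of the pattern; finishDispatch and its cleanup argument are hypothetical.

package example

import (
	"context"
	"log"
)

// finishDispatch is a hypothetical cleanup hook, analogous to
// itemDispatchingFinish/unrefClient above: it must run even if the request
// context that triggered the work has already been canceled.
func finishDispatch(requestCtx context.Context, cleanup func(context.Context) error) {
	// Deliberately ignore requestCtx for the cleanup call: a canceled request
	// must not leave the item stuck in persistent storage.
	if err := cleanup(context.Background()); err != nil {
		log.Printf("cleanup failed: %v", err)
	}
}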
49 changes: 28 additions & 21 deletions exporter/exporterhelper/queue_sender.go
@@ -18,6 +18,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/internal/obsreportconfig"
@@ -41,6 +42,9 @@ type QueueSettings struct {
// StorageID if not empty, enables the persistent storage and uses the component specified
// as a storage extension for the persistent queue
StorageID *component.ID `mapstructure:"storage"`
// ReenqueueOnFailure indicates whether to re-enqueue items on send failure. If false, items will be dropped after a
// failed send. If true, items will be re-enqueued and retried after the current queue is drained.
ReenqueueOnFailure bool `mapstructure:"reenqueue_on_failure"`
}

// NewDefaultQueueSettings returns the default settings for QueueSettings.
@@ -101,50 +105,53 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co
queue = internal.NewBoundedMemoryQueue[Request](config.QueueSize)
}
return &queueSender{
fullName: set.ID.String(),
signal: signal,
queue: queue,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
numConsumers: config.NumConsumers,
stopWG: sync.WaitGroup{},
// TODO: this can be further exposed as a config param rather than relying on a type of queue
requeuingEnabled: isPersistent,
fullName: set.ID.String(),
signal: signal,
queue: queue,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
numConsumers: config.NumConsumers,
stopWG: sync.WaitGroup{},
requeuingEnabled: config.ReenqueueOnFailure,
}
}

func (qs *queueSender) onTemporaryFailure(ctx context.Context, req Request, err error, logger *zap.Logger) error {
// consume is the function that is executed by the queue consumers to send the data to the next consumerSender.
func (qs *queueSender) consume(ctx context.Context, req Request) {
err := qs.nextSender.send(ctx, req)

// Nothing to do if the error is nil or permanent. Permanent errors are already logged by retrySender.
if err == nil || consumererror.IsPermanent(err) {
return
}

if !qs.requeuingEnabled {
logger.Error(
qs.logger.Error(
"Exporting failed. No more retries left. Dropping data.",
zap.Error(err),
zap.Int("dropped_items", req.ItemsCount()),
)
return err
return
}

if qs.queue.Offer(ctx, req) == nil {
logger.Error(
qs.logger.Error(
"Exporting failed. Putting back to the end of the queue.",
zap.Error(err),
)
} else {
logger.Error(
qs.logger.Error(
"Exporting failed. Queue did not accept requeuing request. Dropping data.",
zap.Error(err),
zap.Int("dropped_items", req.ItemsCount()),
)
}
return err
}

// Start is invoked during service startup.
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
qs.consumers = internal.NewQueueConsumers(qs.queue, qs.numConsumers, func(ctx context.Context, req Request) {
// TODO: Update item.OnProcessingFinished to accept error and remove the retry->queue sender callback.
_ = qs.nextSender.send(ctx, req)
})
qs.consumers = internal.NewQueueConsumers(qs.queue, qs.numConsumers, qs.consume)
if err := qs.consumers.Start(ctx, host); err != nil {
return err
}
@@ -214,7 +221,7 @@ func (qs *queueSender) Shutdown(ctx context.Context) error {
return qs.consumers.Shutdown(ctx)
}

// send implements the requestSender interface
// send implements the requestSender interface. It enqueues the request to be sent by the queue consumers.
func (qs *queueSender) send(ctx context.Context, req Request) error {
// Prevent cancellation and deadline to propagate to the context stored in the queue.
// The grpc/http based receivers will cancel the request context after this function returns.
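
Because the new consume path drops permanent errors and only re-enqueues the rest, an exporter's push function can still opt a request out of re-enqueueing by marking its error permanent. A minimal sketch of that classification; pushTraces and its rejection rule are hypothetical, while consumererror.NewPermanent is the real helper.

package example

import (
	"context"
	"errors"

	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// pushTraces is a hypothetical export function illustrating the distinction
// queueSender.consume relies on: permanent errors are dropped, anything else
// is eligible for re-enqueue when reenqueue_on_failure is enabled.
func pushTraces(_ context.Context, td ptrace.Traces) error {
	if td.SpanCount() == 0 {
		// A request that can never succeed: wrap it as permanent so the
		// queue sender drops it instead of re-enqueueing it.
		return consumererror.NewPermanent(errors.New("empty payload rejected"))
	}
	// A plain (non-permanent) error signals a transient failure that may be
	// put back into the queue.
	return errors.New("temporary network failure")
}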
4 changes: 2 additions & 2 deletions exporter/exporterhelper/queue_sender_test.go
@@ -229,13 +229,13 @@ func TestQueueSettings_Validate(t *testing.T) {
// if requeueing is enabled, we eventually retry even if we failed at first
func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.ReenqueueOnFailure = true
qCfg.NumConsumers = 1
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)
be.queueSender.(*queueSender).requeuingEnabled = true
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
@@ -261,6 +261,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
qCfg.QueueSize = 1
qCfg.ReenqueueOnFailure = true
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead

@@ -270,7 +271,6 @@
be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)

be.queueSender.(*queueSender).requeuingEnabled = true
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
34 changes: 12 additions & 22 deletions exporter/exporterhelper/retry_sender.go
@@ -73,29 +73,20 @@ func NewThrottleRetry(err error, delay time.Duration) error {
}
}

type onRequestHandlingFinishedFunc func(context.Context, Request, error, *zap.Logger) error

type retrySender struct {
baseRequestSender
traceAttribute attribute.KeyValue
cfg RetrySettings
stopCh chan struct{}
logger *zap.Logger
onTemporaryFailure onRequestHandlingFinishedFunc
traceAttribute attribute.KeyValue
cfg RetrySettings
stopCh chan struct{}
logger *zap.Logger
}

func newRetrySender(config RetrySettings, set exporter.CreateSettings, onTemporaryFailure onRequestHandlingFinishedFunc) *retrySender {
if onTemporaryFailure == nil {
onTemporaryFailure = func(_ context.Context, _ Request, err error, _ *zap.Logger) error {
return err
}
}
func newRetrySender(config RetrySettings, set exporter.CreateSettings) *retrySender {
return &retrySender{
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
cfg: config,
stopCh: make(chan struct{}),
logger: set.Logger,
onTemporaryFailure: onTemporaryFailure,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
cfg: config,
stopCh: make(chan struct{}),
logger: set.Logger,
}
}

@@ -126,6 +117,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {
trace.WithAttributes(rs.traceAttribute, attribute.Int64("retry_num", retryNum)))

err := rs.nextSender.send(ctx, req)
rs.logger.Info("Exporting finished.", zap.Error(err))
if err == nil {
return nil
}
@@ -148,9 +140,7 @@

backoffDelay := expBackoff.NextBackOff()
if backoffDelay == backoff.Stop {
// throw away the batch
err = fmt.Errorf("max elapsed time expired %w", err)
return rs.onTemporaryFailure(ctx, req, err, rs.logger)
return fmt.Errorf("max elapsed time expired %w", err)
}

throttleErr := throttleRetry{}
@@ -178,7 +168,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {
case <-ctx.Done():
return fmt.Errorf("request is cancelled or timed out %w", err)
case <-rs.stopCh:
return rs.onTemporaryFailure(ctx, req, fmt.Errorf("interrupted due to shutdown %w", err), rs.logger)
return fmt.Errorf("interrupted due to shutdown %w", err)
case <-time.After(backoffDelay):
}
}
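
With the onTemporaryFailure callback removed, exhausted retries and shutdown interruptions surface as plain wrapped errors from retrySender.send, and the queue sender alone decides whether to re-enqueue. A simplified, hypothetical sketch of that division of responsibility; the stage types below are illustrative stand-ins, not the package's real senders.

package example

import (
	"context"
	"fmt"
)

// stage is a trimmed-down stand-in for the internal requestSender chain
// (queue -> retry -> ...) that this commit rewires.
type stage interface {
	send(ctx context.Context, req any) error
}

// retryStage models the retry sender after this change: when its budget is
// exhausted it returns the wrapped error instead of calling back into the queue.
type retryStage struct{ next stage }

func (r *retryStage) send(ctx context.Context, req any) error {
	if err := r.next.send(ctx, req); err != nil {
		return fmt.Errorf("max elapsed time expired %w", err)
	}
	return nil
}

// queueStage models the queue sender after this change: the re-enqueue
// decision is driven solely by its own configuration.
type queueStage struct {
	next             stage
	requeuingEnabled bool
	offer            func(context.Context, any) error // puts the request back into the queue
}

func (q *queueStage) consume(ctx context.Context, req any) {
	err := q.next.send(ctx, req)
	if err == nil || !q.requeuingEnabled {
		return // success, or re-enqueue disabled: a failed request is dropped
	}
	// Put the request back into the queue; if the queue rejects it (for
	// example, because it is full), the request is dropped.
	_ = q.offer(ctx, req)
}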
2 changes: 1 addition & 1 deletion exporter/exporterhelper/retry_sender_test.go
@@ -59,7 +59,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
ocs.checkDroppedItemsCount(t, 2)
}

func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
func TestQueuedRetry_DropOnNoReenqueue(t *testing.T) {
qCfg := NewDefaultQueueSettings()
rCfg := NewDefaultRetrySettings()
rCfg.Enabled = false
