diff --git a/.chloggen/introduce-reenque-option.yaml b/.chloggen/introduce-reenque-option.yaml
new file mode 100755
index 00000000000..53667768969
--- /dev/null
+++ b/.chloggen/introduce-reenque-option.yaml
@@ -0,0 +1,28 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Make the re-enqueue behavior configurable.
+
+# One or more tracking issues or pull requests related to the change
+issues: [8122]
+
+subtext: |
+  Instead of relying on the `retry_on_failure` option being enabled, there is now a
+  dedicated option to control re-enqueuing independently of the retry sender. This can
+  be useful for users who don't want the blocking exponential retry and just want to
+  put the failed request back in the queue. This option can also be enabled with the
+  in-memory queue, which means that data will never be dropped after reaching the
+  queue as long as the collector is up and running.
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
\ No newline at end of file
diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go
index a129d9bbd92..0d46945ce6f 100644
--- a/exporter/exporterhelper/common.go
+++ b/exporter/exporterhelper/common.go
@@ -91,7 +91,7 @@ func WithRetry(config RetrySettings) Option {
 			}
 			return
 		}
-		o.retrySender = newRetrySender(config, o.set, o.onTemporaryFailure)
+		o.retrySender = newRetrySender(config, o.set)
 	}
 }
 
@@ -110,9 +110,7 @@ func WithQueue(config QueueSettings) Option {
 			}
 			return
 		}
-		qs := newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
-		o.queueSender = qs
-		o.setOnTemporaryFailure(qs.onTemporaryFailure)
+		o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
 	}
 }
 
@@ -146,9 +144,6 @@ type baseExporter struct {
 	retrySender   requestSender
 	timeoutSender *timeoutSender // timeoutSender is always initialized.
 
-	// onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer.
-	onTemporaryFailure onRequestHandlingFinishedFunc
-
 	consumerOptions []consumer.Option
 }
 
@@ -215,10 +210,3 @@ func (be *baseExporter) Shutdown(ctx context.Context) error {
 		// Last shutdown the wrapped exporter itself.
 		be.ShutdownFunc.Shutdown(ctx))
 }
-
-func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) {
-	be.onTemporaryFailure = onTemporaryFailure
-	if rs, ok := be.retrySender.(*retrySender); ok {
-		rs.onTemporaryFailure = onTemporaryFailure
-	}
-}
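For orientation, here is a minimal sketch of how an exporter built on exporterhelper could opt into the new behavior. `QueueSettings`, `ReenqueueOnFailure`, `WithQueue`, and `WithRetry` come from this diff; the package name, config type, push function, and the `NewTracesExporter` constructor (as available at the time of this change) are assumptions for illustration only.

```go
package myexporter // hypothetical package

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// newTracesExporter sketches how an exporter author could enable re-enqueuing.
// The config value and push function are placeholders supplied by the caller.
func newTracesExporter(ctx context.Context, set exporter.CreateSettings, cfg component.Config,
	push consumer.ConsumeTracesFunc) (exporter.Traces, error) {
	qCfg := exporterhelper.NewDefaultQueueSettings()
	qCfg.ReenqueueOnFailure = true // new option from this PR: put failed requests back in the queue

	return exporterhelper.NewTracesExporter(ctx, set, cfg, push,
		exporterhelper.WithRetry(exporterhelper.NewDefaultRetrySettings()),
		exporterhelper.WithQueue(qCfg),
	)
}
```

For collector end users, exporters that expose `QueueSettings` in their configuration would surface this as a `reenqueue_on_failure` boolean, per the mapstructure tag added in queue_sender.go below; how the settings are exposed remains up to each exporter.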
diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go
index 97d5a100727..78e133bd97b 100644
--- a/exporter/exporterhelper/internal/persistent_queue.go
+++ b/exporter/exporterhelper/internal/persistent_queue.go
@@ -278,10 +278,10 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(), bool)
 		// Delete the item from the persistent storage after it was processed.
 		pq.mu.Lock()
 		defer pq.mu.Unlock()
-		if err = pq.itemDispatchingFinish(ctx, index); err != nil {
+		if err = pq.itemDispatchingFinish(context.Background(), index); err != nil {
 			pq.set.Logger.Error("Error deleting item from queue", zap.Error(err))
 		}
-		if err = pq.unrefClient(ctx); err != nil {
+		if err = pq.unrefClient(context.Background()); err != nil {
 			pq.set.Logger.Error("Error closing the storage client", zap.Error(err))
 		}
 	}, true
diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go
index d629af4c438..f26b0d72d04 100644
--- a/exporter/exporterhelper/queue_sender.go
+++ b/exporter/exporterhelper/queue_sender.go
@@ -18,6 +18,7 @@ import (
 	"go.uber.org/zap"
 
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/consumer/consumererror"
 	"go.opentelemetry.io/collector/exporter"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
 	"go.opentelemetry.io/collector/internal/obsreportconfig"
@@ -41,6 +42,9 @@ type QueueSettings struct {
 	// StorageID if not empty, enables the persistent storage and uses the component specified
 	// as a storage extension for the persistent queue
 	StorageID *component.ID `mapstructure:"storage"`
+	// ReenqueueOnFailure indicates whether to re-enqueue items on send failure. If false, items will be dropped after
+	// a failed send. If true, items will be re-enqueued and retried after the current queue is drained.
+	ReenqueueOnFailure bool `mapstructure:"reenqueue_on_failure"`
 }
 
 // NewDefaultQueueSettings returns the default settings for QueueSettings.
@@ -101,50 +105,53 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co
 		queue = internal.NewBoundedMemoryQueue[Request](config.QueueSize)
 	}
 	return &queueSender{
-		fullName:       set.ID.String(),
-		signal:         signal,
-		queue:          queue,
-		traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
-		logger:         set.TelemetrySettings.Logger,
-		meter:          set.TelemetrySettings.MeterProvider.Meter(scopeName),
-		numConsumers:   config.NumConsumers,
-		stopWG:         sync.WaitGroup{},
-		// TODO: this can be further exposed as a config param rather than relying on a type of queue
-		requeuingEnabled: isPersistent,
+		fullName:         set.ID.String(),
+		signal:           signal,
+		queue:            queue,
+		traceAttribute:   attribute.String(obsmetrics.ExporterKey, set.ID.String()),
+		logger:           set.TelemetrySettings.Logger,
+		meter:            set.TelemetrySettings.MeterProvider.Meter(scopeName),
+		numConsumers:     config.NumConsumers,
+		stopWG:           sync.WaitGroup{},
+		requeuingEnabled: config.ReenqueueOnFailure,
 	}
 }
 
-func (qs *queueSender) onTemporaryFailure(ctx context.Context, req Request, err error, logger *zap.Logger) error {
+// consume is the function that is executed by the queue consumers to send the data to the next consumerSender.
+func (qs *queueSender) consume(ctx context.Context, req Request) {
+	err := qs.nextSender.send(ctx, req)
+
+	// Nothing to do if the error is nil or permanent. Permanent errors are already logged by retrySender.
+	if err == nil || consumererror.IsPermanent(err) {
+		return
+	}
+
 	if !qs.requeuingEnabled {
-		logger.Error(
+		qs.logger.Error(
 			"Exporting failed. No more retries left. Dropping data.",
 			zap.Error(err),
 			zap.Int("dropped_items", req.ItemsCount()),
 		)
-		return err
+		return
 	}
 
 	if qs.queue.Offer(ctx, req) == nil {
-		logger.Error(
+		qs.logger.Error(
 			"Exporting failed. Putting back to the end of the queue.",
 			zap.Error(err),
 		)
 	} else {
-		logger.Error(
+		qs.logger.Error(
			"Exporting failed. Queue did not accept requeuing request. Dropping data.",
 			zap.Error(err),
 			zap.Int("dropped_items", req.ItemsCount()),
 		)
 	}
-	return err
 }
 
 // Start is invoked during service startup.
 func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
-	qs.consumers = internal.NewQueueConsumers(qs.queue, qs.numConsumers, func(ctx context.Context, req Request) {
-		// TODO: Update item.OnProcessingFinished to accept error and remove the retry->queue sender callback.
-		_ = qs.nextSender.send(ctx, req)
-	})
+	qs.consumers = internal.NewQueueConsumers(qs.queue, qs.numConsumers, qs.consume)
 	if err := qs.consumers.Start(ctx, host); err != nil {
 		return err
 	}
@@ -214,7 +221,7 @@ func (qs *queueSender) Shutdown(ctx context.Context) error {
 	return qs.consumers.Shutdown(ctx)
 }
 
-// send implements the requestSender interface
+// send implements the requestSender interface. It enqueues the request to be sent by the queue consumers.
 func (qs *queueSender) send(ctx context.Context, req Request) error {
 	// Prevent cancellation and deadline to propagate to the context stored in the queue.
 	// The grpc/http based receivers will cancel the request context after this function returns.
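A note on the permanent-error check in the new consume method: errors wrapped with consumererror.NewPermanent are dropped rather than re-enqueued, even when `reenqueue_on_failure` is set. A minimal, hypothetical push function illustrating the distinction (the backend call and the malformed-payload check are placeholders, not part of this PR):

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// errMalformed stands in for a backend rejection that can never succeed.
var errMalformed = errors.New("malformed payload")

// pushTraces is a sketch of an exporter push function; export is a stand-in for
// the real backend call.
func pushTraces(ctx context.Context, td ptrace.Traces,
	export func(context.Context, ptrace.Traces) error) error {
	err := export(ctx, td)
	switch {
	case err == nil:
		return nil
	case errors.Is(err, errMalformed):
		// Permanent errors are neither retried by retrySender nor re-enqueued by
		// queueSender.consume, even when reenqueue_on_failure is enabled.
		return consumererror.NewPermanent(fmt.Errorf("rejected by backend: %w", err))
	default:
		// Transient errors are retried and, with ReenqueueOnFailure set, put back
		// in the queue once retries are exhausted.
		return err
	}
}

func main() {
	err := pushTraces(context.Background(), ptrace.NewTraces(),
		func(context.Context, ptrace.Traces) error { return errMalformed })
	fmt.Println(consumererror.IsPermanent(err)) // true
}
```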
Dropping data.", zap.Error(err), zap.Int("dropped_items", req.ItemsCount()), ) } - return err } // Start is invoked during service startup. func (qs *queueSender) Start(ctx context.Context, host component.Host) error { - qs.consumers = internal.NewQueueConsumers(qs.queue, qs.numConsumers, func(ctx context.Context, req Request) { - // TODO: Update item.OnProcessingFinished to accept error and remove the retry->queue sender callback. - _ = qs.nextSender.send(ctx, req) - }) + qs.consumers = internal.NewQueueConsumers(qs.queue, qs.numConsumers, qs.consume) if err := qs.consumers.Start(ctx, host); err != nil { return err } @@ -214,7 +221,7 @@ func (qs *queueSender) Shutdown(ctx context.Context) error { return qs.consumers.Shutdown(ctx) } -// send implements the requestSender interface +// send implements the requestSender interface. It enqueues the request to be sent by the queue consumers. func (qs *queueSender) send(ctx context.Context, req Request) error { // Prevent cancellation and deadline to propagate to the context stored in the queue. // The grpc/http based receivers will cancel the request context after this function returns. diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index 97d8220a2d4..0ab34e79647 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -229,13 +229,13 @@ func TestQueueSettings_Validate(t *testing.T) { // if requeueing is enabled, we eventually retry even if we failed at first func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg := NewDefaultQueueSettings() + qCfg.ReenqueueOnFailure = true qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) ocs := be.obsrepSender.(*observabilityConsumerSender) - be.queueSender.(*queueSender).requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -261,6 +261,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 qCfg.QueueSize = 1 + qCfg.ReenqueueOnFailure = true rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead @@ -270,7 +271,6 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - be.queueSender.(*queueSender).requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) diff --git a/exporter/exporterhelper/retry_sender.go b/exporter/exporterhelper/retry_sender.go index f74aecc6bfb..3cb3f775a91 100644 --- a/exporter/exporterhelper/retry_sender.go +++ b/exporter/exporterhelper/retry_sender.go @@ -73,29 +73,20 @@ func NewThrottleRetry(err error, delay time.Duration) error { } } -type onRequestHandlingFinishedFunc func(context.Context, Request, error, *zap.Logger) error - type retrySender struct { baseRequestSender - traceAttribute attribute.KeyValue - cfg RetrySettings - stopCh chan struct{} - logger *zap.Logger - onTemporaryFailure 
diff --git a/exporter/exporterhelper/retry_sender_test.go b/exporter/exporterhelper/retry_sender_test.go
index 235a9f3ccbc..a2cf50a0eac 100644
--- a/exporter/exporterhelper/retry_sender_test.go
+++ b/exporter/exporterhelper/retry_sender_test.go
@@ -59,7 +59,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
 	ocs.checkDroppedItemsCount(t, 2)
 }
 
-func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
+func TestQueuedRetry_DropOnNoReenqueue(t *testing.T) {
 	qCfg := NewDefaultQueueSettings()
 	rCfg := NewDefaultRetrySettings()
 	rCfg.Enabled = false
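The renamed test reflects the combination the changelog entry describes: re-enqueuing controlled independently of retries. A sketch of those settings, mirroring the requeuing tests above; passing them to an exporter constructor is shown in the first example, and all names other than the two settings types are illustrative:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func main() {
	// Re-enqueue without blocking retries: failed requests go back through the
	// queue instead of being retried in place with exponential backoff.
	qCfg := exporterhelper.NewDefaultQueueSettings()
	qCfg.ReenqueueOnFailure = true

	rCfg := exporterhelper.NewDefaultRetrySettings()
	rCfg.Enabled = false // rely on re-enqueuing alone, as the changelog subtext describes

	// Pass exporterhelper.WithRetry(rCfg) and exporterhelper.WithQueue(qCfg) when
	// building the exporter, as in the first sketch above.
	fmt.Printf("reenqueue_on_failure=%v retry enabled=%v\n", qCfg.ReenqueueOnFailure, rCfg.Enabled)
}
```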