diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go
index a129d9bbd92..69fb3771d85 100644
--- a/exporter/exporterhelper/common.go
+++ b/exporter/exporterhelper/common.go
@@ -91,7 +91,7 @@ func WithRetry(config RetrySettings) Option {
 			}
 			return
 		}
-		o.retrySender = newRetrySender(config, o.set, o.onTemporaryFailure)
+		o.retrySender = newRetrySender(config, o.set)
 	}
 }
 
@@ -110,9 +110,7 @@ func WithQueue(config QueueSettings) Option {
 			}
 			return
 		}
-		qs := newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
-		o.queueSender = qs
-		o.setOnTemporaryFailure(qs.onTemporaryFailure)
+		o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
 	}
 }
 
@@ -146,9 +144,6 @@ type baseExporter struct {
 	retrySender   requestSender
 	timeoutSender *timeoutSender // timeoutSender is always initialized.
 
-	// onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer.
-	onTemporaryFailure onRequestHandlingFinishedFunc
-
 	consumerOptions []consumer.Option
 }
 
@@ -181,6 +176,15 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
 	}
 
 	be.connectSenders()
 
+	// If the retry sender is disabled, then disable requeuing in the queue sender as well.
+	// TODO: Make re-enqueuing configurable on the queue sender instead of relying on the retry sender.
+	if qs, ok := be.queueSender.(*queueSender); ok {
+		// If it's not a retrySender, then retries are disabled.
+		if _, ok = be.retrySender.(*retrySender); !ok {
+			qs.requeuingEnabled = false
+		}
+	}
+
 	return be, nil
 }
 
@@ -215,10 +219,3 @@ func (be *baseExporter) Shutdown(ctx context.Context) error {
 		// Last shutdown the wrapped exporter itself.
 		be.ShutdownFunc.Shutdown(ctx))
 }
-
-func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) {
-	be.onTemporaryFailure = onTemporaryFailure
-	if rs, ok := be.retrySender.(*retrySender); ok {
-		rs.onTemporaryFailure = onTemporaryFailure
-	}
-}
diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go
index d629af4c438..41e885af0cf 100644
--- a/exporter/exporterhelper/queue_sender.go
+++ b/exporter/exporterhelper/queue_sender.go
@@ -18,6 +18,7 @@ import (
 	"go.uber.org/zap"
 
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/consumer/consumererror"
 	"go.opentelemetry.io/collector/exporter"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
 	"go.opentelemetry.io/collector/internal/obsreportconfig"
@@ -114,37 +115,41 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co
 	}
 }
 
-func (qs *queueSender) onTemporaryFailure(ctx context.Context, req Request, err error, logger *zap.Logger) error {
+// consume is the function that is executed by the queue consumers to send the data to the next consumerSender.
+func (qs *queueSender) consume(ctx context.Context, req Request) {
+	err := qs.nextSender.send(ctx, req)
+
+	// Nothing to do if the error is nil or permanent. Permanent errors are already logged by retrySender.
+	if err == nil || consumererror.IsPermanent(err) {
+		return
+	}
+
 	if !qs.requeuingEnabled {
-		logger.Error(
+		qs.logger.Error(
 			"Exporting failed. No more retries left. Dropping data.",
 			zap.Error(err),
 			zap.Int("dropped_items", req.ItemsCount()),
 		)
-		return err
+		return
 	}
 
 	if qs.queue.Offer(ctx, req) == nil {
-		logger.Error(
+		qs.logger.Error(
 			"Exporting failed. Putting back to the end of the queue.",
 			zap.Error(err),
 		)
 	} else {
-		logger.Error(
+		qs.logger.Error(
 			"Exporting failed. Queue did not accept requeuing request. Dropping data.",
 			zap.Error(err),
 			zap.Int("dropped_items", req.ItemsCount()),
 		)
 	}
-	return err
 }
 
 // Start is invoked during service startup.
 func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
-	qs.consumers = internal.NewQueueConsumers(qs.queue, qs.numConsumers, func(ctx context.Context, req Request) {
-		// TODO: Update item.OnProcessingFinished to accept error and remove the retry->queue sender callback.
-		_ = qs.nextSender.send(ctx, req)
-	})
+	qs.consumers = internal.NewQueueConsumers(qs.queue, qs.numConsumers, qs.consume)
 	if err := qs.consumers.Start(ctx, host); err != nil {
 		return err
 	}
@@ -214,7 +219,7 @@ func (qs *queueSender) Shutdown(ctx context.Context) error {
 	return qs.consumers.Shutdown(ctx)
 }
 
-// send implements the requestSender interface
+// send implements the requestSender interface. It puts the request in the queue.
 func (qs *queueSender) send(ctx context.Context, req Request) error {
 	// Prevent cancellation and deadline to propagate to the context stored in the queue.
 	// The grpc/http based receivers will cancel the request context after this function returns.
diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go
index 38049ed1707..7893b427019 100644
--- a/exporter/exporterhelper/queue_sender_test.go
+++ b/exporter/exporterhelper/queue_sender_test.go
@@ -255,6 +255,39 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
 	ocs.checkDroppedItemsCount(t, 1) // not actually dropped, but ocs counts each failed send here
 }
 
+// Disabling the retry sender should also disable requeuing.
+func TestQueuedRetry_RequeuingDisabled(t *testing.T) {
+	mockR := newMockRequest(2, errors.New("transient error"))
+
+	// Use persistent storage, as it is expected to be used with requeuing unless the retry sender is disabled.
+	qCfg := NewDefaultQueueSettings()
+	storageID := component.NewIDWithName("file_storage", "storage")
+	qCfg.StorageID = &storageID // enable persistence
+	rCfg := NewDefaultRetrySettings()
+	rCfg.Enabled = false
+
+	be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(mockR), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
+	require.NoError(t, err)
+	ocs := be.obsrepSender.(*observabilityConsumerSender)
+
+	var extensions = map[component.ID]component.Component{
+		storageID: internal.NewMockStorageExtension(nil),
+	}
+	host := &mockHost{ext: extensions}
+	require.NoError(t, be.Start(context.Background(), host))
+
+	ocs.run(func() {
+		// This is asynchronous so it should just enqueue, no errors expected.
+		require.NoError(t, be.send(context.Background(), mockR))
+	})
+	ocs.awaitAsyncProcessing()
+
+	// One failed request, no retries, two items dropped.
+	mockR.checkNumRequests(t, 1)
+	ocs.checkSendItemsCount(t, 0)
+	ocs.checkDroppedItemsCount(t, 2)
+}
+
 // if requeueing is enabled, but the queue is full, we get an error
 func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
 	qCfg := NewDefaultQueueSettings()
diff --git a/exporter/exporterhelper/retry_sender.go b/exporter/exporterhelper/retry_sender.go
index f74aecc6bfb..3cb3f775a91 100644
--- a/exporter/exporterhelper/retry_sender.go
+++ b/exporter/exporterhelper/retry_sender.go
@@ -73,29 +73,20 @@ func NewThrottleRetry(err error, delay time.Duration) error {
 	}
 }
 
-type onRequestHandlingFinishedFunc func(context.Context, Request, error, *zap.Logger) error
-
 type retrySender struct {
 	baseRequestSender
-	traceAttribute     attribute.KeyValue
-	cfg                RetrySettings
-	stopCh             chan struct{}
-	logger             *zap.Logger
-	onTemporaryFailure onRequestHandlingFinishedFunc
+	traceAttribute attribute.KeyValue
+	cfg            RetrySettings
+	stopCh         chan struct{}
+	logger         *zap.Logger
 }
 
-func newRetrySender(config RetrySettings, set exporter.CreateSettings, onTemporaryFailure onRequestHandlingFinishedFunc) *retrySender {
-	if onTemporaryFailure == nil {
-		onTemporaryFailure = func(_ context.Context, _ Request, err error, _ *zap.Logger) error {
-			return err
-		}
-	}
+func newRetrySender(config RetrySettings, set exporter.CreateSettings) *retrySender {
 	return &retrySender{
-		traceAttribute:     attribute.String(obsmetrics.ExporterKey, set.ID.String()),
-		cfg:                config,
-		stopCh:             make(chan struct{}),
-		logger:             set.Logger,
-		onTemporaryFailure: onTemporaryFailure,
+		traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
+		cfg:            config,
+		stopCh:         make(chan struct{}),
+		logger:         set.Logger,
 	}
 }
 
@@ -126,6 +117,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {
 			trace.WithAttributes(rs.traceAttribute, attribute.Int64("retry_num", retryNum)))
 
 		err := rs.nextSender.send(ctx, req)
+		rs.logger.Info("Exporting finished.", zap.Error(err))
 		if err == nil {
 			return nil
 		}
@@ -148,9 +140,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {
 
 		backoffDelay := expBackoff.NextBackOff()
 		if backoffDelay == backoff.Stop {
-			// throw away the batch
-			err = fmt.Errorf("max elapsed time expired %w", err)
-			return rs.onTemporaryFailure(ctx, req, err, rs.logger)
+			return fmt.Errorf("max elapsed time expired %w", err)
 		}
 
 		throttleErr := throttleRetry{}
@@ -178,7 +168,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {
 		case <-ctx.Done():
 			return fmt.Errorf("request is cancelled or timed out %w", err)
 		case <-rs.stopCh:
-			return rs.onTemporaryFailure(ctx, req, fmt.Errorf("interrupted due to shutdown %w", err), rs.logger)
+			return fmt.Errorf("interrupted due to shutdown %w", err)
 		case <-time.After(backoffDelay):
 		}
 	}
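
Note (not part of the diff): the new queueSender.consume decides whether to requeue based solely on whether the returned error is permanent, so a downstream sender can opt out of requeuing by wrapping its error with the collector's consumererror package (the same package this change imports). A minimal sketch of that classification; the example program itself is hypothetical and only illustrates the check consume performs:

package main

import (
	"errors"
	"fmt"

	"go.opentelemetry.io/collector/consumer/consumererror"
)

func main() {
	// Transient failure: queueSender.consume may put the request back on the queue
	// (only when requeuingEnabled is true, i.e. the retry sender is enabled).
	transient := errors.New("connection reset")

	// Permanent failure: queueSender.consume returns early and never re-enqueues.
	permanent := consumererror.NewPermanent(errors.New("bad payload"))

	fmt.Println(consumererror.IsPermanent(transient)) // false -> candidate for requeuing
	fmt.Println(consumererror.IsPermanent(permanent)) // true  -> dropped without requeuing
}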