[chore] [exporterhelper] Remove retry sender -> queue sender callback #8985

Merged (1 commit) on Nov 27, 2023
25 changes: 11 additions & 14 deletions exporter/exporterhelper/common.go
@@ -91,7 +91,7 @@ func WithRetry(config RetrySettings) Option {
}
return
}
-		o.retrySender = newRetrySender(config, o.set, o.onTemporaryFailure)
+		o.retrySender = newRetrySender(config, o.set)
}
}

@@ -110,9 +110,7 @@ func WithQueue(config QueueSettings) Option {
}
return
}
-		qs := newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
-		o.queueSender = qs
-		o.setOnTemporaryFailure(qs.onTemporaryFailure)
+		o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
}
}

@@ -146,9 +144,6 @@ type baseExporter struct {
retrySender requestSender
timeoutSender *timeoutSender // timeoutSender is always initialized.

-	// onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer.
-	onTemporaryFailure onRequestHandlingFinishedFunc
-
consumerOptions []consumer.Option
}

@@ -181,6 +176,15 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
}
be.connectSenders()

+	// If retry sender is disabled then disable requeuing in the queue sender.
+	// TODO: Make re-enqueuing configurable on queue sender instead of relying on retry sender.
+	if qs, ok := be.queueSender.(*queueSender); ok {
+		// if it's not retrySender, then it is disabled.
+		if _, ok = be.retrySender.(*retrySender); !ok {
+			qs.requeuingEnabled = false
+		}
+	}
+
Review thread on lines +179 to +187:

Member: I am not sure whether this was intentional or a bug in the previous implementation. I'm happy to keep it as it is now, but my bet is that it was just a bug.

Member (Author): Even if it's a bug, I believe some users may rely on this behavior. Fixing it right away by removing the dependency would be a breaking change for those who don't want re-enqueueing in the persistent queue. I submitted an issue (#8987) to introduce an explicit option to control it and will handle it next; a hypothetical sketch of such an option follows this thread.

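Purely as an illustrative sketch of the follow-up mentioned above (not code from this PR, and not necessarily the design #8987 will adopt), the explicit knob could be a field on QueueSettings that newBaseExporter reads directly instead of type-asserting the retry sender. The RequeuingEnabled name and its mapstructure tag are assumptions.

package exporterhelper // sketch only; mirrors the package touched in this diff

import "go.opentelemetry.io/collector/component"

// QueueSettings with a hypothetical explicit requeuing option. The first four
// fields match the existing configuration; RequeuingEnabled is an assumed addition.
type QueueSettings struct {
	Enabled      bool          `mapstructure:"enabled"`
	NumConsumers int           `mapstructure:"num_consumers"`
	QueueSize    int           `mapstructure:"queue_size"`
	StorageID    *component.ID `mapstructure:"storage"`
	// RequeuingEnabled would control re-enqueuing on transient failures directly,
	// removing the implicit dependency on whether the retry sender is enabled.
	RequeuingEnabled bool `mapstructure:"requeuing_enabled"`
}

With such a field, newBaseExporter could copy config.RequeuingEnabled onto qs.requeuingEnabled and drop the retrySender type assertion shown above.
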
return be, nil
}

@@ -215,10 +219,3 @@ func (be *baseExporter) Shutdown(ctx context.Context) error {
// Last shutdown the wrapped exporter itself.
be.ShutdownFunc.Shutdown(ctx))
}

-func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) {
-	be.onTemporaryFailure = onTemporaryFailure
-	if rs, ok := be.retrySender.(*retrySender); ok {
-		rs.onTemporaryFailure = onTemporaryFailure
-	}
-}
27 changes: 16 additions & 11 deletions exporter/exporterhelper/queue_sender.go
@@ -18,6 +18,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/internal/obsreportconfig"
@@ -114,37 +115,41 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co
}
}

-func (qs *queueSender) onTemporaryFailure(ctx context.Context, req Request, err error, logger *zap.Logger) error {
+// consume is the function that is executed by the queue consumers to send the data to the next consumerSender.
+func (qs *queueSender) consume(ctx context.Context, req Request) {
+	err := qs.nextSender.send(ctx, req)
+
+	// Nothing to do if the error is nil or permanent. Permanent errors are already logged by retrySender.
+	if err == nil || consumererror.IsPermanent(err) {
+		return
+	}

if !qs.requeuingEnabled {
-		logger.Error(
+		qs.logger.Error(
"Exporting failed. No more retries left. Dropping data.",
zap.Error(err),
zap.Int("dropped_items", req.ItemsCount()),
)
-		return err
+		return
}

if qs.queue.Offer(ctx, req) == nil {
-		logger.Error(
+		qs.logger.Error(
"Exporting failed. Putting back to the end of the queue.",
zap.Error(err),
)
} else {
-		logger.Error(
+		qs.logger.Error(
"Exporting failed. Queue did not accept requeuing request. Dropping data.",
zap.Error(err),
zap.Int("dropped_items", req.ItemsCount()),
)
}
-	return err
}

// Start is invoked during service startup.
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
-	qs.consumers = internal.NewQueueConsumers(qs.queue, qs.numConsumers, func(ctx context.Context, req Request) {
-		// TODO: Update item.OnProcessingFinished to accept error and remove the retry->queue sender callback.
-		_ = qs.nextSender.send(ctx, req)
-	})
+	qs.consumers = internal.NewQueueConsumers(qs.queue, qs.numConsumers, qs.consume)
if err := qs.consumers.Start(ctx, host); err != nil {
return err
}
@@ -214,7 +219,7 @@ func (qs *queueSender) Shutdown(ctx context.Context) error {
return qs.consumers.Shutdown(ctx)
}

-// send implements the requestSender interface
+// send implements the requestSender interface. It puts the request in the queue.
func (qs *queueSender) send(ctx context.Context, req Request) error {
// Prevent cancellation and deadline to propagate to the context stored in the queue.
// The grpc/http based receivers will cancel the request context after this function returns.
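For orientation, here is a minimal usage sketch, not taken from this PR, of how the senders touched by this diff are typically wired: with WithRetry enabled and a persistent queue configured through WithQueue, the re-enqueuing path in queueSender.consume stays active. The example package name, the no-op push function, and the file_storage extension ID are assumptions.

package example // hypothetical package, for illustration only

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// newTracesExporter enables both the retry sender and a persistent queue sender,
// so a request that keeps failing with a transient error is re-enqueued by
// queueSender.consume instead of being dropped.
func newTracesExporter(ctx context.Context, set exporter.CreateSettings, cfg component.Config) (exporter.Traces, error) {
	qCfg := exporterhelper.NewDefaultQueueSettings()
	storageID := component.NewIDWithName("file_storage", "exporter_queue") // assumed storage extension ID
	qCfg.StorageID = &storageID // persistent queue, as in the tests of this PR
	return exporterhelper.NewTracesExporter(ctx, set, cfg,
		func(_ context.Context, _ ptrace.Traces) error { return nil }, // no-op push function, placeholder only
		exporterhelper.WithRetry(exporterhelper.NewDefaultRetrySettings()),
		exporterhelper.WithQueue(qCfg),
	)
}
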
33 changes: 33 additions & 0 deletions exporter/exporterhelper/queue_sender_test.go
@@ -255,6 +255,39 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
ocs.checkDroppedItemsCount(t, 1) // not actually dropped, but ocs counts each failed send here
}

+// disabling retry sender should disable requeuing.
+func TestQueuedRetry_RequeuingDisabled(t *testing.T) {
+	mockR := newMockRequest(2, errors.New("transient error"))
+
+	// use persistent storage as it is expected to be used with requeuing unless the retry sender is disabled
+	qCfg := NewDefaultQueueSettings()
+	storageID := component.NewIDWithName("file_storage", "storage")
+	qCfg.StorageID = &storageID // enable persistence
+	rCfg := NewDefaultRetrySettings()
+	rCfg.Enabled = false
+
+	be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(mockR), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
+	require.NoError(t, err)
+	ocs := be.obsrepSender.(*observabilityConsumerSender)
+
+	var extensions = map[component.ID]component.Component{
+		storageID: internal.NewMockStorageExtension(nil),
+	}
+	host := &mockHost{ext: extensions}
+	require.NoError(t, be.Start(context.Background(), host))
+
+	ocs.run(func() {
+		// This is asynchronous so it should just enqueue, no errors expected.
+		require.NoError(t, be.send(context.Background(), mockR))
+	})
+	ocs.awaitAsyncProcessing()
+
+	// one failed request, no retries, two items dropped.
+	mockR.checkNumRequests(t, 1)
+	ocs.checkSendItemsCount(t, 0)
+	ocs.checkDroppedItemsCount(t, 2)
+}
+
// if requeueing is enabled, but the queue is full, we get an error
func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
qCfg := NewDefaultQueueSettings()
34 changes: 12 additions & 22 deletions exporter/exporterhelper/retry_sender.go
@@ -73,29 +73,20 @@ func NewThrottleRetry(err error, delay time.Duration) error {
}
}

-type onRequestHandlingFinishedFunc func(context.Context, Request, error, *zap.Logger) error
-
type retrySender struct {
baseRequestSender
-	traceAttribute     attribute.KeyValue
-	cfg                RetrySettings
-	stopCh             chan struct{}
-	logger             *zap.Logger
-	onTemporaryFailure onRequestHandlingFinishedFunc
+	traceAttribute attribute.KeyValue
+	cfg            RetrySettings
+	stopCh         chan struct{}
+	logger         *zap.Logger
}

-func newRetrySender(config RetrySettings, set exporter.CreateSettings, onTemporaryFailure onRequestHandlingFinishedFunc) *retrySender {
-	if onTemporaryFailure == nil {
-		onTemporaryFailure = func(_ context.Context, _ Request, err error, _ *zap.Logger) error {
-			return err
-		}
-	}
+func newRetrySender(config RetrySettings, set exporter.CreateSettings) *retrySender {
return &retrySender{
-		traceAttribute:     attribute.String(obsmetrics.ExporterKey, set.ID.String()),
-		cfg:                config,
-		stopCh:             make(chan struct{}),
-		logger:             set.Logger,
-		onTemporaryFailure: onTemporaryFailure,
+		traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
+		cfg:            config,
+		stopCh:         make(chan struct{}),
+		logger:         set.Logger,
}
}

@@ -126,6 +117,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {
trace.WithAttributes(rs.traceAttribute, attribute.Int64("retry_num", retryNum)))

err := rs.nextSender.send(ctx, req)
+		rs.logger.Info("Exporting finished.", zap.Error(err))
if err == nil {
return nil
}
@@ -148,9 +140,7 @@

backoffDelay := expBackoff.NextBackOff()
if backoffDelay == backoff.Stop {
-			// throw away the batch
-			err = fmt.Errorf("max elapsed time expired %w", err)
-			return rs.onTemporaryFailure(ctx, req, err, rs.logger)
+			return fmt.Errorf("max elapsed time expired %w", err)
}

throttleErr := throttleRetry{}
@@ -178,7 +168,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {
case <-ctx.Done():
return fmt.Errorf("request is cancelled or timed out %w", err)
case <-rs.stopCh:
-			return rs.onTemporaryFailure(ctx, req, fmt.Errorf("interrupted due to shutdown %w", err), rs.logger)
+			return fmt.Errorf("interrupted due to shutdown %w", err)
case <-time.After(backoffDelay):
}
}