Skip to content

Commit

Permalink
Adding shutdown and startup debug logs for retry, queue, and batch se…
Browse files Browse the repository at this point in the history
…nder
  • Loading branch information
timannguyen committed May 22, 2024
1 parent a90fa50 commit 822f427
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 2 deletions.
25 changes: 25 additions & 0 deletions .chloggen/exporterhelper-sender-start-shutdown-logging.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: adding debug logs to retry, queue, and batch sender in exporterhelper

# One or more tracking issues or pull requests related to the change
issues: [10201]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
11 changes: 10 additions & 1 deletion exporter/exporterhelper/batch_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/otel/attribute"
)

// batchSender is a component that places requests into batches before passing them to the downstream senders.
Expand All @@ -38,7 +40,8 @@ type batchSender struct {
mu sync.Mutex
activeBatch *batch

logger *zap.Logger
traceAttribute attribute.KeyValue
logger *zap.Logger

shutdownCh chan struct{}
stopped *atomic.Bool
Expand All @@ -49,6 +52,7 @@ func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings,
mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) *batchSender {
bs := &batchSender{
activeBatch: newEmptyBatch(),
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
cfg: cfg,
logger: set.Logger,
mergeFunc: mf,
Expand Down Expand Up @@ -91,6 +95,7 @@ func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
}
}()

bs.logger.Debug("Started Batch Sender", zap.String(obsmetrics.ExporterKey, bs.traceAttribute.Value.AsString()))
return nil
}

Expand Down Expand Up @@ -213,11 +218,15 @@ func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) {
}

func (bs *batchSender) Shutdown(context.Context) error {
bs.logger.Debug("Shutting down Batch Sender", zap.String(obsmetrics.ExporterKey, bs.traceAttribute.Value.AsString()))

bs.stopped.Store(true)
close(bs.shutdownCh)
// Wait for the active requests to finish.
for bs.activeRequests.Load() > 0 {
time.Sleep(10 * time.Millisecond)
}

bs.logger.Debug("Batch Sender has been shutdown", zap.String(obsmetrics.ExporterKey, bs.traceAttribute.Value.AsString()))
return nil
}
9 changes: 8 additions & 1 deletion exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ func newQueueSender(q exporterqueue.Queue[Request], set exporter.CreateSettings,

// Start is invoked during service startup.
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
qs.logger.Debug("Starting Queue Sender", zap.String(obsmetrics.ExporterKey, qs.fullName))

if err := qs.consumers.Start(ctx, host); err != nil {
return err
}
Expand Down Expand Up @@ -137,15 +139,20 @@ func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
return nil
}))

qs.logger.Debug("Started Queue Sender", zap.String(obsmetrics.ExporterKey, qs.fullName), zap.Error(err))
errs = multierr.Append(errs, err)
return errs
}

// Shutdown is invoked during service shutdown.
func (qs *queueSender) Shutdown(ctx context.Context) error {
qs.logger.Debug("Shutting Down Queue Sender", zap.String(obsmetrics.ExporterKey, qs.fullName))

// Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only
// try once every request.
return qs.consumers.Shutdown(ctx)
err := qs.consumers.Shutdown(ctx)
qs.logger.Debug("Queue Sender has shut down", zap.String(obsmetrics.ExporterKey, qs.fullName), zap.Error(err))
return err
}

// send implements the requestSender interface. It puts the request in the queue.
Expand Down
1 change: 1 addition & 0 deletions exporter/exporterhelper/retry_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func newRetrySender(config configretry.BackOffConfig, set exporter.CreateSetting

func (rs *retrySender) Shutdown(context.Context) error {
close(rs.stopCh)
rs.logger.Debug("Retry Sender has been shutdown", zap.String(obsmetrics.ExporterKey, rs.traceAttribute.Value.AsString()))
return nil
}

Expand Down

0 comments on commit 822f427

Please sign in to comment.