diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 0499db15c8f..841942daa00 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -22,6 +22,7 @@ import ( "time" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" ) // Defaults for BatchSpanProcessorOptions. @@ -158,16 +159,16 @@ type forceFlushSpan struct { flushed chan struct{} } +func (f forceFlushSpan) SpanContext() trace.SpanContext { + return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled}) +} + // ForceFlush exports all ended spans that have not yet been exported. func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { var err error if bsp.e != nil { flushCh := make(chan struct{}) - select { - case bsp.queue <- forceFlushSpan{flushed: flushCh}: - case <-ctx.Done(): - return ctx.Err() - } + bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true) select { case <-flushCh: @@ -319,6 +320,10 @@ func (bsp *batchSpanProcessor) drainQueue() { } func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) { + bsp.enqueueBlockOnQueueFull(context.TODO(), sd, bsp.o.BlockOnQueueFull) +} + +func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) { if !sd.SpanContext().IsSampled() { return } @@ -344,9 +349,13 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) { default: } - if bsp.o.BlockOnQueueFull { - bsp.queue <- sd - return + if block { + select { + case bsp.queue <- sd: + return + case <-ctx.Done(): + return + } } select {