Skip to content

Commit

Permalink
Refactor code to use enqueue.
Browse files Browse the repository at this point in the history
  • Loading branch information
pkwarren committed Oct 29, 2021
1 parent 0180f37 commit cb23081
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions sdk/trace/batch_span_processor.go
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

// Defaults for BatchSpanProcessorOptions.
Expand Down Expand Up @@ -158,16 +159,16 @@ type forceFlushSpan struct {
flushed chan struct{}
}

func (f forceFlushSpan) SpanContext() trace.SpanContext {
return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled})
}

// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
var err error
if bsp.e != nil {
flushCh := make(chan struct{})
select {
case bsp.queue <- forceFlushSpan{flushed: flushCh}:
case <-ctx.Done():
return ctx.Err()
}
bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true)

select {
case <-flushCh:
Expand Down Expand Up @@ -319,6 +320,10 @@ func (bsp *batchSpanProcessor) drainQueue() {
}

func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
bsp.enqueueBlockOnQueueFull(context.TODO(), sd, bsp.o.BlockOnQueueFull)
}

func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) {
if !sd.SpanContext().IsSampled() {
return
}
Expand All @@ -344,9 +349,13 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
default:
}

if bsp.o.BlockOnQueueFull {
bsp.queue <- sd
return
if block {
select {
case bsp.queue <- sd:
return
case <-ctx.Done():
return
}
}

select {
Expand Down

0 comments on commit cb23081

Please sign in to comment.