Skip to content

Commit

Permalink
Be more defensive on waiting for queue.
Browse files Browse the repository at this point in the history
Update the handling of the force flush span so we only wait on the
channel if we were able to enqueue the span to the queue.
  • Loading branch information
pkwarren committed Oct 30, 2021
1 parent cb23081 commit bf24f4c
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions sdk/trace/batch_span_processor.go
Expand Up @@ -168,13 +168,13 @@ func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
var err error
if bsp.e != nil {
flushCh := make(chan struct{})
bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true)

select {
case <-flushCh:
// Processed any items in queue prior to ForceFlush being called
case <-ctx.Done():
return ctx.Err()
if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true) {
select {
case <-flushCh:
// Processed any items in queue prior to ForceFlush being called
case <-ctx.Done():
return ctx.Err()
}
}

wait := make(chan error)
Expand Down Expand Up @@ -323,9 +323,9 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
bsp.enqueueBlockOnQueueFull(context.TODO(), sd, bsp.o.BlockOnQueueFull)
}

func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) {
func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) bool {
if !sd.SpanContext().IsSampled() {
return
return false
}

// This ensures the bsp.queue<- below does not panic as the
Expand All @@ -345,22 +345,24 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R

select {
case <-bsp.stopCh:
return
return false
default:
}

if block {
select {
case bsp.queue <- sd:
return
return true
case <-ctx.Done():
return
return false
}
}

select {
case bsp.queue <- sd:
return true
default:
atomic.AddUint32(&bsp.dropped, 1)
}
return false
}

0 comments on commit bf24f4c

Please sign in to comment.