Skip to content

Commit

Permalink
drop failed to exporter batches and return error when forcing flush a…
Browse files Browse the repository at this point in the history
… span processor (#1860)

* drop failed to exporter batches and return error when forcing flush a span processor

* changelog

* changelog

* change should export condition

* cleanup
  • Loading branch information
paivagustavo committed Apr 29, 2021
1 parent f6a9279 commit e399d35
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Make `NewSplitDriver` from `go.opentelemetry.io/otel/exporters/otlp` take variadic arguments instead of a `SplitConfig` item.
`NewSplitDriver` now automatically implements an internal `noopDriver` for `SplitConfig` fields that are not initialized. (#1798)
- BatchSpanProcessor now report export failures when calling `ForceFlush()` method. (#1860)

### Deprecated

Expand All @@ -23,6 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Only report errors from the `"go.opentelemetry.io/otel/sdk/resource".Environment` function when they are not `nil`. (#1850, #1851)
- The `Shutdown` method of the simple `SpanProcessor` in the `go.opentelemetry.io/otel/sdk/trace` package now honors the context deadline or cancellation. (#1616, #1856)
- BatchSpanProcessor now drops span batches that failed to be exported. (#1860)

### Security

Expand Down
23 changes: 14 additions & 9 deletions sdk/trace/batch_span_processor.go
Expand Up @@ -156,16 +156,14 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
var err error
if bsp.e != nil {
wait := make(chan struct{})
wait := make(chan error)
go func() {
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
wait <- bsp.exportSpans(ctx)
close(wait)
}()
// Wait until the export is finished or the context is cancelled/timed out
select {
case <-wait:
case err = <-wait:
case <-ctx.Done():
err = ctx.Err()
}
Expand Down Expand Up @@ -216,11 +214,18 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
defer cancel()
}

if len(bsp.batch) > 0 {
if err := bsp.e.ExportSpans(ctx, bsp.batch); err != nil {
if l := len(bsp.batch); l > 0 {
err := bsp.e.ExportSpans(ctx, bsp.batch)

// A new batch is always created after exporting, even if the batch failed to be exported.
//
// It is up to the exporter to implement any type of retry logic if a batch is failing
// to be exported, since it is specific to the protocol and backend being sent to.
bsp.batch = bsp.batch[:0]

if err != nil {
return err
}
bsp.batch = bsp.batch[:0]
}
return nil
}
Expand All @@ -244,7 +249,7 @@ func (bsp *batchSpanProcessor) processQueue() {
case sd := <-bsp.queue:
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
bsp.batchMutex.Unlock()
if shouldExport {
if !bsp.timer.Stop() {
Expand Down
77 changes: 71 additions & 6 deletions sdk/trace/batch_span_processor_test.go
Expand Up @@ -37,13 +37,23 @@ type testBatchExporter struct {
batchCount int
shutdownCount int
delay time.Duration
errors []error
droppedCount int
idx int
err error
}

func (t *testBatchExporter) ExportSpans(ctx context.Context, ss []*sdktrace.SpanSnapshot) error {
t.mu.Lock()
defer t.mu.Unlock()

if t.idx < len(t.errors) {
t.droppedCount += len(ss)
err := t.errors[t.idx]
t.idx++
return err
}

time.Sleep(t.delay)

select {
Expand Down Expand Up @@ -338,12 +348,8 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
// Force flush any held span batches
err := ssp.ForceFlush(context.Background())

gotNumOfSpans := te.len()
spanDifference := option.wantNumSpans - gotNumOfSpans
if spanDifference > 10 || spanDifference < 0 {
t.Errorf("number of exported span not equal to or within 10 less than: got %+v, want %+v\n",
gotNumOfSpans, option.wantNumSpans)
}
assertMaxSpanDiff(t, te.len(), option.wantNumSpans, 10)

gotBatchCount := te.getBatchCount()
if gotBatchCount < option.wantBatchCount {
t.Errorf("number batches: got %+v, want >= %+v\n",
Expand All @@ -353,6 +359,65 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
assert.NoError(t, err)
}

func TestBatchSpanProcessorDropBatchIfFailed(t *testing.T) {
te := testBatchExporter{
errors: []error{errors.New("fail to export")},
}
tp := basicTracerProvider(t)
option := testOption{
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithMaxQueueSize(0),
sdktrace.WithMaxExportBatchSize(2000),
},
wantNumSpans: 1000,
wantBatchCount: 1,
genNumSpans: 1000,
}
ssp := createAndRegisterBatchSP(option, &te)
if ssp == nil {
t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
}
tp.RegisterSpanProcessor(ssp)
tr := tp.Tracer("BatchSpanProcessorWithOption")
generateSpan(t, option.parallel, tr, option)

// Force flush any held span batches
err := ssp.ForceFlush(context.Background())
assert.Error(t, err)
assert.EqualError(t, err, "fail to export")

// First flush will fail, nothing should be exported.
assertMaxSpanDiff(t, te.droppedCount, option.wantNumSpans, 10)
assert.Equal(t, 0, te.len())
assert.Equal(t, 0, te.getBatchCount())

// Generate a new batch, this will succeed
generateSpan(t, option.parallel, tr, option)

// Force flush any held span batches
err = ssp.ForceFlush(context.Background())
assert.NoError(t, err)

assertMaxSpanDiff(t, te.len(), option.wantNumSpans, 10)
gotBatchCount := te.getBatchCount()
if gotBatchCount < option.wantBatchCount {
t.Errorf("number batches: got %+v, want >= %+v\n",
gotBatchCount, option.wantBatchCount)
t.Errorf("Batches %v\n", te.sizes)
}
}

func assertMaxSpanDiff(t *testing.T, want, got, maxDif int) {
spanDifference := want - got
if spanDifference < 0 {
spanDifference = spanDifference * -1
}
if spanDifference > maxDif {
t.Errorf("number of exported span not equal to or within %d less than: got %+v, want %+v\n",
maxDif, got, want)
}
}

func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) {
var bp testBatchExporter
bsp := sdktrace.NewBatchSpanProcessor(&bp)
Expand Down

0 comments on commit e399d35

Please sign in to comment.