From f900e0fa368316f86fe6bf2c604ff1dd13b89d38 Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Mon, 13 Jul 2020 09:21:39 -0400 Subject: [PATCH] feat: force flush and shutdown callback for span exporters (#1296) --- .../src/MultiSpanProcessor.ts | 20 +++++-- .../src/NoopSpanProcessor.ts | 8 ++- .../src/SpanProcessor.ts | 4 +- .../src/export/BatchSpanProcessor.ts | 19 ++++--- .../src/export/SimpleSpanProcessor.ts | 7 ++- .../test/MultiSpanProcessor.test.ts | 32 ++++++++++- .../test/export/BatchSpanProcessor.test.ts | 56 ++++++++++++++++++- .../test/export/SimpleSpanProcessor.test.ts | 16 ++++++ 8 files changed, 143 insertions(+), 19 deletions(-) diff --git a/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts b/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts index ccd96d51d7..3f717f52fa 100644 --- a/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts @@ -24,9 +24,15 @@ import { ReadableSpan } from './export/ReadableSpan'; export class MultiSpanProcessor implements SpanProcessor { constructor(private readonly _spanProcessors: SpanProcessor[]) {} - forceFlush(): void { + forceFlush(cb: () => void = () => {}): void { + let finished = 0; + const total = this._spanProcessors.length; for (const spanProcessor of this._spanProcessors) { - spanProcessor.forceFlush(); + spanProcessor.forceFlush(() => { + if (++finished === total) { + cb(); + } + }); } } @@ -42,9 +48,15 @@ export class MultiSpanProcessor implements SpanProcessor { } } - shutdown(): void { + shutdown(cb: () => void = () => {}): void { + let finished = 0; + const total = this._spanProcessors.length; for (const spanProcessor of this._spanProcessors) { - spanProcessor.shutdown(); + spanProcessor.shutdown(() => { + if (++finished === total) { + cb(); + } + }); } } } diff --git a/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts b/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts index d9291c0436..66f074f3a9 100644 --- a/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts @@ -21,6 +21,10 @@ import { ReadableSpan } from './export/ReadableSpan'; export class NoopSpanProcessor implements SpanProcessor { onStart(span: ReadableSpan): void {} onEnd(span: ReadableSpan): void {} - shutdown(): void {} - forceFlush(): void {} + shutdown(cb: () => unknown = () => {}): void { + setTimeout(cb, 0); + } + forceFlush(cb: () => unknown = () => {}): void { + setTimeout(cb, 0); + } } diff --git a/packages/opentelemetry-tracing/src/SpanProcessor.ts b/packages/opentelemetry-tracing/src/SpanProcessor.ts index ad415fec83..83cc53c0ab 100644 --- a/packages/opentelemetry-tracing/src/SpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/SpanProcessor.ts @@ -24,7 +24,7 @@ export interface SpanProcessor { /** * Forces to export all finished spans */ - forceFlush(): void; + forceFlush(callback: () => void): void; /** * Called when a {@link ReadableSpan} is started, if the `span.isRecording()` @@ -44,5 +44,5 @@ export interface SpanProcessor { * Shuts down the processor. Called when SDK is shut down. This is an * opportunity for processor to do any cleanup required. */ - shutdown(): void; + shutdown(callback: () => void): void; } diff --git a/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts b/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts index 9076dfdf21..a81e747ac8 100644 --- a/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts @@ -44,11 +44,12 @@ export class BatchSpanProcessor implements SpanProcessor { : DEFAULT_BUFFER_TIMEOUT_MS; } - forceFlush(): void { + forceFlush(cb: () => void = () => {}): void { if (this._isShutdown) { + setTimeout(cb, 0); return; } - this._flush(); + this._flush(cb); } // does nothing. @@ -61,11 +62,12 @@ export class BatchSpanProcessor implements SpanProcessor { this._addToBuffer(span); } - shutdown(): void { + shutdown(cb: () => void = () => {}): void { if (this._isShutdown) { + setTimeout(cb, 0); return; } - this.forceFlush(); + this.forceFlush(cb); this._isShutdown = true; this._exporter.shutdown(); } @@ -80,10 +82,13 @@ export class BatchSpanProcessor implements SpanProcessor { } /** Send the span data list to exporter */ - private _flush() { + private _flush(cb: () => void = () => {}) { this._clearTimer(); - if (this._finishedSpans.length === 0) return; - this._exporter.export(this._finishedSpans, () => {}); + if (this._finishedSpans.length === 0) { + setTimeout(cb, 0); + return; + } + this._exporter.export(this._finishedSpans, cb); this._finishedSpans = []; } diff --git a/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts b/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts index eb35a378f6..294b61777a 100644 --- a/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts @@ -28,8 +28,9 @@ export class SimpleSpanProcessor implements SpanProcessor { constructor(private readonly _exporter: SpanExporter) {} private _isShutdown = false; - forceFlush(): void { + forceFlush(cb: () => void = () => {}): void { // do nothing as all spans are being exported without waiting + setTimeout(cb, 0); } // does nothing. @@ -42,12 +43,14 @@ export class SimpleSpanProcessor implements SpanProcessor { this._exporter.export([span], () => {}); } - shutdown(): void { + shutdown(cb: () => void = () => {}): void { if (this._isShutdown) { + setTimeout(cb, 0); return; } this._isShutdown = true; this._exporter.shutdown(); + setTimeout(cb, 0); } } diff --git a/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts b/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts index ea0a6b8e3e..c4e4aa35fd 100644 --- a/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts +++ b/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts @@ -15,8 +15,15 @@ */ import * as assert from 'assert'; +import * as Sinon from 'sinon'; +import { + BasicTracerProvider, + InMemorySpanExporter, + SimpleSpanProcessor, + Span, + SpanProcessor, +} from '../src'; import { MultiSpanProcessor } from '../src/MultiSpanProcessor'; -import { SpanProcessor, Span, BasicTracerProvider } from '../src'; class TestProcessor implements SpanProcessor { spans: Span[] = []; @@ -91,4 +98,27 @@ describe('MultiSpanProcessor', () => { multiSpanProcessor.forceFlush(); assert.ok(flushed); }); + + it('should wait for all span processors to finish flushing', done => { + let flushed = 0; + const processor1 = new SimpleSpanProcessor(new InMemorySpanExporter()); + const processor2 = new SimpleSpanProcessor(new InMemorySpanExporter()); + + const spy1 = Sinon.stub(processor1, 'forceFlush').callsFake(cb => { + flushed++; + cb!(); + }); + const spy2 = Sinon.stub(processor2, 'forceFlush').callsFake(cb => { + flushed++; + cb!(); + }); + + const multiSpanProcessor = new MultiSpanProcessor([processor1, processor2]); + multiSpanProcessor.forceFlush(() => { + Sinon.assert.calledOnce(spy1); + Sinon.assert.calledOnce(spy2); + assert.strictEqual(flushed, 2); + done(); + }); + }); }); diff --git a/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts b/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts index 7af2c55a51..73ac0a0053 100644 --- a/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts +++ b/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { AlwaysOnSampler } from '@opentelemetry/core'; +import { AlwaysOnSampler, ExportResult } from '@opentelemetry/core'; import * as assert from 'assert'; import * as sinon from 'sinon'; import { @@ -174,4 +174,58 @@ describe('BatchSpanProcessor', () => { clock.restore(); }); }); + + describe('force flush', () => { + describe('no waiting spans', () => { + it('should call an async callback when flushing is complete', done => { + const processor = new BatchSpanProcessor(exporter); + processor.forceFlush(() => { + done(); + }); + }); + + it('should call an async callback when shutdown is complete', done => { + const processor = new BatchSpanProcessor(exporter); + processor.shutdown(() => { + done(); + }); + }); + }); + + describe('spans waiting to flush', () => { + let processor: BatchSpanProcessor; + + beforeEach(() => { + processor = new BatchSpanProcessor(exporter); + const span = createSampledSpan('test'); + processor.onStart(span); + processor.onEnd(span); + + assert.strictEqual(processor['_finishedSpans'].length, 1); + }); + + it('should call an async callback when flushing is complete', done => { + processor.forceFlush(() => { + assert.strictEqual(exporter.getFinishedSpans().length, 1); + done(); + }); + }); + + it('should call an async callback when shutdown is complete', done => { + let exportedSpans = 0; + sinon.stub(exporter, 'export').callsFake((spans, callback) => { + console.log('uh, export?'); + setTimeout(() => { + exportedSpans = exportedSpans + spans.length; + callback(ExportResult.SUCCESS); + }, 0); + }); + + processor.shutdown(() => { + assert.strictEqual(exportedSpans, 1); + done(); + }); + }); + }); + }); }); diff --git a/packages/opentelemetry-tracing/test/export/SimpleSpanProcessor.test.ts b/packages/opentelemetry-tracing/test/export/SimpleSpanProcessor.test.ts index 60f5cf0e8b..8b67013153 100644 --- a/packages/opentelemetry-tracing/test/export/SimpleSpanProcessor.test.ts +++ b/packages/opentelemetry-tracing/test/export/SimpleSpanProcessor.test.ts @@ -80,5 +80,21 @@ describe('SimpleSpanProcessor', () => { processor.shutdown(); assert.strictEqual(exporter.getFinishedSpans().length, 0); }); + + describe('force flush', () => { + it('should call an async callback when flushing is complete', done => { + const processor = new SimpleSpanProcessor(exporter); + processor.forceFlush(() => { + done(); + }); + }); + + it('should call an async callback when shutdown is complete', done => { + const processor = new SimpleSpanProcessor(exporter); + processor.shutdown(() => { + done(); + }); + }); + }); }); });