From 5f7d6209f64931119d12a61b3e58e0c0bf5f3c1c Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Thu, 9 Jul 2020 11:58:58 -0600 Subject: [PATCH 1/5] feat: force flush and shutdown callback for span exporters --- .../src/MultiSpanProcessor.ts | 20 +++++++-- .../src/NoopSpanProcessor.ts | 4 +- .../src/SpanProcessor.ts | 4 +- .../src/export/BatchSpanProcessor.ts | 18 +++++--- .../src/export/SimpleSpanProcessor.ts | 15 ++++--- .../test/MultiSpanProcessor.test.ts | 32 +++++++++++++- .../test/export/BatchSpanProcessor.test.ts | 44 +++++++++++++++++++ .../test/export/SimpleSpanProcessor.test.ts | 16 +++++++ 8 files changed, 131 insertions(+), 22 deletions(-) diff --git a/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts b/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts index ccd96d51d7..6684378c21 100644 --- a/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts @@ -24,9 +24,15 @@ import { ReadableSpan } from './export/ReadableSpan'; export class MultiSpanProcessor implements SpanProcessor { constructor(private readonly _spanProcessors: SpanProcessor[]) {} - forceFlush(): void { + forceFlush(cb: () => void = () => {}): void { + let finished = 0; + let total = this._spanProcessors.length; for (const spanProcessor of this._spanProcessors) { - spanProcessor.forceFlush(); + spanProcessor.forceFlush(() => { + if (++finished === total) { + cb(); + } + }); } } @@ -42,9 +48,15 @@ export class MultiSpanProcessor implements SpanProcessor { } } - shutdown(): void { + shutdown(cb: () => void = () => {}): void { + let finished = 0; + let total = this._spanProcessors.length; for (const spanProcessor of this._spanProcessors) { - spanProcessor.shutdown(); + spanProcessor.shutdown(() => { + if (++finished === total) { + cb(); + } + }); } } } diff --git a/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts b/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts index d9291c0436..3f7561fa4f 100644 --- a/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts @@ -21,6 +21,6 @@ import { ReadableSpan } from './export/ReadableSpan'; export class NoopSpanProcessor implements SpanProcessor { onStart(span: ReadableSpan): void {} onEnd(span: ReadableSpan): void {} - shutdown(): void {} - forceFlush(): void {} + shutdown(cb: () => unknown = () => {}): void { setImmediate(cb) } + forceFlush(cb: () => unknown = () => {}): void { setImmediate(cb) } } diff --git a/packages/opentelemetry-tracing/src/SpanProcessor.ts b/packages/opentelemetry-tracing/src/SpanProcessor.ts index ad415fec83..83cc53c0ab 100644 --- a/packages/opentelemetry-tracing/src/SpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/SpanProcessor.ts @@ -24,7 +24,7 @@ export interface SpanProcessor { /** * Forces to export all finished spans */ - forceFlush(): void; + forceFlush(callback: () => void): void; /** * Called when a {@link ReadableSpan} is started, if the `span.isRecording()` @@ -44,5 +44,5 @@ export interface SpanProcessor { * Shuts down the processor. Called when SDK is shut down. This is an * opportunity for processor to do any cleanup required. */ - shutdown(): void; + shutdown(callback: () => void): void; } diff --git a/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts b/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts index 9076dfdf21..a36398bc3e 100644 --- a/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts @@ -44,11 +44,12 @@ export class BatchSpanProcessor implements SpanProcessor { : DEFAULT_BUFFER_TIMEOUT_MS; } - forceFlush(): void { + forceFlush(cb: () => void = () => {}): void { if (this._isShutdown) { + setImmediate(cb); return; } - this._flush(); + this._flush(cb); } // does nothing. @@ -61,11 +62,11 @@ export class BatchSpanProcessor implements SpanProcessor { this._addToBuffer(span); } - shutdown(): void { + shutdown(cb: () => void = () => {}): void { if (this._isShutdown) { return; } - this.forceFlush(); + this.forceFlush(cb); this._isShutdown = true; this._exporter.shutdown(); } @@ -80,10 +81,13 @@ export class BatchSpanProcessor implements SpanProcessor { } /** Send the span data list to exporter */ - private _flush() { + private _flush(cb: () => void = () => {}) { this._clearTimer(); - if (this._finishedSpans.length === 0) return; - this._exporter.export(this._finishedSpans, () => {}); + if (this._finishedSpans.length === 0) { + setImmediate(cb); + return; + } + this._exporter.export(this._finishedSpans, cb); this._finishedSpans = []; } diff --git a/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts b/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts index eb35a378f6..300c5ebf35 100644 --- a/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts @@ -25,29 +25,32 @@ import { ReadableSpan } from './ReadableSpan'; * Only spans that are sampled are converted. */ export class SimpleSpanProcessor implements SpanProcessor { - constructor(private readonly _exporter: SpanExporter) {} + constructor(private readonly _exporter: SpanExporter) { } private _isShutdown = false; - forceFlush(): void { + forceFlush(cb: () => void = () => { }): void { // do nothing as all spans are being exported without waiting + setImmediate(cb); } // does nothing. - onStart(span: ReadableSpan): void {} + onStart(span: ReadableSpan): void { } onEnd(span: ReadableSpan): void { if (this._isShutdown) { return; } - this._exporter.export([span], () => {}); + this._exporter.export([span], () => { }); } - shutdown(): void { + shutdown(cb: () => void = () => { }): void { if (this._isShutdown) { + setImmediate(cb); return; } this._isShutdown = true; this._exporter.shutdown(); - } + setImmediate(cb); + } } diff --git a/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts b/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts index ea0a6b8e3e..e5a48b20b1 100644 --- a/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts +++ b/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts @@ -16,7 +16,14 @@ import * as assert from 'assert'; import { MultiSpanProcessor } from '../src/MultiSpanProcessor'; -import { SpanProcessor, Span, BasicTracerProvider } from '../src'; +import { + SpanProcessor, + Span, + BasicTracerProvider, + SimpleSpanProcessor, + InMemorySpanExporter, +} from '../src'; +import Sinon = require('sinon'); class TestProcessor implements SpanProcessor { spans: Span[] = []; @@ -91,4 +98,27 @@ describe('MultiSpanProcessor', () => { multiSpanProcessor.forceFlush(); assert.ok(flushed); }); + + it('should wait for all span processors to finish flushing', done => { + let flushed = 0; + const processor1 = new SimpleSpanProcessor(new InMemorySpanExporter()); + const processor2 = new SimpleSpanProcessor(new InMemorySpanExporter()); + + const spy1 = Sinon.stub(processor1, 'forceFlush').callsFake(cb => { + flushed++; + cb!(); + }); + const spy2 = Sinon.stub(processor2, 'forceFlush').callsFake(cb => { + flushed++; + cb!(); + }); + + const multiSpanProcessor = new MultiSpanProcessor([processor1, processor2]); + multiSpanProcessor.forceFlush(() => { + Sinon.assert.calledOnce(spy1); + Sinon.assert.calledOnce(spy2); + assert.strictEqual(flushed, 2); + done(); + }); + }); }); diff --git a/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts b/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts index 05ec6b95ff..49d82fa0d3 100644 --- a/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts +++ b/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts @@ -174,4 +174,48 @@ describe('BatchSpanProcessor', () => { clock.restore(); }); }); + + describe('force flush', () => { + describe('no waiting spans', () => { + it('should call an async callback when flushing is complete', done => { + const processor = new BatchSpanProcessor(exporter); + processor.forceFlush(() => { + done(); + }); + }); + + it('should call an async callback when shutdown is complete', done => { + const processor = new BatchSpanProcessor(exporter); + processor.shutdown(() => { + done(); + }); + }); + }); + + describe('spans waiting to flush', () => { + let processor: BatchSpanProcessor; + + beforeEach(() => { + processor = new BatchSpanProcessor(exporter); + const span = createSampledSpan('test'); + processor.onStart(span); + processor.onEnd(span); + + assert.strictEqual(processor['_finishedSpans'].length, 1); + }); + + it('should call an async callback when flushing is complete', done => { + processor.forceFlush(() => { + done(); + }); + }); + + it('should call an async callback when shutdown is complete', done => { + const processor = new BatchSpanProcessor(exporter); + processor.shutdown(() => { + done(); + }); + }); + }); + }); }); diff --git a/packages/opentelemetry-tracing/test/export/SimpleSpanProcessor.test.ts b/packages/opentelemetry-tracing/test/export/SimpleSpanProcessor.test.ts index 60f5cf0e8b..8b67013153 100644 --- a/packages/opentelemetry-tracing/test/export/SimpleSpanProcessor.test.ts +++ b/packages/opentelemetry-tracing/test/export/SimpleSpanProcessor.test.ts @@ -80,5 +80,21 @@ describe('SimpleSpanProcessor', () => { processor.shutdown(); assert.strictEqual(exporter.getFinishedSpans().length, 0); }); + + describe('force flush', () => { + it('should call an async callback when flushing is complete', done => { + const processor = new SimpleSpanProcessor(exporter); + processor.forceFlush(() => { + done(); + }); + }); + + it('should call an async callback when shutdown is complete', done => { + const processor = new SimpleSpanProcessor(exporter); + processor.shutdown(() => { + done(); + }); + }); + }); }); }); From 50637f08686feab0fd5700a0bec73379414edc39 Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Thu, 9 Jul 2020 12:07:26 -0600 Subject: [PATCH 2/5] chore: lint --- .../opentelemetry-tracing/src/MultiSpanProcessor.ts | 4 ++-- .../opentelemetry-tracing/src/NoopSpanProcessor.ts | 8 ++++++-- .../src/export/SimpleSpanProcessor.ts | 12 ++++++------ 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts b/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts index 6684378c21..3f717f52fa 100644 --- a/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/MultiSpanProcessor.ts @@ -26,7 +26,7 @@ export class MultiSpanProcessor implements SpanProcessor { forceFlush(cb: () => void = () => {}): void { let finished = 0; - let total = this._spanProcessors.length; + const total = this._spanProcessors.length; for (const spanProcessor of this._spanProcessors) { spanProcessor.forceFlush(() => { if (++finished === total) { @@ -50,7 +50,7 @@ export class MultiSpanProcessor implements SpanProcessor { shutdown(cb: () => void = () => {}): void { let finished = 0; - let total = this._spanProcessors.length; + const total = this._spanProcessors.length; for (const spanProcessor of this._spanProcessors) { spanProcessor.shutdown(() => { if (++finished === total) { diff --git a/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts b/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts index 3f7561fa4f..821f8f11bf 100644 --- a/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts @@ -21,6 +21,10 @@ import { ReadableSpan } from './export/ReadableSpan'; export class NoopSpanProcessor implements SpanProcessor { onStart(span: ReadableSpan): void {} onEnd(span: ReadableSpan): void {} - shutdown(cb: () => unknown = () => {}): void { setImmediate(cb) } - forceFlush(cb: () => unknown = () => {}): void { setImmediate(cb) } + shutdown(cb: () => unknown = () => {}): void { + setImmediate(cb); + } + forceFlush(cb: () => unknown = () => {}): void { + setImmediate(cb); + } } diff --git a/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts b/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts index 300c5ebf35..7a7ff40be0 100644 --- a/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts @@ -25,25 +25,25 @@ import { ReadableSpan } from './ReadableSpan'; * Only spans that are sampled are converted. */ export class SimpleSpanProcessor implements SpanProcessor { - constructor(private readonly _exporter: SpanExporter) { } + constructor(private readonly _exporter: SpanExporter) {} private _isShutdown = false; - forceFlush(cb: () => void = () => { }): void { + forceFlush(cb: () => void = () => {}): void { // do nothing as all spans are being exported without waiting setImmediate(cb); } // does nothing. - onStart(span: ReadableSpan): void { } + onStart(span: ReadableSpan): void {} onEnd(span: ReadableSpan): void { if (this._isShutdown) { return; } - this._exporter.export([span], () => { }); + this._exporter.export([span], () => {}); } - shutdown(cb: () => void = () => { }): void { + shutdown(cb: () => void = () => {}): void { if (this._isShutdown) { setImmediate(cb); return; @@ -52,5 +52,5 @@ export class SimpleSpanProcessor implements SpanProcessor { this._exporter.shutdown(); setImmediate(cb); - } + } } From 3f1304d1c42c5e80ada74342c59b8bb665af2977 Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Thu, 9 Jul 2020 12:16:16 -0600 Subject: [PATCH 3/5] chore: fix browser build --- packages/opentelemetry-tracing/src/NoopSpanProcessor.ts | 4 ++-- .../opentelemetry-tracing/src/export/BatchSpanProcessor.ts | 4 ++-- .../opentelemetry-tracing/src/export/SimpleSpanProcessor.ts | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts b/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts index 821f8f11bf..66f074f3a9 100644 --- a/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/NoopSpanProcessor.ts @@ -22,9 +22,9 @@ export class NoopSpanProcessor implements SpanProcessor { onStart(span: ReadableSpan): void {} onEnd(span: ReadableSpan): void {} shutdown(cb: () => unknown = () => {}): void { - setImmediate(cb); + setTimeout(cb, 0); } forceFlush(cb: () => unknown = () => {}): void { - setImmediate(cb); + setTimeout(cb, 0); } } diff --git a/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts b/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts index a36398bc3e..24a0abbcf6 100644 --- a/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts @@ -46,7 +46,7 @@ export class BatchSpanProcessor implements SpanProcessor { forceFlush(cb: () => void = () => {}): void { if (this._isShutdown) { - setImmediate(cb); + setTimeout(cb, 0); return; } this._flush(cb); @@ -84,7 +84,7 @@ export class BatchSpanProcessor implements SpanProcessor { private _flush(cb: () => void = () => {}) { this._clearTimer(); if (this._finishedSpans.length === 0) { - setImmediate(cb); + setTimeout(cb, 0); return; } this._exporter.export(this._finishedSpans, cb); diff --git a/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts b/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts index 7a7ff40be0..294b61777a 100644 --- a/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts @@ -30,7 +30,7 @@ export class SimpleSpanProcessor implements SpanProcessor { forceFlush(cb: () => void = () => {}): void { // do nothing as all spans are being exported without waiting - setImmediate(cb); + setTimeout(cb, 0); } // does nothing. @@ -45,12 +45,12 @@ export class SimpleSpanProcessor implements SpanProcessor { shutdown(cb: () => void = () => {}): void { if (this._isShutdown) { - setImmediate(cb); + setTimeout(cb, 0); return; } this._isShutdown = true; this._exporter.shutdown(); - setImmediate(cb); + setTimeout(cb, 0); } } From 8aa19eb5bfc8de5e7e0c5565558d4a87abf31bba Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Thu, 9 Jul 2020 17:26:49 -0600 Subject: [PATCH 4/5] chore: test that spans are actually flushed --- .../src/export/BatchSpanProcessor.ts | 1 + .../test/export/BatchSpanProcessor.test.ts | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts b/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts index 24a0abbcf6..a81e747ac8 100644 --- a/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts @@ -64,6 +64,7 @@ export class BatchSpanProcessor implements SpanProcessor { shutdown(cb: () => void = () => {}): void { if (this._isShutdown) { + setTimeout(cb, 0); return; } this.forceFlush(cb); diff --git a/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts b/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts index 49d82fa0d3..87d5d02e8d 100644 --- a/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts +++ b/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { ALWAYS_SAMPLER } from '@opentelemetry/core'; +import { ALWAYS_SAMPLER, ExportResult } from '@opentelemetry/core'; import * as assert from 'assert'; import * as sinon from 'sinon'; import { @@ -23,6 +23,7 @@ import { InMemorySpanExporter, Span, } from '../../src'; +import Sinon = require('sinon'); function createSampledSpan(spanName: string): Span { const tracer = new BasicTracerProvider({ @@ -206,13 +207,23 @@ describe('BatchSpanProcessor', () => { it('should call an async callback when flushing is complete', done => { processor.forceFlush(() => { + assert.strictEqual(exporter.getFinishedSpans().length, 1); done(); }); }); it('should call an async callback when shutdown is complete', done => { - const processor = new BatchSpanProcessor(exporter); + let exportedSpans = 0; + Sinon.stub(exporter, 'export').callsFake((spans, callback) => { + console.log('uh, export?'); + setTimeout(() => { + exportedSpans = exportedSpans + spans.length; + callback(ExportResult.SUCCESS); + }, 0); + }); + processor.shutdown(() => { + assert.strictEqual(exportedSpans, 1); done(); }); }); From a22f03a7a7677a1afdcf535ac227aad18405880b Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Thu, 9 Jul 2020 21:20:14 -0600 Subject: [PATCH 5/5] chore: fix sinon imports --- .../test/MultiSpanProcessor.test.ts | 10 +++++----- .../test/export/BatchSpanProcessor.test.ts | 3 +-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts b/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts index e5a48b20b1..c4e4aa35fd 100644 --- a/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts +++ b/packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts @@ -15,15 +15,15 @@ */ import * as assert from 'assert'; -import { MultiSpanProcessor } from '../src/MultiSpanProcessor'; +import * as Sinon from 'sinon'; import { - SpanProcessor, - Span, BasicTracerProvider, - SimpleSpanProcessor, InMemorySpanExporter, + SimpleSpanProcessor, + Span, + SpanProcessor, } from '../src'; -import Sinon = require('sinon'); +import { MultiSpanProcessor } from '../src/MultiSpanProcessor'; class TestProcessor implements SpanProcessor { spans: Span[] = []; diff --git a/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts b/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts index 87d5d02e8d..53799027b2 100644 --- a/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts +++ b/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts @@ -23,7 +23,6 @@ import { InMemorySpanExporter, Span, } from '../../src'; -import Sinon = require('sinon'); function createSampledSpan(spanName: string): Span { const tracer = new BasicTracerProvider({ @@ -214,7 +213,7 @@ describe('BatchSpanProcessor', () => { it('should call an async callback when shutdown is complete', done => { let exportedSpans = 0; - Sinon.stub(exporter, 'export').callsFake((spans, callback) => { + sinon.stub(exporter, 'export').callsFake((spans, callback) => { console.log('uh, export?'); setTimeout(() => { exportedSpans = exportedSpans + spans.length;