Skip to content

Commit

Permalink
feat: force flush and shutdown callback for span exporters (#1296)
Browse files Browse the repository at this point in the history
  • Loading branch information
dyladan committed Jul 13, 2020
1 parent 965e69f commit f900e0f
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 19 deletions.
20 changes: 16 additions & 4 deletions packages/opentelemetry-tracing/src/MultiSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ import { ReadableSpan } from './export/ReadableSpan';
export class MultiSpanProcessor implements SpanProcessor {
constructor(private readonly _spanProcessors: SpanProcessor[]) {}

forceFlush(): void {
forceFlush(cb: () => void = () => {}): void {
let finished = 0;
const total = this._spanProcessors.length;
for (const spanProcessor of this._spanProcessors) {
spanProcessor.forceFlush();
spanProcessor.forceFlush(() => {
if (++finished === total) {
cb();
}
});
}
}

Expand All @@ -42,9 +48,15 @@ export class MultiSpanProcessor implements SpanProcessor {
}
}

shutdown(): void {
shutdown(cb: () => void = () => {}): void {
let finished = 0;
const total = this._spanProcessors.length;
for (const spanProcessor of this._spanProcessors) {
spanProcessor.shutdown();
spanProcessor.shutdown(() => {
if (++finished === total) {
cb();
}
});
}
}
}
8 changes: 6 additions & 2 deletions packages/opentelemetry-tracing/src/NoopSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import { ReadableSpan } from './export/ReadableSpan';
export class NoopSpanProcessor implements SpanProcessor {
onStart(span: ReadableSpan): void {}
onEnd(span: ReadableSpan): void {}
shutdown(): void {}
forceFlush(): void {}
shutdown(cb: () => unknown = () => {}): void {
setTimeout(cb, 0);
}
forceFlush(cb: () => unknown = () => {}): void {
setTimeout(cb, 0);
}
}
4 changes: 2 additions & 2 deletions packages/opentelemetry-tracing/src/SpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export interface SpanProcessor {
/**
* Forces to export all finished spans
*/
forceFlush(): void;
forceFlush(callback: () => void): void;

/**
* Called when a {@link ReadableSpan} is started, if the `span.isRecording()`
Expand All @@ -44,5 +44,5 @@ export interface SpanProcessor {
* Shuts down the processor. Called when SDK is shut down. This is an
* opportunity for processor to do any cleanup required.
*/
shutdown(): void;
shutdown(callback: () => void): void;
}
19 changes: 12 additions & 7 deletions packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ export class BatchSpanProcessor implements SpanProcessor {
: DEFAULT_BUFFER_TIMEOUT_MS;
}

forceFlush(): void {
forceFlush(cb: () => void = () => {}): void {
if (this._isShutdown) {
setTimeout(cb, 0);
return;
}
this._flush();
this._flush(cb);
}

// does nothing.
Expand All @@ -61,11 +62,12 @@ export class BatchSpanProcessor implements SpanProcessor {
this._addToBuffer(span);
}

shutdown(): void {
shutdown(cb: () => void = () => {}): void {
if (this._isShutdown) {
setTimeout(cb, 0);
return;
}
this.forceFlush();
this.forceFlush(cb);
this._isShutdown = true;
this._exporter.shutdown();
}
Expand All @@ -80,10 +82,13 @@ export class BatchSpanProcessor implements SpanProcessor {
}

/** Send the span data list to exporter */
private _flush() {
private _flush(cb: () => void = () => {}) {
this._clearTimer();
if (this._finishedSpans.length === 0) return;
this._exporter.export(this._finishedSpans, () => {});
if (this._finishedSpans.length === 0) {
setTimeout(cb, 0);
return;
}
this._exporter.export(this._finishedSpans, cb);
this._finishedSpans = [];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ export class SimpleSpanProcessor implements SpanProcessor {
constructor(private readonly _exporter: SpanExporter) {}
private _isShutdown = false;

forceFlush(): void {
forceFlush(cb: () => void = () => {}): void {
// do nothing as all spans are being exported without waiting
setTimeout(cb, 0);
}

// does nothing.
Expand All @@ -42,12 +43,14 @@ export class SimpleSpanProcessor implements SpanProcessor {
this._exporter.export([span], () => {});
}

shutdown(): void {
shutdown(cb: () => void = () => {}): void {
if (this._isShutdown) {
setTimeout(cb, 0);
return;
}
this._isShutdown = true;

this._exporter.shutdown();
setTimeout(cb, 0);
}
}
32 changes: 31 additions & 1 deletion packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@
*/

import * as assert from 'assert';
import * as Sinon from 'sinon';
import {
BasicTracerProvider,
InMemorySpanExporter,
SimpleSpanProcessor,
Span,
SpanProcessor,
} from '../src';
import { MultiSpanProcessor } from '../src/MultiSpanProcessor';
import { SpanProcessor, Span, BasicTracerProvider } from '../src';

class TestProcessor implements SpanProcessor {
spans: Span[] = [];
Expand Down Expand Up @@ -91,4 +98,27 @@ describe('MultiSpanProcessor', () => {
multiSpanProcessor.forceFlush();
assert.ok(flushed);
});

it('should wait for all span processors to finish flushing', done => {
let flushed = 0;
const processor1 = new SimpleSpanProcessor(new InMemorySpanExporter());
const processor2 = new SimpleSpanProcessor(new InMemorySpanExporter());

const spy1 = Sinon.stub(processor1, 'forceFlush').callsFake(cb => {
flushed++;
cb!();
});
const spy2 = Sinon.stub(processor2, 'forceFlush').callsFake(cb => {
flushed++;
cb!();
});

const multiSpanProcessor = new MultiSpanProcessor([processor1, processor2]);
multiSpanProcessor.forceFlush(() => {
Sinon.assert.calledOnce(spy1);
Sinon.assert.calledOnce(spy2);
assert.strictEqual(flushed, 2);
done();
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { AlwaysOnSampler } from '@opentelemetry/core';
import { AlwaysOnSampler, ExportResult } from '@opentelemetry/core';
import * as assert from 'assert';
import * as sinon from 'sinon';
import {
Expand Down Expand Up @@ -174,4 +174,58 @@ describe('BatchSpanProcessor', () => {
clock.restore();
});
});

describe('force flush', () => {
describe('no waiting spans', () => {
it('should call an async callback when flushing is complete', done => {
const processor = new BatchSpanProcessor(exporter);
processor.forceFlush(() => {
done();
});
});

it('should call an async callback when shutdown is complete', done => {
const processor = new BatchSpanProcessor(exporter);
processor.shutdown(() => {
done();
});
});
});

describe('spans waiting to flush', () => {
let processor: BatchSpanProcessor;

beforeEach(() => {
processor = new BatchSpanProcessor(exporter);
const span = createSampledSpan('test');
processor.onStart(span);
processor.onEnd(span);

assert.strictEqual(processor['_finishedSpans'].length, 1);
});

it('should call an async callback when flushing is complete', done => {
processor.forceFlush(() => {
assert.strictEqual(exporter.getFinishedSpans().length, 1);
done();
});
});

it('should call an async callback when shutdown is complete', done => {
let exportedSpans = 0;
sinon.stub(exporter, 'export').callsFake((spans, callback) => {
console.log('uh, export?');
setTimeout(() => {
exportedSpans = exportedSpans + spans.length;
callback(ExportResult.SUCCESS);
}, 0);
});

processor.shutdown(() => {
assert.strictEqual(exportedSpans, 1);
done();
});
});
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,21 @@ describe('SimpleSpanProcessor', () => {
processor.shutdown();
assert.strictEqual(exporter.getFinishedSpans().length, 0);
});

describe('force flush', () => {
it('should call an async callback when flushing is complete', done => {
const processor = new SimpleSpanProcessor(exporter);
processor.forceFlush(() => {
done();
});
});

it('should call an async callback when shutdown is complete', done => {
const processor = new SimpleSpanProcessor(exporter);
processor.shutdown(() => {
done();
});
});
});
});
});

0 comments on commit f900e0f

Please sign in to comment.