Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: force flush and shutdown callback for span exporters #1296

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions packages/opentelemetry-tracing/src/MultiSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ import { ReadableSpan } from './export/ReadableSpan';
export class MultiSpanProcessor implements SpanProcessor {
constructor(private readonly _spanProcessors: SpanProcessor[]) {}

forceFlush(): void {
forceFlush(cb: () => void = () => {}): void {
let finished = 0;
const total = this._spanProcessors.length;
for (const spanProcessor of this._spanProcessors) {
spanProcessor.forceFlush();
spanProcessor.forceFlush(() => {
if (++finished === total) {
cb();
}
});
}
}

Expand All @@ -42,9 +48,15 @@ export class MultiSpanProcessor implements SpanProcessor {
}
}

shutdown(): void {
shutdown(cb: () => void = () => {}): void {
let finished = 0;
const total = this._spanProcessors.length;
for (const spanProcessor of this._spanProcessors) {
spanProcessor.shutdown();
spanProcessor.shutdown(() => {
if (++finished === total) {
cb();
}
});
}
}
}
8 changes: 6 additions & 2 deletions packages/opentelemetry-tracing/src/NoopSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import { ReadableSpan } from './export/ReadableSpan';
export class NoopSpanProcessor implements SpanProcessor {
onStart(span: ReadableSpan): void {}
onEnd(span: ReadableSpan): void {}
shutdown(): void {}
forceFlush(): void {}
shutdown(cb: () => unknown = () => {}): void {
setTimeout(cb, 0);
dyladan marked this conversation as resolved.
Show resolved Hide resolved
}
forceFlush(cb: () => unknown = () => {}): void {
setTimeout(cb, 0);
}
}
4 changes: 2 additions & 2 deletions packages/opentelemetry-tracing/src/SpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export interface SpanProcessor {
/**
* Forces to export all finished spans
*/
forceFlush(): void;
forceFlush(callback: () => void): void;

/**
* Called when a {@link ReadableSpan} is started, if the `span.isRecording()`
Expand All @@ -44,5 +44,5 @@ export interface SpanProcessor {
* Shuts down the processor. Called when SDK is shut down. This is an
* opportunity for processor to do any cleanup required.
*/
shutdown(): void;
shutdown(callback: () => void): void;
}
19 changes: 12 additions & 7 deletions packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ export class BatchSpanProcessor implements SpanProcessor {
: DEFAULT_BUFFER_TIMEOUT_MS;
}

forceFlush(): void {
forceFlush(cb: () => void = () => {}): void {
if (this._isShutdown) {
setTimeout(cb, 0);
return;
}
this._flush();
this._flush(cb);
}

// does nothing.
Expand All @@ -61,11 +62,12 @@ export class BatchSpanProcessor implements SpanProcessor {
this._addToBuffer(span);
}

shutdown(): void {
shutdown(cb: () => void = () => {}): void {
if (this._isShutdown) {
setTimeout(cb, 0);
return;
}
this.forceFlush();
this.forceFlush(cb);
this._isShutdown = true;
this._exporter.shutdown();
}
Expand All @@ -80,10 +82,13 @@ export class BatchSpanProcessor implements SpanProcessor {
}

/** Send the span data list to exporter */
private _flush() {
private _flush(cb: () => void = () => {}) {
this._clearTimer();
if (this._finishedSpans.length === 0) return;
this._exporter.export(this._finishedSpans, () => {});
if (this._finishedSpans.length === 0) {
setTimeout(cb, 0);
return;
}
this._exporter.export(this._finishedSpans, cb);
this._finishedSpans = [];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ export class SimpleSpanProcessor implements SpanProcessor {
constructor(private readonly _exporter: SpanExporter) {}
private _isShutdown = false;

forceFlush(): void {
forceFlush(cb: () => void = () => {}): void {
// do nothing as all spans are being exported without waiting
setTimeout(cb, 0);
}

// does nothing.
Expand All @@ -42,12 +43,14 @@ export class SimpleSpanProcessor implements SpanProcessor {
this._exporter.export([span], () => {});
}

shutdown(): void {
shutdown(cb: () => void = () => {}): void {
if (this._isShutdown) {
setTimeout(cb, 0);
return;
}
this._isShutdown = true;

this._exporter.shutdown();
setTimeout(cb, 0);
}
}
32 changes: 31 additions & 1 deletion packages/opentelemetry-tracing/test/MultiSpanProcessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@
*/

import * as assert from 'assert';
import * as Sinon from 'sinon';
import {
BasicTracerProvider,
InMemorySpanExporter,
SimpleSpanProcessor,
Span,
SpanProcessor,
} from '../src';
import { MultiSpanProcessor } from '../src/MultiSpanProcessor';
import { SpanProcessor, Span, BasicTracerProvider } from '../src';

class TestProcessor implements SpanProcessor {
spans: Span[] = [];
Expand Down Expand Up @@ -91,4 +98,27 @@ describe('MultiSpanProcessor', () => {
multiSpanProcessor.forceFlush();
assert.ok(flushed);
});

it('should wait for all span processors to finish flushing', done => {
let flushed = 0;
const processor1 = new SimpleSpanProcessor(new InMemorySpanExporter());
const processor2 = new SimpleSpanProcessor(new InMemorySpanExporter());

const spy1 = Sinon.stub(processor1, 'forceFlush').callsFake(cb => {
flushed++;
cb!();
});
const spy2 = Sinon.stub(processor2, 'forceFlush').callsFake(cb => {
flushed++;
cb!();
});

const multiSpanProcessor = new MultiSpanProcessor([processor1, processor2]);
multiSpanProcessor.forceFlush(() => {
Sinon.assert.calledOnce(spy1);
Sinon.assert.calledOnce(spy2);
assert.strictEqual(flushed, 2);
done();
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { ALWAYS_SAMPLER } from '@opentelemetry/core';
import { ALWAYS_SAMPLER, ExportResult } from '@opentelemetry/core';
import * as assert from 'assert';
import * as sinon from 'sinon';
import {
Expand Down Expand Up @@ -174,4 +174,58 @@ describe('BatchSpanProcessor', () => {
clock.restore();
});
});

describe('force flush', () => {
describe('no waiting spans', () => {
it('should call an async callback when flushing is complete', done => {
dyladan marked this conversation as resolved.
Show resolved Hide resolved
const processor = new BatchSpanProcessor(exporter);
processor.forceFlush(() => {
done();
});
});

it('should call an async callback when shutdown is complete', done => {
const processor = new BatchSpanProcessor(exporter);
processor.shutdown(() => {
done();
});
});
});

describe('spans waiting to flush', () => {
let processor: BatchSpanProcessor;

beforeEach(() => {
processor = new BatchSpanProcessor(exporter);
const span = createSampledSpan('test');
processor.onStart(span);
processor.onEnd(span);

assert.strictEqual(processor['_finishedSpans'].length, 1);
});

it('should call an async callback when flushing is complete', done => {
processor.forceFlush(() => {
assert.strictEqual(exporter.getFinishedSpans().length, 1);
done();
});
});

it('should call an async callback when shutdown is complete', done => {
let exportedSpans = 0;
sinon.stub(exporter, 'export').callsFake((spans, callback) => {
console.log('uh, export?');
setTimeout(() => {
exportedSpans = exportedSpans + spans.length;
callback(ExportResult.SUCCESS);
}, 0);
});

processor.shutdown(() => {
assert.strictEqual(exportedSpans, 1);
done();
});
});
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,21 @@ describe('SimpleSpanProcessor', () => {
processor.shutdown();
assert.strictEqual(exporter.getFinishedSpans().length, 0);
});

describe('force flush', () => {
it('should call an async callback when flushing is complete', done => {
const processor = new SimpleSpanProcessor(exporter);
processor.forceFlush(() => {
done();
});
});

it('should call an async callback when shutdown is complete', done => {
const processor = new SimpleSpanProcessor(exporter);
processor.shutdown(() => {
done();
});
});
});
});
});