diff --git a/examples/basic-tracer-node/multi_exporter.js b/examples/basic-tracer-node/multi_exporter.js index e5920a7e6b..6c12a24c57 100644 --- a/examples/basic-tracer-node/multi_exporter.js +++ b/examples/basic-tracer-node/multi_exporter.js @@ -9,8 +9,6 @@ const tracer = new BasicTracer(); const zipkinExporter = new ZipkinExporter({serviceName: 'basic-service'}); const jaegerExporter = new JaegerExporter({ serviceName: 'basic-service', - // The default flush interval is 5 seconds. - flushInterval: 2000 }); const collectorExporter = new CollectorExporter({serviceName: 'basic-service'}); diff --git a/examples/dns/setup.js b/examples/dns/setup.js index 26da571236..e7e40d199e 100644 --- a/examples/dns/setup.js +++ b/examples/dns/setup.js @@ -27,8 +27,6 @@ function setupTracerAndExporters(service) { } else { exporter = new JaegerExporter({ serviceName: service, - // The default flush interval is 5 seconds. - flushInterval: 2000 }); } diff --git a/examples/grpc/setup.js b/examples/grpc/setup.js index 4b01ef130a..162884ffcb 100644 --- a/examples/grpc/setup.js +++ b/examples/grpc/setup.js @@ -26,8 +26,6 @@ function setupTracerAndExporters(service) { } else { exporter = new JaegerExporter({ serviceName: service, - // The default flush interval is 5 seconds. - flushInterval: 2000 }); } diff --git a/examples/grpc_dynamic_codegen/setup.js b/examples/grpc_dynamic_codegen/setup.js index 5ac2a903e9..f738c610fa 100644 --- a/examples/grpc_dynamic_codegen/setup.js +++ b/examples/grpc_dynamic_codegen/setup.js @@ -26,8 +26,6 @@ function setupTracerAndExporters(service) { } else { exporter = new JaegerExporter({ serviceName: service, - // The default flush interval is 5 seconds. - flushInterval: 2000 }); } diff --git a/examples/http/setup.js b/examples/http/setup.js index 3e729201cd..abe667f064 100644 --- a/examples/http/setup.js +++ b/examples/http/setup.js @@ -18,8 +18,6 @@ function setupTracerAndExporters(service) { } else { exporter = new JaegerExporter({ serviceName: service, - // The default flush interval is 5 seconds. - flushInterval: 2000 }); } diff --git a/examples/https/setup.js b/examples/https/setup.js index d09961eec9..d4433c20bd 100644 --- a/examples/https/setup.js +++ b/examples/https/setup.js @@ -18,8 +18,6 @@ function setupTracerAndExporters(service) { } else { exporter = new JaegerExporter({ serviceName: service, - // The default flush interval is 5 seconds. - flushInterval: 2000 }); } diff --git a/examples/mysql/setup.js b/examples/mysql/setup.js index d1e0185335..a5f5303c57 100644 --- a/examples/mysql/setup.js +++ b/examples/mysql/setup.js @@ -25,8 +25,6 @@ function setupTracerAndExporters(service) { }))); tracer.addSpanProcessor(new SimpleSpanProcessor(new JaegerExporter({ serviceName: service, - // The default flush interval is 5 seconds. - flushInterval: 2000 }))); // Initialize the OpenTelemetry APIs to use the BasicTracer bindings diff --git a/examples/redis/setup.js b/examples/redis/setup.js index 3e729201cd..abe667f064 100644 --- a/examples/redis/setup.js +++ b/examples/redis/setup.js @@ -18,8 +18,6 @@ function setupTracerAndExporters(service) { } else { exporter = new JaegerExporter({ serviceName: service, - // The default flush interval is 5 seconds. - flushInterval: 2000 }); } diff --git a/packages/opentelemetry-exporter-jaeger/src/jaeger.ts b/packages/opentelemetry-exporter-jaeger/src/jaeger.ts index 8f912bd648..1b1f50e6b3 100644 --- a/packages/opentelemetry-exporter-jaeger/src/jaeger.ts +++ b/packages/opentelemetry-exporter-jaeger/src/jaeger.ts @@ -20,7 +20,6 @@ import * as jaegerTypes from './types'; import { NoopLogger } from '@opentelemetry/core'; import * as types from '@opentelemetry/types'; import { spanToThrift } from './transform'; -import { unrefTimer } from '@opentelemetry/core'; /** * Format and sends span information to Jaeger Exporter. @@ -29,17 +28,16 @@ export class JaegerExporter implements SpanExporter { private readonly _logger: types.Logger; private readonly _process: jaegerTypes.ThriftProcess; private readonly _sender: typeof jaegerTypes.UDPSender; - private readonly _forceFlush: boolean = true; - private readonly _flushTimeout: number; - private _timer: NodeJS.Timeout; + private readonly _forceFlushOnShutdown: boolean = true; + private readonly _onShutdownFlushTimeout: number; constructor(config: jaegerTypes.ExporterConfig) { this._logger = config.logger || new NoopLogger(); const tags: jaegerTypes.Tag[] = config.tags || []; - if (config.forceFlush !== undefined) { - this._forceFlush = config.forceFlush; - } - this._flushTimeout = config.flushTimeout || 2000; + this._forceFlushOnShutdown = + typeof config.forceFlush === 'boolean' ? config.forceFlush : true; + this._onShutdownFlushTimeout = + typeof config.flushTimeout === 'number' ? config.flushTimeout : 2000; this._sender = new jaegerTypes.UDPSender(config); this._process = { @@ -47,10 +45,6 @@ export class JaegerExporter implements SpanExporter { tags: jaegerTypes.ThriftUtils.getThriftTags(tags), }; this._sender.setProcess(this._process); - - const flushInterval = config.flushInterval || 5000; - this._timer = setInterval(this._flush.bind(this), flushInterval); - unrefTimer(this._timer); } /** Exports a list of spans to Jaeger. */ @@ -58,48 +52,70 @@ export class JaegerExporter implements SpanExporter { spans: ReadableSpan[], resultCallback: (result: ExportResult) => void ): void { + if (spans.length === 0) { + return resultCallback(ExportResult.SUCCESS); + } this._logger.debug('Jaeger exporter export'); - return this._sendSpans(spans, resultCallback); + this._sendSpans(spans, resultCallback).catch(err => { + this._logger.error(`JaegerExporter failed to export: ${err}`); + }); } /** Shutdown exporter. */ shutdown(): void { - if (!this._forceFlush) return; + if (!this._forceFlushOnShutdown) return; // Make an optimistic flush. this._flush(); // Sleeping x seconds before closing the sender's connection to ensure // all spans are flushed. setTimeout(() => { this._sender.close(); - }, this._flushTimeout); + }, this._onShutdownFlushTimeout); } /** Transform spans and sends to Jaeger service. */ - private _sendSpans( + private async _sendSpans( spans: ReadableSpan[], done?: (result: ExportResult) => void ) { const thriftSpan = spans.map(span => spanToThrift(span)); for (const span of thriftSpan) { - this._sender.append(span, (numSpans: number, err?: string) => { - if (err) { - // @todo: decide whether to break out the loop on first error. - this._logger.error(`failed to append span: ${err}`); - if (done) return done(ExportResult.FAILED_NOT_RETRYABLE); - } - }); + try { + await this._append(span); + } catch (err) { + this._logger.error(`failed to append span: ${err}`); + // TODO right now we break out on first error, is that desirable? + if (done) return done(ExportResult.FAILED_NOT_RETRYABLE); + } } - // @todo: We should wait for all the callbacks of the append calls to - // complete before it calls done with success. this._logger.debug('successful append for : %s', thriftSpan.length); + + // Flush all spans on each export. No-op if span buffer is empty + await this._flush(); + if (done) return done(ExportResult.SUCCESS); } - private _flush(): void { - this._sender.flush((numSpans: number, err?: string) => { - if (err) { - this._logger.error(`failed to flush ${numSpans} spans: ${err}`); - } + private async _append(span: jaegerTypes.ThriftSpan): Promise { + return new Promise((resolve, reject) => { + this._sender.append(span, (count: number, err?: string) => { + if (err) { + return reject(new Error(err)); + } + resolve(count); + }); + }); + } + + private async _flush(): Promise { + await new Promise((resolve, reject) => { + this._sender.flush((_count: number, err?: string) => { + if (err) { + return reject(new Error(err)); + } + this._logger.debug('successful flush for %s spans', _count); + resolve(); + }); }); } } diff --git a/packages/opentelemetry-exporter-jaeger/src/types.ts b/packages/opentelemetry-exporter-jaeger/src/types.ts index fb2f58968b..ab17951c6c 100644 --- a/packages/opentelemetry-exporter-jaeger/src/types.ts +++ b/packages/opentelemetry-exporter-jaeger/src/types.ts @@ -26,9 +26,10 @@ export interface ExporterConfig { host?: string; // default: 'localhost' port?: number; // default: 6832 maxPacketSize?: number; // default: 65000 + /** Force a flush on shutdown */ forceFlush?: boolean; // default: true + /** Time to wait for an onShutdown flush to finish before closing the sender */ flushTimeout?: number; // default: 2000 - flushInterval?: number; // default(ms): 5000 } // Below require is needed as jaeger-client types does not expose the thrift, diff --git a/packages/opentelemetry-exporter-jaeger/test/jaeger.test.ts b/packages/opentelemetry-exporter-jaeger/test/jaeger.test.ts index ad0bcc2685..3cc994294b 100644 --- a/packages/opentelemetry-exporter-jaeger/test/jaeger.test.ts +++ b/packages/opentelemetry-exporter-jaeger/test/jaeger.test.ts @@ -61,8 +61,8 @@ describe('JaegerExporter', () => { assert.ok(typeof exporter.export === 'function'); assert.ok(typeof exporter.shutdown === 'function'); - assert.ok(exporter['_forceFlush']); - assert.strictEqual(exporter['_flushTimeout'], 5000); + assert.ok(exporter['_forceFlushOnShutdown']); + assert.strictEqual(exporter['_onShutdownFlushTimeout'], 5000); }); it('should construct an exporter without forceFlush and flushTimeout', () => { @@ -72,8 +72,8 @@ describe('JaegerExporter', () => { assert.ok(typeof exporter.export === 'function'); assert.ok(typeof exporter.shutdown === 'function'); - assert.ok(exporter['_forceFlush']); - assert.strictEqual(exporter['_flushTimeout'], 2000); + assert.ok(exporter['_forceFlushOnShutdown']); + assert.strictEqual(exporter['_onShutdownFlushTimeout'], 2000); }); it('should construct an exporter with forceFlush = false', () => { @@ -84,7 +84,7 @@ describe('JaegerExporter', () => { assert.ok(typeof exporter.export === 'function'); assert.ok(typeof exporter.shutdown === 'function'); - assert.ok(!exporter['_forceFlush']); + assert.ok(!exporter['_forceFlushOnShutdown']); }); }); diff --git a/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts b/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts index 457319c7cf..c4ff93fc2a 100644 --- a/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts @@ -45,7 +45,7 @@ export class BatchSpanProcessor implements SpanProcessor { : DEFAULT_BUFFER_TIMEOUT_MS; this._timer = setInterval(() => { - if (Date.now() - this._lastSpanFlush >= this._bufferTimeout) { + if (this._shouldFlush()) { this._flush(); } }, this._bufferTimeout); @@ -73,6 +73,13 @@ export class BatchSpanProcessor implements SpanProcessor { } } + private _shouldFlush(): boolean { + return ( + this._finishedSpans.length >= 0 && + Date.now() - this._lastSpanFlush >= this._bufferTimeout + ); + } + /** Send the span data list to exporter */ private _flush() { this._exporter.export(this._finishedSpans, () => {});