Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unifying shutdown across code base #1439

Merged
merged 13 commits into from
Sep 3, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export abstract class CollectorExporterBase<
public readonly hostname: string | undefined;
public readonly attributes?: Attributes;
protected _isShutdown: boolean = false;
private _shuttingDownPromise: Promise<void> = Promise.resolve();
protected _sendingPromises: Promise<unknown>[] = [];

/**
* @param config
Expand Down Expand Up @@ -98,16 +100,29 @@ export abstract class CollectorExporterBase<
/**
* Shutdown the exporter.
*/
shutdown(): void {
shutdown(): Promise<void> {
dyladan marked this conversation as resolved.
Show resolved Hide resolved
if (this._isShutdown) {
this.logger.debug('shutdown already started');
return;
return this._shuttingDownPromise;
}
this._isShutdown = true;
this.logger.debug('shutdown started');

// platform dependent
this.onShutdown();
this._shuttingDownPromise = new Promise((resolve, reject) => {
Promise.resolve()
.then(() => {
return this.onShutdown();
})
.then(() => {
return Promise.all(this._sendingPromises);
})
.then(() => {
resolve();
})
.catch(e => {
reject(e);
});
});
return this._shuttingDownPromise;
}

abstract onShutdown(): void;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,41 @@ export abstract class CollectorExporterBrowserBase<
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
) {
if (this._isShutdown) {
this.logger.debug('Shutdown already started. Cannot send objects');
return;
}
const serviceRequest = this.convert(items);
const body = JSON.stringify(serviceRequest);

if (this._useXHR) {
sendWithXhr(
body,
this.url,
this._headers,
this.logger,
onSuccess,
onError
);
} else {
sendWithBeacon(body, this.url, this.logger, onSuccess, onError);
}
const promise = new Promise(resolve => {
dyladan marked this conversation as resolved.
Show resolved Hide resolved
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
resolve();
};

if (this._useXHR) {
sendWithXhr(
body,
this.url,
this._headers,
this.logger,
_onSuccess,
_onError
);
} else {
sendWithBeacon(body, this.url, this.logger, _onSuccess, _onError);
}
});
this._sendingPromises.push(promise);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,32 @@ export abstract class CollectorExporterNodeBase<
this.logger.debug('Shutdown already started. Cannot send objects');
return;
}
if (this._protocol === CollectorProtocolNode.HTTP_JSON) {
sendWithJson(this, objects, onSuccess, onError);
} else if (this._protocol === CollectorProtocolNode.HTTP_PROTO) {
sendWithJsonProto(this, objects, onSuccess, onError);
} else {
sendWithGrpc(this, objects, onSuccess, onError);
}

const promise = new Promise(resolve => {
dyladan marked this conversation as resolved.
Show resolved Hide resolved
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
resolve();
};

if (this._protocol === CollectorProtocolNode.HTTP_JSON) {
sendWithJson(this, objects, _onSuccess, _onError);
} else if (this._protocol === CollectorProtocolNode.HTTP_PROTO) {
sendWithJsonProto(this, objects, _onSuccess, _onError);
} else {
sendWithGrpc(this, objects, _onSuccess, _onError);
}
});

this._sendingPromises.push(promise);
}

onShutdown(): void {
Expand All @@ -116,5 +135,6 @@ export abstract class CollectorExporterNodeBase<
}

abstract getServiceProtoPath(): string;

abstract getServiceClientType(): ServiceClientType;
}
45 changes: 37 additions & 8 deletions packages/opentelemetry-exporter-jaeger/src/jaeger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ export class JaegerExporter implements SpanExporter {
private readonly _process: jaegerTypes.ThriftProcess;
private readonly _sender: typeof jaegerTypes.UDPSender;
private readonly _onShutdownFlushTimeout: number;
private _isShutdown = false;
private _shutdownFlushTimeout: NodeJS.Timeout | undefined;
private _shuttingDownPromise: Promise<void> = Promise.resolve();

constructor(config: jaegerTypes.ExporterConfig) {
const localConfig = Object.assign({}, config);
Expand Down Expand Up @@ -85,14 +88,40 @@ export class JaegerExporter implements SpanExporter {
}

/** Shutdown exporter. */
shutdown(): void {
// Make an optimistic flush.
this._flush();
// Sleeping x seconds before closing the sender's connection to ensure
// all spans are flushed.
setTimeout(() => {
this._sender.close();
}, this._onShutdownFlushTimeout);
shutdown(): Promise<void> {
dyladan marked this conversation as resolved.
Show resolved Hide resolved
if (this._isShutdown) {
return this._shuttingDownPromise;
}
this._isShutdown = true;

this._shuttingDownPromise = new Promise((resolve, reject) => {
let rejected = false;
this._shutdownFlushTimeout = setTimeout(() => {
rejected = true;
reject('timeout');
this._sender.close();
}, this._onShutdownFlushTimeout);

Promise.resolve()
.then(() => {
// Make an optimistic flush.
return this._flush();
})
.then(() => {
if (rejected) {
return;
} else {
this._shutdownFlushTimeout &&
clearTimeout(this._shutdownFlushTimeout);
resolve();
this._sender.close();
}
})
.catch(e => {
reject(e);
});
});
return this._shuttingDownPromise;
}

/** Transform spans and sends to Jaeger service. */
Expand Down
54 changes: 29 additions & 25 deletions packages/opentelemetry-exporter-prometheus/src/prometheus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class PrometheusExporter implements MetricExporter {
).replace(/^([^/])/, '/$1');

if (config.startServer || PrometheusExporter.DEFAULT_OPTIONS.startServer) {
this.startServer(callback);
this.startServer().then(callback);
} else if (callback) {
callback();
}
Expand Down Expand Up @@ -108,12 +108,10 @@ export class PrometheusExporter implements MetricExporter {

/**
* Shuts down the export server and clears the registry
*
* @param cb called when server is stopped
*/
shutdown(cb?: () => void) {
shutdown(): Promise<void> {
this._registry.clear();
this.stopServer(cb);
return this.stopServer();
}

/**
Expand Down Expand Up @@ -252,39 +250,45 @@ export class PrometheusExporter implements MetricExporter {

/**
* Stops the Prometheus export server
* @param callback A callback that will be executed once the server is stopped
*/
stopServer(callback?: () => void) {
stopServer(): Promise<void> {
if (!this._server) {
this._logger.debug(
'Prometheus stopServer() was called but server was never started.'
);
if (callback) {
callback();
}
return Promise.resolve();
} else {
this._server.close(() => {
this._logger.debug('Prometheus exporter was stopped');
if (callback) {
callback();
}
return new Promise(resolve => {
this._server.close(err => {
if (!err) {
this._logger.debug('Prometheus exporter was stopped');
} else {
if (
((err as unknown) as { code: string }).code !==
'ERR_SERVER_NOT_RUNNING'
) {
this._logger.error(
`Error during stopping the Prometheus Exporter "${err.message}"`
);
}
}
resolve();
});
});
}
}

/**
* Starts the Prometheus export server
*
* @param callback called once the server is ready
*/
startServer(callback?: () => void) {
this._server.listen(this._port, () => {
this._logger.debug(
`Prometheus exporter started on port ${this._port} at endpoint ${this._endpoint}`
);
if (callback) {
callback();
}
startServer(): Promise<void> {
return new Promise(resolve => {
this._server.listen(this._port, () => {
this._logger.debug(
`Prometheus exporter started on port ${this._port} at endpoint ${this._endpoint}`
);
resolve();
});
});
}

Expand Down
Loading