Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for getting clustered metrics as object #471

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,20 @@ export class AggregatorRegistry extends Registry {
*/
clusterMetrics(): Promise<string>;

/**
* Gets aggregated metrics for all workers as objects
* @return {Promise<metric[]>} Promise that resolves with the aggregated
* metrics.
*/
getClusterMetricsAsJSON(): Promise<metric[]>;

/**
* Gets aggregated metrics for all workers as objects
* @return {Promise<metric[]>} Promise that resolves with the aggregated
* metrics.
*/
getClusterMetricsAsArray(): Promise<metric[]>;

/**
* Creates a new Registry instance from an array of metrics that were
* created by `registry.getMetricsAsJSON()`. Metrics are aggregated using
Expand Down Expand Up @@ -560,23 +574,23 @@ export class Pushgateway {
*/
pushAdd(
params: Pushgateway.Parameters,
): Promise<{ resp?: unknown, body?: unknown }>;
): Promise<{ resp?: unknown; body?: unknown }>;

/**
* Overwrite all metric (using PUT to Pushgateway)
* @param params Push parameters
*/
push(
params: Pushgateway.Parameters,
): Promise<{ resp?: unknown, body?: unknown }>;
): Promise<{ resp?: unknown; body?: unknown }>;

/**
* Delete all metrics for jobName
* @param params Push parameters
*/
delete(
params: Pushgateway.Parameters,
): Promise<{ resp?: unknown, body?: unknown }>;
): Promise<{ resp?: unknown; body?: unknown }>;
}

export namespace Pushgateway {
Expand Down
138 changes: 96 additions & 42 deletions lib/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,48 +40,29 @@ class AggregatorRegistry extends Registry {
* metrics.
*/
clusterMetrics() {
const requestId = requestCtr++;

return new Promise((resolve, reject) => {
let settled = false;
function done(err, result) {
if (settled) return;
settled = true;
if (err) reject(err);
else resolve(result);
}
return clusterMetricsRequest('string');
}

const request = {
responses: [],
pending: 0,
done,
errorTimeout: setTimeout(() => {
const err = new Error('Operation timed out.');
request.done(err);
}, 5000),
};
requests.set(requestId, request);

const message = {
type: GET_METRICS_REQ,
requestId,
};

for (const id in cluster().workers) {
// If the worker exits abruptly, it may still be in the workers
// list but not able to communicate.
if (cluster().workers[id].isConnected()) {
cluster().workers[id].send(message);
request.pending++;
}
}
/**
* Gets aggregated metrics for all workers in JSON format.
* The optional callback and returned Promise resolve with the same value;
* either may be used.
* @return {Promise<Object>} Promise that resolves with the aggregated
* metrics in JSON format.
*/
getClusterMetricsAsJSON() {
return clusterMetricsRequest('json');
}

if (request.pending === 0) {
// No workers were up
clearTimeout(request.errorTimeout);
process.nextTick(() => done(null, ''));
}
});
/**
* Gets aggregated metrics for all workers as an array.
* The optional callback and returned Promise resolve with the same value;
* either may be used.
* @return {Promise<Object>} Promise that resolves with the aggregated
* metrics in JSON format.
*/
getClusterMetricsAsArray() {
return clusterMetricsRequest('array');
}

/**
Expand Down Expand Up @@ -174,8 +155,21 @@ function addListeners() {
clearTimeout(request.errorTimeout);

const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics();
request.done(null, promString);
switch (request.format) {
case 'json':
request.done(null, registry.getMetricsAsJSON());
break;
case 'array':
request.done(null, registry.getMetricsAsArray());
break;
case 'string':
request.done(null, registry.metrics());
break;
default:
request.done(
new Error(`unknown metrics format: ${request.format}`),
);
}
}
}
});
Expand Down Expand Up @@ -205,4 +199,64 @@ function addListeners() {
}
}

function clusterMetricsRequest(format) {
const requestId = requestCtr++;

return new Promise((resolve, reject) => {
let settled = false;
function done(err, result) {
if (settled) return;
settled = true;
if (err) reject(err);
else resolve(result);
}

const request = {
responses: [],
pending: 0,
format,
done,
errorTimeout: setTimeout(() => {
const err = new Error('Operation timed out.');
request.done(err);
}, 5000),
};
requests.set(requestId, request);

const message = {
type: GET_METRICS_REQ,
requestId,
};

for (const id in cluster().workers) {
// If the worker exits abruptly, it may still be in the workers
// list but not able to communicate.
if (cluster().workers[id].isConnected()) {
cluster().workers[id].send(message);
request.pending++;
}
}

if (request.pending === 0) {
// No workers were up
clearTimeout(request.errorTimeout);
switch (request.format) {
case 'json':
process.nextTick(() => done(null, []));
break;
case 'array':
process.nextTick(() => done(null, []));
break;
case 'string':
process.nextTick(() => done(null, ''));
break;
default:
process.nextTick(() =>
done(new Error(`unknown metrics format: ${request.format}`)),
);
}
}
});
}

module.exports = AggregatorRegistry;
18 changes: 18 additions & 0 deletions test/clusterTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,24 @@ describe('AggregatorRegistry', () => {
});
});

describe('aggregatorRegistry.getClusterMetricsAsJSON()', () => {
it('works properly if there are no cluster workers', async () => {
const AggregatorRegistry = require('../lib/cluster');
const ar = new AggregatorRegistry();
const metrics = await ar.getClusterMetricsAsJSON();
expect(metrics).toEqual([]);
});
});

describe('aggregatorRegistry.getClusterMetricsAsArray()', () => {
it('works properly if there are no cluster workers', async () => {
const AggregatorRegistry = require('../lib/cluster');
const ar = new AggregatorRegistry();
const metrics = await ar.getClusterMetricsAsArray();
expect(metrics).toEqual([]);
});
});

describe('AggregatorRegistry.aggregate()', () => {
const Registry = require('../lib/cluster');
// These mimic the output of `getMetricsAsJSON`.
Expand Down