Skip to content

Commit

Permalink
feat: sre 456 ut move high cardinality histogram metrics to summaries…
Browse files Browse the repository at this point in the history
… cp (#3409)

* chore(user-transformer): move some high cardinality metrics to summaries

* chore(user-transformer): move some high cardinality metrics to summaries

* chore: add flag to disable summary metric collection

* chore: minor fix

* chore: minor fix
  • Loading branch information
dhawal1248 committed May 23, 2024
1 parent 6fa89e3 commit be20dc2
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 7 deletions.
3 changes: 3 additions & 0 deletions src/legacy/router.js
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,9 @@ if (startDestTransformer) {
stats.timing('user_transform_request_latency', startTime, {
processSessions,
});
stats.timingSummary('user_transform_request_latency_summary', startTime, {
processSessions,
});
stats.increment('user_transform_requests', { processSessions });
stats.histogram('user_transform_output_events', transformedEvents.length, {
processSessions,
Expand Down
12 changes: 11 additions & 1 deletion src/services/userTransform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,17 @@ export class UserTransformService {
...getTransformationMetadata(eventsToProcess[0]?.metadata),
});

stats.histogram('user_transform_batch_size', requestSize, {
stats.timing('user_transform_batch_size', requestSize, {
...metaTags,
...getTransformationMetadata(eventsToProcess[0]?.metadata),
});

stats.timingSummary('user_transform_request_latency_summary', userFuncStartTime, {
...metaTags,
...getTransformationMetadata(eventsToProcess[0]?.metadata),
});

stats.timingSummary('user_transform_batch_size_summary', requestSize, {
...metaTags,
...getTransformationMetadata(eventsToProcess[0]?.metadata),
});
Expand Down
1 change: 1 addition & 0 deletions src/util/customTransformer-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ async function userTransformHandlerV1(
};
stats.counter('user_transform_function_input_events', events.length, tags);
stats.timing('user_transform_function_latency', invokeTime, tags);
stats.timingSummary('user_transform_function_latency_summary', invokeTime, tags);
}

return { transformedEvents, logs };
Expand Down
1 change: 1 addition & 0 deletions src/util/customTransformer.js
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ async function runUserTransform(

stats.counter('user_transform_function_input_events', events.length, tags);
stats.timing('user_transform_function_latency', invokeTime, tags);
stats.timingSummary('user_transform_function_latency_summary', invokeTime, tags);
}

return {
Expand Down
3 changes: 3 additions & 0 deletions src/util/customTransforrmationsStore-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ async function getTransformationCodeV1(versionId) {
responseStatusHandler(response.status, 'Transformation', versionId, url);
stats.increment('get_transformation_code', { success: 'true', ...tags });
stats.timing('get_transformation_code_time', startTime, tags);
stats.timingSummary('get_transformation_code_time_summary', startTime, tags);
const myJson = await response.json();
transformationCache[versionId] = myJson;
return myJson;
Expand All @@ -56,6 +57,7 @@ async function getLibraryCodeV1(versionId) {
responseStatusHandler(response.status, 'Transformation Library', versionId, url);
stats.increment('get_libraries_code', { success: 'true', ...tags });
stats.timing('get_libraries_code_time', startTime, tags);
stats.timingSummary('get_libraries_code_time_summary', startTime, tags);
const myJson = await response.json();
libraryCache[versionId] = myJson;
return myJson;
Expand Down Expand Up @@ -83,6 +85,7 @@ async function getRudderLibByImportName(importName) {
responseStatusHandler(response.status, 'Rudder Library', importName, url);
stats.increment('get_libraries_code', { success: 'true', ...tags });
stats.timing('get_libraries_code_time', startTime, tags);
stats.timingSummary('get_libraries_code_time_summary', startTime, tags);
const myJson = await response.json();
rudderLibraryCache[importName] = myJson;
return myJson;
Expand Down
1 change: 1 addition & 0 deletions src/util/customTransforrmationsStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ async function getTransformationCode(versionId) {
responseStatusHandler(response.status, 'Transformation', versionId, url);
stats.increment('get_transformation_code', { versionId, success: 'true' });
stats.timing('get_transformation_code_time', startTime, { versionId });
stats.timingSummary('get_transformation_code_time_summary', startTime, { versionId });
const myJson = await response.json();
myCache.set(versionId, myJson);
return myJson;
Expand Down
1 change: 1 addition & 0 deletions src/util/openfaas/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ const executeFaasFunction = async (

stats.counter('user_transform_function_input_events', events.length, tags);
stats.timing('user_transform_function_latency', startTime, tags);
stats.timingSummary('user_transform_function_latency_summary', startTime, tags);
}
};

Expand Down
97 changes: 93 additions & 4 deletions src/util/prometheus.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ function appendPrefix(name) {
}

class Prometheus {
constructor() {
constructor(enableSummaryMetrics = true) {
this.prometheusRegistry = new prometheusClient.Registry();
this.prometheusRegistry.setDefaultLabels(defaultLabels);
prometheusClient.collectDefaultMetrics({
Expand All @@ -21,7 +21,7 @@ class Prometheus {
prometheusClient.AggregatorRegistry.setRegistries(this.prometheusRegistry);
this.aggregatorRegistry = new prometheusClient.AggregatorRegistry();

this.createMetrics();
this.createMetrics(enableSummaryMetrics);
}

async metricsController(ctx) {
Expand Down Expand Up @@ -56,11 +56,22 @@ class Prometheus {
return gauge;
}

newSummaryStat(name, help, labelNames) {
newSummaryStat(
name,
help,
labelNames,
percentiles = [0.5, 0.9, 0.99],
maxAgeSeconds = 300,
ageBuckets = 5,
) {
// we enable a 5 minute sliding window and calculate the 50th, 90th, and 99th percentiles by default
const summary = new prometheusClient.Summary({
name,
help,
labelNames,
percentiles,
maxAgeSeconds,
ageBuckets,
});
this.prometheusRegistry.registerMetric(summary);
return summary;
Expand Down Expand Up @@ -117,6 +128,21 @@ class Prometheus {
}
}

timingSummary(name, start, tags = {}) {
try {
let metric = this.prometheusRegistry.getSingleMetric(appendPrefix(name));
if (!metric) {
logger.warn(
`Prometheus: summary metric ${name} not found in the registry. Creating a new one`,
);
metric = this.newSummaryStat(name, name, Object.keys(tags));
}
metric.observe(tags, (new Date() - start) / 1000);
} catch (e) {
logger.error(`Prometheus: Summary metric ${name} failed with error ${e}`);
}
}

histogram(name, value, tags = {}) {
try {
let metric = this.prometheusRegistry.getSingleMetric(appendPrefix(name));
Expand Down Expand Up @@ -166,7 +192,7 @@ class Prometheus {
}
}

createMetrics() {
createMetrics(enableSummaryMetrics) {
const metrics = [
// Counters
{
Expand Down Expand Up @@ -698,6 +724,18 @@ class Prometheus {
'k8_namespace',
],
},
{
name: 'user_transform_request_latency_summary',
help: 'user_transform_request_latency_summary',
type: 'summary',
labelNames: [
'workspaceId',
'transformationId',
'sourceType',
'destinationType',
'k8_namespace',
],
},
{
name: 'user_transform_batch_size',
help: 'user_transform_batch_size',
Expand All @@ -714,6 +752,18 @@ class Prometheus {
524288000,
], // 1KB, 100KB, 0.5MB, 1MB, 10MB, 20MB, 50MB, 100MB, 200MB, 500MB
},
{
name: 'user_transform_batch_size_summary',
help: 'user_transform_batch_size_summary',
type: 'summary',
labelNames: [
'workspaceId',
'transformationId',
'sourceType',
'destinationType',
'k8_namespace',
],
},
{
name: 'source_transform_request_latency',
help: 'source_transform_request_latency',
Expand Down Expand Up @@ -770,12 +820,24 @@ class Prometheus {
type: 'histogram',
labelNames: ['versionId', 'version'],
},
{
name: 'get_transformation_code_time_summary',
help: 'get_transformation_code_time_summary',
type: 'summary',
labelNames: ['versionId', 'version'],
},
{
name: 'get_libraries_code_time',
help: 'get_libraries_code_time',
type: 'histogram',
labelNames: ['libraryVersionId', 'versionId', 'type', 'version'],
},
{
name: 'get_libraries_code_time_summary',
help: 'get_libraries_code_time_summary',
type: 'summary',
labelNames: ['libraryVersionId', 'versionId', 'type', 'version'],
},
{
name: 'isolate_cpu_time',
help: 'isolate_cpu_time',
Expand Down Expand Up @@ -1027,6 +1089,22 @@ class Prometheus {
'workspaceId',
],
},
{
name: 'user_transform_function_latency_summary',
help: 'user_transform_function_latency_summary',
type: 'summary',
labelNames: [
'identifier',
'testMode',
'sourceType',
'destinationType',
'k8_namespace',
'errored',
'statusCode',
'transformationId',
'workspaceId',
],
},
];

metrics.forEach((metric) => {
Expand All @@ -1042,6 +1120,17 @@ class Prometheus {
metric.labelNames,
metric.buckets,
);
} else if (metric.type === 'summary') {
if (enableSummaryMetrics) {
this.newSummaryStat(
appendPrefix(metric.name),
metric.help,
metric.labelNames,
metric.percentiles,
metric.maxAge,
metric.ageBuckets,
);
}
} else {
logger.error(
`Prometheus: Metric creation failed. Name: ${metric.name}. Invalid type: ${metric.type}`,
Expand Down
24 changes: 22 additions & 2 deletions src/util/stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const logger = require('../logger');

// Stats collection is on unless ENABLE_STATS is set to the string 'false'.
const enableStats = process.env.ENABLE_STATS !== 'false';
// Name of the stats client to use; falls back to 'statsd' when STATS_CLIENT is unset or empty.
const statsClientType = process.env.STATS_CLIENT || 'statsd';
// Summary metrics are enabled by default. To disable them, set ENABLE_SUMMARY_METRICS='false'.
const enableSummaryMetrics = process.env.ENABLE_SUMMARY_METRICS !== 'false';

let statsClient;
function init() {
Expand All @@ -19,7 +21,7 @@ function init() {

case 'prometheus':
logger.info('setting up prometheus client');
statsClient = new prometheus.Prometheus();
statsClient = new prometheus.Prometheus(enableSummaryMetrics);
break;

default:
Expand All @@ -38,6 +40,15 @@ const timing = (name, start, tags = {}) => {
statsClient.timing(name, start, tags);
};

/**
 * Forwards a timing observation for a summary metric to the active stats client.
 * Nothing is recorded unless stats are enabled, summary metrics are enabled
 * and a stats client has been set up.
 */
const timingSummary = (name, start, tags = {}) => {
  const canRecord = enableStats && statsClient && enableSummaryMetrics;
  if (canRecord) {
    statsClient.timingSummary(name, start, tags);
  }
};

const increment = (name, tags = {}) => {
if (!enableStats || !statsClient) {
return;
Expand Down Expand Up @@ -88,4 +99,13 @@ async function metricsController(ctx) {

init();

module.exports = { init, timing, increment, counter, gauge, histogram, metricsController };
// Functions exposed by this module to the rest of the code base.
module.exports = {
init,
timing,
timingSummary,
increment,
counter,
gauge,
histogram,
metricsController,
};
5 changes: 5 additions & 0 deletions src/util/statsd.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ class Statsd {
this.statsdClient.timing(name, start, tags);
}

// timingSummary is just a wrapper around timing for statsd.For prometheus, we will have to implement a different function.
timingSummary(name, start, tags = {}) {
this.statsdClient.timing(name, start, tags);
}

increment(name, tags = {}) {
this.statsdClient.increment(name, 1, tags);
}
Expand Down

0 comments on commit be20dc2

Please sign in to comment.