diff --git a/lib/metrics.js b/lib/metrics.js
new file mode 100644
index 0000000..1d1fdef
--- /dev/null
+++ b/lib/metrics.js
@@ -0,0 +1,28 @@
+export function initMetrics (prometheus) {
+  if (!prometheus?.registry || !prometheus?.client) return null
+  const { client, registry } = prometheus
+
+  return {
+    messagesInFlight: new client.Gauge({
+      name: 'kafka_hooks_messages_in_flight',
+      help: 'Number of messages currently being processed',
+      labelNames: ['topic'],
+      registers: [registry]
+    }),
+
+    httpRequestDuration: new client.Histogram({
+      name: 'kafka_hooks_http_request_duration_seconds',
+      help: 'HTTP request duration for webhook deliveries',
+      labelNames: ['topic', 'method', 'status_code'],
+      buckets: [0.1, 0.5, 1, 2, 5, 10],
+      registers: [registry]
+    }),
+
+    dlqMessages: new client.Counter({
+      name: 'kafka_hooks_dlq_messages_total',
+      help: 'Total number of messages sent to the DLQ (Dead Letter Queue)',
+      labelNames: ['topic', 'reason'],
+      registers: [registry]
+    })
+  }
+}
diff --git a/lib/plugin.js b/lib/plugin.js
index 3fff732..e624bf0 100644
--- a/lib/plugin.js
+++ b/lib/plugin.js
@@ -17,11 +17,17 @@ import {
   pathParamsHeader,
   queryStringHeader
 } from './definitions.js'
+import { initMetrics } from './metrics.js'
 
-export async function processMessage (logger, dlqProducer, mappings, message) {
+export async function processMessage (logger, dlqProducer, mappings, message, metrics) {
   const topic = message.topic
   const value = message.value
   const { url, dlq, method, retryDelay, headers: protoHeaders, retries, includeAttemptInRequests } = mappings[topic]
+
+  if (metrics) {
+    metrics.messagesInFlight.inc({ topic })
+  }
+
   const headers = {
     ...protoHeaders,
     ...Object.fromEntries(message.headers),
@@ -30,6 +36,8 @@ export async function processMessage (logger, dlqProducer, mappings, message) {
 
   // Perform the delivery
   const errors = []
+  let lastStatusCode = null
+
   for (let attempt = 0; attempt < retries; attempt++) {
     try {
       if (includeAttemptInRequests) {
@@ -38,7 +46,7 @@ export async function processMessage (logger, dlqProducer, mappings, message) {
 
       headers['content-length'] = Buffer.byteLength(value)
 
-      const requestedAt = new Date()
+      const requestedAt = Date.now()
       const {
         statusCode,
         headers: responseHeaders,
@@ -49,14 +57,29 @@ export async function processMessage (logger, dlqProducer, mappings, message) {
         body: value
       })
 
+      lastStatusCode = statusCode
+
+      if (metrics) {
+        const duration = (Date.now() - requestedAt) / 1000
+        metrics.httpRequestDuration.observe(
+          { topic, method, status_code: statusCode },
+          duration
+        )
+      }
+
       // Success, nothing else to do
       if (statusCode < BAD_REQUEST) {
         await body.dump()
+
+        if (metrics) {
+          metrics.messagesInFlight.dec({ topic })
+        }
+
         return
       }
 
       const error = new HttpError(statusCode, 'Webhook replied with an error', {
-        requestedAt: requestedAt.toISOString(),
+        requestedAt: new Date(requestedAt).toISOString(),
         attempt,
         headers: responseHeaders,
         /* c8 ignore next */
@@ -90,6 +113,17 @@ export async function processMessage (logger, dlqProducer, mappings, message) {
     }
   }
 
+  if (metrics) {
+    metrics.messagesInFlight.dec({ topic })
+
+    if (dlq) {
+      metrics.dlqMessages.inc({
+        topic,
+        reason: lastStatusCode ? `http_${lastStatusCode}` : 'network_error'
+      })
+    }
+  }
+
   if (!dlq) {
     logger.error({ dlqMessage }, 'Error while processing message')
     return
@@ -215,6 +249,7 @@ export async function setupKafka (server, configuration) {
   server.log.info(`Kafka consumer started with concurrency ${configuration.kafka.concurrency} ...`)
 
   const responseProcessor = createResponseProcessor(server.log, pendingRequests)
+  const metrics = initMetrics(globalThis.platformatic?.prometheus)
 
   forEach(
     stream,
@@ -222,7 +257,7 @@ export async function setupKafka (server, configuration) {
       if (responseMappings[message.topic]) {
         return responseProcessor(message)
       } else {
-        return processMessage(server.log, dlqProducer, topicsMappings, message)
+        return processMessage(server.log, dlqProducer, topicsMappings, message, metrics)
       }
     },
     /* c8 ignore next 8 */
diff --git a/package.json b/package.json
index 0872666..b28c972 100644
--- a/package.json
+++ b/package.json
@@ -56,6 +56,7 @@
     "neostandard": "^0.12.1",
     "pino": "^9.6.0",
     "pino-pretty": "^13.0.0",
+    "prom-client": "^15.1.3",
     "prettier": "^3.5.3",
     "wait-on": "^8.0.3"
   },
diff --git a/test/fixtures/kafka-monitor.js b/test/fixtures/kafka-monitor.js
index 1c28529..5bbd9d8 100644
--- a/test/fixtures/kafka-monitor.js
+++ b/test/fixtures/kafka-monitor.js
@@ -18,7 +18,9 @@ export async function createMonitor (valueDeserializer = safeJsonDeserializer) {
     maxWaitTime: 500,
     deserializers: {
       value: valueDeserializer
-    }
+    },
+    /* c8 ignore next */
+    metrics: globalThis.platformatic?.prometheus
   })
 
   const topics = ['plt-kafka-hooks-success', 'plt-kafka-hooks-fail', 'plt-kafka-hooks-retry', defaultDlqTopic]
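Editorial note, not part of the patch: the sketch below is a minimal way to exercise the new initMetrics() directly with prom-client, mirroring the { client, registry } shape that setupKafka reads from globalThis.platformatic?.prometheus. The metric names come from lib/metrics.js above; the topic name, label values, and file location are illustrative assumptions.

// metrics-demo.mjs (hypothetical path next to lib/) - illustrative only
import promClient from 'prom-client'
import { initMetrics } from '../lib/metrics.js'

const registry = new promClient.Registry()
const metrics = initMetrics({ client: promClient, registry })

// Simulate one delivery that succeeds and one failure that would be routed to the DLQ
metrics.messagesInFlight.inc({ topic: 'orders' })
metrics.httpRequestDuration.observe({ topic: 'orders', method: 'POST', status_code: 200 }, 0.42)
metrics.messagesInFlight.dec({ topic: 'orders' })
metrics.dlqMessages.inc({ topic: 'orders', reason: 'http_503' })

// Print the three kafka_hooks_* metrics in Prometheus exposition format
console.log(await registry.metrics())
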
diff --git a/test/metrics.test.js b/test/metrics.test.js
new file mode 100644
index 0000000..20b9691
--- /dev/null
+++ b/test/metrics.test.js
@@ -0,0 +1,151 @@
+import { test } from 'node:test'
+import { strictEqual, deepStrictEqual, ok } from 'node:assert'
+import { initMetrics } from '../lib/metrics.js'
+import promClient from 'prom-client'
+
+test('initMetrics should return null when prometheus is not provided', async () => {
+  const result = initMetrics()
+  strictEqual(result, null)
+})
+
+test('initMetrics should return null when prometheus is null', async () => {
+  const result = initMetrics(null)
+  strictEqual(result, null)
+})
+
+test('initMetrics should return null when prometheus.registry is missing', async () => {
+  const prometheus = { client: promClient }
+  const result = initMetrics(prometheus)
+  strictEqual(result, null)
+})
+
+test('initMetrics should return null when prometheus.client is missing', async () => {
+  const prometheus = { registry: new promClient.Registry() }
+  const result = initMetrics(prometheus)
+  strictEqual(result, null)
+})
+
+test('initMetrics should return metrics object when prometheus is properly configured', async () => {
+  const registry = new promClient.Registry()
+  const prometheus = { registry, client: promClient }
+  const metrics = initMetrics(prometheus)
+
+  ok(metrics, 'Metrics object should be created')
+  ok(metrics.messagesInFlight, 'messagesInFlight metric should exist')
+  ok(metrics.httpRequestDuration, 'httpRequestDuration metric should exist')
+  ok(metrics.dlqMessages, 'dlqMessages metric should exist')
+})
+
+test('messagesInFlight should be configured as Gauge with correct properties', async () => {
+  const registry = new promClient.Registry()
+  const prometheus = { registry, client: promClient }
+  const metrics = initMetrics(prometheus)
+
+  const gauge = metrics.messagesInFlight
+  strictEqual(gauge.name, 'kafka_hooks_messages_in_flight')
+  strictEqual(gauge.help, 'Number of messages currently being processed')
+  deepStrictEqual(gauge.labelNames, ['topic'])
+})
+
+test('httpRequestDuration should be configured as Histogram with correct properties', async () => {
+  const registry = new promClient.Registry()
+  const prometheus = { registry, client: promClient }
+  const metrics = initMetrics(prometheus)
+
+  const histogram = metrics.httpRequestDuration
+  strictEqual(histogram.name, 'kafka_hooks_http_request_duration_seconds')
+  strictEqual(histogram.help, 'HTTP request duration for webhook deliveries')
+  deepStrictEqual(histogram.labelNames, ['topic', 'method', 'status_code'])
+  deepStrictEqual(histogram.buckets, [0.1, 0.5, 1, 2, 5, 10])
+})
+
+test('dlqMessages should be configured as Counter with correct properties', async () => {
+  const registry = new promClient.Registry()
+  const prometheus = { registry, client: promClient }
+  const metrics = initMetrics(prometheus)
+
+  const counter = metrics.dlqMessages
+  strictEqual(counter.name, 'kafka_hooks_dlq_messages_total')
+  strictEqual(counter.help, 'Total number of messages sent to the DLQ (Dead Letter Queue)')
+  deepStrictEqual(counter.labelNames, ['topic', 'reason'])
+})
+
+test('metrics should be functional - Counter operations', async () => {
+  const registry = new promClient.Registry()
+  const prometheus = { registry, client: promClient }
+  const metrics = initMetrics(prometheus)
+
+  metrics.dlqMessages.inc({ topic: 'test-topic', reason: 'http_500' })
+  metrics.dlqMessages.inc({ topic: 'test-topic', reason: 'network_error' }, 3)
+
+  const updatedMetrics = await registry.getMetricsAsJSON()
+  const dlqMetric = updatedMetrics.find(m => m.name === 'kafka_hooks_dlq_messages_total')
+
+  const http500Value = dlqMetric.values.find(v =>
+    v.labels.topic === 'test-topic' && v.labels.reason === 'http_500'
+  ).value
+  const networkErrorValue = dlqMetric.values.find(v =>
+    v.labels.topic === 'test-topic' && v.labels.reason === 'network_error'
+  ).value
+
+  strictEqual(http500Value, 1)
+  strictEqual(networkErrorValue, 3)
+})
+
+test('metrics should be functional - Gauge operations', async () => {
+  const registry = new promClient.Registry()
+  const prometheus = { registry, client: promClient }
+  const metrics = initMetrics(prometheus)
+
+  metrics.messagesInFlight.inc({ topic: 'test-topic' })
+  metrics.messagesInFlight.inc({ topic: 'test-topic' }, 2)
+
+  let registryMetrics = await registry.getMetricsAsJSON()
+  let gaugeMetric = registryMetrics.find(m => m.name === 'kafka_hooks_messages_in_flight')
+  let value = gaugeMetric.values.find(v => v.labels.topic === 'test-topic').value
+  strictEqual(value, 3)
+
+  metrics.messagesInFlight.dec({ topic: 'test-topic' })
+
+  registryMetrics = await registry.getMetricsAsJSON()
+  gaugeMetric = registryMetrics.find(m => m.name === 'kafka_hooks_messages_in_flight')
+  value = gaugeMetric.values.find(v => v.labels.topic === 'test-topic').value
+  strictEqual(value, 2)
+
+  metrics.messagesInFlight.set({ topic: 'test-topic' }, 5)
+
+  registryMetrics = await registry.getMetricsAsJSON()
+  gaugeMetric = registryMetrics.find(m => m.name === 'kafka_hooks_messages_in_flight')
+  value = gaugeMetric.values.find(v => v.labels.topic === 'test-topic').value
+  strictEqual(value, 5)
+})
+
+test('metrics should be functional - Histogram operations', async () => {
+  const registry = new promClient.Registry()
+  const prometheus = { registry, client: promClient }
+  const metrics = initMetrics(prometheus)
+
+  metrics.httpRequestDuration.observe({ topic: 'test-topic', method: 'POST', status_code: '200' }, 0.5)
+  metrics.httpRequestDuration.observe({ topic: 'test-topic', method: 'POST', status_code: '200' }, 1.2)
+  metrics.httpRequestDuration.observe({ topic: 'test-topic', method: 'POST', status_code: '500' }, 2.1)
+
+  const registryMetrics = await registry.getMetricsAsJSON()
+  const histogramMetric = registryMetrics.find(m => m.name === 'kafka_hooks_http_request_duration_seconds')
+
+  const countValue = histogramMetric.values.find(v =>
+    v.labels.topic === 'test-topic' &&
+    v.labels.method === 'POST' &&
+    v.labels.status_code === '200' &&
+    v.metricName === 'kafka_hooks_http_request_duration_seconds_count'
+  ).value
+
+  const sumValue = histogramMetric.values.find(v =>
+    v.labels.topic === 'test-topic' &&
+    v.labels.method === 'POST' &&
+    v.labels.status_code === '200' &&
+    v.metricName === 'kafka_hooks_http_request_duration_seconds_sum'
+  ).value
+
+  strictEqual(countValue, 2) // Two observations for 200 status code
+  strictEqual(sumValue, 1.7) // 0.5 + 1.2 = 1.7
+})
diff --git a/test/processMessage.test.js b/test/processMessage.test.js
new file mode 100644
index 0000000..2276e95
--- /dev/null
+++ b/test/processMessage.test.js
@@ -0,0 +1,179 @@
+import { stringDeserializer, jsonDeserializer, Consumer, Producer, stringSerializer, jsonSerializer } from '@platformatic/kafka'
+import { randomUUID } from 'node:crypto'
+import { test } from 'node:test'
+import { strictEqual, ok } from 'node:assert'
+import { processMessage } from '../lib/plugin.js'
+import { defaultDlqTopic } from '../lib/definitions.js'
+import { MockAgent, setGlobalDispatcher, Agent } from 'undici'
+import promClient from 'prom-client'
+import { initMetrics } from '../lib/metrics.js'
+
+const metrics = initMetrics({ client: promClient, registry: promClient.register })
+
+async function createDlqProducer () {
+  const producer = new Producer({
+    bootstrapBrokers: ['localhost:9092'],
+    serializers: {
+      key: stringSerializer,
+      value: jsonSerializer,
+      headerKey: stringSerializer,
+      headerValue: stringSerializer
+    }
+  })
+
+  return producer
+}
+
+async function createDlqConsumer (t, topics = [defaultDlqTopic]) {
+  const consumer = new Consumer({
+    groupId: randomUUID(),
+    bootstrapBrokers: ['localhost:9092'],
+    maxWaitTime: 500,
+    deserializers: {
+      key: stringDeserializer,
+      value: jsonDeserializer,
+      headerKey: stringDeserializer,
+      headerValue: stringDeserializer
+    }
+  })
+
+  await consumer.metadata({ topics, autocreateTopics: true })
+  const stream = await consumer.consume({ topics })
+
+  t.after(() => consumer.close(true))
+
+  return { consumer, stream }
+}
+
+function createMockMessage (topic, value, key = '', headers = {}) {
+  return {
+    topic,
+    value: Buffer.isBuffer(value) ? value : Buffer.from(value),
+    key: Buffer.from(key),
+    headers: new Map(Object.entries(headers)),
+    partition: 0,
+    offset: BigInt(Date.now())
+  }
+}
+
+function createMockLogger () {
+  const logs = []
+  return {
+    error: (obj, msg) => logs.push({ level: 'error', obj, msg }),
+    warn: (obj, msg) => logs.push({ level: 'warn', obj, msg }),
+    info: (obj, msg) => logs.push({ level: 'info', obj, msg }),
+    getLogs: () => logs,
+    clearLogs: () => {
+      logs.length = 0
+    }
+  }
+}
+
+// Helper functions to get metric values
+function getGaugeValue (gaugeName, labels) {
+  const metric = promClient.register.getSingleMetric(gaugeName)
+  if (!metric) return 0
+
+  const result = metric.get()
+  if (!result || !result.values || !Array.isArray(result.values)) return 0
+
+  const match = result.values.find(v => {
+    return Object.keys(labels).every(key => v.labels[key] === labels[key])
+  })
+  return match ? match.value : 0
+}
+
+function getCounterValue (counterName, labels) {
+  const metric = promClient.register.getSingleMetric(counterName)
+  if (!metric) return 0
+
+  const result = metric.get()
+  if (!result || !result.values || !Array.isArray(result.values)) return 0
+
+  const match = result.values.find(v => {
+    return Object.keys(labels).every(key => v.labels[key] === labels[key])
+  })
+  return match ? match.value : 0
+}
+
+test('processMessage should update success metrics when request succeeds', async t => {
+  const logger = createMockLogger()
+  const dlqProducer = await createDlqProducer()
+  t.after(() => dlqProducer.close())
+
+  const mockAgent = new MockAgent()
+  mockAgent.disableNetConnect()
+  setGlobalDispatcher(mockAgent)
+
+  t.after(async () => {
+    await mockAgent.close()
+    setGlobalDispatcher(new Agent())
+  })
+
+  const mockPool = mockAgent.get('http://127.0.0.1:8080')
+  mockPool
+    .intercept({
+      path: '/success',
+      method: 'POST'
+    })
+    .reply(200, { success: true })
+
+  const mappings = {
+    'test-topic': {
+      url: 'http://127.0.0.1:8080/success',
+      dlq: defaultDlqTopic,
+      method: 'POST',
+      retryDelay: 100,
+      headers: {},
+      retries: 2,
+      includeAttemptInRequests: true
+    }
+  }
+
+  const message = createMockMessage('test-topic', 'test-value', 'test-key')
+
+  const initialInflightCount = getGaugeValue('kafka_hooks_messages_in_flight', { topic: 'test-topic' })
+  await processMessage(logger, dlqProducer, mappings, message, metrics)
+
+  strictEqual(getGaugeValue('kafka_hooks_messages_in_flight', { topic: 'test-topic' }), initialInflightCount)
+  strictEqual(getCounterValue('kafka_hooks_dlq_messages_total', { topic: 'test-topic', reason: 'network_error' }), 0)
+
+  mockAgent.assertNoPendingInterceptors()
+})
+
+test('processMessage should not send to DLQ when dlq is false', async t => {
+  const logger = createMockLogger()
+  const dlqProducer = await createDlqProducer()
+  t.after(() => dlqProducer.close())
+
+  const { stream } = await createDlqConsumer(t)
+
+  const mappings = {
+    'test-topic': {
+      url: 'http://127.0.0.1:1/fail', // Non-existent server
+      dlq: false, // DLQ disabled
+      method: 'POST',
+      retryDelay: 100,
+      headers: {},
+      retries: 1,
+      includeAttemptInRequests: true
+    }
+  }
+
+  const message = createMockMessage('test-topic', 'test-value')
+
+  let dlqMessageReceived = false
+  stream.on('data', () => {
+    dlqMessageReceived = true
+  })
+
+  const initialNetworkErrorCount = getCounterValue('kafka_hooks_dlq_messages_total', { topic: 'test-topic', reason: 'network_error' })
+
+  await processMessage(logger, dlqProducer, mappings, message, metrics)
+
+  strictEqual(getCounterValue('kafka_hooks_dlq_messages_total', { topic: 'test-topic', reason: 'network_error' }), initialNetworkErrorCount)
+  strictEqual(dlqMessageReceived, false)
+
+  const logs = logger.getLogs()
+  ok(logs.some(log => log.level === 'error' && log.obj.dlqMessage))
+})
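Editorial note, not part of the patch: processMessage only calls inc/dec/observe on whatever metrics object it receives, so the metrics paths can also be exercised without prom-client or a broker-backed registry. A minimal sketch of such a stub is below; the helper name and usage are illustrative, not taken from the codebase.

// Hypothetical test helper: records calls instead of delegating to prom-client,
// matching the shape returned by initMetrics().
function createMetricsStub () {
  const calls = []
  const record = (metric, op) => (labels, value) => calls.push({ metric, op, labels, value })

  return {
    calls,
    messagesInFlight: { inc: record('messagesInFlight', 'inc'), dec: record('messagesInFlight', 'dec') },
    httpRequestDuration: { observe: record('httpRequestDuration', 'observe') },
    dlqMessages: { inc: record('dlqMessages', 'inc') }
  }
}

// Usage sketch inside a test: pass the stub instead of the real metrics object.
// const metrics = createMetricsStub()
// await processMessage(logger, dlqProducer, mappings, message, metrics)
// ok(metrics.calls.some(c => c.metric === 'httpRequestDuration' && c.op === 'observe'))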