Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions lib/metrics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
 * Creates the metrics exposed by the Kafka hooks plugin.
 *
 * @param {object} [prometheus] - Holder expected to expose a `client`
 *   (providing the `Gauge`, `Histogram` and `Counter` constructors) and a
 *   `registry` that every metric gets registered on.
 * @returns {object|null} An object with `messagesInFlight`,
 *   `httpRequestDuration` and `dlqMessages`, or `null` when the holder, its
 *   registry or its client is missing.
 */
export function initMetrics (prometheus) {
  // Without both a registry and a client there is nothing to build
  if (!prometheus?.registry || !prometheus?.client) {
    return null
  }

  const { client, registry } = prometheus

  // Every metric is registered on the provided registry only
  const onRegistry = options => ({ ...options, registers: [registry] })

  const messagesInFlight = new client.Gauge(
    onRegistry({
      name: 'kafka_hooks_messages_in_flight',
      help: 'Number of messages currently being processed',
      labelNames: ['topic']
    })
  )

  const httpRequestDuration = new client.Histogram(
    onRegistry({
      name: 'kafka_hooks_http_request_duration_seconds',
      help: 'HTTP request duration for webhook deliveries',
      labelNames: ['topic', 'method', 'status_code'],
      buckets: [0.1, 0.5, 1, 2, 5, 10]
    })
  )

  const dlqMessages = new client.Counter(
    onRegistry({
      name: 'kafka_hooks_dlq_messages_total',
      help: 'Total number of messages sent to the DLQ (Dead Letter Queue)',
      labelNames: ['topic', 'reason']
    })
  )

  return { messagesInFlight, httpRequestDuration, dlqMessages }
}
43 changes: 39 additions & 4 deletions lib/plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ import {
pathParamsHeader,
queryStringHeader
} from './definitions.js'
import { initMetrics } from './metrics.js'

export async function processMessage (logger, dlqProducer, mappings, message) {
export async function processMessage (logger, dlqProducer, mappings, message, metrics) {
const topic = message.topic
const value = message.value
const { url, dlq, method, retryDelay, headers: protoHeaders, retries, includeAttemptInRequests } = mappings[topic]

if (metrics) {
metrics.messagesInFlight.inc({ topic })
}

const headers = {
...protoHeaders,
...Object.fromEntries(message.headers),
Expand All @@ -30,6 +36,8 @@ export async function processMessage (logger, dlqProducer, mappings, message) {

// Perform the delivery
const errors = []
let lastStatusCode = null

for (let attempt = 0; attempt < retries; attempt++) {
try {
if (includeAttemptInRequests) {
Expand All @@ -38,7 +46,7 @@ export async function processMessage (logger, dlqProducer, mappings, message) {

headers['content-length'] = Buffer.byteLength(value)

const requestedAt = new Date()
const requestedAt = Date.now()
const {
statusCode,
headers: responseHeaders,
Expand All @@ -49,14 +57,29 @@ export async function processMessage (logger, dlqProducer, mappings, message) {
body: value
})

lastStatusCode = statusCode

if (metrics) {
const duration = (Date.now() - requestedAt) / 1000
metrics.httpRequestDuration.observe(
{ topic, method, status_code: statusCode },
duration
)
}

// Success, nothing else to do
if (statusCode < BAD_REQUEST) {
await body.dump()

if (metrics) {
metrics.messagesInFlight.dec({ topic })
}

return
}

const error = new HttpError(statusCode, 'Webhook replied with an error', {
requestedAt: requestedAt.toISOString(),
requestedAt: new Date(requestedAt).toISOString(),
attempt,
headers: responseHeaders,
/* c8 ignore next */
Expand Down Expand Up @@ -90,6 +113,17 @@ export async function processMessage (logger, dlqProducer, mappings, message) {
}
}

if (metrics) {
metrics.messagesInFlight.dec({ topic })

if (dlq) {
metrics.dlqMessages.inc({
topic,
reason: lastStatusCode ? `http_${lastStatusCode}` : 'network_error'
})
}
}

if (!dlq) {
logger.error({ dlqMessage }, 'Error while processing message')
return
Expand Down Expand Up @@ -215,14 +249,15 @@ export async function setupKafka (server, configuration) {
server.log.info(`Kafka consumer started with concurrency ${configuration.kafka.concurrency} ...`)

const responseProcessor = createResponseProcessor(server.log, pendingRequests)
const metrics = initMetrics(globalThis.platformatic?.prometheus)

forEach(
stream,
message => {
if (responseMappings[message.topic]) {
return responseProcessor(message)
} else {
return processMessage(server.log, dlqProducer, topicsMappings, message)
return processMessage(server.log, dlqProducer, topicsMappings, message, metrics)
}
},
/* c8 ignore next 8 */
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"neostandard": "^0.12.1",
"pino": "^9.6.0",
"pino-pretty": "^13.0.0",
"prom-client": "^15.1.3",
"prettier": "^3.5.3",
"wait-on": "^8.0.3"
},
Expand Down
4 changes: 3 additions & 1 deletion test/fixtures/kafka-monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ export async function createMonitor (valueDeserializer = safeJsonDeserializer) {
maxWaitTime: 500,
deserializers: {
value: valueDeserializer
}
},
/* c8 ignore next */
metrics: globalThis.platformatic?.prometheus
})

const topics = ['plt-kafka-hooks-success', 'plt-kafka-hooks-fail', 'plt-kafka-hooks-retry', defaultDlqTopic]
Expand Down
151 changes: 151 additions & 0 deletions test/metrics.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import { test } from 'node:test'
import { strictEqual, deepStrictEqual, ok } from 'node:assert'
import { initMetrics } from '../lib/metrics.js'
import promClient from 'prom-client'

// Guard cases: initMetrics is expected to yield null whenever the prometheus
// holder is absent or lacks either its registry or its client.

test('initMetrics should return null when prometheus is not provided', async () => {
  strictEqual(initMetrics(), null)
})

test('initMetrics should return null when prometheus is null', async () => {
  strictEqual(initMetrics(null), null)
})

test('initMetrics should return null when prometheus.registry is missing', async () => {
  // Only the client is supplied
  const clientOnly = { client: promClient }
  strictEqual(initMetrics(clientOnly), null)
})

test('initMetrics should return null when prometheus.client is missing', async () => {
  // Only the registry is supplied
  const registryOnly = { registry: new promClient.Registry() }
  strictEqual(initMetrics(registryOnly), null)
})

test('initMetrics should return metrics object when prometheus is properly configured', async () => {
  // A dedicated registry keeps this test isolated from the others
  const created = initMetrics({ registry: new promClient.Registry(), client: promClient })
  ok(created, 'Metrics object should be created')

  const { messagesInFlight, httpRequestDuration, dlqMessages } = created
  ok(messagesInFlight, 'messagesInFlight metric should exist')
  ok(httpRequestDuration, 'httpRequestDuration metric should exist')
  ok(dlqMessages, 'dlqMessages metric should exist')
})

// Configuration checks: each metric must expose the expected name, help text
// and label names (plus buckets for the histogram).

test('messagesInFlight should be configured as Gauge with correct properties', async () => {
  const { messagesInFlight } = initMetrics({ registry: new promClient.Registry(), client: promClient })
  const { name, help, labelNames } = messagesInFlight

  strictEqual(name, 'kafka_hooks_messages_in_flight')
  strictEqual(help, 'Number of messages currently being processed')
  deepStrictEqual(labelNames, ['topic'])
})

test('httpRequestDuration should be configured as Histogram with correct properties', async () => {
  const { httpRequestDuration } = initMetrics({ registry: new promClient.Registry(), client: promClient })
  const { name, help, labelNames, buckets } = httpRequestDuration

  strictEqual(name, 'kafka_hooks_http_request_duration_seconds')
  strictEqual(help, 'HTTP request duration for webhook deliveries')
  deepStrictEqual(labelNames, ['topic', 'method', 'status_code'])
  deepStrictEqual(buckets, [0.1, 0.5, 1, 2, 5, 10])
})

test('dlqMessages should be configured as Counter with correct properties', async () => {
  const { dlqMessages } = initMetrics({ registry: new promClient.Registry(), client: promClient })
  const { name, help, labelNames } = dlqMessages

  strictEqual(name, 'kafka_hooks_dlq_messages_total')
  strictEqual(help, 'Total number of messages sent to the DLQ (Dead Letter Queue)')
  deepStrictEqual(labelNames, ['topic', 'reason'])
})

test('metrics should be functional - Counter operations', async () => {
  const registry = new promClient.Registry()
  const { dlqMessages } = initMetrics({ registry, client: promClient })

  // One default increment and one explicit increment by 3
  dlqMessages.inc({ topic: 'test-topic', reason: 'http_500' })
  dlqMessages.inc({ topic: 'test-topic', reason: 'network_error' }, 3)

  const snapshot = await registry.getMetricsAsJSON()
  const { values } = snapshot.find(m => m.name === 'kafka_hooks_dlq_messages_total')

  // Finds the counter sample recorded for the test topic and the given reason
  const valueFor = reason =>
    values.find(({ labels }) => labels.topic === 'test-topic' && labels.reason === reason).value

  const http500Value = valueFor('http_500')
  const networkErrorValue = valueFor('network_error')

  strictEqual(http500Value, 1)
  strictEqual(networkErrorValue, 3)
})

test('metrics should be functional - Gauge operations', async () => {
  const registry = new promClient.Registry()
  const { messagesInFlight } = initMetrics({ registry, client: promClient })

  // Reads the gauge sample for the test topic from a fresh registry snapshot
  async function readInFlight () {
    const snapshot = await registry.getMetricsAsJSON()
    const gauge = snapshot.find(m => m.name === 'kafka_hooks_messages_in_flight')
    return gauge.values.find(v => v.labels.topic === 'test-topic').value
  }

  // 1 (default increment) + 2 (explicit increment)
  messagesInFlight.inc({ topic: 'test-topic' })
  messagesInFlight.inc({ topic: 'test-topic' }, 2)
  strictEqual(await readInFlight(), 3)

  messagesInFlight.dec({ topic: 'test-topic' })
  strictEqual(await readInFlight(), 2)

  messagesInFlight.set({ topic: 'test-topic' }, 5)
  strictEqual(await readInFlight(), 5)
})

test('metrics should be functional - Histogram operations', async () => {
  const registry = new promClient.Registry()
  const prometheus = { registry, client: promClient }
  const metrics = initMetrics(prometheus)

  // Two observations for status 200 and one for status 500
  metrics.httpRequestDuration.observe({ topic: 'test-topic', method: 'POST', status_code: '200' }, 0.5)
  metrics.httpRequestDuration.observe({ topic: 'test-topic', method: 'POST', status_code: '200' }, 1.2)
  metrics.httpRequestDuration.observe({ topic: 'test-topic', method: 'POST', status_code: '500' }, 2.1)

  const registryMetrics = await registry.getMetricsAsJSON()
  const histogramMetric = registryMetrics.find(m => m.name === 'kafka_hooks_http_request_duration_seconds')
  ok(histogramMetric, 'Histogram metric should be present in the registry')

  // Returns the `_count` or `_sum` sample value for the given status code,
  // failing with a readable message (not a TypeError) when it is missing.
  const sampleValue = (statusCode, suffix) => {
    const sample = histogramMetric.values.find(v =>
      v.labels.topic === 'test-topic' &&
      v.labels.method === 'POST' &&
      v.labels.status_code === statusCode &&
      v.metricName === `kafka_hooks_http_request_duration_seconds_${suffix}`
    )
    ok(sample, `Missing ${suffix} sample for status code ${statusCode}`)
    return sample.value
  }

  // Sums are accumulated floats, so they are compared with a tolerance
  // instead of exact equality.
  const closeTo = (actual, expected) => Math.abs(actual - expected) < 1e-9

  strictEqual(sampleValue('200', 'count'), 2) // Two observations for 200 status code
  ok(closeTo(sampleValue('200', 'sum'), 1.7), 'Sum for 200 should be 0.5 + 1.2 = 1.7')

  // The 500 series must be tracked separately from the 200 series
  strictEqual(sampleValue('500', 'count'), 1)
  ok(closeTo(sampleValue('500', 'sum'), 2.1), 'Sum for 500 should be 2.1')
})
Loading