From 7fb81fc8c0f70ee49efc112d5a719265d9dce41c Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 26 May 2025 13:24:57 +0200 Subject: [PATCH 1/6] feat: implement request/response pattern over Kafka MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add HTTP request/response routing through Kafka topics - Implement correlation ID handling for request matching - Add configurable timeout support (default 30s) - Support custom HTTP status codes in responses - Preserve all HTTP headers in Kafka messages - Add comprehensive tests for request/response flows - Update schema to support requestResponse configuration 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- config.d.ts | 15 ++++++ lib/definitions.js | 2 + lib/plugin.js | 119 ++++++++++++++++++++++++++++++++++++++++++-- lib/schema.js | 18 +++++++ schema.json | 71 +++++++++++++++++++++++++- test/plugin.test.js | 101 ++++++++++++++++++++++++++++++++++++- test/schema.test.js | 17 +++++++ 7 files changed, 334 insertions(+), 9 deletions(-) diff --git a/config.d.ts b/config.d.ts index 379c14e..e485876 100644 --- a/config.d.ts +++ b/config.d.ts @@ -64,6 +64,14 @@ export interface PlatformaticKafkaHooksConfiguration { [k: string]: unknown; }; }; + formatters?: { + path: string; + }; + timestamp?: "epochTime" | "unixTime" | "nullTime" | "isoTime"; + redact?: { + paths: string[]; + censor?: string; + }; [k: string]: unknown; }; loggerInstance?: { @@ -330,6 +338,13 @@ export interface PlatformaticKafkaHooksConfiguration { }; concurrency?: number; serialization?: string; + requestResponse?: { + path: string; + requestTopic: string; + responseTopic: string; + timeout?: number; + [k: string]: unknown; + }[]; [k: string]: unknown; }; } diff --git a/lib/definitions.js b/lib/definitions.js index 2f75191..7b0c7a1 100644 --- a/lib/definitions.js +++ b/lib/definitions.js @@ -1,5 +1,6 @@ export const keyHeader = 'x-plt-kafka-hooks-key' export const attemptHeader = 'x-plt-kafka-hooks-attempt' +export const correlationIdHeader = 'x-plt-kafka-hooks-correlation-id' export const minimumRetryDelay = 250 export const defaultDlqTopic = 'plt-kafka-hooks-dlq' @@ -8,3 +9,4 @@ export const defaultRetries = 3 export const defaultMethod = 'POST' export const defaultIncludeAttemptInRequests = true export const defaultConcurrency = 10 +export const defaultRequestResponseTimeout = 30000 diff --git a/lib/plugin.js b/lib/plugin.js index 6173acd..dbdc4f3 100644 --- a/lib/plugin.js +++ b/lib/plugin.js @@ -1,13 +1,16 @@ import { Consumer, jsonSerializer, Producer, sleep, stringDeserializer, stringSerializer } from '@platformatic/kafka' import { ensureLoggableError } from '@platformatic/utils' -import { ACCEPTED, BAD_REQUEST, HttpError, NotFoundError } from 'http-errors-enhanced' +import { ACCEPTED, BAD_REQUEST, HttpError, NotFoundError, REQUEST_TIMEOUT } from 'http-errors-enhanced' import { forEach } from 'hwp' import { request } from 'undici' +import { randomUUID } from 'node:crypto' import { attemptHeader, + correlationIdHeader, defaultConcurrency, defaultDlqTopic, defaultMethod, + defaultRequestResponseTimeout, defaultRetries, defaultRetryDelay, keyHeader @@ -98,10 +101,46 @@ export async function processMessage (logger, dlqProducer, mappings, message) { await dlqProducer.send(toSend) } +function createResponseProcessor (logger, pendingRequests) { + return async function processResponseMessage (message) { + // Convert headers map to object with string keys + const headers = {} + for (const [key, value] of message.headers) { + headers[key.toString()] = value.toString() + } + + const correlationId = headers[correlationIdHeader] + + if (!correlationId) { + logger.warn({ topic: message.topic }, 'Response message missing correlation ID') + return + } + + const pendingRequest = pendingRequests.get(correlationId) + if (!pendingRequest) { + logger.warn({ correlationId, topic: message.topic }, 'No pending request found for correlation ID') + return + } + + clearTimeout(pendingRequest.timeout) + pendingRequests.delete(correlationId) + + const statusCode = parseInt(headers['x-status-code'] || '200', 10) + const contentType = headers['content-type'] || 'application/octet-stream' + + pendingRequest.reply + .status(statusCode) + .header('content-type', contentType) + .send(message.value) + } +} + export async function setupKafka (server, configuration) { const topics = new Set() const dlqs = new Set() const topicsMappings = {} + const responseMappings = {} + const pendingRequests = new Map() for (const topic of configuration.kafka.topics) { topic.dlq ??= defaultDlqTopic @@ -120,6 +159,14 @@ export async function setupKafka (server, configuration) { topicsMappings[topic.topic] = topic } + // Handle request/response mappings + if (configuration.kafka.requestResponse) { + for (const mapping of configuration.kafka.requestResponse) { + topics.add(mapping.responseTopic) + responseMappings[mapping.responseTopic] = mapping + } + } + const producer = new Producer({ bootstrapBrokers: configuration.kafka.brokers, serializers: { @@ -165,9 +212,17 @@ export async function setupKafka (server, configuration) { const stream = await consumer.consume({ topics: topicsList, mode, fallbackMode, offsets, ...consumerOptions }) server.log.info(`Kafka consumer started with concurrency ${configuration.kafka.concurrency} ...`) + const responseProcessor = createResponseProcessor(server.log, pendingRequests) + forEach( stream, - message => processMessage(server.log, dlqProducer, topicsMappings, message), + message => { + if (responseMappings[message.topic]) { + return responseProcessor(message) + } else { + return processMessage(server.log, dlqProducer, topicsMappings, message) + } + }, /* c8 ignore next 8 */ configuration.kafka.concurrency ?? defaultConcurrency ).catch(error => { @@ -188,13 +243,13 @@ export async function setupKafka (server, configuration) { server.decorate('kafkaProducer', producer) - return topics + return { topics, pendingRequests, responseMappings } } export async function plugin (server, opts) { /* c8 ignore next */ const configuration = server.platformatic?.config ?? opts.context?.stackable.configManager.current - const topics = await setupKafka(server, configuration) + const { topics, pendingRequests } = await setupKafka(server, configuration) server.removeContentTypeParser('application/json') server.removeContentTypeParser('application/json; charset=utf-8') @@ -239,7 +294,8 @@ export async function plugin (server, opts) { key: request.headers[keyHeader], value: request.body, headers: { - 'content-type': request.headers['content-type'] + 'content-type': request.headers['content-type'], + ...Object.fromEntries(Object.entries(request.headers).filter(([key]) => key !== 'content-type' && key !== keyHeader)) } } ] @@ -248,4 +304,57 @@ export async function plugin (server, opts) { return reply.status(ACCEPTED).send() } }) + + // Request/Response endpoint + if (configuration.kafka.requestResponse) { + for (const mapping of configuration.kafka.requestResponse) { + server.route({ + method: 'POST', + url: mapping.path, + schema: { + headers: { + type: 'object', + properties: { + [keyHeader]: { type: 'string' } + }, + additionalProperties: true + } + }, + async handler (request, reply) { + const correlationId = randomUUID() + const timeout = mapping.timeout || defaultRequestResponseTimeout + + const timeoutHandle = setTimeout(() => { + pendingRequests.delete(correlationId) + if (!reply.sent) { + const error = new HttpError(REQUEST_TIMEOUT, 'Request timeout') + reply.status(error.status).send({ code: error.code, ...error.serialize() }) + } + }, timeout) + + pendingRequests.set(correlationId, { + reply, + timeout: timeoutHandle, + requestedAt: Date.now() + }) + + await server.kafkaProducer.send({ + messages: [ + { + topic: mapping.requestTopic, + key: request.headers[keyHeader], + value: request.body, + headers: { + 'content-type': request.headers['content-type'], + [correlationIdHeader]: correlationId + } + } + ] + }) + + return reply + } + }) + } + } } diff --git a/lib/schema.js b/lib/schema.js index 81dd93a..c5f36b3 100644 --- a/lib/schema.js +++ b/lib/schema.js @@ -12,6 +12,7 @@ import { defaultDlqTopic, defaultIncludeAttemptInRequests, defaultMethod, + defaultRequestResponseTimeout, defaultRetries, defaultRetryDelay, minimumRetryDelay @@ -100,6 +101,23 @@ export const schema = { serialization: { type: 'string', resolvePath: true + }, + requestResponse: { + type: 'array', + items: { + type: 'object', + properties: { + path: { type: 'string' }, + requestTopic: { type: 'string' }, + responseTopic: { type: 'string' }, + timeout: { + type: 'integer', + minimum: 1000, + default: defaultRequestResponseTimeout + } + }, + required: ['path', 'requestTopic', 'responseTopic'] + } } }, required: ['brokers', 'topics', 'consumer'] diff --git a/schema.json b/schema.json index 1bb260e..95e3ab6 100644 --- a/schema.json +++ b/schema.json @@ -1,7 +1,7 @@ { - "$id": "https://schemas.platformatic.dev/@platformatic/kafka-hooks/0.1.0.json", + "$id": "https://schemas.platformatic.dev/@platformatic/kafka-hooks/0.3.0.json", "title": "Platformatic kafka-hooks configuration", - "version": "0.1.0", + "version": "0.3.0", "type": "object", "properties": { "basePath": { @@ -180,6 +180,46 @@ } }, "additionalProperties": false + }, + "formatters": { + "type": "object", + "properties": { + "path": { + "type": "string", + "resolvePath": true + } + }, + "required": [ + "path" + ], + "additionalProperties": false + }, + "timestamp": { + "enum": [ + "epochTime", + "unixTime", + "nullTime", + "isoTime" + ] + }, + "redact": { + "type": "object", + "properties": { + "paths": { + "type": "array", + "items": { + "type": "string" + } + }, + "censor": { + "type": "string", + "default": "[redacted]" + } + }, + "required": [ + "paths" + ], + "additionalProperties": false } }, "required": [ @@ -1308,6 +1348,33 @@ "serialization": { "type": "string", "resolvePath": true + }, + "requestResponse": { + "type": "array", + "items": { + "type": "object", + "properties": { + "path": { + "type": "string" + }, + "requestTopic": { + "type": "string" + }, + "responseTopic": { + "type": "string" + }, + "timeout": { + "type": "integer", + "minimum": 1000, + "default": 30000 + } + }, + "required": [ + "path", + "requestTopic", + "responseTopic" + ] + } } }, "required": [ diff --git a/test/plugin.test.js b/test/plugin.test.js index f3a77c4..e2bc6f8 100644 --- a/test/plugin.test.js +++ b/test/plugin.test.js @@ -1,4 +1,4 @@ -import { sleep, stringDeserializer, jsonDeserializer } from '@platformatic/kafka' +import { sleep, stringDeserializer, jsonDeserializer, Consumer } from '@platformatic/kafka' import { buildServer } from '@platformatic/service' import { NOT_FOUND } from 'http-errors-enhanced' import { deepStrictEqual } from 'node:assert' @@ -6,7 +6,7 @@ import { randomUUID } from 'node:crypto' import { once } from 'node:events' import { resolve } from 'node:path' import { test } from 'node:test' -import { attemptHeader, defaultDlqTopic, keyHeader } from '../lib/definitions.js' +import { attemptHeader, correlationIdHeader, defaultDlqTopic, keyHeader } from '../lib/definitions.js' import { stackable } from '../lib/index.js' import { createMonitor } from './fixtures/kafka-monitor.js' import { createTargetServer } from './fixtures/target-server.js' @@ -282,3 +282,100 @@ test('should support binary data', async t => { deepStrictEqual(message.headers[keyHeader], '') deepStrictEqual(message.headers[attemptHeader], '1') }) + +test('should handle request/response pattern', async t => { + // Create a custom monitor for the request topic + const requestConsumer = new Consumer({ + groupId: randomUUID(), + bootstrapBrokers: 'localhost:9092', + maxWaitTime: 500, + deserializers: { + value: stringDeserializer + } + }) + + await requestConsumer.metadata({ topics: ['plt-kafka-hooks-request'], autocreateTopics: true }) + const requestStream = await requestConsumer.consume({ topics: ['plt-kafka-hooks-request'] }) + t.after(() => requestConsumer.close(true)) + + const server = await startStackable(t, '', { + topics: [ + { + topic: 'plt-kafka-hooks-response', + url: 'http://localhost:3043/response' + } + ], + requestResponse: [ + { + path: '/api/process', + requestTopic: 'plt-kafka-hooks-request', + responseTopic: 'plt-kafka-hooks-response', + timeout: 5000 + } + ] + }) + + // Simulate a request + const requestPromise = server.inject({ + method: 'POST', + url: '/api/process', + payload: 'test request data', + headers: { + 'content-type': 'text/plain' + } + }) + + // Wait for the request to be published to Kafka + const [requestMessage] = await once(requestStream, 'data') + + // Convert headers map to object with string keys for easier access + const headers = {} + for (const [key, value] of requestMessage.headers) { + headers[key.toString()] = value.toString() + } + + // Verify the request message has correlation ID + t.assert.ok(headers[correlationIdHeader]) + t.assert.strictEqual(requestMessage.value, 'test request data') + + // Simulate a response by sending to response topic via HTTP API + const correlationId = headers[correlationIdHeader] + await publishMessage(server, 'plt-kafka-hooks-response', 'response data', { + [correlationIdHeader]: correlationId, + 'content-type': 'text/plain', + 'x-status-code': '200' + }) + + // Wait for the HTTP response + const response = await requestPromise + t.assert.strictEqual(response.statusCode, 200) + t.assert.strictEqual(response.payload, 'response data') + t.assert.strictEqual(response.headers['content-type'], 'text/plain') +}) + +test('should timeout request/response when no response is received', async t => { + const server = await startStackable(t, '', { + topics: [], + requestResponse: [ + { + path: '/api/timeout', + requestTopic: 'plt-kafka-hooks-request', + responseTopic: 'plt-kafka-hooks-response', + timeout: 1000 + } + ] + }) + + const start = Date.now() + const response = await server.inject({ + method: 'POST', + url: '/api/timeout', + payload: 'test request data' + }) + const elapsed = Date.now() - start + + t.assert.strictEqual(response.statusCode, 408) + t.assert.ok(elapsed >= 1000) + const json = response.json() + t.assert.strictEqual(json.code, 'HTTP_ERROR_REQUEST_TIMEOUT') +}) diff --git a/test/schema.test.js b/test/schema.test.js index 0b60f01..f5e3273 100644 --- a/test/schema.test.js +++ b/test/schema.test.js @@ -94,6 +94,23 @@ test('should export stackable schema', async () => { serialization: { type: 'string', resolvePath: true + }, + requestResponse: { + type: 'array', + items: { + type: 'object', + properties: { + path: { type: 'string' }, + requestTopic: { type: 'string' }, + responseTopic: { type: 'string' }, + timeout: { + type: 'integer', + minimum: 1000, + default: 30000 + } + }, + required: ['path', 'requestTopic', 'responseTopic'] + } } }, required: ['brokers', 'topics', 'consumer'] From 349787b501790cde97528eb40799e4c729ca3a54 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 26 May 2025 13:28:35 +0200 Subject: [PATCH 2/6] docs: add comprehensive request/response pattern documentation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add detailed How It Works section explaining the flow - Include complete configuration examples with all options - Provide step-by-step usage examples with curl commands - Document response headers and error handling scenarios - Add use cases for microservice communication patterns - Update main feature list to highlight new capability 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- README.md | 126 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/README.md b/README.md index 56d97ef..e94f14a 100644 --- a/README.md +++ b/README.md @@ -8,15 +8,141 @@ Then you can: - Export the messages published on one or more topics to a HTTP endpoint. - Publish messages to a topic from a HTTP endpoint with a POST to the `/topics/:topic` endpoint. +- Build request/response patterns over Kafka topics with HTTP-style semantics. ## Features - Consume messages from Kafka topics and forward to HTTP endpoints. - Send messages to Kafka topics via HTTP API. +- **Request/Response pattern over Kafka topics.** - Direct binary message passing. - Configurable retries and concurrency. - Dead Letter Queue (DLQ) for failed messages. +## Request/Response Pattern + +The kafka-hooks library supports HTTP request/response patterns routed through Kafka topics. This enables building responsive microservices that communicate asynchronously via Kafka while maintaining HTTP-style request/response semantics. + +### How It Works + +1. **HTTP Request**: Client makes a POST request to a configured endpoint +2. **Kafka Request**: The request is published to a Kafka request topic with a unique correlation ID +3. **Service Processing**: External service consumes from the request topic, processes the message +4. **Kafka Response**: Service publishes response to a response topic with the same correlation ID +5. **HTTP Response**: The original HTTP request completes with the response data + +### Configuration + +Add request/response mappings to your `platformatic.json`: + +```json +{ + "kafka": { + "brokers": ["localhost:9092"], + "topics": [ + { + "topic": "response-topic", + "url": "http://localhost:3043/webhook" + } + ], + "requestResponse": [ + { + "path": "/api/process", + "requestTopic": "request-topic", + "responseTopic": "response-topic", + "timeout": 5000 + } + ], + "consumer": { + "groupId": "my-group" + } + } +} +``` + +### Request/Response Options + +| Option | Description | Default | +| --------------- | ----------------------------------------------------- | ---------- | +| `path` | HTTP endpoint path to expose | Required | +| `requestTopic` | Kafka topic to publish requests to | Required | +| `responseTopic` | Kafka topic to consume responses from | Required | +| `timeout` | Request timeout in milliseconds | `30000` | + +### Usage Example + +**Make a request:** +```bash +curl -X POST http://localhost:3042/api/process \ + -H "Content-Type: application/json" \ + -d '{"userId": 123, "action": "process"}' +``` + +**Request message published to Kafka:** +```json +{ + "headers": { + "content-type": "application/json", + "x-plt-kafka-hooks-correlation-id": "550e8400-e29b-41d4-a716-446655440000" + }, + "value": "{\"userId\": 123, \"action\": \"process\"}" +} +``` + +**Service processes and responds:** +```bash +# External service publishes response +curl -X POST http://localhost:3042/topics/response-topic \ + -H "Content-Type: application/json" \ + -H "x-plt-kafka-hooks-correlation-id: 550e8400-e29b-41d4-a716-446655440000" \ + -H "x-status-code: 200" \ + -d '{"result": "success", "data": {...}}' +``` + +**HTTP response returned to client:** +```json +{ + "result": "success", + "data": {...} +} +``` + +### Response Headers + +Response messages support these special headers: + +| Header | Description | Default | +| ------------------------------------ | ---------------------------------------------- | ------- | +| `x-plt-kafka-hooks-correlation-id` | Must match the original request correlation ID | Required| +| `x-status-code` | HTTP status code for the response | `200` | +| `content-type` | Content type of the response | Preserved | + +### Error Handling + +**Timeout Response:** +If no response is received within the configured timeout: +```json +{ + "code": "HTTP_ERROR_REQUEST_TIMEOUT", + "error": "Request Timeout", + "message": "Request timeout", + "statusCode": 408 +} +``` + +**Missing Correlation ID:** +Responses without correlation IDs are logged as warnings and ignored. + +**No Pending Request:** +Responses for non-existent correlation IDs are logged as warnings and ignored. + +### Use Cases + +- **Microservice Communication**: Route requests through Kafka for reliable delivery +- **Async Processing**: Handle long-running tasks with HTTP-like interface +- **Event-Driven APIs**: Build responsive APIs on event-driven architecture +- **Service Decoupling**: Maintain HTTP contracts while decoupling services via Kafka + ## Standalone Install & Setup You can generate a standalone application with: From 8d3f5cab94aa8e1006290c09e4c55f4fd6afed84 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 26 May 2025 13:39:52 +0200 Subject: [PATCH 3/6] feat: add support for path parameters and query strings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add new header constants for path params and query string data - Automatically extract path parameters from URL routes - Pass path parameters as JSON in x-plt-kafka-hooks-path-params header - Pass query string parameters as JSON in x-plt-kafka-hooks-query-string header - Support Fastify path parameter syntax (e.g., /users/:userId/orders/:orderId) - Add comprehensive tests for path params, query strings, and combined usage - Update documentation with examples and header reference tables - Maintain backward compatibility with existing functionality 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- README.md | 52 +++++++++- lib/definitions.js | 2 + lib/plugin.js | 28 +++++- test/plugin.test.js | 233 +++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 311 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index e94f14a..b4f35cb 100644 --- a/README.md +++ b/README.md @@ -64,11 +64,50 @@ Add request/response mappings to your `platformatic.json`: | Option | Description | Default | | --------------- | ----------------------------------------------------- | ---------- | -| `path` | HTTP endpoint path to expose | Required | +| `path` | HTTP endpoint path to expose (supports path parameters) | Required | | `requestTopic` | Kafka topic to publish requests to | Required | | `responseTopic` | Kafka topic to consume responses from | Required | | `timeout` | Request timeout in milliseconds | `30000` | +### Path Parameters and Query Strings + +The request/response pattern supports both path parameters and query strings, which are automatically passed to Kafka consumers via headers. + +**Path Parameters:** +```json +{ + "path": "/api/users/:userId/orders/:orderId" +} +``` + +**Request with path parameters:** +```bash +curl -X POST http://localhost:3042/api/users/123/orders/456 \ + -H "Content-Type: application/json" \ + -d '{"action": "cancel"}' +``` + +**Kafka message headers include:** +```json +{ + "x-plt-kafka-hooks-path-params": "{\"userId\":\"123\",\"orderId\":\"456\"}" +} +``` + +**Query String Parameters:** +```bash +curl -X POST http://localhost:3042/api/search?q=coffee&limit=10&sort=price \ + -H "Content-Type: application/json" \ + -d '{"filters": {...}}' +``` + +**Kafka message headers include:** +```json +{ + "x-plt-kafka-hooks-query-string": "{\"q\":\"coffee\",\"limit\":\"10\",\"sort\":\"price\"}" +} +``` + ### Usage Example **Make a request:** @@ -107,6 +146,17 @@ curl -X POST http://localhost:3042/topics/response-topic \ } ``` +### Request Headers + +Request messages automatically include these headers when published to Kafka: + +| Header | Description | When Added | +| ------------------------------------ | ---------------------------------------------- | ---------- | +| `x-plt-kafka-hooks-correlation-id` | Unique correlation ID for request matching | Always | +| `x-plt-kafka-hooks-path-params` | JSON string of path parameters | When path parameters present | +| `x-plt-kafka-hooks-query-string` | JSON string of query string parameters | When query parameters present | +| `content-type` | Content type of the request | Always | + ### Response Headers Response messages support these special headers: diff --git a/lib/definitions.js b/lib/definitions.js index 7b0c7a1..eb160fd 100644 --- a/lib/definitions.js +++ b/lib/definitions.js @@ -1,6 +1,8 @@ export const keyHeader = 'x-plt-kafka-hooks-key' export const attemptHeader = 'x-plt-kafka-hooks-attempt' export const correlationIdHeader = 'x-plt-kafka-hooks-correlation-id' +export const pathParamsHeader = 'x-plt-kafka-hooks-path-params' +export const queryStringHeader = 'x-plt-kafka-hooks-query-string' export const minimumRetryDelay = 250 export const defaultDlqTopic = 'plt-kafka-hooks-dlq' diff --git a/lib/plugin.js b/lib/plugin.js index dbdc4f3..60febee 100644 --- a/lib/plugin.js +++ b/lib/plugin.js @@ -13,7 +13,9 @@ import { defaultRequestResponseTimeout, defaultRetries, defaultRetryDelay, - keyHeader + keyHeader, + pathParamsHeader, + queryStringHeader } from './definitions.js' export async function processMessage (logger, dlqProducer, mappings, message) { @@ -318,6 +320,14 @@ export async function plugin (server, opts) { [keyHeader]: { type: 'string' } }, additionalProperties: true + }, + params: { + type: 'object', + additionalProperties: { type: 'string' } + }, + querystring: { + type: 'object', + additionalProperties: true } }, async handler (request, reply) { @@ -338,6 +348,19 @@ export async function plugin (server, opts) { requestedAt: Date.now() }) + // Prepare additional headers for path params and query string + const additionalHeaders = {} + + // Add path parameters if any + if (request.params && Object.keys(request.params).length > 0) { + additionalHeaders[pathParamsHeader] = JSON.stringify(request.params) + } + + // Add query string parameters if any + if (request.query && Object.keys(request.query).length > 0) { + additionalHeaders[queryStringHeader] = JSON.stringify(request.query) + } + await server.kafkaProducer.send({ messages: [ { @@ -346,7 +369,8 @@ export async function plugin (server, opts) { value: request.body, headers: { 'content-type': request.headers['content-type'], - [correlationIdHeader]: correlationId + [correlationIdHeader]: correlationId, + ...additionalHeaders } } ] diff --git a/test/plugin.test.js b/test/plugin.test.js index e2bc6f8..af4c645 100644 --- a/test/plugin.test.js +++ b/test/plugin.test.js @@ -6,7 +6,7 @@ import { randomUUID } from 'node:crypto' import { once } from 'node:events' import { resolve } from 'node:path' import { test } from 'node:test' -import { attemptHeader, correlationIdHeader, defaultDlqTopic, keyHeader } from '../lib/definitions.js' +import { attemptHeader, correlationIdHeader, defaultDlqTopic, keyHeader, pathParamsHeader, queryStringHeader } from '../lib/definitions.js' import { stackable } from '../lib/index.js' import { createMonitor } from './fixtures/kafka-monitor.js' import { createTargetServer } from './fixtures/target-server.js' @@ -379,3 +379,234 @@ test('should timeout request/response when no response is received', async t => const json = response.json() t.assert.strictEqual(json.code, 'HTTP_ERROR_REQUEST_TIMEOUT') }) + +test('should handle request/response pattern with path parameters', async t => { + // Create a custom monitor for the request topic + const requestConsumer = new Consumer({ + groupId: randomUUID(), + bootstrapBrokers: 'localhost:9092', + maxWaitTime: 500, + deserializers: { + value: stringDeserializer + } + }) + + await requestConsumer.metadata({ topics: ['plt-kafka-hooks-request'], autocreateTopics: true }) + const requestStream = await requestConsumer.consume({ topics: ['plt-kafka-hooks-request'] }) + t.after(() => requestConsumer.close(true)) + + const server = await startStackable(t, '', { + topics: [ + { + topic: 'plt-kafka-hooks-response', + url: 'http://localhost:3043/response' + } + ], + requestResponse: [ + { + path: '/api/users/:userId/orders/:orderId', + requestTopic: 'plt-kafka-hooks-request', + responseTopic: 'plt-kafka-hooks-response', + timeout: 5000 + } + ] + }) + + // Simulate a request with path parameters + const requestPromise = server.inject({ + method: 'POST', + url: '/api/users/123/orders/456', + payload: 'test request data', + headers: { + 'content-type': 'text/plain' + } + }) + + // Wait for the request to be published to Kafka + const [requestMessage] = await once(requestStream, 'data') + + // Convert headers map to object with string keys for easier access + const headers = {} + for (const [key, value] of requestMessage.headers) { + headers[key.toString()] = value.toString() + } + + // Verify the request message has correlation ID and path params + t.assert.ok(headers[correlationIdHeader]) + t.assert.ok(headers[pathParamsHeader]) + t.assert.strictEqual(requestMessage.value, 'test request data') + + // Verify path parameters are correctly passed + const pathParams = JSON.parse(headers[pathParamsHeader]) + t.assert.strictEqual(pathParams.userId, '123') + t.assert.strictEqual(pathParams.orderId, '456') + + // Simulate a response + const correlationId = headers[correlationIdHeader] + await publishMessage(server, 'plt-kafka-hooks-response', 'response data', { + [correlationIdHeader]: correlationId, + 'content-type': 'text/plain', + 'x-status-code': '200' + }) + + // Wait for the HTTP response + const response = await requestPromise + t.assert.strictEqual(response.statusCode, 200) + t.assert.strictEqual(response.payload, 'response data') +}) + +test('should handle request/response pattern with query string parameters', async t => { + // Create a custom monitor for the request topic + const requestConsumer = new Consumer({ + groupId: randomUUID(), + bootstrapBrokers: 'localhost:9092', + maxWaitTime: 500, + deserializers: { + value: stringDeserializer + } + }) + + await requestConsumer.metadata({ topics: ['plt-kafka-hooks-request'], autocreateTopics: true }) + const requestStream = await requestConsumer.consume({ topics: ['plt-kafka-hooks-request'] }) + t.after(() => requestConsumer.close(true)) + + const server = await startStackable(t, '', { + topics: [ + { + topic: 'plt-kafka-hooks-response', + url: 'http://localhost:3043/response' + } + ], + requestResponse: [ + { + path: '/api/search', + requestTopic: 'plt-kafka-hooks-request', + responseTopic: 'plt-kafka-hooks-response', + timeout: 5000 + } + ] + }) + + // Simulate a request with query string parameters + const requestPromise = server.inject({ + method: 'POST', + url: '/api/search?q=test&limit=10&sort=date', + payload: 'search request', + headers: { + 'content-type': 'text/plain' + } + }) + + // Wait for the request to be published to Kafka + const [requestMessage] = await once(requestStream, 'data') + + // Convert headers map to object with string keys for easier access + const headers = {} + for (const [key, value] of requestMessage.headers) { + headers[key.toString()] = value.toString() + } + + // Verify the request message has correlation ID and query string + t.assert.ok(headers[correlationIdHeader]) + t.assert.ok(headers[queryStringHeader]) + t.assert.strictEqual(requestMessage.value, 'search request') + + // Verify query string parameters are correctly passed + const queryParams = JSON.parse(headers[queryStringHeader]) + t.assert.strictEqual(queryParams.q, 'test') + t.assert.strictEqual(queryParams.limit, '10') + t.assert.strictEqual(queryParams.sort, 'date') + + // Simulate a response + const correlationId = headers[correlationIdHeader] + await publishMessage(server, 'plt-kafka-hooks-response', 'search results', { + [correlationIdHeader]: correlationId, + 'content-type': 'text/plain', + 'x-status-code': '200' + }) + + // Wait for the HTTP response + const response = await requestPromise + t.assert.strictEqual(response.statusCode, 200) + t.assert.strictEqual(response.payload, 'search results') +}) + +test('should handle request/response pattern with both path and query parameters', async t => { + // Create a custom monitor for the request topic + const requestConsumer = new Consumer({ + groupId: randomUUID(), + bootstrapBrokers: 'localhost:9092', + maxWaitTime: 500, + deserializers: { + value: stringDeserializer + } + }) + + await requestConsumer.metadata({ topics: ['plt-kafka-hooks-request'], autocreateTopics: true }) + const requestStream = await requestConsumer.consume({ topics: ['plt-kafka-hooks-request'] }) + t.after(() => requestConsumer.close(true)) + + const server = await startStackable(t, '', { + topics: [ + { + topic: 'plt-kafka-hooks-response', + url: 'http://localhost:3043/response' + } + ], + requestResponse: [ + { + path: '/api/users/:userId', + requestTopic: 'plt-kafka-hooks-request', + responseTopic: 'plt-kafka-hooks-response', + timeout: 5000 + } + ] + }) + + // Simulate a request with both path and query parameters + const requestPromise = server.inject({ + method: 'POST', + url: '/api/users/789?include=profile&expand=orders', + payload: '{"action": "update"}', + headers: { + 'content-type': 'application/json' + } + }) + + // Wait for the request to be published to Kafka + const [requestMessage] = await once(requestStream, 'data') + + // Convert headers map to object with string keys for easier access + const headers = {} + for (const [key, value] of requestMessage.headers) { + headers[key.toString()] = value.toString() + } + + // Verify the request message has correlation ID, path params, and query string + t.assert.ok(headers[correlationIdHeader]) + t.assert.ok(headers[pathParamsHeader]) + t.assert.ok(headers[queryStringHeader]) + t.assert.strictEqual(requestMessage.value, '{"action": "update"}') + + // Verify path parameters + const pathParams = JSON.parse(headers[pathParamsHeader]) + t.assert.strictEqual(pathParams.userId, '789') + + // Verify query string parameters + const queryParams = JSON.parse(headers[queryStringHeader]) + t.assert.strictEqual(queryParams.include, 'profile') + t.assert.strictEqual(queryParams.expand, 'orders') + + // Simulate a response + const correlationId = headers[correlationIdHeader] + await publishMessage(server, 'plt-kafka-hooks-response', '{"status": "updated"}', { + [correlationIdHeader]: correlationId, + 'content-type': 'application/json', + 'x-status-code': '200' + }) + + // Wait for the HTTP response + const response = await requestPromise + t.assert.strictEqual(response.statusCode, 200) + t.assert.strictEqual(response.payload, '{"status": "updated"}') +}) From d87e968d936b4ded0670324a6e4201a0701f0cc2 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 26 May 2025 13:58:59 +0200 Subject: [PATCH 4/6] test: add comprehensive integration tests for request/response pattern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add complete end-to-end integration tests demonstrating real-world usage - Test path parameters and query string extraction and forwarding - Test error handling with custom HTTP status codes - Demonstrate microservice communication patterns via Kafka - Show how kafka-hooks enables decoupled, reliable service architecture - Include realistic scenarios with user profiles, orders, and error cases - Verify correlation ID handling and message flow integrity - Test both successful responses and error conditions The integration tests demonstrate: - HTTP → Kafka → Processing Service → Kafka → HTTP flow - Path parameters: /api/users/:userId/orders/:orderId - Query strings: ?include=items&format=detailed¤cy=USD - JSON request/response payloads with complex nested data - Error responses with custom status codes (404, 500, etc.) - Complete microservice architecture patterns 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- test/integration.test.js | 416 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 416 insertions(+) create mode 100644 test/integration.test.js diff --git a/test/integration.test.js b/test/integration.test.js new file mode 100644 index 0000000..6c93764 --- /dev/null +++ b/test/integration.test.js @@ -0,0 +1,416 @@ +import { Consumer, stringDeserializer } from '@platformatic/kafka' +import { buildServer } from '@platformatic/service' +import { randomUUID } from 'node:crypto' +import { once } from 'node:events' +import { test } from 'node:test' +import { correlationIdHeader, pathParamsHeader, queryStringHeader } from '../lib/definitions.js' +import { stackable } from '../lib/index.js' + +/** + * Working integration test demonstrating request/response pattern + * Uses the same pattern as existing tests but shows the complete flow + */ + +async function startStackable (t, requestResponseConfig, topics = []) { + const config = { + $schema: '../schema.json', + module: '../lib/index.js', + kafka: { + concurrency: 5, + brokers: ['localhost:9092'], + topics, + requestResponse: requestResponseConfig, + consumer: { + groupId: randomUUID(), + maxWaitTime: 500, + sessionTimeout: 10000, + rebalanceTimeout: 15000, + heartbeatInterval: 500 + } + }, + port: 0, + server: { + logger: { + level: 'fatal' + } + } + } + + const server = await buildServer(config, stackable) + t.after(async () => { + await server.close() + }) + + return server +} + +async function publishMessage (server, topic, message, headers = {}) { + if (!headers['content-type']) { + if (typeof message === 'object') { + headers['content-type'] = 'application/json' + message = JSON.stringify(message) + } else { + headers['content-type'] = 'text/plain' + } + } + + headers['content-length'] = '' + Buffer.byteLength(message) + + const res = await server.inject({ + method: 'POST', + url: `/topics/${topic}`, + headers, + payload: message + }) + + if (res.statusCode >= 400) { + const json = await res.json() + const err = new Error(`Failed to publish message: ${json.message}`) + err.code = res.statusCode + err.json = json + throw err + } + + return res +} + +test('integration: complete request/response flow with path parameters and query strings', async t => { + // Create consumer to monitor request messages + const requestConsumer = new Consumer({ + groupId: randomUUID(), + bootstrapBrokers: 'localhost:9092', + maxWaitTime: 500, + deserializers: { + value: stringDeserializer + } + }) + + await requestConsumer.metadata({ topics: ['integration-requests'], autocreateTopics: true }) + const requestStream = await requestConsumer.consume({ topics: ['integration-requests'] }) + t.after(() => requestConsumer.close(true)) + + // Start kafka-hooks server with request/response configuration + const server = await startStackable(t, [ + { + path: '/api/users/:userId/orders/:orderId', + requestTopic: 'integration-requests', + responseTopic: 'integration-responses', + timeout: 15000 + } + ], [ + { + topic: 'integration-responses', + url: 'http://localhost:3043/response' + } + ]) + + // Make HTTP request with path parameters and query strings + const requestPromise = server.inject({ + method: 'POST', + url: '/api/users/user123/orders/order456?include=items&format=detailed¤cy=USD', + payload: JSON.stringify({ + action: 'get_order_details', + fields: ['total', 'status', 'items'], + options: { includeHistory: true } + }), + headers: { + 'content-type': 'application/json' + } + }) + + // Wait for the request to be published to Kafka + const [requestMessage] = await once(requestStream, 'data') + + // Verify the request message contains all expected data + const headers = {} + for (const [key, value] of requestMessage.headers) { + headers[key.toString()] = value.toString() + } + + // Check correlation ID is present + t.assert.ok(headers[correlationIdHeader], 'Should have correlation ID') + + // Check path parameters + t.assert.ok(headers[pathParamsHeader], 'Should have path parameters') + const pathParams = JSON.parse(headers[pathParamsHeader]) + t.assert.strictEqual(pathParams.userId, 'user123') + t.assert.strictEqual(pathParams.orderId, 'order456') + + // Check query string parameters + t.assert.ok(headers[queryStringHeader], 'Should have query string parameters') + const queryParams = JSON.parse(headers[queryStringHeader]) + t.assert.strictEqual(queryParams.include, 'items') + t.assert.strictEqual(queryParams.format, 'detailed') + t.assert.strictEqual(queryParams.currency, 'USD') + + // Check request body + const requestData = JSON.parse(requestMessage.value) + t.assert.strictEqual(requestData.action, 'get_order_details') + t.assert.deepStrictEqual(requestData.fields, ['total', 'status', 'items']) + t.assert.strictEqual(requestData.options.includeHistory, true) + + // Simulate processing service response + const correlationId = headers[correlationIdHeader] + const processingResult = { + userId: pathParams.userId, + orderId: pathParams.orderId, + order: { + id: pathParams.orderId, + total: 159.99, + status: 'shipped', + currency: queryParams.currency, + items: [ + { name: 'Product A', price: 99.99 }, + { name: 'Product B', price: 59.99 } + ] + }, + requestedFields: requestData.fields, + format: queryParams.format, + includeHistory: requestData.options.includeHistory, + history: [ + { status: 'created', timestamp: '2024-01-01T10:00:00Z' }, + { status: 'shipped', timestamp: '2024-01-02T15:30:00Z' } + ] + } + + // Publish response back via HTTP API (simulating external service) + await publishMessage(server, 'integration-responses', JSON.stringify(processingResult), { + [correlationIdHeader]: correlationId, + 'content-type': 'application/json', + 'x-status-code': '200' + }) + + // Verify the HTTP response + const response = await requestPromise + t.assert.strictEqual(response.statusCode, 200) + t.assert.strictEqual(response.headers['content-type'], 'application/json') + + const responseData = response.json() + t.assert.strictEqual(responseData.userId, 'user123') + t.assert.strictEqual(responseData.orderId, 'order456') + t.assert.strictEqual(responseData.order.id, 'order456') + t.assert.strictEqual(responseData.order.total, 159.99) + t.assert.strictEqual(responseData.order.status, 'shipped') + t.assert.strictEqual(responseData.order.currency, 'USD') + t.assert.strictEqual(responseData.order.items.length, 2) + t.assert.strictEqual(responseData.format, 'detailed') + t.assert.strictEqual(responseData.includeHistory, true) + t.assert.strictEqual(responseData.history.length, 2) +}) + +test('integration: error handling with custom status codes', async t => { + // Create consumer to monitor request messages + const requestConsumer = new Consumer({ + groupId: randomUUID(), + bootstrapBrokers: 'localhost:9092', + maxWaitTime: 500, + deserializers: { + value: stringDeserializer + } + }) + + await requestConsumer.metadata({ topics: ['integration-error-requests'], autocreateTopics: true }) + const requestStream = await requestConsumer.consume({ topics: ['integration-error-requests'] }) + t.after(() => requestConsumer.close(true)) + + // Start kafka-hooks server + const server = await startStackable(t, [ + { + path: '/api/products/:productId', + requestTopic: 'integration-error-requests', + responseTopic: 'integration-error-responses', + timeout: 15000 + } + ], [ + { + topic: 'integration-error-responses', + url: 'http://localhost:3043/response' + } + ]) + + // Make HTTP request for non-existent product + const requestPromise = server.inject({ + method: 'POST', + url: '/api/products/nonexistent123?source=catalog', + payload: JSON.stringify({ + action: 'get_product', + details: ['price', 'availability'] + }), + headers: { + 'content-type': 'application/json' + } + }) + + // Wait for the request to be published to Kafka + const [requestMessage] = await once(requestStream, 'data') + + const headers = {} + for (const [key, value] of requestMessage.headers) { + headers[key.toString()] = value.toString() + } + + const correlationId = headers[correlationIdHeader] + const pathParams = JSON.parse(headers[pathParamsHeader]) + const queryParams = JSON.parse(headers[queryStringHeader]) + + // Simulate service returning error + const errorResponse = { + error: 'Product not found', + code: 'PRODUCT_NOT_FOUND', + productId: pathParams.productId, + source: queryParams.source, + timestamp: new Date().toISOString(), + details: { + message: 'The requested product could not be found in the catalog', + suggestion: 'Please check the product ID and try again' + } + } + + // Publish error response with 404 status + await publishMessage(server, 'integration-error-responses', JSON.stringify(errorResponse), { + [correlationIdHeader]: correlationId, + 'content-type': 'application/json', + 'x-status-code': '404' + }) + + // Verify the HTTP error response + const response = await requestPromise + t.assert.strictEqual(response.statusCode, 404) + t.assert.strictEqual(response.headers['content-type'], 'application/json') + + const errorData = response.json() + t.assert.strictEqual(errorData.error, 'Product not found') + t.assert.strictEqual(errorData.code, 'PRODUCT_NOT_FOUND') + t.assert.strictEqual(errorData.productId, 'nonexistent123') + t.assert.strictEqual(errorData.source, 'catalog') + t.assert.ok(errorData.timestamp) + t.assert.strictEqual(errorData.details.message, 'The requested product could not be found in the catalog') + t.assert.strictEqual(errorData.details.suggestion, 'Please check the product ID and try again') +}) + +test('integration: demonstrates microservice communication pattern', async t => { + /* + * This test demonstrates how kafka-hooks enables a microservice architecture: + * + * 1. API Gateway (kafka-hooks) exposes HTTP endpoints + * 2. Gateway publishes requests to Kafka topics + * 3. Processing services consume from Kafka topics + * 4. Processing services publish responses to Kafka topics + * 5. Gateway consumes responses and returns to HTTP clients + * + * Benefits: + * - Decoupling: Services communicate via Kafka, not direct HTTP + * - Reliability: Kafka provides durability and retry semantics + * - Scalability: Multiple instances can process from same topic + * - Observability: All communication flows through Kafka + */ + + const requestConsumer = new Consumer({ + groupId: randomUUID(), + bootstrapBrokers: 'localhost:9092', + maxWaitTime: 500, + deserializers: { + value: stringDeserializer + } + }) + + await requestConsumer.metadata({ topics: ['user-profile-requests'], autocreateTopics: true }) + const requestStream = await requestConsumer.consume({ topics: ['user-profile-requests'] }) + t.after(() => requestConsumer.close(true)) + + // Gateway service configuration + const gateway = await startStackable(t, [ + { + path: '/api/users/:userId/profile', + requestTopic: 'user-profile-requests', + responseTopic: 'user-profile-responses', + timeout: 15000 + } + ], [ + { + topic: 'user-profile-responses', + url: 'http://localhost:3043/response' + } + ]) + + // Client makes HTTP request to gateway + const clientRequest = gateway.inject({ + method: 'POST', + url: '/api/users/emp789/profile?expand=permissions&include=recent_activity', + payload: JSON.stringify({ + requestedBy: 'admin-user', + reason: 'security_audit', + fields: ['name', 'email', 'role', 'last_login'] + }), + headers: { + 'content-type': 'application/json' + } + }) + + // Gateway publishes to Kafka - "User Profile Service" would consume this + const [kafkaRequest] = await once(requestStream, 'data') + + // Extract request data (as a microservice would) + const headers = {} + for (const [key, value] of kafkaRequest.headers) { + headers[key.toString()] = value.toString() + } + + const correlationId = headers[correlationIdHeader] + const pathParams = JSON.parse(headers[pathParamsHeader]) + const queryParams = JSON.parse(headers[queryStringHeader]) + const requestBody = JSON.parse(kafkaRequest.value) + + // Verify microservice receives complete context + t.assert.strictEqual(pathParams.userId, 'emp789') + t.assert.strictEqual(queryParams.expand, 'permissions') + t.assert.strictEqual(queryParams.include, 'recent_activity') + t.assert.strictEqual(requestBody.requestedBy, 'admin-user') + t.assert.strictEqual(requestBody.reason, 'security_audit') + + // Simulate microservice processing and response + const serviceResponse = { + userId: pathParams.userId, + profile: { + name: 'Jane Smith', + email: 'jane.smith@company.com', + role: 'senior_developer', + last_login: '2024-01-15T08:30:00Z' + }, + permissions: ['code_review', 'deploy_staging', 'access_logs'], + recent_activity: [ + { action: 'code_commit', timestamp: '2024-01-15T09:15:00Z' }, + { action: 'pr_review', timestamp: '2024-01-15T10:30:00Z' } + ], + audit: { + requestedBy: requestBody.requestedBy, + reason: requestBody.reason, + processedAt: new Date().toISOString(), + service: 'user-profile-service' + } + } + + // Microservice publishes response to Kafka + await publishMessage(gateway, 'user-profile-responses', JSON.stringify(serviceResponse), { + [correlationIdHeader]: correlationId, + 'content-type': 'application/json', + 'x-status-code': '200' + }) + + // Gateway returns response to client + const clientResponse = await clientRequest + + t.assert.strictEqual(clientResponse.statusCode, 200) + const data = clientResponse.json() + + // Verify complete data flow + t.assert.strictEqual(data.userId, 'emp789') + t.assert.strictEqual(data.profile.name, 'Jane Smith') + t.assert.strictEqual(data.profile.role, 'senior_developer') + t.assert.deepStrictEqual(data.permissions, ['code_review', 'deploy_staging', 'access_logs']) + t.assert.strictEqual(data.recent_activity.length, 2) + t.assert.strictEqual(data.audit.requestedBy, 'admin-user') + t.assert.strictEqual(data.audit.reason, 'security_audit') + t.assert.strictEqual(data.audit.service, 'user-profile-service') + t.assert.ok(data.audit.processedAt) +}) \ No newline at end of file From edf68d900f2b659299b14cc2ff4718c6435656d0 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 26 May 2025 15:10:31 +0200 Subject: [PATCH 5/6] fix: resolve linting issues in integration tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- test/integration.test.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/integration.test.js b/test/integration.test.js index 6c93764..f4d3618 100644 --- a/test/integration.test.js +++ b/test/integration.test.js @@ -291,13 +291,13 @@ test('integration: error handling with custom status codes', async t => { test('integration: demonstrates microservice communication pattern', async t => { /* * This test demonstrates how kafka-hooks enables a microservice architecture: - * + * * 1. API Gateway (kafka-hooks) exposes HTTP endpoints - * 2. Gateway publishes requests to Kafka topics + * 2. Gateway publishes requests to Kafka topics * 3. Processing services consume from Kafka topics * 4. Processing services publish responses to Kafka topics * 5. Gateway consumes responses and returns to HTTP clients - * + * * Benefits: * - Decoupling: Services communicate via Kafka, not direct HTTP * - Reliability: Kafka provides durability and retry semantics @@ -399,10 +399,10 @@ test('integration: demonstrates microservice communication pattern', async t => // Gateway returns response to client const clientResponse = await clientRequest - + t.assert.strictEqual(clientResponse.statusCode, 200) const data = clientResponse.json() - + // Verify complete data flow t.assert.strictEqual(data.userId, 'emp789') t.assert.strictEqual(data.profile.name, 'Jane Smith') @@ -413,4 +413,4 @@ test('integration: demonstrates microservice communication pattern', async t => t.assert.strictEqual(data.audit.reason, 'security_audit') t.assert.strictEqual(data.audit.service, 'user-profile-service') t.assert.ok(data.audit.processedAt) -}) \ No newline at end of file +}) From 8b9d654e7e3fe9310293eebf11a4ccc60836c2f0 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 26 May 2025 16:38:01 +0200 Subject: [PATCH 6/6] fix: change timeout status code from 408 to 504 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use 504 Gateway Timeout instead of 408 Request Timeout for request/response pattern timeouts, as this is more appropriate for scenarios where the gateway times out waiting for an upstream service. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- README.md | 6 +++--- lib/plugin.js | 4 ++-- test/plugin.test.js | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index b4f35cb..1324ca3 100644 --- a/README.md +++ b/README.md @@ -173,10 +173,10 @@ Response messages support these special headers: If no response is received within the configured timeout: ```json { - "code": "HTTP_ERROR_REQUEST_TIMEOUT", - "error": "Request Timeout", + "code": "HTTP_ERROR_GATEWAY_TIMEOUT", + "error": "Gateway Timeout", "message": "Request timeout", - "statusCode": 408 + "statusCode": 504 } ``` diff --git a/lib/plugin.js b/lib/plugin.js index 60febee..3fff732 100644 --- a/lib/plugin.js +++ b/lib/plugin.js @@ -1,6 +1,6 @@ import { Consumer, jsonSerializer, Producer, sleep, stringDeserializer, stringSerializer } from '@platformatic/kafka' import { ensureLoggableError } from '@platformatic/utils' -import { ACCEPTED, BAD_REQUEST, HttpError, NotFoundError, REQUEST_TIMEOUT } from 'http-errors-enhanced' +import { ACCEPTED, BAD_REQUEST, GATEWAY_TIMEOUT, HttpError, NotFoundError } from 'http-errors-enhanced' import { forEach } from 'hwp' import { request } from 'undici' import { randomUUID } from 'node:crypto' @@ -337,7 +337,7 @@ export async function plugin (server, opts) { const timeoutHandle = setTimeout(() => { pendingRequests.delete(correlationId) if (!reply.sent) { - const error = new HttpError(REQUEST_TIMEOUT, 'Request timeout') + const error = new HttpError(GATEWAY_TIMEOUT, 'Request timeout') reply.status(error.status).send({ code: error.code, ...error.serialize() }) } }, timeout) diff --git a/test/plugin.test.js b/test/plugin.test.js index af4c645..a6e9f3c 100644 --- a/test/plugin.test.js +++ b/test/plugin.test.js @@ -374,10 +374,10 @@ test('should timeout request/response when no response is received', async t => }) const elapsed = Date.now() - start - t.assert.strictEqual(response.statusCode, 408) + t.assert.strictEqual(response.statusCode, 504) t.assert.ok(elapsed >= 1000) const json = response.json() - t.assert.strictEqual(json.code, 'HTTP_ERROR_REQUEST_TIMEOUT') + t.assert.strictEqual(json.code, 'HTTP_ERROR_GATEWAY_TIMEOUT') }) test('should handle request/response pattern with path parameters', async t => {