diff --git a/README.md b/README.md
index 56d97ef..1324ca3 100644
--- a/README.md
+++ b/README.md
@@ -8,15 +8,191 @@ Then you can:
 
 - Export the messages published on one or more topics to a HTTP endpoint.
 - Publish messages to a topic from a HTTP endpoint with a POST to the `/topics/:topic` endpoint.
+- Build request/response patterns over Kafka topics with HTTP-style semantics.
 
 ## Features
 
 - Consume messages from Kafka topics and forward to HTTP endpoints.
 - Send messages to Kafka topics via HTTP API.
+- **Request/Response pattern over Kafka topics.**
 - Direct binary message passing.
 - Configurable retries and concurrency.
 - Dead Letter Queue (DLQ) for failed messages.
 
+## Request/Response Pattern
+
+The kafka-hooks library supports HTTP request/response patterns routed through Kafka topics. This lets you build responsive microservices that communicate asynchronously via Kafka while keeping HTTP-style request/response semantics.
+
+### How It Works
+
+1. **HTTP Request**: A client makes a POST request to a configured endpoint.
+2. **Kafka Request**: The request is published to a Kafka request topic with a unique correlation ID.
+3. **Service Processing**: An external service consumes from the request topic and processes the message.
+4. **Kafka Response**: The service publishes a response to a response topic with the same correlation ID.
+5. **HTTP Response**: The original HTTP request completes with the response data.
+
+### Configuration
+
+Add request/response mappings to your `platformatic.json`:
+
+```json
+{
+  "kafka": {
+    "brokers": ["localhost:9092"],
+    "topics": [
+      {
+        "topic": "response-topic",
+        "url": "http://localhost:3043/webhook"
+      }
+    ],
+    "requestResponse": [
+      {
+        "path": "/api/process",
+        "requestTopic": "request-topic",
+        "responseTopic": "response-topic",
+        "timeout": 5000
+      }
+    ],
+    "consumer": {
+      "groupId": "my-group"
+    }
+  }
+}
+```
+
+### Request/Response Options
+
+| Option          | Description                                             | Default  |
+| --------------- | ------------------------------------------------------- | -------- |
+| `path`          | HTTP endpoint path to expose (supports path parameters) | Required |
+| `requestTopic`  | Kafka topic to publish requests to                      | Required |
+| `responseTopic` | Kafka topic to consume responses from                   | Required |
+| `timeout`       | Request timeout in milliseconds                         | `30000`  |
+
+### Path Parameters and Query Strings
+
+The request/response pattern supports both path parameters and query strings, which are automatically passed to Kafka consumers via headers.
+
+**Path Parameters:**
+```json
+{
+  "path": "/api/users/:userId/orders/:orderId"
+}
+```
+
+**Request with path parameters:**
+```bash
+curl -X POST http://localhost:3042/api/users/123/orders/456 \
+  -H "Content-Type: application/json" \
+  -d '{"action": "cancel"}'
+```
+
+**Kafka message headers include:**
+```json
+{
+  "x-plt-kafka-hooks-path-params": "{\"userId\":\"123\",\"orderId\":\"456\"}"
+}
+```
+
+**Query String Parameters:**
+```bash
+curl -X POST "http://localhost:3042/api/search?q=coffee&limit=10&sort=price" \
+  -H "Content-Type: application/json" \
+  -d '{"filters": {...}}'
+```
+
+**Kafka message headers include:**
+```json
+{
+  "x-plt-kafka-hooks-query-string": "{\"q\":\"coffee\",\"limit\":\"10\",\"sort\":\"price\"}"
+}
+```
+
+### Usage Example
+
+**Make a request:**
+```bash
+curl -X POST http://localhost:3042/api/process \
+  -H "Content-Type: application/json" \
+  -d '{"userId": 123, "action": "process"}'
+```
+
+**Request message published to Kafka:**
+```json
+{
+  "headers": {
+    "content-type": "application/json",
+    "x-plt-kafka-hooks-correlation-id": "550e8400-e29b-41d4-a716-446655440000"
+  },
+  "value": "{\"userId\": 123, \"action\": \"process\"}"
+}
+```
+
+**Service processes and responds:**
+```bash
+# External service publishes response
+curl -X POST http://localhost:3042/topics/response-topic \
+  -H "Content-Type: application/json" \
+  -H "x-plt-kafka-hooks-correlation-id: 550e8400-e29b-41d4-a716-446655440000" \
+  -H "x-status-code: 200" \
+  -d '{"result": "success", "data": {...}}'
+```
+
+**HTTP response returned to client:**
+```json
+{
+  "result": "success",
+  "data": {...}
+}
+```
+
+### Request Headers
+
+Request messages automatically include these headers when published to Kafka:
+
+| Header                             | Description                                | When Added                    |
+| ---------------------------------- | ------------------------------------------ | ----------------------------- |
+| `x-plt-kafka-hooks-correlation-id` | Unique correlation ID for request matching | Always                        |
+| `x-plt-kafka-hooks-path-params`    | JSON string of path parameters             | When path parameters present  |
+| `x-plt-kafka-hooks-query-string`   | JSON string of query string parameters     | When query parameters present |
+| `content-type`                     | Content type of the request                | Always                        |
+
+### Response Headers
+
+Response messages support these special headers:
+
+| Header                             | Description                                    | Default   |
+| ---------------------------------- | ---------------------------------------------- | --------- |
+| `x-plt-kafka-hooks-correlation-id` | Must match the original request correlation ID | Required  |
+| `x-status-code`                    | HTTP status code for the response              | `200`     |
+| `content-type`                     | Content type of the response                   | Preserved |
+
+### Error Handling
+
+**Timeout Response:**
+If no response is received within the configured timeout, the client receives a `504`:
+```json
+{
+  "code": "HTTP_ERROR_GATEWAY_TIMEOUT",
+  "error": "Gateway Timeout",
+  "message": "Request timeout",
+  "statusCode": 504
+}
+```
+
+**Missing Correlation ID:**
+Responses without correlation IDs are logged as warnings and ignored.
+
+**No Pending Request:**
+Responses for non-existent correlation IDs are logged as warnings and ignored.
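
### Implementing a Responder Service

A responder service's core obligation is to echo the request's correlation ID back on its response message. The sketch below is illustrative, not part of kafka-hooks: the `buildResponseHeaders` helper is hypothetical, and only the header name mirrors `lib/definitions.js`. It converts a consumed message's header `Map` of `Buffer`s into strings and builds the headers a service would publish, along with its payload, to the response topic.

```javascript
// Header name as defined in lib/definitions.js
const correlationIdHeader = 'x-plt-kafka-hooks-correlation-id'

// Given the headers Map of a consumed request message, build the headers
// for the response message. Throws if the request carries no correlation ID,
// since kafka-hooks would log a warning and ignore such a response.
function buildResponseHeaders (requestHeaders, statusCode = 200) {
  // Convert Map<Buffer, Buffer> (or strings) into a plain string-keyed object
  const headers = {}
  for (const [key, value] of requestHeaders) {
    headers[key.toString()] = value.toString()
  }

  const correlationId = headers[correlationIdHeader]
  if (!correlationId) {
    throw new Error('Request message is missing a correlation ID')
  }

  return {
    [correlationIdHeader]: correlationId,
    'x-status-code': String(statusCode),
    'content-type': 'application/json'
  }
}
```

A consumer built with any Kafka client would call this with `message.headers` and then produce `{ topic: responseTopic, value, headers }`, or POST the payload to `/topics/:topic` with these headers, as shown above.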
+
+### Use Cases
+
+- **Microservice Communication**: Route requests through Kafka for reliable delivery
+- **Async Processing**: Handle long-running tasks with an HTTP-like interface
+- **Event-Driven APIs**: Build responsive APIs on an event-driven architecture
+- **Service Decoupling**: Maintain HTTP contracts while decoupling services via Kafka
+
 ## Standalone Install & Setup
 
 You can generate a standalone application with:
diff --git a/config.d.ts b/config.d.ts
index 379c14e..e485876 100644
--- a/config.d.ts
+++ b/config.d.ts
@@ -64,6 +64,14 @@ export interface PlatformaticKafkaHooksConfiguration {
         [k: string]: unknown;
       };
     };
+    formatters?: {
+      path: string;
+    };
+    timestamp?: "epochTime" | "unixTime" | "nullTime" | "isoTime";
+    redact?: {
+      paths: string[];
+      censor?: string;
+    };
     [k: string]: unknown;
   };
   loggerInstance?: {
@@ -330,6 +338,13 @@
     };
     concurrency?: number;
    serialization?: string;
+    requestResponse?: {
+      path: string;
+      requestTopic: string;
+      responseTopic: string;
+      timeout?: number;
+      [k: string]: unknown;
+    }[];
     [k: string]: unknown;
   };
 }
diff --git a/lib/definitions.js b/lib/definitions.js
index 2f75191..eb160fd 100644
--- a/lib/definitions.js
+++ b/lib/definitions.js
@@ -1,5 +1,8 @@
 export const keyHeader = 'x-plt-kafka-hooks-key'
 export const attemptHeader = 'x-plt-kafka-hooks-attempt'
+export const correlationIdHeader = 'x-plt-kafka-hooks-correlation-id'
+export const pathParamsHeader = 'x-plt-kafka-hooks-path-params'
+export const queryStringHeader = 'x-plt-kafka-hooks-query-string'
 
 export const minimumRetryDelay = 250
 export const defaultDlqTopic = 'plt-kafka-hooks-dlq'
@@ -8,3 +11,4 @@ export const defaultRetries = 3
 export const defaultMethod = 'POST'
 export const defaultIncludeAttemptInRequests = true
 export const defaultConcurrency = 10
+export const defaultRequestResponseTimeout = 30000
diff --git a/lib/plugin.js b/lib/plugin.js
index 6173acd..3fff732 100644
--- a/lib/plugin.js
+++
b/lib/plugin.js
@@ -1,16 +1,21 @@
 import { Consumer, jsonSerializer, Producer, sleep, stringDeserializer, stringSerializer } from '@platformatic/kafka'
 import { ensureLoggableError } from '@platformatic/utils'
-import { ACCEPTED, BAD_REQUEST, HttpError, NotFoundError } from 'http-errors-enhanced'
+import { ACCEPTED, BAD_REQUEST, GATEWAY_TIMEOUT, HttpError, NotFoundError } from 'http-errors-enhanced'
 import { forEach } from 'hwp'
 import { request } from 'undici'
+import { randomUUID } from 'node:crypto'
 import {
   attemptHeader,
+  correlationIdHeader,
   defaultConcurrency,
   defaultDlqTopic,
   defaultMethod,
+  defaultRequestResponseTimeout,
   defaultRetries,
   defaultRetryDelay,
-  keyHeader
+  keyHeader,
+  pathParamsHeader,
+  queryStringHeader
 } from './definitions.js'
 
 export async function processMessage (logger, dlqProducer, mappings, message) {
@@ -98,10 +103,46 @@
   await dlqProducer.send(toSend)
 }
 
+function createResponseProcessor (logger, pendingRequests) {
+  return async function processResponseMessage (message) {
+    // Convert headers map to object with string keys
+    const headers = {}
+    for (const [key, value] of message.headers) {
+      headers[key.toString()] = value.toString()
+    }
+
+    const correlationId = headers[correlationIdHeader]
+
+    if (!correlationId) {
+      logger.warn({ topic: message.topic }, 'Response message missing correlation ID')
+      return
+    }
+
+    const pendingRequest = pendingRequests.get(correlationId)
+    if (!pendingRequest) {
+      logger.warn({ correlationId, topic: message.topic }, 'No pending request found for correlation ID')
+      return
+    }
+
+    clearTimeout(pendingRequest.timeout)
+    pendingRequests.delete(correlationId)
+
+    const statusCode = parseInt(headers['x-status-code'] || '200', 10)
+    const contentType = headers['content-type'] || 'application/octet-stream'
+
+    pendingRequest.reply
+      .status(statusCode)
+      .header('content-type', contentType)
+      .send(message.value)
+  }
+}
+
 export async function setupKafka (server, configuration) {
   const topics = new Set()
   const dlqs = new Set()
   const topicsMappings = {}
+  const responseMappings = {}
+  const pendingRequests = new Map()
 
   for (const topic of configuration.kafka.topics) {
     topic.dlq ??= defaultDlqTopic
@@ -120,6 +161,14 @@
     topicsMappings[topic.topic] = topic
   }
 
+  // Handle request/response mappings
+  if (configuration.kafka.requestResponse) {
+    for (const mapping of configuration.kafka.requestResponse) {
+      topics.add(mapping.responseTopic)
+      responseMappings[mapping.responseTopic] = mapping
+    }
+  }
+
   const producer = new Producer({
     bootstrapBrokers: configuration.kafka.brokers,
     serializers: {
@@ -165,9 +214,17 @@
   const stream = await consumer.consume({ topics: topicsList, mode, fallbackMode, offsets, ...consumerOptions })
 
   server.log.info(`Kafka consumer started with concurrency ${configuration.kafka.concurrency} ...`)
 
+  const responseProcessor = createResponseProcessor(server.log, pendingRequests)
+
   forEach(
     stream,
-    message => processMessage(server.log, dlqProducer, topicsMappings, message),
+    message => {
+      if (responseMappings[message.topic]) {
+        return responseProcessor(message)
+      } else {
+        return processMessage(server.log, dlqProducer, topicsMappings, message)
+      }
+    },
     /* c8 ignore next 8 */
     configuration.kafka.concurrency ?? defaultConcurrency
   ).catch(error => {
@@ -188,13 +245,13 @@
 
   server.decorate('kafkaProducer', producer)
 
-  return topics
+  return { topics, pendingRequests, responseMappings }
 }
 
 export async function plugin (server, opts) {
   /* c8 ignore next */
   const configuration = server.platformatic?.config ?? opts.context?.stackable.configManager.current
-  const topics = await setupKafka(server, configuration)
+  const { topics, pendingRequests } = await setupKafka(server, configuration)
 
   server.removeContentTypeParser('application/json')
   server.removeContentTypeParser('application/json; charset=utf-8')
@@ -239,7 +296,8 @@
         key: request.headers[keyHeader],
         value: request.body,
         headers: {
-          'content-type': request.headers['content-type']
+          'content-type': request.headers['content-type'],
+          ...Object.fromEntries(Object.entries(request.headers).filter(([key]) => key !== 'content-type' && key !== keyHeader))
         }
       }
     ]
@@ -248,4 +306,79 @@
 
       return reply.status(ACCEPTED).send()
     }
   })
+
+  // Request/Response endpoint
+  if (configuration.kafka.requestResponse) {
+    for (const mapping of configuration.kafka.requestResponse) {
+      server.route({
+        method: 'POST',
+        url: mapping.path,
+        schema: {
+          headers: {
+            type: 'object',
+            properties: {
+              [keyHeader]: { type: 'string' }
+            },
+            additionalProperties: true
+          },
+          params: {
+            type: 'object',
+            additionalProperties: { type: 'string' }
+          },
+          querystring: {
+            type: 'object',
+            additionalProperties: true
+          }
+        },
+        async handler (request, reply) {
+          const correlationId = randomUUID()
+          const timeout = mapping.timeout || defaultRequestResponseTimeout
+
+          const timeoutHandle = setTimeout(() => {
+            pendingRequests.delete(correlationId)
+            if (!reply.sent) {
+              const error = new HttpError(GATEWAY_TIMEOUT, 'Request timeout')
+              reply.status(error.status).send({ code: error.code, ...error.serialize() })
+            }
+          }, timeout)
+
+          pendingRequests.set(correlationId, {
+            reply,
+            timeout: timeoutHandle,
+            requestedAt: Date.now()
+          })
+
+          // Prepare additional headers for path params and query string
+          const additionalHeaders = {}
+
+          // Add path parameters if any
+          if (request.params && Object.keys(request.params).length > 0) {
+            additionalHeaders[pathParamsHeader] = JSON.stringify(request.params)
+          }
+
+          // Add query string parameters if any
+          if (request.query && Object.keys(request.query).length > 0) {
+            additionalHeaders[queryStringHeader] = JSON.stringify(request.query)
+          }
+
+          await server.kafkaProducer.send({
+            messages: [
+              {
+                topic: mapping.requestTopic,
+                key: request.headers[keyHeader],
+                value: request.body,
+                headers: {
+                  'content-type': request.headers['content-type'],
+                  [correlationIdHeader]: correlationId,
+                  ...additionalHeaders
+                }
+              }
+            ]
+          })
+
+          return reply
+        }
+      })
+    }
+  }
 }
diff --git a/lib/schema.js b/lib/schema.js
index 81dd93a..c5f36b3 100644
--- a/lib/schema.js
+++ b/lib/schema.js
@@ -12,6 +12,7 @@ import {
   defaultDlqTopic,
   defaultIncludeAttemptInRequests,
   defaultMethod,
+  defaultRequestResponseTimeout,
   defaultRetries,
   defaultRetryDelay,
   minimumRetryDelay
@@ -100,6 +101,23 @@ export const schema = {
       serialization: {
         type: 'string',
         resolvePath: true
+      },
+      requestResponse: {
+        type: 'array',
+        items: {
+          type: 'object',
+          properties: {
+            path: { type: 'string' },
+            requestTopic: { type: 'string' },
+            responseTopic: { type: 'string' },
+            timeout: {
+              type: 'integer',
+              minimum: 1000,
+              default: defaultRequestResponseTimeout
+            }
+          },
+          required: ['path', 'requestTopic', 'responseTopic']
+        }
+      }
     }
   },
   required: ['brokers', 'topics', 'consumer']
diff --git a/schema.json b/schema.json
index 1bb260e..95e3ab6 100644
--- a/schema.json
+++ b/schema.json
@@ -1,7 +1,7 @@
 {
-  "$id": "https://schemas.platformatic.dev/@platformatic/kafka-hooks/0.1.0.json",
+  "$id": "https://schemas.platformatic.dev/@platformatic/kafka-hooks/0.3.0.json",
   "title": "Platformatic kafka-hooks configuration",
-  "version": "0.1.0",
+  "version": "0.3.0",
   "type": "object",
   "properties": {
     "basePath": {
@@ -180,6 +180,46 @@
         }
       },
       "additionalProperties": false
+    },
+    "formatters": {
+      "type": "object",
+      "properties": {
+        "path": {
+          "type": "string",
+          "resolvePath": true
+        }
+      },
+      "required": [
+        "path"
+      ],
+      "additionalProperties": false
+    },
+    "timestamp": {
+      "enum": [
+        "epochTime",
+        "unixTime",
+        "nullTime",
+        "isoTime"
+      ]
+    },
+    "redact": {
+      "type": "object",
+      "properties": {
+        "paths": {
+          "type": "array",
+          "items": {
+            "type": "string"
+          }
+        },
+        "censor": {
+          "type": "string",
+          "default": "[redacted]"
+        }
+      },
+      "required": [
+        "paths"
+      ],
+      "additionalProperties": false
+    }
   }
 },
 "required": [
@@ -1308,6 +1348,33 @@
         "serialization": {
           "type": "string",
           "resolvePath": true
+        },
+        "requestResponse": {
+          "type": "array",
+          "items": {
+            "type": "object",
+            "properties": {
+              "path": {
+                "type": "string"
+              },
+              "requestTopic": {
+                "type": "string"
+              },
+              "responseTopic": {
+                "type": "string"
+              },
+              "timeout": {
+                "type": "integer",
+                "minimum": 1000,
+                "default": 30000
+              }
+            },
+            "required": [
+              "path",
+              "requestTopic",
+              "responseTopic"
+            ]
+          }
+        }
       }
     },
     "required": [
diff --git a/test/integration.test.js b/test/integration.test.js
new file mode 100644
index 0000000..f4d3618
--- /dev/null
+++ b/test/integration.test.js
@@ -0,0 +1,416 @@
+import { Consumer, stringDeserializer } from '@platformatic/kafka'
+import { buildServer } from '@platformatic/service'
+import { randomUUID } from 'node:crypto'
+import { once } from 'node:events'
+import { test } from 'node:test'
+import { correlationIdHeader, pathParamsHeader, queryStringHeader } from '../lib/definitions.js'
+import { stackable } from '../lib/index.js'
+
+/**
+ * Working integration test demonstrating request/response pattern
+ * Uses the same pattern as existing tests but shows the complete flow
+ */
+
+async function startStackable (t, requestResponseConfig, topics = []) {
+  const config = {
+    $schema: '../schema.json',
+    module: '../lib/index.js',
+    kafka: {
+      concurrency: 5,
+      brokers: ['localhost:9092'],
+      topics,
+      requestResponse: requestResponseConfig,
+      consumer: {
+        groupId: randomUUID(),
+        maxWaitTime: 500,
+        sessionTimeout: 10000,
+        rebalanceTimeout: 15000,
+        heartbeatInterval: 500
+      }
+    },
+    port: 0,
+    server: {
+      logger: {
+        level:
'fatal' + } + } + } + + const server = await buildServer(config, stackable) + t.after(async () => { + await server.close() + }) + + return server +} + +async function publishMessage (server, topic, message, headers = {}) { + if (!headers['content-type']) { + if (typeof message === 'object') { + headers['content-type'] = 'application/json' + message = JSON.stringify(message) + } else { + headers['content-type'] = 'text/plain' + } + } + + headers['content-length'] = '' + Buffer.byteLength(message) + + const res = await server.inject({ + method: 'POST', + url: `/topics/${topic}`, + headers, + payload: message + }) + + if (res.statusCode >= 400) { + const json = await res.json() + const err = new Error(`Failed to publish message: ${json.message}`) + err.code = res.statusCode + err.json = json + throw err + } + + return res +} + +test('integration: complete request/response flow with path parameters and query strings', async t => { + // Create consumer to monitor request messages + const requestConsumer = new Consumer({ + groupId: randomUUID(), + bootstrapBrokers: 'localhost:9092', + maxWaitTime: 500, + deserializers: { + value: stringDeserializer + } + }) + + await requestConsumer.metadata({ topics: ['integration-requests'], autocreateTopics: true }) + const requestStream = await requestConsumer.consume({ topics: ['integration-requests'] }) + t.after(() => requestConsumer.close(true)) + + // Start kafka-hooks server with request/response configuration + const server = await startStackable(t, [ + { + path: '/api/users/:userId/orders/:orderId', + requestTopic: 'integration-requests', + responseTopic: 'integration-responses', + timeout: 15000 + } + ], [ + { + topic: 'integration-responses', + url: 'http://localhost:3043/response' + } + ]) + + // Make HTTP request with path parameters and query strings + const requestPromise = server.inject({ + method: 'POST', + url: '/api/users/user123/orders/order456?include=items&format=detailed¤cy=USD', + payload: JSON.stringify({ + 
action: 'get_order_details', + fields: ['total', 'status', 'items'], + options: { includeHistory: true } + }), + headers: { + 'content-type': 'application/json' + } + }) + + // Wait for the request to be published to Kafka + const [requestMessage] = await once(requestStream, 'data') + + // Verify the request message contains all expected data + const headers = {} + for (const [key, value] of requestMessage.headers) { + headers[key.toString()] = value.toString() + } + + // Check correlation ID is present + t.assert.ok(headers[correlationIdHeader], 'Should have correlation ID') + + // Check path parameters + t.assert.ok(headers[pathParamsHeader], 'Should have path parameters') + const pathParams = JSON.parse(headers[pathParamsHeader]) + t.assert.strictEqual(pathParams.userId, 'user123') + t.assert.strictEqual(pathParams.orderId, 'order456') + + // Check query string parameters + t.assert.ok(headers[queryStringHeader], 'Should have query string parameters') + const queryParams = JSON.parse(headers[queryStringHeader]) + t.assert.strictEqual(queryParams.include, 'items') + t.assert.strictEqual(queryParams.format, 'detailed') + t.assert.strictEqual(queryParams.currency, 'USD') + + // Check request body + const requestData = JSON.parse(requestMessage.value) + t.assert.strictEqual(requestData.action, 'get_order_details') + t.assert.deepStrictEqual(requestData.fields, ['total', 'status', 'items']) + t.assert.strictEqual(requestData.options.includeHistory, true) + + // Simulate processing service response + const correlationId = headers[correlationIdHeader] + const processingResult = { + userId: pathParams.userId, + orderId: pathParams.orderId, + order: { + id: pathParams.orderId, + total: 159.99, + status: 'shipped', + currency: queryParams.currency, + items: [ + { name: 'Product A', price: 99.99 }, + { name: 'Product B', price: 59.99 } + ] + }, + requestedFields: requestData.fields, + format: queryParams.format, + includeHistory: requestData.options.includeHistory, + 
history: [ + { status: 'created', timestamp: '2024-01-01T10:00:00Z' }, + { status: 'shipped', timestamp: '2024-01-02T15:30:00Z' } + ] + } + + // Publish response back via HTTP API (simulating external service) + await publishMessage(server, 'integration-responses', JSON.stringify(processingResult), { + [correlationIdHeader]: correlationId, + 'content-type': 'application/json', + 'x-status-code': '200' + }) + + // Verify the HTTP response + const response = await requestPromise + t.assert.strictEqual(response.statusCode, 200) + t.assert.strictEqual(response.headers['content-type'], 'application/json') + + const responseData = response.json() + t.assert.strictEqual(responseData.userId, 'user123') + t.assert.strictEqual(responseData.orderId, 'order456') + t.assert.strictEqual(responseData.order.id, 'order456') + t.assert.strictEqual(responseData.order.total, 159.99) + t.assert.strictEqual(responseData.order.status, 'shipped') + t.assert.strictEqual(responseData.order.currency, 'USD') + t.assert.strictEqual(responseData.order.items.length, 2) + t.assert.strictEqual(responseData.format, 'detailed') + t.assert.strictEqual(responseData.includeHistory, true) + t.assert.strictEqual(responseData.history.length, 2) +}) + +test('integration: error handling with custom status codes', async t => { + // Create consumer to monitor request messages + const requestConsumer = new Consumer({ + groupId: randomUUID(), + bootstrapBrokers: 'localhost:9092', + maxWaitTime: 500, + deserializers: { + value: stringDeserializer + } + }) + + await requestConsumer.metadata({ topics: ['integration-error-requests'], autocreateTopics: true }) + const requestStream = await requestConsumer.consume({ topics: ['integration-error-requests'] }) + t.after(() => requestConsumer.close(true)) + + // Start kafka-hooks server + const server = await startStackable(t, [ + { + path: '/api/products/:productId', + requestTopic: 'integration-error-requests', + responseTopic: 'integration-error-responses', + timeout: 
15000 + } + ], [ + { + topic: 'integration-error-responses', + url: 'http://localhost:3043/response' + } + ]) + + // Make HTTP request for non-existent product + const requestPromise = server.inject({ + method: 'POST', + url: '/api/products/nonexistent123?source=catalog', + payload: JSON.stringify({ + action: 'get_product', + details: ['price', 'availability'] + }), + headers: { + 'content-type': 'application/json' + } + }) + + // Wait for the request to be published to Kafka + const [requestMessage] = await once(requestStream, 'data') + + const headers = {} + for (const [key, value] of requestMessage.headers) { + headers[key.toString()] = value.toString() + } + + const correlationId = headers[correlationIdHeader] + const pathParams = JSON.parse(headers[pathParamsHeader]) + const queryParams = JSON.parse(headers[queryStringHeader]) + + // Simulate service returning error + const errorResponse = { + error: 'Product not found', + code: 'PRODUCT_NOT_FOUND', + productId: pathParams.productId, + source: queryParams.source, + timestamp: new Date().toISOString(), + details: { + message: 'The requested product could not be found in the catalog', + suggestion: 'Please check the product ID and try again' + } + } + + // Publish error response with 404 status + await publishMessage(server, 'integration-error-responses', JSON.stringify(errorResponse), { + [correlationIdHeader]: correlationId, + 'content-type': 'application/json', + 'x-status-code': '404' + }) + + // Verify the HTTP error response + const response = await requestPromise + t.assert.strictEqual(response.statusCode, 404) + t.assert.strictEqual(response.headers['content-type'], 'application/json') + + const errorData = response.json() + t.assert.strictEqual(errorData.error, 'Product not found') + t.assert.strictEqual(errorData.code, 'PRODUCT_NOT_FOUND') + t.assert.strictEqual(errorData.productId, 'nonexistent123') + t.assert.strictEqual(errorData.source, 'catalog') + t.assert.ok(errorData.timestamp) + 
t.assert.strictEqual(errorData.details.message, 'The requested product could not be found in the catalog') + t.assert.strictEqual(errorData.details.suggestion, 'Please check the product ID and try again') +}) + +test('integration: demonstrates microservice communication pattern', async t => { + /* + * This test demonstrates how kafka-hooks enables a microservice architecture: + * + * 1. API Gateway (kafka-hooks) exposes HTTP endpoints + * 2. Gateway publishes requests to Kafka topics + * 3. Processing services consume from Kafka topics + * 4. Processing services publish responses to Kafka topics + * 5. Gateway consumes responses and returns to HTTP clients + * + * Benefits: + * - Decoupling: Services communicate via Kafka, not direct HTTP + * - Reliability: Kafka provides durability and retry semantics + * - Scalability: Multiple instances can process from same topic + * - Observability: All communication flows through Kafka + */ + + const requestConsumer = new Consumer({ + groupId: randomUUID(), + bootstrapBrokers: 'localhost:9092', + maxWaitTime: 500, + deserializers: { + value: stringDeserializer + } + }) + + await requestConsumer.metadata({ topics: ['user-profile-requests'], autocreateTopics: true }) + const requestStream = await requestConsumer.consume({ topics: ['user-profile-requests'] }) + t.after(() => requestConsumer.close(true)) + + // Gateway service configuration + const gateway = await startStackable(t, [ + { + path: '/api/users/:userId/profile', + requestTopic: 'user-profile-requests', + responseTopic: 'user-profile-responses', + timeout: 15000 + } + ], [ + { + topic: 'user-profile-responses', + url: 'http://localhost:3043/response' + } + ]) + + // Client makes HTTP request to gateway + const clientRequest = gateway.inject({ + method: 'POST', + url: '/api/users/emp789/profile?expand=permissions&include=recent_activity', + payload: JSON.stringify({ + requestedBy: 'admin-user', + reason: 'security_audit', + fields: ['name', 'email', 'role', 
'last_login'] + }), + headers: { + 'content-type': 'application/json' + } + }) + + // Gateway publishes to Kafka - "User Profile Service" would consume this + const [kafkaRequest] = await once(requestStream, 'data') + + // Extract request data (as a microservice would) + const headers = {} + for (const [key, value] of kafkaRequest.headers) { + headers[key.toString()] = value.toString() + } + + const correlationId = headers[correlationIdHeader] + const pathParams = JSON.parse(headers[pathParamsHeader]) + const queryParams = JSON.parse(headers[queryStringHeader]) + const requestBody = JSON.parse(kafkaRequest.value) + + // Verify microservice receives complete context + t.assert.strictEqual(pathParams.userId, 'emp789') + t.assert.strictEqual(queryParams.expand, 'permissions') + t.assert.strictEqual(queryParams.include, 'recent_activity') + t.assert.strictEqual(requestBody.requestedBy, 'admin-user') + t.assert.strictEqual(requestBody.reason, 'security_audit') + + // Simulate microservice processing and response + const serviceResponse = { + userId: pathParams.userId, + profile: { + name: 'Jane Smith', + email: 'jane.smith@company.com', + role: 'senior_developer', + last_login: '2024-01-15T08:30:00Z' + }, + permissions: ['code_review', 'deploy_staging', 'access_logs'], + recent_activity: [ + { action: 'code_commit', timestamp: '2024-01-15T09:15:00Z' }, + { action: 'pr_review', timestamp: '2024-01-15T10:30:00Z' } + ], + audit: { + requestedBy: requestBody.requestedBy, + reason: requestBody.reason, + processedAt: new Date().toISOString(), + service: 'user-profile-service' + } + } + + // Microservice publishes response to Kafka + await publishMessage(gateway, 'user-profile-responses', JSON.stringify(serviceResponse), { + [correlationIdHeader]: correlationId, + 'content-type': 'application/json', + 'x-status-code': '200' + }) + + // Gateway returns response to client + const clientResponse = await clientRequest + + t.assert.strictEqual(clientResponse.statusCode, 200) + 
const data = clientResponse.json() + + // Verify complete data flow + t.assert.strictEqual(data.userId, 'emp789') + t.assert.strictEqual(data.profile.name, 'Jane Smith') + t.assert.strictEqual(data.profile.role, 'senior_developer') + t.assert.deepStrictEqual(data.permissions, ['code_review', 'deploy_staging', 'access_logs']) + t.assert.strictEqual(data.recent_activity.length, 2) + t.assert.strictEqual(data.audit.requestedBy, 'admin-user') + t.assert.strictEqual(data.audit.reason, 'security_audit') + t.assert.strictEqual(data.audit.service, 'user-profile-service') + t.assert.ok(data.audit.processedAt) +}) diff --git a/test/plugin.test.js b/test/plugin.test.js index f3a77c4..a6e9f3c 100644 --- a/test/plugin.test.js +++ b/test/plugin.test.js @@ -1,4 +1,4 @@ -import { sleep, stringDeserializer, jsonDeserializer } from '@platformatic/kafka' +import { sleep, stringDeserializer, jsonDeserializer, Consumer } from '@platformatic/kafka' import { buildServer } from '@platformatic/service' import { NOT_FOUND } from 'http-errors-enhanced' import { deepStrictEqual } from 'node:assert' @@ -6,7 +6,7 @@ import { randomUUID } from 'node:crypto' import { once } from 'node:events' import { resolve } from 'node:path' import { test } from 'node:test' -import { attemptHeader, defaultDlqTopic, keyHeader } from '../lib/definitions.js' +import { attemptHeader, correlationIdHeader, defaultDlqTopic, keyHeader, pathParamsHeader, queryStringHeader } from '../lib/definitions.js' import { stackable } from '../lib/index.js' import { createMonitor } from './fixtures/kafka-monitor.js' import { createTargetServer } from './fixtures/target-server.js' @@ -282,3 +282,331 @@ test('should support binary data', async t => { deepStrictEqual(message.headers[keyHeader], '') deepStrictEqual(message.headers[attemptHeader], '1') }) + +test('should handle request/response pattern', async t => { + // Create a custom monitor for the request topic + const requestConsumer = new Consumer({ + groupId: randomUUID(), + 
+    bootstrapBrokers: ['localhost:9092'],
+    maxWaitTime: 500,
+    deserializers: {
+      value: stringDeserializer
+    }
+  })
+
+  await requestConsumer.metadata({ topics: ['plt-kafka-hooks-request'], autocreateTopics: true })
+  const requestStream = await requestConsumer.consume({ topics: ['plt-kafka-hooks-request'] })
+  t.after(() => requestConsumer.close(true))
+
+  const server = await startStackable(t, '', {
+    topics: [
+      {
+        topic: 'plt-kafka-hooks-response',
+        url: 'http://localhost:3043/response'
+      }
+    ],
+    requestResponse: [
+      {
+        path: '/api/process',
+        requestTopic: 'plt-kafka-hooks-request',
+        responseTopic: 'plt-kafka-hooks-response',
+        timeout: 5000
+      }
+    ]
+  })
+
+  // Simulate a request
+  const requestPromise = server.inject({
+    method: 'POST',
+    url: '/api/process',
+    payload: 'test request data',
+    headers: {
+      'content-type': 'text/plain'
+    }
+  })
+
+  // Wait for the request to be published to Kafka
+  const [requestMessage] = await once(requestStream, 'data')
+
+  // Convert headers map to object with string keys for easier access
+  const headers = {}
+  for (const [key, value] of requestMessage.headers) {
+    headers[key.toString()] = value.toString()
+  }
+
+  // Verify the request message has correlation ID
+  t.assert.ok(headers[correlationIdHeader])
+  t.assert.strictEqual(requestMessage.value, 'test request data')
+
+  // Simulate a response by sending to response topic via HTTP API
+  const correlationId = headers[correlationIdHeader]
+  await publishMessage(server, 'plt-kafka-hooks-response', 'response data', {
+    [correlationIdHeader]: correlationId,
+    'content-type': 'text/plain',
+    'x-status-code': '200'
+  })
+
+  // Wait for the HTTP response
+  const response = await requestPromise
+  t.assert.strictEqual(response.statusCode, 200)
+  t.assert.strictEqual(response.payload, 'response data')
+  t.assert.strictEqual(response.headers['content-type'], 'text/plain')
+})
+
+test('should timeout request/response when no response is received', async t => {
+  const server = await startStackable(t, '', {
+    topics: [],
+    requestResponse: [
+      {
+        path: '/api/timeout',
+        requestTopic: 'plt-kafka-hooks-request',
+        responseTopic: 'plt-kafka-hooks-response',
+        timeout: 1000
+      }
+    ]
+  })
+
+  const start = Date.now()
+  const response = await server.inject({
+    method: 'POST',
+    url: '/api/timeout',
+    payload: 'test request data'
+  })
+  const elapsed = Date.now() - start
+
+  t.assert.strictEqual(response.statusCode, 504)
+  t.assert.ok(elapsed >= 1000)
+  const json = response.json()
+  t.assert.strictEqual(json.code, 'HTTP_ERROR_GATEWAY_TIMEOUT')
+})
+
+test('should handle request/response pattern with path parameters', async t => {
+  // Create a custom consumer for the request topic
+  const requestConsumer = new Consumer({
+    groupId: randomUUID(),
+    bootstrapBrokers: ['localhost:9092'],
+    maxWaitTime: 500,
+    deserializers: {
+      value: stringDeserializer
+    }
+  })
+
+  await requestConsumer.metadata({ topics: ['plt-kafka-hooks-request'], autocreateTopics: true })
+  const requestStream = await requestConsumer.consume({ topics: ['plt-kafka-hooks-request'] })
+  t.after(() => requestConsumer.close(true))
+
+  const server = await startStackable(t, '', {
+    topics: [
+      {
+        topic: 'plt-kafka-hooks-response',
+        url: 'http://localhost:3043/response'
+      }
+    ],
+    requestResponse: [
+      {
+        path: '/api/users/:userId/orders/:orderId',
+        requestTopic: 'plt-kafka-hooks-request',
+        responseTopic: 'plt-kafka-hooks-response',
+        timeout: 5000
+      }
+    ]
+  })
+
+  // Simulate a request with path parameters
+  const requestPromise = server.inject({
+    method: 'POST',
+    url: '/api/users/123/orders/456',
+    payload: 'test request data',
+    headers: {
+      'content-type': 'text/plain'
+    }
+  })
+
+  // Wait for the request to be published to Kafka
+  const [requestMessage] = await once(requestStream, 'data')
+
+  // Convert headers map to object with string keys for easier access
+  const headers = {}
+  for (const [key, value] of requestMessage.headers) {
+    headers[key.toString()] = value.toString()
+  }
+
+  // Verify the request message has correlation ID and path params
+  t.assert.ok(headers[correlationIdHeader])
+  t.assert.ok(headers[pathParamsHeader])
+  t.assert.strictEqual(requestMessage.value, 'test request data')
+
+  // Verify path parameters are correctly passed
+  const pathParams = JSON.parse(headers[pathParamsHeader])
+  t.assert.strictEqual(pathParams.userId, '123')
+  t.assert.strictEqual(pathParams.orderId, '456')
+
+  // Simulate a response
+  const correlationId = headers[correlationIdHeader]
+  await publishMessage(server, 'plt-kafka-hooks-response', 'response data', {
+    [correlationIdHeader]: correlationId,
+    'content-type': 'text/plain',
+    'x-status-code': '200'
+  })
+
+  // Wait for the HTTP response
+  const response = await requestPromise
+  t.assert.strictEqual(response.statusCode, 200)
+  t.assert.strictEqual(response.payload, 'response data')
+})
+
+test('should handle request/response pattern with query string parameters', async t => {
+  // Create a custom consumer for the request topic
+  const requestConsumer = new Consumer({
+    groupId: randomUUID(),
+    bootstrapBrokers: ['localhost:9092'],
+    maxWaitTime: 500,
+    deserializers: {
+      value: stringDeserializer
+    }
+  })
+
+  await requestConsumer.metadata({ topics: ['plt-kafka-hooks-request'], autocreateTopics: true })
+  const requestStream = await requestConsumer.consume({ topics: ['plt-kafka-hooks-request'] })
+  t.after(() => requestConsumer.close(true))
+
+  const server = await startStackable(t, '', {
+    topics: [
+      {
+        topic: 'plt-kafka-hooks-response',
+        url: 'http://localhost:3043/response'
+      }
+    ],
+    requestResponse: [
+      {
+        path: '/api/search',
+        requestTopic: 'plt-kafka-hooks-request',
+        responseTopic: 'plt-kafka-hooks-response',
+        timeout: 5000
+      }
+    ]
+  })
+
+  // Simulate a request with query string parameters
+  const requestPromise = server.inject({
+    method: 'POST',
+    url: '/api/search?q=test&limit=10&sort=date',
+    payload: 'search request',
+    headers: {
+      'content-type': 'text/plain'
+    }
+  })
+
+  // Wait for the request to be published to Kafka
+  const [requestMessage] = await once(requestStream, 'data')
+
+  // Convert headers map to object with string keys for easier access
+  const headers = {}
+  for (const [key, value] of requestMessage.headers) {
+    headers[key.toString()] = value.toString()
+  }
+
+  // Verify the request message has correlation ID and query string
+  t.assert.ok(headers[correlationIdHeader])
+  t.assert.ok(headers[queryStringHeader])
+  t.assert.strictEqual(requestMessage.value, 'search request')
+
+  // Verify query string parameters are correctly passed
+  const queryParams = JSON.parse(headers[queryStringHeader])
+  t.assert.strictEqual(queryParams.q, 'test')
+  t.assert.strictEqual(queryParams.limit, '10')
+  t.assert.strictEqual(queryParams.sort, 'date')
+
+  // Simulate a response
+  const correlationId = headers[correlationIdHeader]
+  await publishMessage(server, 'plt-kafka-hooks-response', 'search results', {
+    [correlationIdHeader]: correlationId,
+    'content-type': 'text/plain',
+    'x-status-code': '200'
+  })
+
+  // Wait for the HTTP response
+  const response = await requestPromise
+  t.assert.strictEqual(response.statusCode, 200)
+  t.assert.strictEqual(response.payload, 'search results')
+})
+
+test('should handle request/response pattern with both path and query parameters', async t => {
+  // Create a custom consumer for the request topic
+  const requestConsumer = new Consumer({
+    groupId: randomUUID(),
+    bootstrapBrokers: ['localhost:9092'],
+    maxWaitTime: 500,
+    deserializers: {
+      value: stringDeserializer
+    }
+  })
+
+  await requestConsumer.metadata({ topics: ['plt-kafka-hooks-request'], autocreateTopics: true })
+  const requestStream = await requestConsumer.consume({ topics: ['plt-kafka-hooks-request'] })
+  t.after(() => requestConsumer.close(true))
+
+  const server = await startStackable(t, '', {
+    topics: [
+      {
+        topic: 'plt-kafka-hooks-response',
+        url: 'http://localhost:3043/response'
+      }
+    ],
+    requestResponse: [
+      {
+        path: '/api/users/:userId',
+        requestTopic: 'plt-kafka-hooks-request',
+        responseTopic: 'plt-kafka-hooks-response',
+        timeout: 5000
+      }
+    ]
+  })
+
+  // Simulate a request with both path and query parameters
+  const requestPromise = server.inject({
+    method: 'POST',
+    url: '/api/users/789?include=profile&expand=orders',
+    payload: '{"action": "update"}',
+    headers: {
+      'content-type': 'application/json'
+    }
+  })
+
+  // Wait for the request to be published to Kafka
+  const [requestMessage] = await once(requestStream, 'data')
+
+  // Convert headers map to object with string keys for easier access
+  const headers = {}
+  for (const [key, value] of requestMessage.headers) {
+    headers[key.toString()] = value.toString()
+  }
+
+  // Verify the request message has correlation ID, path params, and query string
+  t.assert.ok(headers[correlationIdHeader])
+  t.assert.ok(headers[pathParamsHeader])
+  t.assert.ok(headers[queryStringHeader])
+  t.assert.strictEqual(requestMessage.value, '{"action": "update"}')
+
+  // Verify path parameters
+  const pathParams = JSON.parse(headers[pathParamsHeader])
+  t.assert.strictEqual(pathParams.userId, '789')
+
+  // Verify query string parameters
+  const queryParams = JSON.parse(headers[queryStringHeader])
+  t.assert.strictEqual(queryParams.include, 'profile')
+  t.assert.strictEqual(queryParams.expand, 'orders')
+
+  // Simulate a response
+  const correlationId = headers[correlationIdHeader]
+  await publishMessage(server, 'plt-kafka-hooks-response', '{"status": "updated"}', {
+    [correlationIdHeader]: correlationId,
+    'content-type': 'application/json',
+    'x-status-code': '200'
+  })
+
+  // Wait for the HTTP response
+  const response = await requestPromise
+  t.assert.strictEqual(response.statusCode, 200)
+  t.assert.strictEqual(response.payload, '{"status": "updated"}')
+})
diff --git a/test/schema.test.js b/test/schema.test.js
index 0b60f01..f5e3273 100644
--- a/test/schema.test.js
+++ b/test/schema.test.js
@@ -94,6 +94,23 @@ test('should export stackable schema', async () => {
         serialization: {
           type: 'string',
           resolvePath: true
+        },
+        requestResponse: {
+          type: 'array',
+          items: {
+            type: 'object',
+            properties: {
+              path: { type: 'string' },
+              requestTopic: { type: 'string' },
+              responseTopic: { type: 'string' },
+              timeout: {
+                type: 'integer',
+                minimum: 1000,
+                default: 30000
+              }
+            },
+            required: ['path', 'requestTopic', 'responseTopic']
+          }
+        }
       },
       required: ['brokers', 'topics', 'consumer']
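The tests above exercise the client side of the pattern; the responding service's obligations are simpler: echo the correlation ID header back on the response message and attach a status code via `x-status-code`. A minimal sketch of that logic, assuming a placeholder header name (a real service would use the `correlationIdHeader` constant exported from `lib/definitions.js`, whose actual value is not shown here):

```javascript
// Sketch of the responder half of the request/response pattern.
// NOTE: this header name is a placeholder assumption; import the real
// correlationIdHeader constant from kafka-hooks' lib/definitions.js.
const correlationIdHeader = 'x-plt-correlation-id'

// Kafka header keys/values arrive as Buffers in a Map; normalize them
// into a plain object with string keys and values.
function normalizeHeaders (rawHeaders) {
  const headers = {}
  for (const [key, value] of rawHeaders) {
    headers[key.toString()] = value.toString()
  }
  return headers
}

// Build the headers for the reply message: echo the correlation ID so
// kafka-hooks can complete the pending HTTP request, and set the HTTP
// status code via the x-status-code header used in the tests above.
function buildReplyHeaders (requestHeaders, statusCode = 200, contentType = 'application/json') {
  const correlationId = requestHeaders[correlationIdHeader]
  if (!correlationId) {
    throw new Error('request message is missing the correlation ID header')
  }
  return {
    [correlationIdHeader]: correlationId,
    'content-type': contentType,
    'x-status-code': String(statusCode)
  }
}

// Example: a request message's headers as they come off the consumer stream
const raw = new Map([[Buffer.from(correlationIdHeader), Buffer.from('abc-123')]])
const reply = buildReplyHeaders(normalizeHeaders(raw))
console.log(reply['x-status-code']) // prints 200
```

The reply headers would then be published to the configured `responseTopic` (for example via the `/topics/:topic` HTTP endpoint, as the tests do with `publishMessage`).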