diff --git a/test/plugin.test.js b/test/plugin.test.js index a6e9f3c..faac332 100644 --- a/test/plugin.test.js +++ b/test/plugin.test.js @@ -610,3 +610,120 @@ test('should handle request/response pattern with both path and query parameters t.assert.strictEqual(response.statusCode, 200) t.assert.strictEqual(response.payload, '{"status": "updated"}') }) + +test('should ignore response message missing correlation ID', async t => { + const server = await startStackable(t, '', { + topics: [ + { + topic: 'plt-kafka-hooks-response', + url: 'http://localhost:3043/response' + } + ], + requestResponse: [ + { + path: '/api/process', + requestTopic: 'plt-kafka-hooks-request', + responseTopic: 'plt-kafka-hooks-response', + timeout: 1000 // Short timeout for this test + } + ] + }) + + // Start a legitimate request that will timeout + const requestPromise = server.inject({ + method: 'POST', + url: '/api/process', + payload: 'test request', + headers: { + 'content-type': 'text/plain' + } + }) + + // Send a response message without correlation ID - this should be ignored + await publishMessage(server, 'plt-kafka-hooks-response', 'response without correlation', { + 'content-type': 'text/plain', + 'x-status-code': '200' + // No correlationIdHeader + }) + + // The request should still timeout because the invalid response was ignored + const response = await requestPromise + t.assert.strictEqual(response.statusCode, 504) // Gateway timeout + + const json = response.json() + t.assert.strictEqual(json.code, 'HTTP_ERROR_GATEWAY_TIMEOUT') +}) + +test('should handle no pending request found for correlation ID', async t => { + const server = await startStackable(t, '', { + topics: [ + { + topic: 'plt-kafka-hooks-response', + url: 'http://localhost:3043/response' + } + ], + requestResponse: [ + { + path: '/api/process', + requestTopic: 'plt-kafka-hooks-request', + responseTopic: 'plt-kafka-hooks-response', + timeout: 5000 + } + ] + }) + + // Random correlation ID that doesn't correspond to any pending request + const nonExistentCorrelationId = randomUUID() + + // Send a response message with a correlation ID that has no pending request + await publishMessage(server, 'plt-kafka-hooks-response', 'orphaned response', { + [correlationIdHeader]: nonExistentCorrelationId, + 'content-type': 'text/plain', + 'x-status-code': '200' + }) + + // Verify the system still works normally by doing a proper request/response cycle + const requestConsumer = new Consumer({ + groupId: randomUUID(), + bootstrapBrokers: 'localhost:9092', + maxWaitTime: 500, + deserializers: { + value: stringDeserializer + } + }) + + await requestConsumer.metadata({ topics: ['plt-kafka-hooks-request'], autocreateTopics: true }) + const requestStream = await requestConsumer.consume({ topics: ['plt-kafka-hooks-request'] }) + t.after(() => requestConsumer.close(true)) + + // Start a legitimate request + const requestPromise = server.inject({ + method: 'POST', + url: '/api/process', + payload: 'legitimate request', + headers: { + 'content-type': 'text/plain' + } + }) + + // Wait for the request to be published + const [requestMessage] = await once(requestStream, 'data') + + const headers = {} + for (const [key, value] of requestMessage.headers) { + headers[key.toString()] = value.toString() + } + + // Send proper response with the correct correlation ID + const correctCorrelationId = headers[correlationIdHeader] + await publishMessage(server, 'plt-kafka-hooks-response', 'legitimate response', { + [correlationIdHeader]: correctCorrelationId, + 'content-type': 'text/plain', + 'x-status-code': '200' + }) + + // Verify the legitimate request still works properly + const response = await requestPromise + t.assert.strictEqual(response.statusCode, 200) + t.assert.strictEqual(response.payload, 'legitimate response') +})