Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions test/plugin.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -610,3 +610,120 @@ test('should handle request/response pattern with both path and query parameters
t.assert.strictEqual(response.statusCode, 200)
t.assert.strictEqual(response.payload, '{"status": "updated"}')
})

test('should ignore response message missing correlation ID', async t => {
const server = await startStackable(t, '', {
topics: [
{
topic: 'plt-kafka-hooks-response',
url: 'http://localhost:3043/response'
}
],
requestResponse: [
{
path: '/api/process',
requestTopic: 'plt-kafka-hooks-request',
responseTopic: 'plt-kafka-hooks-response',
timeout: 1000 // Short timeout for this test
}
]
})

// Start a legitimate request that will timeout
const requestPromise = server.inject({
method: 'POST',
url: '/api/process',
payload: 'test request',
headers: {
'content-type': 'text/plain'
}
})

// Send a response message without correlation ID - this should be ignored
await publishMessage(server, 'plt-kafka-hooks-response', 'response without correlation', {
'content-type': 'text/plain',
'x-status-code': '200'
// No correlationIdHeader
})

// The request should still timeout because the invalid response was ignored
const response = await requestPromise
t.assert.strictEqual(response.statusCode, 504) // Gateway timeout

const json = response.json()
t.assert.strictEqual(json.code, 'HTTP_ERROR_GATEWAY_TIMEOUT')
})

test('should handle no pending request found for correlation ID', async t => {
const server = await startStackable(t, '', {
topics: [
{
topic: 'plt-kafka-hooks-response',
url: 'http://localhost:3043/response'
}
],
requestResponse: [
{
path: '/api/process',
requestTopic: 'plt-kafka-hooks-request',
responseTopic: 'plt-kafka-hooks-response',
timeout: 5000
}
]
})

// Random correlation ID that doesn't correspond to any pending request
const nonExistentCorrelationId = randomUUID()

// Send a response message with a correlation ID that has no pending request
await publishMessage(server, 'plt-kafka-hooks-response', 'orphaned response', {
[correlationIdHeader]: nonExistentCorrelationId,
'content-type': 'text/plain',
'x-status-code': '200'
})

// Verify the system still works normally by doing a proper request/response cycle
const requestConsumer = new Consumer({
groupId: randomUUID(),
bootstrapBrokers: 'localhost:9092',
maxWaitTime: 500,
deserializers: {
value: stringDeserializer
}
})

await requestConsumer.metadata({ topics: ['plt-kafka-hooks-request'], autocreateTopics: true })
const requestStream = await requestConsumer.consume({ topics: ['plt-kafka-hooks-request'] })
t.after(() => requestConsumer.close(true))

// Start a legitimate request
const requestPromise = server.inject({
method: 'POST',
url: '/api/process',
payload: 'legitimate request',
headers: {
'content-type': 'text/plain'
}
})

// Wait for the request to be published
const [requestMessage] = await once(requestStream, 'data')

const headers = {}
for (const [key, value] of requestMessage.headers) {
headers[key.toString()] = value.toString()
}

// Send proper response with the correct correlation ID
const correctCorrelationId = headers[correlationIdHeader]
await publishMessage(server, 'plt-kafka-hooks-response', 'legitimate response', {
[correlationIdHeader]: correctCorrelationId,
'content-type': 'text/plain',
'x-status-code': '200'
})

// Verify the legitimate request still works properly
const response = await requestPromise
t.assert.strictEqual(response.statusCode, 200)
t.assert.strictEqual(response.payload, 'legitimate response')
})