23 changes: 17 additions & 6 deletions packages/kafka/lib/AbstractKafkaConsumer.ts
@@ -211,17 +211,28 @@ export abstract class AbstractKafkaConsumer<
       )
       this.messageBatchStream.on('error', (error) => this.handlerError(error))
     } else {
-      this.consumerStream.on('data', (message) =>
-        this.consume(
-          message.topic,
-          message as DeserializedMessage<SupportedMessageValues<TopicsConfig>>,
-        ),
-      )
+      // biome-ignore lint/style/noNonNullAssertion: consumerStream is always created
+      const stream = this.consumerStream!
+
+      // we are not waiting for the stream to complete here,
+      // because init() must return a Promise<void> without blocking
+      this.handleSyncStream(stream).catch(this.handlerError)
     }
 
     this.consumerStream.on('error', (error) => this.handlerError(error))
   }
 
+  private async handleSyncStream(
+    stream: MessagesStream<string, object, string, string>,
+  ): Promise<void> {
+    for await (const message of stream) {
+      await this.consume(
+        message.topic,
+        message as DeserializedMessage<SupportedMessageValues<TopicsConfig>>,
+      )
+    }
+  }
+
   async close(): Promise<void> {
     if (!this.consumerStream && !this.messageBatchStream) {
       // Leaving the group in case consumer joined but streams were not created
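The behavioral change above is worth spelling out: a `'data'` listener is invoked once per message and nothing awaits the promise an async handler returns, so handlers for consecutive messages overlap, whereas `for await` over the stream pulls the next message only after the loop body resolves. A minimal standalone sketch of the difference, using a plain Node `Readable` and illustrative names rather than the library's actual types:

```ts
import { Readable } from 'node:stream'

// Illustrative handler simulating per-message async work
const handle = async (msg: string): Promise<void> => {
  console.log('start', msg)
  await new Promise((resolve) => setTimeout(resolve, 50))
  console.log('end', msg)
}

// Event-based (the old code path): the listener fires once per chunk and the
// returned promise is ignored, so handlers overlap:
//   start m1, start m2, start m3, end m1, end m2, end m3
Readable.from(['m1', 'm2', 'm3']).on('data', (msg: string) => void handle(msg))

// Iterator-based (what handleSyncStream does): the stream stays paused while
// the loop body awaits, so processing is strictly sequential:
//   start m1, end m1, start m2, end m2, start m3, end m3
const sequential = async (): Promise<void> => {
  for await (const msg of Readable.from(['m1', 'm2', 'm3'])) {
    await handle(msg)
  }
}

// Mirroring init(): the promise is not awaited, but rejections are still caught
sequential().catch((error) => console.error(error))
```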
2 changes: 1 addition & 1 deletion packages/kafka/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@message-queue-toolkit/kafka",
-  "version": "0.7.6",
+  "version": "0.7.7",
   "engines": {
     "node": ">= 22.14.0"
   },
252 changes: 252 additions & 0 deletions packages/kafka/test/consumer/PermissionConsumer.spec.ts
@@ -7,6 +7,7 @@ import { KafkaHandlerConfig, type RequestContext } from '../../lib/index.ts'
 import { PermissionPublisher } from '../publisher/PermissionPublisher.ts'
 import {
   PERMISSION_ADDED_SCHEMA,
+  PERMISSION_REMOVED_SCHEMA,
   type PermissionAdded,
   TOPICS,
 } from '../utils/permissionSchemas.ts'
@@ -489,4 +490,255 @@ describe('PermissionConsumer', () => {
       })
     })
   })
+
+  describe('sync message processing', () => {
+    let publisher: PermissionPublisher
+    let consumer: PermissionConsumer | undefined
+
+    beforeAll(() => {
+      publisher = new PermissionPublisher(testContext.cradle)
+    })
+
+    beforeEach(async () => {
+      // Close and clear previous consumer to avoid message accumulation
+      if (consumer) {
+        await consumer.close()
+        consumer.clear()
+      }
+    })
+
+    afterAll(async () => {
+      await publisher.close()
+      await consumer?.close()
+    })
+
+    it('should process messages one at a time using handleSyncStream', async () => {
+      // Given - track processing order and timing
+      const processingOrder: string[] = []
+      const processingTimestamps: Record<string, { start: number; end: number }> = {}
+      const testMessageIds = ['sync-1', 'sync-2', 'sync-3']
+
+      consumer = new PermissionConsumer(testContext.cradle, {
+        handlers: {
+          'permission-added': new KafkaHandlerConfig(PERMISSION_ADDED_SCHEMA, async (message) => {
+            // Only track messages from this test
+            if (!testMessageIds.includes(message.value.id)) {
+              consumer!.addedMessages.push(message)
+              return
+            }
+
+            const messageId = message.value.id
+            processingOrder.push(`start-${messageId}`)
+            processingTimestamps[messageId] = { start: Date.now(), end: 0 }
+
+            // Simulate async work to verify sequential processing
+            await new Promise((resolve) => setTimeout(resolve, 50))
+
+            processingOrder.push(`end-${messageId}`)
+            processingTimestamps[messageId]!.end = Date.now()
+            consumer!.addedMessages.push(message)
+          }),
+        },
+      })
+
+      await consumer.init()
+
+      // When - publish multiple messages at once
+      await Promise.all([
+        publisher.publish('permission-added', { id: 'sync-1', type: 'added', permissions: [] }),
+        publisher.publish('permission-added', { id: 'sync-2', type: 'added', permissions: [] }),
+        publisher.publish('permission-added', { id: 'sync-3', type: 'added', permissions: [] }),
+      ])
+
+      // Then - wait for all messages to be processed
+      await consumer.handlerSpy.waitForMessageWithId('sync-1', 'consumed')
+      await consumer.handlerSpy.waitForMessageWithId('sync-2', 'consumed')
+      await consumer.handlerSpy.waitForMessageWithId('sync-3', 'consumed')
+
+      // Verify messages were processed sequentially (one completes before next starts)
+      expect(processingOrder).toEqual([
+        'start-sync-1',
+        'end-sync-1',
+        'start-sync-2',
+        'end-sync-2',
+        'start-sync-3',
+        'end-sync-3',
+      ])
+
+      // Verify each message completes before the next one starts
+      expect(processingTimestamps['sync-1']!.end).toBeLessThan(
+        processingTimestamps['sync-2']!.start,
+      )
+      expect(processingTimestamps['sync-2']!.end).toBeLessThan(
+        processingTimestamps['sync-3']!.start,
+      )
+
+      const testMessages = consumer.addedMessages.filter((m) => testMessageIds.includes(m.value.id))
+      expect(testMessages).toHaveLength(3)
+      expect(testMessages[0]!.value.id).toBe('sync-1')
+      expect(testMessages[1]!.value.id).toBe('sync-2')
+      expect(testMessages[2]!.value.id).toBe('sync-3')
+    })
+
+    it('should process messages in order even when published rapidly', async () => {
+      // Given
+      const testMessageIds = ['rapid-1', 'rapid-2', 'rapid-3', 'rapid-4', 'rapid-5']
+      consumer = new PermissionConsumer(testContext.cradle)
+      await consumer.init()
+
+      // When - publish messages rapidly without waiting
+      const publishPromises = []
+      for (let i = 1; i <= 5; i++) {
+        publishPromises.push(
+          publisher.publish('permission-added', {
+            id: `rapid-${i}`,
+            type: 'added',
+            permissions: [],
+          }),
+        )
+      }
+      await Promise.all(publishPromises)
+
+      // Then - wait for all messages to be processed
+      for (let i = 1; i <= 5; i++) {
+        await consumer.handlerSpy.waitForMessageWithId(`rapid-${i}`, 'consumed')
+      }
+
+      // Verify messages were processed in order
+      const testMessages = consumer.addedMessages.filter((m) => testMessageIds.includes(m.value.id))
+      expect(testMessages).toHaveLength(5)
+      for (let i = 0; i < 5; i++) {
+        expect(testMessages[i]!.value.id).toBe(`rapid-${i + 1}`)
+      }
+    })
+
+    it('should ensure previous message completes before next message starts processing', async () => {
+      // Given - use a handler that takes time and tracks concurrency
+      let concurrentProcessing = 0
+      let maxConcurrency = 0
+      const testMessageIds = ['concurrency-1', 'concurrency-2', 'concurrency-3']
+      const processedMessages: string[] = []
+
+      consumer = new PermissionConsumer(testContext.cradle, {
+        handlers: {
+          'permission-added': new KafkaHandlerConfig(PERMISSION_ADDED_SCHEMA, async (message) => {
+            // Only track messages from this test
+            if (!testMessageIds.includes(message.value.id)) {
+              consumer!.addedMessages.push(message)
+              return
+            }
+
+            concurrentProcessing++
+            maxConcurrency = Math.max(maxConcurrency, concurrentProcessing)
+
+            // Simulate processing time
+            await new Promise((resolve) => setTimeout(resolve, 30))
+
+            concurrentProcessing--
+            processedMessages.push(message.value.id)
+            consumer!.addedMessages.push(message)
+          }),
+        },
+      })
+      await consumer.init()
+
+      // When - publish multiple messages
+      await Promise.all([
+        publisher.publish('permission-added', {
+          id: 'concurrency-1',
+          type: 'added',
+          permissions: [],
+        }),
+        publisher.publish('permission-added', {
+          id: 'concurrency-2',
+          type: 'added',
+          permissions: [],
+        }),
+        publisher.publish('permission-added', {
+          id: 'concurrency-3',
+          type: 'added',
+          permissions: [],
+        }),
+      ])
+
+      // Then - wait for all messages
+      await consumer.handlerSpy.waitForMessageWithId('concurrency-1', 'consumed')
+      await consumer.handlerSpy.waitForMessageWithId('concurrency-2', 'consumed')
+      await consumer.handlerSpy.waitForMessageWithId('concurrency-3', 'consumed')
+
+      // Verify only one message was processed at a time (max concurrency = 1)
+      expect(maxConcurrency).toBe(1)
+      expect(processedMessages).toHaveLength(3)
+      expect(processedMessages).toContain('concurrency-1')
+      expect(processedMessages).toContain('concurrency-2')
+      expect(processedMessages).toContain('concurrency-3')
+    })
+
+    it('should process messages synchronously across different topics', async () => {
+      // Given
+      const processingOrder: string[] = []
+      const testMessageIds = ['cross-topic-1', 'cross-topic-2', 'cross-topic-3']
+
+      consumer = new PermissionConsumer(testContext.cradle, {
+        handlers: {
+          'permission-added': new KafkaHandlerConfig(PERMISSION_ADDED_SCHEMA, async (message) => {
+            // Only track messages from this test
+            if (!testMessageIds.includes(message.value.id)) {
+              consumer!.addedMessages.push(message)
+              return
+            }
+            processingOrder.push(`added-${message.value.id}`)
+            await new Promise((resolve) => setTimeout(resolve, 20))
+            consumer!.addedMessages.push(message)
+          }),
+          'permission-removed': new KafkaHandlerConfig(
+            PERMISSION_REMOVED_SCHEMA,
+            async (message) => {
+              // Only track messages from this test
+              if (!testMessageIds.includes(message.value.id)) {
+                consumer!.removedMessages.push(message)
+                return
+              }
+              processingOrder.push(`removed-${message.value.id}`)
+              await new Promise((resolve) => setTimeout(resolve, 20))
+              consumer!.removedMessages.push(message)
+            },
+          ),
+        },
+      })
+      await consumer.init()
+
+      // When - publish messages to different topics
+      await Promise.all([
+        publisher.publish('permission-added', {
+          id: 'cross-topic-1',
+          type: 'added',
+          permissions: [],
+        }),
+        publisher.publish('permission-removed', {
+          id: 'cross-topic-2',
+          type: 'removed',
+          permissions: [],
+        }),
+        publisher.publish('permission-added', {
+          id: 'cross-topic-3',
+          type: 'added',
+          permissions: [],
+        }),
+      ])
+
+      // Then - wait for all messages
+      await consumer.handlerSpy.waitForMessageWithId('cross-topic-1', 'consumed')
+      await consumer.handlerSpy.waitForMessageWithId('cross-topic-2', 'consumed')
+      await consumer.handlerSpy.waitForMessageWithId('cross-topic-3', 'consumed')
+
+      // Verify messages were processed sequentially (one at a time)
+      // Note: The exact order depends on Kafka's partition assignment, but each should complete before next starts
+      expect(processingOrder.length).toBe(3)
+      const testMessages =
+        consumer.addedMessages.filter((m) => testMessageIds.includes(m.value.id)).length +
+        consumer.removedMessages.filter((m) => testMessageIds.includes(m.value.id)).length
+      expect(testMessages).toBe(3)
+    })
+  })
 })
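One detail the tests above rely on implicitly: because `init()` calls `handleSyncStream(stream).catch(this.handlerError)` without awaiting it, a handler that throws rejects the `for await` loop's promise, and that rejection surfaces through the attached `.catch`. A self-contained sketch of this error path; `fakeStream` is a stand-in, and `handlerError` here merely mirrors the consumer's callback of the same name:

```ts
// Async generator standing in for the consumer stream (illustrative only)
async function* fakeStream(): AsyncGenerator<string> {
  yield 'ok'
  yield 'boom'
  yield 'never reached' // iteration stops once the loop body throws
}

const handlerError = (error: Error): void => {
  console.error('routed to handlerError:', error.message)
}

const handleSyncStream = async (stream: AsyncIterable<string>): Promise<void> => {
  for await (const message of stream) {
    if (message === 'boom') throw new Error('handler failed')
    console.log('consumed', message)
  }
}

// Mirrors init(): the returned promise is not awaited, but rejections are caught
handleSyncStream(fakeStream()).catch(handlerError)
```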