Skip to content

Commit

Permalink
feat: make consumer.subscribe({ topic }) also accept an array of topics
Browse files Browse the repository at this point in the history
  • Loading branch information
benvan committed Mar 12, 2022
1 parent 0edaa45 commit 2ef4ebb
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 7 deletions.
11 changes: 9 additions & 2 deletions docs/Consuming.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,21 @@ await consumer.subscribe({ topic: 'topic-C' })
await consumer.subscribe({ topic: 'topic-D', fromBeginning: true })
```

Alternatively, you can subscribe to multiple topics at once using a RegExp:
Alternatively, you can subscribe to multiple topics at once - by supplying either a list:

```javascript
await consumer.connect()
await consumer.subscribe({ topic: ['topic-A', 'topic-B'] })
```

or by using a RegExp:

```javascript
await consumer.connect()
await consumer.subscribe({ topic: /topic-(eu|us)-.*/i })
```

The consumer will not match topics created after the subscription. If your broker has `topic-A` and `topic-B`, you subscribe to `/topic-.*/`, then `topic-C` is created, your consumer would not be automatically subscribed to `topic-C`.
When suppling a RegExp, the consumer will not match topics created after the subscription. If your broker has `topic-A` and `topic-B`, you subscribe to `/topic-.*/`, then `topic-C` is created, your consumer would not be automatically subscribed to `topic-C`.

KafkaJS offers you two ways to process your data: `eachMessage` and `eachBatch`

Expand Down
39 changes: 37 additions & 2 deletions src/consumer/__tests__/subscribe.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,49 @@ describe('Consumer', () => {
)
})

it('throws an error if the topic is not a String or RegExp', async () => {
it('throws an error if the topic is not a String, String[] or RegExp', async () => {
await expect(consumer.subscribe({ topic: 1 })).rejects.toHaveProperty(
'message',
'Invalid topic 1 (number), the topic name has to be a String or a RegExp'
'Invalid topic 1 (number), the topic name has to be a String, String[] or a RegExp'
)
})
})

describe('with string array', () => {
it('subscribes to all topics supplied in the array', async () => {
const topicA = `topic-A`
const topicB = `topic-B`
const topicC = `topic-C`

await createTopic({ topic: topicA })
await createTopic({ topic: topicB })
await createTopic({ topic: topicC })

const messagesConsumed = []
await consumer.connect()
await consumer.subscribe({ topic: [topicA, topicB], fromBeginning: true })

consumer.run({ eachMessage: async event => messagesConsumed.push(event) })
await waitForConsumerToJoinGroup(consumer)

await producer.connect()
await producer.sendBatch({
acks: 1,
topicMessages: [
{ topic: topicA, messages: [{ key: 'key-a', value: 'value-a' }] },
{ topic: topicB, messages: [{ key: 'key-b', value: 'value-b' }] },
{ topic: topicC, messages: [{ key: 'key-c', value: 'value-c' }] },
],
})

await waitForMessages(messagesConsumed, { number: 2 })
expect(messagesConsumed.map(m => m.message.value.toString()).sort()).toEqual([
'value-a',
'value-b',
])
})
})

describe('with regex', () => {
it('subscribes to all matching topics', async () => {
const testScope = secureRandom()
Expand Down
8 changes: 6 additions & 2 deletions src/consumer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,11 @@ module.exports = ({
}

const isRegExp = topic instanceof RegExp
if (typeof topic !== 'string' && !isRegExp) {
const isStringArray = Array.isArray(topic) && topic.every(t => typeof t === 'string')

if (typeof topic !== 'string' && !isRegExp && !isStringArray) {
throw new KafkaJSNonRetriableError(
`Invalid topic ${topic} (${typeof topic}), the topic name has to be a String or a RegExp`
`Invalid topic ${topic} (${typeof topic}), the topic name has to be a String, String[] or a RegExp`
)
}

Expand All @@ -205,6 +207,8 @@ module.exports = ({
})

topicsToSubscribe.push(...matchedTopics)
} else if (isStringArray) {
topicsToSubscribe.push(...topic)
} else {
topicsToSubscribe.push(topic)
}
Expand Down
2 changes: 1 addition & 1 deletion types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ export type ConsumerRunConfig = {
eachMessage?: EachMessageHandler
}

export type ConsumerSubscribeTopic = { topic: string | RegExp; fromBeginning?: boolean }
export type ConsumerSubscribeTopic = { topic: string | string[] | RegExp; fromBeginning?: boolean }

export type Consumer = {
connect(): Promise<void>
Expand Down

0 comments on commit 2ef4ebb

Please sign in to comment.