diff --git a/docs/Consuming.md b/docs/Consuming.md index d13b84a93..59bafbdbd 100644 --- a/docs/Consuming.md +++ b/docs/Consuming.md @@ -26,14 +26,21 @@ await consumer.subscribe({ topic: 'topic-C' }) await consumer.subscribe({ topic: 'topic-D', fromBeginning: true }) ``` -Alternatively, you can subscribe to multiple topics at once using a RegExp: +Alternatively, you can subscribe to multiple topics at once - by supplying either a list: + +```javascript +await consumer.connect() +await consumer.subscribe({ topic: ['topic-A', 'topic-B'] }) +``` + +or by using a RegExp: ```javascript await consumer.connect() await consumer.subscribe({ topic: /topic-(eu|us)-.*/i }) ``` -The consumer will not match topics created after the subscription. If your broker has `topic-A` and `topic-B`, you subscribe to `/topic-.*/`, then `topic-C` is created, your consumer would not be automatically subscribed to `topic-C`. +When suppling a RegExp, the consumer will not match topics created after the subscription. If your broker has `topic-A` and `topic-B`, you subscribe to `/topic-.*/`, then `topic-C` is created, your consumer would not be automatically subscribed to `topic-C`. KafkaJS offers you two ways to process your data: `eachMessage` and `eachBatch` diff --git a/src/consumer/__tests__/subscribe.spec.js b/src/consumer/__tests__/subscribe.spec.js index 048ec711a..1ef8e78a8 100644 --- a/src/consumer/__tests__/subscribe.spec.js +++ b/src/consumer/__tests__/subscribe.spec.js @@ -46,14 +46,49 @@ describe('Consumer', () => { ) }) - it('throws an error if the topic is not a String or RegExp', async () => { + it('throws an error if the topic is not a String, String[] or RegExp', async () => { await expect(consumer.subscribe({ topic: 1 })).rejects.toHaveProperty( 'message', - 'Invalid topic 1 (number), the topic name has to be a String or a RegExp' + 'Invalid topic 1 (number), the topic name has to be a String, String[] or a RegExp' ) }) }) + describe('with string array', () => { + it('subscribes to all topics supplied in the array', async () => { + const topicA = `topic-A` + const topicB = `topic-B` + const topicC = `topic-C` + + await createTopic({ topic: topicA }) + await createTopic({ topic: topicB }) + await createTopic({ topic: topicC }) + + const messagesConsumed = [] + await consumer.connect() + await consumer.subscribe({ topic: [topicA, topicB], fromBeginning: true }) + + consumer.run({ eachMessage: async event => messagesConsumed.push(event) }) + await waitForConsumerToJoinGroup(consumer) + + await producer.connect() + await producer.sendBatch({ + acks: 1, + topicMessages: [ + { topic: topicA, messages: [{ key: 'key-a', value: 'value-a' }] }, + { topic: topicB, messages: [{ key: 'key-b', value: 'value-b' }] }, + { topic: topicC, messages: [{ key: 'key-c', value: 'value-c' }] }, + ], + }) + + await waitForMessages(messagesConsumed, { number: 2 }) + expect(messagesConsumed.map(m => m.message.value.toString()).sort()).toEqual([ + 'value-a', + 'value-b', + ]) + }) + }) + describe('with regex', () => { it('subscribes to all matching topics', async () => { const testScope = secureRandom() diff --git a/src/consumer/index.js b/src/consumer/index.js index 32d630e42..dfd3d45a5 100644 --- a/src/consumer/index.js +++ b/src/consumer/index.js @@ -184,9 +184,11 @@ module.exports = ({ } const isRegExp = topic instanceof RegExp - if (typeof topic !== 'string' && !isRegExp) { + const isStringArray = Array.isArray(topic) && topic.every(t => typeof t === 'string') + + if (typeof topic !== 'string' && !isRegExp && !isStringArray) { throw new KafkaJSNonRetriableError( - `Invalid topic ${topic} (${typeof topic}), the topic name has to be a String or a RegExp` + `Invalid topic ${topic} (${typeof topic}), the topic name has to be a String, String[] or a RegExp` ) } @@ -205,6 +207,8 @@ module.exports = ({ }) topicsToSubscribe.push(...matchedTopics) + } else if (isStringArray) { + topicsToSubscribe.push(...topic) } else { topicsToSubscribe.push(topic) } diff --git a/types/index.d.ts b/types/index.d.ts index 9416eacb9..bb964150b 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -901,7 +901,7 @@ export type ConsumerRunConfig = { eachMessage?: EachMessageHandler } -export type ConsumerSubscribeTopic = { topic: string | RegExp; fromBeginning?: boolean } +export type ConsumerSubscribeTopic = { topic: string | string[] | RegExp; fromBeginning?: boolean } export type Consumer = { connect(): Promise