From 2ef4ebb220c4c9441e50e3896bf7e76c857082ef Mon Sep 17 00:00:00 2001 From: Ben Date: Sat, 12 Mar 2022 16:37:43 +0000 Subject: [PATCH 1/2] feat: make consumer.subscribe({ topic }) also accept an array of topics --- docs/Consuming.md | 11 +++++-- src/consumer/__tests__/subscribe.spec.js | 39 ++++++++++++++++++++++-- src/consumer/index.js | 8 +++-- types/index.d.ts | 2 +- 4 files changed, 53 insertions(+), 7 deletions(-) diff --git a/docs/Consuming.md b/docs/Consuming.md index d13b84a93..59bafbdbd 100644 --- a/docs/Consuming.md +++ b/docs/Consuming.md @@ -26,14 +26,21 @@ await consumer.subscribe({ topic: 'topic-C' }) await consumer.subscribe({ topic: 'topic-D', fromBeginning: true }) ``` -Alternatively, you can subscribe to multiple topics at once using a RegExp: +Alternatively, you can subscribe to multiple topics at once - by supplying either a list: + +```javascript +await consumer.connect() +await consumer.subscribe({ topic: ['topic-A', 'topic-B'] }) +``` + +or by using a RegExp: ```javascript await consumer.connect() await consumer.subscribe({ topic: /topic-(eu|us)-.*/i }) ``` -The consumer will not match topics created after the subscription. If your broker has `topic-A` and `topic-B`, you subscribe to `/topic-.*/`, then `topic-C` is created, your consumer would not be automatically subscribed to `topic-C`. +When suppling a RegExp, the consumer will not match topics created after the subscription. If your broker has `topic-A` and `topic-B`, you subscribe to `/topic-.*/`, then `topic-C` is created, your consumer would not be automatically subscribed to `topic-C`. KafkaJS offers you two ways to process your data: `eachMessage` and `eachBatch` diff --git a/src/consumer/__tests__/subscribe.spec.js b/src/consumer/__tests__/subscribe.spec.js index 048ec711a..1ef8e78a8 100644 --- a/src/consumer/__tests__/subscribe.spec.js +++ b/src/consumer/__tests__/subscribe.spec.js @@ -46,14 +46,49 @@ describe('Consumer', () => { ) }) - it('throws an error if the topic is not a String or RegExp', async () => { + it('throws an error if the topic is not a String, String[] or RegExp', async () => { await expect(consumer.subscribe({ topic: 1 })).rejects.toHaveProperty( 'message', - 'Invalid topic 1 (number), the topic name has to be a String or a RegExp' + 'Invalid topic 1 (number), the topic name has to be a String, String[] or a RegExp' ) }) }) + describe('with string array', () => { + it('subscribes to all topics supplied in the array', async () => { + const topicA = `topic-A` + const topicB = `topic-B` + const topicC = `topic-C` + + await createTopic({ topic: topicA }) + await createTopic({ topic: topicB }) + await createTopic({ topic: topicC }) + + const messagesConsumed = [] + await consumer.connect() + await consumer.subscribe({ topic: [topicA, topicB], fromBeginning: true }) + + consumer.run({ eachMessage: async event => messagesConsumed.push(event) }) + await waitForConsumerToJoinGroup(consumer) + + await producer.connect() + await producer.sendBatch({ + acks: 1, + topicMessages: [ + { topic: topicA, messages: [{ key: 'key-a', value: 'value-a' }] }, + { topic: topicB, messages: [{ key: 'key-b', value: 'value-b' }] }, + { topic: topicC, messages: [{ key: 'key-c', value: 'value-c' }] }, + ], + }) + + await waitForMessages(messagesConsumed, { number: 2 }) + expect(messagesConsumed.map(m => m.message.value.toString()).sort()).toEqual([ + 'value-a', + 'value-b', + ]) + }) + }) + describe('with regex', () => { it('subscribes to all matching topics', async () => { const testScope = secureRandom() diff --git a/src/consumer/index.js b/src/consumer/index.js index 32d630e42..dfd3d45a5 100644 --- a/src/consumer/index.js +++ b/src/consumer/index.js @@ -184,9 +184,11 @@ module.exports = ({ } const isRegExp = topic instanceof RegExp - if (typeof topic !== 'string' && !isRegExp) { + const isStringArray = Array.isArray(topic) && topic.every(t => typeof t === 'string') + + if (typeof topic !== 'string' && !isRegExp && !isStringArray) { throw new KafkaJSNonRetriableError( - `Invalid topic ${topic} (${typeof topic}), the topic name has to be a String or a RegExp` + `Invalid topic ${topic} (${typeof topic}), the topic name has to be a String, String[] or a RegExp` ) } @@ -205,6 +207,8 @@ module.exports = ({ }) topicsToSubscribe.push(...matchedTopics) + } else if (isStringArray) { + topicsToSubscribe.push(...topic) } else { topicsToSubscribe.push(topic) } diff --git a/types/index.d.ts b/types/index.d.ts index 9416eacb9..bb964150b 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -901,7 +901,7 @@ export type ConsumerRunConfig = { eachMessage?: EachMessageHandler } -export type ConsumerSubscribeTopic = { topic: string | RegExp; fromBeginning?: boolean } +export type ConsumerSubscribeTopic = { topic: string | string[] | RegExp; fromBeginning?: boolean } export type Consumer = { connect(): Promise From 76dd51c65a6d913bd4b473cbed8a063c6dbb09b2 Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Fri, 8 Apr 2022 13:56:28 +0200 Subject: [PATCH 2/2] Add "topics" argument to consumer.subscribe --- docs/Consuming.md | 30 ++--- src/consumer/__tests__/subscribe.spec.js | 156 ++++++++++++++--------- src/consumer/index.js | 60 +++++---- types/index.d.ts | 8 +- types/tests.ts | 2 +- 5 files changed, 146 insertions(+), 110 deletions(-) diff --git a/docs/Consuming.md b/docs/Consuming.md index 59bafbdbd..558b25463 100644 --- a/docs/Consuming.md +++ b/docs/Consuming.md @@ -16,31 +16,23 @@ Subscribing to some topics: ```javascript await consumer.connect() -await consumer.subscribe({ topic: 'topic-A' }) +await consumer.subscribe({ topics: ['topic-A'] }) -// Subscribe can be called several times -await consumer.subscribe({ topic: 'topic-B' }) -await consumer.subscribe({ topic: 'topic-C' }) +// You can subscribe to multiple topics at once +await consumer.subscribe({ topics: ['topic-B', 'topic-C'] }) // It's possible to start from the beginning of the topic -await consumer.subscribe({ topic: 'topic-D', fromBeginning: true }) +await consumer.subscribe({ topics: ['topic-D'], fromBeginning: true }) ``` -Alternatively, you can subscribe to multiple topics at once - by supplying either a list: +Alternatively, you can subscribe to any topic that matches a regular expression: ```javascript await consumer.connect() -await consumer.subscribe({ topic: ['topic-A', 'topic-B'] }) +await consumer.subscribe({ topics: [/topic-(eu|us)-.*/i] }) ``` -or by using a RegExp: - -```javascript -await consumer.connect() -await consumer.subscribe({ topic: /topic-(eu|us)-.*/i }) -``` - -When suppling a RegExp, the consumer will not match topics created after the subscription. If your broker has `topic-A` and `topic-B`, you subscribe to `/topic-.*/`, then `topic-C` is created, your consumer would not be automatically subscribed to `topic-C`. +When suppling a regular expression, the consumer will not match topics created after the subscription. If your broker has `topic-A` and `topic-B`, you subscribe to `/topic-.*/`, then `topic-C` is created, your consumer would not be automatically subscribed to `topic-C`. KafkaJS offers you two ways to process your data: `eachMessage` and `eachBatch` @@ -212,8 +204,8 @@ The usual usage pattern for offsets stored outside of Kafka is as follows: The consumer group will use the latest committed offset when starting to fetch messages. If the offset is invalid or not defined, `fromBeginning` defines the behavior of the consumer group. This can be configured when subscribing to a topic: ```javascript -await consumer.subscribe({ topic: 'test-topic', fromBeginning: true }) -await consumer.subscribe({ topic: 'other-topic', fromBeginning: false }) +await consumer.subscribe({ topics: ['test-topic'], fromBeginning: true }) +await consumer.subscribe({ topics: ['other-topic'], fromBeginning: false }) ``` When `fromBeginning` is `true`, the group will use the earliest offset. If set to `false`, it will use the latest offset. The default is `false`. @@ -268,7 +260,7 @@ Example: A situation where this could be useful is when an external dependency u ```javascript await consumer.connect() -await consumer.subscribe({ topic: 'jobs' }) +await consumer.subscribe({ topics: ['jobs'] }) await consumer.run({ eachMessage: async ({ topic, message }) => { try { @@ -329,7 +321,7 @@ To move the offset position in a topic/partition the `Consumer` provides the met ```javascript await consumer.connect() -await consumer.subscribe({ topic: 'example' }) +await consumer.subscribe({ topics: ['example'] }) // you don't need to await consumer#run consumer.run({ eachMessage: async ({ topic, message }) => true }) diff --git a/src/consumer/__tests__/subscribe.spec.js b/src/consumer/__tests__/subscribe.spec.js index 1ef8e78a8..4ac17603f 100644 --- a/src/consumer/__tests__/subscribe.spec.js +++ b/src/consumer/__tests__/subscribe.spec.js @@ -38,35 +38,27 @@ describe('Consumer', () => { producer && (await producer.disconnect()) }) - describe('when subscribe', () => { - it('throws an error if the topic is invalid', async () => { - await expect(consumer.subscribe({ topic: null })).rejects.toHaveProperty( + describe('when subscribing to multiple topics', () => { + it('throws an error if one of the topics is invalid', async () => { + await expect(consumer.subscribe({ topics: [1] })).rejects.toHaveProperty( 'message', - 'Invalid topic null' + 'Invalid topic 1 (number), the topic name has to be a String or a RegExp' ) }) - it('throws an error if the topic is not a String, String[] or RegExp', async () => { - await expect(consumer.subscribe({ topic: 1 })).rejects.toHaveProperty( - 'message', - 'Invalid topic 1 (number), the topic name has to be a String, String[] or a RegExp' - ) - }) - }) - - describe('with string array', () => { - it('subscribes to all topics supplied in the array', async () => { - const topicA = `topic-A` - const topicB = `topic-B` - const topicC = `topic-C` + it('subscribes by topic name as a string or regex', async () => { + const testScope = secureRandom() + const regexMatchingTopic = `pattern-${testScope}-regex-${secureRandom()}` + const topics = [`topic-${secureRandom()}`, `topic-${secureRandom()}`, regexMatchingTopic] - await createTopic({ topic: topicA }) - await createTopic({ topic: topicB }) - await createTopic({ topic: topicC }) + await Promise.all(topics.map(topic => createTopic({ topic }))) const messagesConsumed = [] await consumer.connect() - await consumer.subscribe({ topic: [topicA, topicB], fromBeginning: true }) + await consumer.subscribe({ + topics: [topics[0], topics[1], new RegExp(`pattern-${testScope}-regex-.*`, 'i')], + fromBeginning: true, + }) consumer.run({ eachMessage: async event => messagesConsumed.push(event) }) await waitForConsumerToJoinGroup(consumer) @@ -75,59 +67,99 @@ describe('Consumer', () => { await producer.sendBatch({ acks: 1, topicMessages: [ - { topic: topicA, messages: [{ key: 'key-a', value: 'value-a' }] }, - { topic: topicB, messages: [{ key: 'key-b', value: 'value-b' }] }, - { topic: topicC, messages: [{ key: 'key-c', value: 'value-c' }] }, + { topic: topics[0], messages: [{ key: 'drink', value: 'drink' }] }, + { topic: topics[1], messages: [{ key: 'your', value: 'your' }] }, + { topic: topics[2], messages: [{ key: 'ovaltine', value: 'ovaltine' }] }, ], }) - await waitForMessages(messagesConsumed, { number: 2 }) - expect(messagesConsumed.map(m => m.message.value.toString()).sort()).toEqual([ - 'value-a', - 'value-b', - ]) + await waitForMessages(messagesConsumed, { number: 3 }) + expect(messagesConsumed.map(m => m.message.value.toString())).toEqual( + expect.arrayContaining(['drink', 'your', 'ovaltine']) + ) }) }) - describe('with regex', () => { - it('subscribes to all matching topics', async () => { - const testScope = secureRandom() - const topicUS = `pattern-${testScope}-us-${secureRandom()}` - const topicSE = `pattern-${testScope}-se-${secureRandom()}` - const topicUK = `pattern-${testScope}-uk-${secureRandom()}` - const topicBR = `pattern-${testScope}-br-${secureRandom()}` - - await createTopic({ topic: topicUS }) - await createTopic({ topic: topicSE }) - await createTopic({ topic: topicUK }) - await createTopic({ topic: topicBR }) + describe('Deprecated "topic" interface', () => { + describe('when subscribing', () => { + it('throws an error if the topic is invalid', async () => { + await expect(consumer.subscribe({ topic: null })).rejects.toHaveProperty( + 'message', + 'Missing required argument "topics"' + ) + }) - const messagesConsumed = [] - await consumer.connect() - await consumer.subscribe({ - topic: new RegExp(`pattern-${testScope}-(se|br)-.*`, 'i'), - fromBeginning: true, + it('throws an error if the topic is not a String or RegExp', async () => { + await expect(consumer.subscribe({ topic: 1 })).rejects.toHaveProperty( + 'message', + 'Invalid topic 1 (number), the topic name has to be a String or a RegExp' + ) }) - consumer.run({ eachMessage: async event => messagesConsumed.push(event) }) - await waitForConsumerToJoinGroup(consumer) + describe('with a string', () => { + it('subscribes to the topic', async () => { + const topic = `topic-${secureRandom()}` - await producer.connect() - await producer.sendBatch({ - acks: 1, - topicMessages: [ - { topic: topicUS, messages: [{ key: `key-us`, value: `value-us` }] }, - { topic: topicUK, messages: [{ key: `key-uk`, value: `value-uk` }] }, - { topic: topicSE, messages: [{ key: `key-se`, value: `value-se` }] }, - { topic: topicBR, messages: [{ key: `key-br`, value: `value-br` }] }, - ], + await createTopic({ topic }) + + const messagesConsumed = [] + await consumer.connect() + await consumer.subscribe({ topic, fromBeginning: true }) + + consumer.run({ eachMessage: async event => messagesConsumed.push(event) }) + await waitForConsumerToJoinGroup(consumer) + + await producer.connect() + await producer.sendBatch({ + acks: 1, + topicMessages: [{ topic, messages: [{ key: 'key-a', value: 'value-a' }] }], + }) + + await waitForMessages(messagesConsumed, { number: 1 }) + expect(messagesConsumed.map(m => m.message.value.toString()).sort()).toEqual(['value-a']) + }) }) - await waitForMessages(messagesConsumed, { number: 2 }) - expect(messagesConsumed.map(m => m.message.value.toString()).sort()).toEqual([ - 'value-br', - 'value-se', - ]) + describe('with regex', () => { + it('subscribes to all matching topics', async () => { + const testScope = secureRandom() + const topicUS = `pattern-${testScope}-us-${secureRandom()}` + const topicSE = `pattern-${testScope}-se-${secureRandom()}` + const topicUK = `pattern-${testScope}-uk-${secureRandom()}` + const topicBR = `pattern-${testScope}-br-${secureRandom()}` + + await Promise.all( + [topicUS, topicSE, topicUK, topicBR].map(topic => createTopic({ topic })) + ) + + const messagesConsumed = [] + await consumer.connect() + await consumer.subscribe({ + topic: new RegExp(`pattern-${testScope}-(se|br)-.*`, 'i'), + fromBeginning: true, + }) + + consumer.run({ eachMessage: async event => messagesConsumed.push(event) }) + await waitForConsumerToJoinGroup(consumer) + + await producer.connect() + await producer.sendBatch({ + acks: 1, + topicMessages: [ + { topic: topicUS, messages: [{ key: `key-us`, value: `value-us` }] }, + { topic: topicUK, messages: [{ key: `key-uk`, value: `value-uk` }] }, + { topic: topicSE, messages: [{ key: `key-se`, value: `value-se` }] }, + { topic: topicBR, messages: [{ key: `key-br`, value: `value-br` }] }, + ], + }) + + await waitForMessages(messagesConsumed, { number: 2 }) + expect(messagesConsumed.map(m => m.message.value.toString()).sort()).toEqual([ + 'value-br', + 'value-se', + ]) + }) + }) }) }) }) diff --git a/src/consumer/index.js b/src/consumer/index.js index dfd3d45a5..fbbbff9d6 100644 --- a/src/consumer/index.js +++ b/src/consumer/index.js @@ -174,43 +174,51 @@ module.exports = ({ } /** @type {import("../../types").Consumer["subscribe"]} */ - const subscribe = async ({ topic, fromBeginning = false }) => { + const subscribe = async ({ topic, topics: subscriptionTopics, fromBeginning = false }) => { if (consumerGroup) { throw new KafkaJSNonRetriableError('Cannot subscribe to topic while consumer is running') } - if (!topic) { - throw new KafkaJSNonRetriableError(`Invalid topic ${topic}`) + if (!topic && !subscriptionTopics) { + throw new KafkaJSNonRetriableError('Missing required argument "topics"') } - const isRegExp = topic instanceof RegExp - const isStringArray = Array.isArray(topic) && topic.every(t => typeof t === 'string') + if (subscriptionTopics != null && !Array.isArray(subscriptionTopics)) { + throw new KafkaJSNonRetriableError('Argument "topics" must be an array') + } - if (typeof topic !== 'string' && !isRegExp && !isStringArray) { - throw new KafkaJSNonRetriableError( - `Invalid topic ${topic} (${typeof topic}), the topic name has to be a String, String[] or a RegExp` - ) + const subscriptions = subscriptionTopics || [topic] + + for (const subscription of subscriptions) { + if (typeof subscription !== 'string' && !(subscription instanceof RegExp)) { + throw new KafkaJSNonRetriableError( + `Invalid topic ${subscription} (${typeof subscription}), the topic name has to be a String or a RegExp` + ) + } } + const hasRegexSubscriptions = subscriptions.some(subscription => subscription instanceof RegExp) + const metadata = hasRegexSubscriptions ? await cluster.metadata() : undefined + const topicsToSubscribe = [] - if (isRegExp) { - const topicRegExp = topic - const metadata = await cluster.metadata() - const matchedTopics = metadata.topicMetadata - .map(({ topic: topicName }) => topicName) - .filter(topicName => topicRegExp.test(topicName)) - - logger.debug('Subscription based on RegExp', { - groupId, - topicRegExp: topicRegExp.toString(), - matchedTopics, - }) + for (const subscription of subscriptions) { + const isRegExp = subscription instanceof RegExp + if (isRegExp) { + const topicRegExp = subscription + const matchedTopics = metadata.topicMetadata + .map(({ topic: topicName }) => topicName) + .filter(topicName => topicRegExp.test(topicName)) + + logger.debug('Subscription based on RegExp', { + groupId, + topicRegExp: topicRegExp.toString(), + matchedTopics, + }) - topicsToSubscribe.push(...matchedTopics) - } else if (isStringArray) { - topicsToSubscribe.push(...topic) - } else { - topicsToSubscribe.push(topic) + topicsToSubscribe.push(...matchedTopics) + } else { + topicsToSubscribe.push(subscription) + } } for (const t of topicsToSubscribe) { diff --git a/types/index.d.ts b/types/index.d.ts index bb964150b..895e2d6e4 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -901,12 +901,16 @@ export type ConsumerRunConfig = { eachMessage?: EachMessageHandler } -export type ConsumerSubscribeTopic = { topic: string | string[] | RegExp; fromBeginning?: boolean } +/** + * @deprecated Replaced by ConsumerSubscribeTopics + */ +export type ConsumerSubscribeTopic = { topic: string | RegExp; fromBeginning?: boolean } +export type ConsumerSubscribeTopics = { topics: (string | RegExp)[]; fromBeginning?: boolean } export type Consumer = { connect(): Promise disconnect(): Promise - subscribe(topic: ConsumerSubscribeTopic): Promise + subscribe(subscription: ConsumerSubscribeTopics | ConsumerSubscribeTopic): Promise stop(): Promise run(config?: ConsumerRunConfig): Promise commitOffsets(topicPartitions: Array): Promise diff --git a/types/tests.ts b/types/tests.ts index b8b7e07d9..c64b63330 100644 --- a/types/tests.ts +++ b/types/tests.ts @@ -61,7 +61,7 @@ removeListener() const runConsumer = async () => { await consumer.connect() - await consumer.subscribe({ topic }) + await consumer.subscribe({ topics: [topic] }) await consumer.run({ eachBatch: async ({ batch,