Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make consumer.subscribe accept an array of topics #1313

Merged
merged 3 commits into from
Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions docs/Consuming.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,23 @@ Subscribing to some topics:
```javascript
await consumer.connect()

await consumer.subscribe({ topic: 'topic-A' })
await consumer.subscribe({ topics: ['topic-A'] })

// Subscribe can be called several times
await consumer.subscribe({ topic: 'topic-B' })
await consumer.subscribe({ topic: 'topic-C' })
// You can subscribe to multiple topics at once
await consumer.subscribe({ topics: ['topic-B', 'topic-C'] })

// It's possible to start from the beginning of the topic
await consumer.subscribe({ topic: 'topic-D', fromBeginning: true })
await consumer.subscribe({ topics: ['topic-D'], fromBeginning: true })
```

Alternatively, you can subscribe to multiple topics at once using a RegExp:
Alternatively, you can subscribe to any topic that matches a regular expression:

```javascript
await consumer.connect()
await consumer.subscribe({ topic: /topic-(eu|us)-.*/i })
await consumer.subscribe({ topics: [/topic-(eu|us)-.*/i] })
```

The consumer will not match topics created after the subscription. If your broker has `topic-A` and `topic-B`, you subscribe to `/topic-.*/`, then `topic-C` is created, your consumer would not be automatically subscribed to `topic-C`.
When suppling a regular expression, the consumer will not match topics created after the subscription. If your broker has `topic-A` and `topic-B`, you subscribe to `/topic-.*/`, then `topic-C` is created, your consumer would not be automatically subscribed to `topic-C`.

KafkaJS offers you two ways to process your data: `eachMessage` and `eachBatch`

Expand Down Expand Up @@ -205,8 +204,8 @@ The usual usage pattern for offsets stored outside of Kafka is as follows:
The consumer group will use the latest committed offset when starting to fetch messages. If the offset is invalid or not defined, `fromBeginning` defines the behavior of the consumer group. This can be configured when subscribing to a topic:

```javascript
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.subscribe({ topic: 'other-topic', fromBeginning: false })
await consumer.subscribe({ topics: ['test-topic'], fromBeginning: true })
await consumer.subscribe({ topics: ['other-topic'], fromBeginning: false })
```

When `fromBeginning` is `true`, the group will use the earliest offset. If set to `false`, it will use the latest offset. The default is `false`.
Expand Down Expand Up @@ -261,7 +260,7 @@ Example: A situation where this could be useful is when an external dependency u

```javascript
await consumer.connect()
await consumer.subscribe({ topic: 'jobs' })
await consumer.subscribe({ topics: ['jobs'] })

await consumer.run({ eachMessage: async ({ topic, message }) => {
try {
Expand Down Expand Up @@ -322,7 +321,7 @@ To move the offset position in a topic/partition the `Consumer` provides the met

```javascript
await consumer.connect()
await consumer.subscribe({ topic: 'example' })
await consumer.subscribe({ topics: ['example'] })

// you don't need to await consumer#run
consumer.run({ eachMessage: async ({ topic, message }) => true })
Expand Down
129 changes: 98 additions & 31 deletions src/consumer/__tests__/subscribe.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,39 +38,25 @@ describe('Consumer', () => {
producer && (await producer.disconnect())
})

describe('when subscribe', () => {
it('throws an error if the topic is invalid', async () => {
await expect(consumer.subscribe({ topic: null })).rejects.toHaveProperty(
'message',
'Invalid topic null'
)
})

it('throws an error if the topic is not a String or RegExp', async () => {
await expect(consumer.subscribe({ topic: 1 })).rejects.toHaveProperty(
describe('when subscribing to multiple topics', () => {
it('throws an error if one of the topics is invalid', async () => {
await expect(consumer.subscribe({ topics: [1] })).rejects.toHaveProperty(
'message',
'Invalid topic 1 (number), the topic name has to be a String or a RegExp'
)
})
})

describe('with regex', () => {
it('subscribes to all matching topics', async () => {
it('subscribes by topic name as a string or regex', async () => {
const testScope = secureRandom()
const topicUS = `pattern-${testScope}-us-${secureRandom()}`
const topicSE = `pattern-${testScope}-se-${secureRandom()}`
const topicUK = `pattern-${testScope}-uk-${secureRandom()}`
const topicBR = `pattern-${testScope}-br-${secureRandom()}`
const regexMatchingTopic = `pattern-${testScope}-regex-${secureRandom()}`
const topics = [`topic-${secureRandom()}`, `topic-${secureRandom()}`, regexMatchingTopic]

await createTopic({ topic: topicUS })
await createTopic({ topic: topicSE })
await createTopic({ topic: topicUK })
await createTopic({ topic: topicBR })
await Promise.all(topics.map(topic => createTopic({ topic })))

const messagesConsumed = []
await consumer.connect()
await consumer.subscribe({
topic: new RegExp(`pattern-${testScope}-(se|br)-.*`, 'i'),
topics: [topics[0], topics[1], new RegExp(`pattern-${testScope}-regex-.*`, 'i')],
fromBeginning: true,
})

Expand All @@ -81,18 +67,99 @@ describe('Consumer', () => {
await producer.sendBatch({
acks: 1,
topicMessages: [
{ topic: topicUS, messages: [{ key: `key-us`, value: `value-us` }] },
{ topic: topicUK, messages: [{ key: `key-uk`, value: `value-uk` }] },
{ topic: topicSE, messages: [{ key: `key-se`, value: `value-se` }] },
{ topic: topicBR, messages: [{ key: `key-br`, value: `value-br` }] },
{ topic: topics[0], messages: [{ key: 'drink', value: 'drink' }] },
{ topic: topics[1], messages: [{ key: 'your', value: 'your' }] },
{ topic: topics[2], messages: [{ key: 'ovaltine', value: 'ovaltine' }] },
],
})

await waitForMessages(messagesConsumed, { number: 2 })
expect(messagesConsumed.map(m => m.message.value.toString()).sort()).toEqual([
'value-br',
'value-se',
])
await waitForMessages(messagesConsumed, { number: 3 })
expect(messagesConsumed.map(m => m.message.value.toString())).toEqual(
expect.arrayContaining(['drink', 'your', 'ovaltine'])
)
})
})

describe('Deprecated "topic" interface', () => {
describe('when subscribing', () => {
it('throws an error if the topic is invalid', async () => {
await expect(consumer.subscribe({ topic: null })).rejects.toHaveProperty(
'message',
'Missing required argument "topics"'
)
})

it('throws an error if the topic is not a String or RegExp', async () => {
await expect(consumer.subscribe({ topic: 1 })).rejects.toHaveProperty(
'message',
'Invalid topic 1 (number), the topic name has to be a String or a RegExp'
)
})

describe('with a string', () => {
it('subscribes to the topic', async () => {
const topic = `topic-${secureRandom()}`

await createTopic({ topic })

const messagesConsumed = []
await consumer.connect()
await consumer.subscribe({ topic, fromBeginning: true })

consumer.run({ eachMessage: async event => messagesConsumed.push(event) })
await waitForConsumerToJoinGroup(consumer)

await producer.connect()
await producer.sendBatch({
acks: 1,
topicMessages: [{ topic, messages: [{ key: 'key-a', value: 'value-a' }] }],
})

await waitForMessages(messagesConsumed, { number: 1 })
expect(messagesConsumed.map(m => m.message.value.toString()).sort()).toEqual(['value-a'])
})
})

describe('with regex', () => {
it('subscribes to all matching topics', async () => {
const testScope = secureRandom()
const topicUS = `pattern-${testScope}-us-${secureRandom()}`
const topicSE = `pattern-${testScope}-se-${secureRandom()}`
const topicUK = `pattern-${testScope}-uk-${secureRandom()}`
const topicBR = `pattern-${testScope}-br-${secureRandom()}`

await Promise.all(
[topicUS, topicSE, topicUK, topicBR].map(topic => createTopic({ topic }))
)

const messagesConsumed = []
await consumer.connect()
await consumer.subscribe({
topic: new RegExp(`pattern-${testScope}-(se|br)-.*`, 'i'),
fromBeginning: true,
})

consumer.run({ eachMessage: async event => messagesConsumed.push(event) })
await waitForConsumerToJoinGroup(consumer)

await producer.connect()
await producer.sendBatch({
acks: 1,
topicMessages: [
{ topic: topicUS, messages: [{ key: `key-us`, value: `value-us` }] },
{ topic: topicUK, messages: [{ key: `key-uk`, value: `value-uk` }] },
{ topic: topicSE, messages: [{ key: `key-se`, value: `value-se` }] },
{ topic: topicBR, messages: [{ key: `key-br`, value: `value-br` }] },
],
})

await waitForMessages(messagesConsumed, { number: 2 })
expect(messagesConsumed.map(m => m.message.value.toString()).sort()).toEqual([
'value-br',
'value-se',
])
})
})
})
})
})
58 changes: 35 additions & 23 deletions src/consumer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,39 +131,51 @@ module.exports = ({
})

/** @type {import("../../types").Consumer["subscribe"]} */
const subscribe = async ({ topic, fromBeginning = false }) => {
const subscribe = async ({ topic, topics: subscriptionTopics, fromBeginning = false }) => {
if (consumerGroup) {
throw new KafkaJSNonRetriableError('Cannot subscribe to topic while consumer is running')
}

if (!topic) {
throw new KafkaJSNonRetriableError(`Invalid topic ${topic}`)
if (!topic && !subscriptionTopics) {
throw new KafkaJSNonRetriableError('Missing required argument "topics"')
}

const isRegExp = topic instanceof RegExp
if (typeof topic !== 'string' && !isRegExp) {
throw new KafkaJSNonRetriableError(
`Invalid topic ${topic} (${typeof topic}), the topic name has to be a String or a RegExp`
)
if (subscriptionTopics != null && !Array.isArray(subscriptionTopics)) {
throw new KafkaJSNonRetriableError('Argument "topics" must be an array')
}

const subscriptions = subscriptionTopics || [topic]

for (const subscription of subscriptions) {
if (typeof subscription !== 'string' && !(subscription instanceof RegExp)) {
throw new KafkaJSNonRetriableError(
`Invalid topic ${subscription} (${typeof subscription}), the topic name has to be a String or a RegExp`
)
}
}

const hasRegexSubscriptions = subscriptions.some(subscription => subscription instanceof RegExp)
const metadata = hasRegexSubscriptions ? await cluster.metadata() : undefined

const topicsToSubscribe = []
if (isRegExp) {
const topicRegExp = topic
const metadata = await cluster.metadata()
const matchedTopics = metadata.topicMetadata
.map(({ topic: topicName }) => topicName)
.filter(topicName => topicRegExp.test(topicName))

logger.debug('Subscription based on RegExp', {
groupId,
topicRegExp: topicRegExp.toString(),
matchedTopics,
})
for (const subscription of subscriptions) {
const isRegExp = subscription instanceof RegExp
if (isRegExp) {
const topicRegExp = subscription
const matchedTopics = metadata.topicMetadata
.map(({ topic: topicName }) => topicName)
.filter(topicName => topicRegExp.test(topicName))

logger.debug('Subscription based on RegExp', {
groupId,
topicRegExp: topicRegExp.toString(),
matchedTopics,
})

topicsToSubscribe.push(...matchedTopics)
} else {
topicsToSubscribe.push(topic)
topicsToSubscribe.push(...matchedTopics)
} else {
topicsToSubscribe.push(subscription)
}
}

for (const t of topicsToSubscribe) {
Expand Down
6 changes: 5 additions & 1 deletion types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -954,12 +954,16 @@ export type ConsumerRunConfig = {
eachMessage?: EachMessageHandler
}

/**
* @deprecated Replaced by ConsumerSubscribeTopics
*/
export type ConsumerSubscribeTopic = { topic: string | RegExp; fromBeginning?: boolean }
export type ConsumerSubscribeTopics = { topics: (string | RegExp)[]; fromBeginning?: boolean }

export type Consumer = {
connect(): Promise<void>
disconnect(): Promise<void>
subscribe(topic: ConsumerSubscribeTopic): Promise<void>
subscribe(subscription: ConsumerSubscribeTopics | ConsumerSubscribeTopic): Promise<void>
stop(): Promise<void>
run(config?: ConsumerRunConfig): Promise<void>
commitOffsets(topicPartitions: Array<TopicPartitionOffsetAndMetadata>): Promise<void>
Expand Down
2 changes: 1 addition & 1 deletion types/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ removeListener()

const runConsumer = async () => {
await consumer.connect()
await consumer.subscribe({ topic })
await consumer.subscribe({ topics: [topic] })
await consumer.run({
eachBatch: async ({
batch,
Expand Down