Skip to content

Commit

Permalink
Remove deprecated argument topic from consumer.subscribe
Browse files Browse the repository at this point in the history
Replaced by `topics` in #1313
  • Loading branch information
Nevon committed May 2, 2022
1 parent 8358172 commit beabfc3
Show file tree
Hide file tree
Showing 35 changed files with 104 additions and 201 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ const run = async () => {

// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.subscribe({ topics: ['test-topic'], fromBeginning: true })

await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
Expand Down
20 changes: 10 additions & 10 deletions docs/ConsumerExample.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
await consumer.connect()
await consumer.subscribe({ topic, fromBeginning: true })
await consumer.subscribe({ topics: [topic], fromBeginning: true })
await consumer.run({
// eachBatch: async ({ batch }) => {
// console.log(batch)
Expand Down Expand Up @@ -69,7 +69,7 @@ signalTraps.forEach(type => {
A similar example in TypeScript

```typescript
import { Consumer, ConsumerSubscribeTopic, EachBatchPayload, Kafka, EachMessagePayload } from 'kafkajs'
import { Consumer, ConsumerSubscribeTopics, EachBatchPayload, Kafka, EachMessagePayload } from 'kafkajs'

export default class ExampleConsumer {
private kafkaConsumer: Consumer
Expand All @@ -81,14 +81,14 @@ export default class ExampleConsumer {
}

public async startConsumer(): Promise<void> {
const topic: ConsumerSubscribeTopic = {
topic: 'example-topic',
const subscription: ConsumerSubscribeTopics = {
topics: ['example-topic'],
fromBeginning: false
}

try {
await this.kafkaConsumer.connect()
await this.kafkaConsumer.subscribe(topic)
await this.kafkaConsumer.subscribe(subscription)

await this.kafkaConsumer.run({
eachMessage: async (messagePayload: EachMessagePayload) => {
Expand All @@ -103,14 +103,14 @@ export default class ExampleConsumer {
}

public async startBatchConsumer(): Promise<void> {
const topic: ConsumerSubscribeTopic = {
topic: 'example-topic',
const subscription: ConsumerSubscribeTopics = {
topics: ['example-topic'],
fromBeginning: false
}

try {
await this.kafkaConsumer.connect()
await this.kafkaConsumer.subscribe(topic)
await this.kafkaConsumer.subscribe(subscription)
await this.kafkaConsumer.run({
eachBatch: async (eatchBatchPayload: EachBatchPayload) => {
const { topic, partition, batch } = eachBatchPayload
Expand Down Expand Up @@ -165,12 +165,12 @@ const kafka = new Kafka({
},
})

const topic = 'topic-test'
const topics = ['topic-test']
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
await consumer.connect()
await consumer.subscribe({ topic, fromBeginning: true })
await consumer.subscribe({ topics, fromBeginning: true })
await consumer.run({
// eachBatch: async ({ batch }) => {
// console.log(batch)
Expand Down
2 changes: 1 addition & 1 deletion docs/GettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Finally, to verify that our message has indeed been produced to the topic, let's
const consumer = kafka.consumer({ groupId: 'test-group' })

await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.subscribe({ topics: ['test-topic'], fromBeginning: true })

await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
Expand Down
4 changes: 2 additions & 2 deletions examples/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ const kafka = new Kafka({
},
})

const topic = 'topic-test'
const topics = ['topic-test']
const consumer = kafka.consumer({ groupId: 'test-group' })

let msgNumber = 0
const run = async () => {
await consumer.connect()
await consumer.subscribe({ topic, fromBeginning: true })
await consumer.subscribe({ topics, fromBeginning: true })
await consumer.run({
// eachBatch: async ({ batch }) => {
// console.log(batch)
Expand Down
2 changes: 1 addition & 1 deletion src/admin/__tests__/deleteGroups.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe('Admin', () => {
await createTopic({ topic: topicName })

const messagesConsumed = []
await consumer.subscribe({ topic: topicName, fromBeginning: true })
await consumer.subscribe({ topics: [topicName], fromBeginning: true })
consumer.run({ eachMessage: async event => messagesConsumed.push(event) })
await waitForConsumerToJoinGroup(consumer)

Expand Down
2 changes: 1 addition & 1 deletion src/admin/__tests__/deleteTopicRecords.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ describe('Admin > deleteTopicRecords', () => {
maxWaitTimeInMs: 100,
logger,
})
await consumer.subscribe({ topic: topicName, fromBeginning: true })
await consumer.subscribe({ topics: [topicName], fromBeginning: true })

// validate the modulus partitioner allocates 20 messages 13:7
expect(
Expand Down
2 changes: 1 addition & 1 deletion src/admin/__tests__/describeGroups.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ describe('Admin', () => {
const messagesConsumed = []
await Promise.all(
consumers.map(async consumer => {
await consumer.subscribe({ topic: topicName, fromBeginning: true })
await consumer.subscribe({ topics: [topicName], fromBeginning: true })
consumer.run({ eachMessage: async event => messagesConsumed.push(event) })
await waitForConsumerToJoinGroup(consumer)
})
Expand Down
2 changes: 1 addition & 1 deletion src/admin/__tests__/fetchOffsets.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ describe('Admin', () => {
})

await consumer.connect()
await consumer.subscribe({ topic: topicName, fromBeginning: true })
await consumer.subscribe({ topics: [topicName], fromBeginning: true })
consumer.run({ eachMessage: () => {} })
await waitForConsumerToJoinGroup(consumer)

Expand Down
2 changes: 1 addition & 1 deletion src/admin/__tests__/fetchTopicOffsetsByTimestamp.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ describe('Admin', () => {
logger: newLogger(),
})
await consumer.connect()
await consumer.subscribe({ topic: topicName, fromBeginning: true })
await consumer.subscribe({ topics: [topicName], fromBeginning: true })
/** real timestamp in messages after `fromTimestamp` */
let realTimestamp = 0
consumer.run({
Expand Down
2 changes: 1 addition & 1 deletion src/admin/__tests__/listGroups.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ describe('Admin', () => {
await createTopic({ topic: topicName })

const messagesConsumed = []
await consumer.subscribe({ topic: topicName, fromBeginning: true })
await consumer.subscribe({ topics: [topicName], fromBeginning: true })
consumer.run({ eachMessage: async event => messagesConsumed.push(event) })
await waitForConsumerToJoinGroup(consumer)

Expand Down
2 changes: 1 addition & 1 deletion src/admin/__tests__/resetOffsets.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ describe('Admin', () => {
test('throws an error if the consumer group is running', async () => {
consumer = createConsumer({ groupId, cluster: createCluster(), logger: newLogger() })
await consumer.connect()
await consumer.subscribe({ topic: topicName })
await consumer.subscribe({ topics: [topicName] })
consumer.run({ eachMessage: () => true })
await waitForConsumerToJoinGroup(consumer)

Expand Down
2 changes: 1 addition & 1 deletion src/admin/__tests__/setOffsets.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ describe('Admin', () => {
test('throws an error if the consumer group is running', async () => {
consumer = createConsumer({ groupId, cluster: createCluster(), logger: newLogger() })
await consumer.connect()
await consumer.subscribe({ topic: topicName })
await consumer.subscribe({ topics: [topicName] })
await consumer.run({ eachMessage: () => true })

admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
Expand Down
2 changes: 1 addition & 1 deletion src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ module.exports = ({
groupId,
})

await consumer.subscribe({ topic, fromBeginning: true })
await consumer.subscribe({ topics: [topic], fromBeginning: true })
const description = await consumer.describeGroup()

if (!isConsumerGroupRunning(description)) {
Expand Down
2 changes: 1 addition & 1 deletion src/broker/__tests__/describeGroups.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ describe('Broker > DescribeGroups', () => {

test('request', async () => {
await consumer.connect()
await consumer.subscribe({ topic: topicName, fromBeginning: true })
await consumer.subscribe({ topics: [topicName], fromBeginning: true })
await consumer.run({ eachMessage: jest.fn() })
const response = await broker.describeGroups({ groupIds: [groupId] })

Expand Down
4 changes: 2 additions & 2 deletions src/consumer/__tests__/assignerProtocolIntegration.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ describe('Consumer > assignerProtocol > integration', () => {
await Promise.all([consumer1.connect(), consumer2.connect()])

// Subscribe both consumers to the same topic, and start the consumer groups
consumer1.subscribe({ topic: topicName })
consumer1.subscribe({ topics: [topicName] })
consumer1.run({ eachMessage: () => {} })
consumer2.subscribe({ topic: topicName })
consumer2.subscribe({ topics: [topicName] })
consumer2.run({ eachMessage: () => {} })

await Promise.all([
Expand Down
12 changes: 5 additions & 7 deletions src/consumer/__tests__/assignmentForUnsubscribedTopic.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ describe('Consumer', () => {

it('handles receiving assignments for unsubscribed topics', async () => {
await consumer1.connect()
await Promise.all(
topicNames.map(topicName => consumer1.subscribe({ topic: topicName, fromBeginning: true }))
)
await consumer1.subscribe({ topic: topicNames, fromBeginning: true })

consumer1.run({ eachMessage: () => {} })
await waitForConsumerToJoinGroup(consumer1, { label: 'consumer1' })
Expand All @@ -53,7 +51,7 @@ describe('Consumer', () => {
})

await consumer2.connect()
await consumer2.subscribe({ topic: topicNames[0], fromBeginning: true })
await consumer2.subscribe({ topics: topicNames.slice(0, 1), fromBeginning: true })

consumer2.run({ eachMessage: () => {} })
const event = await waitForConsumerToJoinGroup(consumer2, { label: 'consumer2' })
Expand All @@ -76,7 +74,7 @@ describe('Consumer', () => {
let assignments = await Promise.all(
[consumer1, consumer2].map(async consumer => {
await consumer.connect()
await consumer.subscribe({ topic: topicNames[0] })
await consumer.subscribe({ topics: topicNames.slice(0, 1) })
consumer.run({ eachMessage: () => {} })
return waitForConsumerToJoinGroup(consumer)
})
Expand All @@ -98,7 +96,7 @@ describe('Consumer', () => {
})

await consumer1.connect()
await Promise.all(topicNames.map(topic => consumer1.subscribe({ topic })))
await consumer1.subscribe({ topics: topicNames })

// Second consumer is also replaced, subscribing to both topics
await consumer2.disconnect()
Expand All @@ -112,7 +110,7 @@ describe('Consumer', () => {
})

await consumer2.connect()
await Promise.all(topicNames.map(topic => consumer2.subscribe({ topic })))
await consumer2.subscribe({ topics: topicNames })

consumer1.run({ eachMessage: () => {} })
consumer2.run({ eachMessage: () => {} })
Expand Down
2 changes: 1 addition & 1 deletion src/consumer/__tests__/commitOffsets.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ describe('Consumer', () => {

const offsetsConsumed = []

await consumer.subscribe({ topic: topicName, fromBeginning: true })
await consumer.subscribe({ topics: [topicName], fromBeginning: true })
consumer.run({
autoCommit: false,
eachMessage: async event => {
Expand Down
2 changes: 1 addition & 1 deletion src/consumer/__tests__/connection.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe('Consumer', () => {

await consumer.connect()
await producer.connect()
await consumer.subscribe({ topic: topicName, fromBeginning: true })
await consumer.subscribe({ topics: [topicName], fromBeginning: true })

const messages = []
consumer.run({
Expand Down
Loading

0 comments on commit beabfc3

Please sign in to comment.