
a given consumer group is not receiving messages from different topics and KafkaJS didn't throw any error #699

Closed
AdelUnito opened this issue Apr 15, 2020 · 4 comments

AdelUnito commented Apr 15, 2020

Hello everyone, I'm using kafkajs v1.12.0 with an AWS MSK cluster.

My producer code looks like this:

const kafkaClient = new KafkaClient({
    clientId: KAFKA_CLIENT_ID,
    brokers: process.env.KAFKA_BROKERS_URLS.toString().split(','),
    connectionTimeout: 10000,
});

const producer = kafkaClient.producer({
    allowAutoTopicCreation: true,
    createPartitioner: Partitioners.JavaCompatiblePartitioner,
});

const { CONNECT, DISCONNECT, REQUEST_TIMEOUT } = producer.events;
producer.on(CONNECT, e => logger.info(`Producer connected at ${e.timestamp}`));
producer.on(DISCONNECT, e => logger.info(`Producer disconnected at ${e.timestamp}`));
producer.on(REQUEST_TIMEOUT, e => logger.warn(`Producer request timed out at ${e.timestamp}`, JSON.stringify(e.payload)));

try {
    await producer.connect();
    await producer.send({
        topic: 'my-topic',
        compression: CompressionTypes.Snappy,
        messages,
    });
} catch (err) {
    logger.error(err);
}

After publishing messages, my RecordMetadata looks like:

[ { topicName: 'my-topic', partition: 0, errorCode: 0, baseOffset: '37182', logAppendTime: '-1', logStartOffset: '0' } ]

In a different application, my consumer code looks like this:

const kafkaClient = new KafkaClient({
    clientId: KAFKA_CLIENT_ID,
    brokers: process.env.KAFKA_BROKERS_URLS.toString().split(','),
    connectionTimeout: 10000,
});

const consumer = kafkaClient.consumer({
    groupId: KAFKA_CONSUMER_GROUP,
    allowAutoTopicCreation: false,
});

const { CONNECT, DISCONNECT, GROUP_JOIN, CRASH, REQUEST_TIMEOUT, STOP } = consumer.events;
consumer.on(CONNECT, e => logger.info(`A Consumer from Consumer Group (${KAFKA_CONSUMER_GROUP}) connected at ${e.timestamp}`));
consumer.on(DISCONNECT, e => logger.info(`A Consumer from Consumer Group (${KAFKA_CONSUMER_GROUP}) disconnected at ${e.timestamp}`));
consumer.on(GROUP_JOIN, e => logger.info(`A Consumer (${e.payload.memberId}) joined the Consumer Group (${e.payload.groupId}) at ${e.timestamp}`, `LeaderId=${e.payload.leaderId}`));
consumer.on(CRASH, e => logger.error(`A Consumer from the Consumer Group (${KAFKA_CONSUMER_GROUP}) crashed at ${e.timestamp}`, e.payload.error));
consumer.on(REQUEST_TIMEOUT, e => logger.warn(`Consumer request timed out at ${e.timestamp}`, JSON.stringify(e.payload)));
consumer.on(STOP, e => logger.info(`Consumer stopped`, JSON.stringify(e)));

try {
	await consumer.connect();
	await consumer.subscribe({ topic: 'my-topic', fromBeginning: false });
	await consumer.run({
	    eachMessage: async ({ topic, partition, message }) => { // This was never executed.
	      logger.info(`------ Debugging getMessages function`);
	      const messageAsJson: EventMessage = JSON.parse(message.value.toString());
	      await this.saveData(messageAsJson);
	    },
	})
} catch (err) {
	logger.error(err)
}

// 2nd consumer

const consumer2 = kafkaClient.consumer({
    groupId: KAFKA_CONSUMER_GROUP,
    allowAutoTopicCreation: false,
});

// The event name constants are the same strings for every consumer instance,
// so the names destructured from consumer.events above are reused here.
consumer2.on(CONNECT, e => logger.info(`A Consumer from Consumer Group (${KAFKA_CONSUMER_GROUP}) connected at ${e.timestamp}`));
consumer2.on(DISCONNECT, e => logger.info(`A Consumer from Consumer Group (${KAFKA_CONSUMER_GROUP}) disconnected at ${e.timestamp}`));
consumer2.on(GROUP_JOIN, e => logger.info(`A Consumer (${e.payload.memberId}) joined the Consumer Group (${e.payload.groupId}) at ${e.timestamp}`, `LeaderId=${e.payload.leaderId}`));
consumer2.on(CRASH, e => logger.error(`A Consumer from the Consumer Group (${KAFKA_CONSUMER_GROUP}) crashed at ${e.timestamp}`, e.payload.error));
consumer2.on(REQUEST_TIMEOUT, e => logger.warn(`Consumer request timed out at ${e.timestamp}`, JSON.stringify(e.payload)));
consumer2.on(STOP, e => logger.info(`Consumer stopped`, JSON.stringify(e)));

try {
	await consumer2.connect();
	await consumer2.subscribe({ topic: 'my-topic2', fromBeginning: false });
	await consumer2.run({
	    eachMessage: async ({ topic, partition, message }) => { // This was never executed.
	      logger.info(`------ Debugging getMessages function`);
	      const messageAsJson: EventMessage = JSON.parse(message.value.toString());
	      await this.saveData(messageAsJson);
	    },
	})
} catch (err) {
	logger.error(err)
}

My consumer logs are:

A Consumer from Consumer Group (kirby-for-web) connected at 1586980671445
A Consumer (gid-7d97643a-70a1-480b-8dad-823e8c23b9e3) joined the Consumer Group (kirby-for-web) at 1586980676467 LeaderId=gid-3e1347af-22d2-4456-bb87-f42837104ef6
A Consumer (gid-3e1347af-22d2-4456-bb87-f42837104ef6) joined the Consumer Group (kirby-for-web) at 1586980676470 LeaderId=gid-3e1347af-22d2-4456-bb87-f42837104ef6

For further inspection, I tried fetching the topics' metadata with the admin client, and this was the result:

{
  topics: [
    {
      name: 'my-topic2',
      partitions: [
        { partitionErrorCode: 0, partitionId: 0, leader: 4, replicas: [ 4, 3, 2 ], isr: [ 4, 3, 2 ], offlineReplicas: [] }
      ]
    },
    {
      name: 'my-topic',
      partitions: [
        { partitionErrorCode: 0, partitionId: 0, leader: 1, replicas: [ 1, 4, 3 ], isr: [ 1, 4, 3 ], offlineReplicas: [] }
      ]
    }
  ]
}

I could see network RX/TX packet traffic on the AWS brokers:

[Screenshot: network RX/TX packet traffic on the AWS brokers, Apr 15 2020, 4:57 PM]

I'm not sure why the eachMessage function was never executed, so I was never able to receive the messages in the consumer.

AdelUnito changed the title from "Kafka consumer to processing messages" to "Kafkajs consumer not receiving messages" on Apr 15, 2020
Nevon (Collaborator) commented Apr 16, 2020

Just to make sure, you don't have any ACLs applied on the topic that would prevent you from reading from it? Also, are you continuously producing messages from your producer? Otherwise with fromBeginning set to false your consumer will start from the end of the topic if there is no other committed offset to resume from for that group.

You can run your consumer with debug log level to get more information on what your consumer is up to: https://kafka.js.org/docs/configuration#log-level
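A minimal sketch of that configuration, assuming the client is created with the `Kafka` constructor and the `logLevel` export from kafkajs (the poster's `KafkaClient` name is presumably an alias for `Kafka`):

```javascript
const { Kafka, logLevel } = require('kafkajs');

const kafkaClient = new Kafka({
    clientId: KAFKA_CLIENT_ID,
    brokers: process.env.KAFKA_BROKERS_URLS.toString().split(','),
    connectionTimeout: 10000,
    // Surface fetch, heartbeat, and group-assignment activity in the logs
    logLevel: logLevel.DEBUG,
});
```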

AdelUnito (Author) commented

@Nevon Thanks for helping out, I finally found the issue.

I was using the same consumer group (the same groupId) to consume two different topics,
and because of this, I wasn't receiving messages on either of them.

After using a unique groupId per consumer, my code is working. (I wonder if kafkajs could be noisier about this issue.)
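For anyone landing here later, the other way to resolve this, keeping a single group, is to subscribe the same consumer to both topics before calling run. A sketch, reusing the variable names from the snippets above (the handler body is illustrative only):

```javascript
const consumer = kafkaClient.consumer({
    groupId: KAFKA_CONSUMER_GROUP,
    allowAutoTopicCreation: false,
});

await consumer.connect();
// One group, one subscription set covering every topic its members read,
// so no partition assignment goes unconsumed.
await consumer.subscribe({ topic: 'my-topic', fromBeginning: false });
await consumer.subscribe({ topic: 'my-topic2', fromBeginning: false });
await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        logger.info(`Received message from ${topic}[${partition}]`);
    },
});
```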

AdelUnito reopened this Apr 16, 2020
AdelUnito changed the title from "Kafkajs consumer not receiving messages" to "a consumer group is not receiving messages from different topics and KafkaJS didn't throw any error" on Apr 16, 2020
AdelUnito (Author) commented

@Nevon I wonder why we cannot fetch messages from different topics in the same consumer group?

If it is intentional, can we mention this in the documentation or be noisy about it (throw an error)?

AdelUnito changed the title from "a consumer group is not receiving messages from different topics and KafkaJS didn't throw any error" to "a given consumer group is not receiving messages from different topics and KafkaJS didn't throw any error" on Apr 16, 2020
Nevon (Collaborator) commented Apr 16, 2020

Well, the concept of a consumer group in Kafka is that you have multiple identical nodes that together form a group. From Kafka's conceptual point of view, it doesn't make sense for members of the same group to subscribe to different topics.

We do emit a warn log about this if we receive assignments for topics that we aren't subscribed to, because that's the only way we can find out that you are doing this. The warn message is here: https://github.com/tulios/kafkajs/blob/master/src/consumer/consumerGroup.js#L181-L192 and it links to an FAQ explaining the issue: https://kafka.js.org/docs/faq#why-am-i-receiving-messages-for-topics-im-not-subscribed-to

We cannot throw an error because if you are deploying a new version of your application that subscribes to new topics, your old consumers would receive assignments for topics they aren't subscribed to until they are fully replaced by the new instances.
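The group behavior described above can be illustrated with a small sketch. This is plain JavaScript, not kafkajs internals: the function below is a simplified round-robin stand-in for a real partition assignor, distributing all partitions of the group's subscribed topics across the group's members.

```javascript
// Simplified round-robin partition assignor: every member of the group is
// assumed to subscribe to the same topics, which is what Kafka's group
// protocol expects.
function assignPartitions(members, topicPartitions) {
  // topicPartitions: e.g. { 'my-topic': [0], 'my-topic2': [0] }
  const assignments = Object.fromEntries(members.map(m => [m, []]));
  let i = 0;
  for (const [topic, partitions] of Object.entries(topicPartitions)) {
    for (const partition of partitions) {
      // Hand each partition to the next member in turn
      const member = members[i % members.length];
      assignments[member].push({ topic, partition });
      i += 1;
    }
  }
  return assignments;
}

// With two members and one partition per topic, each member ends up owning
// one topic, which is why a consumer in this thread could receive an
// assignment for a topic it never subscribed to.
const result = assignPartitions(
  ['consumer-1', 'consumer-2'],
  { 'my-topic': [0], 'my-topic2': [0] }
);
console.log(result);
```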

Nevon closed this as completed Apr 16, 2020