Multiple Kafka consumers with the same groupId and different topics #1040
Comments
You cannot have different consumers in the same consumer group be subscribed to different topics. If they are subscribing to different topics, they should not be in the same consumer group. Either use different group ids for different consumer groups (if you want each consumer group to only consume from one topic) or have a single consumer group consume from all three topics.
Is this behavior a limitation of KafkaJS, or does the Kafka protocol behave like this?
This is how Kafka consumer groups work.
Do we have any proof of this in the documentation?
This is not true. Using the Kafka Java client or "node-rdkafka", I can get what @radinail needs:
So it seems to be a limitation of KafkaJS. Thanks.
So I looked into this a bit more to understand how they achieved it, and I think I now understand how it's done in the Java client. It is indeed possible.
What confused me was that the consumer group itself always acts on the superset of all subscribed topics. The core difference is in assignment: a topic's partitions are assigned only among the members subscribed to that topic, not across the entire consumer group. Each assigner needs to implement support for this; it is not something the consumer itself does.
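To make the assignment-side difference concrete, here is a minimal, library-independent sketch. It assumes each member's subscription has already been decoded into a `memberId -> topics` map and that the leader knows each topic's partition count; `assignBySubscription`, `Subscriptions`, `PartitionCounts` and `Plan` are hypothetical names, not KafkaJS API. Each topic's partitions are round-robined only over the members subscribed to that topic.

```typescript
// Hypothetical types for illustration only (not KafkaJS API).
type Subscriptions = Record<string, string[]>; // memberId -> topics it subscribed to
type PartitionCounts = Record<string, number>; // topic -> number of partitions
type Plan = Record<string, Record<string, number[]>>; // memberId -> topic -> partition ids

function assignBySubscription(subscriptions: Subscriptions, partitionCounts: PartitionCounts): Plan {
  const plan: Plan = {};
  const subscribers: Record<string, string[]> = {}; // topic -> members subscribed to it

  for (const memberId of Object.keys(subscriptions)) {
    plan[memberId] = {}; // every member gets an entry, possibly empty
    for (const topic of subscriptions[memberId]) {
      (subscribers[topic] ??= []).push(memberId);
    }
  }

  for (const topic of Object.keys(subscribers)) {
    const members = subscribers[topic].sort(); // deterministic order across runs
    const count = partitionCounts[topic] ?? 0; // no metadata for the topic -> nothing to assign
    for (let p = 0; p < count; p++) {
      // Round-robin over this topic's subscribers only, never the whole group.
      const owner = members[p % members.length];
      (plan[owner][topic] ??= []).push(p);
    }
  }
  return plan;
}

const plan = assignBySubscription(
  { m1: ['TOPIC1'], m2: ['TOPIC2'], m3: ['TOPIC1', 'TOPIC2'] },
  { TOPIC1: 2, TOPIC2: 2 },
);
console.log(JSON.stringify(plan));
// {"m1":{"TOPIC1":[0]},"m2":{"TOPIC2":[0]},"m3":{"TOPIC1":[1],"TOPIC2":[1]}}
```

Here `m1` and `m2` each receive one partition of their own topic, and `m3`, which subscribes to both, receives one partition of each. No member is handed a partition of a topic it never subscribed to.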
I took a stab at this this morning, just to see whether it might be easy to implement, and it's almost easy. I thought I'd jot down what I learned before I forget. The assigner logic requires just a small change by decoding the member metadata using The only thing preventing it from being an almost trivial change is that the leader needs to have metadata loaded for all topics the group is subscribed to, even those it is not subscribed to itself. In the current design the consumer only ever fetches metadata for the topics it is subscribed to, so that needs to change so that it fetches metadata for all topics the whole group is subscribed to before handing off to the assigner.
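The extra metadata step can be sketched as a small pure function, assuming the leader has already decoded every member's subscription into a `memberId -> topics` map. `topicsLeaderMustLoad` is a hypothetical helper, not part of KafkaJS; it only works out which group-wide topics are missing from the leader's own subscription. The actual fetch would then go through the cluster's metadata API (the assigner posted later in this thread calls `cluster.addTargetTopic` for this).

```typescript
// Hypothetical helper (not KafkaJS API): topics the group leader must fetch metadata
// for, beyond its own subscription, before it can assign partitions for the group.
function topicsLeaderMustLoad(leaderId: string, memberTopics: Record<string, string[]>): string[] {
  const own = memberTopics[leaderId] ?? [];
  const missing: string[] = [];
  for (const memberId of Object.keys(memberTopics)) {
    for (const topic of memberTopics[memberId]) {
      // Wanted by some member, absent from the leader's subscription, not yet recorded.
      if (!own.includes(topic) && !missing.includes(topic)) missing.push(topic);
    }
  }
  return missing.sort();
}

// An old replica leads the group while a new replica already consumes an extra topic.
const mustLoad = topicsLeaderMustLoad('old-replica', {
  'old-replica': ['TOPIC1'],
  'new-replica': ['TOPIC1', 'TOPIC2'],
});
console.log(mustLoad); // [ 'TOPIC2' ]
```

If the leader skips this step, it has no partition metadata for `TOPIC2`, so that topic's partitions never end up in anyone's assignment.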
Any plans to implement this feature soon? 😅😅😅
We have the same issue, running three instances against the same broker.
The case where different instances (replicas) in one consumer group consume different sets of topics also arises every time a rolling update adds another topic to be consumed. If the new replicas start consuming a topic but the group leader is elected from the old replicas, which are not subscribed to it, that topic's partitions are not assigned. Since we monitor partition assignment in our apps, this failure blocked the rolling upgrade. We fixed the issue on our side but did not have the time to contribute the fix upstream properly. I can at least drop the fixed assigner here in case it helps. It may have issues, but it seems to work for us:

```ts
import { AssignerProtocol, GroupMember, GroupMemberAssignment, PartitionAssigner } from 'kafkajs';

// Hypothetical name/version: every member of the group must advertise the same assigner name.
const NAME = 'SubscriptionAwareRoundRobinAssigner';
const VERSION = 1;

export const myPartitionAssigner: PartitionAssigner = ({ cluster }) => ({
  name: NAME,
  version: VERSION,

  async assign({ members }: { members: GroupMember[]; topics: string[] }): Promise<GroupMemberAssignment[]> {
    // memberId -> topic -> partition ids; every member gets an entry, even an empty one.
    const assignment: Record<string, Record<string, number[]>> = {};
    // memberId -> topics that member itself subscribed to.
    const memberTopics: Record<string, string[]> = {};
    members.forEach((member) => {
      const meta = AssignerProtocol.MemberMetadata.decode(member.memberMetadata);
      memberTopics[member.memberId] = meta ? meta.topics : [];
      assignment[member.memberId] = {};
    });

    // topic -> members subscribed to it, sorted so the plan is deterministic.
    const topicMembers = Object.keys(memberTopics).reduce((acc: Record<string, string[]>, memberId: string) => {
      memberTopics[memberId].forEach((topic) => {
        if (acc[topic]) {
          acc[topic].push(memberId);
        } else {
          acc[topic] = [memberId];
        }
      });
      return acc;
    }, {});
    Object.values(topicMembers).forEach((v) => v.sort());

    // The leader must load metadata for every topic the group consumes, not only its own.
    const consumedTopics = Object.keys(topicMembers);
    for (const topic of consumedTopics) {
      await cluster.addTargetTopic(topic);
    }

    const topicsPartitions = consumedTopics.flatMap((topic) =>
      cluster.findTopicPartitionMetadata(topic).map((m) => ({ topic, partitionId: m.partitionId })),
    );

    // Round-robin each partition over the members subscribed to its topic only.
    topicsPartitions.forEach(({ topic, partitionId }, i) => {
      const assignee = topicMembers[topic][i % topicMembers[topic].length];
      if (!assignment[assignee][topic]) {
        assignment[assignee][topic] = [];
      }
      assignment[assignee][topic].push(partitionId);
    });

    return Object.keys(assignment).map((memberId) => ({
      memberId,
      memberAssignment: AssignerProtocol.MemberAssignment.encode({
        version: VERSION,
        assignment: assignment[memberId],
        userData: Buffer.of(),
      }),
    }));
  },

  // Advertise this member's own subscription so the leader can decode it in assign().
  protocol({ topics }) {
    return {
      name: NAME,
      metadata: AssignerProtocol.MemberMetadata.encode({ version: VERSION, topics, userData: Buffer.of() }),
    };
  },
});
```
We have the same issue on our side. Any update or plans to fix this?
Same culprit on our side. We were suffering from low performance and failing messages in our applications, so I took a look and discovered that a single consumer (and group) was subscribed to multiple topics and was slowly falling behind processing all the messages. So I thought: in our Java-based applications this works, with one group containing multiple consumers, each subscribed to a different topic. I quickly updated the code and... no topics assigned? At all?
We ran into this issue today. Is there any news, a plan to fix it, or a quick fix?
Any update on this? @Nevon, any comment on the solution proposed by @MartinBurian?
The assigner implementation I posted above can simply be plugged into the KafkaJS consumer:

```ts
const kafka = new Kafka(...);
const consumer = kafka.consumer({
  // ...
  partitionAssigners: [myPartitionAssigner],
});
```

Just note that all members of a consumer group must agree on a common partition assigner name during a rebalance, so either make sure the assigner name is the same as the default, or update all replicas in the same consumer group at once instead of doing a rolling update. I should also mention that the implementation may not distribute the partitions optimally when the group members have different subscriptions, but I consider that a transient state and am not really concerned about it. It would be nice if this were part of the library; I just really don't have the time to contribute it properly with tests and so on, sorry 👪.
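To see why the distribution may not be optimal, here is a stripped-down model of the loop in the posted assigner: the partitions of all consumed topics are flattened into one list, and the global index `i` (not a per-topic index) picks among that topic's subscribers. `globalIndexCounts` is a hypothetical name, and the model assumes partition ids run from 0 to count - 1 and that topics are visited in first-seen order, as in the posted code.

```typescript
// Hypothetical model of the posted assigner's loop; returns partitions per member.
function globalIndexCounts(
  memberTopics: Record<string, string[]>, // memberId -> subscribed topics
  partitionCounts: Record<string, number>, // topic -> number of partitions
): Record<string, number> {
  const subscribers: Record<string, string[]> = {}; // topic -> subscribed members
  for (const memberId of Object.keys(memberTopics)) {
    for (const topic of memberTopics[memberId]) (subscribers[topic] ??= []).push(memberId);
  }
  Object.keys(subscribers).forEach((t) => subscribers[t].sort());

  // Flatten (topic, partition) pairs across all topics, like the posted code does.
  const flat: { topic: string; partitionId: number }[] = [];
  for (const topic of Object.keys(subscribers)) {
    for (let p = 0; p < (partitionCounts[topic] ?? 0); p++) flat.push({ topic, partitionId: p });
  }

  const counts: Record<string, number> = {};
  for (const memberId of Object.keys(memberTopics)) counts[memberId] = 0;
  flat.forEach(({ topic }, i) => {
    // The global index i, not the partition's position within its topic, picks the owner.
    const members = subscribers[topic];
    counts[members[i % members.length]] += 1;
  });
  return counts;
}

console.log(globalIndexCounts({ m1: ['A', 'B'], m2: ['B'] }, { A: 2, B: 2 })); // { m1: 3, m2: 1 }
```

With `m1` subscribed to A and B, `m2` only to B, and two partitions per topic, `m1` ends up with three partitions and `m2` with one, although giving A's partitions to `m1` and B's to `m2` would be an even 2/2 split. When all members share the same subscriptions the same loop is balanced, which fits treating the uneven case as transient.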
Thanks @MartinBurian.
Bumped into this one today after going through all the configuration options.
They have solved this problem by supporting an array of topics in the consumer, but when I want each consumer to handle its own topic independently, it still doesn't work.
+1 on this. I'm having the same problem. |
+1 on this. I'm having the same problem. Is there any update on this?
+1, I'm facing the same issue. |
Describe the bug
I'm trying to have 3 consumers with the same groupId and 3 topics, with each consumer subscribed to one topic.
When I use the same groupId, I see only one consumer in the output of the "describe --all-groups" command.
But when I use different groupIds, I can see all three consumers.
My question is:
Why doesn't having the same groupId for all the consumers work?
To Reproduce
```js
import { Kafka } from 'kafkajs';

// Three independent clients, all joining the same consumer group.
const readable1 = new Kafka({
  brokers: ['localhost:9092'],
}).consumer({ groupId: 'GROUP_TEST' });
const readable2 = new Kafka({
  brokers: ['localhost:9092'],
}).consumer({ groupId: 'GROUP_TEST' });
const readable3 = new Kafka({
  brokers: ['localhost:9092'],
}).consumer({ groupId: 'GROUP_TEST' });

const init = async (consumer, topic) => {
  await consumer.connect();
  await consumer.subscribe({ topic });
  console.log('subscribe ok to topic = ', topic);
  await consumer.run({
    eachMessage: async (payload) => {
      consumer.pause([{ topic: payload?.topic }]);
      try {
        if (!payload?.message) {
          throw new Error('empty message');
        }
      } catch (error) {
        console.log('error = ', error);
      }
      consumer.resume([{ topic: payload?.topic }]);
    },
  });
};

// Each consumer subscribes to a different topic within GROUP_TEST.
init(readable1, 'TOPIC1').catch(console.error);
init(readable2, 'TOPIC2').catch(console.error);
init(readable3, 'TOPIC3').catch(console.error);
```