Feat: consume per partition #2

Merged: 6 commits into master from feat-consume-per-partition, Jan 24, 2024

Conversation

martijnimhoff
Collaborator

@martijnimhoff martijnimhoff commented Jan 11, 2024

Pull request checklist:

  • Include new test cases (either end-to-end or unit tests) with your change.

    The existing test cases do not cover consuming via the standard API, so I did not add any for this case.

  • Follow our style guides.
  • Make sure all tests are still passing and the linter does not report any issues.

    Current master has quite a few linter errors. I made sure not to add any new errors, and I've fixed a few.

  • End files with a new line.
  • Document the new code in the comments (if it is JavaScript) so the documentation generator can update the reference documentation.
  • Avoid platform-dependent code.
  • Note: If making modifications to the underlying C++, please use built-in precompiler directives to detect such platform specificities. Use Nan whenever possible to abstract node/v8 version incompatibility.
  • Make sure your branch is up to date and rebased.
  • Squash extraneous commits unless their history truly adds value to the library.

@martijnimhoff
Collaborator Author

@jdbruijn @everhardt Could you review before we submit the PR to Blizzard?

@jdbruijn
Member

Yes, but it will be next week.

@everhardt

This implementation allows consuming some partitions individually and leaving the others in the global queue. The current way of doing that has a few weird consequences:

  1. once you have called .consume(..) for a specific partition, it cannot be brought back into the consumer.consume of the global queue. In practice I don't think this is an issue, and a small function .forwardToGlobalQueue(partitionId) can always be added
  2. if you don't manually call .consume(..) on the global queue, you won't get any rebalance events. The alternative of doing this automatically somewhere seems less attractive to me
  3. if you don't listen for the rebalance events, some partitions will remain in the global queue and will thus be part of the callback of the .consume(..) call of the global queue, probably even in flowing mode.
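
To make the three points above concrete, here is a toy model in plain JavaScript. It is not node-rdkafka code and does not reflect how this PR is implemented: ToyConsumer, deliver and consumePartition are hypothetical names, and forwardToGlobalQueue is only the helper suggested in point 1. The model assumes that a partition is detached from the global queue the first time it is consumed individually.

```javascript
// Toy model only: NOT node-rdkafka code. All names here are hypothetical.
class ToyConsumer {
  constructor() {
    this.globalQueue = [];            // shared queue served by consume()
    this.partitionQueues = new Map(); // partitionId -> private queue
  }

  // A message goes to the partition's private queue if that partition was
  // claimed, otherwise it is forwarded to the global queue.
  deliver(partitionId, value) {
    const privateQueue = this.partitionQueues.get(partitionId);
    (privateQueue || this.globalQueue).push({ partition: partitionId, value });
  }

  // Consume from the global queue.
  consume(max) {
    return this.globalQueue.splice(0, max);
  }

  // Consume from a single partition. The first call claims the partition,
  // so later messages no longer reach the global queue (point 1).
  consumePartition(partitionId, max) {
    if (!this.partitionQueues.has(partitionId)) {
      this.partitionQueues.set(partitionId, []);
    }
    return this.partitionQueues.get(partitionId).splice(0, max);
  }

  // Hypothetical helper from point 1: hand the partition back.
  forwardToGlobalQueue(partitionId) {
    const privateQueue = this.partitionQueues.get(partitionId) || [];
    this.globalQueue.push(...privateQueue);
    this.partitionQueues.delete(partitionId);
  }
}

const consumer = new ToyConsumer();
consumer.consumePartition(0, 10); // claim partition 0 only
consumer.deliver(0, 'a');
consumer.deliver(1, 'b');
consumer.deliver(2, 'c');

// Partition 0 is served privately; 1 and 2 were never claimed, so they
// still show up in the global consume (point 3).
const fromPartition0 = consumer.consumePartition(0, 10);
const fromGlobal = consumer.consume(10);

// After handing partition 0 back, its messages reach the global queue again.
consumer.deliver(0, 'd');
consumer.forwardToGlobalQueue(0);
consumer.deliver(0, 'e');
const afterForward = consumer.consume(10);

console.log(fromPartition0.map((m) => m.value)); // [ 'a' ]
console.log(fromGlobal.map((m) => m.partition)); // [ 1, 2 ]
console.log(afterForward.map((m) => m.value));   // [ 'd', 'e' ]
```

Point 2 is not modelled here, since the toy has no rebalance events.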

@everhardt

@martijnimhoff What's the status of this one?

@jdbruijn
Member

AFAIK it is waiting for my review; then we can merge it here and start using this in the Teleport code, as well as open a PR upstream. Review in progress, BTW.


lib/kafka-consumer.js (outdated, resolved)
src/workers.cc (resolved)
src/workers.cc (resolved)
src/kafka-consumer.cc (outdated, resolved)
lib/kafka-consumer.js (outdated, resolved)
src/workers.cc (outdated, resolved)
@jdbruijn jdbruijn merged commit 1ae884d into master Jan 24, 2024
1 check passed
@everhardt

@martijnimhoff how did you address my third point?

@martijnimhoff
Collaborator Author

if you don't listen for the rebalance events, some partitions will remain in the global queue and will thus be part of the callback of the .consume(..) call of the global queue, probably even in flowing mode.

It's a strange situation, but perhaps a use case exists in which you want to control the consumption of just a few partitions individually, while the remaining partitions are consumed via the callback. So I would say it's an odd but desired behavior. I'm actually not sure whether this will function correctly... I'll do a test to see what happens.

@martijnimhoff
Collaborator Author

@everhardt it indeed results in data loss.

When the per-partition consume starts after a timeout of 1s, all 50 messages in the partition have already been consumed by the regular .consume queue. This can be seen in the data event and in the callback.

Perhaps I can see if I can create a consumer configuration which disables the forwarding to the general queue for all partitions. Something like this:

var consumer = new KafkaNode.KafkaConsumer({
  ...
  'consume_per_partition': true,
}, {
  ...
});
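
The race described above, and the effect such an option might have, can be sketched with a toy simulation. This is plain JavaScript, not node-rdkafka code: simulate and consumePerPartition are hypothetical names, and the sketch assumes the proposed 'consume_per_partition' option would simply stop messages from being forwarded to the general queue. How the real option would behave is not settled in this thread.

```javascript
// Toy simulation only: NOT node-rdkafka code. `consumePerPartition` stands in
// for the proposed 'consume_per_partition' option (assumed behaviour).
function simulate({ consumePerPartition }) {
  const globalQueue = [];    // served by the regular .consume call
  const partitionQueue = []; // served by the per-partition consume

  // 50 messages arrive before the per-partition consume has started.
  for (let offset = 0; offset < 50; offset++) {
    (consumePerPartition ? partitionQueue : globalQueue).push(offset);
  }

  // The regular consume runs first and drains whatever is in the global queue.
  const seenByGlobal = globalQueue.splice(0).length;

  // The per-partition consume starts later (the 1s timeout in the test above)
  // and only sees what is left in its own queue.
  const seenByPartition = partitionQueue.splice(0).length;

  return { seenByGlobal, seenByPartition };
}

const withoutFlag = simulate({ consumePerPartition: false });
const withFlag = simulate({ consumePerPartition: true });

console.log(withoutFlag); // { seenByGlobal: 50, seenByPartition: 0 }
console.log(withFlag);    // { seenByGlobal: 0, seenByPartition: 50 }
```

Without the flag, the per-partition consumer never sees the 50 messages, which matches the loss observed in the test. With the flag, the general queue stays empty until a partition is consumed explicitly.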

@martijnimhoff martijnimhoff deleted the feat-consume-per-partition branch January 25, 2024 09:27
3 participants