
Enable Batch KafkaListener to allow ConsumerRecords as a parameter #701

Closed
zach-schoenberger opened this issue Jun 1, 2018 · 1 comment


@zach-schoenberger
Contributor

The current implementation of the batch listener flattens all records returned by a poll into a single list. This loses information that is available on the ConsumerRecords object; most importantly, the records are already grouped by topic and partition at that point, which can be very useful when implementing processing logic. I suggest adding an option that lets a batch KafkaListener receive the ConsumerRecords object directly as a parameter.
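
A minimal sketch of the proposed listener signature, assuming a container factory configured for batch listening; the topic name, key/value types, and the `batchFactory` bean name are illustrative:

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;

public class BatchListener {

    // Receive the whole poll result instead of a flattened List<ConsumerRecord>,
    // preserving the topic/partition grouping the consumer already provides.
    @KafkaListener(topics = "events", containerFactory = "batchFactory")
    public void listen(ConsumerRecords<String, String> records) {
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> perPartition = records.records(partition);
            // process each partition's records as a group, e.g. one bulk write per partition
        }
    }
}
```

With the current List-based signature, the same grouping would have to be rebuilt from each record's topic() and partition().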

@garyrussell
Contributor

Good point; the reasoning for converting to a list was so that a FilteringBatchMessageListenerAdapter could wrap the listener, but I agree it should be an option.
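
For context on that trade-off, a rough sketch of how a batch factory is typically wired with a record filter (the bean names and the filter predicate are illustrative); filtering works on the individual ConsumerRecord instances in the converted List, so a listener that receives the raw ConsumerRecords object would bypass it:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class BatchListenerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        // The filter is applied per record before the batch reaches the listener
        // (via FilteringBatchMessageListenerAdapter); here it drops null-valued records.
        factory.setRecordFilterStrategy(record -> record.value() == null);
        return factory;
    }
}
```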

@garyrussell garyrussell added this to the 2.2.M1 milestone Jun 8, 2018
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jun 8, 2018

Resolves spring-projects#701

Support passing the entire `ConsumerRecords<?, ?>` object to the listener.
Record filtering is not applied in this case.
denis554 added a commit to denis554/spring-kafka that referenced this issue Mar 27, 2019
Resolves spring-projects/spring-kafka#701

Support passing the entire `ConsumerRecords<?, ?>` object to the listener.
Record filtering is not applied in this case.

Polishing - use the consumer records' map's key set instead of creating new TopicPartitions when finding offsets to commit.

Optimization - the highest offset is always the last in the list; similar optimization for SeekToCurrentBatchErrorHandler.

Polishing - PR comments.

Polishing - code style.