-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-3328: Enhance AbstractConsumerSeekAware with Extended callback for Multi-Group Listeners #3341
Conversation
assertThat(registeredCallbacks).containsExactlyInAnyOrderElementsOf(getCallbacks).isNotEmpty(); | ||
assertThat(registeredTopicPartitions).containsExactlyInAnyOrderElementsOf(getTopicPartitions).hasSize(3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When concurrency = 2
, if the partitions are all assigned to only one consumer in each group,
callbacks will be two
, and four
if the partitions are assigned split within the group.
This varies every time you run the test, so I used isNotEmpty()
instead of specific value.
assertThat(registeredCallbacks).containsExactlyInAnyOrderElementsOf(getCallbacks).isNotEmpty();
spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java
Show resolved
Hide resolved
spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java
Outdated
Show resolved
Hide resolved
spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java
Show resolved
Hide resolved
spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java
Outdated
Show resolved
Hide resolved
77c22a4
to
6e22643
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good so far!
Just some minor suggestions to perfect it.
Thanks
@@ -184,6 +184,9 @@ public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware { | |||
|
|||
/** | |||
* Rewind all partitions one record. | |||
* As of version 3.3, for multi-group listeners, it's recommended to use `getTopicsAndCallbacks()` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no reason to mention this in the docs since the version of the doc is exact match to the code.
So, better to just adjust these samples in the doc to whatever is expected from the code.
spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java
Outdated
Show resolved
Hide resolved
spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java
Outdated
Show resolved
Hide resolved
...ng-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
Outdated
Show resolved
Hide resolved
@artembilan Also one minor suggestion: I'd like to rename the private final Map<TopicPartition, List<ConsumerSeekCallback>> topicToCallbacks = new ConcurrentHashMap<>();
private final Map<ConsumerSeekCallback, List<TopicPartition>> callbacksToTopic = new ConcurrentHashMap<>(); Please take a look at my work and let me know what you think. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The callbackToTopics
sounds reasonable.
Looks like that's the last concern we have in this PR.
Thanks
Thanks! I still have some work like |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pulling locally for final review and merge.
@artembilan |
Background
AbstractConsumerSeekAware
in a multi-group listeners scenario, there are cases where the number of registered callbacks differs from the number of discovered callbacks.callbacks
Map inAbstractConsumerSeekAware
class being simplyConsumerSeekCallback
. This causes some callbacks looking at the same partition to be missing.Changes
callbacks
Map inAbstractConsumerSeekAware
class fromConsumerSeekCallback
toList<ConsumerSeekCallback>
. Also modify some methods, test codes and docs that are affected by this change.registeredSeekCallback()
and the ones you can get viagetSeekCallbacks()
match completely.