New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ak-cherry-pick]KAFKA-14950: implement assign() and assignment() and resolve some conflicts #32
Conversation
We will explicitly send an assignment change event to the background thread to invoke auto-commit if the group.id is configured. After updating the subscription state, a NewTopicsMetadataUpdateRequestEvent will also be sent to the background thread to update the metadata. Co-authored-by: Kirk True <kirk@kirktrue.pro> Reviewers: Jun Rao <junrao@gmail.com>
@@ -61,7 +61,7 @@ public class ConsumerTestBuilder implements Closeable { | |||
static final int REQUEST_TIMEOUT_MS = 500; | |||
|
|||
final LogContext logContext = new LogContext(); | |||
final Time time = new MockTime(1, 0, 0); | |||
final Time time = new MockTime(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm doing this on purpose - I don't think we want auto-tick for most of the time. If we do I can revert this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor changes needed, but otherwise LGTM.
...ava/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
Show resolved
Hide resolved
.../org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
Show resolved
Hide resolved
@@ -102,7 +102,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { | |||
Collections.unmodifiableList(pendingRequests.drain(currentTimeMs))); | |||
} | |||
|
|||
private void maybeAutoCommit() { | |||
public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The offsets param seems unused, is the intention to use it in the future?
private void updateMetadata(long milliseconds) { | ||
final MetadataUpdateApplicationEvent event = new MetadataUpdateApplicationEvent(milliseconds); | ||
private void updateMetadata() { | ||
final NewTopicsMetadataUpdateRequestEvent event = new NewTopicsMetadataUpdateRequestEvent(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for my understanding, why can't we rely here on the existing metadata object with the metadata.requestUpdateForNewTopics
call? I see that's still used for this same purpose in other places like the PrototypeAsyncConsumer.subscribe, but maybe I missing the reason for this and then is the subscribe that might need to be updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm good point. Originally I wanted metadata only owned by the background thread so that the ownership is clearer, which also allows us to remove the synchronization blocks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we wanted to remove as much concurrent access to objects as possible. It wasn't necessarily for performance reasons, I think it was more for removing the complexity of the code. I think the synchronization does have a performance impact, though that's not really the point of the refactoring.
Thanks for the changes! Left some comments but LGTM |
Update ApplicationEventProcessor.java
This is a cherry pick of 14950 into ctr-staging.