-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14960: [Part I]TopicMetadataRequestManager Implementation #7
KAFKA-14960: [Part I]TopicMetadataRequestManager Implementation #7
Conversation
b7f2814
to
eef8a9f
Compare
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletableRequest.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
Outdated
Show resolved
Hide resolved
...s/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
Outdated
Show resolved
Hide resolved
...s/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to see how this plugs into the background thread and application event processor, specifically how it handles ensuring that duplicate API calls are handled appropriately.
Also, can we refactor TopicMetadataRequestManager
and TopicMetadataFetcher
so that they use the same core response handling logic?
...s/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
Outdated
Show resolved
Hide resolved
...s/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
Outdated
Show resolved
Hide resolved
...s/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
Outdated
Show resolved
Hide resolved
...s/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
Outdated
Show resolved
Hide resolved
...s/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
Outdated
Show resolved
Hide resolved
...s/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
Outdated
Show resolved
Hide resolved
8ae018e
to
8616b93
Compare
I think I ommitted kirk's comments: Working on them ... |
8616b93
to
03ff808
Compare
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'm getting lost with all of the futures and callbacks. I feel we may want to do some restructuring, renaming, or documenting to keep things straight.
|
||
import java.util.concurrent.CompletableFuture; | ||
|
||
public class CompletableRequest<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add some JavaDoc? I'm not totally sure when to use this class vs. using the future from the UnsentRequest
.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
Show resolved
Hide resolved
...s/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
Outdated
Show resolved
Hide resolved
...s/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
Show resolved
Hide resolved
...ain/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
Outdated
Show resolved
Hide resolved
fix broken test Add tests and fix response handler cleaned up and addressed some of the comments from the PR wip wip Testing the impl Added tests MetadataRequestManager and refactor the callbacks Update MetadataRequestManager.java Create MetadataRequestManager.java
90ed7de
to
f09fd9a
Compare
...ain/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
Show resolved
Hide resolved
@kirktrue thanks for the review! Ready for another pass. Think I addressed your comments. |
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
Show resolved
Hide resolved
...s/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
Outdated
Show resolved
Hide resolved
TopicMetadataRequestManager implementation --------- Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
…o new consumer threading refactor This change introduces the FetchRequestManager that will be responsible for: - Formatting fetch requests to the background thread - Configuring the callback on fetch responses for the background thread The response handler will collect the fetch responses from the broker and create CompletedFetch instances as is done in Fetcher. The foreground logic will decompress the data into a Record, which will then be deserialized into a ConsumerRecord for returning to the user.
TopicMetadataRequestManager implementation --------- Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
TopicMetadataRequestManager implementation --------- Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
TopicMetadataRequestManager implementation --------- Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
TopicMetadataRequestManager implementation --------- Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
TopicMetadataRequestManager implementation --------- Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
This is the implementation of the TopicMetadataRequestManager, responsible for handling topic metadata from
partitionsFor
andlistTopics
The PR also refactors the current callback mechanism because I think the usage is rather inorganic. Now, user can pass in a BiConsumer to be triggered when the request is completed.
@kirktrue @lianetm