Initial support for consumer coordinator #420
Merged
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
I've added basic support for the consumer coordinator in the Kafka broker, specifically for fetching offsets from the brokers that have been committed to Kafka, as opposed to Zookeeper. Internally, we use this library for some API and monitoring work, and one of the functions we need is to retrieve offsets that have been committed using kafka storage.
I've added a client request method for sending consumer metadata requests, along with the appropriate encoder and decoder routines. I have also added a _send_consumer_aware_request method which uses the consumer metadata information to find the coordinator for the specified consumer group and send requests to that broker. These parts are the basic support framework for communicating with the consumer coordinator.
In addition, to support fetching kafka-committed offsets without breaking the existing zookeeper-committed offsets support, I have added a send_offset_fetch_request_kafka method to the client. This allows the caller to perform the offset fetch request using version 1 of the protocol (which is identical on the wire to version 0, but is for kafka-committed offsets). Because the broker is using different request version numbers for Zookeeper and Kafka committed offset fetches, I believe this is the best way to support both in parallel.
Moving forwards, support should be added for offset commit requests to use Kafka storage as well, potentially with the same model.