-
-
Notifications
You must be signed in to change notification settings - Fork 527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[KIP-345] Static membership #888
Conversation
Had a look through all the changes, which seem pretty straightforward protocol upgrades to me so far. Didn't check them against the protocol, but relying on tests to highlight any unlikely mistakes there.
This seems like the safe default behaviour. Any idea on how the group instance id would be set? According to KIP-345, this allows prioritising of state changes over liveliness. That seems like a trade off to make on the application level, completely depending on the amount of state change you have vs. your availability requirements. From there, I assume it's the right thing to let the user control the use of a |
It is up to the application to ensure that this is set, is unique in the group and is retained across restarts. Could be set through configuration or some environmental information (machine ids etc.) Eventually we'll expose it as a consumer option, but since we don't actually support the functionality yet, I'm not exposing it. |
Great stuff @Nevon, and I agree that just the protocol update shouldn't cause any other side effects. |
This PR is the start adding support for static membership (#884). The corresponding KIP is here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
As a start, I have upgraded most of the API versions to the required version. For most of them, there is no significant difference, except adding client side throttling which we now support in JoinGroup, SyncGroup, Heartbeat, LeaveGroup and OffsetCommit. At the moment, I have not added any facility for actually setting the
groupInstanceId
property on the consumer, which means that we retain the old behavior of dynamic membership.I ran into some issues when I started working on upgrading
OffsetCommit
to V6+. V6 addsleader_epoch
to the partition you are committing offsets for, as part of KIP-320. I haven't looked into it much yet, so I may have the wrong idea, but from a brief look it sounds like we need to support Fetch v12, which addscurrent_leader_epoch
to the response, and then keep track of that and include it in the OffsetCommit request. But frankly, KIP-320 is pretty large (check the heading "Consumer Handling" for all the behavior it expects to change).I'm submitting this as a draft, because I wanted to get some feedback and help to figure out what we actually need to do to move forward. I think we could go ahead and merge this separately, as I don't believe there should be any implications as long as
groupInstanceId
isnull
(which it is), but let's make sure.In order to figure out what changed in each version of the APIs, I found that the request schema files in the Java client actually has some comments at least giving you a hint, so that you know what to look for in the KIPs. For example: https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitRequest.json#L20-L32