Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supporting kafka static membership #938

Closed

Conversation

huszkacs
Copy link

Static membership requires setting of group.instance.id, which requires JoinGroup API key version 5 and Heartbeat API Key version 3.
Documentation

Validation of new member.id is changed in later releases. When a client tries to join to a consumer group for the first time with empty memberID, server will return with an error called MEMBER_ID_REQUIRED and a generated memberID for the client. That newly generated memberID needs to be passed to connection retry. Learn more here

Heartbeat log has been added as well, as a useful change for maintenance.

@huszkacs
Copy link
Author

All the failed test-cases are passed at my local env.

@achille-roussel
Copy link
Contributor

It looks like the tests in CI pass for Kafka version 2.3.1 and later, this is because the API call version you used are likely not supported for older versions.

One of kafka-go's goals is to offer strong backward compatibility guarantees, so we will need to figure out a way to transparently fallback to older API versions when connecting to older versions (we usually do this with the ApiVersions API call).

@achille-roussel achille-roussel self-assigned this Jul 1, 2022
@huszkacs
Copy link
Author

huszkacs commented Jul 4, 2022

Hi @achille-roussel! Can you point out a similar example from the repo that I can learn from and implement it?

@achille-roussel
Copy link
Contributor

The kafka.Conn type maintains a map of API versions supported by the server https://github.com/segmentio/kafka-go/blob/main/conn.go#L65-L66 which we use to determine which request and response version to use client-side.

That being said, the change might be simpler to implement once #947 has been merged, otherwise we may have to do API version selection in many places instead of having it abstracted away by the kafka.Transport type.

@huszkacs
Copy link
Author

Right, I think I know what you mean.

I did small investigation, (sadly before I didn't got time to work with the PR). I started to think it would be nice to have a general type for each API protocol that can deliver all the versions data. But after I realized that general type is going to be almost always the latest version of the protocol. Therefor it just makes sense to not introducing a general type, just simply using interfaces.

There is version negotiation as you said and yes it is implemented at the conn level and only used there. I think it is impossible to avoid, since that gives us the information of how to parse the request/response to binary that kafka server accepts. There must be a switch pattern or a factory pattern that allows us to create the proper request version and read the corresponding response version. The caller of the coordinator functions will never know the negotiated version in advance, therefor the response must be go through some sort of abstraction which I think the interface just more appropriate.

I will push a new change as a suggestion and will see that if you like that or not.

@huszkacs huszkacs force-pushed the feature/supporting_static_membership branch from 00a5651 to f8cc793 Compare September 8, 2022 09:38
@huszkacs huszkacs force-pushed the feature/supporting_static_membership branch 2 times, most recently from dacacfa to c2a0017 Compare September 15, 2022 11:14
@huszkacs huszkacs closed this Sep 15, 2022
@huszkacs huszkacs force-pushed the feature/supporting_static_membership branch from c2a0017 to ee37c7f Compare September 15, 2022 11:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants