
Conversation

@dantswain (Collaborator)

I believe this should fix #247

In addition to #248 and #249, I think there were two lingering issues:

  1. Not stopping the previous consumer supervisor on rebalance. This accounts for the {:error, {:already_started, _}} errors (see the sketch just after this list).
  2. Not gracefully handling a change in consumer group coordinator node.
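For (1), the shape of the fix is roughly the following. This is a minimal, self-contained sketch with hypothetical module and field names, not the actual kafka_ex supervision tree:

```elixir
defmodule RebalanceSketch do
  # Illustrative only: stop the supervisor left over from the previous
  # generation before starting one for the new assignments. Without the
  # stop, the named start_link below would return
  # {:error, {:already_started, pid}} after a rebalance.
  def handle_rebalance(assignments, %{consumer_sup: old_sup} = state) do
    if is_pid(old_sup) and Process.alive?(old_sup) do
      :ok = Supervisor.stop(old_sup)
    end

    {:ok, new_sup} =
      Supervisor.start_link([], strategy: :one_for_one, name: __MODULE__.ConsumerSup)

    %{state | consumer_sup: new_sup, assignments: assignments}
  end
end
```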

I think (1) is pretty straightforward but I would appreciate some discussion on (2). The previous behavior could fall under the "let it fail" mantra but I think it was causing enough failures to run into restart policies. I don't think what I've done here is overly complicated - it just detects the failure and re-acquires the group metadata then recursively tries itself again.
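Condensed from the join_group changes in the diff below (response parsing and the normal, non-retry reply path are elided here, as they are in the diff excerpts), the shape of the retry for (2) is:

```elixir
true = consumer_group?(state)
{broker, state} = broker_for_consumer_group_with_update(state)

wire_request =
  JoinGroup.create_request(state.correlation_id, @client_id, join_group_request)

response =
  broker
  |> NetworkClient.send_sync_request(wire_request, config_sync_timeout(network_timeout))

if response.error_code == :not_coordinator_for_consumer do
  # The group coordinator moved to another node: refresh the consumer group
  # metadata and retry this same call against the newly discovered broker.
  {_, updated_state} = update_consumer_metadata(state)
  kafka_server_join_group(join_group_request, network_timeout, updated_state)
end
```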

I also had a concern about the right scope for this to happen. Part of me says it should happen in the server implementation because one could argue that the server implementation is intended to abstract away the notion of talking to a cluster, and this is dealing with figuring out which cluster node we need to talk to. Part of me says that following the consumer group coordinator node is part of the behavior of the consumer group itself.

The changes to the server implementation also feel a little un-DRY, but I didn't want to do any significant refactoring until the above concerns were addressed.

If this approach seems reasonable, we may actually want to implement something similar for other server transactions - e.g., update metadata whenever we get :not_leader_for_partition.
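For concreteness, the analogous handling for a leader change might look something like this hypothetical sketch. None of this is in the PR, and helper names such as `update_metadata_and_find_leader/2` are placeholders rather than kafka_ex functions:

```elixir
# Hypothetical sketch (not part of this PR): the same detect/refresh/retry
# shape applied to a partition-leadership change.
defp send_with_leader_retry(request, broker, timeout, state) do
  response = NetworkClient.send_sync_request(broker, request, timeout)

  if response.error_code == :not_leader_for_partition do
    # Leadership moved: refresh cluster metadata and retry against whichever
    # broker the fresh metadata reports as the leader for this partition.
    {new_broker, updated_state} = update_metadata_and_find_leader(request, state)
    send_with_leader_retry(request, new_broker, timeout, updated_state)
  else
    {response, state}
  end
end
```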

I'll update #247 with instructions on how to reproduce the original problem.

@bjhaid and @joshuawscott I know it's the holiday week but I'd really appreciate your input on this. @CJPoll this hopefully fixes our issue.

This gets rid of the `{:error, {:already_started, _}}` issues that were (I think) preventing consumer groups from gracefully handling rebalances.
Inline review comments on the diff:

```diff
  true = consumer_group?(state)
  {broker, state} = broker_for_consumer_group_with_update(state)
- request = JoinGroup.create_request(state.correlation_id, @client_id, join_group_request)
+ wire_request = JoinGroup.create_request(state.correlation_id, @client_id, join_group_request)
```

> Line is too long (max is 80, was 97).

```diff
  wire_request = JoinGroup.create_request(state.correlation_id, @client_id, join_group_request)
  response = broker
- |> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout))
+ |> NetworkClient.send_sync_request(wire_request, config_sync_timeout(network_timeout))
```

> Line is too long (max is 80, was 92).


```diff
+ if response.error_code == :not_coordinator_for_consumer do
+   {_, updated_state} = update_consumer_metadata(state)
+   kafka_server_join_group(join_group_request, network_timeout, updated_state)
```

> Line is too long (max is 80, was 81).

```diff
  true = consumer_group?(state)
  {broker, state} = broker_for_consumer_group_with_update(state)
- request = SyncGroup.create_request(state.correlation_id, @client_id, sync_group_request)
+ wire_request = SyncGroup.create_request(state.correlation_id, @client_id, sync_group_request)
```

> Line is too long (max is 80, was 97).

```diff
  wire_request = SyncGroup.create_request(state.correlation_id, @client_id, sync_group_request)
  response = broker
- |> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout))
+ |> NetworkClient.send_sync_request(wire_request, config_sync_timeout(network_timeout))
```

> Line is too long (max is 80, was 92).


```diff
+ if response.error_code == :not_coordinator_for_consumer do
+   {_, updated_state} = update_consumer_metadata(state)
+   kafka_server_sync_group(sync_group_request, network_timeout, updated_state)
```

> Line is too long (max is 80, was 81).

```diff
  true = consumer_group?(state)
  {broker, state} = broker_for_consumer_group_with_update(state)
- request = LeaveGroup.create_request(state.correlation_id, @client_id, request)
+ wire_request = LeaveGroup.create_request(state.correlation_id, @client_id, request)
```

> Line is too long (max is 80, was 87).


```diff
  response = broker
- |> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout))
+ |> NetworkClient.send_sync_request(wire_request, config_sync_timeout(network_timeout))
```

> Line is too long (max is 80, was 92).

```diff
  true = consumer_group?(state)
  {broker, state} = broker_for_consumer_group_with_update(state)
- request = Heartbeat.create_request(state.correlation_id, @client_id, request)
+ wire_request = Heartbeat.create_request(state.correlation_id, @client_id, request)
```

> Line is too long (max is 80, was 86).

```diff
  wire_request = Heartbeat.create_request(state.correlation_id, @client_id, request)
  response = broker
- |> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout))
+ |> NetworkClient.send_sync_request(wire_request, config_sync_timeout(network_timeout))
```

> Line is too long (max is 80, was 92).

@dantswain (Collaborator, Author)

I will address the Ebert issues before final merge, pending requested discussion.

@joshuawscott (Member) left a comment:

This seems fine to me. I guess it could be debated where to put the code, but there is a lot of refactoring that should be done, and it feels like it would be better to undertake that after the bugs are worked out sufficiently?

@sourcelevel-bot

Ebert has finished reviewing this Pull Request and has found:

  • 5 fixed issues! 🎉

But beware that this branch is 2 commits behind the kafkaex:master branch, and a review of an up to date branch would produce more accurate results.

You can see more details about this review at https://ebertapp.io/github/kafkaex/kafka_ex/pulls/251.

@dantswain (Collaborator, Author)

I'm going to merge this but holding off until Monday to do a release.

@dantswain dantswain merged commit 93facc0 into master Nov 22, 2017
@dantswain dantswain deleted the consumer_coordinator_fix branch November 22, 2017 16:09
robotarmy pushed a commit to RAM9/kafka_ex that referenced this pull request Apr 7, 2025

Successfully merging this pull request may close these issues:

A broker going down or coming up causes a never-ending loop of rebalances

4 participants