diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index 58cdc743..a8f6a8de 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -278,6 +278,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do case sync_group_response do %SyncGroupResponse{error_code: :no_error, assignments: assignments} -> new_state = state + |> stop_consumer() |> start_consumer(unpack_assignments(assignments)) |> start_heartbeat_timer() {:ok, new_state} diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index 37cefb9e..b7bc465c 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -14,6 +14,8 @@ defmodule KafkaEx.Server0P9P0 do alias KafkaEx.NetworkClient alias KafkaEx.Server0P8P2 + require Logger + @consumer_group_update_interval 30_000 def start_link(args, name \\ __MODULE__) @@ -74,41 +76,94 @@ defmodule KafkaEx.Server0P9P0 do def kafka_server_join_group(join_group_request, network_timeout, state) do true = consumer_group?(state) {broker, state} = broker_for_consumer_group_with_update(state) - request = JoinGroup.create_request(state.correlation_id, @client_id, join_group_request) + wire_request = JoinGroup.create_request( + state.correlation_id, + @client_id, + join_group_request + ) + sync_timeout = config_sync_timeout(network_timeout) response = broker - |> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout)) + |> NetworkClient.send_sync_request(wire_request, sync_timeout) |> JoinGroup.parse_response - {:reply, response, %{state | correlation_id: state.correlation_id + 1}} + + if response.error_code == :not_coordinator_for_consumer do + {_, updated_state} = update_consumer_metadata(state) + kafka_server_join_group( + join_group_request, + network_timeout, + updated_state + ) + else + {:reply, response, %{state | correlation_id: state.correlation_id + 1}} + end end def kafka_server_sync_group(sync_group_request, network_timeout, state) do true = consumer_group?(state) {broker, state} = broker_for_consumer_group_with_update(state) - request = SyncGroup.create_request(state.correlation_id, @client_id, sync_group_request) + wire_request = SyncGroup.create_request( + state.correlation_id, + @client_id, + sync_group_request + ) + sync_timeout = config_sync_timeout(network_timeout) response = broker - |> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout)) + |> NetworkClient.send_sync_request(wire_request, sync_timeout) |> SyncGroup.parse_response - {:reply, response, %{state | correlation_id: state.correlation_id + 1}} + + if response.error_code == :not_coordinator_for_consumer do + {_, updated_state} = update_consumer_metadata(state) + kafka_server_sync_group( + sync_group_request, + network_timeout, + updated_state + ) + else + {:reply, response, %{state | correlation_id: state.correlation_id + 1}} + end end def kafka_server_leave_group(request, network_timeout, state) do true = consumer_group?(state) {broker, state} = broker_for_consumer_group_with_update(state) - request = LeaveGroup.create_request(state.correlation_id, @client_id, request) + wire_request = LeaveGroup.create_request( + state.correlation_id, + @client_id, + request + ) + + sync_timeout = config_sync_timeout(network_timeout) response = broker - |> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout)) + |> NetworkClient.send_sync_request(wire_request, sync_timeout) |> LeaveGroup.parse_response - {:reply, response, %{state | correlation_id: state.correlation_id + 1}} + + if response.error_code == :not_coordinator_for_consumer do + {_, updated_state} = update_consumer_metadata(state) + kafka_server_leave_group(request, network_timeout, updated_state) + else + {:reply, response, %{state | correlation_id: state.correlation_id + 1}} + end end def kafka_server_heartbeat(request, network_timeout, state) do true = consumer_group?(state) {broker, state} = broker_for_consumer_group_with_update(state) - request = Heartbeat.create_request(state.correlation_id, @client_id, request) + wire_request = Heartbeat.create_request( + state.correlation_id, + @client_id, + request + ) + sync_timeout = config_sync_timeout(network_timeout) response = broker - |> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout)) + |> NetworkClient.send_sync_request(wire_request, sync_timeout) |> Heartbeat.parse_response - {:reply, response, %{state | correlation_id: state.correlation_id + 1}} + + if response.error_code == :not_coordinator_for_consumer do + {_, updated_state} = update_consumer_metadata(state) + kafka_server_heartbeat(request, network_timeout, updated_state) + else + {:reply, response, %{state | correlation_id: state.correlation_id + 1}} + end end defp update_consumer_metadata(state), do: update_consumer_metadata(state, @retry_count, 0) @@ -125,9 +180,21 @@ defmodule KafkaEx.Server0P9P0 do |> ConsumerMetadata.parse_response case response.error_code do - :no_error -> {response, %{state | consumer_metadata: response, correlation_id: state.correlation_id + 1}} + :no_error -> + { + response, + %{ + state | + consumer_metadata: response, + correlation_id: state.correlation_id + 1 + } + } _ -> :timer.sleep(400) - update_consumer_metadata(%{state | correlation_id: state.correlation_id + 1}, retry - 1, response.error_code) + update_consumer_metadata( + %{state | correlation_id: state.correlation_id + 1}, + retry - 1, + response.error_code + ) end end