diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index b7bc465c..0ab35c42 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -73,96 +73,92 @@ defmodule KafkaEx.Server0P9P0 do {:ok, state} end - def kafka_server_join_group(join_group_request, network_timeout, state) do - true = consumer_group?(state) - {broker, state} = broker_for_consumer_group_with_update(state) - wire_request = JoinGroup.create_request( - state.correlation_id, - @client_id, - join_group_request + def kafka_server_join_group(request, network_timeout, state_in) do + true = consumer_group?(state_in) + + {response, state_out} = consumer_group_sync_request( + request, + JoinGroup, + network_timeout, + state_in ) - sync_timeout = config_sync_timeout(network_timeout) - response = broker - |> NetworkClient.send_sync_request(wire_request, sync_timeout) - |> JoinGroup.parse_response - if response.error_code == :not_coordinator_for_consumer do - {_, updated_state} = update_consumer_metadata(state) - kafka_server_join_group( - join_group_request, - network_timeout, - updated_state - ) - else - {:reply, response, %{state | correlation_id: state.correlation_id + 1}} - end + {:reply, response, state_out} end - def kafka_server_sync_group(sync_group_request, network_timeout, state) do - true = consumer_group?(state) - {broker, state} = broker_for_consumer_group_with_update(state) - wire_request = SyncGroup.create_request( - state.correlation_id, - @client_id, - sync_group_request + def kafka_server_sync_group(request, network_timeout, state_in) do + true = consumer_group?(state_in) + + {response, state_out} = consumer_group_sync_request( + request, + SyncGroup, + network_timeout, + state_in ) - sync_timeout = config_sync_timeout(network_timeout) - response = broker - |> NetworkClient.send_sync_request(wire_request, sync_timeout) - |> SyncGroup.parse_response - if response.error_code == :not_coordinator_for_consumer do - {_, updated_state} = update_consumer_metadata(state) - kafka_server_sync_group( - sync_group_request, - network_timeout, - updated_state - ) - else - {:reply, response, %{state | correlation_id: state.correlation_id + 1}} - end + {:reply, response, state_out} end - def kafka_server_leave_group(request, network_timeout, state) do - true = consumer_group?(state) - {broker, state} = broker_for_consumer_group_with_update(state) - wire_request = LeaveGroup.create_request( - state.correlation_id, - @client_id, - request + def kafka_server_leave_group(request, network_timeout, state_in) do + true = consumer_group?(state_in) + + {response, state_out} = consumer_group_sync_request( + request, + LeaveGroup, + network_timeout, + state_in ) - sync_timeout = config_sync_timeout(network_timeout) - response = broker - |> NetworkClient.send_sync_request(wire_request, sync_timeout) - |> LeaveGroup.parse_response + {:reply, response, state_out} + end - if response.error_code == :not_coordinator_for_consumer do - {_, updated_state} = update_consumer_metadata(state) - kafka_server_leave_group(request, network_timeout, updated_state) - else - {:reply, response, %{state | correlation_id: state.correlation_id + 1}} - end + def kafka_server_heartbeat(request, network_timeout, state_in) do + true = consumer_group?(state_in) + + {response, state_out} = consumer_group_sync_request( + request, + Heartbeat, + network_timeout, + state_in + ) + + {:reply, response, state_out} end - def kafka_server_heartbeat(request, network_timeout, state) do - true = consumer_group?(state) + defp consumer_group_sync_request( + request, + protocol_module, + network_timeout, + state + ) do {broker, state} = broker_for_consumer_group_with_update(state) - wire_request = Heartbeat.create_request( + + sync_timeout = config_sync_timeout(network_timeout) + + wire_request = protocol_module.create_request( state.correlation_id, @client_id, request ) - sync_timeout = config_sync_timeout(network_timeout) - response = broker - |> NetworkClient.send_sync_request(wire_request, sync_timeout) - |> Heartbeat.parse_response + wire_response = NetworkClient.send_sync_request( + broker, + wire_request, + sync_timeout + ) + response = protocol_module.parse_response(wire_response) + + state_out = %{state | correlation_id: state.correlation_id + 1} if response.error_code == :not_coordinator_for_consumer do - {_, updated_state} = update_consumer_metadata(state) - kafka_server_heartbeat(request, network_timeout, updated_state) + {_, updated_state_out} = update_consumer_metadata(state_out) + consumer_group_sync_request( + request, + protocol_module, + network_timeout, + updated_state_out + ) else - {:reply, response, %{state | correlation_id: state.correlation_id + 1}} + {response, state_out} end end