From 43af6a4c4d11fe6eefa222c02f7ac62c608f8cb2 Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Tue, 28 Nov 2017 11:20:51 -0500 Subject: [PATCH 1/2] Refactor consumer group requests, fix corr id The correlation id wasn't getting correctly implemented every time in my previous fix. Refactoring seemed like a good idea to hopefully get better consistency. --- lib/kafka_ex/server_0_p_9_p_0.ex | 160 +++++++++++++++++-------------- 1 file changed, 90 insertions(+), 70 deletions(-) diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index b7bc465c..7d27524b 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -73,96 +73,116 @@ defmodule KafkaEx.Server0P9P0 do {:ok, state} end - def kafka_server_join_group(join_group_request, network_timeout, state) do - true = consumer_group?(state) - {broker, state} = broker_for_consumer_group_with_update(state) - wire_request = JoinGroup.create_request( - state.correlation_id, - @client_id, - join_group_request - ) - sync_timeout = config_sync_timeout(network_timeout) - response = broker - |> NetworkClient.send_sync_request(wire_request, sync_timeout) - |> JoinGroup.parse_response - - if response.error_code == :not_coordinator_for_consumer do - {_, updated_state} = update_consumer_metadata(state) - kafka_server_join_group( - join_group_request, - network_timeout, - updated_state + def kafka_server_join_group(join_group_request, network_timeout, state_in) do + true = consumer_group?(state_in) + + request_builder = fn(state) -> + JoinGroup.create_request( + state.correlation_id, + @client_id, + join_group_request ) - else - {:reply, response, %{state | correlation_id: state.correlation_id + 1}} end - end + response_parser = &JoinGroup.parse_response/1 - def kafka_server_sync_group(sync_group_request, network_timeout, state) do - true = consumer_group?(state) - {broker, state} = broker_for_consumer_group_with_update(state) - wire_request = SyncGroup.create_request( - state.correlation_id, - @client_id, - sync_group_request + {response, state_out} = consumer_group_sync_request( + request_builder, + response_parser, + network_timeout, + state_in ) - sync_timeout = config_sync_timeout(network_timeout) - response = broker - |> NetworkClient.send_sync_request(wire_request, sync_timeout) - |> SyncGroup.parse_response - if response.error_code == :not_coordinator_for_consumer do - {_, updated_state} = update_consumer_metadata(state) - kafka_server_sync_group( - sync_group_request, - network_timeout, - updated_state + {:reply, response, state_out} + end + + def kafka_server_sync_group(sync_group_request, network_timeout, state_in) do + true = consumer_group?(state_in) + + request_builder = fn(state) -> + SyncGroup.create_request( + state.correlation_id, + @client_id, + sync_group_request ) - else - {:reply, response, %{state | correlation_id: state.correlation_id + 1}} end + response_parser = &SyncGroup.parse_response/1 + + {response, state_out} = consumer_group_sync_request( + request_builder, + response_parser, + network_timeout, + state_in + ) + + {:reply, response, state_out} end - def kafka_server_leave_group(request, network_timeout, state) do - true = consumer_group?(state) - {broker, state} = broker_for_consumer_group_with_update(state) - wire_request = LeaveGroup.create_request( - state.correlation_id, - @client_id, - request + def kafka_server_leave_group(request, network_timeout, state_in) do + true = consumer_group?(state_in) + + request_builder = fn(state) -> + LeaveGroup.create_request(state.correlation_id, @client_id, request) + end + response_parser = &LeaveGroup.parse_response/1 + + {response, state_out} = consumer_group_sync_request( + request_builder, + response_parser, + network_timeout, + state_in ) - sync_timeout = config_sync_timeout(network_timeout) - response = broker - |> NetworkClient.send_sync_request(wire_request, sync_timeout) - |> LeaveGroup.parse_response + {:reply, response, state_out} + end - if response.error_code == :not_coordinator_for_consumer do - {_, updated_state} = update_consumer_metadata(state) - kafka_server_leave_group(request, network_timeout, updated_state) - else - {:reply, response, %{state | correlation_id: state.correlation_id + 1}} + def kafka_server_heartbeat(request, network_timeout, state_in) do + true = consumer_group?(state_in) + + request_builder = fn(state) -> + Heartbeat.create_request(state.correlation_id, @client_id, request) end + response_parser = &Heartbeat.parse_response/1 + + {response, state_out} = consumer_group_sync_request( + request_builder, + response_parser, + network_timeout, + state_in + ) + + {:reply, response, state_out} end - def kafka_server_heartbeat(request, network_timeout, state) do - true = consumer_group?(state) + defp consumer_group_sync_request( + request_builder, + response_parser, + network_timeout, + state + ) do {broker, state} = broker_for_consumer_group_with_update(state) - wire_request = Heartbeat.create_request( - state.correlation_id, - @client_id, - request - ) + sync_timeout = config_sync_timeout(network_timeout) - response = broker - |> NetworkClient.send_sync_request(wire_request, sync_timeout) - |> Heartbeat.parse_response + + wire_request = request_builder.(state) + wire_response = NetworkClient.send_sync_request( + broker, + wire_request, + sync_timeout + ) + response = response_parser.(wire_response) + + state_out = %{state | correlation_id: state.correlation_id + 1} if response.error_code == :not_coordinator_for_consumer do - {_, updated_state} = update_consumer_metadata(state) - kafka_server_heartbeat(request, network_timeout, updated_state) + {_, updated_state_out} = update_consumer_metadata(state_out) + consumer_group_sync_request( + request_builder, + response_parser, + network_timeout, + updated_state_out + ) else - {:reply, response, %{state | correlation_id: state.correlation_id + 1}} + {response, state_out} end end From 941a9e1d86252e8c5ef6e3d8235795a38215be42 Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Tue, 28 Nov 2017 11:53:53 -0500 Subject: [PATCH 2/2] Refactor around common protocol methods --- lib/kafka_ex/server_0_p_9_p_0.ex | 64 ++++++++++---------------------- 1 file changed, 20 insertions(+), 44 deletions(-) diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index 7d27524b..0ab35c42 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -73,21 +73,12 @@ defmodule KafkaEx.Server0P9P0 do {:ok, state} end - def kafka_server_join_group(join_group_request, network_timeout, state_in) do + def kafka_server_join_group(request, network_timeout, state_in) do true = consumer_group?(state_in) - request_builder = fn(state) -> - JoinGroup.create_request( - state.correlation_id, - @client_id, - join_group_request - ) - end - response_parser = &JoinGroup.parse_response/1 - {response, state_out} = consumer_group_sync_request( - request_builder, - response_parser, + request, + JoinGroup, network_timeout, state_in ) @@ -95,21 +86,12 @@ defmodule KafkaEx.Server0P9P0 do {:reply, response, state_out} end - def kafka_server_sync_group(sync_group_request, network_timeout, state_in) do + def kafka_server_sync_group(request, network_timeout, state_in) do true = consumer_group?(state_in) - request_builder = fn(state) -> - SyncGroup.create_request( - state.correlation_id, - @client_id, - sync_group_request - ) - end - response_parser = &SyncGroup.parse_response/1 - {response, state_out} = consumer_group_sync_request( - request_builder, - response_parser, + request, + SyncGroup, network_timeout, state_in ) @@ -120,14 +102,9 @@ defmodule KafkaEx.Server0P9P0 do def kafka_server_leave_group(request, network_timeout, state_in) do true = consumer_group?(state_in) - request_builder = fn(state) -> - LeaveGroup.create_request(state.correlation_id, @client_id, request) - end - response_parser = &LeaveGroup.parse_response/1 - {response, state_out} = consumer_group_sync_request( - request_builder, - response_parser, + request, + LeaveGroup, network_timeout, state_in ) @@ -138,14 +115,9 @@ defmodule KafkaEx.Server0P9P0 do def kafka_server_heartbeat(request, network_timeout, state_in) do true = consumer_group?(state_in) - request_builder = fn(state) -> - Heartbeat.create_request(state.correlation_id, @client_id, request) - end - response_parser = &Heartbeat.parse_response/1 - {response, state_out} = consumer_group_sync_request( - request_builder, - response_parser, + request, + Heartbeat, network_timeout, state_in ) @@ -154,8 +126,8 @@ defmodule KafkaEx.Server0P9P0 do end defp consumer_group_sync_request( - request_builder, - response_parser, + request, + protocol_module, network_timeout, state ) do @@ -163,21 +135,25 @@ defmodule KafkaEx.Server0P9P0 do sync_timeout = config_sync_timeout(network_timeout) - wire_request = request_builder.(state) + wire_request = protocol_module.create_request( + state.correlation_id, + @client_id, + request + ) wire_response = NetworkClient.send_sync_request( broker, wire_request, sync_timeout ) - response = response_parser.(wire_response) + response = protocol_module.parse_response(wire_response) state_out = %{state | correlation_id: state.correlation_id + 1} if response.error_code == :not_coordinator_for_consumer do {_, updated_state_out} = update_consumer_metadata(state_out) consumer_group_sync_request( - request_builder, - response_parser, + request, + protocol_module, network_timeout, updated_state_out )