From 9ade43e79023c47f4a152c58fa6b492ac5ec5539 Mon Sep 17 00:00:00 2001 From: Joshua Scott Date: Wed, 16 May 2018 15:12:14 -0500 Subject: [PATCH 1/4] Improvements in Manager implementation This changes most of the state-altering private functions in Manager to return `{:ok, state}` so that we can catch and report errors a little more easily when they happen --- lib/kafka_ex/consumer_group/manager.ex | 103 ++++++++++++++----------- 1 file changed, 56 insertions(+), 47 deletions(-) diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index cb41691d..79e31c09 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -160,8 +160,10 @@ defmodule KafkaEx.ConsumerGroup.Manager do # If the heartbeat gets an error, we need to rebalance. def handle_info({:EXIT, heartbeat_timer, {:shutdown, :rebalance}}, %State{heartbeat_timer: heartbeat_timer} = state) do - {:ok, state} = rebalance(state) - {:noreply, state} + case rebalance(state) do + {:ok, state} -> {:noreply, state} + {:error, error} -> raise "failed to rebalance with error #{inspect error}" + end end # When terminating, inform the group coordinator that this member is leaving @@ -169,7 +171,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do # timeout. def terminate(_reason, %State{generation_id: nil, member_id: nil}), do: :ok def terminate(_reason, %State{} = state) do - :ok = leave(state) + {:ok, _state} = leave(state) Process.unlink(state.worker_name) KafkaEx.stop_worker(state.worker_name) end @@ -190,8 +192,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do # the leader, which is reponsible for assigning partitions to all of the # group members. Once a `JoinGroupResponse` is received, all group members # must send a `SyncGroupRequest` (see sync/2). - - @spec join(State.t) :: {:ok, State.t} defp join( %State{ worker_name: worker_name, @@ -253,41 +253,50 @@ defmodule KafkaEx.ConsumerGroup.Manager do # Upon receiving a successful `SyncGroupResponse`, a group member is free to # start consuming from its assigned partitions, but must send periodic # heartbeats to the coordinating broker. - @spec sync(State.t, assignments) :: {:ok, State.t} defp sync( - %State{ - group_name: group_name, - member_id: member_id, - generation_id: generation_id, - worker_name: worker_name, - session_timeout: session_timeout - } = state, - assignments - ) do + %State{ + group_name: group_name, + member_id: member_id, + generation_id: generation_id, + worker_name: worker_name, + session_timeout: session_timeout + } = state, + assignments + ) do sync_request = %SyncGroupRequest{ group_name: group_name, member_id: member_id, generation_id: generation_id, - assignments: assignments, + assignments: assignments } - sync_group_response = KafkaEx.sync_group( - sync_request, - worker_name: worker_name, - timeout: session_timeout + @session_timeout_padding - ) + %SyncGroupResponse{error_code: error_code, assignments: assignments} = + KafkaEx.sync_group( + sync_request, + worker_name: worker_name, + timeout: session_timeout + @session_timeout_padding + ) + + unpacked_assignments = unpack_assignments(assignments) - case sync_group_response do - %SyncGroupResponse{error_code: :no_error, assignments: assignments} -> + case error_code do + :no_error -> # On a high-latency connection, the join/sync process takes a long # time. Send a heartbeat as soon as possible to avoid hitting the # session timeout. - new_state = state - |> start_heartbeat_timer() - |> stop_consumer() - |> start_consumer(unpack_assignments(assignments)) - {:ok, new_state} - %SyncGroupResponse{error_code: :rebalance_in_progress} -> + with {:ok, state} <- start_heartbeat_timer(state), + {:ok, state} <- stop_consumer(state), + {:ok, state} <- + start_consumer(state, unpack_assignments(assignments)) do + {:ok, state} + else + e -> + raise "Failed to restart consumers after joining group: #{ + inspect(e) + }" + end + + :rebalance_in_progress -> rebalance(state) end end @@ -296,7 +305,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do # broker that the member is leaving the group without having to wait for the # session timeout to expire. Leaving a group triggers a rebalance for the # remaining group members. - @spec leave(State.t) :: :ok defp leave( %State{ worker_name: worker_name, @@ -323,19 +331,24 @@ defmodule KafkaEx.ConsumerGroup.Manager do end) end - :ok + {:ok, state} end # When instructed that a rebalance is in progress, a group member must rejoin # the group with `JoinGroupRequest` (see join/1). To keep the state # synchronized during the join/sync phase, each member pauses its consumers # and commits its offsets before rejoining the group. - @spec rebalance(State.t) :: {:ok, State.t} defp rebalance(%State{} = state) do - state - |> stop_heartbeat_timer() - |> stop_consumer() - |> join() + with {:ok, state} <- stop_heartbeat_timer(state), + {:ok, state} <- stop_consumer(state), + {:ok, state} <- join(state) + do + {:ok, state} + else + {:error, e} -> + Logger.warn("Error in rebalance: #{inspect e}") + {:error, e} + end end ### Timer Management @@ -343,29 +356,27 @@ defmodule KafkaEx.ConsumerGroup.Manager do # Starts a heartbeat process to send heartbeats in the background to keep the # consumers active even if it takes a long time to process a batch of # messages. - @spec start_heartbeat_timer(State.t) :: State.t defp start_heartbeat_timer(%State{} = state) do {:ok, timer} = Heartbeat.start_link(state) - %State{state | heartbeat_timer: timer} + {:ok, %State{state | heartbeat_timer: timer}} end # Stops the heartbeat process. - @spec stop_heartbeat_timer(State.t) :: State.t - defp stop_heartbeat_timer(%State{heartbeat_timer: nil} = state), do: state + defp stop_heartbeat_timer(%State{heartbeat_timer: nil} = state), do: {:ok, state} defp stop_heartbeat_timer( %State{heartbeat_timer: heartbeat_timer} = state ) do if Process.alive?(heartbeat_timer) do :gen_server.stop(heartbeat_timer) end - %State{state | heartbeat_timer: nil} + new_state = %State{state | heartbeat_timer: nil} + {:ok, new_state} end ### Consumer Management # Starts consuming from the member's assigned partitions. - @spec start_consumer(State.t, assignments) :: State.t defp start_consumer( %State{ consumer_module: consumer_module, @@ -383,19 +394,19 @@ defmodule KafkaEx.ConsumerGroup.Manager do consumer_opts ) - %{ + state = %{ state | assignments: assignments, consumer_supervisor_pid: consumer_supervisor_pid } + {:ok, state} end # Stops consuming from the member's assigned partitions and commits offsets. - @spec stop_consumer(State.t) :: State.t defp stop_consumer(%State{supervisor_pid: pid} = state) do :ok = ConsumerGroup.stop_consumer(pid) - state + {:ok, state} end ### Partition Assignment @@ -404,7 +415,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do # interest to this consumer group. This function returns a list of # topic/partition tuples that can be passed to a GenConsumer's # `assign_partitions` method. - @spec assignable_partitions(State.t) :: assignments() defp assignable_partitions( %State{worker_name: worker_name, topics: topics, group_name: group_name} ) do @@ -434,7 +444,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do # of topic/partition tuples, obtained from `assignable_partitions/1`. The # return value is a complete list of member assignments in the format needed # by `SyncGroupResponse`. - @spec assign_partitions(State.t, list(), list()) :: list() defp assign_partitions( %State{partition_assignment_callback: partition_assignment_callback}, members, From 3a16f3f1687d84de5403d64d58ed340af97d3807 Mon Sep 17 00:00:00 2001 From: Joshua Scott Date: Wed, 16 May 2018 15:24:38 -0500 Subject: [PATCH 2/4] support pre-1.3 elixir --- lib/kafka_ex/consumer_group/manager.ex | 27 ++++++-------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index 79e31c09..0d34ba5e 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -284,17 +284,9 @@ defmodule KafkaEx.ConsumerGroup.Manager do # On a high-latency connection, the join/sync process takes a long # time. Send a heartbeat as soon as possible to avoid hitting the # session timeout. - with {:ok, state} <- start_heartbeat_timer(state), - {:ok, state} <- stop_consumer(state), - {:ok, state} <- - start_consumer(state, unpack_assignments(assignments)) do - {:ok, state} - else - e -> - raise "Failed to restart consumers after joining group: #{ - inspect(e) - }" - end + {:ok, state} = start_heartbeat_timer(state) + {:ok, state} = stop_consumer(state) + start_consumer(state, unpack_assignments(assignments)) :rebalance_in_progress -> rebalance(state) @@ -339,16 +331,9 @@ defmodule KafkaEx.ConsumerGroup.Manager do # synchronized during the join/sync phase, each member pauses its consumers # and commits its offsets before rejoining the group. defp rebalance(%State{} = state) do - with {:ok, state} <- stop_heartbeat_timer(state), - {:ok, state} <- stop_consumer(state), - {:ok, state} <- join(state) - do - {:ok, state} - else - {:error, e} -> - Logger.warn("Error in rebalance: #{inspect e}") - {:error, e} - end + {:ok, state} = stop_heartbeat_timer(state), + {:ok, state} = stop_consumer(state), + join(state) end ### Timer Management From 0c045d5fbd8072d65854d690c24414df020c6b5d Mon Sep 17 00:00:00 2001 From: Joshua Scott Date: Wed, 16 May 2018 15:27:14 -0500 Subject: [PATCH 3/4] typo --- lib/kafka_ex/consumer_group/manager.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index 0d34ba5e..335aeb8f 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -331,8 +331,8 @@ defmodule KafkaEx.ConsumerGroup.Manager do # synchronized during the join/sync phase, each member pauses its consumers # and commits its offsets before rejoining the group. defp rebalance(%State{} = state) do - {:ok, state} = stop_heartbeat_timer(state), - {:ok, state} = stop_consumer(state), + {:ok, state} = stop_heartbeat_timer(state) + {:ok, state} = stop_consumer(state) join(state) end From 6f9a2c2d9af139a0e721e8df997ffc4905463934 Mon Sep 17 00:00:00 2001 From: Joshua Scott Date: Wed, 16 May 2018 15:33:50 -0500 Subject: [PATCH 4/4] clean up --- lib/kafka_ex/consumer_group/manager.ex | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index 335aeb8f..c2810f92 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -160,10 +160,8 @@ defmodule KafkaEx.ConsumerGroup.Manager do # If the heartbeat gets an error, we need to rebalance. def handle_info({:EXIT, heartbeat_timer, {:shutdown, :rebalance}}, %State{heartbeat_timer: heartbeat_timer} = state) do - case rebalance(state) do - {:ok, state} -> {:noreply, state} - {:error, error} -> raise "failed to rebalance with error #{inspect error}" - end + {:ok, state} = rebalance(state) + {:noreply, state} end # When terminating, inform the group coordinator that this member is leaving @@ -277,8 +275,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do timeout: session_timeout + @session_timeout_padding ) - unpacked_assignments = unpack_assignments(assignments) - case error_code do :no_error -> # On a high-latency connection, the join/sync process takes a long