diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index cb41691d..c2810f92 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -169,7 +169,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do # timeout. def terminate(_reason, %State{generation_id: nil, member_id: nil}), do: :ok def terminate(_reason, %State{} = state) do - :ok = leave(state) + {:ok, _state} = leave(state) Process.unlink(state.worker_name) KafkaEx.stop_worker(state.worker_name) end @@ -190,8 +190,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do # the leader, which is reponsible for assigning partitions to all of the # group members. Once a `JoinGroupResponse` is received, all group members # must send a `SyncGroupRequest` (see sync/2). - - @spec join(State.t) :: {:ok, State.t} defp join( %State{ worker_name: worker_name, @@ -253,41 +251,40 @@ defmodule KafkaEx.ConsumerGroup.Manager do # Upon receiving a successful `SyncGroupResponse`, a group member is free to # start consuming from its assigned partitions, but must send periodic # heartbeats to the coordinating broker. - @spec sync(State.t, assignments) :: {:ok, State.t} defp sync( - %State{ - group_name: group_name, - member_id: member_id, - generation_id: generation_id, - worker_name: worker_name, - session_timeout: session_timeout - } = state, - assignments - ) do + %State{ + group_name: group_name, + member_id: member_id, + generation_id: generation_id, + worker_name: worker_name, + session_timeout: session_timeout + } = state, + assignments + ) do sync_request = %SyncGroupRequest{ group_name: group_name, member_id: member_id, generation_id: generation_id, - assignments: assignments, + assignments: assignments } - sync_group_response = KafkaEx.sync_group( - sync_request, - worker_name: worker_name, - timeout: session_timeout + @session_timeout_padding - ) + %SyncGroupResponse{error_code: error_code, assignments: assignments} = + KafkaEx.sync_group( + sync_request, + worker_name: worker_name, + timeout: session_timeout + @session_timeout_padding + ) - case sync_group_response do - %SyncGroupResponse{error_code: :no_error, assignments: assignments} -> + case error_code do + :no_error -> # On a high-latency connection, the join/sync process takes a long # time. Send a heartbeat as soon as possible to avoid hitting the # session timeout. - new_state = state - |> start_heartbeat_timer() - |> stop_consumer() - |> start_consumer(unpack_assignments(assignments)) - {:ok, new_state} - %SyncGroupResponse{error_code: :rebalance_in_progress} -> + {:ok, state} = start_heartbeat_timer(state) + {:ok, state} = stop_consumer(state) + start_consumer(state, unpack_assignments(assignments)) + + :rebalance_in_progress -> rebalance(state) end end @@ -296,7 +293,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do # broker that the member is leaving the group without having to wait for the # session timeout to expire. Leaving a group triggers a rebalance for the # remaining group members. - @spec leave(State.t) :: :ok defp leave( %State{ worker_name: worker_name, @@ -323,19 +319,17 @@ defmodule KafkaEx.ConsumerGroup.Manager do end) end - :ok + {:ok, state} end # When instructed that a rebalance is in progress, a group member must rejoin # the group with `JoinGroupRequest` (see join/1). To keep the state # synchronized during the join/sync phase, each member pauses its consumers # and commits its offsets before rejoining the group. - @spec rebalance(State.t) :: {:ok, State.t} defp rebalance(%State{} = state) do - state - |> stop_heartbeat_timer() - |> stop_consumer() - |> join() + {:ok, state} = stop_heartbeat_timer(state) + {:ok, state} = stop_consumer(state) + join(state) end ### Timer Management @@ -343,29 +337,27 @@ defmodule KafkaEx.ConsumerGroup.Manager do # Starts a heartbeat process to send heartbeats in the background to keep the # consumers active even if it takes a long time to process a batch of # messages. - @spec start_heartbeat_timer(State.t) :: State.t defp start_heartbeat_timer(%State{} = state) do {:ok, timer} = Heartbeat.start_link(state) - %State{state | heartbeat_timer: timer} + {:ok, %State{state | heartbeat_timer: timer}} end # Stops the heartbeat process. - @spec stop_heartbeat_timer(State.t) :: State.t - defp stop_heartbeat_timer(%State{heartbeat_timer: nil} = state), do: state + defp stop_heartbeat_timer(%State{heartbeat_timer: nil} = state), do: {:ok, state} defp stop_heartbeat_timer( %State{heartbeat_timer: heartbeat_timer} = state ) do if Process.alive?(heartbeat_timer) do :gen_server.stop(heartbeat_timer) end - %State{state | heartbeat_timer: nil} + new_state = %State{state | heartbeat_timer: nil} + {:ok, new_state} end ### Consumer Management # Starts consuming from the member's assigned partitions. - @spec start_consumer(State.t, assignments) :: State.t defp start_consumer( %State{ consumer_module: consumer_module, @@ -383,19 +375,19 @@ defmodule KafkaEx.ConsumerGroup.Manager do consumer_opts ) - %{ + state = %{ state | assignments: assignments, consumer_supervisor_pid: consumer_supervisor_pid } + {:ok, state} end # Stops consuming from the member's assigned partitions and commits offsets. - @spec stop_consumer(State.t) :: State.t defp stop_consumer(%State{supervisor_pid: pid} = state) do :ok = ConsumerGroup.stop_consumer(pid) - state + {:ok, state} end ### Partition Assignment @@ -404,7 +396,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do # interest to this consumer group. This function returns a list of # topic/partition tuples that can be passed to a GenConsumer's # `assign_partitions` method. - @spec assignable_partitions(State.t) :: assignments() defp assignable_partitions( %State{worker_name: worker_name, topics: topics, group_name: group_name} ) do @@ -434,7 +425,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do # of topic/partition tuples, obtained from `assignable_partitions/1`. The # return value is a complete list of member assignments in the format needed # by `SyncGroupResponse`. - @spec assign_partitions(State.t, list(), list()) :: list() defp assign_partitions( %State{partition_assignment_callback: partition_assignment_callback}, members,