lib/kafka_ex/consumer_group/manager.ex (80 changes: 35 additions & 45 deletions)
@@ -169,7 +169,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
# timeout.
def terminate(_reason, %State{generation_id: nil, member_id: nil}), do: :ok
def terminate(_reason, %State{} = state) do
- :ok = leave(state)
+ {:ok, _state} = leave(state)
Process.unlink(state.worker_name)
KafkaEx.stop_worker(state.worker_name)
end
@@ -190,8 +190,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do
# the leader, which is responsible for assigning partitions to all of the
# group members. Once a `JoinGroupResponse` is received, all group members
# must send a `SyncGroupRequest` (see sync/2).
-
- @spec join(State.t) :: {:ok, State.t}
defp join(
%State{
worker_name: worker_name,
@@ -253,41 +251,40 @@ defmodule KafkaEx.ConsumerGroup.Manager do
# Upon receiving a successful `SyncGroupResponse`, a group member is free to
# start consuming from its assigned partitions, but must send periodic
# heartbeats to the coordinating broker.
- @spec sync(State.t, assignments) :: {:ok, State.t}
defp sync(
- %State{
-   group_name: group_name,
-   member_id: member_id,
-   generation_id: generation_id,
-   worker_name: worker_name,
-   session_timeout: session_timeout
- } = state,
- assignments
- ) do
+ %State{
+   group_name: group_name,
+   member_id: member_id,
+   generation_id: generation_id,
+   worker_name: worker_name,
+   session_timeout: session_timeout
+ } = state,
+ assignments
+ ) do
sync_request = %SyncGroupRequest{
group_name: group_name,
member_id: member_id,
generation_id: generation_id,
- assignments: assignments,
+ assignments: assignments
}

- sync_group_response = KafkaEx.sync_group(
-   sync_request,
-   worker_name: worker_name,
-   timeout: session_timeout + @session_timeout_padding
- )
+ %SyncGroupResponse{error_code: error_code, assignments: assignments} =
+   KafkaEx.sync_group(
+     sync_request,
+     worker_name: worker_name,
+     timeout: session_timeout + @session_timeout_padding
+   )

- case sync_group_response do
-   %SyncGroupResponse{error_code: :no_error, assignments: assignments} ->
+ case error_code do
+   :no_error ->
# On a high-latency connection, the join/sync process takes a long
# time. Send a heartbeat as soon as possible to avoid hitting the
# session timeout.
-     new_state = state
-     |> start_heartbeat_timer()
-     |> stop_consumer()
-     |> start_consumer(unpack_assignments(assignments))
-     {:ok, new_state}
-   %SyncGroupResponse{error_code: :rebalance_in_progress} ->
+     {:ok, state} = start_heartbeat_timer(state)
+     {:ok, state} = stop_consumer(state)
+     start_consumer(state, unpack_assignments(assignments))
+
+   :rebalance_in_progress ->
rebalance(state)
end
end
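Note on `unpack_assignments/1`: that helper is not part of this diff, so the shape below is an assumption rather than the PR's code. The idea is that the `SyncGroupResponse` assignments arrive as `{topic, partition_ids}` pairs while `start_consumer/2` wants flat `{topic, partition}` tuples; a minimal sketch under that assumption:

```elixir
# Assumed shape only; not code from this PR. Flattens [{"topic", [0, 1]}]
# into [{"topic", 0}, {"topic", 1}] for start_consumer/2.
defp unpack_assignments(assignments) do
  Enum.flat_map(assignments, fn {topic, partition_ids} ->
    Enum.map(partition_ids, fn partition_id -> {topic, partition_id} end)
  end)
end
```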
@@ -296,7 +293,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do
# broker that the member is leaving the group without having to wait for the
# session timeout to expire. Leaving a group triggers a rebalance for the
# remaining group members.
- @spec leave(State.t) :: :ok
defp leave(
%State{
worker_name: worker_name,
@@ -323,49 +319,45 @@ defmodule KafkaEx.ConsumerGroup.Manager do
end)
end

- :ok
+ {:ok, state}
end

# When instructed that a rebalance is in progress, a group member must rejoin
# the group with `JoinGroupRequest` (see join/1). To keep the state
# synchronized during the join/sync phase, each member pauses its consumers
# and commits its offsets before rejoining the group.
- @spec rebalance(State.t) :: {:ok, State.t}
defp rebalance(%State{} = state) do
- state
- |> stop_heartbeat_timer()
- |> stop_consumer()
- |> join()
+ {:ok, state} = stop_heartbeat_timer(state)
+ {:ok, state} = stop_consumer(state)
+ join(state)
end
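The change from piping a bare `%State{}` through the helpers to matching `{:ok, state}` at each step means a failing step raises a `MatchError` at the exact call that failed. For illustration only (not part of this PR), the same flow could be written with `with` and read equivalently:

```elixir
# Illustration of the {:ok, state} convention using `with`; not part of this change.
defp rebalance(%State{} = state) do
  with {:ok, state} <- stop_heartbeat_timer(state),
       {:ok, state} <- stop_consumer(state) do
    join(state)
  end
end
```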

### Timer Management

# Starts a heartbeat process to send heartbeats in the background to keep the
# consumers active even if it takes a long time to process a batch of
# messages.
- @spec start_heartbeat_timer(State.t) :: State.t
defp start_heartbeat_timer(%State{} = state) do
{:ok, timer} = Heartbeat.start_link(state)

- %State{state | heartbeat_timer: timer}
+ {:ok, %State{state | heartbeat_timer: timer}}
end
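`Heartbeat.start_link/1` starts KafkaEx's heartbeat process, whose implementation sits outside this diff. As a rough mental model only (the module name and arguments below are hypothetical, not KafkaEx's API), a background heartbeat reduces to a GenServer that re-arms a timer and fires a callback on every tick:

```elixir
# Hypothetical sketch of a periodic heartbeat process; not KafkaEx's Heartbeat module.
defmodule ExampleHeartbeat do
  use GenServer

  def start_link({interval_ms, heartbeat_fun}) do
    GenServer.start_link(__MODULE__, {interval_ms, heartbeat_fun})
  end

  @impl true
  def init({interval_ms, _heartbeat_fun} = state) do
    # Arm the first tick; each tick re-arms the next one.
    Process.send_after(self(), :tick, interval_ms)
    {:ok, state}
  end

  @impl true
  def handle_info(:tick, {interval_ms, heartbeat_fun} = state) do
    heartbeat_fun.()
    Process.send_after(self(), :tick, interval_ms)
    {:noreply, state}
  end
end
```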

# Stops the heartbeat process.
- @spec stop_heartbeat_timer(State.t) :: State.t
- defp stop_heartbeat_timer(%State{heartbeat_timer: nil} = state), do: state
+ defp stop_heartbeat_timer(%State{heartbeat_timer: nil} = state), do: {:ok, state}
defp stop_heartbeat_timer(
%State{heartbeat_timer: heartbeat_timer} = state
) do
if Process.alive?(heartbeat_timer) do
:gen_server.stop(heartbeat_timer)
end
- %State{state | heartbeat_timer: nil}
+ new_state = %State{state | heartbeat_timer: nil}
+ {:ok, new_state}
end

### Consumer Management

# Starts consuming from the member's assigned partitions.
- @spec start_consumer(State.t, assignments) :: State.t
defp start_consumer(
%State{
consumer_module: consumer_module,
@@ -383,19 +375,19 @@ defmodule KafkaEx.ConsumerGroup.Manager do
consumer_opts
)

- %{
+ state = %{
state |
assignments: assignments,
consumer_supervisor_pid: consumer_supervisor_pid
}
+ {:ok, state}
end

# Stops consuming from the member's assigned partitions and commits offsets.
- @spec stop_consumer(State.t) :: State.t
defp stop_consumer(%State{supervisor_pid: pid} = state) do
:ok = ConsumerGroup.stop_consumer(pid)

- state
+ {:ok, state}
end

### Partition Assignment
@@ -404,7 +396,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do
# interest to this consumer group. This function returns a list of
# topic/partition tuples that can be passed to a GenConsumer's
# `assign_partitions` method.
- @spec assignable_partitions(State.t) :: assignments()
defp assignable_partitions(
%State{worker_name: worker_name, topics: topics, group_name: group_name}
) do
@@ -434,7 +425,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do
# of topic/partition tuples, obtained from `assignable_partitions/1`. The
# return value is a complete list of member assignments in the format needed
# by `SyncGroupResponse`.
- @spec assign_partitions(State.t, list(), list()) :: list()
defp assign_partitions(
%State{partition_assignment_callback: partition_assignment_callback},
members,
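The body of `assign_partitions/3` is collapsed in this view; it delegates to the configured `partition_assignment_callback`. To illustrate the callback's contract only (member ids and `{topic, partition}` tuples in, per-member assignments out), a naive round-robin sketch might look like the following; the function name and return shape are assumptions, not KafkaEx's API:

```elixir
# Illustration only: distributes {topic, partition} tuples across member ids
# round-robin, returning a map of member id => assigned partitions.
def example_round_robin(members, partitions) do
  member_count = length(members)

  partitions
  |> Enum.with_index()
  |> Enum.group_by(
    fn {_partition, index} -> Enum.at(members, rem(index, member_count)) end,
    fn {partition, _index} -> partition end
  )
end
```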