Skip to content

Commit

Permalink
Separate Heartbeat from Manager
Browse files Browse the repository at this point in the history
This moves the heartbeat functionality into a separate GenServer so that
even if the worker startup takes a long time, such as over high latency
connections, the broker won't remove the consumer from the group.

This seems to solve issues we've had where we were getting
:unknown_member_id errors back.

The heartbeat notifies the manager of a need for rebalance by exiting
with a reason of `:rebalance`. The Manager starts a rebalance upon
receiving that message.
  • Loading branch information
joshuawscott committed May 15, 2018
1 parent 1e0f080 commit d8580e4
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 58 deletions.
5 changes: 5 additions & 0 deletions .formatter.exs
@@ -0,0 +1,5 @@
# Used by "mix format"
[
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"],
line_length: 80
]
81 changes: 81 additions & 0 deletions lib/kafka_ex/consumer_group/heartbeat.ex
@@ -0,0 +1,81 @@
defmodule KafkaEx.ConsumerGroup.Heartbeat do
@moduledoc false
# GenServer to send heartbeats to the broker
#
# A `HeartbeatRequest` is sent periodically by each active group member (after
# completing the join/sync phase) to inform the broker that the member is
# still alive and participating in the group. If a group member fails to send
# a heartbeat before the group's session timeout expires, the coordinator
# removes that member from the group and initiates a rebalance.
#
# `HeartbeatResponse` allows the coordinating broker to communicate the
# group's status to each member:
#
# * `:no_error` indicates that the group is up to date and no action is
# needed.
# * `:rebalance_in_progress` a rebalance has been initiated, so each member
# should re-join.
# * `:unknown_member_id` means that this heartbeat was from a previous dead
# generation.
#
# For either of the error conditions, the heartbeat process exits, which is
# trapped by the KafkaEx.ConsumerGroup.Manager and handled by re-joining the
# consumer group. (see KafkaEx.ConsumerGroup.Manager.join/1)

require Logger
alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest
alias KafkaEx.Protocol.Heartbeat.Response, as: HeartbeatResponse

defmodule State do
defstruct [
:worker_name,
:heartbeat_request,
:heartbeat_interval
]
end

def start_link(options) do
GenServer.start_link(__MODULE__, options)
end

def init(%{group_name: group_name, member_id: member_id, generation_id: generation_id, worker_name: worker_name, heartbeat_interval: heartbeat_interval}) do
heartbeat_request = %HeartbeatRequest{
group_name: group_name,
member_id: member_id,
generation_id: generation_id
}

state = %State{
worker_name: worker_name,
heartbeat_request: heartbeat_request,
heartbeat_interval: heartbeat_interval
}

{:ok, state, state.heartbeat_interval}
end

def handle_info(
:timeout,
%State{
worker_name: worker_name,
heartbeat_request: heartbeat_request,
heartbeat_interval: heartbeat_interval
} = state
) do
case KafkaEx.heartbeat(heartbeat_request, worker_name: worker_name) do
%HeartbeatResponse{error_code: :no_error} ->
Logger.debug("XXX: HB OK")
{:noreply, state, heartbeat_interval}

%HeartbeatResponse{error_code: :rebalance_in_progress} ->
Logger.debug("XXX: HB REBALANCE")
{:stop, :rebalance, state}

%HeartbeatResponse{error_code: :unknown_member_id} ->
{:stop, :rebalance, state}

%HeartbeatResponse{error_code: error_code} ->
Logger.warn("Heartbeat failed, got error code #{error_code}")
end
end
end
87 changes: 29 additions & 58 deletions lib/kafka_ex/consumer_group/manager.ex
Expand Up @@ -9,8 +9,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do
alias KafkaEx.ConsumerGroup.PartitionAssignment
alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest
alias KafkaEx.Protocol.JoinGroup.Response, as: JoinGroupResponse
alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest
alias KafkaEx.Protocol.Heartbeat.Response, as: HeartbeatResponse
alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest
alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse
alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest
Expand Down Expand Up @@ -38,12 +36,15 @@ defmodule KafkaEx.ConsumerGroup.Manager do
:assignments,
:heartbeat_timer,
]
@type t :: %__MODULE__{}
end

@heartbeat_interval 5_000
@session_timeout 30_000
@session_timeout_padding 5_000

@type assignments :: [{binary(), integer()}]

# Client API

@doc false
Expand Down Expand Up @@ -157,12 +158,10 @@ defmodule KafkaEx.ConsumerGroup.Manager do
{:noreply, new_state}
end

# After joining the group, a member must periodically send heartbeats to the
# group coordinator.
def handle_info(:heartbeat, %State{} = state) do
{:ok, new_state} = heartbeat(state)

{:noreply, new_state}
# If the heartbeat gets an error, we need to rebalance.
def handle_info({:EXIT, heartbeat_timer, :rebalance}, %State{heartbeat_timer: heartbeat_timer} = state) do
{:ok, state} = rebalance(state)
{:noreply, state}
end

# When terminating, inform the group coordinator that this member is leaving
Expand Down Expand Up @@ -192,6 +191,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
# group members. Once a `JoinGroupResponse` is received, all group members
# must send a `SyncGroupRequest` (see sync/2).

@spec join(State.t) :: {:ok, State.t}
defp join(
%State{
worker_name: worker_name,
Expand All @@ -215,10 +215,9 @@ defmodule KafkaEx.ConsumerGroup.Manager do
)

# crash the worker if we recieve an error, but do it with a meaningful
# error message
# error message
if join_response.error_code != :no_error do
raise "Error joining consumer group #{group_name}: " <>
"#{inspect join_response.error_code}"
raise "Error joining consumer group #{group_name}: " <> "#{inspect join_response.error_code}"
end

Logger.debug(fn -> "Joined consumer group #{group_name}" end)
Expand Down Expand Up @@ -254,7 +253,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
# Upon receiving a successful `SyncGroupResponse`, a group member is free to
# start consuming from its assigned partitions, but must send periodic
# heartbeats to the coordinating broker.

@spec sync(State.t, assignments) :: {:ok, State.t}
defp sync(
%State{
group_name: group_name,
Expand Down Expand Up @@ -283,57 +282,21 @@ defmodule KafkaEx.ConsumerGroup.Manager do
# On a high-latency connection, the join/sync process takes a long
# time. Send a heartbeat as soon as possible to avoid hitting the
# session timeout.
send(self(), :heartbeat)
new_state = state
|> start_heartbeat_timer()
|> stop_consumer()
|> start_consumer(unpack_assignments(assignments))
|> start_heartbeat_timer()
{:ok, new_state}
%SyncGroupResponse{error_code: :rebalance_in_progress} ->
rebalance(state)
end
end

# `HeartbeatRequest` is sent periodically by each active group member (after
# completing the join/sync phase) to inform the broker that the member is
# still alive and participating in the group. If a group member fails to send
# a heartbeat before the group's session timeout expires, the coordinator
# removes that member from the group and initiates a rebalance.
#
# `HeartbeatResponse` allows the coordinating broker to communicate the
# group's status to each member:
#
# * `:no_error` indicates that the group is up to date and no action is
# needed. * `:rebalance_in_progress` instructs each member to rejoin the
# group by sending a `JoinGroupRequest` (see join/1).
defp heartbeat(
%State{
worker_name: worker_name,
group_name: group_name,
generation_id: generation_id,
member_id: member_id
} = state
) do
heartbeat_request = %HeartbeatRequest{
group_name: group_name,
member_id: member_id,
generation_id: generation_id,
}

case KafkaEx.heartbeat(heartbeat_request, worker_name: worker_name) do
%HeartbeatResponse{error_code: :no_error} ->
new_state = start_heartbeat_timer(state)
{:ok, new_state}
%HeartbeatResponse{error_code: :rebalance_in_progress} ->
Logger.debug(fn -> "Rebalancing consumer group #{group_name}" end)
rebalance(state)
end
end

# `LeaveGroupRequest` is used to voluntarily leave a group. This tells the
# broker that the member is leaving the group without having to wait for the
# session timeout to expire. Leaving a group triggers a rebalance for the
# remaining group members.
@spec leave(State.t) :: :ok
defp leave(
%State{
worker_name: worker_name,
Expand Down Expand Up @@ -367,6 +330,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
# the group with `JoinGroupRequest` (see join/1). To keep the state
# synchronized during the join/sync phase, each member pauses its consumers
# and commits its offsets before rejoining the group.
@spec rebalance(State.t) :: {:ok, State.t}
defp rebalance(%State{} = state) do
state
|> stop_heartbeat_timer()
Expand All @@ -376,28 +340,32 @@ defmodule KafkaEx.ConsumerGroup.Manager do

### Timer Management

# Starts a timer for the next heartbeat.
defp start_heartbeat_timer(
%State{heartbeat_interval: heartbeat_interval} = state
) do
{:ok, timer} = :timer.send_after(heartbeat_interval, :heartbeat)
# Starts a heartbeat process to send heartbeats in the background to keep the
# consumers active even if it takes a long time to process a batch of
# messages.
@spec start_heartbeat_timer(State.t) :: State.t
defp start_heartbeat_timer(%State{} = state) do
{:ok, timer} = KafkaEx.ConsumerGroup.Heartbeat.start_link(state)

%State{state | heartbeat_timer: timer}
end

# Stops any active heartbeat timer.
# Stops the heartbeat process.
@spec stop_heartbeat_timer(State.t) :: State.t
defp stop_heartbeat_timer(%State{heartbeat_timer: nil} = state), do: state
defp stop_heartbeat_timer(
%State{heartbeat_timer: heartbeat_timer} = state
) do
{:ok, :cancel} = :timer.cancel(heartbeat_timer)

if Process.alive?(heartbeat_timer) do
GenServer.stop(heartbeat_timer)
end
%State{state | heartbeat_timer: nil}
end

### Consumer Management

# Starts consuming from the member's assigned partitions.
@spec start_consumer(State.t, assignments) :: State.t
defp start_consumer(
%State{
consumer_module: consumer_module,
Expand All @@ -423,6 +391,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
end

# Stops consuming from the member's assigned partitions and commits offsets.
@spec stop_consumer(State.t) :: State.t
defp stop_consumer(%State{supervisor_pid: pid} = state) do
:ok = ConsumerGroup.stop_consumer(pid)

Expand All @@ -435,6 +404,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
# interest to this consumer group. This function returns a list of
# topic/partition tuples that can be passed to a GenConsumer's
# `assign_partitions` method.
@spec assignable_partitions(State.t) :: assignments()
defp assignable_partitions(
%State{worker_name: worker_name, topics: topics, group_name: group_name}
) do
Expand Down Expand Up @@ -464,6 +434,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
# of topic/partition tuples, obtained from `assignable_partitions/1`. The
# return value is a complete list of member assignments in the format needed
# by `SyncGroupResponse`.
@spec assign_partitions(State.t, list(), list()) :: list()
defp assign_partitions(
%State{partition_assignment_callback: partition_assignment_callback},
members,
Expand Down

0 comments on commit d8580e4

Please sign in to comment.