50 changes: 49 additions & 1 deletion lib/kafka_ex.ex
@@ -6,6 +6,12 @@ defmodule KafkaEx do
alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse
alias KafkaEx.Protocol.Fetch.Response, as: FetchResponse
alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest
alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest
alias KafkaEx.Protocol.Heartbeat.Response, as: HeartbeatResponse
alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest
alias KafkaEx.Protocol.JoinGroup.Response, as: JoinGroupResponse
alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest
alias KafkaEx.Protocol.LeaveGroup.Response, as: LeaveGroupResponse
alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse
alias KafkaEx.Protocol.Offset.Response, as: OffsetResponse
alias KafkaEx.Protocol.OffsetCommit.Request, as: OffsetCommitRequest
@@ -14,6 +20,8 @@ defmodule KafkaEx do
alias KafkaEx.Protocol.OffsetFetch.Request, as: OffsetFetchRequest
alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest
alias KafkaEx.Protocol.Produce.Message
alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest
alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse
alias KafkaEx.Server

@type uri() :: [{binary|char_list, number}]
@@ -74,6 +82,46 @@ defmodule KafkaEx do
Server.call(worker, :consumer_group)
end

@doc """
Sends a request to join a consumer group.
"""
@spec join_group(JoinGroupRequest.t, Keyword.t) :: JoinGroupResponse.t
def join_group(request, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
timeout = Keyword.get(opts, :timeout)
Server.call(worker_name, {:join_group, request, timeout}, opts)
end

@doc """
Sends a request to synchronize with a consumer group.
"""
@spec sync_group(SyncGroupRequest.t, Keyword.t) :: SyncGroupResponse.t
def sync_group(request, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
timeout = Keyword.get(opts, :timeout)
Server.call(worker_name, {:sync_group, request, timeout}, opts)
end

@doc """
Sends a request to leave a consumer group.
"""
@spec leave_group(LeaveGroupRequest.t, Keyword.t) :: LeaveGroupResponse.t
def leave_group(request, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
timeout = Keyword.get(opts, :timeout)
Server.call(worker_name, {:leave_group, request, timeout}, opts)
end

@doc """
Sends a heartbeat to maintain membership in a consumer group.
"""
@spec heartbeat(HeartbeatRequest.t, Keyword.t) :: HeartbeatResponse.t
def heartbeat(request, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
timeout = Keyword.get(opts, :timeout)
Server.call(worker_name, {:heartbeat, request, timeout}, opts)
end

@doc """
Return metadata for the given topic; returns for all topics if topic is empty string

@@ -196,7 +244,7 @@ defmodule KafkaEx do
}, opts)
end

- @spec offset_commit(atom, OffsetCommitRequest.t) :: OffsetCommitResponse.t
+ @spec offset_commit(atom, OffsetCommitRequest.t) :: [OffsetCommitResponse.t]
def offset_commit(worker_name, offset_commit_request) do
Server.call(worker_name, {:offset_commit, offset_commit_request})
end
309 changes: 309 additions & 0 deletions lib/kafka_ex/consumer_group.ex
@@ -0,0 +1,309 @@
defmodule KafkaEx.ConsumerGroup do
@moduledoc """
A process that manages membership in a Kafka consumer group.

Consumers in a consumer group coordinate with each other through a Kafka broker to distribute the
work of consuming one or several topics without any overlap. This is facilitated by the [Kafka
client-side assignment
protocol](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal).

Any time group membership changes (a member joins or leaves the group), a Kafka broker initiates
group synchronization by asking one of the group members (the leader) to provide partition
assignments for the whole group. Partition assignment is handled by the
`c:KafkaEx.GenConsumer.assign_partitions/2` callback of the provided consumer module.

A `ConsumerGroup` process is responsible for:

1. Maintaining membership in a Kafka consumer group.
2. Determining partition assignments if elected as the group leader.
3. Launching and terminating `GenConsumer` processes based on its assigned partitions.

To use a `ConsumerGroup`, a developer must define a module that implements the
`KafkaEx.GenConsumer` behaviour and start a `ConsumerGroup` with that module.

## Example

The following consumer prints each message with the name of the node that's consuming the message:

```
defmodule DistributedConsumer do
use KafkaEx.GenConsumer

def handle_message(%Message{value: message}, state) do
IO.puts(to_string(node()) <> ": " <> inspect(message))
{:ack, state}
end
end

# use DistributedConsumer in a consumer group
{:ok, pid} = KafkaEx.ConsumerGroup.start_link(DistributedConsumer, "test_group", ["test_topic"])
```

Running this on multiple nodes might display the following:

```txt
node1@host: "messages"
node2@host: "on"
node2@host: "multiple"
node1@host: "nodes"
```

It is not necessary for the nodes to be connected, because `ConsumerGroup` uses Kafka's built-in
group coordination protocol.
"""

use GenServer

alias KafkaEx.Config
alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest
alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest
alias KafkaEx.Protocol.Heartbeat.Response, as: HeartbeatResponse
alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest
alias KafkaEx.Protocol.LeaveGroup.Response, as: LeaveGroupResponse
alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse
alias KafkaEx.Protocol.Metadata.TopicMetadata
alias KafkaEx.Protocol.Metadata.PartitionMetadata
alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest
alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse

require Logger

defmodule State do
@moduledoc false
defstruct [
:worker_name,
:heartbeat_interval,
:session_timeout,
:consumer_module,
:consumer_opts,
:group_name,
:topics,
:partitions,
:member_id,
:generation_id,
:assignments,
:consumer_pid,
]
end

@heartbeat_interval 5_000
Collaborator

Maybe call these @default_heartbeat_interval and @default_session_timeout just to be really clear?

Contributor Author

Makes sense.
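
For illustration, a minimal sketch of the suggested rename together with the lookup order `init/1` uses (explicit option, then application config, then module default). The module name `DefaultsSketch` and its two functions are hypothetical and only exist to make the snippet self-contained; the attribute names are the ones proposed above.

```elixir
defmodule DefaultsSketch do
  # Names proposed in the review; values copied from the diff.
  @default_heartbeat_interval 5_000
  @default_session_timeout 30_000

  # Explicit option wins, then the :kafka_ex application env, then the default.
  def heartbeat_interval(opts) do
    Keyword.get(
      opts,
      :heartbeat_interval,
      Application.get_env(:kafka_ex, :heartbeat_interval, @default_heartbeat_interval)
    )
  end

  def session_timeout(opts) do
    Keyword.get(
      opts,
      :session_timeout,
      Application.get_env(:kafka_ex, :session_timeout, @default_session_timeout)
    )
  end
end
```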

@session_timeout 30_000

# Client API

@doc """
Starts a `ConsumerGroup` process linked to the current process.

This can be used to start a `ConsumerGroup` as part of a supervision tree.

`module` is a module that implements the `KafkaEx.GenConsumer` behaviour. `group_name` is the name
of the consumer group. `topics` is a list of topics that the consumer group should consume from.
`opts` can be any options accepted by `GenConsumer` or `GenServer`.

### Return Values

This function has the same return values as `GenServer.start_link/3`.

If the consumer group is successfully created and initialized, this function returns `{:ok, pid}`,
where `pid` is the PID of the consumer group process.
"""
@spec start_link(module, binary, [binary], KafkaEx.GenConsumer.options) :: GenServer.on_start
def start_link(consumer_module, group_name, topics, opts \\ []) do
{server_opts, consumer_opts} = Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt])

GenServer.start_link(__MODULE__, {consumer_module, group_name, topics, consumer_opts}, server_opts)
end

# GenServer callbacks

def init({consumer_module, group_name, topics, opts}) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
heartbeat_interval = Keyword.get(opts, :heartbeat_interval, Application.get_env(:kafka_ex, :heartbeat_interval, @heartbeat_interval))
session_timeout = Keyword.get(opts, :session_timeout, Application.get_env(:kafka_ex, :session_timeout, @session_timeout))

consumer_opts = Keyword.drop(opts, [:heartbeat_interval, :session_timeout])

state = %State{
worker_name: worker_name,
heartbeat_interval: heartbeat_interval,
session_timeout: session_timeout,
consumer_module: consumer_module,
consumer_opts: consumer_opts,
group_name: group_name,
topics: topics,
member_id: "",
}

Process.flag(:trap_exit, true)

{:ok, state, 0}
end

def handle_info(:timeout, %State{generation_id: nil, member_id: ""} = state) do
new_state = join(state)

{:noreply, new_state, new_state.heartbeat_interval}
Collaborator

It might be better to do this with a timer. Timeouts get canceled if the process receives any messages (call, cast, info, etc). You can use :timer.send_after/2 or :timer.send_interval/2 and hold onto the timer reference in the state in case it needs to be canceled. send_interval is fine as long as the action being done is much shorter than the interval between calls, otherwise you might need to use send_after to make sure you don't end up with a process mailbox backlog.

Contributor Author

Thanks for the suggestion. That seems much cleaner than using GenServer timeouts.
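
A rough sketch of the timer-based approach described above, assuming `:timer.send_after/2` is re-armed after each heartbeat and the reference is kept in the state so it can be canceled. `HeartbeatTimerSketch`, the `:heartbeat` message, and the `beats` counter are made up for this example; the real handler would call `heartbeat(state)` where the counter is incremented.

```elixir
defmodule HeartbeatTimerSketch do
  use GenServer

  def start_link(interval), do: GenServer.start_link(__MODULE__, interval)

  # Returns how many heartbeats have fired so far (for demonstration only).
  def beats(pid), do: GenServer.call(pid, :beats)

  @impl true
  def init(interval) do
    {:ok, schedule(%{interval: interval, timer: nil, beats: 0})}
  end

  # Unlike a GenServer timeout, handling this call does not cancel the
  # pending heartbeat, because the timer lives outside the mailbox logic.
  @impl true
  def handle_call(:beats, _from, state), do: {:reply, state.beats, state}

  @impl true
  def handle_info(:heartbeat, state) do
    # Re-arm only after the work is done, so slow heartbeats cannot pile up.
    {:noreply, schedule(%{state | beats: state.beats + 1})}
  end

  @impl true
  def terminate(_reason, %{timer: timer}) do
    if timer, do: :timer.cancel(timer)
    :ok
  end

  defp schedule(state) do
    {:ok, timer} = :timer.send_after(state.interval, :heartbeat)
    %{state | timer: timer}
  end
end
```

Using `send_after` and re-arming in the handler avoids the mailbox backlog that `send_interval` could cause if a heartbeat round-trip ever takes longer than the interval.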

end

def handle_info(:timeout, %State{} = state) do
new_state = heartbeat(state)

{:noreply, new_state, new_state.heartbeat_interval}
end

def handle_info({:EXIT, pid, reason}, %State{consumer_pid: pid} = state) do
new_state = %State{state | consumer_pid: nil}

{:stop, reason, new_state}
end

def handle_info({:EXIT, _pid, _reason}, %State{} = state) do
Collaborator

I don't think this will ever match unless you change the previous clause to, say, when is_pid(pid) so that it only matches when pid is set.

Contributor Author

The previous clause uses pid twice, so it only matches when pid equals consumer_pid (the GenConsumer.Supervisor). I think this clause was matching exit signals from a supervisor that was shut down gracefully during a rebalance (by then a new supervisor has been assigned to consumer_pid).
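
To make the matching behaviour concrete, here is a small standalone sketch. `ExitMatchSketch`, its stripped-down `State` struct, and `classify/2` are hypothetical stand-ins for the two `handle_info/2` clauses; they only demonstrate that a variable bound twice in a pattern must match the same value in both places.

```elixir
defmodule ExitMatchSketch do
  defmodule State do
    defstruct [:consumer_pid]
  end

  # `pid` appears twice, so this clause matches only when the exiting
  # process is the one stored in consumer_pid.
  def classify({:EXIT, pid, _reason}, %State{consumer_pid: pid}), do: :current_consumer

  # Any other exit signal (for example from a previous supervisor) lands here.
  def classify({:EXIT, _pid, _reason}, %State{}), do: :other_process
end
```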

{:noreply, state, state.heartbeat_interval}
end

def terminate(_reason, %State{} = state) do
leave(state)
end

# Helpers

defp join(%State{worker_name: worker_name, session_timeout: session_timeout, group_name: group_name, topics: topics, member_id: member_id} = state) do
join_request = %JoinGroupRequest{
group_name: group_name,
member_id: member_id,
topics: topics,
session_timeout: session_timeout,
}

join_response = KafkaEx.join_group(join_request, worker_name: worker_name, timeout: session_timeout + 5000)
new_state = %State{state | member_id: join_response.member_id, generation_id: join_response.generation_id}

Logger.debug("Joined consumer group #{group_name}")

if join_response.member_id == join_response.leader_id do
sync_leader(new_state, join_response.members)
else
sync_follower(new_state)
end
end

defp sync_leader(%State{worker_name: worker_name, topics: topics, partitions: nil} = state, members) do
Collaborator

Could you add some comments to describe what each of these functions is doing? The logic is fairly involved and that would help a lot with understanding the code.

Contributor Author

Sure.

%MetadataResponse{topic_metadatas: topic_metadatas} = KafkaEx.metadata(worker_name: worker_name)

partitions = Enum.flat_map(topics, fn (topic) ->
%TopicMetadata{error_code: :no_error, partition_metadatas: partition_metadatas} = Enum.find(topic_metadatas, &(&1.topic == topic))
Collaborator

This could be out of scope but it might clean this up a little if there was a function like MetadataResponse.for_topic(metadata_response, topic)
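
One possible shape for such a helper, as a sketch only. `for_topic/2` does not exist in this PR; the structs below are simplified stand-ins defined under a hypothetical `ForTopicSketch` namespace so the snippet runs on its own, and they carry only the fields that `sync_leader` reads.

```elixir
defmodule ForTopicSketch do
  defmodule TopicMetadata do
    defstruct [:topic, :error_code, partition_metadatas: []]
  end

  defmodule MetadataResponse do
    defstruct topic_metadatas: []

    # Hypothetical helper: returns the metadata entry for `topic`,
    # or nil when the response does not contain that topic.
    def for_topic(%__MODULE__{topic_metadatas: topic_metadatas}, topic) do
      Enum.find(topic_metadatas, &(&1.topic == topic))
    end
  end
end
```

Returning `nil` for an unknown topic keeps the helper close to the `Enum.find/2` call it would replace; the caller's pattern match on `%TopicMetadata{error_code: :no_error}` would still fail loudly in that case.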


Enum.map(partition_metadatas, fn (%PartitionMetadata{error_code: :no_error, partition_id: partition_id}) ->
{topic, partition_id}
end)
end)

sync_leader(%State{state | partitions: partitions}, members)
end

defp sync_leader(%State{worker_name: worker_name, session_timeout: session_timeout,
group_name: group_name, generation_id: generation_id, member_id: member_id} = state, members) do
assignments = assign_partitions(state, members)

sync_request = %SyncGroupRequest{
group_name: group_name,
member_id: member_id,
generation_id: generation_id,
assignments: assignments,
}

sync_request
|> KafkaEx.sync_group(worker_name: worker_name, timeout: session_timeout + 5000)
Collaborator

5000 should get a module attribute or var so that it isn't a magic number

Contributor Author

Makes sense.
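
As a sketch of what that could look like: the attribute name `@session_timeout_padding` and the `call_timeout/1` helper are hypothetical, chosen only to show the 5000 ms margin being named once and reused by the join and sync calls.

```elixir
defmodule TimeoutSketch do
  # Extra time (ms) allowed on top of the session timeout before the
  # GenServer call to the worker gives up.
  @session_timeout_padding 5_000

  def call_timeout(session_timeout) do
    session_timeout + @session_timeout_padding
  end
end
```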

|> update_assignments(state)
end

defp sync_follower(%State{worker_name: worker_name, session_timeout: session_timeout,
group_name: group_name, generation_id: generation_id, member_id: member_id} = state) do
sync_request = %SyncGroupRequest{
group_name: group_name,
member_id: member_id,
generation_id: generation_id,
assignments: [],
}

sync_request
|> KafkaEx.sync_group(timeout: session_timeout + 5000, worker_name: worker_name)
|> update_assignments(state)
end

defp update_assignments(%SyncGroupResponse{error_code: :rebalance_in_progress}, %State{} = state), do: rebalance(state)
defp update_assignments(%SyncGroupResponse{error_code: :no_error, assignments: assignments}, %State{} = state) do
start_consumer(state, assignments)
end

defp heartbeat(%State{worker_name: worker_name, group_name: group_name, generation_id: generation_id, member_id: member_id} = state) do
heartbeat_request = %HeartbeatRequest{
group_name: group_name,
member_id: member_id,
generation_id: generation_id,
}

case KafkaEx.heartbeat(heartbeat_request, worker_name: worker_name) do
%HeartbeatResponse{error_code: :no_error} ->
state

%HeartbeatResponse{error_code: :rebalance_in_progress} ->
rebalance(state)
end
end

defp rebalance(%State{} = state) do
state
|> stop_consumer()
|> join()
end

defp leave(%State{worker_name: worker_name, group_name: group_name, member_id: member_id} = state) do
stop_consumer(state)

leave_request = %LeaveGroupRequest{
group_name: group_name,
member_id: member_id,
}

%LeaveGroupResponse{error_code: :no_error} = KafkaEx.leave_group(leave_request, worker_name: worker_name)

Logger.debug("Left consumer group #{group_name}")
end

defp start_consumer(%State{consumer_module: consumer_module, consumer_opts: consumer_opts,
group_name: group_name, consumer_pid: nil} = state, assignments) do
assignments =
Enum.flat_map(assignments, fn ({topic, partition_ids}) ->
Enum.map(partition_ids, &({topic, &1}))
end)

{:ok, pid} = KafkaEx.GenConsumer.Supervisor.start_link(consumer_module, group_name, assignments, consumer_opts)
Collaborator

It might be better to have this already started and awaiting dispatch. I'll address this in an overall comment.


%State{state | assignments: assignments, consumer_pid: pid}
end

defp stop_consumer(%State{consumer_pid: nil} = state), do: state
defp stop_consumer(%State{consumer_pid: pid} = state) when is_pid(pid) do
:ok = Supervisor.stop(pid)
%State{state | consumer_pid: nil}
end

defp assign_partitions(%State{consumer_module: consumer_module, partitions: partitions}, members) do
assignments =
consumer_module.assign_partitions(members, partitions)
|> Enum.map(fn ({member, topic_partitions}) ->
assigns =
topic_partitions
|> Enum.group_by(&(elem(&1, 0)), &(elem(&1, 1)))
Collaborator

This is a bit dense - could it be split to a function that has a name indicating what it's doing? Maybe just |> Enum.map(fn({k, v}) -> {k, [v]} end) (if I'm reading correctly)

Contributor Author

I'll see what I can do to make it easier to follow. This is transforming a list of partitions into the format expected by SyncGroupRequest. That is, [{topic1, part1}, {topic1, part2}, {topic2, part1}] gets transformed into [{topic1, [part1, part2]}, {topic2, [part1]}].
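
The transformation could be pulled into a named function along these lines. This is a sketch: `AssignmentSketch.group_by_topic/1` is a hypothetical name, and the body is the same `Enum.group_by/3` pipeline from the diff with the capture syntax spelled out as anonymous functions.

```elixir
defmodule AssignmentSketch do
  # Turns [{topic, partition}, ...] into [{topic, [partition, ...]}, ...],
  # which is the per-member shape described above for SyncGroupRequest.
  def group_by_topic(topic_partitions) do
    topic_partitions
    |> Enum.group_by(
      fn {topic, _partition} -> topic end,
      fn {_topic, partition} -> partition end
    )
    |> Enum.into([])
  end
end
```

Note that this groups all partitions of a topic into one list, which is different from mapping each pair to `{k, [v]}`; the latter would leave one entry per partition.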

|> Enum.into([])

{member, assigns}
end)
|> Map.new

Enum.map(members, fn (member) ->
{member, Map.get(assignments, member, [])}
end)
end
end