From ba1c82e97b09c221781dcd77342db6c23c265693 Mon Sep 17 00:00:00 2001 From: David Cuddeback Date: Thu, 2 Feb 2017 16:56:06 -0800 Subject: [PATCH] implements consumer groups --- lib/kafka_ex.ex | 50 ++- lib/kafka_ex/consumer_group.ex | 309 ++++++++++++++ lib/kafka_ex/gen_consumer.ex | 474 ++++++++++++++++++++++ lib/kafka_ex/protocol/heartbeat.ex | 22 +- lib/kafka_ex/protocol/join_group.ex | 14 +- lib/kafka_ex/protocol/leave_group.ex | 21 +- lib/kafka_ex/protocol/sync_group.ex | 37 +- lib/kafka_ex/server.ex | 32 +- lib/kafka_ex/server_0_p_8_p_0.ex | 8 +- lib/kafka_ex/server_0_p_8_p_2.ex | 8 +- lib/kafka_ex/server_0_p_9_p_0.ex | 32 +- test/integration/server0_p_9_p_0_test.exs | 78 +++- test/protocol/heartbeat_test.exs | 8 +- test/protocol/join_group_test.exs | 4 +- test/protocol/leave_group_test.exs | 7 +- test/protocol/sync_group_test.exs | 20 +- 16 files changed, 1036 insertions(+), 88 deletions(-) create mode 100644 lib/kafka_ex/consumer_group.ex create mode 100644 lib/kafka_ex/gen_consumer.ex diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index ea6241a0..24f1b29a 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -6,6 +6,12 @@ defmodule KafkaEx do alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse alias KafkaEx.Protocol.Fetch.Response, as: FetchResponse alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest + alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest + alias KafkaEx.Protocol.Heartbeat.Response, as: HeartbeatResponse + alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest + alias KafkaEx.Protocol.JoinGroup.Response, as: JoinGroupResponse + alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest + alias KafkaEx.Protocol.LeaveGroup.Response, as: LeaveGroupResponse alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse alias KafkaEx.Protocol.Offset.Response, as: OffsetResponse alias KafkaEx.Protocol.OffsetCommit.Request, as: OffsetCommitRequest @@ -14,6 +20,8 @@ defmodule KafkaEx do alias KafkaEx.Protocol.OffsetFetch.Request, as: OffsetFetchRequest alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest alias KafkaEx.Protocol.Produce.Message + alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest + alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse alias KafkaEx.Server @type uri() :: [{binary|char_list, number}] @@ -74,6 +82,46 @@ defmodule KafkaEx do Server.call(worker, :consumer_group) end + @doc """ + Sends a request to join a consumer group. + """ + @spec join_group(JoinGroupRequest.t, Keyword.t) :: JoinGroupResponse.t + def join_group(request, opts \\ []) do + worker_name = Keyword.get(opts, :worker_name, Config.default_worker) + timeout = Keyword.get(opts, :timeout) + Server.call(worker_name, {:join_group, request, timeout}, opts) + end + + @doc """ + Sends a request to synchronize with a consumer group. + """ + @spec sync_group(SyncGroupRequest.t, Keyword.t) :: SyncGroupResponse.t + def sync_group(request, opts \\ []) do + worker_name = Keyword.get(opts, :worker_name, Config.default_worker) + timeout = Keyword.get(opts, :timeout) + Server.call(worker_name, {:sync_group, request, timeout}, opts) + end + + @doc """ + Sends a request to leave a consumer group. + """ + @spec leave_group(LeaveGroupRequest.t, Keyword.t) :: LeaveGroupResponse.t + def leave_group(request, opts \\ []) do + worker_name = Keyword.get(opts, :worker_name, Config.default_worker) + timeout = Keyword.get(opts, :timeout) + Server.call(worker_name, {:leave_group, request, timeout}, opts) + end + + @doc """ + Sends a heartbeat to maintain membership in a consumer group. + """ + @spec heartbeat(HeartbeatRequest.t, Keyword.t) :: HeartbeatResponse.t + def heartbeat(request, opts \\ []) do + worker_name = Keyword.get(opts, :worker_name, Config.default_worker) + timeout = Keyword.get(opts, :timeout) + Server.call(worker_name, {:heartbeat, request, timeout}, opts) + end + @doc """ Return metadata for the given topic; returns for all topics if topic is empty string @@ -196,7 +244,7 @@ defmodule KafkaEx do }, opts) end - @spec offset_commit(atom, OffsetCommitRequest.t) :: OffsetCommitResponse.t + @spec offset_commit(atom, OffsetCommitRequest.t) :: [OffsetCommitResponse.t] def offset_commit(worker_name, offset_commit_request) do Server.call(worker_name, {:offset_commit, offset_commit_request}) end diff --git a/lib/kafka_ex/consumer_group.ex b/lib/kafka_ex/consumer_group.ex new file mode 100644 index 00000000..4115a7b7 --- /dev/null +++ b/lib/kafka_ex/consumer_group.ex @@ -0,0 +1,309 @@ +defmodule KafkaEx.ConsumerGroup do + @moduledoc """ + A process that manages membership in a Kafka consumer group. + + Consumers in a consumer group coordinate with each other through a Kafka broker to distribute the + work of consuming one or several topics without any overlap. This is facilitated by the [Kafka + client-side assignment + protocol](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal). + + Any time group membership changes (a member joins or leaves the group), a Kafka broker initiates + group synchronization by asking one of the group members (the leader) to provide partition + assignments for the whole group. Partition assignment is handled by the + `c:KafkaEx.GenConsumer.assign_partitions/2` callback of the provided consumer module. + + A `ConsumerGroup` process is responsible for: + + 1. Maintaining membership in a Kafka consumer group. + 2. Determining partition assignments if elected as the group leader. + 3. Launching and terminating `GenConsumer` processes based on its assigned partitions. + + To use a `ConsumerGroup`, a developer must define a module that implements the + `KafkaEx.GenConsumer` behaviour and start a `ConsumerGroup` with that module. + + ## Example + + The following consumer prints each message with the name of the node that's consuming the message: + + ``` + defmodule DistributedConsumer do + use KafkaEx.GenConsumer + + def handle_message(%Message{value: message}, state) do + IO.puts(to_string(node()) <> ": " <> inspect(message)) + {:ack, state} + end + end + + # use DistributedConsumer in a consumer group + {:ok, pid} = KafkaEx.ConsumerGroup.start_link(DistributedConsumer, "test_group", ["test_topic"]) + ``` + + Running this on multiple nodes might display the following: + + ```txt + node1@host: "messages" + node2@host: "on" + node2@host: "multiple" + node1@host: "nodes" + ``` + + It is not necessary for the nodes to be connected, because `ConsumerGroup` uses Kafka's built-in + group coordination protocol. + """ + + use GenServer + + alias KafkaEx.Config + alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest + alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest + alias KafkaEx.Protocol.Heartbeat.Response, as: HeartbeatResponse + alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest + alias KafkaEx.Protocol.LeaveGroup.Response, as: LeaveGroupResponse + alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse + alias KafkaEx.Protocol.Metadata.TopicMetadata + alias KafkaEx.Protocol.Metadata.PartitionMetadata + alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest + alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse + + require Logger + + defmodule State do + @moduledoc false + defstruct [ + :worker_name, + :heartbeat_interval, + :session_timeout, + :consumer_module, + :consumer_opts, + :group_name, + :topics, + :partitions, + :member_id, + :generation_id, + :assignments, + :consumer_pid, + ] + end + + @heartbeat_interval 5_000 + @session_timeout 30_000 + + # Client API + + @doc """ + Starts a `ConsumerGroup` process linked to the current process. + + This can be used to start a `ConsumerGroup` as part of a supervision tree. + + `module` is a module that implements the `KafkaEx.GenConsumer` behaviour. `group_name` is the name + of the consumer group. `topics` is a list of topics that the consumer group should consume from. + `opts` can be any options accepted by `GenConsumer` or `GenServer`. + + ### Return Values + + This function has the same return values as `GenServer.start_link/3`. + + If the consumer group is successfully created and initialized, this function returns `{:ok, pid}`, + where `pid` is the PID of the consumer group process. + """ + @spec start_link(module, binary, [binary], KafkaEx.GenConsumer.options) :: GenServer.on_start + def start_link(consumer_module, group_name, topics, opts \\ []) do + {server_opts, consumer_opts} = Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt]) + + GenServer.start_link(__MODULE__, {consumer_module, group_name, topics, consumer_opts}, server_opts) + end + + # GenServer callbacks + + def init({consumer_module, group_name, topics, opts}) do + worker_name = Keyword.get(opts, :worker_name, Config.default_worker) + heartbeat_interval = Keyword.get(opts, :heartbeat_interval, Application.get_env(:kafka_ex, :heartbeat_interval, @heartbeat_interval)) + session_timeout = Keyword.get(opts, :session_timeout, Application.get_env(:kafka_ex, :session_timeout, @session_timeout)) + + consumer_opts = Keyword.drop(opts, [:heartbeat_interval, :session_timeout]) + + state = %State{ + worker_name: worker_name, + heartbeat_interval: heartbeat_interval, + session_timeout: session_timeout, + consumer_module: consumer_module, + consumer_opts: consumer_opts, + group_name: group_name, + topics: topics, + member_id: "", + } + + Process.flag(:trap_exit, true) + + {:ok, state, 0} + end + + def handle_info(:timeout, %State{generation_id: nil, member_id: ""} = state) do + new_state = join(state) + + {:noreply, new_state, new_state.heartbeat_interval} + end + + def handle_info(:timeout, %State{} = state) do + new_state = heartbeat(state) + + {:noreply, new_state, new_state.heartbeat_interval} + end + + def handle_info({:EXIT, pid, reason}, %State{consumer_pid: pid} = state) do + new_state = %State{state | consumer_pid: nil} + + {:stop, reason, new_state} + end + + def handle_info({:EXIT, _pid, _reason}, %State{} = state) do + {:noreply, state, state.heartbeat_interval} + end + + def terminate(_reason, %State{} = state) do + leave(state) + end + + # Helpers + + defp join(%State{worker_name: worker_name, session_timeout: session_timeout, group_name: group_name, topics: topics, member_id: member_id} = state) do + join_request = %JoinGroupRequest{ + group_name: group_name, + member_id: member_id, + topics: topics, + session_timeout: session_timeout, + } + + join_response = KafkaEx.join_group(join_request, worker_name: worker_name, timeout: session_timeout + 5000) + new_state = %State{state | member_id: join_response.member_id, generation_id: join_response.generation_id} + + Logger.debug("Joined consumer group #{group_name}") + + if join_response.member_id == join_response.leader_id do + sync_leader(new_state, join_response.members) + else + sync_follower(new_state) + end + end + + defp sync_leader(%State{worker_name: worker_name, topics: topics, partitions: nil} = state, members) do + %MetadataResponse{topic_metadatas: topic_metadatas} = KafkaEx.metadata(worker_name: worker_name) + + partitions = Enum.flat_map(topics, fn (topic) -> + %TopicMetadata{error_code: :no_error, partition_metadatas: partition_metadatas} = Enum.find(topic_metadatas, &(&1.topic == topic)) + + Enum.map(partition_metadatas, fn (%PartitionMetadata{error_code: :no_error, partition_id: partition_id}) -> + {topic, partition_id} + end) + end) + + sync_leader(%State{state | partitions: partitions}, members) + end + + defp sync_leader(%State{worker_name: worker_name, session_timeout: session_timeout, + group_name: group_name, generation_id: generation_id, member_id: member_id} = state, members) do + assignments = assign_partitions(state, members) + + sync_request = %SyncGroupRequest{ + group_name: group_name, + member_id: member_id, + generation_id: generation_id, + assignments: assignments, + } + + sync_request + |> KafkaEx.sync_group(worker_name: worker_name, timeout: session_timeout + 5000) + |> update_assignments(state) + end + + defp sync_follower(%State{worker_name: worker_name, session_timeout: session_timeout, + group_name: group_name, generation_id: generation_id, member_id: member_id} = state) do + sync_request = %SyncGroupRequest{ + group_name: group_name, + member_id: member_id, + generation_id: generation_id, + assignments: [], + } + + sync_request + |> KafkaEx.sync_group(timeout: session_timeout + 5000, worker_name: worker_name) + |> update_assignments(state) + end + + defp update_assignments(%SyncGroupResponse{error_code: :rebalance_in_progress}, %State{} = state), do: rebalance(state) + defp update_assignments(%SyncGroupResponse{error_code: :no_error, assignments: assignments}, %State{} = state) do + start_consumer(state, assignments) + end + + defp heartbeat(%State{worker_name: worker_name, group_name: group_name, generation_id: generation_id, member_id: member_id} = state) do + heartbeat_request = %HeartbeatRequest{ + group_name: group_name, + member_id: member_id, + generation_id: generation_id, + } + + case KafkaEx.heartbeat(heartbeat_request, worker_name: worker_name) do + %HeartbeatResponse{error_code: :no_error} -> + state + + %HeartbeatResponse{error_code: :rebalance_in_progress} -> + rebalance(state) + end + end + + defp rebalance(%State{} = state) do + state + |> stop_consumer() + |> join() + end + + defp leave(%State{worker_name: worker_name, group_name: group_name, member_id: member_id} = state) do + stop_consumer(state) + + leave_request = %LeaveGroupRequest{ + group_name: group_name, + member_id: member_id, + } + + %LeaveGroupResponse{error_code: :no_error} = KafkaEx.leave_group(leave_request, worker_name: worker_name) + + Logger.debug("Left consumer group #{group_name}") + end + + defp start_consumer(%State{consumer_module: consumer_module, consumer_opts: consumer_opts, + group_name: group_name, consumer_pid: nil} = state, assignments) do + assignments = + Enum.flat_map(assignments, fn ({topic, partition_ids}) -> + Enum.map(partition_ids, &({topic, &1})) + end) + + {:ok, pid} = KafkaEx.GenConsumer.Supervisor.start_link(consumer_module, group_name, assignments, consumer_opts) + + %State{state | assignments: assignments, consumer_pid: pid} + end + + defp stop_consumer(%State{consumer_pid: nil} = state), do: state + defp stop_consumer(%State{consumer_pid: pid} = state) when is_pid(pid) do + :ok = Supervisor.stop(pid) + %State{state | consumer_pid: nil} + end + + defp assign_partitions(%State{consumer_module: consumer_module, partitions: partitions}, members) do + assignments = + consumer_module.assign_partitions(members, partitions) + |> Enum.map(fn ({member, topic_partitions}) -> + assigns = + topic_partitions + |> Enum.group_by(&(elem(&1, 0)), &(elem(&1, 1))) + |> Enum.into([]) + + {member, assigns} + end) + |> Map.new + + Enum.map(members, fn (member) -> + {member, Map.get(assignments, member, [])} + end) + end +end diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex new file mode 100644 index 00000000..2f1c4ced --- /dev/null +++ b/lib/kafka_ex/gen_consumer.ex @@ -0,0 +1,474 @@ +defmodule KafkaEx.GenConsumer do + @moduledoc """ + A behaviour module for implementing a Kafka consumer. + + A `GenConsumer` is an Elixir process that consumes messages from Kafka. A single `GenConsumer` + process consumes from a single partition of a Kafka topic. Several `GenConsumer` processes can be + used to consume from multiple partitions or even multiple topics. Partition assignments for a + group of `GenConsumer`s can be defined manually using `KafkaEx.GenConsumer.Supervisor` or + coordinated across a cluster of nodes using `KafkaEx.ConsumerGroup`. + + ## Example + + The `GenConsumer` behaviour abstracts common Kafka consumer interactions. `GenConsumer` will take + care of the details of determining a starting offset, fetching messages from a Kafka broker, and + committing offsets for consumed messages. Developers are only required to implement + `c:handle_message/2` to process messages from the queue. + + The following is a minimal example that logs each message as it's consumed: + + ``` + defmodule ExampleGenConsumer do + use KafkaEx.GenConsumer + + require Logger + + def handle_message(%Message{value: message}, state) do + Logger.debug("message: " <> inspect(message)) + {:ack, state} + end + end + ``` + + `c:handle_message/2` will be called for each message that's fetched from a Kafka broker. In this + example, since `c:handle_message/2` always returns `{:ack, new_state}`, the message offsets will + be auto-committed. + + ## Auto-Committing Offsets + + `GenConsumer` manages a consumer's offsets by committing the offsets of acknowledged messages. + Messages are acknowledged by returning `{:ack, new_state}` from `c:handle_message/2`. + Acknowledged messages are not committed immediately. To avoid excessive network calls, + acknowledged messages may be batched and committed periodically. Offsets are also committed when a + `GenServer` is terminated. + + How often a `GenConsumer` auto-commits offsets is controlled by the two configuration values + `:commit_interval` and `:commit_threshold`. These can be set globally in the `:kafka_ex` app's + environment or on a per-consumer basis by passing options to `start_link/5`: + + ``` + # In config/config.exs + config :kafka_ex, + commit_interval: 5000, + commit_threshold: 100 + + # As options to start_link/5 + KafkaEx.GenConsumer.start_link(MyConsumer, "my_group", "topic", 0, + commit_interval: 5000, + commit_threshold: 100) + ``` + + * `:commit_interval` is the maximum time (in milliseconds) that a `GenConsumer` will delay + committing the offset for an acknowledged message. + * `:commit_threshold` is the maximum number of acknowledged messages that a `GenConsumer` will + allow to be uncommitted before triggering an auto-commit. + + For low-volume topics, `:commit_interval` is the dominant factor for how often a `GenConsumer` + auto-commits. For high-volume topics, `:commit_threshold` is the dominant factor. + + ## Callbacks + + There are three callbacks that are required to be implemented in a `GenConsumer`. By adding `use + KafkaEx.GenServer` to a module, two of the callbacks will be defined with default behavior, + leaving you to implement `c:handle_message/2`. + + ## Integration with OTP + + A `GenConsumer` is a specialized `GenServer`. It can be supervised, registered, and debugged the + same as any other `GenServer`. However, its arguments for `c:GenServer.init/1` are unspecified, so + `start_link/5` should be used to start a `GenConsumer` process instead of `GenServer` primitives. + + ## Testing + + A `GenConsumer` can be unit-tested without a running Kafka broker by sending messages directly to + its `c:handle_message/2` function. The following recipe can be used as a starting point when + testing a `GenConsumer`: + + ``` + defmodule ExampleGenConsumerTest do + use ExUnit.Case, async: true + + alias KafkaEx.Protocol.Fetch.Message + + @topic "topic" + @partition 0 + + setup do + {:ok, state} = ExampleGenConsumer.init(@topic, @partition) + {:ok, %{state: state}} + end + + test "it acks a message", %{state: state} do + message = %Message{offset: 0, value: "hello"} + {response, _new_state} = ExampleGenConsumer.handle_message(message, state) + assert response == :ack + end + end + ``` + """ + + use GenServer + + alias KafkaEx.Config + alias KafkaEx.Protocol.OffsetCommit.Request, as: OffsetCommitRequest + alias KafkaEx.Protocol.OffsetCommit.Response, as: OffsetCommitResponse + alias KafkaEx.Protocol.OffsetFetch.Request, as: OffsetFetchRequest + alias KafkaEx.Protocol.OffsetFetch.Response, as: OffsetFetchResponse + alias KafkaEx.Protocol.Offset.Response, as: OffsetResponse + alias KafkaEx.Protocol.Fetch.Response, as: FetchResponse + alias KafkaEx.Protocol.Fetch.Message + + require Logger + + @typedoc """ + The ID of a member of a consumer group, assigned by a Kafka broker. + """ + @type member_id :: binary + + @typedoc """ + The name of a Kafka topic. + """ + @type topic :: binary + + @typedoc """ + The ID of a partition of a Kafka topic. + """ + @type partition_id :: integer + + @typedoc """ + A partition of a particular Kafka topic. + """ + @type partition :: {topic, partition_id} + + @typedoc """ + Option values used when starting a `GenConsumer`. + """ + @type option :: {:worker_name, atom | pid} + | {:commit_interval, non_neg_integer} + | {:commit_threshold, non_neg_integer} + + @typedoc """ + Options used when starting a `GenConsumer`. + """ + @type options :: [option | GenServer.option] + + @doc """ + Invoked when the server is started. `start_link/5` will block until it returns. + + `topic` and `partition` are the arguments passed to `start_link/5`. They identify the Kafka + partition that the `GenConsumer` will consume from. + + Returning `{:ok, state}` will cause `start_link/5` to return `{:ok, pid}` and the process to start + consuming from its assigned partition. `state` becomes the consumer's state. + + Any other return value will cause the `start_link/5` to return `{:error, error}` and the process + to exit. + """ + @callback init(topic :: topic, partition :: partition_id) :: {:ok, state :: term} + + @doc """ + Invoked for each message consumed from a Kafka queue. + + `message` is a message fetched from a Kafka broker and `state` is the current state of the + `GenConsumer`. + + Returning `{:ack, new_state}` acknowledges `message` and continues to consume from the Kafka queue + with new state `new_state`. Acknowledged messages will be auto-committed (possibly at a later + time) based on the `:commit_interval` and `:commit_threshold` options. + + Returning `{:commit, new_state}` commits `message` synchronously before continuing to consume from + the Kafka queue with new state `new_state`. Committing a message synchronously means that no more + messages will be consumed until the message's offset is committed. `:commit` should be used + sparingly, since committing every message synchronously would impact a consumer's performance and + could result in excessive network traffic. + """ + @callback handle_message(message :: Message.t, state :: term) :: {:ack, new_state :: term} + | {:commit, new_state :: term} + + @doc """ + Invoked to determine partition assignments for a coordinated consumer group. + + `members` is a list of member IDs and `partitions` is a list of partitions that need to be + assigned to a group member. + + The return value must be a map with member IDs as keys and a list of partition assignments as + values. For each member ID in the returned map, the assigned partitions will become the + `assignments` argument to `KafkaEx.GenConsumer.Supervisor.start_link/4` in the corresponding + member process. Any member that's omitted from the return value will not be assigned any + partitions. + + If this callback is not implemented, the default implementation by `use KafkaEx.GenConsumer` + implements a simple round-robin assignment. + + ### Example + + Given the following `members` and `partitions` to be assigned: + + ``` + members = ["member1", "member2", "member3"] + partitions = [{"topic", 0}, {"topic", 1}, {"topic", 2}] + ``` + + One possible assignment is as follows: + + ``` + ExampleGenConsumer.assign_partitions(members, partitions) + #=> %{"member1" => [{"topic", 0}, {"topic", 2}], "member2" => [{"topic", 1}]} + ``` + + In this case, the consumer group process for `"member1"` will launch two `GenConsumer` processes + (one for each of its assigned partitions), `"member2"` will launch one `GenConsumer` process, and + `"member3"` will launch no processes. + """ + @callback assign_partitions(members :: [member_id], partitions :: [partition]) :: %{member_id => [partition]} + + defmacro __using__(_opts) do + quote do + @behaviour KafkaEx.GenConsumer + alias KafkaEx.Protocol.Fetch.Message + + def init(_topic, _partition) do + {:ok, nil} + end + + def assign_partitions(members, partitions) do + Stream.cycle(members) + |> Enum.zip(partitions) + |> Enum.group_by(&(elem(&1, 0)), &(elem(&1, 1))) + end + + defoverridable [init: 2, assign_partitions: 2] + end + end + + defmodule Supervisor do + @moduledoc """ + A supervisor for managing `GenConsumer` processes that are part of a consumer group. + + The supervisor will launch individual `GenConsumer` processes for each partition given by the + `partitions` argument to `start_link/4`. When terminated, each of the supervisor's child + processes will commit its latest offset before terminating. + + This module manages a static list of consumer processes. For dynamically distributing consumers + in a consumer group across a cluster of nodes, see `KafkaEx.ConsumerGroup`. + """ + + use Elixir.Supervisor + + @doc """ + Starts a `GenConsumer.Supervisor` process linked to the current process. + + `module` is a module that implements the `GenConsumer` behaviour. `group_name` is the name of a + consumer group, and `assignments` is a list of partitions for the `GenConsumer`s to consume. + `opts` accepts the same options as `KafkaEx.GenConsumer.start_link/5`. + + ### Return Values + + This function has the same return values as `Supervisor.start_link/3`. + + If the supervisor and its consumers are successfully created, this function returns `{:ok, + pid}`, where `pid` is the PID of the supervisor. + """ + @spec start_link(module, binary, [KafkaEx.GenConsumer.partition], KafkaEx.GenConsumer.options) :: Elixir.Supervisor.on_start + def start_link(consumer_module, group_name, assignments, opts \\ []) do + case Elixir.Supervisor.start_link(__MODULE__, {consumer_module, group_name, assignments, opts}) do + {:ok, pid} -> + Enum.each(assignments, fn ({topic, partition}) -> + case Elixir.Supervisor.start_child(pid, [topic, partition, opts]) do + {:ok, _child} -> nil + {:ok, _child, _info} -> nil + end + end) + + {:ok, pid} + + error -> + error + end + end + + def init({consumer_module, group_name, _assignments, _opts}) do + children = [ + worker(KafkaEx.GenConsumer, [consumer_module, group_name]) + ] + + supervise(children, strategy: :simple_one_for_one) + end + end + + defmodule State do + @moduledoc false + defstruct [ + :consumer_module, + :consumer_state, + :commit_interval, + :commit_threshold, + :worker_name, + :group, + :topic, + :partition, + :current_offset, + :committed_offset, + :acked_offset, + :last_commit, + ] + end + + @commit_interval 5_000 + @commit_threshold 100 + + # Client API + + @doc """ + Starts a `GenConsumer` process linked to the current process. + + This can be used to start the `GenConsumer` as part of a supervision tree. + + Once the consumer has been started, the `c:init/2` function of the given `consumer_module` is + called with the given `topic` and `partition`. `group_name` is the consumer group name that will + be used for managing consumer offsets. + + ### Options + + * `:commit_interval` - the interval in milliseconds that the consumer will wait to commit + acknowledged messages. If not present, the `:commit_interval` environment value is used. + * `:commit_threshold` - the maximum number of messages that can be acknowledged without being + committed. If not present, the `:commit_threshold` environment value is used. + * `:worker_name` - the name of the `KafkaEx.Server` process to use for communicating with the + Kafka brokers. If not present, the default worker is used. + + Any valid options for `GenServer.start_link/3` can also be specified. + + ### Return Values + + This function has the same return values as `GenServer.start_link/3`. + + If the consumer is successfully created and initialized, this function returns `{:ok, pid}`, where + `pid` is the PID of the consumer process. + """ + @spec start_link(module, binary, topic, partition_id, options) :: GenServer.on_start + def start_link(consumer_module, group_name, topic, partition, opts \\ []) do + {server_opts, consumer_opts} = Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt]) + + GenServer.start_link(__MODULE__, {consumer_module, group_name, topic, partition, consumer_opts}, server_opts) + end + + # GenServer callbacks + + def init({consumer_module, group_name, topic, partition, opts}) do + worker_name = Keyword.get(opts, :worker_name, Config.default_worker) + commit_interval = Keyword.get(opts, :commit_interval, Application.get_env(:kafka_ex, :commit_interval, @commit_interval)) + commit_threshold = Keyword.get(opts, :commit_threshold, Application.get_env(:kafka_ex, :commit_threshold, @commit_threshold)) + + {:ok, consumer_state} = consumer_module.init(topic, partition) + + state = %State{ + consumer_module: consumer_module, + consumer_state: consumer_state, + commit_interval: commit_interval, + commit_threshold: commit_threshold, + worker_name: worker_name, + group: group_name, + topic: topic, + partition: partition, + } + + Process.flag(:trap_exit, true) + + {:ok, state, 0} + end + + def handle_info(:timeout, %State{current_offset: nil, last_commit: nil} = state) do + new_state = %State{load_offsets(state) | last_commit: :erlang.monotonic_time(:milli_seconds)} + + {:noreply, new_state, 0} + end + + def handle_info(:timeout, %State{} = state) do + new_state = consume(state) + + {:noreply, new_state, 0} + end + + def terminate(_reason, %State{} = state) do + commit(state) + end + + # Helpers + + defp consume(%State{worker_name: worker_name, topic: topic, partition: partition, current_offset: offset} = state) do + [%FetchResponse{topic: ^topic, partitions: [response = %{error_code: :no_error, partition: ^partition}]}] = + KafkaEx.fetch(topic, partition, offset: offset, auto_commit: false, worker_name: worker_name) + + case response do + %{last_offset: nil, message_set: []} -> + auto_commit(state) + + %{last_offset: _, message_set: messages} -> + Enum.reduce(messages, state, &handle_message/2) + end + end + + defp handle_message(%Message{offset: offset} = message, %State{consumer_module: consumer_module, consumer_state: consumer_state} = state) do + case consumer_module.handle_message(message, consumer_state) do + {:ack, new_state} -> + auto_commit %State{state | consumer_state: new_state, acked_offset: offset + 1, current_offset: offset + 1} + + {:commit, new_state} -> + commit %State{state | consumer_state: new_state, acked_offset: offset + 1, current_offset: offset + 1} + end + end + + defp auto_commit(%State{acked_offset: acked, committed_offset: committed, commit_threshold: threshold, + last_commit: last_commit, commit_interval: interval} = state) do + case acked - committed do + 0 -> + %State{state | last_commit: :erlang.monotonic_time(:milli_seconds)} + + n when n >= threshold -> + commit(state) + + _ -> + if :erlang.monotonic_time(:milli_seconds) - last_commit >= interval do + commit(state) + else + state + end + end + end + + defp commit(%State{acked_offset: offset, committed_offset: offset} = state), do: state + defp commit(%State{worker_name: worker_name, group: group, topic: topic, partition: partition, acked_offset: offset} = state) do + request = %OffsetCommitRequest{ + consumer_group: group, + topic: topic, + partition: partition, + offset: offset, + } + + [%OffsetCommitResponse{topic: ^topic, partitions: [^partition]}] = + KafkaEx.offset_commit(worker_name, request) + + Logger.debug("Committed offset #{topic}/#{partition}@#{offset} for #{group}") + + %State{state | committed_offset: offset, last_commit: :erlang.monotonic_time(:milli_seconds)} + end + + defp load_offsets(%State{worker_name: worker_name, group: group, topic: topic, partition: partition} = state) do + request = %OffsetFetchRequest{consumer_group: group, topic: topic, partition: partition} + + [%OffsetFetchResponse{topic: ^topic, partitions: [%{partition: ^partition, error_code: error_code, offset: offset}]}] = + KafkaEx.offset_fetch(worker_name, request) + + case error_code do + :no_error -> + %State{state | current_offset: offset, committed_offset: offset, acked_offset: offset} + + :unknown_topic_or_partition -> + [%OffsetResponse{topic: ^topic, partition_offsets: [%{partition: ^partition, error_code: :no_error, offset: [offset]}]}] = + KafkaEx.earliest_offset(topic, partition, worker_name) + + %State{state | current_offset: offset, committed_offset: offset, acked_offset: offset} + end + end +end diff --git a/lib/kafka_ex/protocol/heartbeat.ex b/lib/kafka_ex/protocol/heartbeat.ex index 4e8ca91c..6fb81590 100644 --- a/lib/kafka_ex/protocol/heartbeat.ex +++ b/lib/kafka_ex/protocol/heartbeat.ex @@ -3,6 +3,17 @@ defmodule KafkaEx.Protocol.Heartbeat do Implementation of the Kafka Hearbeat request and response APIs """ + defmodule Request do + @moduledoc false + defstruct group_name: nil, member_id: nil, generation_id: nil + + @type t :: %Request{ + group_name: binary, + member_id: binary, + generation_id: integer, + } + end + defmodule Response do @moduledoc false # We could just return the error code instead of having the struct, but this @@ -11,17 +22,16 @@ defmodule KafkaEx.Protocol.Heartbeat do @type t :: %Response{error_code: atom | integer} end - @spec create_request(integer, binary, binary, binary, integer) :: binary - def create_request(correlation_id, client_id, member_id, group_id, generation_id) do + @spec create_request(integer, binary, Request.t) :: binary + def create_request(correlation_id, client_id, request) do KafkaEx.Protocol.create_request(:heartbeat, correlation_id, client_id) <> - << byte_size(group_id) :: 16-signed, group_id :: binary, - generation_id :: 32-signed, - byte_size(member_id) :: 16-signed, member_id :: binary >> + << byte_size(request.group_name) :: 16-signed, request.group_name :: binary, + request.generation_id :: 32-signed, + byte_size(request.member_id) :: 16-signed, request.member_id :: binary >> end @spec parse_response(binary) :: Response.t def parse_response(<< _correlation_id :: 32-signed, error_code :: 16-signed >>) do %Response{error_code: KafkaEx.Protocol.error(error_code)} end - end diff --git a/lib/kafka_ex/protocol/join_group.ex b/lib/kafka_ex/protocol/join_group.ex index e9fa432d..06e1564a 100644 --- a/lib/kafka_ex/protocol/join_group.ex +++ b/lib/kafka_ex/protocol/join_group.ex @@ -4,19 +4,17 @@ defmodule KafkaEx.Protocol.JoinGroup do @moduledoc """ Implementation of the Kafka JoinGroup request and response APIs """ + @protocol_type "consumer" @strategy_name "assign" @metadata_version 0 defmodule Request do @moduledoc false - defstruct correlation_id: nil, - client_id: nil, member_id: nil, + defstruct member_id: nil, group_name: nil, topics: nil, session_timeout: nil @type t :: %Request{ - correlation_id: integer, - client_id: binary, member_id: binary, group_name: binary, topics: [binary], @@ -31,17 +29,15 @@ defmodule KafkaEx.Protocol.JoinGroup do leader_id: binary, member_id: binary, members: [binary]} end - @spec create_request(Request.t) :: binary - def create_request(join_group_req) do + @spec create_request(integer, binary, Request.t) :: binary + def create_request(correlation_id, client_id, %Request{} = join_group_req) do metadata = << @metadata_version :: 16-signed, length(join_group_req.topics) :: 32-signed, topic_data(join_group_req.topics) :: binary, 0 :: 32-signed >> - KafkaEx.Protocol.create_request( - :join_group, join_group_req.correlation_id, join_group_req.client_id - ) <> + KafkaEx.Protocol.create_request(:join_group, correlation_id, client_id) <> << byte_size(join_group_req.group_name) :: 16-signed, join_group_req.group_name :: binary, join_group_req.session_timeout :: 32-signed, byte_size(join_group_req.member_id) :: 16-signed, join_group_req.member_id :: binary, diff --git a/lib/kafka_ex/protocol/leave_group.ex b/lib/kafka_ex/protocol/leave_group.ex index 11576e81..6a4b9ba5 100644 --- a/lib/kafka_ex/protocol/leave_group.ex +++ b/lib/kafka_ex/protocol/leave_group.ex @@ -1,12 +1,27 @@ defmodule KafkaEx.Protocol.LeaveGroup do + defmodule Request do + @moduledoc false + defstruct group_name: nil, member_id: nil + + @type t :: %Request{ + group_name: binary, + member_id: binary, + } + end + defmodule Response do defstruct error_code: nil + + @type t :: %Response{ + error_code: atom | integer, + } end - def create_request(correlation_id, client_id, group_id, member_id) do + @spec create_request(integer, binary, Request.t) :: binary + def create_request(correlation_id, client_id, request) do KafkaEx.Protocol.create_request(:leave_group, correlation_id, client_id) <> - << byte_size(group_id) :: 16-signed, group_id :: binary, - byte_size(member_id) :: 16-signed, member_id :: binary >> + << byte_size(request.group_name) :: 16-signed, request.group_name :: binary, + byte_size(request.member_id) :: 16-signed, request.member_id :: binary >> end def parse_response(<< _correlation_id :: 32-signed, error_code :: 16-signed >>) do diff --git a/lib/kafka_ex/protocol/sync_group.ex b/lib/kafka_ex/protocol/sync_group.ex index efe5c06e..a94be427 100644 --- a/lib/kafka_ex/protocol/sync_group.ex +++ b/lib/kafka_ex/protocol/sync_group.ex @@ -4,6 +4,18 @@ defmodule KafkaEx.Protocol.SyncGroup do """ @member_assignment_version 0 + defmodule Request do + @moduledoc false + defstruct member_id: nil, group_name: nil, generation_id: nil, assignments: [] + + @type t :: %Request{ + member_id: binary, + group_name: binary, + generation_id: integer, + assignments: [{member :: binary, [{topic :: binary, partitions :: [integer]}]}], + } + end + defmodule Assignment do @moduledoc false defstruct topic: nil, partitions: [] @@ -16,23 +28,21 @@ defmodule KafkaEx.Protocol.SyncGroup do @type t :: %Response{error_code: atom | integer, assignments: [Assignment.t]} end - @spec create_request(integer, binary, binary, integer, binary, [{binary, [Assignment.t]}]) :: binary - def create_request(correlation_id, client_id, group_name, generation_id, member_id, assignments) do + @spec create_request(integer, binary, Request.t) :: binary + def create_request(correlation_id, client_id, %Request{} = request) do KafkaEx.Protocol.create_request(:sync_group, correlation_id, client_id) <> - << byte_size(group_name) :: 16-signed, group_name :: binary, - generation_id :: 32-signed, - byte_size(member_id) :: 16-signed, member_id :: binary, - length(assignments) :: 32-signed, group_assignment_data(assignments, "") :: binary + << byte_size(request.group_name) :: 16-signed, request.group_name :: binary, + request.generation_id :: 32-signed, + byte_size(request.member_id) :: 16-signed, request.member_id :: binary, + length(request.assignments) :: 32-signed, group_assignment_data(request.assignments, "") :: binary >> end @spec parse_response(binary) :: Response.t def parse_response(<< _correlation_id :: 32-signed, error_code :: 16-signed, - _member_assignment_len :: 32-signed, - @member_assignment_version :: 16-signed, - assignments_size :: 32-signed, rest :: binary >>) do - assignments = parse_assignments(assignments_size, rest, []) - %Response{error_code: KafkaEx.Protocol.error(error_code), assignments: assignments} + member_assignment_len :: 32-signed, + member_assignment :: size(member_assignment_len)-binary >>) do + %Response{error_code: KafkaEx.Protocol.error(error_code), assignments: parse_member_assignment(member_assignment)} end # Helper functions to create assignment data structure @@ -65,6 +75,11 @@ defmodule KafkaEx.Protocol.SyncGroup do # Helper functions to parse assignments + defp parse_member_assignment(<<>>), do: [] + defp parse_member_assignment(<< @member_assignment_version :: 16-signed, assignments_size :: 32-signed, rest :: binary >>) do + parse_assignments(assignments_size, rest, []) + end + defp parse_assignments(0, _rest, assignments), do: assignments defp parse_assignments(size, << topic_len :: 16-signed, topic :: size(topic_len)-binary, partition_len :: 32-signed, diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index eb69c1e4..8785e63f 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -4,6 +4,9 @@ defmodule KafkaEx.Server do """ alias KafkaEx.Protocol.ConsumerMetadata + alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest + alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest + alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest alias KafkaEx.Protocol.Metadata alias KafkaEx.Protocol.Metadata.Broker alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse @@ -12,6 +15,7 @@ defmodule KafkaEx.Server do alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest alias KafkaEx.Protocol.Produce alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest + alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest alias KafkaEx.Socket defmodule State do @@ -108,28 +112,28 @@ defmodule KafkaEx.Server do {:noreply, new_state, timeout | :hibernate} | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term, new_state: term, reason: term - @callback kafka_server_join_group(topics :: [binary], session_timeout :: integer, state :: State.t) :: + @callback kafka_server_join_group(JoinGroupRequest.t, network_timeout :: integer, state :: State.t) :: {:reply, reply, new_state} | {:reply, reply, new_state, timeout | :hibernate} | {:noreply, new_state} | {:noreply, new_state, timeout | :hibernate} | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term, new_state: term, reason: term - @callback kafka_server_sync_group(group_name :: binary, generation_id :: integer, member_id :: binary, assignments :: [binary] , state :: State.t) :: + @callback kafka_server_sync_group(SyncGroupRequest.t, network_timeout :: integer, state :: State.t) :: {:reply, reply, new_state} | {:reply, reply, new_state, timeout | :hibernate} | {:noreply, new_state} | {:noreply, new_state, timeout | :hibernate} | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term, new_state: term, reason: term - @callback kafka_server_leave_group(group_name :: binary, member_id :: binary, state :: State.t) :: + @callback kafka_server_leave_group(LeaveGroupRequest.t, network_timeout :: integer, state :: State.t) :: {:reply, reply, new_state} | {:reply, reply, new_state, timeout | :hibernate} | {:noreply, new_state} | {:noreply, new_state, timeout | :hibernate} | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term, new_state: term, reason: term - @callback kafka_server_heartbeat(group_name :: binary, generation_id :: integer, member_id :: binary, state :: State.t) :: + @callback kafka_server_heartbeat(HeartbeatRequest.t, network_timeout :: integer, state :: State.t) :: {:reply, reply, new_state} | {:reply, reply, new_state, timeout | :hibernate} | {:noreply, new_state} | @@ -237,20 +241,20 @@ defmodule KafkaEx.Server do kafka_server_metadata(topic, state) end - def handle_call({:join_group, topics, session_timeout}, _from, state) do - kafka_server_join_group(topics, session_timeout,state) + def handle_call({:join_group, request, network_timeout}, _from, state) do + kafka_server_join_group(request, network_timeout, state) end - def handle_call({:sync_group, group_name, generation_id, member_id, assignments}, _from, state) do - kafka_server_sync_group(group_name, generation_id, member_id, assignments, state) + def handle_call({:sync_group, request, network_timeout}, _from, state) do + kafka_server_sync_group(request, network_timeout, state) end - def handle_call({:leave_group, group_name, member_id}, _from, state) do - kafka_server_leave_group(group_name, member_id, state) + def handle_call({:leave_group, request, network_timeout}, _from, state) do + kafka_server_leave_group(request, network_timeout, state) end - def handle_call({:heartbeat, group_name, generation_id, member_id}, _from, state) do - kafka_server_heartbeat(group_name, generation_id, member_id, state) + def handle_call({:heartbeat, request, network_timeout}, _from, state) do + kafka_server_heartbeat(request, network_timeout, state) end def handle_call({:create_stream, handler, handler_init}, _from, state) do @@ -454,8 +458,8 @@ defmodule KafkaEx.Server do end) end - defp sync_timeout do - Application.get_env(:kafka_ex, :sync_timeout, @sync_timeout) + defp sync_timeout(timeout \\ nil) do + timeout || Application.get_env(:kafka_ex, :sync_timeout, @sync_timeout) end end end diff --git a/lib/kafka_ex/server_0_p_8_p_0.ex b/lib/kafka_ex/server_0_p_8_p_0.ex index 83c82c93..b24fceb0 100644 --- a/lib/kafka_ex/server_0_p_8_p_0.ex +++ b/lib/kafka_ex/server_0_p_8_p_0.ex @@ -5,8 +5,8 @@ defmodule KafkaEx.Server0P8P0 do # these functions aren't implemented for 0.8.0 @dialyzer [ - {:nowarn_function, kafka_server_heartbeat: 4}, - {:nowarn_function, kafka_server_sync_group: 5}, + {:nowarn_function, kafka_server_heartbeat: 3}, + {:nowarn_function, kafka_server_sync_group: 3}, {:nowarn_function, kafka_server_join_group: 3}, {:nowarn_function, kafka_server_leave_group: 3}, {:nowarn_function, kafka_server_update_consumer_metadata: 1}, @@ -62,9 +62,9 @@ defmodule KafkaEx.Server0P8P0 do def kafka_server_consumer_group(_state), do: raise "Consumer Group is not supported in 0.8.0 version of kafka" def kafka_server_consumer_group_metadata(_state), do: raise "Consumer Group Metadata is not supported in 0.8.0 version of kafka" def kafka_server_join_group(_, _, _state), do: raise "Join Group is not supported in 0.8.0 version of kafka" - def kafka_server_sync_group(_, _, _, _, _state), do: raise "Sync Group is not supported in 0.8.0 version of kafka" + def kafka_server_sync_group(_, _, _state), do: raise "Sync Group is not supported in 0.8.0 version of kafka" def kafka_server_leave_group(_, _, _state), do: raise "Leave Group is not supported in 0.8.0 version of Kafka" - def kafka_server_heartbeat(_, _, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka" + def kafka_server_heartbeat(_, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka" def kafka_server_update_consumer_metadata(_state), do: raise "Consumer Group Metadata is not supported in 0.8.0 version of kafka" def kafka_server_start_streaming(_, state = %State{event_pid: nil}) do diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index 2677f792..8b4832d4 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -5,8 +5,8 @@ defmodule KafkaEx.Server0P8P2 do # these functions aren't implemented for 0.8.2 @dialyzer [ - {:nowarn_function, kafka_server_heartbeat: 4}, - {:nowarn_function, kafka_server_sync_group: 5}, + {:nowarn_function, kafka_server_heartbeat: 3}, + {:nowarn_function, kafka_server_sync_group: 3}, {:nowarn_function, kafka_server_join_group: 3}, {:nowarn_function, kafka_server_leave_group: 3} ] @@ -145,9 +145,9 @@ defmodule KafkaEx.Server0P8P2 do end def kafka_server_join_group(_, _, _state), do: raise "Join Group is not supported in 0.8.0 version of kafka" - def kafka_server_sync_group(_, _, _, _, _state), do: raise "Sync Group is not supported in 0.8.0 version of kafka" + def kafka_server_sync_group(_, _, _state), do: raise "Sync Group is not supported in 0.8.0 version of kafka" def kafka_server_leave_group(_, _, _state), do: raise "Leave Group is not supported in 0.8.0 version of Kafka" - def kafka_server_heartbeat(_, _, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka" + def kafka_server_heartbeat(_, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka" defp update_consumer_metadata(state), do: update_consumer_metadata(state, @retry_count, 0) defp update_consumer_metadata(state = %State{consumer_group: consumer_group}, 0, error_code) do diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index c8ba2041..76a520db 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -7,7 +7,6 @@ defmodule KafkaEx.Server0P9P0 do alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse alias KafkaEx.Protocol.Heartbeat alias KafkaEx.Protocol.JoinGroup - alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest alias KafkaEx.Protocol.LeaveGroup alias KafkaEx.Protocol.Metadata.Broker alias KafkaEx.Protocol.SyncGroup @@ -68,49 +67,42 @@ defmodule KafkaEx.Server0P9P0 do {:ok, state} end - def kafka_server_join_group(topics, session_timeout, state) do + def kafka_server_join_group(join_group_request, network_timeout, state) do true = consumer_group?(state) {broker, state} = broker_for_consumer_group_with_update(state) - request = JoinGroup.create_request( - %JoinGroupRequest{ - correlation_id: state.correlation_id, - client_id: @client_id, member_id: "", - group_name: state.consumer_group, - topics: topics, session_timeout: session_timeout - } - ) + request = JoinGroup.create_request(state.correlation_id, @client_id, join_group_request) response = broker - |> NetworkClient.send_sync_request(request, sync_timeout()) + |> NetworkClient.send_sync_request(request, sync_timeout(network_timeout)) |> JoinGroup.parse_response {:reply, response, %{state | correlation_id: state.correlation_id + 1}} end - def kafka_server_sync_group(group_name, generation_id, member_id, assignments, state) do + def kafka_server_sync_group(sync_group_request, network_timeout, state) do true = consumer_group?(state) {broker, state} = broker_for_consumer_group_with_update(state) - request = SyncGroup.create_request(state.correlation_id, @client_id, group_name, generation_id, member_id, assignments) + request = SyncGroup.create_request(state.correlation_id, @client_id, sync_group_request) response = broker - |> NetworkClient.send_sync_request(request, sync_timeout()) + |> NetworkClient.send_sync_request(request, sync_timeout(network_timeout)) |> SyncGroup.parse_response {:reply, response, %{state | correlation_id: state.correlation_id + 1}} end - def kafka_server_leave_group(group_name, member_id, state) do + def kafka_server_leave_group(request, network_timeout, state) do true = consumer_group?(state) {broker, state} = broker_for_consumer_group_with_update(state) - request = LeaveGroup.create_request(state.correlation_id, @client_id, group_name, member_id) + request = LeaveGroup.create_request(state.correlation_id, @client_id, request) response = broker - |> NetworkClient.send_sync_request(request, sync_timeout()) + |> NetworkClient.send_sync_request(request, sync_timeout(network_timeout)) |> LeaveGroup.parse_response {:reply, response, %{state | correlation_id: state.correlation_id + 1}} end - def kafka_server_heartbeat(group_name, generation_id, member_id, state) do + def kafka_server_heartbeat(request, network_timeout, state) do true = consumer_group?(state) {broker, state} = broker_for_consumer_group_with_update(state) - request = Heartbeat.create_request(state.correlation_id, @client_id, member_id, group_name, generation_id) + request = Heartbeat.create_request(state.correlation_id, @client_id, request) response = broker - |> NetworkClient.send_sync_request(request, sync_timeout()) + |> NetworkClient.send_sync_request(request, sync_timeout(network_timeout)) |> Heartbeat.parse_response {:reply, response, %{state | correlation_id: state.correlation_id + 1}} end diff --git a/test/integration/server0_p_9_p_0_test.exs b/test/integration/server0_p_9_p_0_test.exs index aa32ac27..265cbf50 100644 --- a/test/integration/server0_p_9_p_0_test.exs +++ b/test/integration/server0_p_9_p_0_test.exs @@ -2,14 +2,25 @@ defmodule KafkaEx.Server0P9P0.Test do use ExUnit.Case import TestHelper + alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest + alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest + alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest + alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest + @moduletag :server_0_p_9_p_0 test "can join a consumer group" do random_group = generate_random_string() KafkaEx.create_worker(:join_group, [uris: uris(), consumer_group: random_group]) - # No wrapper in kafka_ex yet as long as the 0.9 functionality is in progress - answer = GenServer.call(:join_group, {:join_group, ["foo", "bar"], 6000}) + request = %JoinGroupRequest{ + group_name: random_group, + member_id: "", + topics: ["foo", "bar"], + session_timeout: 6000, + } + + answer = KafkaEx.join_group(request, worker_name: :join_group) assert answer.error_code == :no_error assert answer.generation_id == 1 # We should be the leader @@ -21,7 +32,15 @@ defmodule KafkaEx.Server0P9P0.Test do # how this pans out eventually as we add more and more 0.9 consumer group code random_group = generate_random_string() KafkaEx.create_worker(:sync_group, [uris: uris(), consumer_group: random_group]) - answer = GenServer.call(:sync_group, {:join_group, ["foo", "bar"], 6000}) + + request = %JoinGroupRequest{ + group_name: random_group, + member_id: "", + topics: ["foo", "bar"], + session_timeout: 6000, + } + + answer = KafkaEx.join_group(request, worker_name: :sync_group) assert answer.error_code == :no_error member_id = answer.member_id @@ -29,7 +48,14 @@ defmodule KafkaEx.Server0P9P0.Test do my_assignments = [{"foo", [1]}, {"bar", [2]}] assignments = [{member_id, my_assignments}] - answer = GenServer.call(:sync_group, {:sync_group, random_group, generation_id, member_id, assignments}) + request = %SyncGroupRequest{ + group_name: random_group, + member_id: member_id, + generation_id: generation_id, + assignments: assignments, + } + + answer = KafkaEx.sync_group(request, worker_name: :sync_group) assert answer.error_code == :no_error # Parsing happens to return the assignments reversed, which is fine as there's no # ordering. Just reverse what we expect to match @@ -41,12 +67,25 @@ defmodule KafkaEx.Server0P9P0.Test do # how this pans out eventually as we add more and more 0.9 consumer group code random_group = generate_random_string() KafkaEx.create_worker(:leave_group, [uris: uris(), consumer_group: random_group]) - answer = GenServer.call(:leave_group, {:join_group, ["foo", "bar"], 6000}) + + request = %JoinGroupRequest{ + group_name: random_group, + member_id: "", + topics: ["foo", "bar"], + session_timeout: 6000, + } + + answer = KafkaEx.join_group(request, worker_name: :leave_group) assert answer.error_code == :no_error member_id = answer.member_id - answer = GenServer.call(:leave_group, {:leave_group, random_group, member_id}) + request = %LeaveGroupRequest{ + group_name: random_group, + member_id: member_id, + } + + answer = KafkaEx.leave_group(request, worker_name: :leave_group) assert answer.error_code == :no_error end @@ -54,7 +93,15 @@ defmodule KafkaEx.Server0P9P0.Test do # See sync test. Removing repetition in the next iteration random_group = generate_random_string() KafkaEx.create_worker(:heartbeat, [uris: uris(), consumer_group: random_group]) - answer = GenServer.call(:heartbeat, {:join_group, ["foo", "bar"], 6000}) + + request = %JoinGroupRequest{ + group_name: random_group, + member_id: "", + topics: ["foo", "bar"], + session_timeout: 6000, + } + + answer = KafkaEx.join_group(request, worker_name: :heartbeat) assert answer.error_code == :no_error member_id = answer.member_id @@ -62,10 +109,23 @@ defmodule KafkaEx.Server0P9P0.Test do my_assignments = [{"foo", [1]}, {"bar", [2]}] assignments = [{member_id, my_assignments}] - answer = GenServer.call(:heartbeat, {:sync_group, random_group, generation_id, member_id, assignments}) + request = %SyncGroupRequest{ + group_name: random_group, + member_id: member_id, + generation_id: generation_id, + assignments: assignments, + } + + answer = KafkaEx.sync_group(request, worker_name: :heartbeat) assert answer.error_code == :no_error - answer = GenServer.call(:heartbeat, {:heartbeat, random_group, generation_id, member_id}) + request = %HeartbeatRequest{ + group_name: random_group, + member_id: member_id, + generation_id: generation_id, + } + + answer = KafkaEx.heartbeat(request, worker_name: :heartbeat) assert answer.error_code == :no_error end end diff --git a/test/protocol/heartbeat_test.exs b/test/protocol/heartbeat_test.exs index efeb16a2..7ad16aaa 100644 --- a/test/protocol/heartbeat_test.exs +++ b/test/protocol/heartbeat_test.exs @@ -8,7 +8,13 @@ defmodule KafkaEx.Protocol.Heartbeat.Test do 1234 :: 32, # GenerationId 9 :: 16, "member_id" :: binary # MemberId >> - request = KafkaEx.Protocol.Heartbeat.create_request(42, "client_id", "member_id", "group_id", 1234) + heartbeat_request = %KafkaEx.Protocol.Heartbeat.Request{ + group_name: "group_id", + member_id: "member_id", + generation_id: 1234, + } + + request = KafkaEx.Protocol.Heartbeat.create_request(42, "client_id", heartbeat_request) assert request == good_request end diff --git a/test/protocol/join_group_test.exs b/test/protocol/join_group_test.exs index d8fb87fd..f67f0495 100644 --- a/test/protocol/join_group_test.exs +++ b/test/protocol/join_group_test.exs @@ -15,10 +15,8 @@ defmodule KafkaEx.Protocol.JoinGroup.Test do 0 :: 16, # v0 2 :: 32, 9 :: 16, "topic_one" :: binary, 9 :: 16, "topic_two" :: binary, # Topics array 0 :: 32 >> # UserData - request = JoinGroup.create_request( + request = JoinGroup.create_request(42, "client_id", %JoinGroup.Request{ - correlation_id: 42, - client_id: "client_id", member_id: "member_id", group_name: "group", topics: ["topic_one", "topic_two"], diff --git a/test/protocol/leave_group_test.exs b/test/protocol/leave_group_test.exs index 05e0863a..176f5a20 100644 --- a/test/protocol/leave_group_test.exs +++ b/test/protocol/leave_group_test.exs @@ -8,7 +8,12 @@ defmodule KafkaEx.Protocol.LeaveGroup.Test do byte_size("member") :: 16, "member" :: binary, # MemberId >> - request = KafkaEx.Protocol.LeaveGroup.create_request(42, "client_id", "group", "member") + leave_request = %KafkaEx.Protocol.LeaveGroup.Request{ + group_name: "group", + member_id: "member", + } + + request = KafkaEx.Protocol.LeaveGroup.create_request(42, "client_id", leave_request) assert request == good_request end diff --git a/test/protocol/sync_group_test.exs b/test/protocol/sync_group_test.exs index bb3f73c3..aadccddf 100644 --- a/test/protocol/sync_group_test.exs +++ b/test/protocol/sync_group_test.exs @@ -28,8 +28,14 @@ defmodule KafkaEx.Protocol.SyncGroup.Test do byte_size(second_assignments) :: 32, second_assignments :: binary >> - request = KafkaEx.Protocol.SyncGroup.create_request(42, "client_id", "group", 1, "member_one", - [{"member_one", [{"topic1", [1, 3, 5]}]}, {"member_two", [{"topic1", [2, 4, 6]}]}]) + sync_request = %KafkaEx.Protocol.SyncGroup.Request{ + group_name: "group", + member_id: "member_one", + generation_id: 1, + assignments: [{"member_one", [{"topic1", [1, 3, 5]}]}, {"member_two", [{"topic1", [2, 4, 6]}]}], + } + + request = KafkaEx.Protocol.SyncGroup.create_request(42, "client_id", sync_request) assert request == good_request end @@ -44,4 +50,14 @@ defmodule KafkaEx.Protocol.SyncGroup.Test do assignments: [{"topic1", [5, 3, 1]}]} assert KafkaEx.Protocol.SyncGroup.parse_response(response) == expected_response end + + test "parse empty assignments correctly" do + response = << + 42 :: 32, # CorrelationId + 0 :: 16, # ErrorCode + 0 :: 32, # MemberAssignment (empty) + >> + expected_response = %KafkaEx.Protocol.SyncGroup.Response{error_code: :no_error, assignments: []} + assert KafkaEx.Protocol.SyncGroup.parse_response(response) == expected_response + end end