diff --git a/README.md b/README.md index d6dc462c..8b1619f2 100644 --- a/README.md +++ b/README.md @@ -24,24 +24,12 @@ KakfaEx supports the following Kafka features: * Fetch Messages * Message Compression with Snappy and gzip * Offset Management (fetch / commit / autocommit) +* Consumer Groups See [Kafka Protocol Documentation](http://kafka.apache.org/protocol.html) and [A Guide to the Kafka Protocol](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) for details of these features. -KafkaEx **does support** consumer groups for message consumption. This feature -was added in Kafka 0.8.2. This translates to providing a consumer group -name when committing offsets. It is up to the client to assign partitions to -workers in this mode of operation. - -KafkaEx currently provides **limited support** for the [Kafka ConsumerGroup -API](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI) -that was added in Kafka 0.9.0. Most of the protocol requests are implemented -in KafkaEx, but we do not yet support automatic joining and management of -consumer group memebership (e.g., automatically assigning partitions to -clients). We are actively working on an implementation for automatic consumer -group management. - ## Using KafkaEx in an Elixir project The standard approach for adding dependencies to an Elixir application applies: @@ -90,6 +78,35 @@ You can also override options when creating a worker, see below. ## Usage Examples +### Consumer Groups + +To use a consumer group, first implement a handler module using +`KafkaEx.GenConsumer`. 
+ +``` +defmodule ExampleGenConsumer do + use KafkaEx.GenConsumer + + alias KafkaEx.Protocol.Fetch.Message + + require Logger + + # note - messages are delivered in batches + def handle_message_set(message_set, state) do + for %Message{value: message} <- message_set do + Logger.debug(fn -> "message: " <> inspect(message) end) + end + {:async_commit, state} + end +end +``` + +Then add a `KafkaEx.ConsumerGroup` to your application's supervision +tree and configure it to use the implementation module. + +See the `KafkaEx.GenConsumer` and `KafkaEx.ConsumerGroup` documentation for +details. + ### Create a KafkaEx Worker KafkaEx worker processes manage the state of the connection to the Kafka broker. diff --git a/config/config.exs b/config/config.exs index 1b906d86..816b98cf 100644 --- a/config/config.exs +++ b/config/config.exs @@ -26,6 +26,11 @@ config :kafka_ex, max_restarts: 10, # Supervision max_seconds - the time frame in which :max_restarts applies max_seconds: 60, + # Interval in milliseconds that GenConsumer waits to commit offsets. + commit_interval: 5_000, + # Threshold number of messages consumed for GenConsumer to commit offsets + # to the broker. 
+ commit_threshold: 100, # This is the flag that enables use of ssl use_ssl: true, # see SSL OPTION DESCRIPTIONS - CLIENT SIDE at http://erlang.org/doc/man/ssl.html diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 58140ec3..2039fad2 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -6,6 +6,12 @@ defmodule KafkaEx do alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse alias KafkaEx.Protocol.Fetch.Response, as: FetchResponse alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest + alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest + alias KafkaEx.Protocol.Heartbeat.Response, as: HeartbeatResponse + alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest + alias KafkaEx.Protocol.JoinGroup.Response, as: JoinGroupResponse + alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest + alias KafkaEx.Protocol.LeaveGroup.Response, as: LeaveGroupResponse alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse alias KafkaEx.Protocol.Offset.Response, as: OffsetResponse alias KafkaEx.Protocol.OffsetCommit.Request, as: OffsetCommitRequest @@ -14,6 +20,8 @@ defmodule KafkaEx do alias KafkaEx.Protocol.OffsetFetch.Request, as: OffsetFetchRequest alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest alias KafkaEx.Protocol.Produce.Message + alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest + alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse alias KafkaEx.Server alias KafkaEx.Stream @@ -75,6 +83,46 @@ defmodule KafkaEx do Server.call(worker, :consumer_group) end + @doc """ + Sends a request to join a consumer group. + """ + @spec join_group(JoinGroupRequest.t, Keyword.t) :: JoinGroupResponse.t + def join_group(request, opts \\ []) do + worker_name = Keyword.get(opts, :worker_name, Config.default_worker) + timeout = Keyword.get(opts, :timeout) + Server.call(worker_name, {:join_group, request, timeout}, opts) + end + + @doc """ + Sends a request to synchronize with a consumer group. 
+ """ + @spec sync_group(SyncGroupRequest.t, Keyword.t) :: SyncGroupResponse.t + def sync_group(request, opts \\ []) do + worker_name = Keyword.get(opts, :worker_name, Config.default_worker) + timeout = Keyword.get(opts, :timeout) + Server.call(worker_name, {:sync_group, request, timeout}, opts) + end + + @doc """ + Sends a request to leave a consumer group. + """ + @spec leave_group(LeaveGroupRequest.t, Keyword.t) :: LeaveGroupResponse.t + def leave_group(request, opts \\ []) do + worker_name = Keyword.get(opts, :worker_name, Config.default_worker) + timeout = Keyword.get(opts, :timeout) + Server.call(worker_name, {:leave_group, request, timeout}, opts) + end + + @doc """ + Sends a heartbeat to maintain membership in a consumer group. + """ + @spec heartbeat(HeartbeatRequest.t, Keyword.t) :: HeartbeatResponse.t + def heartbeat(request, opts \\ []) do + worker_name = Keyword.get(opts, :worker_name, Config.default_worker) + timeout = Keyword.get(opts, :timeout) + Server.call(worker_name, {:heartbeat, request, timeout}, opts) + end + @doc """ Return metadata for the given topic; returns for all topics if topic is empty string @@ -197,7 +245,7 @@ defmodule KafkaEx do }, opts) end - @spec offset_commit(atom, OffsetCommitRequest.t) :: OffsetCommitResponse.t + @spec offset_commit(atom, OffsetCommitRequest.t) :: [OffsetCommitResponse.t] def offset_commit(worker_name, offset_commit_request) do Server.call(worker_name, {:offset_commit, offset_commit_request}) end diff --git a/lib/kafka_ex/consumer_group.ex b/lib/kafka_ex/consumer_group.ex new file mode 100644 index 00000000..d020188a --- /dev/null +++ b/lib/kafka_ex/consumer_group.ex @@ -0,0 +1,308 @@ +defmodule KafkaEx.ConsumerGroup do + @moduledoc """ + A process that manages membership in a Kafka consumer group. + + Consumers in a consumer group coordinate with each other through a Kafka + broker to distribute the work of consuming one or several topics without any + overlap. 
This is facilitated by the + [Kafka client-side assignment protocol](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal). + + Any time group membership changes (a member joins or leaves the group), a + Kafka broker initiates group synchronization by asking one of the group + members (the leader elected by the broker) to provide partition assignments + for the whole group. KafkaEx uses a round robin partition assignment + algorithm by default. This can be overridden by passing a callback function + in the `:partition_assignment_callback` option. See + `KafkaEx.ConsumerGroup.PartitionAssignment` for details on partition + assignment functions. + + A `KafkaEx.ConsumerGroup` process is responsible for: + + 1. Maintaining membership in a Kafka consumer group. + 2. Determining partition assignments if elected as the group leader. + 3. Launching and terminating `KafkaEx.GenConsumer` processes based on its + assigned partitions. + + To use a `KafkaEx.ConsumerGroup`, a developer must define a module that + implements the `KafkaEx.GenConsumer` behaviour and start a + `KafkaEx.ConsumerGroup` configured to use that module. + + ## Example + + Suppose we want to consume from a topic called `"example_topic"` with a + consumer group named `"example_group"` and we have a `KafkaEx.GenConsumer` + implementation called `ExampleGenConsumer` (see the `KafkaEx.GenConsumer` + documentation). We could start a consumer group in our application's + supervision tree as follows: + + ``` + defmodule MyApp do + use Application + + def start(_type, _args) do + import Supervisor.Spec + + consumer_group_opts = [ + # setting for the ConsumerGroup + heartbeat_interval: 1_000, + # this setting will be forwarded to the GenConsumer + commit_interval: 1_000 + ] + + gen_consumer_impl = ExampleGenConsumer + consumer_group_name = "example_group" + topic_names = ["example_topic"] + + children = [ + # ... 
other children + supervisor( + KafkaEx.ConsumerGroup, + [gen_consumer_impl, consumer_group_name, topic_names, consumer_group_opts] + ) + ] + + Supervisor.start_link(children, strategy: :one_for_one) + end + end + ``` + + **Note** It is not necessary for the Elixir nodes in a consumer group to be + connected (i.e., using distributed Erlang methods). The coordination of + group consumers is mediated by the broker. + + See `start_link/4` for configuration details. + """ + + use Supervisor + + alias KafkaEx.ConsumerGroup.PartitionAssignment + alias KafkaEx.GenConsumer + + @typedoc """ + Option values used when starting a consumer group + + * `:heartbeat_interval` - How frequently, in milliseconds, to send heartbeats + to the broker. This impacts how quickly we will process partition + changes as consumers start/stop. Default: 5000 (5 seconds). + * `:session_timeout` - Consumer group session timeout in milliseconds. + Default: 30000 (30 seconds). See below. + * Any of `t:KafkaEx.GenConsumer.option/0`, + which will be passed on to consumers + * `:gen_server_opts` - `t:GenServer.options/0` passed on to the manager + GenServer + * `:name` - Name for the consumer group supervisor + * `:max_restarts`, `:max_seconds` - Supervisor restart policy parameters + * `:partition_assignment_callback` - See + `t:KafkaEx.ConsumerGroup.PartitionAssignment.callback/0` + + Note `:session_timeout` is registered with the broker and determines how long + before the broker will de-register a consumer from which it has not heard a + heartbeat. This value must be between the broker cluster's configured values + for `group.min.session.timeout.ms` and `group.max.session.timeout.ms` (6000 + and 30000 by default). See + [https://kafka.apache.org/documentation/#configuration](https://kafka.apache.org/documentation/#configuration). 
+ """ + @type option :: KafkaEx.GenConsumer.option + | {:heartbeat_interval, pos_integer} + | {:session_timeout, pos_integer} + | {:partition_assignment_callback, PartitionAssignment.callback} + | {:gen_server_opts, GenServer.options} + | {:name, Supervisor.name} + | {:max_restarts, non_neg_integer} + | {:max_seconds, non_neg_integer} + + @type options :: [option] + + @doc """ + Starts a consumer group process tree linked to the current process. + + This can be used to start a `KafkaEx.ConsumerGroup` as part of a supervision + tree. + + `consumer_module` is a module that implements the `KafkaEx.GenConsumer` behaviour. + `group_name` is the name of the consumer group. `topics` is a list of topics + that the consumer group should consume from. `opts` can be composed of + options for the supervisor as well as for the `KafkaEx.GenConsumer` processes + that will be spawned by the supervisor. See `t:option/0` for details. + + *Note* When starting a consumer group with multiple topics, you should + propagate this configuration change to your consumers. If you add a topic to + an existing consumer group from a single consumer, it may take a long time + to propagate depending on the leader election process. + + ### Return Values + + This function has the same return values as `Supervisor.start_link/3`. + """ + @spec start_link(module, binary, [binary], options) :: Supervisor.on_start + def start_link(consumer_module, group_name, topics, opts \\ []) do + {supervisor_opts, module_opts} = + Keyword.split(opts, [:name, :strategy, :max_restarts, :max_seconds]) + + Supervisor.start_link( + __MODULE__, + {consumer_module, group_name, topics, module_opts}, + supervisor_opts + ) + end + + @doc """ + Returns the generation id of the consumer group. + + The generation id is provided by the broker on sync. Returns `nil` if + queried before the initial sync has completed. 
+ """ + @spec generation_id(Supervisor.supervisor) :: integer | nil + def generation_id(supervisor_pid) do + call_manager(supervisor_pid, :generation_id) + end + + @doc """ + Returns the consumer group member id + + The id is assigned by the broker. Returns `nil` if queried before the + initial sync has completed. + """ + @spec member_id(Supervisor.supervisor) :: binary | nil + def member_id(supervisor_pid) do + call_manager(supervisor_pid, :member_id) + end + + @doc """ + Returns the member id of the consumer group's leader + + This is provided by the broker on sync. Returns `nil` if queried before the + initial sync has completed + """ + @spec leader_id(Supervisor.supervisor) :: binary | nil + def leader_id(supervisor_pid) do + call_manager(supervisor_pid, :leader_id) + end + + @doc """ + Returns true if this consumer is the leader of the consumer group + + Leaders are elected by the broker and are responsible for assigning + partitions. Returns false if queried before the initial sync has completed. + """ + @spec leader?(Supervisor.supervisor) :: boolean + def leader?(supervisor_pid) do + call_manager(supervisor_pid, :am_leader) + end + + @doc """ + Returns a list of topic and partition assignments for which this consumer is + responsible. + + These are assigned by the leader and communicated by the broker on sync. + """ + @spec assignments(Supervisor.supervisor) :: + [{topic :: binary, partition_id :: non_neg_integer}] + def assignments(supervisor_pid) do + call_manager(supervisor_pid, :assignments) + end + + @doc """ + Returns the pid of the `KafkaEx.GenConsumer.Supervisor` that supervises this + member's consumers. + + Returns `nil` if called before the initial sync. 
+ """ + @spec consumer_supervisor_pid(Supervisor.supervisor) :: nil | pid + def consumer_supervisor_pid(supervisor_pid) do + call_manager(supervisor_pid, :consumer_supervisor_pid) + end + + @doc """ + Returns the pids of consumer processes + """ + @spec consumer_pids(Supervisor.supervisor) :: [pid] + def consumer_pids(supervisor_pid) do + supervisor_pid + |> consumer_supervisor_pid + |> GenConsumer.Supervisor.child_pids + end + + @doc """ + Returns a map from `{topic, partition_id}` to consumer pid + """ + @spec partition_consumer_map(Supervisor.supervisor) :: + %{{topic :: binary, partition_id :: non_neg_integer} => pid} + def partition_consumer_map(supervisor_pid) do + supervisor_pid + |> consumer_pids + |> Enum.map(fn(pid) -> + {GenConsumer.partition(pid), pid} + end) + |> Enum.into(%{}) + end + + @doc """ + Returns true if at least one child consumer process is alive + """ + @spec active?(Supervisor.supervisor) :: boolean + def active?(supervisor_pid) do + consumer_supervisor = consumer_supervisor_pid(supervisor_pid) + if consumer_supervisor && Process.alive?(consumer_supervisor) do + GenConsumer.Supervisor.active?(consumer_supervisor) + else + false + end + end + + @doc false # used by ConsumerGroup.Manager to set partition assignments + def start_consumer(pid, consumer_module, group_name, assignments, opts) do + child = supervisor( + KafkaEx.GenConsumer.Supervisor, + [consumer_module, group_name, assignments, opts], + id: :consumer + ) + + case Supervisor.start_child(pid, child) do + {:ok, consumer_pid} -> {:ok, consumer_pid} + {:ok, consumer_pid, _info} -> {:ok, consumer_pid} + end + end + + @doc false # used by ConsumerGroup to pause consumption during rebalance + def stop_consumer(pid) do + case Supervisor.terminate_child(pid, :consumer) do + :ok -> + Supervisor.delete_child(pid, :consumer) + + {:error, :not_found} -> + :ok + end + end + + @doc false + def init({consumer_module, group_name, topics, opts}) do + opts = Keyword.put(opts, :supervisor_pid, 
self()) + + children = [ + worker( + KafkaEx.ConsumerGroup.Manager, + [consumer_module, group_name, topics, opts] + ), + ] + + supervise(children, strategy: :one_for_all) + end + + defp call_manager(supervisor_pid, call) do + supervisor_pid + |> get_manager_pid + |> GenServer.call(call) + end + + defp get_manager_pid(supervisor_pid) do + {_, pid, _, _} = Enum.find( + Supervisor.which_children(supervisor_pid), + fn + ({KafkaEx.ConsumerGroup.Manager, _, _, _}) -> true + ({_, _, _, _}) -> false + end + ) + pid + end +end diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex new file mode 100644 index 00000000..3a899f4d --- /dev/null +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -0,0 +1,476 @@ +defmodule KafkaEx.ConsumerGroup.Manager do + @moduledoc false + + # actual consumer group management implementation + + use GenServer + + alias KafkaEx.ConsumerGroup + alias KafkaEx.ConsumerGroup.PartitionAssignment + alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest + alias KafkaEx.Protocol.JoinGroup.Response, as: JoinGroupResponse + alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest + alias KafkaEx.Protocol.Heartbeat.Response, as: HeartbeatResponse + alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest + alias KafkaEx.Protocol.LeaveGroup.Response, as: LeaveGroupResponse + alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse + alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest + alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse + + require Logger + + defmodule State do + @moduledoc false + defstruct [ + :supervisor_pid, + :worker_name, + :heartbeat_interval, + :session_timeout, + :consumer_module, + :consumer_opts, + :partition_assignment_callback, + :group_name, + :topics, + :member_id, + :leader_id, + :consumer_supervisor_pid, + :members, + :generation_id, + :assignments, + :heartbeat_timer, + ] + end + + @heartbeat_interval 5_000 + @session_timeout 30_000 + 
@session_timeout_padding 5_000 + + # Client API + + @doc false + # use `KafkaEx.ConsumerGroup.start_link/4` instead + @spec start_link(module, binary, [binary], KafkaEx.GenConsumer.options) :: + GenServer.on_start + def start_link(consumer_module, group_name, topics, opts \\ []) do + gen_server_opts = Keyword.get(opts, :gen_server_opts, []) + consumer_opts = Keyword.drop(opts, [:gen_server_opts]) + + GenServer.start_link( + __MODULE__, + {consumer_module, group_name, topics, consumer_opts}, + gen_server_opts + ) + end + + # GenServer callbacks + + def init({consumer_module, group_name, topics, opts}) do + heartbeat_interval = Keyword.get( + opts, + :heartbeat_interval, + Application.get_env(:kafka_ex, :heartbeat_interval, @heartbeat_interval) + ) + session_timeout = Keyword.get( + opts, + :session_timeout, + Application.get_env(:kafka_ex, :session_timeout, @session_timeout) + ) + partition_assignment_callback = Keyword.get( + opts, + :partition_assignment_callback, + &PartitionAssignment.round_robin/2 + ) + + supervisor_pid = Keyword.fetch!(opts, :supervisor_pid) + consumer_opts = Keyword.drop( + opts, + [ + :supervisor_pid, + :heartbeat_interval, + :session_timeout, + :partition_assignment_callback + ] + ) + + {:ok, worker_name} = + KafkaEx.create_worker(:no_name, consumer_group: group_name) + + state = %State{ + supervisor_pid: supervisor_pid, + worker_name: worker_name, + heartbeat_interval: heartbeat_interval, + session_timeout: session_timeout, + consumer_module: consumer_module, + partition_assignment_callback: partition_assignment_callback, + consumer_opts: consumer_opts, + group_name: group_name, + topics: topics, + member_id: nil, + } + + Process.flag(:trap_exit, true) + + {:ok, state, 0} + end + + ###################################################################### + # handle_call clauses - mostly for ops queries + def handle_call(:generation_id, _from, state) do + {:reply, state.generation_id, state} + end + + def handle_call(:member_id, _from, state) 
do + {:reply, state.member_id, state} + end + + def handle_call(:leader_id, _from, state) do + {:reply, state.leader_id, state} + end + + def handle_call(:am_leader, _from, state) do + {:reply, state.leader_id && state.member_id == state.leader_id, state} + end + + def handle_call(:assignments, _from, state) do + {:reply, state.assignments, state} + end + + def handle_call(:consumer_supervisor_pid, _from, state) do + {:reply, state.consumer_supervisor_pid, state} + end + ###################################################################### + + # If `member_id` and `generation_id` aren't set, we haven't yet joined the + # group. `member_id` and `generation_id` are initialized by + # `JoinGroupResponse`. + def handle_info( + :timeout, %State{generation_id: nil, member_id: nil} = state + ) do + {:ok, new_state} = join(state) + + {:noreply, new_state} + end + + # After joining the group, a member must periodically send heartbeats to the + # group coordinator. + def handle_info(:heartbeat, %State{} = state) do + {:ok, new_state} = heartbeat(state) + + {:noreply, new_state} + end + + # When terminating, inform the group coordinator that this member is leaving + # the group so that the group can rebalance without waiting for a session + # timeout. + def terminate(_reason, %State{generation_id: nil, member_id: nil}), do: :ok + def terminate(_reason, %State{} = state) do + :ok = leave(state) + end + + ### Helpers + + # `JoinGroupRequest` is used to set the active members of a group. The + # response blocks until the broker has decided that it has a full list of + # group members. This requires that all active members send a + # `JoinGroupRequest`. For active members, this is triggered by the broker + # responding to a heartbeat with a `:rebalance_in_progress` error code. If + # any group members fail to send a `JoinGroupRequest` before the session + # timeout expires, then those group members are removed from the group and + # synchronization continues without them. 
+ # + # `JoinGroupResponse` tells each member its unique member ID as well as the + # group's current generation ID. The broker will pick one group member to be + # the leader, which is responsible for assigning partitions to all of the + # group members. Once a `JoinGroupResponse` is received, all group members + # must send a `SyncGroupRequest` (see sync/2). + + defp join( + %State{ + worker_name: worker_name, + session_timeout: session_timeout, + group_name: group_name, + topics: topics, + member_id: member_id + } = state + ) do + join_request = %JoinGroupRequest{ + group_name: group_name, + member_id: member_id || "", + topics: topics, + session_timeout: session_timeout, + } + + join_response = %JoinGroupResponse{error_code: :no_error} = + KafkaEx.join_group( + join_request, + worker_name: worker_name, + timeout: session_timeout + @session_timeout_padding + ) + + Logger.debug(fn -> "Joined consumer group #{group_name}" end) + + new_state = %State{ + state | + leader_id: join_response.leader_id, + member_id: join_response.member_id, + generation_id: join_response.generation_id + } + + assignments = + if JoinGroupResponse.leader?(join_response) do + # Leader is responsible for assigning partitions to all group members. + partitions = assignable_partitions(new_state) + assign_partitions(new_state, join_response.members, partitions) + else + # Follower does not assign partitions; must be empty. + [] + end + + sync(new_state, assignments) + end + + # `SyncGroupRequest` is used to distribute partition assignments to all group + # members. All group members must send this request after receiving a + # response to a `JoinGroupRequest`. The request blocks until assignments are + # provided by the leader. The leader sends partition assignments (given by + # the `assignments` parameter) as part of its `SyncGroupRequest`. For all + # other members, `assignments` must be empty. + # + # `SyncGroupResponse` contains the individual member's partition assignments. 
+ # Upon receiving a successful `SyncGroupResponse`, a group member is free to + # start consuming from its assigned partitions, but must send periodic + # heartbeats to the coordinating broker. + + defp sync( + %State{ + group_name: group_name, + member_id: member_id, + generation_id: generation_id, + worker_name: worker_name, + session_timeout: session_timeout + } = state, + assignments + ) do + sync_request = %SyncGroupRequest{ + group_name: group_name, + member_id: member_id, + generation_id: generation_id, + assignments: assignments, + } + + sync_group_response = KafkaEx.sync_group( + sync_request, + worker_name: worker_name, + timeout: session_timeout + @session_timeout_padding + ) + + case sync_group_response do + %SyncGroupResponse{error_code: :no_error, assignments: assignments} -> + new_state = state + |> start_consumer(unpack_assignments(assignments)) + |> start_heartbeat_timer() + {:ok, new_state} + %SyncGroupResponse{error_code: :rebalance_in_progress} -> + rebalance(state) + end + end + + # `HeartbeatRequest` is sent periodically by each active group member (after + # completing the join/sync phase) to inform the broker that the member is + # still alive and participating in the group. If a group member fails to send + # a heartbeat before the group's session timeout expires, the coordinator + # removes that member from the group and initiates a rebalance. + # + # `HeartbeatResponse` allows the coordinating broker to communicate the + # group's status to each member: + # + # * `:no_error` indicates that the group is up to date and no action is + # needed. * `:rebalance_in_progress` instructs each member to rejoin the + # group by sending a `JoinGroupRequest` (see join/1). 
+ defp heartbeat( + %State{ + worker_name: worker_name, + group_name: group_name, + generation_id: generation_id, + member_id: member_id + } = state + ) do + heartbeat_request = %HeartbeatRequest{ + group_name: group_name, + member_id: member_id, + generation_id: generation_id, + } + + case KafkaEx.heartbeat(heartbeat_request, worker_name: worker_name) do + %HeartbeatResponse{error_code: :no_error} -> + new_state = start_heartbeat_timer(state) + {:ok, new_state} + %HeartbeatResponse{error_code: :rebalance_in_progress} -> + Logger.debug(fn -> "Rebalancing consumer group #{group_name}" end) + rebalance(state) + end + end + + # `LeaveGroupRequest` is used to voluntarily leave a group. This tells the + # broker that the member is leaving the group without having to wait for the + # session timeout to expire. Leaving a group triggers a rebalance for the + # remaining group members. + defp leave( + %State{ + worker_name: worker_name, + group_name: group_name, + member_id: member_id + } = state + ) do + stop_heartbeat_timer(state) + + leave_request = %LeaveGroupRequest{ + group_name: group_name, + member_id: member_id, + } + + %LeaveGroupResponse{error_code: :no_error} = + KafkaEx.leave_group(leave_request, worker_name: worker_name) + + Logger.debug(fn -> "Left consumer group #{group_name}" end) + + :ok + end + + # When instructed that a rebalance is in progress, a group member must rejoin + # the group with `JoinGroupRequest` (see join/1). To keep the state + # synchronized during the join/sync phase, each member pauses its consumers + # and commits its offsets before rejoining the group. + defp rebalance(%State{} = state) do + state + |> stop_heartbeat_timer() + |> stop_consumer() + |> join() + end + + ### Timer Management + + # Starts a timer for the next heartbeat. 
+ defp start_heartbeat_timer( + %State{heartbeat_interval: heartbeat_interval} = state + ) do + {:ok, timer} = :timer.send_after(heartbeat_interval, :heartbeat) + + %State{state | heartbeat_timer: timer} + end + + # Stops any active heartbeat timer. + defp stop_heartbeat_timer(%State{heartbeat_timer: nil} = state), do: state + defp stop_heartbeat_timer( + %State{heartbeat_timer: heartbeat_timer} = state + ) do + {:ok, :cancel} = :timer.cancel(heartbeat_timer) + + %State{state | heartbeat_timer: nil} + end + + ### Consumer Management + + # Starts consuming from the member's assigned partitions. + defp start_consumer( + %State{ + consumer_module: consumer_module, + consumer_opts: consumer_opts, + group_name: group_name, + supervisor_pid: pid + } = state, + assignments + ) do + {:ok, consumer_supervisor_pid} = ConsumerGroup.start_consumer( + pid, + consumer_module, + group_name, + assignments, + consumer_opts + ) + + %{ + state | + assignments: assignments, + consumer_supervisor_pid: consumer_supervisor_pid + } + end + + # Stops consuming from the member's assigned partitions and commits offsets. + defp stop_consumer(%State{supervisor_pid: pid} = state) do + :ok = ConsumerGroup.stop_consumer(pid) + + state + end + + ### Partition Assignment + + # Queries the Kafka brokers for a list of partitions for the topics of + # interest to this consumer group. This function returns a list of + # topic/partition tuples that can be passed to a GenConsumer's + # `assign_partitions` method. + defp assignable_partitions( + %State{worker_name: worker_name, topics: topics} + ) do + metadata = KafkaEx.metadata(worker_name: worker_name) + + Enum.flat_map(topics, fn (topic) -> + partitions = MetadataResponse.partitions_for_topic(metadata, topic) + + Enum.map(partitions, fn (partition) -> + {topic, partition} + end) + end) + end + + # This function is used by the group leader to determine partition + # assignments during the join/sync phase. 
`members` is provided to the leader + # by the coordinating broker in `JoinGroupResponse`. `partitions` is a list + # of topic/partition tuples, obtained from `assignable_partitions/1`. The + # return value is a complete list of member assignments in the format needed + # by `SyncGroupRequest`. + defp assign_partitions( + %State{partition_assignment_callback: partition_assignment_callback}, + members, + partitions + ) do + # Delegate partition assignment to the configured callback. + assignments = partition_assignment_callback.(members, partitions) + + # Convert assignments to format expected by Kafka protocol. + packed_assignments = + Enum.map(assignments, fn ({member, topic_partitions}) -> + {member, pack_assignments(topic_partitions)} + end) + assignments_map = Enum.into(packed_assignments, %{}) + + # Fill in empty assignments for missing member IDs. + Enum.map(members, fn (member) -> + {member, Map.get(assignments_map, member, [])} + end) + end + + # Converts assignments from Kafka's protocol format to topic/partition tuples. + # + # Example: + # + # unpack_assignments([{"foo", [0, 1]}]) #=> [{"foo", 0}, {"foo", 1}] + defp unpack_assignments(assignments) do + Enum.flat_map(assignments, fn ({topic, partition_ids}) -> + Enum.map(partition_ids, &({topic, &1})) + end) + end + + # Converts assignments from topic/partition tuples to Kafka's protocol format. 
+ # + # Example: + # + # pack_assignments([{"foo", 0}, {"foo", 1}]) #=> [{"foo", [0, 1]}] + defp pack_assignments(assignments) do + assignments + |> Enum.reduce(%{}, fn({topic, partition}, assignments) -> + Map.update(assignments, topic, [partition], &(&1 ++ [partition])) + end) + |> Map.to_list + end +end diff --git a/lib/kafka_ex/consumer_group/partition_assignment.ex b/lib/kafka_ex/consumer_group/partition_assignment.ex new file mode 100644 index 00000000..d2c53ecc --- /dev/null +++ b/lib/kafka_ex/consumer_group/partition_assignment.ex @@ -0,0 +1,90 @@ +defmodule KafkaEx.ConsumerGroup.PartitionAssignment do + @moduledoc """ + Contains typespecs and reference algorithms for assigning partitions + + `round_robin/2` is used by `KafkaEx.ConsumerGroup` by default and should + suffice in most cases. + + For custom assignments, any function matching the + `t:callback/0` type spec can be used. + """ + + @typedoc """ + The ID (string) of a member of a consumer group, assigned by a Kafka broker. + """ + @type member_id :: binary + + @typedoc """ + The string name of a Kafka topic. + """ + @type topic :: binary + + @typedoc """ + The integer ID of a partition of a Kafka topic. + """ + @type partition_id :: integer + + @typedoc """ + A partition of a single topic (embeds the name of the topic). + """ + @type partition :: {topic, partition_id} + + @typedoc """ + A function that can assign partitions. + + `members` is a list of member IDs and `partitions` is a list of partitions + that need to be assigned to a group member. + + The return value must be a map with member IDs as keys and a list of + partition assignments as values. For each member ID in the returned map, the + assigned partitions will become the `assignments` argument to + `KafkaEx.GenConsumer.Supervisor.start_link/4` in the corresponding member + process. Any member that's omitted from the return value will not be assigned + any partitions. 
+ + ### Example + + Given the following `members` and `partitions` to be assigned: + + ``` + members = ["member1", "member2", "member3"] + partitions = [{"topic", 0}, {"topic", 1}, {"topic", 2}] + ``` + + One possible assignment is as follows: + + ``` + ExampleGenConsumer.assign_partitions(members, partitions) + #=> %{"member1" => [{"topic", 0}, {"topic", 2}], "member2" => [{"topic", 1}]} + ``` + + In this case, the consumer group process for `"member1"` will launch two + `KafkaEx.GenConsumer` processes (one for each of its assigned partitions), + `"member2"` will launch one `KafkaEx.GenConsumer` process, and `"member3"` will + launch no processes. + """ + @type callback :: + ((members :: [member_id], partitions :: [partition]) -> + %{member_id => [partition]}) + + @doc """ + Round robin assignment + + Iterates over the partitions and members, giving the first member the first + partition, the second member the second partition, etc, looping back to the + beginning of the list of members when finished. + + Example: + iex> KafkaEx.ConsumerGroup.PartitionAssignment.round_robin(["m1", "m2"], [{"t1", 0}, {"t2", 1}, {"t3", 2}]) + %{"m1" => [{"t1", 0}, {"t3", 2}], "m2" => [{"t2", 1}]} + """ + @spec round_robin([binary], [partition]) :: %{binary => [partition]} + def round_robin(members, partitions) do + members + |> Stream.cycle + |> Enum.zip(partitions) + |> Enum.reduce(%{}, fn({member, partition}, assignments) -> + Map.update(assignments, member, [partition], &(&1 ++ [partition])) + end) + end +end diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex new file mode 100644 index 00000000..1428a50e --- /dev/null +++ b/lib/kafka_ex/gen_consumer.ex @@ -0,0 +1,612 @@ +defmodule KafkaEx.GenConsumer do + @moduledoc """ + A behaviour module for implementing a Kafka consumer. + + A `KafkaEx.GenConsumer` is an Elixir process that consumes messages from + Kafka. A single `KafkaEx.GenConsumer` process consumes from a single + partition of a Kafka topic.
Several `KafkaEx.GenConsumer` processes can be + used to consume from multiple partitions or even multiple topics. Partition + assignments for a group of `KafkaEx.GenConsumer`s can be defined manually + using `KafkaEx.GenConsumer.Supervisor` or coordinated across a cluster of + nodes using `KafkaEx.ConsumerGroup`. + + A `KafkaEx.GenConsumer` must implement three callbacks. Two of these will be + defined with default behavior if you add `use KafkaEx.GenConsumer` to your + module, leaving just `c:handle_message_set/2` to be implemented. This is the + recommended usage. + + ## Example + + The `KafkaEx.GenConsumer` behaviour abstracts common Kafka consumer + interactions. `KafkaEx.GenConsumer` will take care of the details of + determining a starting offset, fetching messages from a Kafka broker, and + committing offsets for consumed messages. Developers are only required to + implement `c:handle_message_set/2` to process messages. + + The following is a minimal example that logs each message as it's consumed: + + ``` + defmodule ExampleGenConsumer do + use KafkaEx.GenConsumer + + alias KafkaEx.Protocol.Fetch.Message + + require Logger + + # note - messages are delivered in batches + def handle_message_set(message_set, state) do + for %Message{value: message} <- message_set do + Logger.debug(fn -> "message: " <> inspect(message) end) + end + {:async_commit, state} + end + end + ``` + + `c:handle_message_set/2` will be called with the batch of messages fetched + from the broker. The number of messages in a batch is determined by the + number of messages available and the `max_bytes` and `min_bytes` parameters + of the fetch request (which can be configured in KafkaEx). In this example, + because `c:handle_message_set/2` always returns `{:async_commit, new_state}`, + the message offsets will be automatically committed asynchronously. + + ## Committing Offsets + + `KafkaEx.GenConsumer` manages a consumer's offsets by committing the offsets + of consumed messages.
KafkaEx supports two commit strategies: asynchronous + and synchronous. The return value of `c:handle_message_set/2` determines + which strategy is used: + + * `{:sync_commit, new_state}` causes synchronous offset commits. + * `{:async_commit, new_state}` causes asynchronous offset commits. + + Note that with both of the offset commit strategies, only the final offset + in the message set is committed and this is done after the messages are + consumed. If you want to commit the offset of every message consumed, use + the synchronous offset commit strategy and implement calls to + `KafkaEx.offset_commit/2` within your consumer as appropriate. + + ### Synchronous offset commits + + When `c:handle_message_set/2` returns `{:sync_commit, new_state}`, the offset + of the final message in the message set is committed immediately before + fetching any more messages. This strategy requires a significant amount of + communication with the broker and could correspondingly degrade consumer + performance, but it will keep the offset commits tightly synchronized with + the consumer state. + + Choose the synchronous offset commit strategy if you want to favor + consistency of offset commits over performance, or if you have a low rate of + message arrival. The definition of a "low rate" depends on the situation, + but tens of messages per second could be considered a "low rate" in most + situations. + + ### Asynchronous offset commits + + When `c:handle_message_set/2` returns `{:async_commit, new_state}`, KafkaEx + will not commit offsets after every message set consumed. To avoid + excessive network calls, the offsets are committed periodically (and when + the worker terminates). + + How often a `KafkaEx.GenConsumer` auto-commits offsets is controlled by the two + configuration values `:commit_interval` and `:commit_threshold`.
+ + * `:commit_interval` is the maximum time (in milliseconds) that a + `KafkaEx.GenConsumer` will delay committing the offset for an acknowledged + message. + + * `:commit_threshold` is the maximum number of acknowledged messages that a + `KafkaEx.GenConsumer` will allow to be uncommitted before triggering a + commit. + + These can be set globally in the `:kafka_ex` app's environment or on a + per-consumer basis by passing options to `start_link/5`: + + ``` + # In config/config.exs + config :kafka_ex, + commit_interval: 5000, + commit_threshold: 100 + + # As options to start_link/5 + KafkaEx.GenConsumer.start_link(MyConsumer, "my_group", "topic", 0, + commit_interval: 5000, + commit_threshold: 100) + ``` + + For low-volume topics, `:commit_interval` is the dominant factor for how + often a `KafkaEx.GenConsumer` auto-commits. For high-volume topics, + `:commit_threshold` is the dominant factor. + + ## Handler state and interaction + + Use `c:init/2` to initialize consumer state and `c:handle_call/3` + to interact. + + Example: + + ``` + defmodule MyConsumer do + use KafkaEx.GenConsumer + + defmodule State do + defstruct messages: [], calls: 0 + end + + def init(_topic, _partition) do + {:ok, %State{}} + end + + def handle_message_set(message_set, state) do + {:async_commit, %{state | messages: state.messages ++ message_set}} + end + + def handle_call(:messages, _from, state) do + {:reply, state.messages, %{state | calls: state.calls + 1}} + end + end + + {:ok, pid} = GenConsumer.start_link(MyConsumer, "consumer_group", "topic", 0) + GenConsumer.call(pid, :messages) + ``` + + **NOTE** If you do not implement a `c:handle_call/3` callback, any calls to + `GenConsumer.call/3` that go to your consumer will cause a `MatchError`. + + ## Testing + + A `KafkaEx.GenConsumer` can be unit-tested without a running Kafka broker by sending + messages directly to its `c:handle_message_set/2` function.
The following + recipe can be used as a starting point when testing a `KafkaEx.GenConsumer`: + + ``` + defmodule ExampleGenConsumerTest do + use ExUnit.Case, async: true + + alias KafkaEx.Protocol.Fetch.Message + + @topic "topic" + @partition 0 + + setup do + {:ok, state} = ExampleGenConsumer.init(@topic, @partition) + {:ok, %{state: state}} + end + + test "it acks a message", %{state: state} do + message_set = [%Message{offset: 0, value: "hello"}] + {response, _new_state} = + ExampleGenConsumer.handle_message_set(message_set, state) + assert response == :async_commit + end + end + ``` + """ + + use GenServer + + alias KafkaEx.Protocol.OffsetCommit.Request, as: OffsetCommitRequest + alias KafkaEx.Protocol.OffsetCommit.Response, as: OffsetCommitResponse + alias KafkaEx.Protocol.OffsetFetch.Request, as: OffsetFetchRequest + alias KafkaEx.Protocol.OffsetFetch.Response, as: OffsetFetchResponse + alias KafkaEx.Protocol.Offset.Response, as: OffsetResponse + alias KafkaEx.Protocol.Fetch.Response, as: FetchResponse + alias KafkaEx.Protocol.Fetch.Message + + require Logger + + @typedoc """ + Option values used when starting a `KafkaEx.GenConsumer`. + """ + @type option :: {:commit_interval, non_neg_integer} + | {:commit_threshold, non_neg_integer} + + @typedoc """ + Options used when starting a `KafkaEx.GenConsumer`. + """ + @type options :: [option | GenServer.option] + + @doc """ + Invoked when the server is started. `start_link/5` will block until it + returns. + + `topic` and `partition` are the arguments passed to `start_link/5`. They + identify the Kafka partition that the `KafkaEx.GenConsumer` will consume from. + + Returning `{:ok, state}` will cause `start_link/5` to return `{:ok, pid}` and + the process to start consuming from its assigned partition. `state` becomes + the consumer's state. + + Any other return value will cause the `start_link/5` to return `{:error, + error}` and the process to exit. 
+ """ + @callback init(topic :: binary, partition :: non_neg_integer) :: + {:ok, state :: term} + + @doc """ + Invoked for each message set consumed from a Kafka topic partition. + + `message_set` is a message set fetched from a Kafka broker and `state` is the + current state of the `KafkaEx.GenConsumer`. + + Returning `{:async_commit, new_state}` acknowledges `message` and continues + to consume from the Kafka queue with new state `new_state`. Acknowledged + messages will be auto-committed (possibly at a later time) based on the + `:commit_interval` and `:commit_threshold` options. + + Returning `{:sync_commit, new_state}` commits `message` synchronously before + continuing to consume from the Kafka queue with new state `new_state`. + Committing a message synchronously means that no more messages will be + consumed until the message's offset is committed. `:sync_commit` should be + used sparingly, since committing every message synchronously would impact a + consumer's performance and could result in excessive network traffic. + """ + @callback handle_message_set(message_set :: [Message.t], state :: term) :: + {:async_commit, new_state :: term} | {:sync_commit, new_state :: term} + + @doc """ + Invoked by `KafkaEx.GenConsumer.call/3`. + + Note the default implementation will cause a `MatchError`. If you want to + interact with your consumer, you must implement a handle_call function. 
+ """ + @callback handle_call(call :: term, from :: GenServer.from, state :: term) + :: {:reply, reply_value :: term, new_state :: term} + + defmacro __using__(_opts) do + quote do + @behaviour KafkaEx.GenConsumer + alias KafkaEx.Protocol.Fetch.Message + + def init(_topic, _partition) do + {:ok, nil} + end + + def handle_call(_call, _from, _consumer_state) do + # the user must implement this if they expect to receive calls + :handle_call_not_implemented + end + + defoverridable [init: 2, handle_call: 3] + end + end + + defmodule State do + @moduledoc false + defstruct [ + :consumer_module, + :consumer_state, + :commit_interval, + :commit_threshold, + :worker_name, + :group, + :topic, + :partition, + :current_offset, + :committed_offset, + :acked_offset, + :last_commit, + ] + end + + @commit_interval 5_000 + @commit_threshold 100 + + # Client API + + @doc """ + Starts a `KafkaEx.GenConsumer` process linked to the current process. + + This can be used to start the `KafkaEx.GenConsumer` as part of a supervision tree. + + Once the consumer has been started, the `c:init/2` function of + `consumer_module` is called with `topic` and `partition` as its arguments. + `group_name` is the consumer group name that will be used for managing + consumer offsets. + + ### Options + + * `:commit_interval` - The interval in milliseconds that the consumer will + wait to commit offsets of handled messages. Default 5_000. + + * `:commit_threshold` - Threshold number of messages consumed to commit + offsets to the broker. Default 100. + + Both `:commit_interval` and `:commit_threshold` default to the application + config (e.g., `Application.get_env/2`) if that value is present, or the + stated default if the application config is not present. + + Any valid options for `GenServer.start_link/3` can also be specified. + + ### Return Values + + This function has the same return values as `GenServer.start_link/3`.
+ """ + @spec start_link( + callback_module :: module, + consumer_group_name :: binary, + topic_name :: binary, + partition_id :: non_neg_integer, + options + ) :: GenServer.on_start + def start_link(consumer_module, group_name, topic, partition, opts \\ []) do + {server_opts, consumer_opts} = + Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt]) + + GenServer.start_link( + __MODULE__, + {consumer_module, group_name, topic, partition, consumer_opts}, + server_opts + ) + end + + @doc """ + Returns the topic and partition id for this consumer process + """ + @spec partition(GenServer.server) :: + {topic :: binary, partition_id :: non_neg_integer} + def partition(gen_consumer) do + GenServer.call(gen_consumer, :partition) + end + + @doc """ + Forwards a `GenServer.call/3` to the consumer implementation with the + consumer's state. + + The implementation must return a `GenServer.call/3`-compatible value of the + form `{:reply, reply_value, new_consumer_state}`. The GenConsumer will + turn this into an immediate timeout, which drives continued message + consumption. + + See the moduledoc for an example. 
+ """ + @spec call(GenServer.server, term, timeout) :: term + def call(gen_consumer, message, timeout \\ 5000) do + GenServer.call(gen_consumer, {:consumer_call, message}, timeout) + end + + # GenServer callbacks + + def init({consumer_module, group_name, topic, partition, opts}) do + commit_interval = Keyword.get( + opts, + :commit_interval, + Application.get_env(:kafka_ex, :commit_interval, @commit_interval) + ) + commit_threshold = Keyword.get( + opts, + :commit_threshold, + Application.get_env(:kafka_ex, :commit_threshold, @commit_threshold) + ) + + {:ok, consumer_state} = consumer_module.init(topic, partition) + {:ok, worker_name} = + KafkaEx.create_worker(:no_name, consumer_group: group_name) + + state = %State{ + consumer_module: consumer_module, + consumer_state: consumer_state, + commit_interval: commit_interval, + commit_threshold: commit_threshold, + worker_name: worker_name, + group: group_name, + topic: topic, + partition: partition, + } + + Process.flag(:trap_exit, true) + + {:ok, state, 0} + end + + def handle_call(:partition, _from, state) do + {:reply, {state.topic, state.partition}, state, 0} + end + + def handle_call( + {:consumer_call, message}, + from, + %State{ + consumer_module: consumer_module, + consumer_state: consumer_state + } = state + ) do + # NOTE we only support the {:reply, _, _} result format here + # which we turn into a timeout = 0 clause so that we continue to consume. 
+ # any other GenServer flow control could have unintended consequences, + # so we leave that for later consideration + {:reply, reply, new_consumer_state} = consumer_module.handle_call( + message, + from, + consumer_state + ) + {:reply, reply, %{state | consumer_state: new_consumer_state}, 0} + end + + def handle_info( + :timeout, + %State{current_offset: nil, last_commit: nil} = state + ) do + new_state = %State{ + load_offsets(state) | + last_commit: :erlang.monotonic_time(:milli_seconds) + } + + {:noreply, new_state, 0} + end + + def handle_info(:timeout, %State{} = state) do + new_state = consume(state) + + {:noreply, new_state, 0} + end + + def terminate(_reason, %State{} = state) do + commit(state) + end + + # Helpers + + defp consume( + %State{ + worker_name: worker_name, + topic: topic, + partition: partition, + current_offset: offset + } = state + ) do + [ + %FetchResponse{ + topic: ^topic, + partitions: [ + response = %{error_code: :no_error, partition: ^partition} + ] + } + ] = KafkaEx.fetch( + topic, + partition, + offset: offset, + auto_commit: false, + worker_name: worker_name + ) + + case response do + %{last_offset: nil, message_set: []} -> + handle_commit(:async_commit, state) + %{last_offset: _, message_set: message_set} -> + handle_message_set(message_set, state) + end + end + + defp handle_message_set( + message_set, + %State{ + consumer_module: consumer_module, + consumer_state: consumer_state + } = state + ) do + {sync_status, new_consumer_state} = + consumer_module.handle_message_set(message_set, consumer_state) + + %Message{offset: last_offset} = List.last(message_set) + state_out = %State{ + state | + consumer_state: new_consumer_state, + acked_offset: last_offset + 1, + current_offset: last_offset + 1 + } + + handle_commit(sync_status, state_out) + end + + defp handle_commit(:sync_commit, %State{} = state), do: commit(state) + defp handle_commit( + :async_commit, + %State{ + acked_offset: acked, + committed_offset: committed, + 
commit_threshold: threshold, + last_commit: last_commit, + commit_interval: interval + } = state + ) do + case acked - committed do + 0 -> + %State{state | last_commit: :erlang.monotonic_time(:milli_seconds)} + n when n >= threshold -> + commit(state) + _ -> + if :erlang.monotonic_time(:milli_seconds) - last_commit >= interval do + commit(state) + else + state + end + end + end + + defp commit( + %State{acked_offset: offset, committed_offset: offset} = state + ) do + state + end + + defp commit( + %State{ + worker_name: worker_name, + group: group, + topic: topic, + partition: partition, + acked_offset: offset + } = state + ) do + request = %OffsetCommitRequest{ + consumer_group: group, + topic: topic, + partition: partition, + offset: offset, + } + + [%OffsetCommitResponse{topic: ^topic, partitions: [^partition]}] = + KafkaEx.offset_commit(worker_name, request) + + Logger.debug(fn -> + "Committed offset #{topic}/#{partition}@#{offset} for #{group}" + end) + + %State{ + state | + committed_offset: offset, + last_commit: :erlang.monotonic_time(:milli_seconds) + } + end + + defp load_offsets( + %State{ + worker_name: worker_name, + group: group, + topic: topic, + partition: partition + } = state + ) do + request = %OffsetFetchRequest{ + consumer_group: group, + topic: topic, + partition: partition + } + + [ + %OffsetFetchResponse{ + topic: ^topic, + partitions: [ + %{partition: ^partition, error_code: error_code, offset: offset} + ] + } + ] = KafkaEx.offset_fetch(worker_name, request) + + case error_code do + :no_error -> + %State{ + state | + current_offset: offset, + committed_offset: offset, + acked_offset: offset + } + :unknown_topic_or_partition -> + [ + %OffsetResponse{ + topic: ^topic, + partition_offsets: [ + %{partition: ^partition, error_code: :no_error, offset: [offset]} + ] + } + ] = KafkaEx.earliest_offset(topic, partition, worker_name) + + %State{ + state | + current_offset: offset, + committed_offset: offset, + acked_offset: offset + } + end + end +end 
diff --git a/lib/kafka_ex/gen_consumer/supervisor.ex b/lib/kafka_ex/gen_consumer/supervisor.ex new file mode 100644 index 00000000..d89451d3 --- /dev/null +++ b/lib/kafka_ex/gen_consumer/supervisor.ex @@ -0,0 +1,96 @@ +defmodule KafkaEx.GenConsumer.Supervisor do + @moduledoc """ + A supervisor for managing `GenConsumer` processes that are part of a consumer + group. + + The supervisor will launch individual `GenConsumer` processes for each + partition given by the `assignments` argument to `start_link/4`. When + terminated, each of the supervisor's child processes will commit its latest + offset before terminating. + + This module manages a static list of consumer processes. For dynamically + distributing consumers in a consumer group across a cluster of nodes, see + `KafkaEx.ConsumerGroup`. + """ + + use Elixir.Supervisor + + @doc """ + Starts a `GenConsumer.Supervisor` process linked to the current process. + + `consumer_module` is a module that implements the `GenConsumer` behaviour. + `group_name` is the name of a consumer group, and `assignments` is a list of + partitions for the `GenConsumer`s to consume. `opts` accepts the same + options as `KafkaEx.GenConsumer.start_link/5`. + + ### Return Values + + This function has the same return values as `Supervisor.start_link/3`. + + If the supervisor and its consumers are successfully created, this function + returns `{:ok, pid}`, where `pid` is the PID of the supervisor.
+ """ + @spec start_link( + callback_module :: module, + consumer_group_name :: binary, + assigned_partitions :: [ + {topic_name :: binary, partition_id :: non_neg_integer} + ], + KafkaEx.GenConsumer.options + ) :: Elixir.Supervisor.on_start + def start_link(consumer_module, group_name, assignments, opts \\ []) do + start_link_result = Elixir.Supervisor.start_link( + __MODULE__, + {consumer_module, group_name, assignments, opts} + ) + case start_link_result do + {:ok, pid} -> + :ok = start_workers(pid, assignments, opts) + {:ok, pid} + + error -> + error + end + end + + @doc """ + Returns a list of child pids + + Intended to be used for operational and testing purposes + """ + @spec child_pids(pid | atom) :: [pid] + def child_pids(supervisor_pid) do + supervisor_pid + |> Supervisor.which_children + |> Enum.map(fn({_, pid, _, _}) -> pid end) + end + + @doc """ + Returns true if any child pids are alive + """ + @spec active?(Supervisor.supervisor) :: boolean + def active?(supervisor_pid) do + supervisor_pid + |> child_pids + |> Enum.any?(&Process.alive?/1) + end + + def init({consumer_module, group_name, _assignments, _opts}) do + children = [ + worker(KafkaEx.GenConsumer, [consumer_module, group_name]) + ] + + supervise(children, strategy: :simple_one_for_one) + end + + defp start_workers(pid, assignments, opts) do + Enum.each(assignments, fn ({topic, partition}) -> + case Elixir.Supervisor.start_child(pid, [topic, partition, opts]) do + {:ok, _child} -> nil + {:ok, _child, _info} -> nil + end + end) + + :ok + end +end diff --git a/lib/kafka_ex/protocol/heartbeat.ex b/lib/kafka_ex/protocol/heartbeat.ex index 4e8ca91c..6fb81590 100644 --- a/lib/kafka_ex/protocol/heartbeat.ex +++ b/lib/kafka_ex/protocol/heartbeat.ex @@ -3,6 +3,17 @@ defmodule KafkaEx.Protocol.Heartbeat do Implementation of the Kafka Hearbeat request and response APIs """ + defmodule Request do + @moduledoc false + defstruct group_name: nil, member_id: nil, generation_id: nil + + @type t :: %Request{ + 
group_name: binary, + member_id: binary, + generation_id: integer, + } + end + defmodule Response do @moduledoc false # We could just return the error code instead of having the struct, but this @@ -11,17 +22,16 @@ defmodule KafkaEx.Protocol.Heartbeat do @type t :: %Response{error_code: atom | integer} end - @spec create_request(integer, binary, binary, binary, integer) :: binary - def create_request(correlation_id, client_id, member_id, group_id, generation_id) do + @spec create_request(integer, binary, Request.t) :: binary + def create_request(correlation_id, client_id, request) do KafkaEx.Protocol.create_request(:heartbeat, correlation_id, client_id) <> - << byte_size(group_id) :: 16-signed, group_id :: binary, - generation_id :: 32-signed, - byte_size(member_id) :: 16-signed, member_id :: binary >> + << byte_size(request.group_name) :: 16-signed, request.group_name :: binary, + request.generation_id :: 32-signed, + byte_size(request.member_id) :: 16-signed, request.member_id :: binary >> end @spec parse_response(binary) :: Response.t def parse_response(<< _correlation_id :: 32-signed, error_code :: 16-signed >>) do %Response{error_code: KafkaEx.Protocol.error(error_code)} end - end diff --git a/lib/kafka_ex/protocol/join_group.ex b/lib/kafka_ex/protocol/join_group.ex index e9fa432d..222307d0 100644 --- a/lib/kafka_ex/protocol/join_group.ex +++ b/lib/kafka_ex/protocol/join_group.ex @@ -4,19 +4,17 @@ defmodule KafkaEx.Protocol.JoinGroup do @moduledoc """ Implementation of the Kafka JoinGroup request and response APIs """ + @protocol_type "consumer" @strategy_name "assign" @metadata_version 0 defmodule Request do @moduledoc false - defstruct correlation_id: nil, - client_id: nil, member_id: nil, + defstruct member_id: nil, group_name: nil, topics: nil, session_timeout: nil @type t :: %Request{ - correlation_id: integer, - client_id: binary, member_id: binary, group_name: binary, topics: [binary], @@ -29,19 +27,21 @@ defmodule KafkaEx.Protocol.JoinGroup do 
defstruct error_code: nil, generation_id: 0, leader_id: nil, member_id: nil, members: [] @type t :: %Response{error_code: atom | integer, generation_id: integer, leader_id: binary, member_id: binary, members: [binary]} + + def leader?(%__MODULE__{member_id: member_id, leader_id: leader_id}) do + member_id == leader_id + end end - @spec create_request(Request.t) :: binary - def create_request(join_group_req) do + @spec create_request(integer, binary, Request.t) :: binary + def create_request(correlation_id, client_id, %Request{} = join_group_req) do metadata = << @metadata_version :: 16-signed, length(join_group_req.topics) :: 32-signed, topic_data(join_group_req.topics) :: binary, 0 :: 32-signed >> - KafkaEx.Protocol.create_request( - :join_group, join_group_req.correlation_id, join_group_req.client_id - ) <> + KafkaEx.Protocol.create_request(:join_group, correlation_id, client_id) <> << byte_size(join_group_req.group_name) :: 16-signed, join_group_req.group_name :: binary, join_group_req.session_timeout :: 32-signed, byte_size(join_group_req.member_id) :: 16-signed, join_group_req.member_id :: binary, diff --git a/lib/kafka_ex/protocol/leave_group.ex b/lib/kafka_ex/protocol/leave_group.ex index 11576e81..6a4b9ba5 100644 --- a/lib/kafka_ex/protocol/leave_group.ex +++ b/lib/kafka_ex/protocol/leave_group.ex @@ -1,12 +1,27 @@ defmodule KafkaEx.Protocol.LeaveGroup do + defmodule Request do + @moduledoc false + defstruct group_name: nil, member_id: nil + + @type t :: %Request{ + group_name: binary, + member_id: binary, + } + end + defmodule Response do defstruct error_code: nil + + @type t :: %Response{ + error_code: atom | integer, + } end - def create_request(correlation_id, client_id, group_id, member_id) do + @spec create_request(integer, binary, Request.t) :: binary + def create_request(correlation_id, client_id, request) do KafkaEx.Protocol.create_request(:leave_group, correlation_id, client_id) <> - << byte_size(group_id) :: 16-signed, group_id :: binary, - 
byte_size(member_id) :: 16-signed, member_id :: binary >> + << byte_size(request.group_name) :: 16-signed, request.group_name :: binary, + byte_size(request.member_id) :: 16-signed, request.member_id :: binary >> end def parse_response(<< _correlation_id :: 32-signed, error_code :: 16-signed >>) do diff --git a/lib/kafka_ex/protocol/metadata.ex b/lib/kafka_ex/protocol/metadata.ex index efcc8bbd..61c566e2 100644 --- a/lib/kafka_ex/protocol/metadata.ex +++ b/lib/kafka_ex/protocol/metadata.ex @@ -41,6 +41,12 @@ defmodule KafkaEx.Protocol.Metadata do end end + def partitions_for_topic(metadata, topic) do + topic_metadata = Enum.find(metadata.topic_metadatas, &(&1.topic == topic)) + + Enum.map(topic_metadata.partition_metadatas, &(&1.partition_id)) + end + defp find_lead_broker(metadata_brokers, topic_metadata, brokers, partition) do case Enum.find(topic_metadata.partition_metadatas, &(partition == &1.partition_id)) do nil -> nil diff --git a/lib/kafka_ex/protocol/sync_group.ex b/lib/kafka_ex/protocol/sync_group.ex index efe5c06e..a94be427 100644 --- a/lib/kafka_ex/protocol/sync_group.ex +++ b/lib/kafka_ex/protocol/sync_group.ex @@ -4,6 +4,18 @@ defmodule KafkaEx.Protocol.SyncGroup do """ @member_assignment_version 0 + defmodule Request do + @moduledoc false + defstruct member_id: nil, group_name: nil, generation_id: nil, assignments: [] + + @type t :: %Request{ + member_id: binary, + group_name: binary, + generation_id: integer, + assignments: [{member :: binary, [{topic :: binary, partitions :: [integer]}]}], + } + end + defmodule Assignment do @moduledoc false defstruct topic: nil, partitions: [] @@ -16,23 +28,21 @@ defmodule KafkaEx.Protocol.SyncGroup do @type t :: %Response{error_code: atom | integer, assignments: [Assignment.t]} end - @spec create_request(integer, binary, binary, integer, binary, [{binary, [Assignment.t]}]) :: binary - def create_request(correlation_id, client_id, group_name, generation_id, member_id, assignments) do + @spec 
create_request(integer, binary, Request.t) :: binary + def create_request(correlation_id, client_id, %Request{} = request) do KafkaEx.Protocol.create_request(:sync_group, correlation_id, client_id) <> - << byte_size(group_name) :: 16-signed, group_name :: binary, - generation_id :: 32-signed, - byte_size(member_id) :: 16-signed, member_id :: binary, - length(assignments) :: 32-signed, group_assignment_data(assignments, "") :: binary + << byte_size(request.group_name) :: 16-signed, request.group_name :: binary, + request.generation_id :: 32-signed, + byte_size(request.member_id) :: 16-signed, request.member_id :: binary, + length(request.assignments) :: 32-signed, group_assignment_data(request.assignments, "") :: binary >> end @spec parse_response(binary) :: Response.t def parse_response(<< _correlation_id :: 32-signed, error_code :: 16-signed, - _member_assignment_len :: 32-signed, - @member_assignment_version :: 16-signed, - assignments_size :: 32-signed, rest :: binary >>) do - assignments = parse_assignments(assignments_size, rest, []) - %Response{error_code: KafkaEx.Protocol.error(error_code), assignments: assignments} + member_assignment_len :: 32-signed, + member_assignment :: size(member_assignment_len)-binary >>) do + %Response{error_code: KafkaEx.Protocol.error(error_code), assignments: parse_member_assignment(member_assignment)} end # Helper functions to create assignment data structure @@ -65,6 +75,11 @@ defmodule KafkaEx.Protocol.SyncGroup do # Helper functions to parse assignments + defp parse_member_assignment(<<>>), do: [] + defp parse_member_assignment(<< @member_assignment_version :: 16-signed, assignments_size :: 32-signed, rest :: binary >>) do + parse_assignments(assignments_size, rest, []) + end + defp parse_assignments(0, _rest, assignments), do: assignments defp parse_assignments(size, << topic_len :: 16-signed, topic :: size(topic_len)-binary, partition_len :: 32-signed, diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index 
aaaa9e07..f06bf238 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -4,6 +4,9 @@ defmodule KafkaEx.Server do """ alias KafkaEx.Protocol.ConsumerMetadata + alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest + alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest + alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest alias KafkaEx.Protocol.Metadata alias KafkaEx.Protocol.Metadata.Broker alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse @@ -12,6 +15,7 @@ defmodule KafkaEx.Server do alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest alias KafkaEx.Protocol.Produce alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest + alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest alias KafkaEx.Socket defmodule State do @@ -106,28 +110,28 @@ defmodule KafkaEx.Server do {:noreply, new_state, timeout | :hibernate} | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term, new_state: term, reason: term - @callback kafka_server_join_group(topics :: [binary], session_timeout :: integer, state :: State.t) :: + @callback kafka_server_join_group(JoinGroupRequest.t, network_timeout :: integer, state :: State.t) :: {:reply, reply, new_state} | {:reply, reply, new_state, timeout | :hibernate} | {:noreply, new_state} | {:noreply, new_state, timeout | :hibernate} | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term, new_state: term, reason: term - @callback kafka_server_sync_group(group_name :: binary, generation_id :: integer, member_id :: binary, assignments :: [binary] , state :: State.t) :: + @callback kafka_server_sync_group(SyncGroupRequest.t, network_timeout :: integer, state :: State.t) :: {:reply, reply, new_state} | {:reply, reply, new_state, timeout | :hibernate} | {:noreply, new_state} | {:noreply, new_state, timeout | :hibernate} | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term, new_state: term, reason: term - 
@callback kafka_server_leave_group(group_name :: binary, member_id :: binary, state :: State.t) :: + @callback kafka_server_leave_group(LeaveGroupRequest.t, network_timeout :: integer, state :: State.t) :: {:reply, reply, new_state} | {:reply, reply, new_state, timeout | :hibernate} | {:noreply, new_state} | {:noreply, new_state, timeout | :hibernate} | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term, new_state: term, reason: term - @callback kafka_server_heartbeat(group_name :: binary, generation_id :: integer, member_id :: binary, state :: State.t) :: + @callback kafka_server_heartbeat(HeartbeatRequest.t, network_timeout :: integer, state :: State.t) :: {:reply, reply, new_state} | {:reply, reply, new_state, timeout | :hibernate} | {:noreply, new_state} | @@ -220,20 +224,20 @@ defmodule KafkaEx.Server do kafka_server_metadata(topic, state) end - def handle_call({:join_group, topics, session_timeout}, _from, state) do - kafka_server_join_group(topics, session_timeout,state) + def handle_call({:join_group, request, network_timeout}, _from, state) do + kafka_server_join_group(request, network_timeout, state) end - def handle_call({:sync_group, group_name, generation_id, member_id, assignments}, _from, state) do - kafka_server_sync_group(group_name, generation_id, member_id, assignments, state) + def handle_call({:sync_group, request, network_timeout}, _from, state) do + kafka_server_sync_group(request, network_timeout, state) end - def handle_call({:leave_group, group_name, member_id}, _from, state) do - kafka_server_leave_group(group_name, member_id, state) + def handle_call({:leave_group, request, network_timeout}, _from, state) do + kafka_server_leave_group(request, network_timeout, state) end - def handle_call({:heartbeat, group_name, generation_id, member_id}, _from, state) do - kafka_server_heartbeat(group_name, generation_id, member_id, state) + def handle_call({:heartbeat, request, network_timeout}, _from, state) do + 
kafka_server_heartbeat(request, network_timeout, state) end def handle_info(:update_metadata, state) do @@ -399,8 +403,8 @@ defmodule KafkaEx.Server do end) end - defp config_sync_timeout do - Application.get_env(:kafka_ex, :sync_timeout, @sync_timeout) + defp config_sync_timeout(timeout \\ nil) do + timeout || Application.get_env(:kafka_ex, :sync_timeout, @sync_timeout) end end end diff --git a/lib/kafka_ex/server_0_p_8_p_0.ex b/lib/kafka_ex/server_0_p_8_p_0.ex index a587bbdc..b6745793 100644 --- a/lib/kafka_ex/server_0_p_8_p_0.ex +++ b/lib/kafka_ex/server_0_p_8_p_0.ex @@ -5,8 +5,8 @@ defmodule KafkaEx.Server0P8P0 do # these functions aren't implemented for 0.8.0 @dialyzer [ - {:nowarn_function, kafka_server_heartbeat: 4}, - {:nowarn_function, kafka_server_sync_group: 5}, + {:nowarn_function, kafka_server_heartbeat: 3}, + {:nowarn_function, kafka_server_sync_group: 3}, {:nowarn_function, kafka_server_join_group: 3}, {:nowarn_function, kafka_server_leave_group: 3}, {:nowarn_function, kafka_server_update_consumer_metadata: 1}, @@ -62,9 +62,9 @@ defmodule KafkaEx.Server0P8P0 do def kafka_server_consumer_group(_state), do: raise "Consumer Group is not supported in 0.8.0 version of kafka" def kafka_server_consumer_group_metadata(_state), do: raise "Consumer Group Metadata is not supported in 0.8.0 version of kafka" def kafka_server_join_group(_, _, _state), do: raise "Join Group is not supported in 0.8.0 version of kafka" - def kafka_server_sync_group(_, _, _, _, _state), do: raise "Sync Group is not supported in 0.8.0 version of kafka" + def kafka_server_sync_group(_, _, _state), do: raise "Sync Group is not supported in 0.8.0 version of kafka" def kafka_server_leave_group(_, _, _state), do: raise "Leave Group is not supported in 0.8.0 version of Kafka" - def kafka_server_heartbeat(_, _, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka" + def kafka_server_heartbeat(_, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of 
kafka" def kafka_server_update_consumer_metadata(_state), do: raise "Consumer Group Metadata is not supported in 0.8.0 version of kafka" defp fetch(fetch_request, state) do diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index 59b8fac4..6d48643e 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -5,8 +5,8 @@ defmodule KafkaEx.Server0P8P2 do # these functions aren't implemented for 0.8.2 @dialyzer [ - {:nowarn_function, kafka_server_heartbeat: 4}, - {:nowarn_function, kafka_server_sync_group: 5}, + {:nowarn_function, kafka_server_heartbeat: 3}, + {:nowarn_function, kafka_server_sync_group: 3}, {:nowarn_function, kafka_server_join_group: 3}, {:nowarn_function, kafka_server_leave_group: 3} ] @@ -123,9 +123,9 @@ defmodule KafkaEx.Server0P8P2 do end def kafka_server_join_group(_, _, _state), do: raise "Join Group is not supported in 0.8.0 version of kafka" - def kafka_server_sync_group(_, _, _, _, _state), do: raise "Sync Group is not supported in 0.8.0 version of kafka" + def kafka_server_sync_group(_, _, _state), do: raise "Sync Group is not supported in 0.8.0 version of kafka" def kafka_server_leave_group(_, _, _state), do: raise "Leave Group is not supported in 0.8.0 version of Kafka" - def kafka_server_heartbeat(_, _, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka" + def kafka_server_heartbeat(_, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka" defp update_consumer_metadata(state), do: update_consumer_metadata(state, @retry_count, 0) defp update_consumer_metadata(state = %State{consumer_group: consumer_group}, 0, error_code) do diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index c9518871..332dc266 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -7,7 +7,6 @@ defmodule KafkaEx.Server0P9P0 do alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse 
alias KafkaEx.Protocol.Heartbeat alias KafkaEx.Protocol.JoinGroup - alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest alias KafkaEx.Protocol.LeaveGroup alias KafkaEx.Protocol.Metadata.Broker alias KafkaEx.Protocol.SyncGroup @@ -72,49 +71,42 @@ defmodule KafkaEx.Server0P9P0 do {:ok, state} end - def kafka_server_join_group(topics, session_timeout, state) do + def kafka_server_join_group(join_group_request, network_timeout, state) do true = consumer_group?(state) {broker, state} = broker_for_consumer_group_with_update(state) - request = JoinGroup.create_request( - %JoinGroupRequest{ - correlation_id: state.correlation_id, - client_id: @client_id, member_id: "", - group_name: state.consumer_group, - topics: topics, session_timeout: session_timeout - } - ) + request = JoinGroup.create_request(state.correlation_id, @client_id, join_group_request) response = broker - |> NetworkClient.send_sync_request(request, config_sync_timeout()) + |> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout)) |> JoinGroup.parse_response {:reply, response, %{state | correlation_id: state.correlation_id + 1}} end - def kafka_server_sync_group(group_name, generation_id, member_id, assignments, state) do + def kafka_server_sync_group(sync_group_request, network_timeout, state) do true = consumer_group?(state) {broker, state} = broker_for_consumer_group_with_update(state) - request = SyncGroup.create_request(state.correlation_id, @client_id, group_name, generation_id, member_id, assignments) + request = SyncGroup.create_request(state.correlation_id, @client_id, sync_group_request) response = broker - |> NetworkClient.send_sync_request(request, config_sync_timeout()) + |> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout)) |> SyncGroup.parse_response {:reply, response, %{state | correlation_id: state.correlation_id + 1}} end - def kafka_server_leave_group(group_name, member_id, state) do + def kafka_server_leave_group(request, 
network_timeout, state) do true = consumer_group?(state) {broker, state} = broker_for_consumer_group_with_update(state) - request = LeaveGroup.create_request(state.correlation_id, @client_id, group_name, member_id) + request = LeaveGroup.create_request(state.correlation_id, @client_id, request) response = broker - |> NetworkClient.send_sync_request(request, config_sync_timeout()) + |> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout)) |> LeaveGroup.parse_response {:reply, response, %{state | correlation_id: state.correlation_id + 1}} end - def kafka_server_heartbeat(group_name, generation_id, member_id, state) do + def kafka_server_heartbeat(request, network_timeout, state) do true = consumer_group?(state) {broker, state} = broker_for_consumer_group_with_update(state) - request = Heartbeat.create_request(state.correlation_id, @client_id, member_id, group_name, generation_id) + request = Heartbeat.create_request(state.correlation_id, @client_id, request) response = broker - |> NetworkClient.send_sync_request(request, config_sync_timeout()) + |> NetworkClient.send_sync_request(request, config_sync_timeout(network_timeout)) |> Heartbeat.parse_response {:reply, response, %{state | correlation_id: state.correlation_id + 1}} end diff --git a/scripts/ci_tests.sh b/scripts/ci_tests.sh index ca190817..a853c67b 100755 --- a/scripts/ci_tests.sh +++ b/scripts/ci_tests.sh @@ -11,7 +11,7 @@ export MIX_ENV=test if [ "$COVERALLS" = true ] then echo "Coveralls will be reported" - TEST_COMMAND=coveralls + TEST_COMMAND=coveralls.travis else TEST_COMMAND=test fi diff --git a/scripts/docker_up.sh b/scripts/docker_up.sh index ef22a7d5..953fb0e2 100755 --- a/scripts/docker_up.sh +++ b/scripts/docker_up.sh @@ -46,3 +46,6 @@ do done docker-compose up -d + +# create topics needed for testing +docker-compose exec kafka3 /bin/bash -c "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 KAFKA_PORT=9094 KAFKA_CREATE_TOPICS=consumer_group_implementation_test:4:2 create-topics.sh" 
diff --git a/test/integration/consumer_group_implementation_test.exs b/test/integration/consumer_group_implementation_test.exs new file mode 100644 index 00000000..3855ab33 --- /dev/null +++ b/test/integration/consumer_group_implementation_test.exs @@ -0,0 +1,280 @@ +defmodule KafkaEx.ConsumerGroupImplementationTest do + use ExUnit.Case + + alias KafkaEx.ConsumerGroup + alias KafkaEx.GenConsumer + import TestHelper + + require Logger + + @moduletag :consumer_group + + # note this topic is created by docker_up.sh + @topic_name "consumer_group_implementation_test" + @partition_count 4 + @consumer_group_name "consumer_group_implementation" + + defmodule TestPartitioner do + # wraps an Agent that we use to capture the fact that the partitioner was + # called - normally one would not really need to do this + + alias KafkaEx.ConsumerGroup.PartitionAssignment + + def start_link do + Agent.start_link(fn -> 0 end, name: __MODULE__) + end + + def calls do + Agent.get(__MODULE__, &(&1)) + end + + def assign_partitions(members, partitions) do + Logger.debug(fn -> + "Consumer #{inspect self()} got " <> + "partition assignment: #{inspect members} #{inspect partitions}" + end) + + Agent.update(__MODULE__, &(&1 + 1)) + + PartitionAssignment.round_robin(members, partitions) + end + end + + defmodule TestConsumer do + # test consumer - keeps track of messages handled + + use KafkaEx.GenConsumer + + alias KafkaEx.GenConsumer + + def last_message_set(pid) do + List.last(GenConsumer.call(pid, :message_sets)) || [] + end + + def init(topic, partition) do + Logger.debug(fn -> + "Initialized consumer #{inspect self()} for #{topic}:#{partition}" + end) + {:ok, %{message_sets: []}} + end + + def handle_call(:message_sets, _from, state) do + {:reply, state.message_sets, state} + end + + def handle_message_set(message_set, state) do + Logger.debug(fn -> + "Consumer #{inspect self()} handled message set #{inspect message_set}" + end) + { + :async_commit, + %{state | message_sets: 
state.message_sets ++ [message_set]} + } + end + end + + def produce(message, partition) do + KafkaEx.produce(@topic_name, partition, message) + message + end + + def correct_last_message?(nil, _, _), do: false + def correct_last_message?([], _, _), do: false + def correct_last_message?(message_set, expected_message, expected_offset) do + Logger.debug(fn -> + "Got message set: #{inspect message_set} " <> + "expecting '#{expected_message}' @ offset #{expected_offset}" + end) + message = List.last(message_set) + message.value == expected_message && message.offset == expected_offset + end + + def sync_stop(pid) when is_pid(pid) do + wait_for(fn -> + if Process.alive?(pid) do + Process.exit(pid, :normal) + end + !Process.alive?(pid) + end) + end + + setup do + {:ok, _} = TestPartitioner.start_link + {:ok, consumer_group_pid1} = ConsumerGroup.start_link( + TestConsumer, + @consumer_group_name, + [@topic_name], + heartbeat_interval: 100, + partition_assignment_callback: &TestPartitioner.assign_partitions/2 + ) + {:ok, consumer_group_pid2} = ConsumerGroup.start_link( + TestConsumer, + @consumer_group_name, + [@topic_name], + heartbeat_interval: 100, + partition_assignment_callback: &TestPartitioner.assign_partitions/2 + ) + + # wait for both consumer groups to join + wait_for(fn -> + ConsumerGroup.active?(consumer_group_pid1) && + ConsumerGroup.active?(consumer_group_pid2) + end) + + on_exit fn -> + sync_stop(consumer_group_pid1) + sync_stop(consumer_group_pid2) + end + + { + :ok, + consumer_group_pid1: consumer_group_pid1, + consumer_group_pid2: consumer_group_pid2 + } + end + + test "basic startup, consume, and shutdown test", context do + assert TestPartitioner.calls > 0 + + generation_id1 = ConsumerGroup.generation_id(context[:consumer_group_pid1]) + generation_id2 = ConsumerGroup.generation_id(context[:consumer_group_pid2]) + assert generation_id1 == generation_id2 + + member1 = ConsumerGroup.member_id(context[:consumer_group_pid1]) + member2 = 
ConsumerGroup.member_id(context[:consumer_group_pid2]) + assert member1 != member2 + + leader1 = ConsumerGroup.leader_id(context[:consumer_group_pid1]) + leader2 = ConsumerGroup.leader_id(context[:consumer_group_pid2]) + assert leader1 == leader2 + + cond do + leader1 == member1 -> + assert ConsumerGroup.leader?(context[:consumer_group_pid1]) + refute ConsumerGroup.leader?(context[:consumer_group_pid2]) + leader1 == member2 -> + refute ConsumerGroup.leader?(context[:consumer_group_pid1]) + assert ConsumerGroup.leader?(context[:consumer_group_pid2]) + true -> + raise "Neither member is the leader" + end + + assignments1 = ConsumerGroup.assignments(context[:consumer_group_pid1]) + assignments2 = ConsumerGroup.assignments(context[:consumer_group_pid2]) + assert 2 == length(assignments1) + assert 2 == length(assignments2) + assert MapSet.disjoint?( + Enum.into(assignments1, MapSet.new), + Enum.into(assignments2, MapSet.new) + ) + + consumer1_pid = + ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) + consumer1_assignments = consumer1_pid + |> GenConsumer.Supervisor.child_pids + |> Enum.map(&GenConsumer.partition/1) + |> Enum.sort + + assert consumer1_assignments == Enum.sort(assignments1) + + consumer2_pid = + ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid2]) + consumer2_assignments = consumer2_pid + |> GenConsumer.Supervisor.child_pids + |> Enum.map(&GenConsumer.partition/1) + |> Enum.sort + + assert consumer2_assignments == Enum.sort(assignments2) + + # all of the partitions should be accounted for + assert @partition_count == length(Enum.uniq(assignments1 ++ assignments2)) + + partition_range = 0..(@partition_count - 1) + + starting_offsets = partition_range + |> Enum.map(fn(px) -> {px, latest_offset_number(@topic_name, px)} end) + |> Enum.into(%{}) + + messages = partition_range + |> Enum.map(fn(px) -> + offset = Map.get(starting_offsets, px) + {px, produce("M #{px} #{offset}", px)} + end) + |> Enum.into(%{}) + + consumers = 
Map.merge( + ConsumerGroup.partition_consumer_map(context[:consumer_group_pid1]), + ConsumerGroup.partition_consumer_map(context[:consumer_group_pid2]) + ) + + # we actually consume the messages + last_offsets = partition_range + |> Enum.map(fn(px) -> + consumer_pid = Map.get(consumers, {@topic_name, px}) + wait_for(fn -> + message_set = TestConsumer.last_message_set(consumer_pid) + correct_last_message?(message_set, messages[px], starting_offsets[px]) + end) + last_message = List.last(TestConsumer.last_message_set(consumer_pid)) + {px, last_message.offset} + end) + |> Enum.into(%{}) + + # stop the supervisors + Process.unlink(context[:consumer_group_pid1]) + sync_stop(context[:consumer_group_pid1]) + Process.unlink(context[:consumer_group_pid2]) + sync_stop(context[:consumer_group_pid2]) + + # offsets should be committed on exit + for px <- partition_range do + wait_for(fn -> + ending_offset = + latest_consumer_offset_number(@topic_name, px, @consumer_group_name) + last_offset = Map.get(last_offsets, px) + ending_offset == last_offset + 1 + end) + end + end + + test "starting/stopping consumers rebalances assignments", context do + Process.unlink(context[:consumer_group_pid1]) + sync_stop(context[:consumer_group_pid1]) + + # the other cg should get assigned all of the partitions + wait_for(fn -> + @partition_count == + length(ConsumerGroup.assignments(context[:consumer_group_pid2])) + end) + + # and become the leader + assert ConsumerGroup.leader?(context[:consumer_group_pid2]) + + {:ok, consumer_group_pid3} = ConsumerGroup.start_link( + TestConsumer, + @consumer_group_name, + [@topic_name], + heartbeat_interval: 100, + partition_assignment_callback: &TestPartitioner.assign_partitions/2 + ) + + # the new worker should get assigned some partitions + wait_for(fn -> + ConsumerGroup.active?(consumer_group_pid3) + end) + + Process.unlink(context[:consumer_group_pid2]) + sync_stop(context[:consumer_group_pid2]) + + # now the new cg should get all of the partitions + 
wait_for(fn -> + @partition_count == length(ConsumerGroup.assignments(consumer_group_pid3)) + end) + + # and become the leader + assert ConsumerGroup.leader?(consumer_group_pid3) + + Process.unlink(consumer_group_pid3) + sync_stop(consumer_group_pid3) + end +end diff --git a/test/integration/server0_p_9_p_0_test.exs b/test/integration/server0_p_9_p_0_test.exs index aa32ac27..265cbf50 100644 --- a/test/integration/server0_p_9_p_0_test.exs +++ b/test/integration/server0_p_9_p_0_test.exs @@ -2,14 +2,25 @@ defmodule KafkaEx.Server0P9P0.Test do use ExUnit.Case import TestHelper + alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest + alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest + alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest + alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest + @moduletag :server_0_p_9_p_0 test "can join a consumer group" do random_group = generate_random_string() KafkaEx.create_worker(:join_group, [uris: uris(), consumer_group: random_group]) - # No wrapper in kafka_ex yet as long as the 0.9 functionality is in progress - answer = GenServer.call(:join_group, {:join_group, ["foo", "bar"], 6000}) + request = %JoinGroupRequest{ + group_name: random_group, + member_id: "", + topics: ["foo", "bar"], + session_timeout: 6000, + } + + answer = KafkaEx.join_group(request, worker_name: :join_group) assert answer.error_code == :no_error assert answer.generation_id == 1 # We should be the leader @@ -21,7 +32,15 @@ defmodule KafkaEx.Server0P9P0.Test do # how this pans out eventually as we add more and more 0.9 consumer group code random_group = generate_random_string() KafkaEx.create_worker(:sync_group, [uris: uris(), consumer_group: random_group]) - answer = GenServer.call(:sync_group, {:join_group, ["foo", "bar"], 6000}) + + request = %JoinGroupRequest{ + group_name: random_group, + member_id: "", + topics: ["foo", "bar"], + session_timeout: 6000, + } + + answer = KafkaEx.join_group(request, worker_name: 
:sync_group) assert answer.error_code == :no_error member_id = answer.member_id @@ -29,7 +48,14 @@ defmodule KafkaEx.Server0P9P0.Test do my_assignments = [{"foo", [1]}, {"bar", [2]}] assignments = [{member_id, my_assignments}] - answer = GenServer.call(:sync_group, {:sync_group, random_group, generation_id, member_id, assignments}) + request = %SyncGroupRequest{ + group_name: random_group, + member_id: member_id, + generation_id: generation_id, + assignments: assignments, + } + + answer = KafkaEx.sync_group(request, worker_name: :sync_group) assert answer.error_code == :no_error # Parsing happens to return the assignments reversed, which is fine as there's no # ordering. Just reverse what we expect to match @@ -41,12 +67,25 @@ defmodule KafkaEx.Server0P9P0.Test do # how this pans out eventually as we add more and more 0.9 consumer group code random_group = generate_random_string() KafkaEx.create_worker(:leave_group, [uris: uris(), consumer_group: random_group]) - answer = GenServer.call(:leave_group, {:join_group, ["foo", "bar"], 6000}) + + request = %JoinGroupRequest{ + group_name: random_group, + member_id: "", + topics: ["foo", "bar"], + session_timeout: 6000, + } + + answer = KafkaEx.join_group(request, worker_name: :leave_group) assert answer.error_code == :no_error member_id = answer.member_id - answer = GenServer.call(:leave_group, {:leave_group, random_group, member_id}) + request = %LeaveGroupRequest{ + group_name: random_group, + member_id: member_id, + } + + answer = KafkaEx.leave_group(request, worker_name: :leave_group) assert answer.error_code == :no_error end @@ -54,7 +93,15 @@ defmodule KafkaEx.Server0P9P0.Test do # See sync test. 
Removing repetition in the next iteration random_group = generate_random_string() KafkaEx.create_worker(:heartbeat, [uris: uris(), consumer_group: random_group]) - answer = GenServer.call(:heartbeat, {:join_group, ["foo", "bar"], 6000}) + + request = %JoinGroupRequest{ + group_name: random_group, + member_id: "", + topics: ["foo", "bar"], + session_timeout: 6000, + } + + answer = KafkaEx.join_group(request, worker_name: :heartbeat) assert answer.error_code == :no_error member_id = answer.member_id @@ -62,10 +109,23 @@ defmodule KafkaEx.Server0P9P0.Test do my_assignments = [{"foo", [1]}, {"bar", [2]}] assignments = [{member_id, my_assignments}] - answer = GenServer.call(:heartbeat, {:sync_group, random_group, generation_id, member_id, assignments}) + request = %SyncGroupRequest{ + group_name: random_group, + member_id: member_id, + generation_id: generation_id, + assignments: assignments, + } + + answer = KafkaEx.sync_group(request, worker_name: :heartbeat) assert answer.error_code == :no_error - answer = GenServer.call(:heartbeat, {:heartbeat, random_group, generation_id, member_id}) + request = %HeartbeatRequest{ + group_name: random_group, + member_id: member_id, + generation_id: generation_id, + } + + answer = KafkaEx.heartbeat(request, worker_name: :heartbeat) assert answer.error_code == :no_error end end diff --git a/test/kafka_ex/consumer_group/partition_assignment_test.exs b/test/kafka_ex/consumer_group/partition_assignment_test.exs new file mode 100644 index 00000000..b22dbfa9 --- /dev/null +++ b/test/kafka_ex/consumer_group/partition_assignment_test.exs @@ -0,0 +1,18 @@ +defmodule KafkaEx.ConsumerGroup.PartitionAssignmentTest do + use ExUnit.Case + + alias KafkaEx.ConsumerGroup.PartitionAssignment + + test "round robin partition assignment works" do + topic = "topic" + members = ["member1", "member2"] + partitions = [{topic, 0}, {topic, 1}, {topic, 2}] + + assignments = PartitionAssignment.round_robin(members, partitions) + expected = %{ + "member1" => 
[{topic, 0}, {topic, 2}], + "member2" => [{topic, 1}] + } + assert expected == assignments + end +end diff --git a/test/protocol/heartbeat_test.exs b/test/protocol/heartbeat_test.exs index efeb16a2..7ad16aaa 100644 --- a/test/protocol/heartbeat_test.exs +++ b/test/protocol/heartbeat_test.exs @@ -8,7 +8,13 @@ defmodule KafkaEx.Protocol.Heartbeat.Test do 1234 :: 32, # GenerationId 9 :: 16, "member_id" :: binary # MemberId >> - request = KafkaEx.Protocol.Heartbeat.create_request(42, "client_id", "member_id", "group_id", 1234) + heartbeat_request = %KafkaEx.Protocol.Heartbeat.Request{ + group_name: "group_id", + member_id: "member_id", + generation_id: 1234, + } + + request = KafkaEx.Protocol.Heartbeat.create_request(42, "client_id", heartbeat_request) assert request == good_request end diff --git a/test/protocol/join_group_test.exs b/test/protocol/join_group_test.exs index d8fb87fd..f67f0495 100644 --- a/test/protocol/join_group_test.exs +++ b/test/protocol/join_group_test.exs @@ -15,10 +15,8 @@ defmodule KafkaEx.Protocol.JoinGroup.Test do 0 :: 16, # v0 2 :: 32, 9 :: 16, "topic_one" :: binary, 9 :: 16, "topic_two" :: binary, # Topics array 0 :: 32 >> # UserData - request = JoinGroup.create_request( + request = JoinGroup.create_request(42, "client_id", %JoinGroup.Request{ - correlation_id: 42, - client_id: "client_id", member_id: "member_id", group_name: "group", topics: ["topic_one", "topic_two"], diff --git a/test/protocol/leave_group_test.exs b/test/protocol/leave_group_test.exs index 05e0863a..176f5a20 100644 --- a/test/protocol/leave_group_test.exs +++ b/test/protocol/leave_group_test.exs @@ -8,7 +8,12 @@ defmodule KafkaEx.Protocol.LeaveGroup.Test do byte_size("member") :: 16, "member" :: binary, # MemberId >> - request = KafkaEx.Protocol.LeaveGroup.create_request(42, "client_id", "group", "member") + leave_request = %KafkaEx.Protocol.LeaveGroup.Request{ + group_name: "group", + member_id: "member", + } + + request = KafkaEx.Protocol.LeaveGroup.create_request(42, 
"client_id", leave_request) assert request == good_request end diff --git a/test/protocol/sync_group_test.exs b/test/protocol/sync_group_test.exs index bb3f73c3..aadccddf 100644 --- a/test/protocol/sync_group_test.exs +++ b/test/protocol/sync_group_test.exs @@ -28,8 +28,14 @@ defmodule KafkaEx.Protocol.SyncGroup.Test do byte_size(second_assignments) :: 32, second_assignments :: binary >> - request = KafkaEx.Protocol.SyncGroup.create_request(42, "client_id", "group", 1, "member_one", - [{"member_one", [{"topic1", [1, 3, 5]}]}, {"member_two", [{"topic1", [2, 4, 6]}]}]) + sync_request = %KafkaEx.Protocol.SyncGroup.Request{ + group_name: "group", + member_id: "member_one", + generation_id: 1, + assignments: [{"member_one", [{"topic1", [1, 3, 5]}]}, {"member_two", [{"topic1", [2, 4, 6]}]}], + } + + request = KafkaEx.Protocol.SyncGroup.create_request(42, "client_id", sync_request) assert request == good_request end @@ -44,4 +50,14 @@ defmodule KafkaEx.Protocol.SyncGroup.Test do assignments: [{"topic1", [5, 3, 1]}]} assert KafkaEx.Protocol.SyncGroup.parse_response(response) == expected_response end + + test "parse empty assignments correctly" do + response = << + 42 :: 32, # CorrelationId + 0 :: 16, # ErrorCode + 0 :: 32, # MemberAssignment (empty) + >> + expected_response = %KafkaEx.Protocol.SyncGroup.Response{error_code: :no_error, assignments: []} + assert KafkaEx.Protocol.SyncGroup.parse_response(response) == expected_response + end end diff --git a/test/test_helper.exs b/test/test_helper.exs index 41c3c3fc..179c499d 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -44,7 +44,7 @@ defmodule TestHelper do {x, {a,b,c + 60}} end - def latest_offset_number(topic, partition_id, worker \\ KafkaEx.Server) do + def latest_offset_number(topic, partition_id, worker \\ :kafka_ex) do offset = KafkaEx.latest_offset(topic, partition_id, worker) |> first_partition_offset @@ -54,7 +54,7 @@ defmodule TestHelper do def latest_consumer_offset_number(topic, partition, 
consumer_group, - worker \\ KafkaEx.Server) do + worker \\ :kafka_ex) do request = %KafkaEx.Protocol.OffsetFetch.Request{topic: topic, partition: partition, consumer_group: consumer_group}