diff --git a/README.md b/README.md index 77b3f9da..e86c6b62 100644 --- a/README.md +++ b/README.md @@ -219,21 +219,19 @@ iex> KafkaEx.produce("foo", 0, "hey") # where "foo" is the topic and "hey" is th ### Stream kafka logs -**NOTE** You must pass `auto_commit: false` in the options for `stream/3` when using Kafka < 0.8.2 or when using `:no_consumer_group`. +See the `KafkaEx.stream/3` documentation for details on streaming. ```elixir -iex> KafkaEx.create_worker(:stream, [uris: [{"localhost", 9092}]]) -{:ok, #PID<0.196.0>} -iex> KafkaEx.produce("foo", 0, "hey", worker_name: :stream) +iex> KafkaEx.produce("foo", 0, "hey") :ok -iex> KafkaEx.produce("foo", 0, "hi", worker_name: :stream) +iex> KafkaEx.produce("foo", 0, "hi") :ok iex> KafkaEx.stream("foo", 0, offset: 0) |> Enum.take(2) [%{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"}, %{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}] ``` -As mentioned, for Kafka < 0.8.2 the `stream/3` requires `autocommit: false` +For Kafka < 0.8.2, `stream/3` requires `auto_commit: false` ```elixir iex> KafkaEx.stream("foo", 0, offset: 0, auto_commit: false) |> Enum.take(2) diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 5edfe56c..58140ec3 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -15,6 +15,7 @@ defmodule KafkaEx do alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest alias KafkaEx.Protocol.Produce.Message alias KafkaEx.Server + alias KafkaEx.Stream @type uri() :: [{binary|char_list, number}] @type worker_init :: [worker_setting] @@ -255,62 +256,160 @@ defmodule KafkaEx do produce(produce_request, opts) end - @doc """ - Returns a stream that consumes fetched messages. - This puts the specified worker in streaming mode and blocks the worker indefinitely. - The handler is a normal GenEvent handler so you can supply a custom handler, otherwise a default handler is used. + @doc ~S""" + Returns a streamable struct that may be used for consuming messages. - This function should be used with care as the queue is unbounded and can cause OOM. + The returned struct is compatible with the `Stream` and `Enum` modules. Some + important usage notes follow; see below for a detailed list of options. - Optional arguments(KeywordList) - - worker_name: the worker we want to run this metadata request through, when none is provided the default worker `:kafka_ex` is used - - offset: When supplied the fetch would start from this offset, otherwise would start from the last committed offset of the consumer_group the worker belongs to. For Kafka < 0.8.2 you should explicitly specify this. - - handler: the handler we want to handle the streaming events, when none is provided the default KafkaEx.Handler is used - - handler_init: initial state for the handler - leave the default value [] when using the default handler - - auto_commit: specifies if the last offset should be commited or not. Default is true. You must set this to false when using Kafka < 0.8.2 or `:no_consumer_group`. + ```elixir + iex> KafkaEx.produce("foo", 0, "hey") + :ok + iex> KafkaEx.produce("foo", 0, "hi") + :ok + iex> stream = KafkaEx.stream("foo", 0) + %KafkaEx.Stream{...} + iex> Enum.take(stream, 2) + [%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 1784030606, key: "", + offset: 0, value: "hey"}, + %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 3776653906, key: "", + offset: 1, value: "hi"}] + iex> stream |> Stream.map(fn(msg) -> IO.puts(msg.value) end) |> Stream.run + "hey" + "hi" + # NOTE this will block! See below.
+ ``` - ## Example - ```elixir - iex> KafkaEx.create_worker(:stream, [{"localhost", 9092}]) - {:ok, #PID<0.196.0>} - iex> KafkaEx.produce("foo", 0, "hey", worker_name: :stream) - iex> KafkaEx.produce("foo", 0, "hi", worker_name: :stream) - iex> KafkaEx.stream("foo", 0) |> Enum.take(2) - [%{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"}, - %{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}] + + ## Reusing streams + + Reusing the same `KafkaEx.Stream` struct results in consuming the same + messages multiple times. This is by design and mirrors the functionality of + `File.stream!/3`. If you want to reuse the same stream struct, update the + `:offset` in its `fetch_request` before reuse. + + ``` + iex> stream = KafkaEx.stream("foo", 0) + iex> [m1, m2] = Enum.take(stream, 2) + iex> [m1, m2] = Enum.take(stream, 2) # these will be the same messages + iex> stream = %{stream | fetch_request: %{stream.fetch_request | offset: m2.offset + 1}} + iex> [m3, m4] = Enum.take(stream, 2) # new messages + ``` + + ## Streams block at log end + + By default, the stream consumes indefinitely and will block at log end until + new messages are available. Use the `no_wait_at_logend: true` option to have + the stream halt when no more messages are available. This mirrors the + command line arguments of + [SimpleConsumerShell](https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-SimpleConsumerShell). + + Note that this means that fetches will return up to as many messages + as are immediately available in the partition, regardless of how many + messages you request from the stream. + + ``` + iex> Enum.map(1..3, fn(ix) -> KafkaEx.produce("bar", 0, "Msg #{ix}") end) + iex> stream = KafkaEx.stream("bar", 0, no_wait_at_logend: true, offset: 0) + iex> Enum.map(stream, fn(m) -> m.value end) # does not block + ["Msg 1", "Msg 2", "Msg 3"] + iex> stream |> Stream.map(fn(m) -> m.value end) |> Enum.take(10) + # only 3 messages are available + ["Msg 1", "Msg 2", "Msg 3"] + ``` + + ## Consumer group and auto commit + + If you pass a value for the `consumer_group` option and true for + `auto_commit`, the offset of the last message consumed will be committed to + the broker during each cycle. + + For example, suppose we start at the beginning of a partition with millions + of messages and the `max_bytes` setting is such that each `fetch` request + gets 25 messages. In this setting, we will (roughly) be committing offsets + 25, 50, 75, etc. + + Note that offsets are committed immediately after messages are retrieved + and before you know if you have successfully consumed them. It is + therefore possible that you could miss messages if your consumer crashes in + the middle of consuming a batch, effectively losing the guarantee of + at-least-once delivery. If you need this guarantee, we recommend that you + construct a GenServer-based consumer module and manage your commits manually.
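+
+  As a rough sketch, the fetch-and-commit cycle such a consumer might wrap
+  could look like the following. The module and `process_message/1` function
+  below are hypothetical, not part of KafkaEx; the idea is to fetch with
+  `auto_commit: false` and commit only after the batch has been processed:
+
+  ```
+  defmodule MyApp.ManualConsumer do
+    # Illustrative sketch only - not part of KafkaEx.
+    alias KafkaEx.Protocol.OffsetCommit.Request, as: OffsetCommitRequest
+
+    # Fetch one batch starting at `offset`, process it, then commit the
+    # offset of the last message handled.  Returns the next offset to fetch.
+    def consume_batch(topic, partition, offset, consumer_group) do
+      [response | _] =
+        KafkaEx.fetch(topic, partition, offset: offset, auto_commit: false)
+
+      %{message_set: messages, last_offset: last_offset} =
+        Enum.find(response.partitions, &(&1.partition == partition))
+
+      Enum.each(messages, &process_message/1)
+
+      # Commit only after processing has succeeded.
+      KafkaEx.offset_commit(:kafka_ex, %OffsetCommitRequest{
+        consumer_group: consumer_group,
+        topic: topic,
+        partition: partition,
+        offset: last_offset,
+        metadata: ""
+      })
+
+      last_offset + 1
+    end
+
+    defp process_message(message), do: IO.inspect(message.value)
+  end
+  ```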
+ + ``` + iex> Enum.map(1..10, fn(ix) -> KafkaEx.produce("baz", 0, "Msg #{ix}") end) + iex> stream = KafkaEx.stream("baz", 0, consumer_group: "my_consumer", auto_commit: true) + iex> stream |> Enum.take(2) |> Enum.map(fn(msg) -> msg.value end) + ["Msg 1", "Msg 2"] + iex> stream |> Enum.take(2) |> Enum.map(fn(msg) -> msg.value end) + ["Msg 1", "Msg 2"] # same values + iex> stream2 = KafkaEx.stream("baz", 0, consumer_group: "my_consumer", auto_commit: true) + iex> stream2 |> Enum.take(1) |> Enum.map(fn(msg) -> msg.value end) + ["Msg 3"] # stream2 got the next available offset ``` + + ## Options + + `KafkaEx.stream/3` accepts a keyword list of options for the third argument. + + - `no_wait_at_logend` (boolean): Set this to true to halt the stream when + there are no more messages available. Defaults to false, i.e., the stream + blocks to wait for new messages. + + - `worker_name` (term): The KafkaEx worker to use for communication with the + brokers. Defaults to `:kafka_ex` (the default worker). + + - `consumer_group` (string): Name of the consumer group used for the initial + offset fetch and automatic offset commit (if `auto_commit` is true). Omit + this value or use `:no_consumer_group` to not use a consumer group (default). + Consumer groups are not compatible with Kafka < 0.8.2. + + - `offset` (integer): The offset from which to start fetching. By default, + this is the last available offset of the partition when no consumer group is + specified. When a consumer group is specified, the next message after the + last committed offset is used. For Kafka < 0.8.2 you must explicitly specify + an offset. + + - `auto_commit` (boolean): If true, the stream automatically commits offsets + of fetched messages. See discussion above. """ - @spec stream(binary, integer, Keyword.t) :: GenEvent.Stream.t + @spec stream(binary, integer, Keyword.t) :: KafkaEx.Stream.t def stream(topic, partition, opts \\ []) do - worker_name = Keyword.get(opts, :worker_name, Config.default_worker) - supplied_offset = Keyword.get(opts, :offset) - handler = Keyword.get(opts, :handler, KafkaEx.Handler) - handler_init = Keyword.get(opts, :handler_init, []) - auto_commit = Keyword.get(opts, :auto_commit, true) - wait_time = Keyword.get(opts, :wait_time, @wait_time) - min_bytes = Keyword.get(opts, :min_bytes, @min_bytes) + auto_commit = Keyword.get(opts, :auto_commit, true) + consumer_group = Keyword.get(opts, :consumer_group) max_bytes = Keyword.get(opts, :max_bytes, @max_bytes) + min_bytes = Keyword.get(opts, :min_bytes, @min_bytes) + supplied_offset = Keyword.get(opts, :offset) + worker_name = Keyword.get(opts, :worker_name, Config.default_worker) + no_wait_at_logend = Keyword.get(opts, :no_wait_at_logend, false) + wait_time = Keyword.get(opts, :wait_time, @wait_time) - event_stream = Server.call(worker_name, {:create_stream, handler, handler_init}) - retrieved_offset = current_offset(supplied_offset, partition, topic, worker_name) - - send(worker_name, { - :start_streaming, - %FetchRequest{ - auto_commit: auto_commit, - topic: topic, partition: partition, - offset: retrieved_offset, wait_time: wait_time, - min_bytes: min_bytes, max_bytes: max_bytes - } - }) - event_stream - end + retrieved_offset = + if consumer_group && !supplied_offset do + request = %OffsetFetchRequest{ + topic: topic, + partition: partition, + consumer_group: consumer_group + } + + fetched_offset = worker_name + |> KafkaEx.offset_fetch(request) + |> KafkaEx.Protocol.OffsetFetch.Response.last_offset + + fetched_offset + 1 + else + 
current_offset(supplied_offset, partition, topic, worker_name) + end - @spec stop_streaming(Keyword.t) :: :stop_streaming - def stop_streaming(opts \\ []) do - worker_name = Keyword.get(opts, :worker_name, Config.default_worker) - send(worker_name, :stop_streaming) + fetch_request = %FetchRequest{ + auto_commit: auto_commit, + topic: topic, partition: partition, + offset: retrieved_offset, wait_time: wait_time, + min_bytes: min_bytes, max_bytes: max_bytes + } + + %Stream{ + worker_name: worker_name, fetch_request: fetch_request, + consumer_group: consumer_group, no_wait_at_logend: no_wait_at_logend + } end defp build_worker_options(worker_init) do diff --git a/lib/kafka_ex/handler.ex b/lib/kafka_ex/handler.ex deleted file mode 100644 index 1a819865..00000000 --- a/lib/kafka_ex/handler.ex +++ /dev/null @@ -1,18 +0,0 @@ -defmodule KafkaEx.Handler do - @moduledoc """ - Default GenEvent handler for KafkaEx.stream - - Received message sets are accumulated in the GenEvent state and can - be retrieved and flushed via the `:messages` call. - """ - - use GenEvent - - def handle_event(message_set, message_sets) do - {:ok, [message_set|message_sets]} - end - - def handle_call(:messages, message_sets) do - {:ok, Enum.reverse(message_sets), []} - end -end diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex index ee8f6170..77ea0214 100644 --- a/lib/kafka_ex/protocol/fetch.ex +++ b/lib/kafka_ex/protocol/fetch.ex @@ -25,6 +25,12 @@ defmodule KafkaEx.Protocol.Fetch do @moduledoc false defstruct topic: nil, partitions: [] @type t :: %Response{topic: binary, partitions: list} + + @spec partition_messages(list, binary, integer) :: map + def partition_messages(responses, topic, partition) do + response = Enum.find(responses, &(&1.topic == topic)) || %Response{} + Enum.find(response.partitions, &(&1.partition == partition)) + end end defmodule Message do diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index a5632836..aaaa9e07 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -21,7 +21,6 @@ defmodule KafkaEx.Server do metadata: %Metadata.Response{}, brokers: [], event_pid: nil, - stream_timer: nil, consumer_metadata: %ConsumerMetadata.Response{}, correlation_id: 0, consumer_group: nil, @@ -36,7 +35,6 @@ defmodule KafkaEx.Server do metadata: Metadata.Response.t, brokers: [Broker.t], event_pid: nil | pid, - stream_timer: reference, consumer_metadata: ConsumerMetadata.Response.t, correlation_id: integer, metadata_update_interval: nil | integer, @@ -136,21 +134,6 @@ defmodule KafkaEx.Server do {:noreply, new_state, timeout | :hibernate} | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term, new_state: term, reason: term - @callback kafka_server_create_stream(handler :: term, handler_init :: term, state :: State.t) :: - {:reply, reply, new_state} | - {:reply, reply, new_state, timeout | :hibernate} | - {:noreply, new_state} | - {:noreply, new_state, timeout | :hibernate} | - {:stop, reason, reply, new_state} | - {:stop, reason, new_state} when reply: term, new_state: term, reason: term - @callback kafka_server_start_streaming(fetch_request :: FetchRequest.t, state :: State.t) :: - {:noreply, new_state} | - {:noreply, new_state, timeout | :hibernate} | - {:stop, reason :: term, new_state} when new_state: term - @callback kafka_server_stop_streaming(state :: State.t) :: - {:noreply, new_state} | - {:noreply, new_state, timeout | :hibernate} | - {:stop, reason :: term, new_state} when new_state: term @callback 
kafka_server_update_metadata(state :: State.t) :: {:noreply, new_state} | {:noreply, new_state, timeout | :hibernate} | @@ -253,24 +236,6 @@ defmodule KafkaEx.Server do kafka_server_heartbeat(group_name, generation_id, member_id, state) end - def handle_call({:create_stream, handler, handler_init}, _from, state) do - kafka_server_create_stream(handler, handler_init, state) - end - - def handle_info({:start_streaming, fetch_request}, state) do - kafka_server_start_streaming(fetch_request, state) - end - - def handle_info(:stop_streaming, state) do - {:noreply, state} = kafka_server_stop_streaming(state) - state = case state.stream_timer do - nil -> state - ref -> Process.cancel_timer(ref) - %{state | stream_timer: nil} - end - {:noreply, state} - end - def handle_info(:update_metadata, state) do kafka_server_update_metadata(state) end @@ -360,25 +325,6 @@ defmodule KafkaEx.Server do {:reply, metadata, updated_state} end - def kafka_server_create_stream(handler, handler_init, state) do - new_state = if state.event_pid && Process.alive?(state.event_pid) do - Logger.log(:warn, "'#{state.worker_name}' already streaming handler '#{handler}'") - state - else - {:ok, event_pid} = GenEvent.start_link - updated_state = %{state | event_pid: event_pid} - :ok = GenEvent.add_handler(updated_state.event_pid, handler, handler_init) - updated_state - end - {:reply, GenEvent.stream(new_state.event_pid), new_state} - end - - def kafka_server_stop_streaming(state) do - Logger.log(:debug, "Stopped worker #{inspect state.worker_name} from streaming") - GenEvent.stop(state.event_pid) - {:noreply, %{state | event_pid: nil}} - end - def kafka_server_update_metadata(state) do {:noreply, update_metadata(state)} end @@ -419,8 +365,7 @@ defmodule KafkaEx.Server do defoverridable [ kafka_server_produce: 2, kafka_server_offset: 4, - kafka_server_metadata: 2, kafka_server_create_stream: 3, - kafka_server_stop_streaming: 1, kafka_server_update_metadata: 1, + kafka_server_metadata: 2, kafka_server_update_metadata: 1, ] defp remove_stale_brokers(brokers, metadata_brokers) do diff --git a/lib/kafka_ex/server_0_p_8_p_0.ex b/lib/kafka_ex/server_0_p_8_p_0.ex index b96a17ed..a587bbdc 100644 --- a/lib/kafka_ex/server_0_p_8_p_0.ex +++ b/lib/kafka_ex/server_0_p_8_p_0.ex @@ -67,31 +67,6 @@ defmodule KafkaEx.Server0P8P0 do def kafka_server_heartbeat(_, _, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka" def kafka_server_update_consumer_metadata(_state), do: raise "Consumer Group Metadata is not supported in 0.8.0 version of kafka" - def kafka_server_start_streaming(_, state = %State{event_pid: nil}) do - # our streaming could have been canceled with a streaming update in-flight - {:noreply, state} - end - def kafka_server_start_streaming(fetch_request, state) do - {response, state} = fetch(fetch_request, state) - offset = case response do - :topic_not_found -> - fetch_request.offset - _ -> - message = response |> hd |> Map.get(:partitions) |> hd - Enum.each(message.message_set, fn(message_set) -> GenEvent.notify(state.event_pid, message_set) end) - case message.last_offset do - nil -> fetch_request.offset - last_offset -> last_offset + 1 - end - end - - ref = Process.send_after( - self(), {:start_streaming, %{fetch_request | offset: offset}}, 500 - ) - - {:noreply, %{state | stream_timer: ref}} - end - defp fetch(fetch_request, state) do fetch_data = Fetch.create_request(%FetchRequest{ fetch_request | diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index 573c8fea..59b8fac4 
100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -116,33 +116,6 @@ defmodule KafkaEx.Server0P8P2 do {:reply, consumer_metadata, state} end - def kafka_server_start_streaming(_, state = %State{event_pid: nil}) do - # our streaming could have been canceled with a streaming update in-flight - {:noreply, state} - end - def kafka_server_start_streaming(fetch_request, state) do - true = consumer_group_if_auto_commit?(fetch_request.auto_commit, state) - - {response, state} = fetch(fetch_request, state) - offset = case response do - :topic_not_found -> - fetch_request.offset - _ -> - message = response |> hd |> Map.get(:partitions) |> hd - Enum.each(message.message_set, fn(message_set) -> GenEvent.notify(state.event_pid, message_set) end) - case message.last_offset do - nil -> fetch_request.offset - last_offset -> last_offset + 1 - end - end - - ref = Process.send_after( - self(), {:start_streaming, %{fetch_request | offset: offset}}, 500 - ) - - {:noreply, %{state | stream_timer: ref}} - end - def kafka_server_update_consumer_metadata(state) do true = consumer_group?(state) {_, state} = update_consumer_metadata(state) diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index 6b165af7..c9518871 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -34,7 +34,6 @@ defmodule KafkaEx.Server0P9P0 do defdelegate kafka_server_offset_fetch(offset_fetch, state), to: Server0P8P2 defdelegate kafka_server_offset_commit(offset_commit_request, state), to: Server0P8P2 defdelegate kafka_server_consumer_group_metadata(state), to: Server0P8P2 - defdelegate kafka_server_start_streaming(fetch_request, state), to: Server0P8P2 defdelegate kafka_server_update_consumer_metadata(state), to: Server0P8P2 def kafka_server_init([args]) do diff --git a/lib/kafka_ex/stream.ex b/lib/kafka_ex/stream.ex new file mode 100644 index 00000000..e4effd78 --- /dev/null +++ b/lib/kafka_ex/stream.ex @@ -0,0 +1,141 @@ +defmodule KafkaEx.Stream do + @moduledoc false + + alias KafkaEx.Protocol.OffsetCommit.Request, as: OffsetCommitRequest + alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest + alias KafkaEx.Protocol.Fetch.Response, as: FetchResponse + + defstruct worker_name: nil, + fetch_request: %FetchRequest{}, + consumer_group: nil, + no_wait_at_logend: false + + @type t :: %__MODULE__{} + + defimpl Enumerable do + def reduce(%KafkaEx.Stream{} = data, acc, fun) do + # this function returns a Stream.resource stream, so we need to define + # start_fun, next_fun, and after_fun callbacks + + # the state payload for the stream is just the offset + start_fun = fn -> data.fetch_request.offset end + + # each iteration we need to take care of fetching and (possibly) + # committing offsets + next_fun = fn offset -> + data + |> fetch_response(offset) + |> maybe_commit_offset(data, acc) + |> stream_control(data, offset) + end + + # there isn't really any cleanup, so we don't need to do anything with + # the after_fun callback + after_fun = fn(_last_offset) -> :ok end + + Stream.resource(start_fun, next_fun, after_fun).(acc, fun) + end + + def count(_stream) do + {:error, __MODULE__} + end + + def member?(_stream, _item) do + {:error, __MODULE__} + end + + ###################################################################### + # Main stream flow control + + # if we get a response, we return the message set and point at the next + # offset after the last message + defp stream_control( + %{ + error_code: :no_error, + last_offset: last_offset, + 
message_set: message_set + }, + _stream_data, + _offset + ) when is_integer(last_offset) do + {message_set, last_offset + 1} + end + + # if we don't get any messages and no_wait_at_logend is true, we halt + defp stream_control( + %{}, + %KafkaEx.Stream{no_wait_at_logend: true}, + offset + ) do + {:halt, offset} + end + + # for all other cases we block until messages are ready + defp stream_control(%{}, %KafkaEx.Stream{}, offset) do + {[], offset} + end + ###################################################################### + + ###################################################################### + # Offset management + + # first, determine if we even need to commit an offset + defp maybe_commit_offset( + fetch_response, + %KafkaEx.Stream{ + fetch_request: %FetchRequest{auto_commit: auto_commit} + } = stream_data, + acc + ) do + if need_commit?(fetch_response, auto_commit) do + offset_to_commit = last_offset(acc, fetch_response.message_set) + commit_offset(stream_data, offset_to_commit) + end + + fetch_response + end + + # no response -> no commit + defp need_commit?(fetch_response, _auto_commit) + when fetch_response == %{}, do: false + # no messages in response -> no commit + defp need_commit?(%{message_set: []}, _auto_commit), do: false + # otherwise, use the auto_commit setting + defp need_commit?(_fetch_response, auto_commit), do: auto_commit + + # if we have requested fewer messages than we fetched, commit the offset + # of the last one we will actually consume + defp last_offset({:cont, {_, n}}, message_set) + when n <= length(message_set) do + message = Enum.at(message_set, n - 1) + message.offset + end + + # otherwise, commit the offset of the last message + defp last_offset({:cont, _}, message_set) do + message = List.last(message_set) + message.offset + end + + defp commit_offset(%KafkaEx.Stream{} = stream_data, offset) do + GenServer.call(stream_data.worker_name, { + :offset_commit, + %OffsetCommitRequest{ + consumer_group: stream_data.consumer_group, + topic: stream_data.fetch_request.topic, + partition: stream_data.fetch_request.partition, + offset: offset, metadata: "" + } + }) + end + ###################################################################### + + # make the actual fetch request + defp fetch_response(data, offset) do + req = data.fetch_request + data.worker_name + |> GenServer.call({:fetch, %{req| offset: offset}}) + |> FetchResponse.partition_messages(req.topic, req.partition) + end + end +end diff --git a/test/integration/consumer_group_test.exs b/test/integration/consumer_group_test.exs index 621b926c..359587f4 100644 --- a/test/integration/consumer_group_test.exs +++ b/test/integration/consumer_group_test.exs @@ -211,12 +211,12 @@ defmodule KafkaEx.ConsumerGroup.Test do }) stream = KafkaEx.stream(random_string, 0, worker_name: :stream_auto_commit, offset: 0) log = TestHelper.wait_for_any( - fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) |> Enum.take(2) end + fn() -> Enum.take(stream, 2) end ) refute Enum.empty?(log) - offset_fetch_response = KafkaEx.offset_fetch(:stream_auto_commit, %Proto.OffsetFetch.Request{topic: random_string, partition: 0}) |> hd + [offset_fetch_response | _] = KafkaEx.offset_fetch(:stream_auto_commit, %Proto.OffsetFetch.Request{topic: random_string, partition: 0}) error_code = offset_fetch_response.partitions |> hd |> Map.get(:error_code) offset = offset_fetch_response.partitions |> hd |> Map.get(:offset) @@ -240,31 +240,158 @@ defmodule KafkaEx.ConsumerGroup.Test do stream = KafkaEx.stream(random_string, 0, 
worker_name: worker_name) log = TestHelper.wait_for_any( - fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) |> Enum.take(2) end + fn() -> Enum.take(stream, 2) end ) refute Enum.empty?(log) - first_message = log |> hd - assert first_message.offset == 4 end - test "stream does not commit offset with auto_commit is set to false" do - random_string = generate_random_string() - KafkaEx.create_worker(:stream_no_auto_commit, uris: uris()) - Enum.each(1..10, fn _ -> KafkaEx.produce(%Proto.Produce.Request{topic: random_string, partition: 0, required_acks: 1, messages: [%Proto.Produce.Message{value: "hey"}]}) end) - stream = KafkaEx.stream(random_string, 0, worker_name: :stream_no_auto_commit, auto_commit: false, offset: 0) + test "stream auto_commit deals with small batches correctly" do + topic_name = generate_random_string() + consumer_group = "stream_test" + + KafkaEx.create_worker(:stream, uris: uris()) + KafkaEx.produce(%Proto.Produce.Request{ + topic: topic_name, partition: 0, required_acks: 1, messages: [ + %Proto.Produce.Message{value: "message 2"}, + %Proto.Produce.Message{value: "message 3"}, + %Proto.Produce.Message{value: "message 4"}, + %Proto.Produce.Message{value: "message 5"}, + ] + }, worker_name: :stream) + + stream = KafkaEx.stream( + topic_name, + 0, + worker_name: :stream, + offset: 0, + auto_commit: true, + consumer_group: consumer_group + ) + + [m1, m2] = Enum.take(stream, 2) + assert "message 2" == m1.value + assert "message 3" == m2.value - # make sure we consume at least one message before we assert that there is no offset committed - _log = TestHelper.wait_for_any( - fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) |> Enum.take(2) end + offset = TestHelper.latest_consumer_offset_number( + topic_name, + 0, + consumer_group, + :stream ) + assert offset == m2.offset + end - offset_fetch_response = KafkaEx.offset_fetch(:stream_no_auto_commit, %Proto.OffsetFetch.Request{topic: random_string, partition: 0}) |> hd - offset_fetch_response.partitions |> hd |> Map.get(:error_code) - offset_fetch_response_offset = offset_fetch_response.partitions |> hd |> Map.get(:offset) + test "stream auto_commit doesn't exceed the end of the log" do + topic_name = generate_random_string() + consumer_group = "stream_test" + + KafkaEx.create_worker(:stream, uris: uris()) + KafkaEx.produce(%Proto.Produce.Request{ + topic: topic_name, partition: 0, required_acks: 1, messages: [ + %Proto.Produce.Message{value: "message 2"}, + %Proto.Produce.Message{value: "message 3"}, + %Proto.Produce.Message{value: "message 4"}, + %Proto.Produce.Message{value: "message 5"}, + ] + }, worker_name: :stream) + + stream = KafkaEx.stream( + topic_name, + 0, + worker_name: :stream, + offset: 0, + no_wait_at_logend: true, + auto_commit: true, + consumer_group: consumer_group + ) + + [m1, m2, m3, m4] = Enum.take(stream, 10) + assert "message 2" == m1.value + assert "message 3" == m2.value + assert "message 4" == m3.value + assert "message 5" == m4.value + + offset = TestHelper.latest_consumer_offset_number( + topic_name, + 0, + consumer_group, + :stream + ) + assert offset == m4.offset + + other_consumer_group = "another_consumer_group" + map_stream = KafkaEx.stream( + topic_name, + 0, + worker_name: :stream, + no_wait_at_logend: true, + auto_commit: true, + offset: 0, + consumer_group: other_consumer_group + ) + + assert ["message 2", "message 3", "message 4", "message 5"] = + Enum.map(map_stream, fn(m) -> m.value end) + + offset = TestHelper.latest_consumer_offset_number( + topic_name, + 
0, + other_consumer_group, + :stream + ) + # should have the same offset as the first stream + assert offset == m4.offset + end + + test "streams with a consumer group begin at the last committed offset" do + topic_name = generate_random_string() + consumer_group = "stream_test" + + KafkaEx.create_worker(:stream, uris: uris()) + messages_in = Enum.map( + 1 .. 10, + fn(ix) -> %Proto.Produce.Message{value: "Msg #{ix}"} end + ) + KafkaEx.produce( + %Proto.Produce.Request{ + topic: topic_name, + partition: 0, + required_acks: 1, + messages: messages_in + }, + worker_name: :stream + ) + + stream1 = KafkaEx.stream( + topic_name, + 0, + worker_name: :stream, + no_wait_at_logend: true, + auto_commit: true, + offset: 0, + consumer_group: consumer_group + ) + + assert ["Msg 1", "Msg 2"] == stream1 + |> Enum.take(2) + |> Enum.map(fn(msg) -> msg.value end) + + stream2 = KafkaEx.stream( + topic_name, + 0, + worker_name: :stream, + no_wait_at_logend: true, + auto_commit: true, + consumer_group: consumer_group + ) + + message_values = stream2 + |> Enum.take(2) + |> Enum.map(fn(msg) -> msg.value end) - assert 0 >= offset_fetch_response_offset + assert ["Msg 3", "Msg 4"] == message_values end end diff --git a/test/integration/integration_test.exs b/test/integration/integration_test.exs index 4f5c00d0..ee1ecfa0 100644 --- a/test/integration/integration_test.exs +++ b/test/integration/integration_test.exs @@ -329,110 +329,140 @@ defmodule KafkaEx.Integration.Test do # stream test "streams kafka logs" do - random_string = generate_random_string() + topic_name = generate_random_string() KafkaEx.create_worker(:stream, uris: uris()) - KafkaEx.produce(%Proto.Produce.Request{topic: random_string, partition: 0, required_acks: 1, messages: [ - %Proto.Produce.Message{value: "hey"}, - %Proto.Produce.Message{value: "hi"}, + KafkaEx.produce(%Proto.Produce.Request{ + topic: topic_name, partition: 0, required_acks: 1, messages: [ + %Proto.Produce.Message{value: "message 1"}, + %Proto.Produce.Message{value: "message 2"}, + %Proto.Produce.Message{value: "message 3"}, + %Proto.Produce.Message{value: "message 4"}, ] }, worker_name: :stream) - stream = KafkaEx.stream(random_string, 0, worker_name: :stream, offset: 0, auto_commit: false) - log = TestHelper.wait_for_accum( - fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) end, - 2 + stream = KafkaEx.stream( + topic_name, + 0, + worker_name: :stream, + offset: 0, + auto_commit: false ) - refute Enum.empty?(log) - [first,second|_] = log - assert first.value == "hey" - assert second.value == "hi" - end + logs = Enum.take(stream, 2) + assert 2 == length(logs) + [m1, m2] = logs + assert m1.value == "message 1" + assert m2.value == "message 2" - test "doesn't error when re-creating an existing stream" do - random_string = generate_random_string() - KafkaEx.stream(random_string, 0, offset: 0) - KafkaEx.stream(random_string, 0, offset: 0) + # calling stream again will get the same messages + assert logs == Enum.take(stream, 2) end - test "stop_streaming stops streaming, and stream starts it up again" do - random_string = generate_random_string() - KafkaEx.create_worker(:stream2, uris: uris()) - stream = KafkaEx.stream(random_string, 0, worker_name: :stream2, offset: 0, auto_commit: false) - - KafkaEx.create_worker(:producer, uris: uris()) - KafkaEx.produce(%Proto.Produce.Request{topic: random_string, partition: 0, required_acks: 1, messages: [ - %Proto.Produce.Message{value: "one"}, - %Proto.Produce.Message{value: "two"}, + test "stream with small max_bytes makes 
multiple requests if necessary" do + topic_name = generate_random_string() + KafkaEx.create_worker(:stream, uris: uris()) + KafkaEx.produce(%Proto.Produce.Request{ + topic: topic_name, partition: 0, required_acks: 1, messages: [ + %Proto.Produce.Message{value: "message 1"}, + %Proto.Produce.Message{value: "message 2"}, + %Proto.Produce.Message{value: "message 3"}, + %Proto.Produce.Message{value: "message 4"}, + %Proto.Produce.Message{value: "message 5"}, ] - }, worker_name: :producer) + }, worker_name: :stream) - log = TestHelper.wait_for_accum( - fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) end, - 2 + stream = KafkaEx.stream( + topic_name, + 0, + worker_name: :stream, + max_bytes: 50, + offset: 0, + auto_commit: false ) - last_offset = hd(Enum.reverse(log)).offset + logs = Enum.take(stream, 4) + assert 4 == length(logs) + [m1, m2, m3, m4] = logs + assert m1.value == "message 1" + assert m2.value == "message 2" + assert m3.value == "message 3" + assert m4.value == "message 4" - KafkaEx.stop_streaming(worker_name: :stream2) - :ok = TestHelper.wait_for(fn() -> !Process.alive?(stream.manager) end) + # calling stream again will get the same messages + assert logs == Enum.take(stream, 4) + end - KafkaEx.produce(%Proto.Produce.Request{topic: random_string, partition: 0, required_acks: 1, messages: [ - %Proto.Produce.Message{value: "three"}, - %Proto.Produce.Message{value: "four"}, + test "stream blocks until new messages are available" do + topic_name = generate_random_string() + KafkaEx.create_worker(:stream, uris: uris()) + KafkaEx.produce(%Proto.Produce.Request{ + topic: topic_name, partition: 0, required_acks: 1, messages: [ + %Proto.Produce.Message{value: "message 1"} ] - }, worker_name: :producer) + }, worker_name: :stream) - stream = KafkaEx.stream(random_string, 0, worker_name: :stream2, offset: last_offset+1, auto_commit: false) + stream = KafkaEx.stream( + topic_name, + 0, + worker_name: :stream, + max_bytes: 50, + offset: 0, + auto_commit: false + ) + task = Task.async(fn -> Enum.take(stream, 4) end) + + KafkaEx.produce(%Proto.Produce.Request{ + topic: topic_name, partition: 0, required_acks: 1, messages: [ + %Proto.Produce.Message{value: "message 2"}, + %Proto.Produce.Message{value: "message 3"}, + %Proto.Produce.Message{value: "message 4"}, + %Proto.Produce.Message{value: "message 5"}, + ] + }, worker_name: :stream) - :ok = TestHelper.wait_for(fn() -> Process.alive?(stream.manager) end) + logs = Task.await(task) - log = GenEvent.call(stream.manager, KafkaEx.Handler, :messages) - assert length(log) == 0 + assert 4 == length(logs) + [m1, m2, m3, m4] = logs + assert m1.value == "message 1" + assert m2.value == "message 2" + assert m3.value == "message 3" + assert m4.value == "message 4" + end - KafkaEx.produce(%Proto.Produce.Request{topic: random_string, partition: 0, required_acks: 1, messages: [ - %Proto.Produce.Message{value: "five"}, - %Proto.Produce.Message{value: "six"}, + test "stream is non-blocking with no_wait_at_logend" do + topic_name = generate_random_string() + KafkaEx.create_worker(:stream, uris: uris()) + KafkaEx.produce(%Proto.Produce.Request{ + topic: topic_name, partition: 0, required_acks: 1, messages: [ + %Proto.Produce.Message{value: "message 1"} ] - }, worker_name: :producer) + }, worker_name: :stream) - log = TestHelper.wait_for_accum( - fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) end, - 4 + stream = KafkaEx.stream( + topic_name, + 0, + worker_name: :stream, + max_bytes: 50, + offset: 0, + auto_commit: false, + 
no_wait_at_logend: true ) - assert length(log) == 4 + logs = Enum.take(stream, 4) + + assert 1 == length(logs) + [m1] = logs + assert m1.value == "message 1" + + # we can also execute something like 'map' which would otherwise be + # open-ended + assert ["message 1"] == Enum.map(stream, fn(m) -> m.value end) end - test "streams kafka logs with custom handler and initial state" do + test "doesn't error when re-creating an existing stream" do random_string = generate_random_string() - KafkaEx.create_worker(:stream3, uris: uris()) - {:ok, offset} = KafkaEx.produce(%Proto.Produce.Request{topic: random_string, partition: 0, required_acks: 1, messages: [ - %Proto.Produce.Message{value: "hey"}, - %Proto.Produce.Message{value: "hi"}, - ] - }, worker_name: :stream3) - - defmodule CustomHandlerSendingMessage do - use GenEvent - - def init(pid) do - {:ok, pid} - end - - def handle_event(message, pid) do - send(pid, message) - {:ok, pid} - end - end - KafkaEx.stream(random_string, 0, worker_name: :stream3, offset: offset, auto_commit: false, handler: CustomHandlerSendingMessage, handler_init: self()) - - assert_receive %KafkaEx.Protocol.Fetch.Message{key: nil, value: "hey", offset: ^offset, attributes: 0, crc: 4264455069} - offset = offset + 1 - assert_receive %KafkaEx.Protocol.Fetch.Message{key: nil, value: "hi", offset: ^offset, attributes: 0, crc: 4251893211} - receive do - _ -> assert(false, "Should not have received a third message") - after 100 -> :ok - end + KafkaEx.stream(random_string, 0, offset: 0) + KafkaEx.stream(random_string, 0, offset: 0) end end