From 850084e08f07186dca6cb8a6d754a707a180c3ab Mon Sep 17 00:00:00 2001
From: bjhaid
Date: Mon, 23 Jan 2017 19:05:49 -0600
Subject: [PATCH 01/20] Stream POC.

---
 lib/kafka_ex/stream.ex | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)
 create mode 100644 lib/kafka_ex/stream.ex

diff --git a/lib/kafka_ex/stream.ex b/lib/kafka_ex/stream.ex
new file mode 100644
index 00000000..ef8bfeb7
--- /dev/null
+++ b/lib/kafka_ex/stream.ex
@@ -0,0 +1,29 @@
+defmodule KafkaEx.Stream do
+  @moduledoc false
+  alias KafkaEx.Protocol.OffsetCommit.Request, as: OffsetCommitRequest
+  alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest
+
+  defstruct worker_name: nil, fetch_request: %FetchRequest{}, consumer_group: nil
+
+  defimpl Enumerable do
+    def reduce(data, acc, fun) do
+      next_fun = fn offset ->
+        if data.fetch_request.auto_commit do
+          GenServer.call(data.worker_name, {:offset_commit, %OffsetCommitRequest{consumer_group: data.consumer_group, topic: data.fetch_request.topic, partition: data.fetch_request.partition, offset: offset, metadata: ""}})
+        end
+        response = GenServer.call(data.worker_name, {:fetch, %{data.fetch_request| offset: offset}})
+        last_offset = response |> hd |> Map.get(:partitions) |> hd |> Map.get(:last_offset)
+        {response, last_offset}
+      end
+      Stream.resource(fn -> data.fetch_request.offset end, next_fun, fn offset -> offset end).(acc, fun)
+    end
+    def count(_stream) do
+      {:error, __MODULE__}
+    end
+
+    def member?(_stream, _item) do
+      {:error, __MODULE__}
+    end
+  end
+
+end

From 14522ffe373589ea6f6ce42b79e3d1f80366bb5f Mon Sep 17 00:00:00 2001
From: bjhaid
Date: Sat, 18 Feb 2017 11:40:23 -0600
Subject: [PATCH 02/20] Replace the current GenEvent-based stream
 implementation with a really lazy stream implementation; this closes #145.

---
 lib/kafka_ex.ex                          | 29 +++------
 lib/kafka_ex/stream.ex                   | 20 ++++--
 test/integration/consumer_group_test.exs |  8 +--
 test/integration/integration_test.exs    | 83 +----------------------
 4 files changed, 27 insertions(+), 113 deletions(-)

diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex
index ea6241a0..774fcfe5 100644
--- a/lib/kafka_ex.ex
+++ b/lib/kafka_ex.ex
@@ -262,8 +262,6 @@ Optional arguments(KeywordList)
 Optional arguments(KeywordList)
   - worker_name: the worker we want to run this metadata request through, when none is provided the default worker `:kafka_ex` is used
   - offset: When supplied the fetch would start from this offset, otherwise would start from the last committed offset of the consumer_group the worker belongs to. For Kafka < 0.8.2 you should explicitly specify this.
-  - handler: the handler we want to handle the streaming events, when none is provided the default KafkaEx.Handler is used
-  - handler_init: initial state for the handler - leave the default value [] when using the default handler
   - auto_commit: specifies if the last offset should be committed or not. Default is true. You must set this to false when using Kafka < 0.8.2 or `:no_consumer_group`.
@@ -283,32 +281,21 @@ Optional arguments(KeywordList)
   def stream(topic, partition, opts \\ []) do
     worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
     supplied_offset = Keyword.get(opts, :offset)
-    handler = Keyword.get(opts, :handler, KafkaEx.Handler)
-    handler_init = Keyword.get(opts, :handler_init, [])
     auto_commit = Keyword.get(opts, :auto_commit, true)
     wait_time = Keyword.get(opts, :wait_time, @wait_time)
     min_bytes = Keyword.get(opts, :min_bytes, @min_bytes)
     max_bytes = Keyword.get(opts, :max_bytes, @max_bytes)
-
-    event_stream = Server.call(worker_name, {:create_stream, handler, handler_init})
+    consumer_group = Keyword.get(opts, :consumer_group)
     retrieved_offset = current_offset(supplied_offset, partition, topic, worker_name)
-    send(worker_name, {
-      :start_streaming,
-      %FetchRequest{
-        auto_commit: auto_commit,
-        topic: topic, partition: partition,
-        offset: retrieved_offset, wait_time: wait_time,
-        min_bytes: min_bytes, max_bytes: max_bytes
-      }
-    })
-    event_stream
-  end
+    fetch_request = %FetchRequest{
+      auto_commit: auto_commit,
+      topic: topic, partition: partition,
+      offset: retrieved_offset, wait_time: wait_time,
+      min_bytes: min_bytes, max_bytes: max_bytes
+    }
 
-  @spec stop_streaming(Keyword.t) :: :stop_streaming
-  def stop_streaming(opts \\ []) do
-    worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
-    send(worker_name, :stop_streaming)
+    %KafkaEx.Stream{worker_name: worker_name, fetch_request: fetch_request, consumer_group: consumer_group}
   end
 
   defp build_worker_options(worker_init) do
diff --git a/lib/kafka_ex/stream.ex b/lib/kafka_ex/stream.ex
index ef8bfeb7..bea32bb0 100644
--- a/lib/kafka_ex/stream.ex
+++ b/lib/kafka_ex/stream.ex
@@ -9,13 +9,23 @@ defmodule KafkaEx.Stream do
     def reduce(data, acc, fun) do
       next_fun = fn offset ->
         if data.fetch_request.auto_commit do
-          GenServer.call(data.worker_name, {:offset_commit, %OffsetCommitRequest{consumer_group: data.consumer_group, topic: data.fetch_request.topic, partition: data.fetch_request.partition, offset: offset, metadata: ""}})
+          GenServer.call(data.worker_name, {
+            :offset_commit,
+            %OffsetCommitRequest{
+              consumer_group: data.consumer_group,
+              topic: data.fetch_request.topic,
+              partition: data.fetch_request.partition,
+              offset: offset, metadata: ""
+            }
+          })
         end
-        response = GenServer.call(data.worker_name, {:fetch, %{data.fetch_request| offset: offset}})
-        last_offset = response |> hd |> Map.get(:partitions) |> hd |> Map.get(:last_offset)
-        {response, last_offset}
+        response = data.worker_name
+        |> GenServer.call({:fetch, %{data.fetch_request| offset: offset}})
+        |> hd |> Map.get(:partitions) |> hd
+        last_offset = response |> Map.get(:last_offset)
+        {response.message_set, last_offset}
       end
-      Stream.resource(fn -> data.fetch_request.offset end, next_fun, fn offset -> offset end).(acc, fun)
+      Stream.resource(&(data.fetch_request.offset), next_fun, &(&1)).(acc, fun)
     end
     def count(_stream) do
       {:error, __MODULE__}
diff --git a/test/integration/consumer_group_test.exs b/test/integration/consumer_group_test.exs
index 621b926c..7429348c 100644
--- a/test/integration/consumer_group_test.exs
+++ b/test/integration/consumer_group_test.exs
@@ -211,7 +211,7 @@ defmodule KafkaEx.ConsumerGroup.Test do
     })
     stream = KafkaEx.stream(random_string, 0, worker_name: :stream_auto_commit, offset: 0)
     log = TestHelper.wait_for_any(
-      fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) |> Enum.take(2) end
+      fn() -> stream |> Enum.take(2) end
     )
 
     refute Enum.empty?(log)
@@ -240,13 +240,11 @@ defmodule KafkaEx.ConsumerGroup.Test do
     stream = KafkaEx.stream(random_string, 0, worker_name: worker_name)
     log = TestHelper.wait_for_any(
-      fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) |> Enum.take(2) end
+      fn() -> stream |> Enum.take(2) end
     )
 
     refute Enum.empty?(log)
 
-    first_message = log |> hd
-
     assert first_message.offset == 4
   end
 
@@ -258,7 +256,7 @@ defmodule KafkaEx.ConsumerGroup.Test do
 
     # make sure we consume at least one message before we assert that there is no offset committed
     _log = TestHelper.wait_for_any(
-      fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) |> Enum.take(2) end
+      fn() -> stream |> Enum.take(2) end
     )
 
     offset_fetch_response = KafkaEx.offset_fetch(:stream_no_auto_commit, %Proto.OffsetFetch.Request{topic: random_string, partition: 0}) |> hd
diff --git a/test/integration/integration_test.exs b/test/integration/integration_test.exs
index 4f5c00d0..9a1f33b8 100644
--- a/test/integration/integration_test.exs
+++ b/test/integration/integration_test.exs
@@ -339,7 +339,7 @@ defmodule KafkaEx.Integration.Test do
     stream = KafkaEx.stream(random_string, 0, worker_name: :stream, offset: 0, auto_commit: false)
     log = TestHelper.wait_for_accum(
-      fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) end,
+      fn() -> stream |> Enum.take(2) end,
       2
     )
 
@@ -354,85 +354,4 @@ defmodule KafkaEx.Integration.Test do
     KafkaEx.stream(random_string, 0, offset: 0)
     KafkaEx.stream(random_string, 0, offset: 0)
   end
-
-  test "stop_streaming stops streaming, and stream starts it up again" do
-    random_string = generate_random_string()
-    KafkaEx.create_worker(:stream2, uris: uris())
-    stream = KafkaEx.stream(random_string, 0, worker_name: :stream2, offset: 0, auto_commit: false)
-
-    KafkaEx.create_worker(:producer, uris: uris())
-    KafkaEx.produce(%Proto.Produce.Request{topic: random_string, partition: 0, required_acks: 1, messages: [
-        %Proto.Produce.Message{value: "one"},
-        %Proto.Produce.Message{value: "two"},
-      ]
-    }, worker_name: :producer)
-
-    log = TestHelper.wait_for_accum(
-      fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) end,
-      2
-    )
-
-    last_offset = hd(Enum.reverse(log)).offset
-
-    KafkaEx.stop_streaming(worker_name: :stream2)
-    :ok = TestHelper.wait_for(fn() -> !Process.alive?(stream.manager) end)
-
-    KafkaEx.produce(%Proto.Produce.Request{topic: random_string, partition: 0, required_acks: 1, messages: [
-        %Proto.Produce.Message{value: "three"},
-        %Proto.Produce.Message{value: "four"},
-      ]
-    }, worker_name: :producer)
-
-    stream = KafkaEx.stream(random_string, 0, worker_name: :stream2, offset: last_offset+1, auto_commit: false)
-
-    :ok = TestHelper.wait_for(fn() -> Process.alive?(stream.manager) end)
-
-    log = GenEvent.call(stream.manager, KafkaEx.Handler, :messages)
-    assert length(log) == 0
-
-    KafkaEx.produce(%Proto.Produce.Request{topic: random_string, partition: 0, required_acks: 1, messages: [
-        %Proto.Produce.Message{value: "five"},
-        %Proto.Produce.Message{value: "six"},
-      ]
-    }, worker_name: :producer)
-
-    log = TestHelper.wait_for_accum(
-      fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) end,
-      4
-    )
-
-    assert length(log) == 4
-  end
-
-  test "streams kafka logs with custom handler and initial state" do
-    random_string = generate_random_string()
-    KafkaEx.create_worker(:stream3, uris: uris())
-    {:ok, offset} = KafkaEx.produce(%Proto.Produce.Request{topic: random_string, partition: 0, required_acks: 1, messages: [
-        %Proto.Produce.Message{value: "hey"},
-        %Proto.Produce.Message{value: "hi"},
-      ]
-    }, worker_name: :stream3)
-
-    defmodule CustomHandlerSendingMessage do
-      use GenEvent
-
-      def init(pid) do
-        {:ok, pid}
-      end
-
-      def handle_event(message, pid) do
-        send(pid, message)
-        {:ok, pid}
-      end
-    end
-    KafkaEx.stream(random_string, 0, worker_name: :stream3, offset: offset, auto_commit: false, handler: CustomHandlerSendingMessage, handler_init: self())
-
-    assert_receive %KafkaEx.Protocol.Fetch.Message{key: nil, value: "hey", offset: ^offset, attributes: 0, crc: 4264455069}
-    offset = offset + 1
-    assert_receive %KafkaEx.Protocol.Fetch.Message{key: nil, value: "hi", offset: ^offset, attributes: 0, crc: 4251893211}
-    receive do
-      _ -> assert(false, "Should not have received a third message")
-    after 100 -> :ok
-    end
-  end
 end

From 48cdc7019c29df209a07815706890ad0e277cf55 Mon Sep 17 00:00:00 2001
From: bjhaid
Date: Sat, 18 Feb 2017 11:47:13 -0600
Subject: [PATCH 03/20] Update documentation.

---
 lib/kafka_ex.ex | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex
index 774fcfe5..5d15d783 100644
--- a/lib/kafka_ex.ex
+++ b/lib/kafka_ex.ex
@@ -254,15 +254,11 @@ Optional arguments(KeywordList)
   @doc """
   Returns a stream that consumes fetched messages.
 
-  This puts the specified worker in streaming mode and blocks the worker indefinitely.
-  The handler is a normal GenEvent handler so you can supply a custom handler, otherwise a default handler is used.
-
-  This function should be used with care as the queue is unbounded and can cause OOM.
-
 Optional arguments(KeywordList)
   - worker_name: the worker we want to run this metadata request through, when none is provided the default worker `:kafka_ex` is used
   - offset: When supplied the fetch would start from this offset, otherwise would start from the last committed offset of the consumer_group the worker belongs to. For Kafka < 0.8.2 you should explicitly specify this.
   - auto_commit: specifies if the last offset should be committed or not. Default is true. You must set this to false when using Kafka < 0.8.2 or `:no_consumer_group`.
+  - consumer_group: Name of the group of consumers, `:no_consumer_group` should be passed for Kafka < 0.8.2, defaults to `Application.get_env(:kafka_ex, :consumer_group)`
 
 ## Example

From c6d2fce169c0933d31938b29c4c55668d95ca69e Mon Sep 17 00:00:00 2001
From: bjhaid
Date: Sat, 18 Feb 2017 11:50:41 -0600
Subject: [PATCH 04/20] Replace the short form of a function call with the
 explicit form.

---
 lib/kafka_ex/stream.ex | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/kafka_ex/stream.ex b/lib/kafka_ex/stream.ex
index bea32bb0..963362c2 100644
--- a/lib/kafka_ex/stream.ex
+++ b/lib/kafka_ex/stream.ex
@@ -25,7 +25,7 @@ defmodule KafkaEx.Stream do
         last_offset = response |> Map.get(:last_offset)
         {response.message_set, last_offset}
       end
-      Stream.resource(&(data.fetch_request.offset), next_fun, &(&1)).(acc, fun)
+      Stream.resource(fn -> data.fetch_request.offset end, next_fun, &(&1)).(acc, fun)
     end
     def count(_stream) do
       {:error, __MODULE__}

From f04d9840f6fed1ba72c1c7aa5743c94ab5cd2b49 Mon Sep 17 00:00:00 2001
From: bjhaid
Date: Sat, 18 Feb 2017 12:55:49 -0600
Subject: [PATCH 05/20] Update the documentation on the stream function to
 reflect its halting behavior and also update the implementation to halt if
 there are no more messages to stream.
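
To illustrate the halting behavior this patch describes, here is a minimal,
hypothetical sketch (the `:stream_halt_demo` worker name and the "events"
topic are illustrative assumptions, not part of this change):

```elixir
# With this change, enumerating a stream stops once a fetch returns no
# new messages, instead of polling the topic forever.
KafkaEx.create_worker(:stream_halt_demo, uris: [{"localhost", 9092}])

stream = KafkaEx.stream("events", 0,
  worker_name: :stream_halt_demo, offset: 0, auto_commit: false)

# Previously this would block indefinitely at the log end; now it halts:
values = Enum.map(stream, fn message -> message.value end)
```

Bounded calls such as `Enum.take(stream, 2)` were already safe; the halting
behavior matters for open-ended enumerations such as `Enum.map/2` above.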
---
 lib/kafka_ex.ex        | 5 +++--
 lib/kafka_ex/stream.ex | 9 +++++++--
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex
index 5d15d783..8dbfa695 100644
--- a/lib/kafka_ex.ex
+++ b/lib/kafka_ex.ex
@@ -15,6 +15,7 @@ defmodule KafkaEx do
   alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest
   alias KafkaEx.Protocol.Produce.Message
   alias KafkaEx.Server
+  alias KafkaEx.Stream
 
   @type uri() :: [{binary|char_list, number}]
   @type worker_init :: [worker_setting]
@@ -253,7 +254,7 @@ Optional arguments(KeywordList)
   end
 
   @doc """
-  Returns a stream that consumes fetched messages.
+  Returns a stream that consumes fetched messages, the stream will halt once the max_bytes number of messages is reached, if you want to halt the stream early supply a small max_bytes, for the inverse supply a large max_bytes and if you don't want the stream to halt, you should recursively call the stream function.
 Optional arguments(KeywordList)
   - worker_name: the worker we want to run this metadata request through, when none is provided the default worker `:kafka_ex` is used
   - offset: When supplied the fetch would start from this offset, otherwise would start from the last committed offset of the consumer_group the worker belongs to. For Kafka < 0.8.2 you should explicitly specify this.
   - auto_commit: specifies if the last offset should be committed or not. Default is true. You must set this to false when using Kafka < 0.8.2 or `:no_consumer_group`.
diff --git a/lib/kafka_ex/stream.ex b/lib/kafka_ex/stream.ex
index 963362c2..8213e6ba 100644
--- a/lib/kafka_ex/stream.ex
+++ b/lib/kafka_ex/stream.ex
@@ -22,11 +22,16 @@ defmodule KafkaEx.Stream do
         response = data.worker_name
         |> GenServer.call({:fetch, %{data.fetch_request| offset: offset}})
         |> hd |> Map.get(:partitions) |> hd
-        last_offset = response |> Map.get(:last_offset)
-        {response.message_set, last_offset}
+        if response.error_code == :no_error &&
+          response.last_offset != nil && response.last_offset != offset do
+          {response.message_set, response.last_offset}
+        else
+          {:halt, offset}
+        end
       end
       Stream.resource(fn -> data.fetch_request.offset end, next_fun, &(&1)).(acc, fun)
     end
+
     def count(_stream) do
       {:error, __MODULE__}
     end

From 4c1334e1765b0a5c818e9e67f351c5e6b075fe01 Mon Sep 17 00:00:00 2001
From: bjhaid
Date: Sat, 18 Feb 2017 13:44:12 -0600
Subject: [PATCH 06/20] Delete the dead stream code.

---
 lib/kafka_ex/handler.ex          | 18 ----------
 lib/kafka_ex/server.ex           | 57 +-------------------------------
 lib/kafka_ex/server_0_p_8_p_0.ex | 25 --------------
 lib/kafka_ex/server_0_p_8_p_2.ex | 27 ---------------
 lib/kafka_ex/server_0_p_9_p_0.ex |  1 -
 5 files changed, 1 insertion(+), 127 deletions(-)
 delete mode 100644 lib/kafka_ex/handler.ex

diff --git a/lib/kafka_ex/handler.ex b/lib/kafka_ex/handler.ex
deleted file mode 100644
index 1a819865..00000000
--- a/lib/kafka_ex/handler.ex
+++ /dev/null
@@ -1,18 +0,0 @@
-defmodule KafkaEx.Handler do
-  @moduledoc """
-  Default GenEvent handler for KafkaEx.stream
-
-  Received message sets are accumulated in the GenEvent state and can
-  be retrieved and flushed via the `:messages` call.
-  """
-
-  use GenEvent
-
-  def handle_event(message_set, message_sets) do
-    {:ok, [message_set|message_sets]}
-  end
-
-  def handle_call(:messages, message_sets) do
-    {:ok, Enum.reverse(message_sets), []}
-  end
-end
diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex
index eb69c1e4..52718d56 100644
--- a/lib/kafka_ex/server.ex
+++ b/lib/kafka_ex/server.ex
@@ -21,7 +21,6 @@ defmodule KafkaEx.Server do
       metadata: %Metadata.Response{},
       brokers: [],
       event_pid: nil,
-      stream_timer: nil,
       consumer_metadata: %ConsumerMetadata.Response{},
       correlation_id: 0,
       consumer_group: nil,
@@ -36,7 +35,6 @@ defmodule KafkaEx.Server do
       metadata: Metadata.Response.t,
       brokers: [Broker.t],
       event_pid: nil | pid,
-      stream_timer: reference,
       consumer_metadata: ConsumerMetadata.Response.t,
       correlation_id: integer,
       metadata_update_interval: nil | integer,
@@ -136,21 +134,6 @@ defmodule KafkaEx.Server do
     {:noreply, new_state, timeout | :hibernate} |
     {:stop, reason, reply, new_state} |
     {:stop, reason, new_state} when reply: term, new_state: term, reason: term
-  @callback kafka_server_create_stream(handler :: term, handler_init :: term, state :: State.t) ::
-    {:reply, reply, new_state} |
-    {:reply, reply, new_state, timeout | :hibernate} |
-    {:noreply, new_state} |
-    {:noreply, new_state, timeout | :hibernate} |
-    {:stop, reason, reply, new_state} |
-    {:stop, reason, new_state} when reply: term, new_state: term, reason: term
-  @callback kafka_server_start_streaming(fetch_request :: FetchRequest.t, state :: State.t) ::
-    {:noreply, new_state} |
-    {:noreply, new_state, timeout | :hibernate} |
-    {:stop, reason :: term, new_state} when new_state: term
-  @callback kafka_server_stop_streaming(state :: State.t) ::
-    {:noreply, new_state} |
-    {:noreply, new_state, timeout | :hibernate} |
-    {:stop, reason :: term, new_state} when new_state: term
   @callback kafka_server_update_metadata(state :: State.t) ::
     {:noreply, new_state} |
     {:noreply, new_state, timeout | :hibernate} |
@@ -253,24 +236,6 @@ defmodule KafkaEx.Server do
       kafka_server_heartbeat(group_name, generation_id, member_id, state)
     end
 
-    def handle_call({:create_stream, handler, handler_init}, _from, state) do
-      kafka_server_create_stream(handler, handler_init, state)
-    end
-
-    def handle_info({:start_streaming, fetch_request}, state) do
-      kafka_server_start_streaming(fetch_request, state)
-    end
-
-    def handle_info(:stop_streaming, state) do
-      {:noreply, state} = kafka_server_stop_streaming(state)
-      state = case state.stream_timer do
-        nil -> state
-        ref ->
-          Process.cancel_timer(ref)
-          %{state | stream_timer: nil}
-      end
-      {:noreply, state}
-    end
-
     def handle_info(:update_metadata, state) do
       kafka_server_update_metadata(state)
     end
@@ -360,25 +325,6 @@ defmodule KafkaEx.Server do
       {:reply, metadata, updated_state}
     end
 
-    def kafka_server_create_stream(handler, handler_init, state) do
-      new_state = if state.event_pid && Process.alive?(state.event_pid) do
-        Logger.log(:warn, "'#{state.worker_name}' already streaming handler '#{handler}'")
-        state
-      else
-        {:ok, event_pid} = GenEvent.start_link
-        updated_state = %{state | event_pid: event_pid}
-        :ok = GenEvent.add_handler(updated_state.event_pid, handler, handler_init)
-        updated_state
-      end
-      {:reply, GenEvent.stream(new_state.event_pid), new_state}
-    end
-
-    def kafka_server_stop_streaming(state) do
-      Logger.log(:debug, "Stopped worker #{inspect state.worker_name} from streaming")
-      GenEvent.stop(state.event_pid)
-      {:noreply, %{state | event_pid: nil}}
-    end
-
     def kafka_server_update_metadata(state) do
       {:noreply, update_metadata(state)}
     end
@@ -419,8 +365,7 @@ defmodule KafkaEx.Server do
     defoverridable [
       kafka_server_produce: 2, kafka_server_offset: 4,
-      kafka_server_metadata: 2, kafka_server_create_stream: 3,
-      kafka_server_stop_streaming: 1, kafka_server_update_metadata: 1,
+      kafka_server_metadata: 2, kafka_server_update_metadata: 1,
     ]
 
     defp remove_stale_brokers(brokers, metadata_brokers) do
diff --git a/lib/kafka_ex/server_0_p_8_p_0.ex b/lib/kafka_ex/server_0_p_8_p_0.ex
index 83c82c93..9e316063 100644
--- a/lib/kafka_ex/server_0_p_8_p_0.ex
+++ b/lib/kafka_ex/server_0_p_8_p_0.ex
@@ -67,31 +67,6 @@ defmodule KafkaEx.Server0P8P0 do
   def kafka_server_heartbeat(_, _, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka"
   def kafka_server_update_consumer_metadata(_state), do: raise "Consumer Group Metadata is not supported in 0.8.0 version of kafka"
 
-  def kafka_server_start_streaming(_, state = %State{event_pid: nil}) do
-    # our streaming could have been canceled with a streaming update in-flight
-    {:noreply, state}
-  end
-  def kafka_server_start_streaming(fetch_request, state) do
-    {response, state} = fetch(fetch_request, state)
-    offset = case response do
-      :topic_not_found ->
-        fetch_request.offset
-      _ ->
-        message = response |> hd |> Map.get(:partitions) |> hd
-        Enum.each(message.message_set, fn(message_set) -> GenEvent.notify(state.event_pid, message_set) end)
-        case message.last_offset do
-          nil -> fetch_request.offset
-          last_offset -> last_offset + 1
-        end
-    end
-
-    ref = Process.send_after(
-      self(), {:start_streaming, %{fetch_request | offset: offset}}, 500
-    )
-
-    {:noreply, %{state | stream_timer: ref}}
-  end
-
   defp fetch(fetch_request, state) do
     fetch_data = Fetch.create_request(%FetchRequest{
       fetch_request |
diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex
index 2677f792..80634c56 100644
--- a/lib/kafka_ex/server_0_p_8_p_2.ex
+++ b/lib/kafka_ex/server_0_p_8_p_2.ex
@@ -111,33 +111,6 @@ defmodule KafkaEx.Server0P8P2 do
     {:reply, consumer_metadata, state}
   end
 
-  def kafka_server_start_streaming(_, state = %State{event_pid: nil}) do
-    # our streaming could have been canceled with a streaming update in-flight
-    {:noreply, state}
-  end
-  def kafka_server_start_streaming(fetch_request, state) do
-    true = consumer_group_if_auto_commit?(fetch_request.auto_commit, state)
-
-    {response, state} = fetch(fetch_request, state)
-    offset = case response do
-      :topic_not_found ->
-        fetch_request.offset
-      _ ->
-        message = response |> hd |> Map.get(:partitions) |> hd
-        Enum.each(message.message_set, fn(message_set) -> GenEvent.notify(state.event_pid, message_set) end)
-        case message.last_offset do
-          nil -> fetch_request.offset
-          last_offset -> last_offset + 1
-        end
-    end
-
-    ref = Process.send_after(
-      self(), {:start_streaming, %{fetch_request | offset: offset}}, 500
-    )
-
-    {:noreply, %{state | stream_timer: ref}}
-  end
-
   def kafka_server_update_consumer_metadata(state) do
     true = consumer_group?(state)
     {_, state} = update_consumer_metadata(state)
diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex
index c8ba2041..76b60e9c 100644
--- a/lib/kafka_ex/server_0_p_9_p_0.ex
+++ b/lib/kafka_ex/server_0_p_9_p_0.ex
@@ -34,7 +34,6 @@ defmodule KafkaEx.Server0P9P0 do
   defdelegate kafka_server_offset_fetch(offset_fetch, state), to: Server0P8P2
   defdelegate kafka_server_offset_commit(offset_commit_request, state), to: Server0P8P2
   defdelegate kafka_server_consumer_group_metadata(state), to: Server0P8P2
-  defdelegate kafka_server_start_streaming(fetch_request, state), to: Server0P8P2
   defdelegate kafka_server_update_consumer_metadata(state), to: Server0P8P2
 
   def kafka_server_init([args]) do

From 2644a774802ab36b739a2f45d60a0ef3c26beff5 Mon Sep 17 00:00:00 2001
From: bjhaid
Date: Thu, 4 May 2017 19:25:57 -0500
Subject: [PATCH 07/20] Refactor Stream.reduce and create a helper
 Fetch.Response.partition_messages function.

---
 lib/kafka_ex/protocol/fetch.ex |  6 ++++++
 lib/kafka_ex/stream.ex         | 20 +++++++++++++++-----
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex
index ee8f6170..f3d8d8c4 100644
--- a/lib/kafka_ex/protocol/fetch.ex
+++ b/lib/kafka_ex/protocol/fetch.ex
@@ -25,6 +25,12 @@ defmodule KafkaEx.Protocol.Fetch do
     @moduledoc false
     defstruct topic: nil, partitions: []
     @type t :: %Response{topic: binary, partitions: list}
+
+    @spec partition_messages(list, binary, integer) :: list
+    def partition_messages(responses, topic, partition) do
+      response = Enum.find(responses, &(&1.topic == topic)) || %Response{}
+      Enum.find(response.partitions, &(&1.partition == partition))
+    end
   end
 
   defmodule Message do
diff --git a/lib/kafka_ex/stream.ex b/lib/kafka_ex/stream.ex
index 8213e6ba..2bcf8a0c 100644
--- a/lib/kafka_ex/stream.ex
+++ b/lib/kafka_ex/stream.ex
@@ -2,8 +2,11 @@ defmodule KafkaEx.Stream do
   @moduledoc false
   alias KafkaEx.Protocol.OffsetCommit.Request, as: OffsetCommitRequest
   alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest
+  alias KafkaEx.Protocol.Fetch.Response, as: FetchResponse
 
-  defstruct worker_name: nil, fetch_request: %FetchRequest{}, consumer_group: nil
+  defstruct worker_name: nil,
+            fetch_request: %FetchRequest{},
+            consumer_group: nil
 
   defimpl Enumerable do
     def reduce(data, acc, fun) do
@@ -19,9 +22,7 @@ defmodule KafkaEx.Stream do
             }
           })
         end
-        response = data.worker_name
-        |> GenServer.call({:fetch, %{data.fetch_request| offset: offset}})
-        |> hd |> Map.get(:partitions) |> hd
+        response = fetch_response(data, offset)
         if response.error_code == :no_error &&
           response.last_offset != nil && response.last_offset != offset do
           {response.message_set, response.last_offset}
@@ -29,7 +30,16 @@ defmodule KafkaEx.Stream do
           {:halt, offset}
         end
       end
-      Stream.resource(fn -> data.fetch_request.offset end, next_fun, &(&1)).(acc, fun)
+      Stream.resource(
+        fn -> data.fetch_request.offset end, next_fun, &(&1)
+      ).(acc, fun)
+    end
+
+    defp fetch_response(data, offset) do
+      req = data.fetch_request
+      data.worker_name
+      |> GenServer.call({:fetch, %{req| offset: offset}})
+      |> FetchResponse.partition_messages(req.topic, req.partition)
     end
 
     def count(_stream) do

From 2b1561ceaf954a18d7a17a9ad7aab728c5e4384e Mon Sep 17 00:00:00 2001
From: bjhaid
Date: Thu, 4 May 2017 20:09:59 -0500
Subject: [PATCH 08/20] Introduce stream_mode support, exposing control of
 finite and infinite streams to the user.

---
 lib/kafka_ex.ex        | 21 +++++++++++++--------
 lib/kafka_ex/stream.ex |  8 ++++++--
 2 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex
index 8dbfa695..43572453 100644
--- a/lib/kafka_ex.ex
+++ b/lib/kafka_ex.ex
@@ -254,12 +254,13 @@ Optional arguments(KeywordList)
   end
 
   @doc """
-  Returns a stream that consumes fetched messages, the stream will halt once the max_bytes number of messages is reached, if you want to halt the stream early supply a small max_bytes, for the inverse supply a large max_bytes and if you don't want the stream to halt, you should recursively call the stream function.
+  Returns a stream that consumes fetched messages, the stream will halt once the max_bytes number of messages is reached, if you want to halt the stream early supply a small max_bytes, for the inverse supply a large max_bytes.
 Optional arguments(KeywordList)
+  - stream_mode: the mode the stream will be in, `:infinite` for an infinite stream and `:finite` for a finite stream, a finite stream will return once there's no data left to consume from the partition while an infinite stream will block till there's data to consume from the stream.
   - worker_name: the worker we want to run this metadata request through, when none is provided the default worker `:kafka_ex` is used
   - offset: When supplied the fetch would start from this offset, otherwise would start from the last committed offset of the consumer_group the worker belongs to. For Kafka < 0.8.2 you should explicitly specify this.
   - auto_commit: specifies if the last offset should be committed or not. Default is true. You must set this to false when using Kafka < 0.8.2 or `:no_consumer_group`.
-  - consumer_group: Name of the group of consumers, `:no_consumer_group` should be passed for Kafka < 0.8.2, defaults to `Application.get_env(:kafka_ex, :consumer_group)`
+  - consumer_group: Name of the group of consumers, `:no_consumer_group` should be passed for Kafka < 0.8.2, defaults to `Application.get_env(:kafka_ex, :consumer_group)`.
 
 ## Example
 
@@ -276,14 +277,15 @@ Optional arguments(KeywordList)
   """
   @spec stream(binary, integer, Keyword.t) :: GenEvent.Stream.t
   def stream(topic, partition, opts \\ []) do
-    worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
-    supplied_offset = Keyword.get(opts, :offset)
     auto_commit = Keyword.get(opts, :auto_commit, true)
-    wait_time = Keyword.get(opts, :wait_time, @wait_time)
-    min_bytes = Keyword.get(opts, :min_bytes, @min_bytes)
-    max_bytes = Keyword.get(opts, :max_bytes, @max_bytes)
     consumer_group = Keyword.get(opts, :consumer_group)
+    max_bytes = Keyword.get(opts, :max_bytes, @max_bytes)
+    min_bytes = Keyword.get(opts, :min_bytes, @min_bytes)
+    supplied_offset = Keyword.get(opts, :offset)
+    worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
     retrieved_offset = current_offset(supplied_offset, partition, topic, worker_name)
+    stream_mode = Keyword.get(opts, :stream_mode, :infinite)
+    wait_time = Keyword.get(opts, :wait_time, @wait_time)
 
     fetch_request = %FetchRequest{
       auto_commit: auto_commit,
       topic: topic, partition: partition,
       offset: retrieved_offset, wait_time: wait_time,
       min_bytes: min_bytes, max_bytes: max_bytes
     }
 
-    %Stream{worker_name: worker_name, fetch_request: fetch_request, consumer_group: consumer_group}
+    %Stream{
+      worker_name: worker_name, fetch_request: fetch_request,
+      consumer_group: consumer_group, stream_mode: stream_mode
+    }
   end
 
   defp build_worker_options(worker_init) do
diff --git a/lib/kafka_ex/stream.ex b/lib/kafka_ex/stream.ex
index 2bcf8a0c..89551b88 100644
--- a/lib/kafka_ex/stream.ex
+++ b/lib/kafka_ex/stream.ex
@@ -6,7 +6,8 @@ defmodule KafkaEx.Stream do
 
   defstruct worker_name: nil,
             fetch_request: %FetchRequest{},
-            consumer_group: nil
+            consumer_group: nil,
+            stream_mode: :infinite
 
   defimpl Enumerable do
     def reduce(data, acc, fun) do
@@ -27,7 +28,7 @@ defmodule KafkaEx.Stream do
           response.last_offset != nil && response.last_offset != offset do
           {response.message_set, response.last_offset}
         else
-          {:halt, offset}
+          {mode(data.stream_mode), offset}
         end
       end
       Stream.resource(
@@ -35,6 +36,9 @@ defmodule KafkaEx.Stream do
       ).(acc, fun)
     end
 
+    defp mode(:finite), do: :halt
+    defp mode(:infinite), do: []
+
     defp fetch_response(data, offset) do
       req = data.fetch_request
       data.worker_name

From 854f4729d01d4488a05d60115994cc1c5ab98dc2 Mon Sep 17 00:00:00 2001
From: bjhaid
Date: Thu, 4 May 2017 20:43:37 -0500
Subject: [PATCH 09/20] Use a function call when a pipeline is only one
 function long

---
 test/integration/consumer_group_test.exs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/test/integration/consumer_group_test.exs b/test/integration/consumer_group_test.exs
index 7429348c..4240ae77 100644
--- a/test/integration/consumer_group_test.exs
+++ b/test/integration/consumer_group_test.exs
@@ -211,7 +211,7 @@ defmodule KafkaEx.ConsumerGroup.Test do
     })
     stream = KafkaEx.stream(random_string, 0, worker_name: :stream_auto_commit, offset: 0)
     log = TestHelper.wait_for_any(
-      fn() -> stream |> Enum.take(2) end
+      fn() -> Enum.take(stream, 2) end
     )
 
     refute Enum.empty?(log)
@@ -240,7 +240,7 @@ defmodule KafkaEx.ConsumerGroup.Test do
     stream = KafkaEx.stream(random_string, 0, worker_name: worker_name)
     log = TestHelper.wait_for_any(
-      fn() -> stream |> Enum.take(2) end
+      fn() -> Enum.take(stream, 2) end
     )
 
     refute Enum.empty?(log)
@@ -256,7 +256,7 @@ defmodule KafkaEx.ConsumerGroup.Test do
 
     # make sure we consume at least one message before we assert that there is no offset committed
     _log = TestHelper.wait_for_any(
-      fn() -> stream |> Enum.take(2) end
+      fn() -> Enum.take(stream, 2) end
     )
 
     offset_fetch_response = KafkaEx.offset_fetch(:stream_no_auto_commit, %Proto.OffsetFetch.Request{topic: random_string, partition: 0}) |> hd

From 4405f5c0de8672e64771db1d6c4df50e5a08a3db Mon Sep 17 00:00:00 2001
From: Dan Swain
Date: Wed, 12 Jul 2017 17:37:34 -0400
Subject: [PATCH 10/20] Formatting and cleanup

---
 lib/kafka_ex.ex                        | 41 ++++++++++++++++++++-------
 test/integration/integration_test.exs  |  2 +-
 2 files changed, 31 insertions(+), 12 deletions(-)

diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex
index 43572453..b5266924 100644
--- a/lib/kafka_ex.ex
+++ b/lib/kafka_ex.ex
@@ -254,13 +254,26 @@ Optional arguments(KeywordList)
   end
 
   @doc """
-  Returns a stream that consumes fetched messages, the stream will halt once the max_bytes number of messages is reached, if you want to halt the stream early supply a small max_bytes, for the inverse supply a large max_bytes.
+  Returns a stream that consumes fetched messages, the stream will halt once
+  the max_bytes number of messages is reached, if you want to halt the stream
+  early supply a small max_bytes, for the inverse supply a large max_bytes.
+
 Optional arguments(KeywordList)
-  - stream_mode: the mode the stream will be in, `:infinite` for an infinite stream and `:finite` for a finite stream, a finite stream will return once there's no data left to consume from the partition while an infinite stream will block till there's data to consume from the stream.
-  - worker_name: the worker we want to run this metadata request through, when none is provided the default worker `:kafka_ex` is used
-  - offset: When supplied the fetch would start from this offset, otherwise would start from the last committed offset of the consumer_group the worker belongs to. For Kafka < 0.8.2 you should explicitly specify this.
-  - auto_commit: specifies if the last offset should be committed or not. Default is true. You must set this to false when using Kafka < 0.8.2 or `:no_consumer_group`.
-  - consumer_group: Name of the group of consumers, `:no_consumer_group` should be passed for Kafka < 0.8.2, defaults to `Application.get_env(:kafka_ex, :consumer_group)`.
+  - stream_mode: the mode the stream will be in, `:infinite` for an infinite
+    stream and `:finite` for a finite stream, a finite stream will return
+    once there's no data left to consume from the partition while an
+    infinite stream will block till there's data to consume from the stream.
+  - worker_name: the worker we want to run this metadata request through, when
+    none is provided the default worker `:kafka_ex` is used
+  - offset: When supplied the fetch would start from this offset, otherwise
+    would start from the last committed offset of the consumer_group the worker
+    belongs to. For Kafka < 0.8.2 you should explicitly specify this.
+  - auto_commit: specifies if the last offset should be committed or not.
+    Default is true. You must set this to false when using
+    Kafka < 0.8.2 or `:no_consumer_group`.
+  - consumer_group: Name of the group of consumers, `:no_consumer_group`
+    should be passed for Kafka < 0.8.2, defaults to
+    `Application.get_env(:kafka_ex, :consumer_group)`.
 
 ## Example
 
@@ -290,16 +303,22 @@ Optional arguments(KeywordList)
   """
   @spec stream(binary, integer, Keyword.t) :: GenEvent.Stream.t
   def stream(topic, partition, opts \\ []) do
     auto_commit = Keyword.get(opts, :auto_commit, true)
     consumer_group = Keyword.get(opts, :consumer_group)
     max_bytes = Keyword.get(opts, :max_bytes, @max_bytes)
     min_bytes = Keyword.get(opts, :min_bytes, @min_bytes)
     supplied_offset = Keyword.get(opts, :offset)
     worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
-    retrieved_offset = current_offset(supplied_offset, partition, topic, worker_name)
     stream_mode = Keyword.get(opts, :stream_mode, :infinite)
     wait_time = Keyword.get(opts, :wait_time, @wait_time)
 
+    retrieved_offset = current_offset(
+      supplied_offset,
+      partition,
+      topic,
+      worker_name
+    )
+
     fetch_request = %FetchRequest{
       auto_commit: auto_commit,
       topic: topic, partition: partition,
       offset: retrieved_offset, wait_time: wait_time,
       min_bytes: min_bytes, max_bytes: max_bytes
     }
 
     %Stream{
       worker_name: worker_name, fetch_request: fetch_request,
       consumer_group: consumer_group, stream_mode: stream_mode
     }
   end
 
   defp build_worker_options(worker_init) do
diff --git a/test/integration/integration_test.exs b/test/integration/integration_test.exs
index 9a1f33b8..85afe097 100644
--- a/test/integration/integration_test.exs
+++ b/test/integration/integration_test.exs
@@ -339,7 +339,7 @@ defmodule KafkaEx.Integration.Test do
     stream = KafkaEx.stream(random_string, 0, worker_name: :stream, offset: 0, auto_commit: false)
     log = TestHelper.wait_for_accum(
-      fn() -> stream |> Enum.take(2) end,
+      fn() -> Enum.take(stream, 2) end,
       2
     )
 

From ebca2203df2f5226d0524bf495cef09fae2500f4 Mon Sep 17 00:00:00 2001
From: Dan Swain
Date: Wed, 12 Jul 2017 17:54:40 -0400
Subject: [PATCH 11/20] Rename stream_mode option to mirror command line.

---
 README.md              |  6 ++++++
 lib/kafka_ex.ex        | 26 ++++++++++++++++----------
 lib/kafka_ex/stream.ex | 10 +++++-----
 3 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/README.md b/README.md
index 77b3f9da..181a5162 100644
--- a/README.md
+++ b/README.md
@@ -221,6 +221,12 @@ iex> KafkaEx.produce("foo", 0, "hey") # where "foo" is the topic and "hey" is th
 
 **NOTE** You must pass `auto_commit: false` in the options for `stream/3` when using Kafka < 0.8.2 or when using `:no_consumer_group`.
 
+**NOTE** The stream blocks by default at the end of the kafka log. To override
+this and have the stream return immediately at the end of the log, pass
+`no_wait_at_logend: true` in the options. This mirrors the command line
+arguments of
+[SimpleConsumerShell](https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-SimpleConsumerShell).
+
 ```elixir
 iex> KafkaEx.create_worker(:stream, [uris: [{"localhost", 9092}]])
 {:ok, #PID<0.196.0>}
diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex
index b5266924..b9456bd3 100644
--- a/lib/kafka_ex.ex
+++ b/lib/kafka_ex.ex
@@ -254,23 +254,29 @@ Optional arguments(KeywordList)
   end
 
   @doc """
-  Returns a stream that consumes fetched messages, the stream will halt once
-  the max_bytes number of messages is reached, if you want to halt the stream
-  early supply a small max_bytes, for the inverse supply a large max_bytes.
+  Returns a streamable struct that may be used for consuming messages.
+
+  By default, the stream consumes indefinitely and will block at log end until
+  new messages are available. Use the `no_wait_at_logend: true` option to have
+  the stream halt when no more messages are available.
+
+  Optional arguments (keyword list):
+
+  - no_wait_at_logend: Set this to true to halt the stream when there are no
+    more messages available. Defaults to false, i.e., the stream blocks to wait
+    for new messages.
 
-  Optional arguments(KeywordList)
-  - stream_mode: the mode the stream will be in, `:infinite` for an infinite
-    stream and `:finite` for a finite stream, a finite stream will return
-    once there's no data left to consume from the partition while an
-    infinite stream will block till there's data to consume from the stream.
   - worker_name: the worker we want to run this metadata request through, when
     none is provided the default worker `:kafka_ex` is used
+
   - offset: When supplied the fetch would start from this offset, otherwise
     would start from the last committed offset of the consumer_group the worker
     belongs to. For Kafka < 0.8.2 you should explicitly specify this.
+
   - auto_commit: specifies if the last offset should be committed or not.
     Default is true. You must set this to false when using
    Kafka < 0.8.2 or `:no_consumer_group`.
+
   - consumer_group: Name of the group of consumers, `:no_consumer_group`
     should be passed for Kafka < 0.8.2, defaults to
     `Application.get_env(:kafka_ex, :consumer_group)`.
@@ -296,7 +302,7 @@ Optional arguments(KeywordList)
     min_bytes = Keyword.get(opts, :min_bytes, @min_bytes)
     supplied_offset = Keyword.get(opts, :offset)
     worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
-    stream_mode = Keyword.get(opts, :stream_mode, :infinite)
+    no_wait_at_logend = Keyword.get(opts, :no_wait_at_logend, false)
     wait_time = Keyword.get(opts, :wait_time, @wait_time)
 
     retrieved_offset = current_offset(
@@ -315,7 +321,7 @@ Optional arguments(KeywordList)
 
     %Stream{
       worker_name: worker_name, fetch_request: fetch_request,
-      consumer_group: consumer_group, stream_mode: stream_mode
+      consumer_group: consumer_group, no_wait_at_logend: no_wait_at_logend
     }
   end
 
diff --git a/lib/kafka_ex/stream.ex b/lib/kafka_ex/stream.ex
index 89551b88..bbb6aaa7 100644
--- a/lib/kafka_ex/stream.ex
+++ b/lib/kafka_ex/stream.ex
@@ -1,5 +1,6 @@
 defmodule KafkaEx.Stream do
   @moduledoc false
+
   alias KafkaEx.Protocol.OffsetCommit.Request, as: OffsetCommitRequest
   alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest
   alias KafkaEx.Protocol.Fetch.Response, as: FetchResponse
@@ -7,7 +8,7 @@ defmodule KafkaEx.Stream do
   defstruct worker_name: nil,
             fetch_request: %FetchRequest{},
             consumer_group: nil,
-            stream_mode: :infinite
+            no_wait_at_logend: false
 
   defimpl Enumerable do
     def reduce(data, acc, fun) do
@@ -28,7 +29,7 @@ defmodule KafkaEx.Stream do
           response.last_offset != nil && response.last_offset != offset do
           {response.message_set, response.last_offset}
         else
-          {mode(data.stream_mode), offset}
+          {stream_control(data.no_wait_at_logend), offset}
         end
       end
       Stream.resource(
@@ -36,8 +37,8 @@ defmodule KafkaEx.Stream do
       ).(acc, fun)
     end
 
-    defp mode(:finite), do: :halt
-    defp mode(:infinite), do: []
+    defp stream_control(true), do: :halt
+    defp stream_control(false), do: []
 
     defp fetch_response(data, offset) do
       req = data.fetch_request
       data.worker_name
@@ -54,5 +55,4 @@ defmodule KafkaEx.Stream do
       {:error, __MODULE__}
     end
   end
-
 end

From 1ad8633cfb7038f4f0f8930309031ae6e8d08dda Mon Sep 17 00:00:00 2001
From: Dan Swain
Date: Sat, 15 Jul 2017 14:02:41 -0400
Subject: [PATCH 12/20] Fix stream double-consumption, test no_wait_at_logend

---
 lib/kafka_ex/stream.ex                |  33 ++++---
 test/integration/integration_test.exs | 131 +++++++++++++++++++++++---
 2 files changed, 138 insertions(+), 26 deletions(-)

diff --git a/lib/kafka_ex/stream.ex b/lib/kafka_ex/stream.ex
index bbb6aaa7..8cac028d 100644
--- a/lib/kafka_ex/stream.ex
+++ b/lib/kafka_ex/stream.ex
@@ -11,29 +11,22 @@ defmodule KafkaEx.Stream do
             no_wait_at_logend: false
 
   defimpl Enumerable do
-    def reduce(data, acc, fun) do
+    def reduce(data = %KafkaEx.Stream{}, acc, fun) do
       next_fun = fn offset ->
         if data.fetch_request.auto_commit do
-          GenServer.call(data.worker_name, {
-            :offset_commit,
-            %OffsetCommitRequest{
-              consumer_group: data.consumer_group,
-              topic: data.fetch_request.topic,
-              partition: data.fetch_request.partition,
-              offset: offset, metadata: ""
-            }
-          })
+          commit_offset(data, offset)
         end
         response = fetch_response(data, offset)
-        if response.error_code == :no_error &&
-          response.last_offset != nil && response.last_offset != offset do
-          {response.message_set, response.last_offset}
+        if response.error_code == :no_error && response.last_offset do
+          {response.message_set, response.last_offset + 1}
         else
           {stream_control(data.no_wait_at_logend), offset}
         end
       end
       Stream.resource(
-        fn -> data.fetch_request.offset end, next_fun, &(&1)
+        fn ->
+          data.fetch_request.offset
+        end, next_fun, &(&1)
       ).(acc, fun)
     end
 
@@ -47,6 +40,18 @@ defmodule KafkaEx.Stream do
       |> FetchResponse.partition_messages(req.topic, req.partition)
     end
 
+    defp commit_offset(stream_data = %KafkaEx.Stream{}, offset) do
+      GenServer.call(stream_data.worker_name, {
+        :offset_commit,
+        %OffsetCommitRequest{
+          consumer_group: stream_data.consumer_group,
+          topic: stream_data.fetch_request.topic,
+          partition: stream_data.fetch_request.partition,
+          offset: offset, metadata: ""
+        }
+      })
+    end
+
     def count(_stream) do
       {:error, __MODULE__}
     end
diff --git a/test/integration/integration_test.exs b/test/integration/integration_test.exs
index 85afe097..e7a9ed03 100644
--- a/test/integration/integration_test.exs
+++ b/test/integration/integration_test.exs
@@ -329,24 +329,131 @@ defmodule KafkaEx.Integration.Test do
 
   # stream
   test "streams kafka logs" do
-    random_string = generate_random_string()
+    topic_name = generate_random_string()
     KafkaEx.create_worker(:stream, uris: uris())
+    KafkaEx.produce(%Proto.Produce.Request{
+      topic: topic_name, partition: 0, required_acks: 1, messages: [
+        %Proto.Produce.Message{value: "message 1"},
+        %Proto.Produce.Message{value: "message 2"},
+        %Proto.Produce.Message{value: "message 3"},
+        %Proto.Produce.Message{value: "message 4"},
+      ]
+    }, worker_name: :stream)
+
+    stream = KafkaEx.stream(
+      topic_name,
+      0,
+      worker_name: :stream,
+      offset: 0,
+      auto_commit: false
+    )
+
+    logs = Enum.take(stream, 2)
+    assert 2 == length(logs)
+    [m1, m2] = logs
+    assert m1.value == "message 1"
+    assert m2.value == "message 2"
+
+    # calling stream again will get the same messages
+    assert logs == Enum.take(stream, 2)
+  end
+
+  test "stream with small max_bytes makes multiple requests if necessary" do
+    topic_name = generate_random_string()
+    KafkaEx.create_worker(:stream, uris: uris())
+    KafkaEx.produce(%Proto.Produce.Request{
+      topic: topic_name, partition: 0, required_acks: 1, messages: [
+        %Proto.Produce.Message{value: "message 1"},
+        %Proto.Produce.Message{value: "message 2"},
+        %Proto.Produce.Message{value: "message 3"},
+        %Proto.Produce.Message{value: "message 4"},
+        %Proto.Produce.Message{value: "message 5"},
+      ]
+    }, worker_name: :stream)
+
+    stream = KafkaEx.stream(
+      topic_name,
+      0,
+      worker_name: :stream,
+      max_bytes: 50,
+      offset: 0,
+      auto_commit: false
+    )
+
+    logs = Enum.take(stream, 4)
+    assert 4 == length(logs)
+    [m1, m2, m3, m4] = logs
+    assert m1.value == "message 1"
+    assert m2.value == "message 2"
+    assert m3.value == "message 3"
+    assert m4.value == "message 4"
+
+    # calling stream again will get the same messages
+    assert logs == Enum.take(stream, 4)
+  end
+
+  test "stream blocks until new messages are available" do
+    topic_name = generate_random_string()
+    KafkaEx.create_worker(:stream, uris: uris())
-    KafkaEx.produce(%Proto.Produce.Request{topic: random_string, partition: 0, required_acks: 1, messages: [
-        %Proto.Produce.Message{value: "hey"},
-        %Proto.Produce.Message{value: "hi"},
+    KafkaEx.produce(%Proto.Produce.Request{
+      topic: topic_name, partition: 0, required_acks: 1, messages: [
+        %Proto.Produce.Message{value: "message 1"}
       ]
     }, worker_name: :stream)
 
-    stream = KafkaEx.stream(random_string, 0, worker_name: :stream, offset: 0, auto_commit: false)
-    log = TestHelper.wait_for_accum(
-      fn() -> Enum.take(stream, 2) end,
-      2
-    )
+    stream = KafkaEx.stream(
+      topic_name,
+      0,
+      worker_name: :stream,
+      max_bytes: 50,
+      offset: 0,
+      auto_commit: false
+    )
+task = Task.async(fn -> Enum.take(stream, 4) end)
+
+    KafkaEx.produce(%Proto.Produce.Request{
+      topic: topic_name, partition: 0, required_acks: 1, messages: [
+        %Proto.Produce.Message{value: "message 2"},
+        %Proto.Produce.Message{value: "message 3"},
+        %Proto.Produce.Message{value: "message 4"},
+        %Proto.Produce.Message{value: "message 5"},
+      ]
+    }, worker_name: :stream)
+
+    logs = Task.await(task)
+
+    assert 4 == length(logs)
+    [m1, m2, m3, m4] = logs
+    assert m1.value == "message 1"
+    assert m2.value == "message 2"
+    assert m3.value == "message 3"
+    assert m4.value == "message 4"
+  end
+
+  test "stream is non-blocking with no_wait_at_logend" do
+    topic_name = generate_random_string()
+    KafkaEx.create_worker(:stream, uris: uris())
+    KafkaEx.produce(%Proto.Produce.Request{
+      topic: topic_name, partition: 0, required_acks: 1, messages: [
+        %Proto.Produce.Message{value: "message 1"}
+      ]
+    }, worker_name: :stream)
+
+    stream = KafkaEx.stream(
+      topic_name,
+      0,
+      worker_name: :stream,
+      max_bytes: 50,
+      offset: 0,
+      auto_commit: false,
+      no_wait_at_logend: true
+    )
+
+    logs = Enum.take(stream, 4)
 
-    refute Enum.empty?(log)
-    [first,second|_] = log
-    assert first.value == "hey"
-    assert second.value == "hi"
+    assert 1 == length(logs)
+    [m1] = logs
+    assert m1.value == "message 1"
   end
 
   test "doesn't error when re-creating an existing stream" do

From d46c57b0dbfd9e1f40414a15ebd5dc2e77da3579 Mon Sep 17 00:00:00 2001
From: Dan Swain
Date: Sat, 15 Jul 2017 14:39:06 -0400
Subject: [PATCH 13/20] Clean up, fix spec

Fixes #214
---
 lib/kafka_ex.ex                       |  2 +-
 lib/kafka_ex/protocol/fetch.ex        |  2 +-
 lib/kafka_ex/stream.ex                | 85 +++++++++++++++++++++------
 test/integration/integration_test.exs |  4 ++--
 4 files changed, 71 insertions(+), 22 deletions(-)

diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex
index b9456bd3..3af05fe4 100644
--- a/lib/kafka_ex.ex
+++ b/lib/kafka_ex.ex
@@ -294,7 +294,7 @@ Optional arguments(KeywordList)
         %{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}]
   ```
   """
-  @spec stream(binary, integer, Keyword.t) :: GenEvent.Stream.t
+  @spec stream(binary, integer, Keyword.t) :: KafkaEx.Stream.t
   def stream(topic, partition, opts \\ []) do
     auto_commit = Keyword.get(opts, :auto_commit, true)
     consumer_group = Keyword.get(opts, :consumer_group)
diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex
index f3d8d8c4..77ea0214 100644
--- a/lib/kafka_ex/protocol/fetch.ex
+++ b/lib/kafka_ex/protocol/fetch.ex
@@ -26,7 +26,7 @@ defmodule KafkaEx.Protocol.Fetch do
     defstruct topic: nil, partitions: []
     @type t :: %Response{topic: binary, partitions: list}
 
-    @spec partition_messages(list, binary, integer) :: list
+    @spec partition_messages(list, binary, integer) :: map
     def partition_messages(responses, topic, partition) do
       response = Enum.find(responses, &(&1.topic == topic)) || %Response{}
       Enum.find(response.partitions, &(&1.partition == partition))
diff --git a/lib/kafka_ex/stream.ex b/lib/kafka_ex/stream.ex
index 8cac028d..401fafdc 100644
--- a/lib/kafka_ex/stream.ex
+++ b/lib/kafka_ex/stream.ex
@@ -10,28 +10,63 @@ defmodule KafkaEx.Stream do
             consumer_group: nil,
             no_wait_at_logend: false
 
+  @type t :: %__MODULE__{}
+
   defimpl Enumerable do
-    def reduce(data = %KafkaEx.Stream{}, acc, fun) do
+    def reduce(%KafkaEx.Stream{} = data, acc, fun) do
+      # this function returns a Stream.resource stream, so we need to define
+      # start_fun, next_fun, and after_fun callbacks
+
+      # the state payload for the stream is just the offset
+      start_fun = fn -> data.fetch_request.offset end
+
+      # each iteration we need to take care of fetching and (possibly)
+      # committing offsets
       next_fun = fn offset ->
-        if data.fetch_request.auto_commit do
-          commit_offset(data, offset)
-        end
-        response = fetch_response(data, offset)
-        if response.error_code == :no_error && response.last_offset do
-          {response.message_set, response.last_offset + 1}
-        else
-          {stream_control(data.no_wait_at_logend), offset}
-        end
+        data
+        |> maybe_commit_offset(offset)
+        |> fetch_response(offset)
+        |> stream_control(data, offset)
       end
-      Stream.resource(
-        fn ->
-          data.fetch_request.offset
-        end, next_fun, &(&1)
-      ).(acc, fun)
+
+      # there isn't really any cleanup, so we don't need to do anything with
+      # the after_fun callback
+      after_fun = fn(_last_offset) -> :ok end
+
+      Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
     end
 
-    defp stream_control(true), do: :halt
-    defp stream_control(false), do: []
+    ######################################################################
+    # Main stream flow control
+
+    # if we get a response, we return the message set and point at the next
+    # offset after the last message
+    defp stream_control(
+      %{
+        error_code: :no_error,
+        last_offset: last_offset,
+        message_set: message_set
+      },
+      _stream_data,
+      _offset
+    ) when is_integer(last_offset) do
+      {message_set, last_offset + 1}
+    end
+
+    # if we don't get any messages and no_wait_at_logend is true, we halt
+    defp stream_control(
+      %{},
+      %KafkaEx.Stream{no_wait_at_logend: true},
+      offset
+    ) do
+      {:halt, offset}
+    end
+
+    # for all other cases we block until messages are ready
+    defp stream_control(%{}, %KafkaEx.Stream{}, offset) do
+      {[], offset}
+    end
+    ######################################################################
 
     defp fetch_response(data, offset) do
       req = data.fetch_request
       data.worker_name
       |> GenServer.call({:fetch, %{req| offset: offset}})
       |> FetchResponse.partition_messages(req.topic, req.partition)
     end
 
-    defp commit_offset(stream_data = %KafkaEx.Stream{}, offset) do
+    defp maybe_commit_offset(
+      %KafkaEx.Stream{
+        fetch_request: %FetchRequest{auto_commit: true}
+      } = stream_data,
+      offset
+    ) do
+      commit_offset(stream_data, offset)
+      stream_data
+    end
+
+    defp maybe_commit_offset(%KafkaEx.Stream{} = stream_data, _offset) do
+      stream_data
+    end
+
+    defp commit_offset(%KafkaEx.Stream{} = stream_data, offset) do
       GenServer.call(stream_data.worker_name, {
         :offset_commit,
         %OffsetCommitRequest{
           consumer_group: stream_data.consumer_group,
           topic: stream_data.fetch_request.topic,
           partition: stream_data.fetch_request.partition,
           offset: offset, metadata: ""
         }
       })
     end
diff --git a/test/integration/integration_test.exs b/test/integration/integration_test.exs
index e7a9ed03..3196857c 100644
--- a/test/integration/integration_test.exs
+++ b/test/integration/integration_test.exs
@@ -409,7 +409,7 @@ defmodule KafkaEx.Integration.Test do
       offset: 0,
       auto_commit: false
     )
-task = Task.async(fn -> Enum.take(stream, 4) end)
+    task = Task.async(fn -> Enum.take(stream, 4) end)
 
     KafkaEx.produce(%Proto.Produce.Request{
@@ -429,7 +429,7 @@ defmodule KafkaEx.Integration.Test do
     assert m3.value == "message 3"
     assert m4.value == "message 4"
   end
-  
+
   test "stream is non-blocking with no_wait_at_logend" do

From 807b0f46458370ac81617e33c454db05d0541b92 Mon Sep 17 00:00:00 2001
From: Dan Swain
Date: Tue, 18 Jul 2017 12:08:53 -0400
Subject: [PATCH 14/20] Refactor commit handling, handle small batches

---
 lib/kafka_ex/stream.ex                   |  68 ++++++++++-----
 test/integration/consumer_group_test.exs | 100 ++++++++++++++++++++++-
 test/integration/integration_test.exs    |   4 +
 3 files changed, 152 insertions(+), 20 deletions(-)

diff --git a/lib/kafka_ex/stream.ex b/lib/kafka_ex/stream.ex
index 401fafdc..4c840503 100644
--- a/lib/kafka_ex/stream.ex
+++ b/lib/kafka_ex/stream.ex
@@ -17,6 +17,7 @@ defmodule KafkaEx.Stream do
       # this function returns a Stream.resource stream, so we need to define
       # start_fun, next_fun, and after_fun callbacks
 
+
       # the state payload for the stream is just the offset
       start_fun = fn -> data.fetch_request.offset end
 
@@ -24,8 +25,8 @@ defmodule KafkaEx.Stream do
       # committing offsets
       next_fun = fn offset ->
         data
-        |> maybe_commit_offset(offset)
         |> fetch_response(offset)
+        |> maybe_commit_offset(data, acc)
         |> stream_control(data, offset)
       end
 
@@ -36,6 +37,14 @@ defmodule KafkaEx.Stream do
       Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
     end
 
+    def count(_stream) do
+      {:error, __MODULE__}
+    end
+
+    def member?(_stream, _item) do
+      {:error, __MODULE__}
+    end
+
     ######################################################################
     # Main stream flow control
 
@@ -68,25 +77,45 @@ defmodule KafkaEx.Stream do
     end
     ######################################################################
 
-    defp fetch_response(data, offset) do
-      req = data.fetch_request
-      data.worker_name
-      |> GenServer.call({:fetch, %{req| offset: offset}})
-      |> FetchResponse.partition_messages(req.topic, req.partition)
-    end
+    ######################################################################
+    # Offset management
 
+    # first, determine if we even need to commit an offset
     defp maybe_commit_offset(
+      fetch_response,
       %KafkaEx.Stream{
-        fetch_request: %FetchRequest{auto_commit: true}
+        fetch_request: %FetchRequest{auto_commit: auto_commit}
       } = stream_data,
-      offset
+      acc
     ) do
-      commit_offset(stream_data, offset)
-      stream_data
+      if need_commit?(fetch_response, auto_commit) do
+        offset_to_commit = last_offset(acc, fetch_response.message_set)
+        commit_offset(stream_data, offset_to_commit)
+      end
+
+      fetch_response
+    end
+
+    # no response -> no commit
+    defp need_commit?(fetch_response, _auto_commit)
+      when fetch_response == %{}, do: false
+    # no messages in response -> no commit
+    defp need_commit?(%{message_set: []}, _auto_commit), do: false
+    # otherwise, use the auto_commit setting
+    defp need_commit?(_fetch_response, auto_commit), do: auto_commit
+
+    # if we have requested fewer messages than we fetched, commit the offset
+    # of the last one we will actually consume
+    defp last_offset({:cont, {_, n}}, message_set)
+      when n <= length(message_set) do
+      message = Enum.at(message_set, n - 1)
+      message.offset
     end
 
-    defp maybe_commit_offset(%KafkaEx.Stream{} = stream_data, _offset) do
-      stream_data
+    # otherwise, commit the offset of the last message
+    defp last_offset({:cont, _}, message_set) do
+      message = List.last(message_set)
+      message.offset
     end
 
     defp commit_offset(%KafkaEx.Stream{} = stream_data, offset) do
@@ -100,13 +129,14 @@ defmodule KafkaEx.Stream do
         }
       })
     end
+    ######################################################################
 
-    def count(_stream) do
-      {:error, __MODULE__}
-    end
-
-    def member?(_stream, _item) do
-      {:error, __MODULE__}
-    end
+    # make the actual fetch request
+    defp fetch_response(data, offset) do
+      req = data.fetch_request
+      data.worker_name
+      |> GenServer.call({:fetch, %{req| offset: offset}})
+      |> FetchResponse.partition_messages(req.topic, req.partition)
+    end
   end
 end
diff --git a/test/integration/consumer_group_test.exs b/test/integration/consumer_group_test.exs
index 4240ae77..66560ea7 100644
--- a/test/integration/consumer_group_test.exs
+++ b/test/integration/consumer_group_test.exs
@@ -216,7 +216,7 @@ defmodule KafkaEx.ConsumerGroup.Test do
 
     refute Enum.empty?(log)
 
-    offset_fetch_response = KafkaEx.offset_fetch(:stream_auto_commit, %Proto.OffsetFetch.Request{topic: random_string, partition: 0}) |> hd
+    [offset_fetch_response | _] = KafkaEx.offset_fetch(:stream_auto_commit, %Proto.OffsetFetch.Request{topic: random_string, partition: 0})
 
     error_code = offset_fetch_response.partitions |> hd |> Map.get(:error_code)
     offset = offset_fetch_response.partitions |> hd |> Map.get(:offset)
@@ -265,4 +265,102 @@ defmodule KafkaEx.ConsumerGroup.Test do
 
     assert 0 >= offset_fetch_response_offset
   end
+
+  test "stream auto_commit deals with small batches correctly" do
+    topic_name = generate_random_string()
+    consumer_group = "stream_test"
+
+    KafkaEx.create_worker(:stream, uris: uris())
+    KafkaEx.produce(%Proto.Produce.Request{
+      topic: topic_name, partition: 0, required_acks: 1, messages: [
+        %Proto.Produce.Message{value: "message 2"},
+        %Proto.Produce.Message{value: "message 3"},
+        %Proto.Produce.Message{value: "message 4"},
+        %Proto.Produce.Message{value: "message 5"},
+      ]
+    }, worker_name: :stream)
+
+    stream = KafkaEx.stream(
+      topic_name,
+      0,
+      worker_name: :stream,
+      offset: 0,
+      auto_commit: true,
+      consumer_group: "stream_test"
+    )
+
+    [m1, m2] = Enum.take(stream, 2)
+    assert "message 2" == m1.value
+    assert "message 3" == m2.value
+
+    offset = TestHelper.latest_consumer_offset_number(
+      topic_name,
+      0,
+      consumer_group,
+      :stream
+    )
+    assert offset == m2.offset
+  end
+
+  test "stream auto_commit doesn't exceed the end of the log" do
+    topic_name = generate_random_string()
+    consumer_group = "stream_test"
+
+    KafkaEx.create_worker(:stream, uris: uris())
+    KafkaEx.produce(%Proto.Produce.Request{
+      topic: topic_name, partition: 0, required_acks: 1, messages: [
+        %Proto.Produce.Message{value: "message 2"},
+        %Proto.Produce.Message{value: "message 3"},
+        %Proto.Produce.Message{value: "message 4"},
+        %Proto.Produce.Message{value: "message 5"},
+      ]
+    }, worker_name: :stream)
+
+    stream = KafkaEx.stream(
+      topic_name,
+      0,
+      worker_name: :stream,
+      offset: 0,
+      no_wait_at_logend: true,
+      auto_commit: true,
+      consumer_group: consumer_group
+    )
+
+    [m1, m2, m3, m4] = Enum.take(stream, 10)
+    assert "message 2" == m1.value
+    assert "message 3" == m2.value
+    assert "message 4" == m3.value
+    assert "message 5" == m4.value
+
+    offset = TestHelper.latest_consumer_offset_number(
+      topic_name,
+      0,
+      consumer_group,
+      :stream
+    )
+    assert offset == m4.offset
+
+    other_consumer_group = "another_consumer_group"
+    map_stream = KafkaEx.stream(
+      topic_name,
+      0,
+      worker_name: :stream,
+      no_wait_at_logend: true,
+      auto_commit: true,
+      offset: 0,
+      consumer_group: other_consumer_group
+    )
+
+    assert ["message 2", "message 3", "message 4", "message 5"] =
+      Enum.map(map_stream, fn(m) -> m.value end)
+
+    offset = TestHelper.latest_consumer_offset_number(
+      topic_name,
+      0,
+      other_consumer_group,
+      :stream
+    )
+    # should have the same offset as the first stream
+    assert offset == m4.offset
+  end
 end
diff --git a/test/integration/integration_test.exs b/test/integration/integration_test.exs
index 3196857c..ee1ecfa0 100644
--- a/test/integration/integration_test.exs
+++ b/test/integration/integration_test.exs
@@ -454,6 +454,10 @@ defmodule KafkaEx.Integration.Test do
     assert 1 == length(logs)
     [m1] = logs
     assert m1.value == "message 1"
+
+    # we can also execute something like 'map' which would otherwise be
+    # open-ended
+    assert ["message 1"] == Enum.map(stream, fn(m) -> m.value end)
   end
 
   test "doesn't error when re-creating an existing stream" do

From 362671dd06a41cd4130fdba1a229ff8862dee0b8 Mon Sep 17 00:00:00 2001
From: Dan Swain
Date: Tue, 18 Jul 2017 13:09:44 -0400
Subject: [PATCH 
15/20] Remove extra blank line --- lib/kafka_ex/stream.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/kafka_ex/stream.ex b/lib/kafka_ex/stream.ex index 4c840503..e4effd78 100644 --- a/lib/kafka_ex/stream.ex +++ b/lib/kafka_ex/stream.ex @@ -17,7 +17,6 @@ defmodule KafkaEx.Stream do # this function returns a Stream.resource stream, so we need to define # start_fun, next_fun, and after_fun callbacks - # the state payload for the stream is just the offset start_fun = fn -> data.fetch_request.offset end From 90fb1961a7237aa886a12e8eb2812fe58aebac00 Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Tue, 18 Jul 2017 15:18:52 -0400 Subject: [PATCH 16/20] Update stream documentation --- README.md | 14 ++---- lib/kafka_ex.ex | 121 +++++++++++++++++++++++++++++++++++++----------- 2 files changed, 98 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index 181a5162..62f90f3d 100644 --- a/README.md +++ b/README.md @@ -219,20 +219,12 @@ iex> KafkaEx.produce("foo", 0, "hey") # where "foo" is the topic and "hey" is th ### Stream kafka logs -**NOTE** You must pass `auto_commit: false` in the options for `stream/3` when using Kafka < 0.8.2 or when using `:no_consumer_group`. - -**NOTE** The stream blocks by default at the end of the kafka log. To override -this and have the stream return immediately at the end of the log, pass -`no_wait_at_logend: true` in the options. This mirrors the command line -arguments of -[SimpleConsumerShell](https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-SimpleConsumerShell). +See the `KafkaEx.stream/3` documentation for details on streaming. ```elixir -iex> KafkaEx.create_worker(:stream, [uris: [{"localhost", 9092}]]) -{:ok, #PID<0.196.0>} -iex> KafkaEx.produce("foo", 0, "hey", worker_name: :stream) +iex> KafkaEx.produce("foo", 0, "hey") :ok -iex> KafkaEx.produce("foo", 0, "hi", worker_name: :stream) +iex> KafkaEx.produce("foo", 0, "hi") :ok iex> KafkaEx.stream("foo", 0, offset: 0) |> Enum.take(2) [%{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"}, diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 3af05fe4..1c6356a1 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -256,43 +256,112 @@ Optional arguments(KeywordList) @doc """ Returns a streamable struct that may be used for consuming messages. + The returned struct is compatible with the `Stream` and `Enum` modules. + + ```elixir + iex> KafkaEx.produce("foo", 0, "hey") + :ok + iex> KafkaEx.produce("foo", 0, "hi") + :ok + iex> stream = KafkaEx.stream("foo", 0) + %KafkaEx.Stream{...} + iex> Enum.take(stream, 2) + [%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 1784030606, key: "", + offset: 0, value: "hey"}, + %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 3776653906, key: "", + offset: 1, value: "hi"}] + iex> stream |> Stream.map(fn(msg) -> IO.puts(msg.value) end) |> Stream.run + "hey" + "hi" + # NOTE this will block! See below. + ``` + + ## Reusing streams + + Reusing the same `KafkaEx.Stream` struct results in consuming the same + messages multiple times. This is by design and mirrors the functionality of + `File.stream!`. If you want to reuse the same stream struct, update its + `:offset` before reuse. 
+ + ``` + iex> stream = KafkaEx.stream("foo", 0) + iex> [m1, m2] = Enum.take(stream, 2) + iex> [m1, m2] = Enum.take(stream, 2) # these will be the same messages + iex> stream = %{stream | fetch_request: %{stream.fetch_request | offset: m2.offset + 1}} + iex> [m3, m4] = Enum.take(stream, 2) # new messages + ``` + + ## Streams block at log end + By default, the stream consumes indefinitely and will block at log end until new messages are available. Use the `no_wait_at_logend: true` option to have - the stream halt when no more messages are available. + the stream halt when no more messages are available. This mirrors the + command line arguments of + [SimpleConsumerShell](https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-SimpleConsumerShell). - Optional arguments (keyword list): + Note that this means that fetches will return up to as many messages + as are immediately available in the partition, regardless of arguments. - - no_wait_at_logend: Set this to true to halt the stream when there are no - more messages available. Defaults to false, i.e., the stream blocks to wait - for new messages. + ``` + iex> Enum.map(1..3, fn(ix) -> KafkaEx.produce("bar", 0, "Msg #{ix}") end) + iex> stream = KafkaEx.stream("bar", 0, no_wait_at_logend: true, offset: 0) + iex> Enum.map(stream, fn(m) -> m.value end) # does not block + ["Msg 1", "Msg 2", "Msg 3"] + iex> stream |> Stream.map(fn(m) -> m.value end) |> Enum.take(10) + # only 3 messages are available + ["Msg 1", "Msg 2", "Msg 3"] + ``` - - worker_name: the worker we want to run this metadata request through, when - none is provided the default worker `:kafka_ex` is used + ## Consumer group and auto commit - - offset: When supplied the fetch would start from this offset, otherwise - would start from the last committed offset of the consumer_group the worker - lbelongs to. For Kafka < 0.8.2 you should explicitly specify this. + If you pass a value for the `consumer_group` option and true for + `auto_commit`, the offset of the last message consumed will be committed to + the broker during each cycle. + + For example, suppose we start at the beginning of a partition with millions + of messages and the `max_bytes` setting is such that each `fetch` request + gets 25 messages. In this setting, we will (roughly) be committing offsets + 25, 50, 75, etc. - - auto_commit: specifies if the last offset should be commited or not. - Default is true. You must set this to false when using - Kafka < 0.8.2 or `:no_consumer_group`. + Note that offsets are committed immediately after messages are retrieved, + and therefore before you know if you have succesfully consumed them. It is + therefore possible that you could miss messages if your consumer crashes in + the middle of consuming a batch, therefore losing the guarantee of + at-least-once delivery. If you need this guarantee, we recommend that you + construct a GenServer-based consumer module and manage your commits manually. - - consumer_group: Name of the group of consumers, `:no_consumer_group` - should be passed for Kafka < 0.8.2, defaults to - `Application.get_env(:kafka_ex, :consumer_group)`. 
+ ``` + Enum.map(1..10, fn(ix) -> KafkaEx.produce("baz", 0, "Msg #{ix}") end) + stream = KafkaEx.stream("baz", 0, consumer_group: "my_consumer", auto_commit: true) + [m1, m2] = Enum.take(stream, 2) + [m1, m2] = Enum.take(stream, 2) # note we still get the same 2 messages + # TODO fix initial offset on stream re-create + ``` + ## Options - ## Example + `KafkaEx.stream/3` accepts a keyword list of options for the third argument. - ```elixir - iex> KafkaEx.create_worker(:stream, [{"localhost", 9092}]) - {:ok, #PID<0.196.0>} - iex> KafkaEx.produce("foo", 0, "hey", worker_name: :stream) - iex> KafkaEx.produce("foo", 0, "hi", worker_name: :stream) - iex> KafkaEx.stream("foo", 0) |> Enum.take(2) - [%{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"}, - %{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}] - ``` + - `no_wait_at_logend` (boolean): Set this to true to halt the stream when + there are no more messages available. Defaults to false, i.e., the stream + blocks to wait for new messages. + + - `worker_name` (term): The KafkaEx worker to use for communication with the + brokers. Defaults to `:kafka_ex` (the default worker). + + - `consumer_group` (string): Name of the consumer group used for the initial + offset fetch and automatic offset commit (if `auto_commit` is true). Omit + this value or use `:no_consumer_group` to not use a consumer group (default). + Consumer groups are not compatible with Kafka < 0.8.2. + + - `offset` (integer): The offset from which to start fetching. By default, + this is the last available offset of the partition when no consumer group is + specified. When a consumer group is specified, the next message after the + last committed offset is used. For Kafka < 0.8.2 you must explicitly specify + an offset. + + - `auto_commit` (boolean): If true, the stream automatically commits offsets + of fetched messages. See discussion above. """ @spec stream(binary, integer, Keyword.t) :: KafkaEx.Stream.t def stream(topic, partition, opts \\ []) do From a1a376e0981509d6cf903bc0f072807be094f60e Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Tue, 18 Jul 2017 15:52:42 -0400 Subject: [PATCH 17/20] Fix stream consumer group start offset --- lib/kafka_ex.ex | 38 +++++++++----- test/integration/consumer_group_test.exs | 65 +++++++++++++++++------- 2 files changed, 73 insertions(+), 30 deletions(-) diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 1c6356a1..6a20a848 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -253,7 +253,7 @@ Optional arguments(KeywordList) produce(produce_request, opts) end - @doc """ + @doc ~S""" Returns a streamable struct that may be used for consuming messages. The returned struct is compatible with the `Stream` and `Enum` modules. @@ -331,11 +331,15 @@ Optional arguments(KeywordList) construct a GenServer-based consumer module and manage your commits manually. 
``` - Enum.map(1..10, fn(ix) -> KafkaEx.produce("baz", 0, "Msg #{ix}") end) - stream = KafkaEx.stream("baz", 0, consumer_group: "my_consumer", auto_commit: true) - [m1, m2] = Enum.take(stream, 2) - [m1, m2] = Enum.take(stream, 2) # note we still get the same 2 messages - # TODO fix initial offset on stream re-create + iex> Enum.map(1..10, fn(ix) -> KafkaEx.produce("baz", 0, "Msg #{ix}") end) + iex> stream = KafkaEx.stream("baz", 0, consumer_group: "my_consumer", auto_commit: true) + iex> stream |> Enum.take(2) |> Enum.map(fn(msg) -> msg.value end) + ["Msg 1", "Msg 2"] + iex> stream |> Enum.take(2) |> Enum.map(fn(msg) -> msg.value end) + ["Msg 1", "Msg 2"] # same values + iex> stream2 = KafkaEx.stream("baz", 0, consumer_group: "my_consumer", auto_commit: true) + iex> stream2 |> Enum.take(1) |> Enum.map(fn(msg) -> msg.value end) + ["Msg 3"] # stream2 got the next available offset ``` ## Options @@ -374,12 +378,22 @@ Optional arguments(KeywordList) no_wait_at_logend = Keyword.get(opts, :no_wait_at_logend, false) wait_time = Keyword.get(opts, :wait_time, @wait_time) - retrieved_offset = current_offset( - supplied_offset, - partition, - topic, - worker_name - ) + retrieved_offset = + if consumer_group && !supplied_offset do + request = %OffsetFetchRequest{ + topic: topic, + partition: partition, + consumer_group: consumer_group + } + + fetched_offset = worker_name + |> KafkaEx.offset_fetch(request) + |> KafkaEx.Protocol.OffsetFetch.Response.last_offset + + fetched_offset + 1 + else + current_offset(supplied_offset, partition, topic, worker_name) + end fetch_request = %FetchRequest{ auto_commit: auto_commit, diff --git a/test/integration/consumer_group_test.exs b/test/integration/consumer_group_test.exs index 66560ea7..d0c0580a 100644 --- a/test/integration/consumer_group_test.exs +++ b/test/integration/consumer_group_test.exs @@ -248,24 +248,6 @@ defmodule KafkaEx.ConsumerGroup.Test do assert first_message.offset == 4 end - test "stream does not commit offset with auto_commit is set to false" do - random_string = generate_random_string() - KafkaEx.create_worker(:stream_no_auto_commit, uris: uris()) - Enum.each(1..10, fn _ -> KafkaEx.produce(%Proto.Produce.Request{topic: random_string, partition: 0, required_acks: 1, messages: [%Proto.Produce.Message{value: "hey"}]}) end) - stream = KafkaEx.stream(random_string, 0, worker_name: :stream_no_auto_commit, auto_commit: false, offset: 0) - - # make sure we consume at least one message before we assert that there is no offset committed - _log = TestHelper.wait_for_any( - fn() -> Enum.take(stream, 2) end - ) - - offset_fetch_response = KafkaEx.offset_fetch(:stream_no_auto_commit, %Proto.OffsetFetch.Request{topic: random_string, partition: 0}) |> hd - offset_fetch_response.partitions |> hd |> Map.get(:error_code) - offset_fetch_response_offset = offset_fetch_response.partitions |> hd |> Map.get(:offset) - - assert 0 >= offset_fetch_response_offset - end - test "stream auto_commit deals with small batches correctly" do topic_name = generate_random_string() consumer_group = "stream_test" @@ -363,4 +345,51 @@ defmodule KafkaEx.ConsumerGroup.Test do # should have the same offset as the first stream assert offset == m4.offset end + + test "streams with a consumer group begin at the last committed offset" do + topic_name = generate_random_string() + consumer_group = "stream_test" + + KafkaEx.create_worker(:stream, uris: uris()) + messages_in = Enum.map( + 1 .. 
10,
+      fn(ix) -> %Proto.Produce.Message{value: "Msg #{ix}"} end
+    )
+    KafkaEx.produce(
+      %Proto.Produce.Request{
+        topic: topic_name,
+        partition: 0,
+        required_acks: 1,
+        messages: messages_in
+      },
+      worker_name: :stream
+    )
+
+    stream1 = KafkaEx.stream(
+      topic_name,
+      0,
+      worker_name: :stream,
+      no_wait_at_logend: true,
+      auto_commit: true,
+      offset: 0,
+      consumer_group: consumer_group
+    )
+
+    assert ["Msg 1", "Msg 2"] == stream1
+    |> Enum.take(2)
+    |> Enum.map(fn(msg) -> msg.value end)
+
+    stream2 = KafkaEx.stream(
+      topic_name,
+      0,
+      worker_name: :stream,
+      no_wait_at_logend: true,
+      auto_commit: true,
+      consumer_group: consumer_group
+    )
+
+    assert ["Msg 3", "Msg 4"] == stream2
+    |> Enum.take(2)
+    |> Enum.map(fn(msg) -> msg.value end)
+  end
 end

From 441fd93dacd2a8ebb830868dca835c9833f1abbf Mon Sep 17 00:00:00 2001
From: Dan Swain
Date: Tue, 18 Jul 2017 15:59:47 -0400
Subject: [PATCH 18/20] Copy edit documentation updates

---
 README.md       |  2 +-
 lib/kafka_ex.ex | 11 ++++++-----
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/README.md b/README.md
index 62f90f3d..e86c6b62 100644
--- a/README.md
+++ b/README.md
@@ -231,7 +231,7 @@ iex> KafkaEx.stream("foo", 0, offset: 0) |> Enum.take(2)
   %{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}]
 ```
 
-As mentioned, for Kafka < 0.8.2 the `stream/3` requires `autocommit: false`
+For Kafka < 0.8.2, `stream/3` requires `auto_commit: false`
 
 ```elixir
 iex> KafkaEx.stream("foo", 0, offset: 0, auto_commit: false) |> Enum.take(2)
diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex
index 6a20a848..dd9e3d24 100644
--- a/lib/kafka_ex.ex
+++ b/lib/kafka_ex.ex
@@ -256,7 +256,8 @@ Optional arguments(KeywordList)
   @doc ~S"""
   Returns a streamable struct that may be used for consuming messages.
 
-  The returned struct is compatible with the `Stream` and `Enum` modules.
+  The returned struct is compatible with the `Stream` and `Enum` modules. Some
+  important usage notes follow; see below for a detailed list of options.
 
   ```elixir
   iex> KafkaEx.produce("foo", 0, "hey")
@@ -280,7 +281,7 @@ Optional arguments(KeywordList)
 
   Reusing the same `KafkaEx.Stream` struct results in consuming the same
   messages multiple times. This is by design and mirrors the functionality of
-  `File.stream!`. If you want to reuse the same stream struct, update its
+  `File.stream!/3`. If you want to reuse the same stream struct, update its
   `:offset` before reuse.
 
   ```
@@ -323,10 +324,10 @@ Optional arguments(KeywordList)
   gets 25 messages. In this setting, we will (roughly) be committing offsets
   25, 50, 75, etc.
 
-  Note that offsets are committed immediately after messages are retrieved,
-  and therefore before you know if you have succesfully consumed them. It is
+  Note that offsets are committed immediately after messages are retrieved
+  and before you know if you have successfully consumed them. It is
   therefore possible that you could miss messages if your consumer crashes in
-  the middle of consuming a batch, therefore losing the guarantee of
+  the middle of consuming a batch, effectively losing the guarantee of
   at-least-once delivery. If you need this guarantee, we recommend that you
   construct a GenServer-based consumer module and manage your commits manually.
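The recommendation above (a GenServer-based consumer with manual commits) is left abstract in the docs. Below is a minimal sketch of one way to build it, not part of this patch series: it reuses the `KafkaEx.fetch/3` and `KafkaEx.offset_commit/2` calls that appear elsewhere in this changeset, while the module name, the 500ms polling interval, and `process_message/1` are hypothetical placeholders.

```elixir
defmodule ManualCommitConsumer do
  # Sketch: fetch with auto_commit: false and commit offsets only after
  # processing, preserving at-least-once delivery across crashes.
  use GenServer

  alias KafkaEx.Protocol.OffsetCommit.Request, as: OffsetCommitRequest

  def start_link(topic, partition, consumer_group, offset) do
    GenServer.start_link(__MODULE__, {topic, partition, consumer_group, offset})
  end

  def init(state) do
    send(self(), :consume)
    {:ok, state}
  end

  def handle_info(:consume, {topic, partition, consumer_group, offset}) do
    # nothing is committed at fetch time
    [response | _] =
      KafkaEx.fetch(topic, partition, offset: offset, auto_commit: false)
    %{message_set: messages, last_offset: last_offset} = hd(response.partitions)

    Enum.each(messages, &process_message/1)

    # commit only after the whole batch has been processed
    next_offset =
      if last_offset && messages != [] do
        KafkaEx.offset_commit(:kafka_ex, %OffsetCommitRequest{
          consumer_group: consumer_group,
          topic: topic,
          partition: partition,
          offset: last_offset,
          metadata: ""
        })
        last_offset + 1
      else
        offset
      end

    # poll again; a production consumer would want smarter backoff at log end
    Process.send_after(self(), :consume, 500)
    {:noreply, {topic, partition, consumer_group, next_offset}}
  end

  # hypothetical processing step
  defp process_message(message), do: IO.inspect(message.value)
end
```

Because the commit happens only after `Enum.each/2` returns, a crash mid-batch means the batch is re-fetched from the last committed offset on restart: duplicates are possible, but messages are not lost.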
From 8a478a7978b4fd59e8f15e5989b972bb1f062410 Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Thu, 20 Jul 2017 09:38:48 -0400 Subject: [PATCH 19/20] Review feedback --- test/integration/consumer_group_test.exs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/integration/consumer_group_test.exs b/test/integration/consumer_group_test.exs index d0c0580a..a0e267b4 100644 --- a/test/integration/consumer_group_test.exs +++ b/test/integration/consumer_group_test.exs @@ -388,8 +388,10 @@ defmodule KafkaEx.ConsumerGroup.Test do consumer_group: consumer_group ) - assert ["Msg 3", "Msg 4"] == stream2 + message_values = stream2 |> Enum.take(2) |> Enum.map(fn(msg) -> msg.value end) + + assert ["Msg 3", "Msg 4"] == message_values end end From 50fddd43020116caa7ab7206e612213080c6cde7 Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Tue, 25 Jul 2017 08:35:34 -0400 Subject: [PATCH 20/20] Review feedback --- test/integration/consumer_group_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/consumer_group_test.exs b/test/integration/consumer_group_test.exs index a0e267b4..359587f4 100644 --- a/test/integration/consumer_group_test.exs +++ b/test/integration/consumer_group_test.exs @@ -268,7 +268,7 @@ defmodule KafkaEx.ConsumerGroup.Test do worker_name: :stream, offset: 0, auto_commit: true, - consumer_group: "stream_test" + consumer_group: consumer_group ) [m1, m2] = Enum.take(stream, 2)
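Taken as a whole, the series replaces GenEvent-based streaming with an `Enumerable` built on `Stream.resource/3`, whose only accumulator is the next offset to fetch. A distilled, self-contained sketch of that pattern follows; `fetch_page` here fakes a partition and is not KafkaEx API.

```elixir
# start_fun / next_fun / after_fun with the offset as the only state,
# mirroring the shape of KafkaEx.Stream's Enumerable.reduce/3
paged_stream = fn fetch_page ->
  Stream.resource(
    fn -> 0 end,                                 # start_fun: initial offset
    fn offset ->
      case fetch_page.(offset) do
        # nothing left: halt, like no_wait_at_logend: true
        [] -> {:halt, offset}
        # emit the batch and advance past the last message
        batch -> {batch, offset + length(batch)}
      end
    end,
    fn _offset -> :ok end                        # after_fun: no cleanup needed
  )
end

# a fake 15-message "partition" served in pages of up to 3 messages
fetch_page = fn
  offset when offset < 15 -> Enum.to_list(offset..min(offset + 2, 14))
  _offset -> []
end

paged_stream.(fetch_page) |> Enum.take(7)
# => [0, 1, 2, 3, 4, 5, 6]
```

The `{:halt, offset}` branch plays the role of `no_wait_at_logend: true`; KafkaEx's blocking behavior instead returns `{[], offset}`, so the resource is polled again at the same offset until messages arrive.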