Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
850084e
Stream POC.
bjhaid Jan 24, 2017
14522ff
Replace the current GenEvent based stream implementation with a really
bjhaid Feb 18, 2017
48cdc70
Update documentation.
bjhaid Feb 18, 2017
c6d2fce
Replace short form of function call with the explicit form.
bjhaid Feb 18, 2017
f04d984
Update the documentation on the stream function to reflect it's halting
bjhaid Feb 18, 2017
4c1334e
Delete the dead stream code.
bjhaid Feb 18, 2017
2644a77
Refactor Stream.reduce and create an helper
bjhaid May 5, 2017
2b1561c
Introduce stream_mode support, exposing control of finite and infinite
bjhaid May 5, 2017
854f472
Use a function call when a pipeline is only one function long
bjhaid May 5, 2017
225058a
Merge branch 'master' into stream
dantswain Jul 12, 2017
4405f5c
Formatting and cleanup
dantswain Jul 12, 2017
ebca220
Rename stream_mode option to mirror command line.
dantswain Jul 12, 2017
1ad8633
Fix stream double-consumption, test no_wait_at_logend
dantswain Jul 15, 2017
d46c57b
Clean up, fix spec
dantswain Jul 15, 2017
807b0f4
Refactor commit handling, handle smaller batch
dantswain Jul 18, 2017
362671d
Remove extra blank line
dantswain Jul 18, 2017
90fb196
Update stream documentation
dantswain Jul 18, 2017
a1a376e
Fix stream consumer group start offset
dantswain Jul 18, 2017
441fd93
Copy edit documentation updates
dantswain Jul 18, 2017
8a478a7
Review feedback
dantswain Jul 20, 2017
6660af0
Merge branch 'master' into stream
joshuawscott Jul 21, 2017
50fddd4
Review feedback
dantswain Jul 25, 2017
98338f7
Merge branch 'master' into stream
dantswain Jul 25, 2017
13bba68
Merge branch 'stream' of github.com:kafkaex/kafka_ex into stream
dantswain Jul 25, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,21 +219,19 @@ iex> KafkaEx.produce("foo", 0, "hey") # where "foo" is the topic and "hey" is th

### Stream kafka logs

**NOTE** You must pass `auto_commit: false` in the options for `stream/3` when using Kafka < 0.8.2 or when using `:no_consumer_group`.
See the `KafkaEx.stream/3` documentation for details on streaming.

```elixir
iex> KafkaEx.create_worker(:stream, [uris: [{"localhost", 9092}]])
{:ok, #PID<0.196.0>}
iex> KafkaEx.produce("foo", 0, "hey", worker_name: :stream)
iex> KafkaEx.produce("foo", 0, "hey")
:ok
iex> KafkaEx.produce("foo", 0, "hi", worker_name: :stream)
iex> KafkaEx.produce("foo", 0, "hi")
:ok
iex> KafkaEx.stream("foo", 0, offset: 0) |> Enum.take(2)
[%{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"},
%{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}]
```

As mentioned, for Kafka < 0.8.2 the `stream/3` requires `autocommit: false`
For Kafka < 0.8.2, `stream/3` requires `auto_commit: false`:

```elixir
iex> KafkaEx.stream("foo", 0, offset: 0, auto_commit: false) |> Enum.take(2)
Expand Down
191 changes: 145 additions & 46 deletions lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule KafkaEx do
alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest
alias KafkaEx.Protocol.Produce.Message
alias KafkaEx.Server
alias KafkaEx.Stream

@type uri() :: [{binary|char_list, number}]
@type worker_init :: [worker_setting]
Expand Down Expand Up @@ -255,62 +256,160 @@ defmodule KafkaEx do
produce(produce_request, opts)
end

@doc """
Returns a stream that consumes fetched messages.
This puts the specified worker in streaming mode and blocks the worker indefinitely.
The handler is a normal GenEvent handler so you can supply a custom handler, otherwise a default handler is used.
@doc ~S"""
Returns a streamable struct that may be used for consuming messages.

This function should be used with care as the queue is unbounded and can cause OOM.
The returned struct is compatible with the `Stream` and `Enum` modules. Some
important usage notes follow; see below for a detailed list of options.

Optional arguments(KeywordList)
- worker_name: the worker we want to run this metadata request through, when none is provided the default worker `:kafka_ex` is used
- offset: When supplied the fetch would start from this offset, otherwise would start from the last committed offset of the consumer_group the worker belongs to. For Kafka < 0.8.2 you should explicitly specify this.
- handler: the handler we want to handle the streaming events, when none is provided the default KafkaEx.Handler is used
- handler_init: initial state for the handler - leave the default value [] when using the default handler
- auto_commit: specifies if the last offset should be commited or not. Default is true. You must set this to false when using Kafka < 0.8.2 or `:no_consumer_group`.
```elixir
iex> KafkaEx.produce("foo", 0, "hey")
:ok
iex> KafkaEx.produce("foo", 0, "hi")
:ok
iex> stream = KafkaEx.stream("foo", 0)
%KafkaEx.Stream{...}
iex> Enum.take(stream, 2)
[%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 1784030606, key: "",
offset: 0, value: "hey"},
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 3776653906, key: "",
offset: 1, value: "hi"}]
iex> stream |> Stream.map(fn(msg) -> IO.puts(msg.value) end) |> Stream.run
"hey"
"hi"
# NOTE this will block! See below.
```

## Example
## Reusing streams

```elixir
iex> KafkaEx.create_worker(:stream, [{"localhost", 9092}])
{:ok, #PID<0.196.0>}
iex> KafkaEx.produce("foo", 0, "hey", worker_name: :stream)
iex> KafkaEx.produce("foo", 0, "hi", worker_name: :stream)
iex> KafkaEx.stream("foo", 0) |> Enum.take(2)
[%{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"},
%{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}]
Reusing the same `KafkaEx.Stream` struct results in consuming the same
messages multiple times. This is by design and mirrors the functionality of
`File.stream!/3`. If you want to reuse the same stream struct, update its
`:offset` before reuse.

```
iex> stream = KafkaEx.stream("foo", 0)
iex> [m1, m2] = Enum.take(stream, 2)
iex> [m1, m2] = Enum.take(stream, 2) # these will be the same messages
iex> stream = %{stream | fetch_request: %{stream.fetch_request | offset: m2.offset + 1}}
iex> [m3, m4] = Enum.take(stream, 2) # new messages
```

## Streams block at log end

By default, the stream consumes indefinitely and will block at log end until
new messages are available. Use the `no_wait_at_logend: true` option to have
the stream halt when no more messages are available. This mirrors the
command line arguments of
[SimpleConsumerShell](https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-SimpleConsumerShell).

Note that this means that fetches will return up to as many messages
as are immediately available in the partition, regardless of arguments.

```
iex> Enum.map(1..3, fn(ix) -> KafkaEx.produce("bar", 0, "Msg #{ix}") end)
iex> stream = KafkaEx.stream("bar", 0, no_wait_at_logend: true, offset: 0)
iex> Enum.map(stream, fn(m) -> m.value end) # does not block
["Msg 1", "Msg 2", "Msg 3"]
iex> stream |> Stream.map(fn(m) -> m.value end) |> Enum.take(10)
# only 3 messages are available
["Msg 1", "Msg 2", "Msg 3"]
```

## Consumer group and auto commit

If you pass a value for the `consumer_group` option and true for
`auto_commit`, the offset of the last message consumed will be committed to
the broker during each cycle.

For example, suppose we start at the beginning of a partition with millions
of messages and the `max_bytes` setting is such that each `fetch` request
gets 25 messages. In this setting, we will (roughly) be committing offsets
25, 50, 75, etc.

Note that offsets are committed immediately after messages are retrieved
and before you know if you have successfully consumed them. It is
therefore possible that you could miss messages if your consumer crashes in
the middle of consuming a batch, effectively losing the guarantee of
at-least-once delivery. If you need this guarantee, we recommend that you
construct a GenServer-based consumer module and manage your commits manually.

```
iex> Enum.map(1..10, fn(ix) -> KafkaEx.produce("baz", 0, "Msg #{ix}") end)
iex> stream = KafkaEx.stream("baz", 0, consumer_group: "my_consumer", auto_commit: true)
iex> stream |> Enum.take(2) |> Enum.map(fn(msg) -> msg.value end)
["Msg 1", "Msg 2"]
iex> stream |> Enum.take(2) |> Enum.map(fn(msg) -> msg.value end)
["Msg 1", "Msg 2"] # same values
iex> stream2 = KafkaEx.stream("baz", 0, consumer_group: "my_consumer", auto_commit: true)
iex> stream2 |> Enum.take(1) |> Enum.map(fn(msg) -> msg.value end)
["Msg 3"] # stream2 got the next available offset
```

## Options

`KafkaEx.stream/3` accepts a keyword list of options for the third argument.

- `no_wait_at_logend` (boolean): Set this to true to halt the stream when
there are no more messages available. Defaults to false, i.e., the stream
blocks to wait for new messages.

- `worker_name` (term): The KafkaEx worker to use for communication with the
brokers. Defaults to `:kafka_ex` (the default worker).

- `consumer_group` (string): Name of the consumer group used for the initial
offset fetch and automatic offset commit (if `auto_commit` is true). Omit
this value or use `:no_consumer_group` to not use a consumer group (default).
Consumer groups are not compatible with Kafka < 0.8.2.

- `offset` (integer): The offset from which to start fetching. By default,
this is the last available offset of the partition when no consumer group is
specified. When a consumer group is specified, the next message after the
last committed offset is used. For Kafka < 0.8.2 you must explicitly specify
an offset.

- `auto_commit` (boolean): If true, the stream automatically commits offsets
of fetched messages. See discussion above.
"""
@spec stream(binary, integer, Keyword.t) :: GenEvent.Stream.t
@spec stream(binary, integer, Keyword.t) :: KafkaEx.Stream.t
def stream(topic, partition, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
supplied_offset = Keyword.get(opts, :offset)
handler = Keyword.get(opts, :handler, KafkaEx.Handler)
handler_init = Keyword.get(opts, :handler_init, [])
auto_commit = Keyword.get(opts, :auto_commit, true)
wait_time = Keyword.get(opts, :wait_time, @wait_time)
min_bytes = Keyword.get(opts, :min_bytes, @min_bytes)
auto_commit = Keyword.get(opts, :auto_commit, true)
consumer_group = Keyword.get(opts, :consumer_group)
max_bytes = Keyword.get(opts, :max_bytes, @max_bytes)
min_bytes = Keyword.get(opts, :min_bytes, @min_bytes)
supplied_offset = Keyword.get(opts, :offset)
worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
no_wait_at_logend = Keyword.get(opts, :no_wait_at_logend, false)
wait_time = Keyword.get(opts, :wait_time, @wait_time)

event_stream = Server.call(worker_name, {:create_stream, handler, handler_init})
retrieved_offset = current_offset(supplied_offset, partition, topic, worker_name)

send(worker_name, {
:start_streaming,
%FetchRequest{
auto_commit: auto_commit,
topic: topic, partition: partition,
offset: retrieved_offset, wait_time: wait_time,
min_bytes: min_bytes, max_bytes: max_bytes
}
})
event_stream
end
retrieved_offset =
if consumer_group && !supplied_offset do
request = %OffsetFetchRequest{
topic: topic,
partition: partition,
consumer_group: consumer_group
}

fetched_offset = worker_name
|> KafkaEx.offset_fetch(request)
|> KafkaEx.Protocol.OffsetFetch.Response.last_offset

fetched_offset + 1
else
current_offset(supplied_offset, partition, topic, worker_name)
end

@spec stop_streaming(Keyword.t) :: :stop_streaming
def stop_streaming(opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
send(worker_name, :stop_streaming)
fetch_request = %FetchRequest{
auto_commit: auto_commit,
topic: topic, partition: partition,
offset: retrieved_offset, wait_time: wait_time,
min_bytes: min_bytes, max_bytes: max_bytes
}

%Stream{
worker_name: worker_name, fetch_request: fetch_request,
consumer_group: consumer_group, no_wait_at_logend: no_wait_at_logend
}
end

defp build_worker_options(worker_init) do
Expand Down
18 changes: 0 additions & 18 deletions lib/kafka_ex/handler.ex

This file was deleted.

6 changes: 6 additions & 0 deletions lib/kafka_ex/protocol/fetch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ defmodule KafkaEx.Protocol.Fetch do
@moduledoc false
defstruct topic: nil, partitions: []
@type t :: %Response{topic: binary, partitions: list}

@doc """
Finds the entry for `partition` of `topic` in a list of fetch responses.

Selects the first element of `responses` whose `topic` field equals `topic`,
then returns the first element of its `partitions` list whose `partition`
field equals `partition`.

Returns `nil` when no response matches `topic`, or when the matching response
has no entry for `partition`.
"""
@spec partition_messages(list, binary, integer) :: map | nil
def partition_messages(responses, topic, partition) do
  # Fall back to an empty response (`partitions: []`) so that an unknown topic
  # yields nil from the lookup below instead of raising on `nil.partitions`.
  response = Enum.find(responses, &(&1.topic == topic)) || %Response{}
  Enum.find(response.partitions, &(&1.partition == partition))
end
end

defmodule Message do
Expand Down
57 changes: 1 addition & 56 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ defmodule KafkaEx.Server do
metadata: %Metadata.Response{},
brokers: [],
event_pid: nil,
stream_timer: nil,
consumer_metadata: %ConsumerMetadata.Response{},
correlation_id: 0,
consumer_group: nil,
Expand All @@ -36,7 +35,6 @@ defmodule KafkaEx.Server do
metadata: Metadata.Response.t,
brokers: [Broker.t],
event_pid: nil | pid,
stream_timer: reference,
consumer_metadata: ConsumerMetadata.Response.t,
correlation_id: integer,
metadata_update_interval: nil | integer,
Expand Down Expand Up @@ -136,21 +134,6 @@ defmodule KafkaEx.Server do
{:noreply, new_state, timeout | :hibernate} |
{:stop, reason, reply, new_state} |
{:stop, reason, new_state} when reply: term, new_state: term, reason: term
@callback kafka_server_create_stream(handler :: term, handler_init :: term, state :: State.t) ::
{:reply, reply, new_state} |
{:reply, reply, new_state, timeout | :hibernate} |
{:noreply, new_state} |
{:noreply, new_state, timeout | :hibernate} |
{:stop, reason, reply, new_state} |
{:stop, reason, new_state} when reply: term, new_state: term, reason: term
@callback kafka_server_start_streaming(fetch_request :: FetchRequest.t, state :: State.t) ::
{:noreply, new_state} |
{:noreply, new_state, timeout | :hibernate} |
{:stop, reason :: term, new_state} when new_state: term
@callback kafka_server_stop_streaming(state :: State.t) ::
{:noreply, new_state} |
{:noreply, new_state, timeout | :hibernate} |
{:stop, reason :: term, new_state} when new_state: term
@callback kafka_server_update_metadata(state :: State.t) ::
{:noreply, new_state} |
{:noreply, new_state, timeout | :hibernate} |
Expand Down Expand Up @@ -253,24 +236,6 @@ defmodule KafkaEx.Server do
kafka_server_heartbeat(group_name, generation_id, member_id, state)
end

def handle_call({:create_stream, handler, handler_init}, _from, state) do
kafka_server_create_stream(handler, handler_init, state)
end

def handle_info({:start_streaming, fetch_request}, state) do
kafka_server_start_streaming(fetch_request, state)
end

def handle_info(:stop_streaming, state) do
{:noreply, state} = kafka_server_stop_streaming(state)
state = case state.stream_timer do
nil -> state
ref -> Process.cancel_timer(ref)
%{state | stream_timer: nil}
end
{:noreply, state}
end

def handle_info(:update_metadata, state) do
kafka_server_update_metadata(state)
end
Expand Down Expand Up @@ -360,25 +325,6 @@ defmodule KafkaEx.Server do
{:reply, metadata, updated_state}
end

def kafka_server_create_stream(handler, handler_init, state) do
new_state = if state.event_pid && Process.alive?(state.event_pid) do
Logger.log(:warn, "'#{state.worker_name}' already streaming handler '#{handler}'")
state
else
{:ok, event_pid} = GenEvent.start_link
updated_state = %{state | event_pid: event_pid}
:ok = GenEvent.add_handler(updated_state.event_pid, handler, handler_init)
updated_state
end
{:reply, GenEvent.stream(new_state.event_pid), new_state}
end

def kafka_server_stop_streaming(state) do
Logger.log(:debug, "Stopped worker #{inspect state.worker_name} from streaming")
GenEvent.stop(state.event_pid)
{:noreply, %{state | event_pid: nil}}
end

def kafka_server_update_metadata(state) do
{:noreply, update_metadata(state)}
end
Expand Down Expand Up @@ -419,8 +365,7 @@ defmodule KafkaEx.Server do

defoverridable [
kafka_server_produce: 2, kafka_server_offset: 4,
kafka_server_metadata: 2, kafka_server_create_stream: 3,
kafka_server_stop_streaming: 1, kafka_server_update_metadata: 1,
kafka_server_metadata: 2, kafka_server_update_metadata: 1,
]

defp remove_stale_brokers(brokers, metadata_brokers) do
Expand Down
Loading