Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ defmodule KafkaEx.GenConsumer do
of consumed messages. KafkaEx supports two commit strategies: asynchronous
and synchronous. The return value of `c:handle_message_set/2` determines
which strategy is used:

* `{:sync_commit, new_state}` causes synchronous offset commits.
* `{:async_commit, new_state}` causes asynchronous offset commits.

Expand Down Expand Up @@ -98,7 +98,7 @@ defmodule KafkaEx.GenConsumer do
* `:commit_threshold` is the maximum number of acknowledged messages that a
`KafkaEx.GenConsumer` will allow to be uncommitted before triggering a
commit.

These can be set globally in the `:kafka_ex` app's environment or on a
per-consumer basis by passing options to `start_link/5`:

Expand Down Expand Up @@ -299,6 +299,7 @@ defmodule KafkaEx.GenConsumer do
:acked_offset,
:last_commit,
:auto_offset_reset,
:fetch_options
]
end

Expand Down Expand Up @@ -331,6 +332,9 @@ defmodule KafkaEx.GenConsumer do
the oldest available, `:latest` moves to the most recent. If anything else
is specified, the error will simply be raised.

* `:fetch_options` - Optional keyword list that is passed along to the
`KafkaEx.fetch` call.

Both `:commit_interval` and `:commit_threshold` default to the application
config (e.g., `Application.get_env/2`) if that value is present, or the
stated default if the application config is not present.
Expand Down Expand Up @@ -410,6 +414,14 @@ defmodule KafkaEx.GenConsumer do
[consumer_group: group_name] ++ worker_opts
)

default_fetch_options = [
auto_commit: false,
worker_name: worker_name
]

given_fetch_options = Keyword.get(opts, :fetch_options, [])
fetch_options = Keyword.merge(default_fetch_options, given_fetch_options)

state = %State{
consumer_module: consumer_module,
consumer_state: consumer_state,
Expand All @@ -420,6 +432,7 @@ defmodule KafkaEx.GenConsumer do
group: group_name,
topic: topic,
partition: partition,
fetch_options: fetch_options
}

Process.flag(:trap_exit, true)
Expand Down Expand Up @@ -482,7 +495,8 @@ defmodule KafkaEx.GenConsumer do
worker_name: worker_name,
topic: topic,
partition: partition,
current_offset: offset
current_offset: offset,
fetch_options: fetch_options
} = state
) do
[
Expand All @@ -495,9 +509,7 @@ defmodule KafkaEx.GenConsumer do
] = KafkaEx.fetch(
topic,
partition,
offset: offset,
auto_commit: false,
worker_name: worker_name
Keyword.merge(fetch_options, [offset: offset])
)

state =
Expand Down