From 1702b7fe3b97c690fb52a7f7e26abc9c4db73770 Mon Sep 17 00:00:00 2001 From: Joshua Scott Date: Thu, 15 Feb 2018 17:01:46 -0600 Subject: [PATCH] Allow passing fetch options to GenConsumer Fixes #239 --- lib/kafka_ex/gen_consumer.ex | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index c659eea6..4ccfe38b 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -56,7 +56,7 @@ defmodule KafkaEx.GenConsumer do of consumed messages. KafkaEx supports two commit strategies: asynchronous and synchronous. The return value of `c:handle_message_set/2` determines which strategy is used: - + * `{:sync_commit, new_state}` causes synchronous offset commits. * `{:async_commit, new_state}` causes asynchronous offset commits. @@ -98,7 +98,7 @@ defmodule KafkaEx.GenConsumer do * `:commit_threshold` is the maximum number of acknowledged messages that a `KafkaEx.GenConsumer` will allow to be uncommitted before triggering a commit. - + These can be set globally in the `:kafka_ex` app's environment or on a per-consumer basis by passing options to `start_link/5`: @@ -299,6 +299,7 @@ defmodule KafkaEx.GenConsumer do :acked_offset, :last_commit, :auto_offset_reset, + :fetch_options ] end @@ -331,6 +332,9 @@ defmodule KafkaEx.GenConsumer do the oldest available, `:latest` moves to the most recent. If anything else is specified, the error will simply be raised. + * `:fetch_options` - Optional keyword list that is passed along to the + `KafkaEx.fetch` call. + Both `:commit_interval` and `:commit_threshold` default to the application config (e.g., `Application.get_env/2`) if that value is present, or the stated default if the application config is not present. @@ -410,6 +414,14 @@ defmodule KafkaEx.GenConsumer do [consumer_group: group_name] ++ worker_opts ) + default_fetch_options = [ + auto_commit: false, + worker_name: worker_name + ] + + given_fetch_options = Keyword.get(opts, :fetch_options, []) + fetch_options = Keyword.merge(default_fetch_options, given_fetch_options) + state = %State{ consumer_module: consumer_module, consumer_state: consumer_state, @@ -420,6 +432,7 @@ defmodule KafkaEx.GenConsumer do group: group_name, topic: topic, partition: partition, + fetch_options: fetch_options } Process.flag(:trap_exit, true) @@ -482,7 +495,8 @@ defmodule KafkaEx.GenConsumer do worker_name: worker_name, topic: topic, partition: partition, - current_offset: offset + current_offset: offset, + fetch_options: fetch_options } = state ) do [ @@ -495,9 +509,7 @@ defmodule KafkaEx.GenConsumer do ] = KafkaEx.fetch( topic, partition, - offset: offset, - auto_commit: false, - worker_name: worker_name + Keyword.merge(fetch_options, [offset: offset]) ) state =