Skip to content

Commit

Permalink
Adding resolve_offset/6 to brod_utils, as a way to pass a custom time…
Browse files Browse the repository at this point in the history
…out to kpro:connect_partition_leader
  • Loading branch information
escobera committed May 1, 2020
1 parent f76c456 commit 812e3c4
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion src/brod_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
-type offset_time() :: brod:offset_time().
-type group_id() :: brod:group_id().

-define(DEFAULT_TIMEOUT, timer:seconds(5)).

%%%_* APIs =====================================================================

%% @equiv create_topics(Hosts, TopicsConfigs, RequestConfigs, [])
Expand Down Expand Up @@ -141,8 +143,16 @@ get_metadata(Hosts, Topics, ConnCfg) ->
offset_time(), conn_config()) ->
{ok, offset()} | {error, any()}.
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg) ->
Opts = #{timeout => proplists:get_value(connect_timeout, ConnCfg, ?DEFAULT_TIMEOUT)},
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, Opts).

%% @doc Resolve timestamp to real offset.
-spec resolve_offset([endpoint()], topic(), partition(),
offset_time(), conn_config(), conn_config()) ->
{ok, offset()} | {error, any()}.
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, Opts) ->
with_conn(
kpro:connect_partition_leader(Hosts, ConnCfg, Topic, Partition),
kpro:connect_partition_leader(Hosts, ConnCfg, Topic, Partition, Opts),
fun(Pid) -> resolve_offset(Pid, Topic, Partition, Time) end).

%% @doc Resolve timestamp or semantic offset to real offset.
Expand Down

0 comments on commit 812e3c4

Please sign in to comment.