Skip to content

Commit

Permalink
Uncrustify the docs (#355)
Browse files Browse the repository at this point in the history
Fixes #344
  • Loading branch information
k32 committed Nov 6, 2019
1 parent cd55b35 commit bb78a36
Show file tree
Hide file tree
Showing 16 changed files with 443 additions and 286 deletions.
212 changes: 125 additions & 87 deletions src/brod.erl

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/brod_cli.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
%%% limitations under the License.
%%%

%% @private
-module(brod_cli).

-ifdef(build_brod_cli).
Expand Down
8 changes: 5 additions & 3 deletions src/brod_cli_pipe.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
%%% limitations under the License.
%%%

%% @doc The input source of brod-cli pipe command
%% @private The input source of brod-cli pipe command.
%% This module implements a process that reads off the bytes
%% from the data source (either stdin or a file)
%% and sends the bytes to parent process.
%% Messages sent to parent process:
%% {pipe, self(), [{Ts :: integer(), Key :: binary(), Val :: binary()}]}
%%
%% ```{pipe, self(), [{Ts :: integer(), Key :: binary(), Val :: binary()}]}'''
%% @end
-module(brod_cli_pipe).

Expand Down Expand Up @@ -76,7 +77,8 @@
}).

%% @doc Args explained:
%% source: 'standard_io' | {file, "path/to/srouce"}
%%
%% source: 'standard_io' | {file, "path/to/source"}
%% kv_deli: 'none' | binary().
%% Delimiter bytes for message key and value
%% msg_deli: binary(). Delimiter between kafka messages
Expand Down
12 changes: 9 additions & 3 deletions src/brod_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ stop_consumer(Client, TopicName) ->
%% Return already established connection towards the leader broker,
%% Otherwise a new one is established and cached in client state.
%% If the old connection was dead less than a configurable N seconds ago,
%% {error, LastReason} is returned.
%% `{error, LastReason}' is returned.
-spec get_leader_connection(client(), topic(), partition()) ->
{ok, pid()} | {error, any()}.
get_leader_connection(Client, Topic, Partition) ->
Expand All @@ -213,13 +213,13 @@ get_leader_connection(Client, Topic, Partition) ->
%% Return already established connection towards the broker,
%% otherwise a new one is established and cached in client state.
%% If the old connection was dead less than a configurable N seconds ago,
%% {error, LastReason} is returned.
%% `{error, LastReason}' is returned.
-spec get_connection(client(), brod:hostname(), brod:portnum()) ->
{ok, pid()} | {error, any()}.
get_connection(Client, Host, Port) ->
safe_gen_call(Client, {get_connection, Host, Port}, infinity).

%% @doc Get topic metadata, if topic is 'undefined', will fetch ALL metadata.
%% @doc Get topic metadata, if topic is `undefined', will fetch ALL metadata.
-spec get_metadata(client(), all | ?undef | topic()) ->
{ok, kpro:struct()} | {error, any()}.
get_metadata(Client, ?undef) ->
Expand Down Expand Up @@ -282,6 +282,7 @@ deregister_consumer(Client, Topic, Partition) ->

%%%_* gen_server callbacks =====================================================

%% @private
init({BootstrapEndpoints, ClientId, Config}) ->
erlang:process_flag(trap_exit, true),
Tab = ets:new(?ETS(ClientId),
Expand All @@ -293,6 +294,7 @@ init({BootstrapEndpoints, ClientId, Config}) ->
, workers_tab = Tab
}}.

%% @private
handle_info(init, State0) ->
Endpoints = State0#state.bootstrap_endpoints,
State1 = ensure_metadata_connection(State0),
Expand Down Expand Up @@ -323,6 +325,7 @@ handle_info(Info, State) ->
[?MODULE, self(), State#state.client_id, Info]),
{noreply, State}.

%% @private
handle_call({stop_producer, Topic}, _From, State) ->
ok = brod_producers_sup:stop_producer(State#state.producers_sup, Topic),
{reply, ok, State};
Expand Down Expand Up @@ -368,6 +371,7 @@ handle_call(stop, _From, State) ->
handle_call(Call, _From, State) ->
{reply, {error, {unknown_call, Call}}, State}.

%% @private
handle_cast({register, Key, Pid}, #state{workers_tab = Tab} = State) ->
ets:insert(Tab, {Key, Pid}),
{noreply, State};
Expand All @@ -379,9 +383,11 @@ handle_cast(Cast, State) ->
[?MODULE, self(), State#state.client_id, Cast]),
{noreply, State}.

%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%% @private
terminate(Reason, #state{ client_id = ClientId
, meta_conn = MetaConn
, payload_conns = PayloadConns
Expand Down
130 changes: 84 additions & 46 deletions src/brod_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,44 +120,70 @@ start_link(Bootstrap, Topic, Partition, Config) ->
start_link(Bootstrap, Topic, Partition, Config, []).

%% @doc Start (link) a partition consumer.
%%
%% Possible configs:
%% min_bytes (optional default = 0):
%% Minimal bytes to fetch in a batch of messages
%% max_bytes (optional default = 1MB):
%% Maximum bytes to fetch in a batch of messages
%% NOTE: this value might be expanded to retry when it is not enough
%% to fetch even one single message, then slowly shrinked back
%% to this given value.
%% max_wait_time (optional, default = 10000 ms):
%% Max number of seconds allowd for the broker to collect min_bytes of
%% messages in fetch response
%% sleep_timeout (optional, default = 1000 ms):
%% <ul>
%% <li>`min_bytes' (optional, default = 0)
%%
%% Minimal bytes to fetch in a batch of messages</li>
%%
%% <li>`max_bytes' (optional, default = 1MB)
%%
%% Maximum bytes to fetch in a batch of messages.
%%
%% NOTE: this value might be expanded to retry when it is not
%% enough to fetch even a single message, then slowly
%% shrinked back to the given value.</li>
%%
%% <li>`max_wait_time' (optional, default = 10000 ms)
%%
%% Max number of seconds allowd for the broker to collect
%% `min_bytes' of messages in fetch response</li>
%%
%% <li>`sleep_timeout' (optional, default = 1000 ms)
%%
%% Allow consumer process to sleep this amout of ms if kafka replied
%% 'empty' message-set.
%% prefetch_count (optional, default = 10):
%% The window size (number of messages) allowed to fetch-ahead.
%% prefetch_bytes (optional, default = 100KB):
%% 'empty' message set.</li>
%%
%% <li>`prefetch_count' (optional, default = 10)
%%
%% The window size (number of messages) allowed to fetch-ahead.</li>
%%
%% <li>`prefetch_bytes' (optional, default = 100KB)
%%
%% The total number of bytes allowed to fetch-ahead.
%% brod_consumer is greed, it only stops fetching more messages in
%% when number of unacked messages has exceeded prefetch_count AND
%% the unacked total volume has exceeded prefetch_bytes
%% begin_offset (optional, default = latest):
%% The offset from which to begin fetch requests.
%% offset_reset_policy (optional, default = reset_by_subscriber)
%% How to reset begin_offset if OffsetOutOfRange exception is received.
%% reset_by_subscriber: consumer is suspended (is_suspended=true in state)
%% and wait for subscriber to re-subscribe with a new
%% 'begin_offset' option.
%% reset_to_earliest: consume from the earliest offset.
%% reset_to_latest: consume from the last available offset.
%% size_stat_window: (optional, default = 5)
%% The moving-average window size to caculate average message size.
%% Average message size is used to shrink max_bytes in fetch requests
%% after it has been expanded to fetch a large message.
%% Use 0 to immediately shrink back to original max_bytes from config.
%% A size esitmation allows users to set a relatively small max_bytes,
%% then let it dynamically adjust to a number around
%% PrefetchCount * AverageSize
%% the unacked total volume has exceeded prefetch_bytes</li>
%%
%% <li>`begin_offset' (optional, default = latest)
%%
%% The offset from which to begin fetch requests.</li>
%%
%% <li>`offset_reset_policy' (optional, default = reset_by_subscriber)
%%
%% How to reset `begin_offset' if `OffsetOutOfRange' exception is received.
%%
%% `reset_by_subscriber': consumer is suspended
%% (`is_suspended=true' in state) and wait
%% for subscriber to re-subscribe with a new
%% `begin_offset' option.
%%
%% `reset_to_earliest': consume from the earliest offset.
%%
%% `reset_to_latest': consume from the last available offset.</li>
%%
%% <li>`size_stat_window': (optional, default = 5)
%%
%% The moving-average window size to caculate average message
%% size. Average message size is used to shrink `max_bytes' in
%% fetch requests after it has been expanded to fetch a large
%% message. Use 0 to immediately shrink back to original
%% `max_bytes' from config. A size estimation allows users to set
%% a relatively small `max_bytes', then let it dynamically adjust
%% to a number around `PrefetchCount * AverageSize'</li>
%%
%% </ul>
%% @end
-spec start_link(pid() | brod:bootstrap(),
topic(), partition(), config(), [any()]) ->
Expand All @@ -181,18 +207,23 @@ stop_maybe_kill(Pid, Timeout) ->
ok
end.

%% @doc Subscribe or resubscribe on messages from a partition.
%% Caller may pass in a set of options which is an extention of consumer config
%% to update the parameters such as max_bytes and max_wait_time etc.
%% also to update the start point (begin_offset) of the data stream.
%% @doc Subscribe or resubscribe on messages from a partition. Caller
%% may specify a set of options extending consumer config. It is
%% possible to update parameters such as `max_bytes' and
%% `max_wait_time', or the starting point (`begin_offset') of the data
%% stream.
%%
%% Possible options:
%% all consumer configs as documented for start_link/5
%% begin_offset (optional, default = latest)
%% A subscriber may consume and process messages then persist the associated
%% offset to a persistent storage, then start (or restart) with
%% last_processed_offset + 1 as the begin_offset to proceed.
%% By default, it fetches from the latest available offset.
%% @end
%%
%% All consumer configs as documented for {@link start_link/5}
%%
%% `begin_offset' (optional, default = latest)
%%
%% A subscriber may consume and process messages, then persist the
%% associated offset to a persistent storage, then start (or
%% restart) from `last_processed_offset + 1' as the `begin_offset'
%% to proceed. By default, it starts fetching from the latest
%% available offset.
-spec subscribe(pid(), pid(), options()) -> ok | {error, any()}.
subscribe(Pid, SubscriberPid, ConsumerOptions) ->
safe_gen_call(Pid, {subscribe, SubscriberPid, ConsumerOptions}, infinity).
Expand All @@ -210,8 +241,10 @@ ack(Pid, Offset) ->

-spec debug(pid(), print | string() | none) -> ok.
%% @doc Enable/disable debugging on the consumer process.
%% debug(Pid, print) prints debug info on stdout
%% debug(Pid, File) prints debug info into a File
%%
%% `debug(Pid, print)' prints debug info to stdout.
%%
%% `debug(Pid, File)' prints debug info to a file `File'.
debug(Pid, none) ->
do_debug(Pid, no_debug);
debug(Pid, print) ->
Expand Down Expand Up @@ -265,6 +298,7 @@ init({Bootstrap, Topic, Partition, Config}) ->
, connection_mref = ?undef
}}.

%% @private
handle_info(?INIT_CONNECTION, #state{subscriber = Subscriber} = State0) ->
case brod_utils:is_pid_alive(Subscriber) andalso
maybe_init_connection(State0) of
Expand Down Expand Up @@ -303,6 +337,7 @@ handle_info(Info, State) ->
[?MODULE, self(), Info]),
{noreply, State}.

%% @private
handle_call(get_connection, _From, #state{connection = C} = State) ->
{reply, C, State};
handle_call({subscribe, Pid, Options}, _From,
Expand Down Expand Up @@ -340,6 +375,7 @@ handle_call(stop, _From, State) ->
handle_call(Call, _From, State) ->
{reply, {error, {unknown_call, Call}}, State}.

%% @private
handle_cast({ack, Offset}, #state{pending_acks = PendingAcks} = State0) ->
NewPendingAcks = handle_ack(PendingAcks, Offset),
State1 = State0#state{pending_acks = NewPendingAcks},
Expand All @@ -350,6 +386,7 @@ handle_cast(Cast, State) ->
[?MODULE, self(), Cast]),
{noreply, State}.

%% @private
terminate(Reason, #state{ bootstrap = Bootstrap
, topic = Topic
, partition = Partition
Expand All @@ -370,6 +407,7 @@ terminate(Reason, #state{ bootstrap = Bootstrap
[Topic, Partition, Reason]),
ok.

%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

Expand Down
2 changes: 1 addition & 1 deletion src/brod_consumers_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
%%%

%%%=============================================================================
%%% @doc brod consumers supervisor
%%% @private brod consumers supervisor
%%% @end
%%%=============================================================================

Expand Down
Loading

0 comments on commit bb78a36

Please sign in to comment.