Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] pasue and resume consumer #35

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 83 additions & 51 deletions src/erlkaf_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,25 @@
-define(DEFAULT_POLL_IDLE_MS, 1000).
-define(DEFAULT_BATCH_SIZE, 100).

-behaviour(gen_server).
-behaviour(gen_statem).

-export([
%API
start_link/6,
stop/1,
pause/1,
resume/1,

% gen_server

% gen_statem
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
code_change/3,
callback_mode/0,

%states
polling/3,
paused/3
]).

-record(state, {
Expand All @@ -32,12 +37,11 @@
poll_batch_size,
poll_idle_ms,
dispatch_mode,
messages = [],
last_offset = -1
}).

callback_mode() -> [state_functions, state_enter].
start_link(ClientRef, TopicName, Partition, Offset, QueueRef, TopicSettings) ->
gen_server:start_link(?MODULE, [ClientRef, TopicName, Partition, Offset, QueueRef, TopicSettings], []).
gen_statem:start_link(?MODULE, [ClientRef, TopicName, Partition, Offset, QueueRef, TopicSettings], []).

stop(Pid) ->
case erlang:is_process_alive(Pid) of
Expand All @@ -55,6 +59,11 @@ stop(Pid) ->
{error, not_alive}
end.

pause(Pid) ->
gen_statem:call(Pid, pause).
resume(Pid) ->
gen_statem:call(Pid, resume).

init([ClientRef, TopicName, Partition, Offset, QueueRef, TopicSettings]) ->
?LOG_INFO("start consumer for: ~p partition: ~p offset: ~p", [TopicName, Partition, Offset]),

Expand All @@ -65,11 +74,9 @@ init([ClientRef, TopicName, Partition, Offset, QueueRef, TopicSettings]) ->

case catch CbModule:init(TopicName, Partition, Offset, CbArgs) of
{ok, CbState} ->
schedule_poll(0),

{DpMode, PollBatchSize} = dispatch_mode_parse(DispatchMode),

{ok, #state{
%once we start up go to the polling state
{ok, polling, #state{
client_ref = ClientRef,
topic_name = TopicName,
partition = Partition,
Expand All @@ -85,56 +92,87 @@ init([ClientRef, TopicName, Partition, Offset, QueueRef, TopicSettings]) ->
{stop, Error}
end.

handle_call(Request, _From, State) ->
?LOG_ERROR("handle_call unexpected message: ~p", [Request]),
{reply, ok, State}.

handle_cast(Request, State) ->
?LOG_ERROR("handle_cast unexpected message: ~p", [Request]),
{noreply, State}.

handle_info(poll_events, #state{queue_ref = Queue, poll_batch_size = PollBatchSize, poll_idle_ms = PollIdleMs} = State) ->
%Polling state
polling(enter, _OldState, #state{topic_name = TopicName,
partition = Partition}) ->
?LOG_INFO("starting polling for: topic ~p, ~p partition", [TopicName, Partition]),
{keep_state_and_data, {state_timeout, 0, poll_events}};
polling(state_timeout, poll_events, #state{queue_ref = Queue,
poll_batch_size = PollBatchSize,
poll_idle_ms = PollIdleMs} = State) ->
case erlkaf_nif:consumer_queue_poll(Queue, PollBatchSize) of
{ok, Events, LastOffset} ->
case Events of
[] ->
schedule_poll(PollIdleMs),
{noreply, State};
{keep_state_and_data, {state_timeout, PollIdleMs, poll_events}};
_ ->
schedule_message_process(0),
{noreply, State#state{messages = Events, last_offset = LastOffset}}
process(Events, State#state{last_offset=LastOffset})
end;
Error ->
?LOG_INFO("~p poll events error: ~p", [?MODULE, Error]),
throw({error, Error})
end;

handle_info(process_messages, #state{
messages = Msgs,
dispatch_mode = DispatchMode,
client_ref = ClientRef,
cb_module = CbModule,
cb_state = CbState} = State) ->

case process_events(DispatchMode, Msgs, batch_offset(DispatchMode, State), ClientRef, CbModule, CbState) of
%Calls
polling({call, From}, pause, Data) ->
{next_state, paused, Data, {reply, From, ok}};
polling({call, From}, resume, _Data) ->
{keep_state_and_data, {reply, From, ok}} ; %Dont need resume when we are already polling
polling({call, _From}, Request, _Data) ->
?LOG_ERROR("unexpected call message: ~p, in ~p state", [Request, ?FUNCTION_NAME]),
keep_state_and_data;
%Casts
polling(cast, Request, _Data) ->
?LOG_ERROR("unexpected call message: ~p, in ~p state", [Request, ?FUNCTION_NAME]),
keep_state_and_data;
%Infos
polling(info, {stop, From, Tag}, Data) ->
handle_stop(From, Tag, Data),
{stop, normal, Data};
polling(info, Request, _Data) ->
?LOG_ERROR("unexpected info message: ~p, in ~p state", [Request, ?FUNCTION_NAME]),
keep_state_and_data.

%Pasued State
paused(enter, _OldState, #state{topic_name = TopicName,
partition = Partition}) ->
?LOG_INFO("pasuing polling for: topic ~p, ~p partition", [TopicName, Partition]),
keep_state_and_data;
%Calls
paused({call, From}, resume, Data) ->
{next_state, polling, Data, {reply, From, ok}};
paused({call, From}, pause, _Data) ->
{keep_state_and_data, {reply, From, ok}}; %Dont need pause when we are already pasued
paused({call, _From}, Request, _Data) ->
?LOG_ERROR("unexpected call message: ~p, in ~p state", [Request, ?FUNCTION_NAME]),
keep_state_and_data;
%Casts
paused(cast, Request, _Data) ->
?LOG_ERROR("unexpected call message: ~p, in ~p state", [Request, ?FUNCTION_NAME]),
keep_state_and_data;
%Infos
paused(info, {stop, From, Tag}, Data) ->
handle_stop(From, Tag, Data),
{stop, normal, Data};
paused(info, Request, _Data) ->
?LOG_ERROR("unexpected info message: ~p, in ~p state", [Request, ?FUNCTION_NAME]),
keep_state_and_data.

process(Events, #state{dispatch_mode = DispatchMode,
client_ref = ClientRef,
cb_module = CbModule,
cb_state = CbState} = State) ->

case process_events(DispatchMode, Events, batch_offset(DispatchMode, State), ClientRef, CbModule, CbState) of
{ok, NewCbState} ->
schedule_poll(0),
{noreply, State#state{messages = [], last_offset = -1, cb_state = NewCbState}};
{keep_state, State#state{last_offset = -1, cb_state = NewCbState}, {state_timeout, 0, poll_events}};
{stop, From, Tag} ->
handle_stop(From, Tag, State),
{stop, normal, State};
Error ->
?LOG_ERROR("unexpected response: ~p", [Error]),
{stop, Error, State}
end;

handle_info({stop, From, Tag}, State) ->
handle_stop(From, Tag, State),
{stop, normal, State};

handle_info(Info, State) ->
?LOG_ERROR("handle_info unexpected message: ~p", [Info]),
{noreply, State}.
end.

terminate(_Reason, _State) ->
ok.
Expand All @@ -154,12 +192,6 @@ dispatch_mode_parse(one_by_one) ->
dispatch_mode_parse({batch, MaxBatchSize}) ->
{batch, MaxBatchSize}.

schedule_poll(Timeout) ->
erlang:send_after(Timeout, self(), poll_events).

schedule_message_process(Timeout) ->
erlang:send_after(Timeout, self(), process_messages).

commit_offset(ClientRef, #erlkaf_msg{topic = Topic, partition = Partition, offset = Offset}) ->
erlkaf_nif:consumer_offset_store(ClientRef, Topic, Partition, Offset).

Expand Down