Skip to content

Commit

Permalink
Add marker feature to subscription.
Browse files Browse the repository at this point in the history
  • Loading branch information
vjache committed Jan 6, 2012
1 parent 45ef693 commit a6f03a0
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 21 deletions.
6 changes: 5 additions & 1 deletion src/logmachine.erl
Expand Up @@ -32,6 +32,7 @@
-export([get_zlist/2,
subscribe/2,
subscribe/3,
subscribe/4,
info/1]).

-export([start_err_sim/0]).
Expand Down Expand Up @@ -62,7 +63,10 @@ subscribe(InstanceName, FromTimestamp) ->
FromTimestamp :: timestamp(),
SubscriberPid :: pid()) -> SubscriptionSession :: pid().
subscribe(InstanceName, FromTimestamp, SubPid) ->
logmachine_sup:start_sub_child(InstanceName, FromTimestamp, SubPid).
subscribe(InstanceName, FromTimestamp, SubPid, InstanceName).

subscribe(InstanceName, FromTimestamp, SubPid, Marker) ->
logmachine_sup:start_sub_child(InstanceName, FromTimestamp, SubPid, Marker).

%%-------------------------------------------------------------------------------
%% @doc
Expand Down
37 changes: 21 additions & 16 deletions src/logmachine_sub_srv.erl
Expand Up @@ -27,7 +27,7 @@

%% --------------------------------------------------------------------
%% External exports
-export([start_link/3]).
-export([start_link/4]).

%% gen_server callbacks
-export([init/1,
Expand All @@ -37,38 +37,41 @@
terminate/2,
code_change/3]).

-record(state, {instance,zlist,subscriber,subscriber_mref,last_from,mode :: recover | normal}).
-record(state, {instance,zlist,subscriber,subscriber_mref,last_from,mode :: recover | normal, marker}).

%% ====================================================================
%% External functions
%% ====================================================================

start_link(InstanceName, FromTimestamp, SubPid) ->
gen_server:start_link(?MODULE, {InstanceName, FromTimestamp, SubPid}, []).
start_link(InstanceName, FromTimestamp, SubPid, Marker) ->
gen_server:start_link(?MODULE, {InstanceName, FromTimestamp, SubPid, Marker}, []).


%% ====================================================================
%% Server functions
%% ====================================================================

init({InstanceName, FromTimestamp, SubPid}) ->
init({InstanceName, FromTimestamp, SubPid, Marker}) ->
link(SubPid),
MRef=monitor(process, SubPid),
ZList=logmachine:get_zlist(InstanceName, FromTimestamp),
{ok, #state{instance=InstanceName,
zlist=ZList,
subscriber=SubPid,
subscriber_mref=MRef,
last_from=FromTimestamp,mode=recover}, 1}.
last_from=FromTimestamp,
mode=recover,
marker=Marker}, 1}.

handle_call(_Request, _From, #state{}=State) ->
{reply, unsupported, State}.

handle_cast({Ts, _ }=HistoryEntry,
#state{subscriber=SubPid,
last_from=LastFrom,
mode=normal}=State) when Ts > LastFrom ->
LastFrom1=send_entries(SubPid, [HistoryEntry], LastFrom),
mode=normal,
marker=Marker}=State) when Ts > LastFrom ->
LastFrom1=send_entries(SubPid, [HistoryEntry], LastFrom, Marker),
{noreply, State#state{last_from=LastFrom1}};
handle_cast(_Msg, #state{}=State) ->
{noreply, State}.
Expand All @@ -79,7 +82,8 @@ handle_info(timeout,
zlist=[],
subscriber=SubPid,
last_from={MgS,S,McS},
mode=recover}=State) ->
mode=recover,
marker=Marker}=State) ->
% Subscribe for receiver events
logmachine_receiver_srv:subscribe(InstanceName, self(), {gen_server,cast}),
% Await buffered messages arrive to RAM cache
Expand All @@ -88,19 +92,20 @@ handle_info(timeout,
LastFrom={MgS,S,McS+1},
ZList=logmachine:get_zlist(InstanceName, LastFrom),
% Send if any
LastFrom1=send_entries(SubPid, ZList, LastFrom),
LastFrom1=send_entries(SubPid, ZList, LastFrom, Marker),
% Switch mode to normal
{noreply, State#state{last_from=LastFrom1,mode=normal}};
% Do pump another one chunk to the subscriber (recovery)
handle_info(timeout,
#state{zlist=ZList,
subscriber=SubPid,
last_from=LastFrom,
mode=recover}=State) ->
mode=recover,
marker=Marker}=State) ->
% Take a chunk of 100 entries
{L, ZList1}=zlists:scroll(100, ZList),
% Send it to subscriber
LastFrom1=send_entries(SubPid, L, LastFrom),
LastFrom1=send_entries(SubPid, L, LastFrom, Marker),
% Update state with tail zlist and new timestamp
{noreply, State#state{zlist=ZList1,last_from=LastFrom1}, 1};
% Subscriber process terminated normaly, hence subscription session stops also (normaly)
Expand All @@ -118,8 +123,8 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}.
%%% Internal functions
%% --------------------------------------------------------------------

send_entries(SubPid, [{T,_}=E], _LastFrom) ->
SubPid ! E, T;
send_entries(SubPid, EntryList, LastFrom) ->
zlists:foldl(fun({T,_}=E, _) -> SubPid ! E, T end, LastFrom, EntryList).
send_entries(SubPid, [{T,_}=E], _LastFrom, Marker) ->
SubPid ! {Marker, E}, T;
send_entries(SubPid, EntryList, LastFrom, Marker) ->
zlists:foldl(fun({T,_}=E, _) -> SubPid ! {Marker, E}, T end, LastFrom, EntryList).

9 changes: 5 additions & 4 deletions src/logmachine_sup.erl
Expand Up @@ -32,7 +32,7 @@
start_link_instance/1,
start_link_component/2,
start_link_sub_sup/0,
start_sub_child/3]).
start_sub_child/4]).

%% --------------------------------------------------------------------
%% Internal exports
Expand Down Expand Up @@ -69,15 +69,16 @@ start_link_sub_sup() ->
RegName='logmachine.sub.sup',
supervisor:start_link({local, RegName}, ?MODULE, sub_sup).

start_sub_child(InstanceName, FromTimestamp, SubPid) ->
case supervisor:start_child('logmachine.sub.sup', [InstanceName, FromTimestamp, SubPid]) of
start_sub_child(InstanceName, FromTimestamp, SubPid, Marker) ->
case supervisor:start_child('logmachine.sub.sup', [InstanceName, FromTimestamp, SubPid, Marker]) of
{ok, Child} -> Child;
{ok, Child, _} -> Child;
Unexpected -> throw({failed_to_start_subscription,
[{reason,Unexpected},
{instance,InstanceName},
{start_from,FromTimestamp},
{subscriber, SubPid}]})
{subscriber, SubPid},
{marker, Marker}]})
end.

%% ====================================================================
Expand Down

0 comments on commit a6f03a0

Please sign in to comment.