Skip to content

Commit

Permalink
Implements alternative protocols for leader_call.
Browse files Browse the repository at this point in the history
Existing was 'leader_only'. New 'local_sync' protocol ensures that operations will be synced on the originators node before the leader_call function returns.
  • Loading branch information
archaelus committed Aug 3, 2011
1 parent 3938fd5 commit 15d49f5
Showing 1 changed file with 75 additions and 12 deletions.
87 changes: 75 additions & 12 deletions src/gl_async_bully.erl
Expand Up @@ -43,6 +43,7 @@
%% API
-export([start_link/4
,leader_call/2
,leader_call/3
,leader_cast/2
,call/2
,cast/2
Expand Down Expand Up @@ -127,6 +128,8 @@

-define(ALPHA_TIME, timer:seconds(10)).

-type lc_proto() :: 'leader_only' | 'local_sync'.

%%====================================================================
%% API
%%====================================================================
Expand Down Expand Up @@ -181,8 +184,13 @@ to_other_followers(ExceptNode, Msg, CI)

-spec leader_call(atom(), term()) -> any().
leader_call(Name, Msg) ->
leader_call(Name, Msg, local_sync).

-spec leader_call(atom(), term(), lc_proto()) -> any().
leader_call(Name, Msg, Proto) when Proto =:= local_sync;
Proto =:= leader_only ->
gen_fsm:sync_send_all_state_event(Name,
{leader_call, Msg}).
{leader_call, Proto, Msg}).

-spec leader_cast(atom() | cluster_info(), term()) -> any().
leader_cast(Name, Msg) when is_atom(Name) ->
Expand Down Expand Up @@ -262,7 +270,7 @@ init([Name, Mod, Arg, Net]) when is_list(Net) ->
%%--------------------------------------------------------------------

recovery(timeout, State) ->
start_stage2(State#state{incarn = erlang:now()}).
start_stage2(reincarnate(State)).

norm(_Evt, State) -> {next_state, norm, State}.
elec2(_Evt, State) -> {next_state, elec2, State}.
Expand Down Expand Up @@ -411,6 +419,20 @@ handle_event({from_leader, Ldr, Event}, StateName, State) ->
{next_state, StateName, State}
end;

handle_event({from_leader, Ldr, Event, {local_sync_reply, From, Reply}},
StateName, State) ->
%% Strip local_sync_reply info from message and pass through to
%% regular from_leader processing. Return value from regular
%% processing.
case handle_event({from_leader, Ldr, Event}, StateName, State) of
{next_state, _StateName, _NewState} = CBReturn ->
gen_fsm:reply(From, Reply),
CBReturn;
Else ->
Else
end;


handle_event(Msg, StateName, State) ->
?INFO("~p: ignored ~p", [StateName, Msg]),
{next_state, StateName, State}.
Expand Down Expand Up @@ -466,12 +488,12 @@ handle_event(Msg, StateName, State) ->

handle_sync_event(force_recovery, From, _StateName, State) ->
gen_fsm:reply(From, ok),
start_stage2(State#state{incarn = erlang:now()});
start_stage2(reincarnate(State));

handle_sync_event({leader_call, Call}, From, StateName, State) ->
handle_sync_event({leader_call, LCProto, Call}, From, StateName, State) ->
case State#state.leader of
Node when node() =:= Node ->
case ms_call(handle_leader_call, [Call], From, State) of
case ms_call(LCProto, handle_leader_call, [Call], From, State) of
{noreply, NewState} ->
{next_state, StateName, NewState};
{stop, Reason, NewState} ->
Expand All @@ -481,12 +503,12 @@ handle_sync_event({leader_call, Call}, From, StateName, State) ->
%% Fake a gen_fsm sync_send_all_state_event.
erlang:send(server_on(Node, State),
{'$gen_sync_all_state_event', From,
{leader_call, Call}}),
{leader_call, LCProto, Call}}),
{next_state, StateName, State}
end;

handle_sync_event({call, Call}, From, StateName, State) ->
case ms_call(handle_call, [Call], From, State) of
case ms_call(call, handle_call, [Call], From, State) of
{noreply, NewState} ->
{next_state, StateName, NewState};
{stop, Reason, NewState} ->
Expand Down Expand Up @@ -654,7 +676,7 @@ ms_event(Function, Args, S = #state{ms={Mod,ModS}})
%% You're only allowed to send Sync terms
{ok, SyncBcast, NewModS} when node() =:= S#state.leader ->
NewState = S#state{ms={Mod, NewModS}},
to_peers(SyncBcast, NewState),
sync_bcast(SyncBcast, NewState),
{ok, NewState};
{ok, _, NewModS} when node() =/= S#state.leader ->
{stop, {?MODULE, callback_error,
Expand All @@ -664,7 +686,10 @@ ms_event(Function, Args, S = #state{ms={Mod,ModS}})
{stop, Reason, S#state{ms={Mod, NewModS}}}
end.

ms_call(Function, Args, From, S = #state{ms={Mod,ModS}})
-spec ms_call('call' | lc_proto(),
atom(), list(), term(), #state{}) ->
term().
ms_call(LCProto, Function, Args, From, S = #state{ms={Mod,ModS}})
when is_atom(Function), is_list(Args), is_tuple(From) ->
case apply(Mod, Function,
Args ++ [From, cluster_info(S), ModS]) of
Expand All @@ -674,11 +699,18 @@ ms_call(Function, Args, From, S = #state{ms={Mod,ModS}})
gen_fsm:reply(From, Reply),
{noreply, S#state{ms={Mod, NewModS}}};
{reply, Reply, SyncBcast, NewModS}
when node() =:= S#state.leader ->
when node() =:= S#state.leader,
LCProto =:= leader_only ->
NewState = S#state{ms={Mod, NewModS}},
to_peers(SyncBcast, NewState),
sync_bcast(SyncBcast, NewState),
gen_fsm:reply(From, Reply),
{noreply, NewState};
{reply, Reply, SyncBcast, NewModS}
when node() =:= S#state.leader,
LCProto =:= local_sync ->
NewState = S#state{ms={Mod, NewModS}},
local_sync_bcast(From, Reply, SyncBcast, NewState),
{noreply, NewState};
{reply, _Reply, _Broadcast, NewModS}
when node() =/= S#state.leader ->
{stop, {?MODULE, callback_error,
Expand All @@ -700,14 +732,40 @@ cluster_info(#state{leader=Node,

%% Fake a gen_fsm:send_all_state_event to avoid a trip through
%% net_kernel:connect.
to_peers(Msg, S = #state{acks=Acks}) ->
sync_bcast(Msg, S = #state{acks=Acks}) ->
[ erlang:send(server_on(Node, S),
{'$gen_all_state_event', {from_leader, node(), Msg}},
[noconnect])
|| Node <- ordsets:from_list(Acks),
Node =/= node(),
lists:member(Node, nodes()) ].

%% @doc
%% Sync broadcast for the local_sync leader_call protocol. Causes one
%% peer (on the same machine as the caller) to send the reply locally
%% after processing the SyncBcast result.
local_sync_bcast(From = {Pid, _Tag}, Reply, Msg, S = #state{acks=Acks}) ->
FromNode = node(Pid),
case node() =:= FromNode of
true ->
%% Can send reply here as our local processing is already
%% finished by the time we get here.
gen_fsm:reply(From, Reply);
false ->
erlang:send(server_on(FromNode, S),
{'$gen_all_state_event',
{from_leader, node(), Msg,
{local_sync_reply, From, Reply}}},
[noconnect])
end,
[ erlang:send(server_on(Node, S),
{'$gen_all_state_event', {from_leader, node(), Msg}},
[noconnect])
|| Node <- ordsets:from_list(Acks),
Node =/= node(),
Node =/= FromNode,
lists:member(Node, nodes()) ].

format_status(Fmt, [_Dict, S = #state{name=Name,
elid=Elid}]) ->
[{name, Name},
Expand Down Expand Up @@ -738,3 +796,8 @@ format_mod_status(Fmt, #state{ms={Mod, ModS}}) ->
%% @doc Format a control_message from the local node.
control_message(Msg) ->
{gl_async_bully, node(), Msg}.

%% @doc generate new incarnation id strictly greather than the old
%% one.
reincarnate(State = #state{}) ->
State#state{incarn = erlang:now()}.

0 comments on commit 15d49f5

Please sign in to comment.