Skip to content

Commit

Permalink
Merge pull request #150 from uwiger/uw-rewrite-sync
Browse files Browse the repository at this point in the history
Uw rewrite sync
  • Loading branch information
uwiger committed Oct 6, 2017
2 parents 37adea9 + 9c9c0ca commit c4cc57f
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 38 deletions.
136 changes: 101 additions & 35 deletions src/gproc_dist.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
-record(state, {
always_broadcast = false,
is_leader,
sync_clients = [],
sync_requests = []}).

-include("gproc_trace.hrl").
Expand Down Expand Up @@ -266,7 +267,7 @@ reset_counter(_) ->
%%
sync() ->
%% Increase timeout since gen_leader can take some time ...
leader_call(sync, 10000).
gen_server:call(?MODULE, sync, 5000).

%% @spec get_leader() -> node()
%% @doc Returns the node of the current gproc leader.
Expand All @@ -283,6 +284,8 @@ handle_cast(_Msg, S, _) ->

%% Handles calls made directly to this server.
%%   get_leader - replies with gen_leader:leader_node/1 for the current
%%                election term.
%%   sync       - no immediate reply; the caller's From is handed to
%%                initiate_sync/3, which returns the next state.
%%   otherwise  - replies badarg.
handle_call(get_leader, _From, State, Election) ->
    LeaderNode = gen_leader:leader_node(Election),
    {reply, LeaderNode, State};
handle_call(sync, Client, State, Election) ->
    NextState = initiate_sync(Client, State, Election),
    {noreply, NextState};
handle_call(_Request, _From, State, _Election) ->
    {reply, badarg, State}.

Expand Down Expand Up @@ -326,16 +329,16 @@ globs() ->
surrendered(#state{is_leader = true} = S, {globals, Globs}, _E) ->
%% Leader conflict!
surrendered_1(Globs),
{ok, S#state{is_leader = false}};
{ok, maybe_reinitiate_sync(S#state{is_leader = false})};
surrendered(S, {globals, Globs}, _E) ->
%% globals from this node should be more correct in our table than
%% in the leader's
surrendered_1(Globs),
{ok, S#state{is_leader = false}}.
{ok, maybe_reinitiate_sync(S#state{is_leader = false})}.


handle_DOWN(Node, S, _E) ->
S1 = check_sync_requests(Node, S),
handle_DOWN(Node, S, E) ->
S1 = check_sync_requests(Node, S, E),
Head = {{{'_',g,'_'},'_'},'$1','_'},
Gs = [{'==', {node,'$1'},Node}],
Globs = ets:select(?TAB, [{Head, Gs, [{{{element,1,{element,1,'$_'}},
Expand All @@ -347,28 +350,31 @@ handle_DOWN(Node, S, _E) ->
{ok, Broadcast, S1}
end.

check_sync_requests(Node, #state{sync_requests = SReqs} = S) ->
SReqs1 = lists:flatmap(
fun({From, Ns}) ->
case Ns -- [Node] of
[] ->
gen_leader:reply(From, {leader, reply, true}),
[];
Ns1 ->
[{From, Ns1}]
end
end, SReqs),
S#state{sync_requests = SReqs1}.

handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
GenLeader = gen_leader,
case GenLeader:alive(E) -- [node()] of
check_sync_requests(Node, #state{sync_requests = SReqs} = S, E) ->
check_sync_requests(SReqs, Node, S, E).

check_sync_requests([], _, S, _) ->
S;
check_sync_requests([{From, Ns}|Reqs], Node, S, E) ->
case lists:member(Node, Ns) of
true ->
remove_node_from_sync_request(Node, Ns, From, S, E);
false ->
check_sync_requests(Reqs, Node, S, E)
end.

remove_node_from_sync_request(Node, Ns, From, S, E) ->
case Ns -- [Node] of
[] ->
{reply, true, S};
Alive ->
GenLeader:broadcast({from_leader, {sync, From}}, Alive, E),
{noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
end;
check_sync_requests(Node, send_sync_complete(From, S, E), E);
Ns1 ->
Rs1 = lists:keyreplace(
From, 1, S#state.sync_requests, {From, Ns1}),
%% Yes, we start over and run through the list from the top,
%% with updated state; simpler code that way.
check_sync_requests(Node, S#state{sync_requests = Rs1}, E)
end.

handle_leader_call({Reg, {_C,g,_Name} = K, Value, Pid, As, Op}, _From, S, _E)
when Reg==reg; Reg==reg_other ->
case gproc_lib:insert_reg(K, Value, Pid, g) of
Expand Down Expand Up @@ -667,7 +673,18 @@ handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
handle_leader_call(_, _, S, _E) ->
{reply, badarg, S}.

handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
handle_leader_cast({initiate_sync, Ref}, S, E) ->
case gen_leader:alive(E) -- [node()] of
[] ->
%% ???
{noreply, send_sync_complete(Ref, S, E)};
Alive ->
gen_leader:broadcast({from_leader, {sync, Ref}}, Alive, E),
{noreply, S#state{sync_requests =
[{Ref, Alive}|S#state.sync_requests]}}
end;

handle_leader_cast({sync_reply, Node, Ref}, S, E) ->
#state{sync_requests = SReqs} = S,
case lists:keyfind(Ref, 1, SReqs) of
false ->
Expand All @@ -679,8 +696,7 @@ handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
{_, Ns} ->
case lists:delete(Node, Ns) of
[] ->
gen_leader:reply(Ref, {leader, reply, true}),
{ok, S#state{sync_requests = lists:keydelete(Ref,1,SReqs)}};
{ok, send_sync_complete(Ref, S, E)};
Ns1 ->
SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
{ok, S#state{sync_requests = SReqs1}}
Expand Down Expand Up @@ -843,6 +859,16 @@ terminate(_Reason, _S) ->
from_leader({sync, Ref}, S, _E) ->
gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
{ok, S};
from_leader({sync_complete, Ref}, S, _E) ->
case Ref of
{From, _} when node(From) == node() ->
{ok, reply_to_sync_client(Ref, S)};
_ ->
%% we shouldn't have to, but ensure that we don't have
%% the sync request in our state.
{ok, S#state{sync_requests = lists:keydelete(
Ref, 1, S#state.sync_requests)}}
end;
from_leader(Ops, S, _E) ->
lists:foreach(
fun({delete, Globals}) ->
Expand Down Expand Up @@ -887,7 +913,7 @@ delete_globals(Globals) ->
do_notify([{P, Msg}|T]) when is_pid(P), node(P) =:= node() ->
P ! Msg,
do_notify(T);
do_notify([{P, Msg}|T]) when is_pid(P) ->
do_notify([{P, _Msg}|T]) when is_pid(P) ->
do_notify(T);
do_notify([{K, P, E}|T]) ->
case ets:lookup(?TAB, {P,K}) of
Expand All @@ -911,11 +937,11 @@ leader_call(Req) ->
Reply -> Reply
end.

leader_call(Req, Timeout) ->
case gen_leader:leader_call(?MODULE, Req, Timeout) of
badarg -> ?THROW_GPROC_ERROR(badarg);
Reply -> Reply
end.
%% leader_call(Req, Timeout) ->
%% case gen_leader:leader_call(?MODULE, Req, Timeout) of
%% badarg -> ?THROW_GPROC_ERROR(badarg);
%% Reply -> Reply
%% end.

%% Forwards Msg through gen_leader:leader_cast/2, using ?MODULE as the
%% server reference. Returns whatever gen_leader:leader_cast/2 returns.
leader_cast(Msg) ->
gen_leader:leader_cast(?MODULE, Msg).
Expand Down Expand Up @@ -1118,3 +1144,43 @@ ensure_rev(K) ->

%% Maps the operation atom to a result value: `reg' gives `true' and
%% `ensure' gives `new'. Any other argument fails with function_clause.
%% NOTE(review): presumably the value returned to the caller when the key
%% was newly created by that operation - confirm against the call sites.
regged_new(reg ) -> true;
regged_new(ensure) -> new.


%% Starts a sync round for the local caller From (the From term received
%% in handle_call) and returns the updated state. The caller is answered
%% later with gen_server:reply/2, except in the trivial case below.
%%
%% When this node is the leader:
%%   - if no other node is alive, the sync is trivially complete and
%%     `true' is replied immediately;
%%   - otherwise {sync, From} is broadcast to the other alive nodes and
%%     {From, Alive} is stored in sync_requests until they have answered.
%%
%% When this node is not the leader, the request is cast to the leader
%% and From is remembered in sync_clients. Without that bookkeeping
%% maybe_reinitiate_sync/1 never has anything to re-send, so a request
%% that was in flight when the leader changed would be lost and the
%% caller would only ever see its call time out. The entry is removed
%% again by reply_to_sync_client/2.
initiate_sync(From, #state{is_leader = true} = S, E) ->
    case gen_leader:alive(E) -- [node()] of
        [] ->
            %% I'm alone - sync is trivial
            gen_server:reply(From, true),
            S;
        Alive ->
            gen_leader:broadcast(
              {from_leader, {sync, From}}, Alive, E),
            S#state{sync_requests =
                        [{From, Alive}|S#state.sync_requests]}
    end;
initiate_sync(From, #state{sync_clients = Cs} = S, _E) ->
    leader_cast({initiate_sync, From}),
    S#state{sync_clients = [From|Cs]}.

%% Casts {initiate_sync, Client} to the leader once for every entry in
%% sync_clients (nothing is sent when the list is empty) and returns the
%% state unchanged.
maybe_reinitiate_sync(#state{sync_clients = Clients} = State) ->
    _ = [leader_cast({initiate_sync, Client}) || Client <- Clients],
    State.

%% Finishes the sync request Req = {Client, _} and returns the new state.
%%
%% If Client lives on this node it is answered directly through
%% reply_to_sync_client/2. Otherwise a {sync_complete, Req} message is
%% sent from the leader to the client's node only (a broadcast with a
%% single recipient), and Req is dropped from sync_requests.
send_sync_complete({Client, _} = Req, State, Election) ->
    case node(Client) == node() of
        true ->
            reply_to_sync_client(Req, State);
        false ->
            gen_leader:broadcast(
              {from_leader, {sync_complete, Req}}, [node(Client)], Election),
            Pending = lists:keydelete(Req, 1, State#state.sync_requests),
            State#state{sync_requests = Pending}
    end.

%% Answers the waiting sync caller with `true' and forgets the request:
%% Client is removed from sync_clients and its entry is deleted from
%% sync_requests. Returns the updated state.
reply_to_sync_client(Client, State) ->
    gen_server:reply(Client, true),
    Waiting = State#state.sync_clients -- [Client],
    Pending = lists:keydelete(Client, 1, State#state.sync_requests),
    State#state{sync_clients = Waiting,
                sync_requests = Pending}.
22 changes: 19 additions & 3 deletions test/gproc_dist_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
-define(f(E), fun() -> ?debugVal(E) end).

dist_test_() ->
{timeout, 180,
{timeout, 120,
[
%% {setup,
%% fun dist_setup/0,
Expand All @@ -49,7 +49,7 @@ dist_test_() ->
tests(Ns, [?f(t_fail_node(Ns))])
end,
fun(Ns) ->
tests(Ns, [{timeout, 15, ?f(t_master_dies(Ns))}])
tests(Ns, [{timeout, 10, ?f(t_master_dies(Ns))}])
end
]}
]}.
Expand Down Expand Up @@ -575,6 +575,7 @@ t_sync_cand_dies([A,B,C]) ->
%% immediately. Therefore, we should have our answer well within 1 sec.
?assertMatch({value, true}, rpc:nb_yield(Key, 1000)).


%% Verify that the registry updates consistently if a non-leader node
%% dies.
t_fail_node(Ns) ->
Expand Down Expand Up @@ -613,12 +614,27 @@ t_master_dies([A,B,C] = Ns) ->
?assertMatch(ok, rpc:call(L, application, stop, [gproc])),
Names = [{Na,Pa}, {Nb,Pb}, {Nc,Pc}] -- [{Nl, Pl}],
RestNs = Ns -- [L],
?assertMatch(true, rpc:call(hd(RestNs), gproc_dist, sync, [])),
%% ?assertMatch(true, rpc:call(hd(RestNs), gproc_dist, sync, [])),
?assertMatch(true, try_sync(hd(RestNs), RestNs)),
?assertMatch(ok, t_lookup_everywhere(Nl, RestNs, undefined)),
[?assertMatch(ok, t_lookup_everywhere(Nx, RestNs, Px))
|| {Nx, Px} <- Names],
ok.

%% Runs gproc_dist:sync() on node N through rpc:call/4 and returns the
%% result. `true' is passed through. On {badrpc, _} a debug message is
%% printed with the node in the parentheses, the error on the next line,
%% and the sys:get_status/1 of gproc_dist collected from N and every node
%% in Ns; the error tuple is then returned so the caller's assertion fails.
try_sync(N, Ns) ->
    case rpc:call(N, gproc_dist, sync, []) of
        {badrpc, _} = Err ->
            %% The visible caller passes hd(Ns) as N, so [N|Ns] would ask
            %% the same node twice; usort removes the duplicate.
            Nodes = lists:usort([N|Ns]),
            ?debugFmt(
               "Error in gproc_dist:sync() (~p):~n"
               " ~p~n"
               "Status = ~p~n",
               [N, Err,
                {Nodes, rpc:multicall(Nodes, sys, get_status, [gproc_dist])}]),
            Err;
        true ->
            true
    end.

%% Suspends the calling process for 500 ms via timer:sleep/1.
t_sleep() ->
timer:sleep(500).

Expand Down

0 comments on commit c4cc57f

Please sign in to comment.