Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uw multicall #126

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
91 changes: 86 additions & 5 deletions src/gproc_dist.erl
Expand Up @@ -44,6 +44,7 @@
-export([leader_call/1,
leader_cast/1,
sync/0,
multicall/3,
get_leader/0]).

%%% internal exports
Expand All @@ -69,7 +70,8 @@
-record(state, {
always_broadcast = false,
is_leader,
sync_requests = []}).
sync_requests = [],
calls = []}).

-include("gproc_trace.hrl").
%% ==========================================================
Expand Down Expand Up @@ -270,6 +272,28 @@ get_leader() ->
GenLeader = gen_leader,
GenLeader:call(?MODULE, get_leader).

%% @spec multicall(Module::atom(), Func::atom(), Args::list()) ->
%% {[Result], [{node(), Error}]}
%%
%% @doc Perform a multicall RPC on all live gproc nodes
%%
%% This function works like {@link rpc:multicall/3}, except the calls are
%% routed via the gproc leader and its connected nodes - the same route as
%% for the data replication. This means that a multicall following a global
%% registration is guaranteed to follow the update on each gproc node.
%%
%% The return value will be of the form `{GoodResults, BadNodes}', where
%% `BadNodes' is a list of `{Node, Error}' for each node where the call
%% fails.
%% @end
multicall(M, F, A) ->
case leader_call({multicall, M, F, A}) of
{ok, Result} ->
Result;
{error, Error} ->
error(Error)
end.

%% ==========================================================
%% Server-side

Expand All @@ -281,10 +305,25 @@ handle_call(get_leader, _, S, E) ->
handle_call(_, _, S, _) ->
{reply, badarg, S}.

handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
ets:delete(?TAB, {Pid, g}),
leader_cast({pid_is_DOWN, Pid}),
{ok, S};
handle_info({'DOWN', MRef, process, Pid, Msg}, #state{calls = Calls} = S) ->
case lists:keyfind(Pid, 1, Calls) of
{Pid, MRef, server, From} ->
Reply = case Msg of
{mcall, Result} ->
{ok, multicall_result(Result)};
Error ->
{error, Error}
end,
gen_leader:reply(From, {leader,reply,Reply}),
{ok, S#state{calls = lists:keydelete(Pid, 1, Calls)}};
{Pid, MRef, client, Server} ->
Server ! {rcall_result, self(), Msg},
{ok, S#state{calls = lists:keydelete(Pid, 1, Calls)}};
_ ->
ets:delete(?TAB, {Pid, g}),
leader_cast({pid_is_DOWN, Pid}),
{ok, S}
end;
handle_info({gproc_unreg, Objs}, S) ->
{ok, [{delete, Objs}], S};
handle_info(_, S) ->
Expand Down Expand Up @@ -364,6 +403,19 @@ handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
GenLeader:broadcast({from_leader, {sync, From}}, Alive, E),
{noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
end;
handle_leader_call({multicall, M, F, A}, From, #state{calls = Calls} = S, E) ->
OtherNodes = gen_leader:alive(E) -- [node()],
{Pid, MRef} = spawn_monitor(
fun() ->
exit({mcall, multicall_server(M, F, A, OtherNodes)})
end),
if OtherNodes =/= [] ->
gen_leader:broadcast({from_leader, {multicall, M, F, A, Pid}},
OtherNodes, E);
true ->
ok
end,
{noreply, S#state{calls = [{Pid, MRef, server, From}|Calls]}};
handle_leader_call({Reg, {_C,g,_Name} = K, Value, Pid, As, Op}, _From, S, _E)
when Reg==reg; Reg==reg_other ->
case gproc_lib:insert_reg(K, Value, Pid, g) of
Expand Down Expand Up @@ -819,6 +871,11 @@ terminate(_Reason, _S) ->
from_leader({sync, Ref}, S, _E) ->
gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
{ok, S};
from_leader({multicall, M, F, A, Pid}, #state{calls = Calls} = S, _E) ->
{Pid1, MRef} = spawn_monitor(fun() ->
exit({mcall, apply(M, F, A)})
end),
{ok, S#state{calls = [{Pid1, MRef, client, Pid}|Calls]}};
from_leader(Ops, S, _E) ->
lists:foreach(
fun({delete, Globals}) ->
Expand Down Expand Up @@ -1070,3 +1127,27 @@ add_follow_to_waiters(Waiters, {T,_,_} = K, Pid, Ref, S) ->

regged_new(reg ) -> true;
regged_new(ensure) -> new.

multicall_server(M, F, A, Nodes) ->
MyRes = try {mcall, apply(M, F, A)}
catch
_:E ->
{E, erlang:get_stacktrace()}
end,
[{node(), MyRes}|await_nodes(Nodes)].

await_nodes([H|T]) ->
receive
{rcall_result, Pid, Res} when node(Pid) =:= H ->
[{H, Res}|await_nodes(T)]
end;
await_nodes([]) ->
[].

multicall_result(Res) ->
lists:foldr(
fun({_, {mcall, Good}}, {G, B}) ->
{[Good|G], B};
({N, E}, {G, B}) ->
{G, [{N,E}|B]}
end, {[], []}, Res).
34 changes: 33 additions & 1 deletion test/gproc_dist_tests.erl
Expand Up @@ -86,7 +86,8 @@ basic_tests(Ns) ->
?f(t_monitor(Ns)),
?f(t_standby_monitor(Ns)),
?f(t_follow_monitor(Ns)),
?f(t_subscribe(Ns))
?f(t_subscribe(Ns)),
?f(t_multicall(Ns))
].

dist_setup() ->
Expand Down Expand Up @@ -500,6 +501,37 @@ t_subscribe([A,B|_] = Ns) ->
?assertEqual({gproc_monitor,Na,undefined}, got_msg(Pb, gproc_monitor)),
ok.

t_multicall(Ns) ->
t_multicall(Ns, 3).

t_multicall(Ns, I) when I > 0 ->
?assertMatch(
true, lists:all(fun t_mcall_/1, Ns)),
t_multicall(Ns, I-1);
t_multicall(_, _) ->
ok.

t_mcall_(N) ->
Na = ?T_NAME,
Pa = gproc_test_lib:t_spawn_reg_mcall(N, Na),
?assertMatch(ok, t_call(Pa, die)),
true.

%% t_multicall(Ns) ->
%% t_multicall_(Ns, 10).

%% t_multicall_([A|_] = Ns, I) when I > 0 ->
%% Na = ?T_NAME,
%% Pa = t_spawn_reg(A, Na),
%% Expected = {[Pa || _ <- Ns], []},
%% ?assertMatch(
%% Expected,
%% t_call(Pa, {apply, gproc_dist, multicall, [gproc, where, [Na]]})),
%% ok = t_call(Pa, die),
%% t_multicall_(Ns, I-1);
%% t_multicall_(_, _) ->
%% ok.

%% got_msg(Pb, Tag) ->
%% t_call(Pb,
%% {apply_fun,
Expand Down
38 changes: 38 additions & 0 deletions test/gproc_test_lib.erl
Expand Up @@ -3,6 +3,7 @@
-export([t_spawn/1, t_spawn/2,
t_spawn_reg/2, t_spawn_reg/3, t_spawn_reg/4,
t_spawn_reg_shared/3,
t_spawn_reg_mcall/2,
t_spawn_mreg/2,
t_call/2,
t_loop/0, t_loop/1,
Expand Down Expand Up @@ -80,6 +81,43 @@ t_spawn_reg_shared(Node, Name, Value) ->
erlang:error({timeout, t_spawn_reg_shared, [Node,Name,Value]})
end.

t_spawn_reg_mcall(Node, {_,g,_} = Name) ->
try t_spawn_reg_mcall_(Node, Name)
catch
error:E ->
error({E, erlang:get_stacktrace()})
end.

t_spawn_reg_mcall_(Node, Name) ->
Me = self(),
P = spawn(
Node, fun() ->
t_spawn_reg_mcall_p(Name, Me)
end),
MRef = erlang:monitor(process, P),
receive
{P, ok} ->
P;
{'DOWN', MRef, _, _, Reason} ->
error(Reason)
after 1000 ->
erlang:error({timeout, t_spawn_reg_mcall, [Node, Name]})
end.

t_spawn_reg_mcall_p(Name, Parent) ->
try begin
true = gproc:reg(Name),
{GoodRes, []} = gproc_dist:multicall(gproc, where, [Name]),
true = lists:all(fun(X) -> X =:= self() end, GoodRes),
Parent ! {self(), ok}
end
catch
error:E ->
error({E, erlang:get_stacktrace()})
end,
t_loop().


default_value({c,_,_}) -> 0;
default_value(_) -> undefined.

Expand Down