Permalink
Browse files

First reasonably well working Distributed Gproc

This version requires the following gen_leader version

http://github.com/uwiger/gen_leader_revival/tree/master/hanssv+serge_version/

(At commit time, other gen_leader versions handle DOWN messages
incorrectly, causing gproc to malfunction.)
  • Loading branch information...
1 parent 4da42e3 commit 0fae114946b1e720fd42a0246edb5e6e126c39a5 Ulf Wiger committed Apr 29, 2010
Showing with 207 additions and 91 deletions.
  1. +1 −1 Makefile
  2. +72 −34 src/gproc.erl
  3. +95 −37 src/gproc_dist.erl
  4. +39 −19 src/gproc_lib.erl
View
@@ -1,6 +1,6 @@
## The MIT License
##
-## Copyright (c) 2008-2010 Ulf Wiger <ulf.wiger@erlang-solutions.com>,
+## Copyright (c) 2008-2010 Ulf Wiger <ulf@wiger.net>,
##
## Permission is hereby granted, free of charge, to any person obtaining a
## copy of this software and associated documentation files (the "Software"),
View
@@ -281,6 +281,9 @@ default(_) -> undefined.
await(Key) ->
await(Key, infinity).
+await({n,g,_} = Key, Timeout) ->
+ ?CHK_DIST,
+ request_wait(Key, Timeout);
await({n,l,_} = Key, Timeout) ->
case ets:lookup(?TAB, {Key, n}) of
[{_, Pid, Value}] ->
@@ -291,15 +294,15 @@ await({n,l,_} = Key, Timeout) ->
await(K, T) ->
erlang:error(badarg, [K, T]).
-request_wait({n,l,_} = Key, Timeout) ->
+request_wait({n,C,_} = Key, Timeout) when C==l; C==g ->
TRef = case Timeout of
infinity -> no_timer;
T when is_integer(T), T > 0 ->
erlang:start_timer(T, self(), timeout);
_ ->
erlang:error(badarg, [Key, Timeout])
end,
- WRef = call({await,Key}),
+ WRef = call({await,Key,self()}, C),
receive
{gproc, WRef, registered, {_K, Pid, V}} ->
{Pid, V};
@@ -316,13 +319,20 @@ request_wait({n,l,_} = Key, Timeout) ->
%% {gproc, Ref, registered, {Key, Pid, Value}}, once the name is registered.
%% @end
%%
+nb_wait({n,g,_} = Key) ->
+ ?CHK_DIST,
+ call({await, Key, self()}, g);
nb_wait({n,l,_} = Key) ->
- call({await, Key});
+ call({await, Key, self()}, l);
nb_wait(Key) ->
erlang:error(badarg, [Key]).
-cancel_wait(Key, Ref) ->
- cast({cancel_wait, self(), Key, Ref}),
+cancel_wait({_,g,_} = Key, Ref) ->
+ ?CHK_DIST,
+ cast({cancel_wait, self(), Key, Ref}, g),
+ ok;
+cancel_wait({_,l,_} = Key, Ref) ->
+ cast({cancel_wait, self(), Key, Ref}, l),
ok.
@@ -754,7 +764,7 @@ handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) ->
handle_call({reg, {_T,l,_} = Key, Val}, {Pid,_}, S) ->
case try_insert_reg(Key, Val, Pid) of
true ->
- ensure_monitor(Pid),
+ gproc_lib:ensure_monitor(Pid,l),
{reply, true, S};
false ->
{reply, badarg, S}
@@ -767,24 +777,33 @@ handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
false ->
{reply, badarg, S}
end;
-handle_call({await, {T,l,_} = Key}, {Pid,Ref} = From, S) ->
- Rev = {{Pid,Key}, r},
- case ets:lookup(?TAB, {Key,T}) of
- [{_, P, Value}] ->
- %% for symmetry, we always reply with Ref and then send a message
- gen_server:reply(From, Ref),
- Pid ! {gproc, Ref, registered, {Key, P, Value}},
+handle_call({await, {_,l,_} = Key, Pid}, {_, Ref}, S) ->
+ %% Passing the pid explicitly is needed when leader_call is used,
+ %% since the Pid given as From in the leader is the local gen_leader
+ %% instance on the calling node.
+ case gproc_lib:await(Key, {Pid, Ref}) of
+ noreply ->
{noreply, S};
- [{K, Waiters}] ->
- NewWaiters = [{Pid,Ref} | Waiters],
- ets:insert(?TAB, [{K, NewWaiters}, Rev]),
- gproc_lib:ensure_monitor(Pid),
- {reply, Ref, S};
- [] ->
- ets:insert(?TAB, [{{Key,T}, [{Pid,Ref}]}, Rev]),
- gproc_lib:ensure_monitor(Pid),
- {reply, Ref, S}
+ {reply, Reply, _} ->
+ {reply, Reply, S}
end;
+%% Rev = {{Pid,Key}, r},
+%% case ets:lookup(?TAB, {Key,T}) of
+%% [{_, P, Value}] ->
+%% %% for symmetry, we always reply with Ref and then send a message
+%% gen_server:reply(From, Ref),
+%% Pid ! {gproc, Ref, registered, {Key, P, Value}},
+%% {noreply, S};
+%% [{K, Waiters}] ->
+%% NewWaiters = [{Pid,Ref} | Waiters],
+%% ets:insert(?TAB, [{K, NewWaiters}, Rev]),
+%% gproc_lib:ensure_monitor(Pid,l),
+%% {reply, Ref, S};
+%% [] ->
+%% ets:insert(?TAB, [{{Key,T}, [{Pid,Ref}]}, Rev]),
+%% gproc_lib:ensure_monitor(Pid,l),
+%% {reply, Ref, S}
+%% end;
handle_call({mreg, T, l, L}, {Pid,_}, S) ->
try gproc_lib:insert_many(T, l, L, Pid) of
{true,_} -> {reply, true, S};
@@ -812,24 +831,43 @@ handle_info(_, S) ->
%% @hidden
code_change(_FromVsn, S, _Extra) ->
+ %% We have changed local monitor markers from {Pid} to {Pid,l}.
+ case ets:select(?TAB, [{{'$1'},[],['$1']}]) of
+ [] ->
+ ok;
+ Pids ->
+ ets:insert(?TAB, [{P,l} || P <- Pids]),
+ ets:select_delete(?TAB, [{{'_'},[],[true]}])
+ end,
{ok, S}.
%% @hidden
terminate(_Reason, _S) ->
ok.
+call(Req) ->
+ call(Req, l).
+call(Req, l) ->
+ chk_reply(gen_server:call(?MODULE, Req), Req);
+call(Req, g) ->
+ chk_reply(gproc_dist:leader_call(Req), Req).
-call(Req) ->
- case gen_server:call(?MODULE, Req) of
+chk_reply(Reply, Req) ->
+ case Reply of
badarg -> erlang:error(badarg, Req);
Reply -> Reply
end.
cast(Msg) ->
- gen_server:cast(?MODULE, Msg).
+ cast(Msg, l).
+
+cast(Msg, l) ->
+ gen_server:cast(?MODULE, Msg);
+cast(Msg, g) ->
+ gproc_dist:leader_cast(Msg).
@@ -860,7 +898,7 @@ process_is_down(Pid) ->
Keys = ets:select(?TAB, [{{{Pid,'$1'},'$2'},
[{'==',{element,2,'$1'},l}], [{{'$1','$2'}}]}]),
ets:select_delete(?TAB, [{{{Pid,{'_',l,'_'}},'_'}, [], [true]}]),
- ets:delete(?TAB, Pid),
+ ets:delete(?TAB, {Pid,l}),
lists:foreach(fun({Key,r}) ->
gproc_lib:remove_reg_1(Key, Pid);
({Key,w}) ->
@@ -879,16 +917,16 @@ init([]) ->
-ensure_monitor(Pid) when node(Pid) == node() ->
- case ets:insert_new(?TAB, {Pid}) of
- false -> ok;
- true -> erlang:monitor(process, Pid)
- end;
-ensure_monitor(_) ->
- true.
+%% ensure_monitor(Pid) when node(Pid) == node() ->
+%% case ets:insert_new(?TAB, {Pid}) of
+%% false -> ok;
+%% true -> erlang:monitor(process, Pid)
+%% end;
+%% ensure_monitor(_) ->
+%% true.
monitor_me() ->
- case ets:insert_new(?TAB, {self()}) of
+ case ets:insert_new(?TAB, {self(),l}) of
false -> true;
true ->
cast({monitor_me,self()}),
Oops, something went wrong.

0 comments on commit 0fae114

Please sign in to comment.