Permalink
Fetching contributors…
Cannot retrieve contributors at this time
1076 lines (979 sloc) 35.1 KB
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%% @author Ulf Wiger <ulf@wiger.net>
%%
%% @doc Extended process registry
%% <p>This module implements an extended process registry</p>
%% <p>For a detailed description, see gproc/doc/erlang07-wiger.pdf.</p>
%% @end
-module(gproc_dist).
-behaviour(gen_leader).
-export([start_link/0, start_link/1,
reg/1, reg/4, unreg/1,
reg_other/5, unreg_other/2,
reg_or_locate/3,
reg_shared/3, unreg_shared/1,
monitor/2,
demonitor/2,
set_attributes/2,
set_attributes_shared/2,
mreg/2,
munreg/2,
set_value/2,
set_value_shared/2,
give_away/2,
update_counter/3,
update_counters/1,
update_shared_counter/2,
reset_counter/1]).
-export([leader_call/1,
leader_cast/1,
sync/0,
get_leader/0]).
%%% internal exports
-export([init/1,
handle_cast/3,
handle_call/4,
handle_info/2, handle_info/3,
handle_leader_call/4,
handle_leader_cast/3,
handle_DOWN/3,
elected/2, % original version
elected/3,
surrendered/3,
from_leader/3,
code_change/4,
terminate/2]).
-include("gproc_int.hrl").
-include("gproc.hrl").
-define(SERVER, ?MODULE).
-record(state, {
always_broadcast = false,
is_leader,
sync_requests = []}).
-include("gproc_trace.hrl").
%% ==========================================================
%% Start functions
%% @doc Starts the gproc leader process, using this node plus all currently
%% connected nodes and no extra gen_leader options.
start_link() ->
    start_link({[node() | nodes()], []}).

%% @doc Starts the gproc leader process.
%%
%% Accepted arguments:
%%   all            - this node plus all connected nodes, with the
%%                    `{bcast_type, all}' option
%%   Nodes (list)   - the given node list, no extra options
%%   {Nodes, Opts}  - node list and option list, both handed to
%%                    gen_leader:start_link/6 unchanged
%%
%% The spawn options for the server process come from
%% gproc_lib:valid_opts(server_options, []).
start_link(all) ->
    Candidates = [node() | nodes()],
    start_link({Candidates, [{bcast_type, all}]});
start_link(Nodes) when is_list(Nodes) ->
    start_link({Nodes, []});
start_link({Nodes, Opts}) ->
    ServerOpts = [{spawn_opt, gproc_lib:valid_opts(server_options, [])}],
    gen_leader:start_link(?SERVER, Nodes, Opts, ?MODULE, [], ServerOpts).
%% ==========================================================
%% API
%% {@see gproc:reg/1}
%%
%% Registers Key for the calling process with the default value returned
%% by gproc:default/1, no attributes, and the `reg' operation.
reg(Key) ->
    DefaultValue = gproc:default(Key),
    reg(Key, DefaultValue, [], reg).
%% {@see gproc:reg_or_locate/2}
%%
%% Asks the leader to register the global unique name Key with Value, or to
%% report the existing registration. The third argument is either the pid
%% to register or a zero-arity fun. Any other argument combination raises a
%% gproc badarg error.
reg_or_locate({n,g,_} = Key, Value, Pid) when is_pid(Pid) ->
    leader_call({reg_or_locate, Key, Value, Pid});
reg_or_locate({n,g,_} = Key, Value, F) when is_function(F, 0) ->
    CallerGroupLeader = group_leader(),
    %% The leader spawns this fun on the caller's node; the wrapper makes the
    %% new process adopt the caller's group leader before running F.
    Wrapped = fun() ->
                      group_leader(CallerGroupLeader, self()),
                      F()
              end,
    leader_call({reg_or_locate, Key, Value, Wrapped});
reg_or_locate(_, _, _) ->
    ?THROW_GPROC_ERROR(badarg).
%%% Registers a global key on behalf of the calling process.
%%%
%%% The key has the form {Class, g, Name}, where
%%%    Class = n  - unique name
%%%          | p  - non-unique property
%%%          | c  - counter
%%%          | a  - aggregated counter
%%%          | r  - resource property
%%%          | rc - resource counter
%%%
%%% Value, Attrs and Op are forwarded to the leader together with self().
%%% Any key that is not global raises a gproc badarg error.
reg({_,g,_} = Key, Value, Attrs, Op) ->
    %% anything global
    Request = {reg, Key, Value, self(), Attrs, Op},
    leader_call(Request);
reg(_, _, _, _) ->
    ?THROW_GPROC_ERROR(badarg).
%% @spec ({Class,g,Key}, pid(), Value, Attrs, Op::reg | unreg) -> true
%% @doc
%%    Registers a global key on behalf of another process.
%%
%%    Class = n  - unique name
%%          | a  - aggregated counter
%%          | r  - resource property
%%          | rc - resource counter
%%    Value = term()
%%    Attrs = [{Key, Value}]
%%
%%    Any other class, a non-global key, or a non-pid second argument raises
%%    a gproc badarg error.
%% @end
reg_other({T,g,_} = Key, Pid, Value, Attrs, Op)
  when is_pid(Pid),
       (T =:= n orelse T =:= a orelse T =:= r orelse T =:= rc) ->
    leader_call({reg_other, Key, Value, Pid, Attrs, Op});
reg_other(_, _, _, _, _) ->
    ?THROW_GPROC_ERROR(badarg).
%% Removes a global registration held by another process. Only the classes
%% n, a, r and rc are accepted, and Pid must be a pid; everything else
%% raises a gproc badarg error.
unreg_other({T,g,_} = Key, Pid)
  when is_pid(Pid),
       (T =:= n orelse T =:= a orelse T =:= r orelse T =:= rc) ->
    leader_call({unreg_other, Key, Pid});
unreg_other(_, _) ->
    ?THROW_GPROC_ERROR(badarg).
%% Registers a global key owned by the atom `shared' instead of a process.
%% The request is the same `reg' request that reg/4 sends, with `shared' in
%% the pid position and the `reg' operation.
reg_shared({_,g,_} = Key, Value, Attrs) ->
    Request = {reg, Key, Value, shared, Attrs, reg},
    leader_call(Request);
reg_shared(_, _, _) ->
    ?THROW_GPROC_ERROR(badarg).
%% Asks the leader to set up a monitor on a global key for the calling
%% process. Type must be one of info, follow or standby; anything else, or a
%% non-global key, raises a gproc badarg error.
monitor({_,g,_} = Key, Type)
  when Type =:= info orelse Type =:= follow orelse Type =:= standby ->
    Request = {monitor, Key, self(), Type},
    leader_call(Request);
monitor(_, _) ->
    ?THROW_GPROC_ERROR(badarg).
%% Asks the leader to remove the monitor identified by Ref that the calling
%% process holds on the global key Key.
demonitor({_,g,_} = Key, Ref) ->
    Request = {demonitor, Key, self(), Ref},
    leader_call(Request);
demonitor(_, _) ->
    ?THROW_GPROC_ERROR(badarg).
%% Asks the leader to set attributes on a global key registered by the
%% calling process.
set_attributes({_,g,_} = Key, Attrs) ->
    Owner = self(),
    leader_call({set_attributes, Key, Attrs, Owner});
set_attributes(_, _) ->
    ?THROW_GPROC_ERROR(badarg).
%% Asks the leader to set attributes on a global key owned by `shared'.
set_attributes_shared({_,g,_} = Key, Attrs) ->
    Request = {set_attributes, Key, Attrs, shared},
    leader_call(Request);
set_attributes_shared(_, _) ->
    ?THROW_GPROC_ERROR(badarg).
%% Registers several global keys of class T for the calling process in one
%% leader call. KVL must be a list; otherwise a gproc badarg error is raised.
mreg(T, KVL) when is_list(KVL) ->
    leader_call({mreg, T, g, KVL, self()});
mreg(_T, _KVL) ->
    ?THROW_GPROC_ERROR(badarg).
%% Unregisters several global keys of class T for the calling process in one
%% leader call. Keys must be a list; otherwise a gproc badarg error is raised.
munreg(T, Keys) when is_list(Keys) ->
    leader_call({munreg, T, g, Keys, self()});
munreg(_T, _Keys) ->
    ?THROW_GPROC_ERROR(badarg).
%% Asks the leader to remove the calling process's registration of the
%% global key Key.
unreg({_,g,_} = Key) ->
    Owner = self(),
    leader_call({unreg, Key, Owner});
unreg(_) ->
    ?THROW_GPROC_ERROR(badarg).
%% Asks the leader to remove a `shared' registration. Only global counters
%% (c) and aggregated counters (a) are accepted here.
unreg_shared({T,g,_} = Key) when T =:= c orelse T =:= a ->
    Request = {unreg, Key, shared},
    leader_call(Request);
unreg_shared(_) ->
    ?THROW_GPROC_ERROR(badarg).
%% Asks the leader to set the value of a global key registered by the
%% calling process. For counters (c) and aggregated counters (a) the value
%% must be an integer; a non-integer value, or a non-global key, raises a
%% gproc badarg error.
set_value({T,g,_}, Value)
  when (T =:= a orelse T =:= c), not is_integer(Value) ->
    ?THROW_GPROC_ERROR(badarg);
set_value({_,g,_} = Key, Value) ->
    leader_call({set, Key, Value, self()});
set_value(_, _) ->
    ?THROW_GPROC_ERROR(badarg).
%% Asks the leader to set the value of a `shared' global key. Accepted
%% classes are a, c and p; no type check is made on Value here.
set_value_shared({T,g,_} = Key, Value)
  when T =:= a orelse T =:= c orelse T =:= p ->
    Request = {set, Key, Value, shared},
    leader_call(Request);
set_value_shared(_, _) ->
    ?THROW_GPROC_ERROR(badarg).
%% Asks the leader to transfer the calling process's registration of the
%% global key Key to To. To is forwarded unchanged; the leader resolves it
%% (see pid_to_give_away_to/1).
%%
%% A non-global key raises a gproc badarg error, like every other API
%% function in this module, instead of a bare function_clause error.
give_away({_,g,_} = Key, To) ->
    leader_call({give_away, Key, To, self()});
give_away(_, _) ->
    ?THROW_GPROC_ERROR(badarg).
%% Asks the leader to add the integer Incr to a global counter (c) or to a
%% unique name (n). Pid is forwarded as the owner part of the table key.
%% Any other argument combination raises a gproc badarg error.
update_counter({T,g,_} = Key, Pid, Incr)
  when is_integer(Incr), (T =:= c orelse T =:= n) ->
    Request = {update_counter, Key, Incr, Pid},
    leader_call(Request);
update_counter(_, _, _) ->
    ?THROW_GPROC_ERROR(badarg).
%% Asks the leader to apply a batch of counter updates in one call.
%% List must be a list; its elements are validated by the leader.
update_counters(List) when is_list(List) ->
    Request = {update_counters, List},
    leader_call(Request);
update_counters(_) ->
    ?THROW_GPROC_ERROR(badarg).
%% Asks the leader to add the integer Incr to a `shared' global counter.
update_shared_counter({c,g,_} = Key, Incr) when is_integer(Incr) ->
    Request = {update_counter, Key, Incr, shared},
    leader_call(Request);
update_shared_counter(_, _) ->
    ?THROW_GPROC_ERROR(badarg).
%% Asks the leader to reset a global counter registered by the calling
%% process. Only global counter keys are accepted.
reset_counter({c,g,_} = Key) ->
    Owner = self(),
    leader_call({reset_counter, Key, Owner});
reset_counter(_) ->
    ?THROW_GPROC_ERROR(badarg).
%% @spec sync() -> true
%% @doc Synchronize with the gproc leader
%%
%% Use this function to make sure that data has been replicated from the
%% leader to the current node. The leader is asked to ping all live
%% participating nodes, and the call returns `true' once each of those nodes
%% has either responded or died. If the leader itself dies while a sync is
%% in progress, the call fails with a timeout exception.
%% (It ought to be a `leader_died' exception; why gen_leader times out
%% here instead of reporting that the leader died needs more study.)
%% @end
%%
sync() ->
    Request = sync,
    leader_call(Request).
%% @spec get_leader() -> node()
%% @doc Returns the node of the current gproc leader.
%%
%% The request is answered by the local gproc_dist server (see the
%% `get_leader' clause of handle_call/4).
%% @end
get_leader() ->
    %% The module name is bound to a variable so the call is dispatched
    %% dynamically, exactly as before.
    %% NOTE(review): presumably done to keep xref/dialyzer quiet when the
    %% gen_leader in use lacks call/2 - confirm before changing.
    Mod = gen_leader,
    Mod:call(?MODULE, get_leader).
%% ==========================================================
%% Server-side
%% No local casts are supported: any cast stops the server with reason
%% `unknown_cast', leaving the state unchanged.
handle_cast(_Unexpected, State, _Election) ->
    {stop, unknown_cast, State}.
%% Local (non-leader) calls. `get_leader' is answered with
%% gen_leader:leader_node/1 applied to the election data; every other
%% request gets the reply `badarg'.
handle_call(get_leader, _From, State, Election) ->
    LeaderNode = gen_leader:leader_node(Election),
    {reply, LeaderNode, State};
handle_call(_Request, _From, State, _Election) ->
    {reply, badarg, State}.
%% Out-of-band messages.
%%
%%   {'DOWN', ...}        - a monitored process died: drop the local
%%                          {Pid, g} entry and tell the leader
%%   {gproc_unreg, Objs}  - broadcast a delete of Objs
%%   anything else        - ignored
handle_info({'DOWN', _MonitorRef, process, DeadPid, _Reason}, State) ->
    ets:delete(?TAB, {DeadPid, g}),
    leader_cast({pid_is_DOWN, DeadPid}),
    {ok, State};
handle_info({gproc_unreg, Objs}, State) ->
    Broadcast = [{delete, Objs}],
    {ok, Broadcast, State};
handle_info(_Other, State) ->
    {ok, State}.

%% Three-argument variant: the election data is not needed, so this simply
%% delegates to handle_info/2.
handle_info(Msg, State, _Election) ->
    handle_info(Msg, State).
%% Original two-argument election callback: mark ourselves as leader and
%% hand out the complete set of globals.
elected(S, _E) ->
    Synch = {globals, globs()},
    {ok, Synch, S#state{is_leader = true}}.

%% Three-argument election callback.
%%
%% With `undefined' as the third argument we have just become leader, so a
%% full synch is returned and is_leader is set. Otherwise another node has
%% recognized us as leader: unless always_broadcast is set, the globals are
%% returned with `reply' so that not everything is broadcast to everyone.
elected(S, _E, undefined) ->
    %% I have become leader; full synch
    Synch = {globals, globs()},
    {ok, Synch, S#state{is_leader = true}};
elected(S, _E, _Node) ->
    Synch = {globals, globs()},
    %% `ok' is mainly for a gen_leader that doesn't support the `reply'
    %% return value.
    Tag = if not S#state.always_broadcast -> reply;
             true -> ok
          end,
    {Tag, Synch, S}.
%% Collects everything global from the table: first the registration
%% objects (keyed on {{_,g,_}, _}), then the reverse entries (keyed on
%% {_, {_,g,_}}). Every owner found in a registration object is passed to
%% gproc_lib:ensure_monitor/2 before the combined list is returned.
globs() ->
    RegSpec = [{{{{'_',g,'_'},'_'},'_','_'}, [], ['$_']}],
    RevSpec = [{{{'$1',{'_',g,'_'}}, '$2'}, [], ['$_']}],
    Regs = ets:select(?TAB, RegSpec),
    Revs = ets:select(?TAB, RevSpec),
    lists:foreach(fun({_, Pid, _}) -> gproc_lib:ensure_monitor(Pid, g);
                     (_) -> ok
                  end, Regs),
    Regs ++ Revs.
%% Called when this node surrenders to a leader and receives that leader's
%% globals. The handling is the same whether or not we believed ourselves to
%% be leader (a leader conflict): reconcile the table via surrendered_1/1 and
%% clear the is_leader flag. Globals from this node should be more correct in
%% our own table than in the leader's.
surrendered(S, {globals, LeaderGlobs}, _E) ->
    surrendered_1(LeaderGlobs),
    {ok, S#state{is_leader = false}}.
%% Called when Node goes down. Pending sync requests stop waiting for that
%% node, and every global registration whose owner lives on Node is run
%% through process_globals/1. Any resulting operations are broadcast.
handle_DOWN(Node, S, _E) ->
    S1 = check_sync_requests(Node, S),
    Pattern = {{{'_',g,'_'},'_'},'$1','_'},
    OnDeadNode = [{'==', {node,'$1'}, Node}],
    %% Produces {Key, Pid} pairs: the key part of the table key, and the owner.
    KeyAndPid = [{{{element,1,{element,1,'$_'}},
                   {element,2,'$_'}}}],
    Affected = ets:select(?TAB, [{Pattern, OnDeadNode, KeyAndPid}]),
    case process_globals(Affected) of
        [] ->
            {ok, S1};
        Broadcast ->
            {ok, Broadcast, S1}
    end.
%% Removes Node from the list of nodes each pending sync request is still
%% waiting for. A request with nobody left to wait for is answered with
%% `true' and dropped; the others are kept, in their original order.
check_sync_requests(Node, #state{sync_requests = SReqs} = S) ->
    StillPending =
        lists:foldl(
          fun({From, Ns}, Acc) ->
                  case Ns -- [Node] of
                      [] ->
                          gen_leader:reply(From, {leader, reply, true}),
                          Acc;
                      Remaining ->
                          [{From, Remaining} | Acc]
                  end
          end, [], SReqs),
    S#state{sync_requests = lists:reverse(StillPending)}.
%% Leader-side handling of the requests sent through leader_call/1.
%%
%% Each clause updates the leader's own table and returns either
%%   {reply, Reply, S}             - nothing to replicate, or
%%   {reply, Reply, Broadcast, S}  - Broadcast is a list of {insert, Objs},
%%                                   {delete, Keys} and {notify, Events}
%%                                   operations, applied on the other nodes
%%                                   by from_leader/3.
%% A request that cannot be served is answered with `badarg', which
%% leader_call/1 turns into a gproc badarg error in the client.
%%
%% Requests: sync, reg / reg_other, monitor, demonitor, set_attributes,
%% reg_or_locate, update_counter, update_counters, reset_counter,
%% unreg / unreg_other, give_away, mreg, munreg, set, await.
handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
    GenLeader = gen_leader,
    case GenLeader:alive(E) -- [node()] of
        [] ->
            %% Nobody else is alive; nothing to wait for.
            {reply, true, S};
        Alive ->
            %% Ping the other live nodes; the reply is sent once all of them
            %% have answered (handle_leader_cast/3) or died (handle_DOWN/3).
            GenLeader:broadcast({from_leader, {sync, From}}, Alive, E),
            {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
    end;
handle_leader_call({Reg, {_C,g,_Name} = K, Value, Pid, As, Op}, _From, S, _E)
  when Reg==reg; Reg==reg_other ->
    case gproc_lib:insert_reg(K, Value, Pid, g) of
        false when Op == reg ->
            {reply, badarg, S};
        false when Op == ensure ->
            %% `ensure' on an existing key: update it, provided that Pid is
            %% the current owner.
            case ets:lookup(?TAB, ets_key(K, Pid)) of
                [{_, Pid, _}] ->
                    gproc_lib:do_set_value(K, Value, Pid),
                    gproc_lib:insert_attr(K, As, Pid, g),
                    Vals = mk_broadcast_insert_vals([{K, Pid, Value}]),
                    {reply, updated, [{insert, Vals}], S};
                _ ->
                    {reply, badarg, [], S}
            end;
        true ->
            _ = gproc_lib:ensure_monitor(Pid,g),
            _ = if As =/= [] ->
                        gproc_lib:insert_attr(K, As, Pid, g);
                   true -> []
                end,
            Vals = mk_broadcast_insert_vals([{K, Pid, Value}]),
            {reply, regged_new(Op), [{insert, Vals}], S}
    end;
handle_leader_call({monitor, {T,g,_} = K, MPid, Type}, _From, S, _E) when T==n;
                                                                          T==a ->
    case ets:lookup(?TAB, {K, T}) of
        [{_, Pid, _}] ->
            %% The key is registered: record the monitor in the owner's
            %% reverse entry.
            Opts = get_opts(Pid, K),
            Ref = make_ref(),
            Opts1 = gproc_lib:add_monitor(Opts, MPid, Ref, Type),
            _ = gproc_lib:ensure_monitor(MPid, g),
            Obj = {{Pid,K}, Opts1},
            ets:insert(?TAB, Obj),
            {reply, Ref, [{insert, [Obj]}], S};
        LookupRes ->
            %% The key is not registered (no entry, or only a waiters entry).
            Ref = make_ref(),
            case Type of
                standby ->
                    %% A standby monitor on a free key takes the key over
                    %% immediately.
                    Event = {failover, MPid},
                    Msgs = insert_reg(LookupRes, K, undefined, MPid, Event),
                    Obj = {{K,T}, MPid, undefined},
                    Rev = {{MPid,K}, []},
                    ets:insert(?TAB, [Obj, Rev]),
                    MPid ! {gproc, {failover,MPid}, Ref, K},
                    {reply, Ref, [{insert, [Obj, Rev]},
                                  {notify, Msgs}], S};
                follow ->
                    case LookupRes of
                        [{_, Waiters}] ->
                            add_follow_to_waiters(Waiters, K, MPid, Ref, S);
                        [] ->
                            add_follow_to_waiters([], K, MPid, Ref, S);
                        %% NOTE(review): a three-element entry has already
                        %% been matched by the outer case, so this clause
                        %% looks unreachable. Kept as it was.
                        [{_, Pid, _}] ->
                            case ets:lookup(?TAB, {Pid,K}) of
                                [{_, Opts}] when is_list(Opts) ->
                                    Opts1 = gproc_lib:add_monitor(
                                              Opts, MPid, Ref, follow),
                                    ets:insert(?TAB, {{Pid,K}, Opts1}),
                                    {reply, Ref,
                                     [{insert, [{{Pid,K}, Opts1}]}], S}
                            end
                    end;
                _ ->
                    %% Plain monitor on an unregistered key: report `unreg'
                    %% right away.
                    MPid ! {gproc, unreg, Ref, K},
                    {reply, Ref, S}
            end
    end;
handle_leader_call({demonitor, {T,g,_} = K, MPid, Ref}, _From, S, _E) ->
    case ets:lookup(?TAB, {K,T}) of
        [{_, Pid, _}] ->
            Opts = get_opts(Pid, K),
            %% remove_monitor/3 takes and returns the option list, as in
            %% maybe_failover/4. The previous call to remove_monitors/3 was
            %% given an option list where the other call site in this module
            %% passes a key, and its result was stored as the new options.
            Opts1 = gproc_lib:remove_monitor(Opts, MPid, Ref),
            Obj = {{Pid,K}, Opts1},
            ets:insert(?TAB, Obj),
            ets:delete(?TAB, {MPid, K}),
            {reply, ok, [{delete, [{MPid,K}]},
                         {insert, [Obj]}], S};
        [{Key, Waiters}] ->
            NewWaiters = [W || W <- Waiters,
                               W =/= {MPid, Ref, follow}],
            %% Update the leader's own copy as well; the broadcast below only
            %% reaches the other nodes.
            ets:insert(?TAB, {Key, NewWaiters}),
            {reply, ok, [{insert, [{Key, NewWaiters}]}], S};
        _ ->
            {reply, ok, S}
    end;
handle_leader_call({set_attributes, {_,g,_} = K, Attrs, Pid}, _From, S, _E) ->
    case gproc_lib:insert_attr(K, Attrs, Pid, g) of
        false ->
            {reply, badarg, S};
        NewAttrs when is_list(NewAttrs) ->
            {reply, true, [{insert, [{{Pid,K}, NewAttrs}]}], S}
    end;
handle_leader_call({reg_or_locate, {n,g,_} = K, Value, P},
                   {FromPid, _}, S, _E) ->
    FromNode = node(FromPid),
    Reg = fun() ->
                  %% A fun is run in a new process on the caller's node;
                  %% that process becomes the owner of the name.
                  Pid = if is_function(P, 0) ->
                                spawn(FromNode, P);
                           is_pid(P) ->
                                P
                        end,
                  case gproc_lib:insert_reg(K, Value, Pid, g) of
                      true ->
                          _ = gproc_lib:ensure_monitor(Pid,g),
                          Vals = [{{K,n},Pid,Value}],
                          {reply, {Pid, Value}, [{insert, Vals}], S};
                      false ->
                          {reply, badarg, S}
                  end
          end,
    case ets:lookup(?TAB, {K, n}) of
        [] ->
            Reg();
        [{_, _Waiters}] ->
            Reg();
        [{_, OtherPid, OtherVal}] ->
            %% Already registered: locate instead of register.
            {reply, {OtherPid, OtherVal}, S}
    end;
handle_leader_call({update_counter, {T,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
  when is_integer(Incr), T==c;
       is_integer(Incr), T==n ->
    try New = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
         %% For a unique name the key's second part is the atom `n'; the
         %% owner is then read from the table entry itself.
         RealPid = case Pid of
                       n -> ets:lookup_element(?TAB, {Key,Pid}, 2);
                       shared -> shared;
                       P when is_pid(P) -> P
                   end,
         Vals = [{{Key,Pid},RealPid,New} | update_aggr_counter(Key, Incr)],
         {reply, New, [{insert, Vals}], S}
    catch
        error:_ ->
            {reply, badarg, S}
    end;
handle_leader_call({update_counters, Cs}, _From, S, _E) ->
    try {Replies, Vals} = batch_update_counters(Cs),
         {reply, Replies, [{insert, Vals}], S}
    catch
        error:_ ->
            {reply, badarg, S}
    end;
handle_leader_call({reset_counter, {c,g,_Ctr} = Key, Pid}, _From, S, _E) ->
    try Current = ets:lookup_element(?TAB, {Key, Pid}, 3),
         %% The value to reset to is the `initial' option in the reverse
         %% entry, defaulting to 0.
         Initial = case ets:lookup_element(?TAB, {Pid, Key}, 2) of
                       r -> 0;
                       Opts when is_list(Opts) ->
                           proplists:get_value(initial, Opts, 0)
                   end,
         Incr = Initial - Current,
         New = ets:update_counter(?TAB, {Key, Pid}, {3, Incr}),
         Vals = [{{Key,Pid},Pid,New} | update_aggr_counter(Key, Incr)],
         {reply, {Current, New}, [{insert, Vals}], S}
    catch
        error:Reason ->
            %% Only the reason is printed: erlang:get_stacktrace/0 no longer
            %% exists in current OTP releases and would raise here.
            io:fwrite("reset_counter failed: ~p~n", [Reason]),
            {reply, badarg, S}
    end;
handle_leader_call({Unreg, {T,g,Name} = K, Pid}, _From, S, _E)
  when Unreg==unreg;
       Unreg==unreg_other->
    Key = if T == n; T == a; T == rc -> {K,T};
             true -> {K, Pid}
          end,
    case ets:member(?TAB, Key) of
        true ->
            _ = gproc_lib:remove_reg(K, Pid, unreg),
            if T == c ->
                    case ets:lookup(?TAB, {{a,g,Name},a}) of
                        [Aggr] ->
                            %% updated by remove_reg/3
                            {reply, true, [{delete,[{K,Pid}, {Pid,K}]},
                                           {insert, [Aggr]}], S};
                        [] ->
                            {reply, true, [{delete, [{K,Pid}, {Pid,K}]}], S}
                    end;
               T == r ->
                    case ets:lookup(?TAB, {{rc,g,Name},rc}) of
                        [RC] ->
                            {reply, true, [{delete,[{K,Pid}, {Pid,K}]},
                                           {insert, [RC]}], S};
                        [] ->
                            {reply, true, [{delete, [{K,Pid}, {Pid, K}]}], S}
                    end;
               true ->
                    {reply, true, [{notify, [{K, Pid, unreg}]},
                                   {delete, [{K, Pid}, {Pid,K}]}], S}
            end;
        false ->
            {reply, badarg, S}
    end;
handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
  when T == a; T == n; T == rc ->
    Key = {K, T},
    case ets:lookup(?TAB, Key) of
        [{_, Pid, Value}] ->
            %% Only the current owner may give the key away.
            Opts = get_opts(Pid, K),
            case pid_to_give_away_to(To) of
                Pid ->
                    %% Giving the key to ourselves changes nothing.
                    {reply, Pid, S};
                ToPid when is_pid(ToPid) ->
                    ets:insert(?TAB, [{Key, ToPid, Value},
                                      {{ToPid,K}, Opts}]),
                    _ = gproc_lib:ensure_monitor(ToPid, g),
                    Rev = {Pid, K},
                    ets:delete(?TAB, Rev),
                    gproc_lib:notify({migrated, ToPid}, K, Opts),
                    {reply, ToPid, [{insert, [{Key, ToPid, Value}]},
                                    {notify, [{K, Pid, {migrated, ToPid}}]},
                                    {delete, [{K, Pid}, Rev]}], S};
                undefined ->
                    %% No receiver: the key is simply unregistered.
                    ets:delete(?TAB, Key),
                    Rev = {Pid, K},
                    ets:delete(?TAB, Rev),
                    gproc_lib:notify(unreg, K, Opts),
                    {reply, undefined, [{notify, [{K, Pid, unreg}]},
                                        {delete, [{K, Pid}, Rev]}], S}
            end;
        _ ->
            {reply, badarg, S}
    end;
handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
    if T==p; T==n; T==r ->
            try gproc_lib:insert_many(T, g, L, Pid) of
                {true,Objs} -> {reply, true, [{insert,Objs}], S};
                false       -> {reply, badarg, S}
            catch
                error:_     -> {reply, badarg, S}
            end;
       true -> {reply, badarg, S}
    end;
handle_leader_call({munreg, T, g, L, Pid}, _From, S, _E) ->
    try gproc_lib:remove_many(T, g, L, Pid) of
        [] ->
            {reply, true, S};
        Objs ->
            {reply, true, [{delete, Objs}], S}
    catch
        error:_ -> {reply, badarg, S}
    end;
handle_leader_call({set,{T,g,N} =K,V,Pid}, _From, S, _E) ->
    if T == a ->
            if is_integer(V) ->
                    case gproc_lib:do_set_value(K, V, Pid) of
                        true  -> {reply, true, [{insert,[{{K,T},Pid,V}]}], S};
                        false -> {reply, badarg, S}
                    end;
               true ->
                    %% A non-integer value for an aggregated counter can get
                    %% here via set_value_shared/2; reject it instead of
                    %% crashing the leader with if_clause.
                    {reply, badarg, S}
            end;
       T == c ->
            try gproc_lib:do_set_counter_value(K, V, Pid),
                 AKey = {{a,g,N},a},
                 Aggr = ets:lookup(?TAB, AKey),  % may be []
                 {reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
            catch
                error:_ ->
                    {reply, badarg, S}
            end;
       true ->
            case gproc_lib:do_set_value(K, V, Pid) of
                true ->
                    Obj = if T==n -> {{K, T}, Pid, V};
                             true -> {{K, Pid}, Pid, V}
                          end,
                    {reply, true, [{insert,[Obj]}], S};
                false ->
                    {reply, badarg, S}
            end
    end;
handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
    %% The pid in _From is of the gen_leader instance that forwarded the
    %% call - not of the client. This is why the Pid is explicitly passed.
    %% case gproc_lib:await(Key, {Pid,Ref}) of
    case gproc_lib:await(Key, Pid, From) of
        {reply, {Ref, {K, P, V}}} ->
            {reply, {Ref, {K, P, V}}, S};
        {reply, Reply, Insert} ->
            {reply, Reply, [{insert, Insert}], S}
    end;
handle_leader_call(_, _, S, _E) ->
    {reply, badarg, S}.
%% Leader-side handling of the messages sent through leader_cast/1.
%%
%% Returns {ok, S} when there is nothing to replicate, or
%% {ok, Broadcast, S} with a list of operations for the other nodes.
%%
%%   {sync_reply, Node, Ref}          - Node has answered a sync ping
%%   {add_globals, Missing}           - a peer knows globals that we lack
%%   {remove_globals, Globals}        - a peer asks us to drop globals
%%   {cancel_wait, Pid, Key, Ref}     - drop one waiter
%%   {cancel_wait_or_monitor, Pid, Key} - drop all waits/monitors of Pid
%%   {pid_is_DOWN, Pid}               - a process owning globals has died
handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
    #state{sync_requests = SReqs} = S,
    case lists:keyfind(Ref, 1, SReqs) of
        false ->
            %% This should never happen, except perhaps if the leader who
            %% received the sync request died, and the new leader gets the
            %% sync reply. In that case, we trust that the client has been
            %% notified anyway, and ignore the message.
            {ok, S};
        {_, Ns} ->
            case lists:delete(Node, Ns) of
                [] ->
                    %% That was the last node we were waiting for.
                    gen_leader:reply(Ref, {leader, reply, true}),
                    {ok, S#state{sync_requests = lists:keydelete(Ref,1,SReqs)}};
                Ns1 ->
                    SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
                    {ok, S#state{sync_requests = SReqs1}}
            end
    end;
handle_leader_cast({add_globals, Missing}, S, _E) ->
    %% This is an audit message: a peer (non-leader) had info about granted
    %% global resources that we didn't know of when we became leader.
    %% This could happen due to a race condition when the old leader died.
    Update = insert_globals(Missing),
    {ok, [{insert, Update}], S};
handle_leader_cast({remove_globals, Globals}, S, _E) ->
    delete_globals(Globals),
    {ok, S};
handle_leader_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S, _E) ->
    case ets:lookup(?TAB, {Key, T}) of
        [{_, Waiters}] ->
            Ops = gproc_lib:remove_wait(Key, Pid, Ref, Waiters),
            {ok, Ops, S};
        _ ->
            {ok, [], S}
    end;
handle_leader_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S, _E) ->
    case ets:lookup(?TAB, {Key, T}) of
        [{_, Waiters}] ->
            Ops = gproc_lib:remove_wait(Key, Pid, all, Waiters),
            {ok, Ops, S};
        [{_, OtherPid, _}] ->
            Ops = gproc_lib:remove_monitors(Key, OtherPid, Pid),
            {ok, Ops, S};
        _ ->
            %% The key is no longer in the table, so there is nothing to
            %% cancel. Tolerated in the same way as in cancel_wait above,
            %% rather than crashing the leader with case_clause.
            {ok, S}
    end;
handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
    %% Collect {Key, Pid} for every global key in Pid's reverse entries.
    Globals = ets:select(?TAB, [{{{Pid,'$1'}, '_'},
                                 [{'==',{element,2,'$1'},g}],[{{'$1',Pid}}]}]),
    ets:delete(?TAB, {Pid,g}),
    case process_globals(Globals) of
        [] ->
            {ok, S};
        Broadcast ->
            {ok, Broadcast, S}
    end.
%% Builds the list of table objects to broadcast for a list of
%% {Key, Pid, Value} registrations. For every registration the result holds
%% the registration object, the related aggregate object where the class has
%% one (a for c, rc for r), and the owner's reverse entry, all as currently
%% found in the table.
mk_broadcast_insert_vals(Objs) ->
    lists:flatmap(
      fun({{Class, g, Name} = Key, Pid, Value}) ->
              Rev = ets:lookup(?TAB, {Pid, Key}),
              case Class of
                  a ->
                      %% Aggregates are read back from the table as stored.
                      ets:lookup(?TAB, {Key, a}) ++ Rev;
                  rc ->
                      ets:lookup(?TAB, {Key, rc}) ++ Rev;
                  c ->
                      Aggr = ets:lookup(?TAB, {{a,g,Name},a}),
                      [{{Key,Pid},Pid,Value} | Aggr] ++ Rev;
                  r ->
                      Count = ets:lookup(?TAB, {{rc,g,Name},rc}),
                      [{{Key,Pid},Pid,Value} | Count] ++ Rev;
                  n ->
                      [{{Key,n},Pid,Value} | Rev];
                  _ ->
                      [{{Key,Pid},Pid,Value} | Rev]
              end
      end, Objs).
%% Cleans up after the owners of the given {Key, Pid} pairs and returns the
%% operations to broadcast.
%%
%% Keys of class n, a and rc go through maybe_failover/4, provided the owner
%% has a reverse entry holding an option list. All other keys are removed;
%% for a counter the aggregated counter is first reduced by the counter's
%% value, and for a resource the resource count is decremented.
%%
%% The result contains, in this order and only when non-empty:
%% {insert, Modified}, {notify, Notifications}, {delete, Globals}.
process_globals(Globals) ->
    Step =
        fun({{T,_,_} = Key, Pid}, Acc) when T==n; T==a; T==rc ->
                case ets:lookup(?TAB, {Pid,Key}) of
                    [{_, Opts}] when is_list(Opts) ->
                        maybe_failover(Key, Pid, Opts, Acc);
                    _ ->
                        Acc
                end;
           ({{T,_,_} = Key, Pid}, {ModAcc, NotifyAcc}) ->
                ModAcc1 =
                    case T of
                        c ->
                            Value = ets:lookup_element(?TAB, {Key,Pid}, 3),
                            update_aggr_counter(Key, -Value) ++ ModAcc;
                        r ->
                            decrement_resource_count(Key, []) ++ ModAcc;
                        _ ->
                            ModAcc
                    end,
                Events = remove_entry(Key, Pid, unreg),
                {ModAcc1, Events ++ NotifyAcc}
        end,
    {Modified, Notifications} = lists:foldl(Step, {[],[]}, Globals),
    [{Tag, Items} || {Tag, Items} <- [{insert, Modified},
                                      {notify, Notifications},
                                      {delete, Globals}],
                     Items =/= []].
%% Handles the loss of the owner Pid of Key. If a standby monitor on a known
%% node exists (see filter_standbys/1), the first such standby takes over the
%% key with its current value; otherwise the entry is removed. The
%% accumulator is {ModifiedObjects, Notifications}.
maybe_failover({T,_,_} = Key, Pid, Opts, {MAcc, NAcc}) ->
    %% Asserts that the options passed in are those stored in the table.
    Opts = get_opts(Pid, Key),
    Standbys = filter_standbys(gproc_lib:standbys(Opts)),
    case Standbys of
        [] ->
            Events = remove_entry(Key, Pid, unreg),
            {MAcc, Events ++ NAcc};
        [{NewOwner, Ref, _} | _] ->
            Value = case ets:lookup(?TAB, {Key,T}) of
                        [{_, _, V}] -> V;
                        _ -> undefined
                    end,
            Events = remove_rev_entry(Opts, Pid, Key, {failover, NewOwner}),
            %% The new owner no longer monitors the key it now holds.
            NewOpts = gproc_lib:remove_monitor(Opts, NewOwner, Ref),
            _ = gproc_lib:ensure_monitor(NewOwner, g),
            Reg = {{Key,T}, NewOwner, Value},
            Rev = {{NewOwner, Key}, NewOpts},
            ets:insert(?TAB, [Reg, Rev]),
            {[Reg, Rev | MAcc], Events ++ NAcc}
    end.
%% Drops leading standbys whose node is neither this node nor a connected
%% node.
filter_standbys(SBs) ->
    filter_standbys(SBs, [node() | nodes()]).

%% Drops leading {Pid, _, _} entries whose node is not in Nodes. Everything
%% from the first entry on a listed node onwards is returned unchanged, so
%% later entries are not checked.
filter_standbys(SBs, Nodes) ->
    lists:dropwhile(
      fun({Pid, _, _}) ->
              not lists:member(node(Pid), Nodes)
      end, SBs).
%% Removes the registration of Key held by Pid and returns the list of
%% notifications to broadcast (possibly empty).
%%
%% The registration is only deleted when the table entry is owned by Pid, or
%% when Pid is an atom. If somebody else owns the key, only Pid's reverse
%% entry is dropped. A waiters entry or a missing entry is left alone.
remove_entry(Key, Pid, Event) ->
    TabKey = ets_key(Key, Pid),
    case ets:lookup(?TAB, TabKey) of
        [{_, Owner, _}] when is_pid(Owner), Owner =:= Pid; is_atom(Pid) ->
            ets:delete(?TAB, TabKey),
            remove_rev_entry(get_opts(Pid, Key), Pid, Key, Event);
        [{_, _SomeoneElse, _}] ->
            ets:delete(?TAB, {Pid, Key}),
            [];
        [{_, _Waiters}] ->
            %% Skip
            [];
        [] ->
            []
    end.
%% Deletes the reverse entry {Pid, Key}. For global names and aggregated
%% counters, Event is first delivered through gproc_lib:notify/3 and then
%% returned as a {Key, Pid, Event} notification; for all other keys no
%% notification is produced.
remove_rev_entry(Opts, Pid, {T,g,_} = Key, Event) when T==n; T==a ->
    gproc_lib:notify(Event, Key, Opts),
    ets:delete(?TAB, {Pid, Key}),
    [{Key, Pid, Event}];
remove_rev_entry(_Opts, Pid, Key, _Event) ->
    ets:delete(?TAB, {Pid, Key}),
    [].
%% Returns the option list stored in the reverse entry {Pid, K}. A missing
%% entry, or one whose value is the atom `r', yields the empty list.
get_opts(Pid, K) ->
    RevKey = {Pid, K},
    case ets:lookup(?TAB, RevKey) of
        [] ->
            [];
        [{_, r}] ->
            [];
        [{_, Opts}] ->
            Opts
    end.
%% Code upgrade callback: the state is carried over unchanged.
code_change(_OldVsn, State, _Extra, _Election) ->
    {ok, State}.
%% Termination callback: nothing to clean up.
terminate(_Why, _State) ->
    ok.
%% Handles broadcasts from the leader.
%%
%% A {sync, Ref} ping is answered with a sync_reply cast naming this node.
%% Anything else is a list of operations, applied in order:
%%   {delete, Globals} - delete_globals/1
%%   {insert, Globals} - insert_globals/1
%%   {notify, Events}  - do_notify/1
from_leader({sync, Ref}, S, _E) ->
    gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
    {ok, S};
from_leader(Ops, S, _E) ->
    Apply = fun({delete, Globals}) ->
                    delete_globals(Globals);
               ({insert, Globals}) ->
                    _ = insert_globals(Globals);
               ({notify, Events}) ->
                    do_notify(Events)
            end,
    lists:foreach(Apply, Ops),
    {ok, S}.
%% Inserts the given objects into the table.
%%
%% A registration object {{Key, _}, Pid, _} is stored, an empty reverse entry
%% for its owner is created unless one exists, and the owner is monitored.
%% A reverse entry {{Pid, _}, Opts} with a non-empty option list is stored
%% and its pid monitored. Anything else is ignored.
%%
%% The fold starts from the input list itself, so the result is the input
%% list with each stored non-empty reverse entry prepended once more.
insert_globals(Globals) ->
    Insert =
        fun({{{_,_,_} = Key, _}, Owner, _} = Reg, Acc) ->
                ets:insert(?TAB, Reg),
                ets:insert_new(?TAB, {{Owner,Key}, []}),
                gproc_lib:ensure_monitor(Owner, g),
                Acc;
           ({{P, _K}, Opts} = Rev, Acc)
              when is_pid(P), is_list(Opts), Opts =/= [] ->
                ets:insert(?TAB, Rev),
                gproc_lib:ensure_monitor(P, g),
                [Rev] ++ Acc;
           (_Other, Acc) ->
                Acc
        end,
    lists:foldl(Insert, Globals, Globals).
%% Deletes the given items from the table. Three item shapes are accepted:
%%   {Key, PidOrAtom}           - registration, removed via remove_entry/3
%%   {{Key, PidOrAtom}, Pid}    - registration owned by Pid, same handling
%%   {PidOrShared, Key}         - reverse entry, deleted directly
%% Any other shape raises function_clause.
delete_globals(Globals) ->
    Delete =
        fun({{_,g,_} = Key, Who}) when is_atom(Who); is_pid(Who) ->
                remove_entry(Key, Who, []);
           ({{{_,g,_} = Key, Who}, Owner}) when is_pid(Owner), is_atom(Who);
                                                is_pid(Owner), is_pid(Who) ->
                remove_entry(Key, Owner, []);
           ({Owner, Key}) when is_pid(Owner); Owner==shared ->
                ets:delete(?TAB, {Owner, Key})
        end,
    lists:foreach(Delete, Globals).
%% Delivers the notifications broadcast by the leader. Always returns ok.
%%
%%   {Pid, Msg}         - Msg is sent to Pid
%%   {Key, Pid, Event}  - Event is delivered through gproc_lib:notify/3,
%%                        using the options in the reverse entry {Pid, Key};
%%                        skipped when that entry holds no option list
%%
%% Every element is processed. Previously the traversal stopped after the
%% first {Key, Pid, Event} whose reverse entry was found, so any remaining
%% notifications were dropped.
do_notify([{P, Msg}|T]) when is_pid(P) ->
    P ! Msg,
    do_notify(T);
do_notify([{K, P, E}|T]) ->
    case ets:lookup(?TAB, {P,K}) of
        [{_, Opts}] when is_list(Opts) ->
            gproc_lib:notify(E, K, Opts);
        _ ->
            ok
    end,
    do_notify(T);
do_notify([]) ->
    ok.
%% Builds the table key for a registration. Keys of class n, a and rc are
%% stored under {Key, Class}; all others under {Key, Pid}.
ets_key({Class,_,_} = Key, _Pid)
  when Class =:= n orelse Class =:= a orelse Class =:= rc ->
    {Key, Class};
ets_key(Key, Pid) ->
    {Key, Pid}.
%% Sends Req to the leader and returns its reply. The reply `badarg' is
%% converted into a gproc badarg error in the calling process.
leader_call(Req) ->
    Reply = gen_leader:leader_call(?MODULE, Req),
    if Reply =:= badarg ->
            ?THROW_GPROC_ERROR(badarg);
       true ->
            Reply
    end.
%% Sends Msg to the leader's gproc_dist server without waiting for a reply.
leader_cast(Message) ->
    gen_leader:leader_cast(?MODULE, Message).
%% Initializes the server state. The only option read is always_broadcast;
%% when it is absent, the record's default value is used.
init(Opts) ->
    #state{always_broadcast = Default} = #state{},
    AlwaysBcast = proplists:get_value(always_broadcast, Opts, Default),
    {ok, #state{always_broadcast = AlwaysBcast}}.
%% Reconciles the local table with the globals received from the leader.
%%
%%  1. Remember the registrations owned by processes on this node and make
%%     sure those processes are monitored.
%%  2. Delete every registration and reverse entry owned by a process on
%%     another node.
%%  3. Insert the leader's remote registrations and all its reverse entries,
%%     collecting what the leader believes is registered on this node.
%%  4. Report local registrations the leader lacks (add_globals), and leader
%%     registrations for this node that we do not have (remove_globals).
%%
%% NOTE(review): the fold in step 3 has no clause for a registration whose
%% owner is an atom (e.g. `shared'); node/1 fails in both guards, so such an
%% object would raise function_clause. Confirm whether that can occur.
surrendered_1(Globs) ->
    Mine =
        ets:select(?TAB, [{{{{'_',g,'_'},'_'},'$1', '$2'},
                           [{'==', {node,'$1'}, node()}],
                           [{{ {element,1,'$_'}, '$1', '$2' }}]}]),
    lists:foreach(fun({_, Pid, _}) -> gproc_lib:ensure_monitor(Pid, g) end,
                  Mine),
    ?event({'My_local_globs', Mine}),
    %% remove all remote globals.
    RemoteRegs = {{{{'_',g,'_'},'_'}, '$1', '_'},
                  [{'=/=', {node,'$1'}, node()}],
                  [true]},
    RemoteRevs = {{{'$1',{'_',g,'_'}}, '_'},
                  [{'=/=', {node,'$1'}, node()}],
                  [true]},
    ets:select_delete(?TAB, [RemoteRegs, RemoteRevs]),
    %% insert new non-local globals, collect the leader's version of
    %% what my globals are
    Merge =
        fun({{Key,_} = TabKey, Pid, Value}, Acc) when node(Pid) =/= node() ->
                ets:insert(?TAB, {TabKey, Pid, Value}),
                _ = gproc_lib:ensure_monitor(Pid, g),
                ets:insert_new(?TAB, {{Pid,Key}, []}),
                Acc;
           ({{_Pid,_} = RevKey, Opts}, Acc) ->
                ets:insert(?TAB, {RevKey, Opts}),
                Acc;
           ({_, Pid, _} = Reg, Acc) when node(Pid) == node() ->
                [Reg | Acc]
        end,
    LeadersView = lists:foldl(Merge, [], Globs),
    ?event({'Ldr_local_globs', LeadersView}),
    Missing = [{K,P,V} || {K,P,V} <- Mine,
                          is_pid(P) andalso
                              not(lists:keymember(K, 1, LeadersView))],
    case Missing of
        [] ->
            %% phew! We have the same picture
            ok;
        [_|_] ->
            %% This is very unlikely, I think
            ?event({'Missing', Missing}),
            leader_cast({add_globals, mk_broadcast_insert_vals(Missing)})
    end,
    Remove = [{K,P} || {{K,_} = TabKey, P, _} <- LeadersView,
                       is_pid(P) andalso
                           not(lists:keymember(TabKey, 1, Mine))],
    case Remove of
        [] ->
            ok;
        [_|_] ->
            ?event({'Remove', Remove}),
            leader_cast({remove_globals, Remove})
    end.
%% Applies a list of {CounterKey, Pid, Incr} updates to global counters.
%% Returns {Replies, Updates}: Replies holds one {Key, Pid, NewValue} per
%% request, in request order; Updates holds the objects to broadcast, with at
%% most one object per table key and owner (see add_object/2).
batch_update_counters(Cs) ->
    batch_update_counters(Cs, [], []).

batch_update_counters([], Replies, Updates) ->
    {lists:reverse(Replies), Updates};
batch_update_counters([{{c,g,_} = Key, Pid, Incr} | Rest], Replies, Updates) ->
    case update_counter_g(Key, Incr, Pid) of
        [{_,_,_} = Aggr, {_, _, Value} = Counter] ->
            %% An aggregated counter was updated as well.
            Updates1 = add_object(Aggr, add_object(Counter, Updates)),
            batch_update_counters(Rest, [{Key,Pid,Value} | Replies], Updates1);
        [{_, _, Value} = Counter] ->
            Updates1 = add_object(Counter, Updates),
            batch_update_counters(Rest, [{Key,Pid,Value} | Replies], Updates1)
    end.
%% Adds Obj to a list of three-element objects. The first object with the
%% same first and second element as Obj is replaced in place; if there is
%% none, Obj is appended at the end.
add_object(Obj, Objs) ->
    IsOther = fun(Existing) ->
                      case {Obj, Existing} of
                          {{K,P,_}, {K,P,_}} -> false;
                          _ -> true
                      end
              end,
    case lists:splitwith(IsOther, Objs) of
        {Before, [_Replaced | After]} ->
            Before ++ [Obj | After];
        {Before, []} ->
            Before ++ [Obj]
    end.
%% Updates the global counter {Key, Pid} in the table and returns the list
%% of objects to broadcast; the matching aggregated counter, if one exists,
%% is adjusted by the same amount and comes first in that list.
%%
%% The update is one of
%%   Incr                         - plain integer increment
%%   {Incr, Threshold, SetValue}  - passed to ets:update_counter/3 as
%%                                  {3, Incr, Threshold, SetValue}
%%   [Op]                         - a list of the two forms above
%% Anything else raises a gproc badarg error.
%%
%% NOTE(review): the list form returns [] when the list is empty, and
%% otherwise an object of the shape {Key, Pid, Values} rather than
%% {{Key, Pid}, Pid, NewValue}. batch_update_counters/3 has no clause for []
%% - confirm the intended result for this form.
update_counter_g({c,g,_} = Key, Incr, Pid) when is_integer(Incr) ->
    NewValue = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
    update_aggr_counter(Key, Incr, [{{Key,Pid},Pid,NewValue}]);
update_counter_g({c,g,_} = Key, {Incr, Threshold, SetValue}, Pid)
  when is_integer(Incr), is_integer(Threshold), is_integer(SetValue) ->
    %% {3, 0} reads the value before the update, so the effective change
    %% can be applied to the aggregated counter.
    UpdateOps = [{3, 0}, {3, Incr, Threshold, SetValue}],
    [Before, After] = ets:update_counter(?TAB, {Key, Pid}, UpdateOps),
    update_aggr_counter(Key, After - Before, [{{Key,Pid},Pid,After}]);
update_counter_g({c,g,_} = Key, Ops, Pid) when is_list(Ops) ->
    UpdateOps = [{3, 0} | expand_ops(Ops)],
    case ets:update_counter(?TAB, {Key, Pid}, UpdateOps) of
        [_] ->
            [];
        [Before | Values] ->
            After = lists:last(Values),
            update_aggr_counter(Key, After - Before, [{Key, Pid, Values}])
    end;
update_counter_g(_, _, _) ->
    ?THROW_GPROC_ERROR(badarg).
%% Translates a list of counter updates into ets:update_counter/3 operations
%% on element 3: an integer Incr becomes {3, Incr}, and a triple of integers
%% {Incr, Thr, SetV} becomes {3, Incr, Thr, SetV}. Any other element, or a
%% non-list argument, raises a gproc badarg error.
expand_ops([]) ->
    [];
expand_ops([Incr | Rest]) when is_integer(Incr) ->
    [{3, Incr} | expand_ops(Rest)];
expand_ops([{Incr, Thr, SetV} | Rest])
  when is_integer(Incr), is_integer(Thr), is_integer(SetV) ->
    [{3, Incr, Thr, SetV} | expand_ops(Rest)];
expand_ops(_) ->
    ?THROW_GPROC_ERROR(badarg).
%% Adjusts the aggregated counter that belongs to a counter key and returns
%% the updated aggregate object in a list, or [] if there is none. Unique
%% names have no aggregated counter, so the result is always [] for them.
update_aggr_counter({n,_,_}, _Incr) ->
    [];
update_aggr_counter(Key, Incr) ->
    update_aggr_counter(Key, Incr, []).

%% As update_aggr_counter/2, but the updated aggregate object, if any, is
%% prepended to Acc.
update_aggr_counter({c,g,Ctr}, Incr, Acc) ->
    AggrKey = {{a,g,Ctr},a},
    case ets:lookup(?TAB, AggrKey) of
        [{TabKey, Owner, Total}] ->
            Updated = {TabKey, Owner, Total + Incr},
            ets:insert(?TAB, Updated),
            [Updated | Acc];
        [] ->
            Acc
    end.
%% Decrements the resource counter that belongs to a resource key, if such a
%% counter is registered, and prepends whatever the table then holds for the
%% counter to Acc. Without a registered counter, Acc is returned unchanged.
decrement_resource_count({r,g,Rsrc}, Acc) ->
    CountKey = {{rc,g,Rsrc},rc},
    case ets:member(?TAB, CountKey) of
        true ->
            %% Call the lib function, which might trigger events
            gproc_lib:decrement_resource_count(g, Rsrc),
            ets:lookup(?TAB, CountKey) ++ Acc;
        false ->
            Acc
    end.
pid_to_give_away_to(P) when is_pid(P) ->
P;
pid_to_give_away_to({T,g,_} = Key) when T==n; T==a ->
case ets:lookup(?TAB, {Key, T}) of
[{_, Pid, _}] ->
Pid;
_ ->
undefined
end.
insert_reg([{_, Waiters}], K, Val, Pid, Event) ->
gproc_lib:insert_reg(K, Val, Pid, g),
tell_waiters(Waiters, K, Pid, Val, Event);
insert_reg([], K, Val, Pid, Event) ->
gproc_lib:insert_reg(K, Val, Pid, g),
tell_waiters([], K, Val, Pid, Event).
%% Informs the waiters of key K that Pid has registered it with value V.
%%
%% A plain waiter {P, Ref} gets {gproc, Ref, registered, {K, Pid, V}}; a
%% follower {P, Ref, follow} gets {gproc, Event, Ref, K}. Waiters on this
%% node are sent their message directly. For waiters on other nodes a
%% {P, Msg} pair is returned instead, for delivery via a `notify' broadcast.
%%
%% The whole list is always processed and the result is always a list.
%% Previously a local waiter ended the traversal and the sent message was
%% returned, so the waiters after it were never told.
tell_waiters([{P,R}|T], K, Pid, V, Event) ->
    Msg = {gproc, R, registered, {K, Pid, V}},
    if node(P) == node() ->
            P ! Msg,
            tell_waiters(T, K, Pid, V, Event);
       true ->
            [{P, Msg} | tell_waiters(T, K, Pid, V, Event)]
    end;
tell_waiters([{P,R,follow}|T], K, Pid, V, Event) ->
    Msg = {gproc, Event, R, K},
    if node(P) == node() ->
            P ! Msg,
            tell_waiters(T, K, Pid, V, Event);
       true ->
            [{P, Msg} | tell_waiters(T, K, Pid, V, Event)]
    end;
tell_waiters([], _, _, _, _) ->
    [].
%% Adds a `follow' monitor for Pid to the waiters of the unregistered key K
%% and builds the leader call reply. The waiters entry and an empty reverse
%% entry for Pid are stored and broadcast. Pid is told that the key is
%% currently unregistered: directly if it is on this node, otherwise through
%% a `notify' broadcast.
add_follow_to_waiters(Waiters, {T,_,_} = K, Pid, Ref, S) ->
    WaitersObj = {{K,T}, [{Pid, Ref, follow} | Waiters]},
    RevObj = {{Pid,K}, []},
    ets:insert(?TAB, [WaitersObj, RevObj]),
    Inserts = {insert, [WaitersObj, RevObj]},
    UnregMsg = {gproc, unreg, Ref, K},
    if node(Pid) == node() ->
            Pid ! UnregMsg,
            {reply, Ref, [Inserts], S};
       true ->
            {reply, Ref, [Inserts, {notify, [{Pid, UnregMsg}]}], S}
    end.
%% Reply for a registration that created a new entry: `true' for the `reg'
%% operation, `new' for `ensure'.
regged_new(reg) ->
    true;
regged_new(ensure) ->
    new.