Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

ws cleanup & unmatched returns

  • Loading branch information...
commit a486aab01ca558e44e89af449b237072951ce54d 1 parent 6ebc977
Ulf Wiger authored
View
21 rebar.config
@@ -1,16 +1,17 @@
%% -*- erlang -*-
{erl_opts, [debug_info]}.
{deps, [
- {edown, ".*", {git, "git://github.com/esl/edown.git", "HEAD"}},
- {gen_leader, ".*",
- {git, "git://github.com/abecciu/gen_leader_revival.git", "HEAD"}}
+ {edown, ".*", {git, "git://github.com/esl/edown.git", "HEAD"}},
+ {gen_leader, ".*",
+ {git, "git://github.com/abecciu/gen_leader_revival.git", "HEAD"}}
]}.
{dialyzer_opts, [{warnings, [no_unused,
- no_improper_lists, no_fun_app, no_match,
- no_opaque, no_fail_call,
- error_handling, no_match,
- behaviours, underspecs]}]}.
+ no_improper_lists, no_fun_app, no_match,
+ no_opaque, no_fail_call,
+ error_handling, no_match,
+ unmatched_returns,
+ behaviours, underspecs]}]}.
{edoc_opts, [{doclet, edown_doclet},
- {top_level_readme,
- {"./README.md",
- "http://github.com/esl/gproc"}}]}.
+ {top_level_readme,
+ {"./README.md",
+ "http://github.com/esl/gproc"}}]}.
View
278 src/gproc.erl
@@ -22,7 +22,7 @@
%%
%% Type and scope for registration and lookup:
%%
-%% @type type() = n | p | c | a. n = name; p = property; c = counter;
+%% @type type() = n | p | c | a. n = name; p = property; c = counter;
%% a = aggregate_counter
%% @type scope() = l | g. l = local registration; g = global registration
%%
@@ -46,11 +46,11 @@
%% @end
-module(gproc).
-behaviour(gen_server).
-
+
-export([start_link/0,
reg/1, reg/2, unreg/1,
mreg/3,
- munreg/3,
+ munreg/3,
set_value/2,
get_value/1,
where/1,
@@ -74,8 +74,8 @@
%% Environment handling
-export([get_env/3, get_env/4,
- get_set_env/3, get_set_env/4,
- set_env/5]).
+ get_set_env/3, get_set_env/4,
+ set_env/5]).
%% Convenience functions
-export([add_local_name/1,
@@ -134,7 +134,7 @@
%%
%% @doc Starts the gproc server.
%%
-%% This function is intended to be called from gproc_sup, as part of
+%% This function is intended to be called from gproc_sup, as part of
%% starting the gproc application.
%% @end
start_link() ->
@@ -203,7 +203,7 @@ add_local_aggr_counter(Name) -> reg({a,l,Name}).
%% @end
%%
add_global_aggr_counter(Name) -> reg({a,g,Name}).
-
+
%% @spec (Name::any()) -> pid()
%%
@@ -238,7 +238,7 @@ lookup_local_aggr_counter(Name) -> lookup_value({a,l,Name}).
%% @end
%%
lookup_global_aggr_counter(Name) -> lookup_value({a,g,Name}).
-
+
%% @spec (Property::any()) -> [{pid(), Value}]
%%
@@ -298,9 +298,9 @@ get_env(Scope, App, Key) ->
%% `{p, Scope, {gproc_env, App, Key}}'. If this fails, it will try the provided
%% alternative strategy. `Strategy' is a list of alternatives, tried in order.
%% Each alternative can be one of:
-%%
+%%
%% * `app_env' - try `application:get_env(App, Key)'
-%% * `os_env' - try `os:getenv(ENV)', where `ENV' is `Key' converted into an
+%% * `os_env' - try `os:getenv(ENV)', where `ENV' is `Key' converted into an
%% uppercase string
%% * `{os_env, ENV}' - try `os:getenv(ENV)'
%% * `inherit' - inherit the cached value, if any, held by the (proc_lib) parent.
@@ -308,16 +308,16 @@ get_env(Scope, App, Key) ->
%% * `{inherit, Id}' - inherit the cached value, if any, held by the process
%% registered in `gproc' as `Id'.
%% * `init_arg' - try `init:get_argument(Key)'; expects a single value, if any.
-%% * `{mnesia, ActivityType, Oid, Pos}' - try
+%% * `{mnesia, ActivityType, Oid, Pos}' - try
%% `mnesia:activity(ActivityType, fun() -> mnesia:read(Oid) end)'; retrieve the
%% value in position `Pos' if object found.
%% * `{default, Value}' - set a default value to return once alternatives have been
%% exhausted; if not set, `undefined' will be returned.
%% * `error' - raise an exception, `erlang:error(gproc_env, [App, Key, Scope])'.
%%
-%% While any alternative can occur more than once, the only one that might make
-%% sense to use multiple times is `{default, Value}'.
-%%
+%% While any alternative can occur more than once, the only one that might make
+%% sense to use multiple times is `{default, Value}'.
+%%
%% The return value will be one of:
%%
%% * The value of the first matching alternative, or exception caused by `error',
@@ -325,7 +325,7 @@ get_env(Scope, App, Key) ->
%% * The last instance of `{default, Value}', or `undefined', if there is no
%% matching alternative, default or `error' entry in the list.
%%
-%% The `error' option can be used to assert that a value has been previously
+%% The `error' option can be used to assert that a value has been previously
%% cached. Alternatively, it can be used to assert that a value is either cached
%% or at least defined somewhere, e.g. `get_env(l, mnesia, dir, [app_env, error])'.
%% @end
@@ -342,7 +342,7 @@ get_set_env(Scope, App, Key) ->
%% @spec get_set_env(Scope::scope(), App::atom(), Key::atom(), Strategy) -> Value
%% @doc Fetch and cache an environment value, if not already cached.
%%
-%% This function does the same thing as {@link get_env/4}, but also updates the
+%% This function does the same thing as {@link get_env/4}, but also updates the
%% cache. Note that the cache will be updated even if the result of the lookup
%% is `undefined'.
%%
@@ -356,10 +356,10 @@ get_set_env(Scope, App, Key, Strategy)
do_get_env(Context, App, Key, Alternatives, Set) ->
case lookup_env(Context, App, Key, self()) of
- undefined ->
- check_alternatives(Alternatives, Context, App, Key, undefined, Set);
- {ok, Value} ->
- Value
+ undefined ->
+ check_alternatives(Alternatives, Context, App, Key, undefined, Set);
+ {ok, Value} ->
+ Value
end.
%% @spec set_env(Scope::scope(), App::atom(),
@@ -372,7 +372,7 @@ do_get_env(Context, App, Key, Alternatives, Set) ->
%%
%% This function should be exercised with caution, as it affects the larger
%% environment outside gproc. This function modifies the cached value, and then
-%% proceeds to update the underlying environment (OS environment variable or
+%% proceeds to update the underlying environment (OS environment variable or
%% application environment variable).
%%
%% When the `mnesia' alternative is used, gproc will try to update any existing
@@ -385,32 +385,32 @@ set_env(Scope, App, Key, Value, Strategy)
when Scope==l, is_atom(App), is_atom(Key);
Scope==g, is_atom(App), is_atom(Key) ->
case is_valid_set_strategy(Strategy, Value) of
- true ->
- update_cached_env(Scope, App, Key, Value),
- set_strategy(Strategy, App, Key, Value);
- false ->
- erlang:error(badarg)
+ true ->
+ update_cached_env(Scope, App, Key, Value),
+ set_strategy(Strategy, App, Key, Value);
+ false ->
+ erlang:error(badarg)
end.
check_alternatives([{default, Val}|Alts], Scope, App, Key, _, Set) ->
check_alternatives(Alts, Scope, App, Key, Val, Set);
check_alternatives([H|T], Scope, App, Key, Def, Set) ->
case try_alternative(H, App, Key, Scope) of
- undefined ->
- check_alternatives(T, Scope, App, Key, Def, Set);
- {ok, Value} ->
- if Set ->
- cache_env(Scope, App, Key, Value),
- Value;
- true ->
- Value
- end
+ undefined ->
+ check_alternatives(T, Scope, App, Key, Def, Set);
+ {ok, Value} ->
+ if Set ->
+ cache_env(Scope, App, Key, Value),
+ Value;
+ true ->
+ Value
+ end
end;
check_alternatives([], Scope, App, Key, Def, Set) ->
if Set ->
- cache_env(Scope, App, Key, Def);
+ cache_env(Scope, App, Key, Def);
true ->
- ok
+ ok
end,
Def.
@@ -418,47 +418,47 @@ try_alternative(error, App, Key, Scope) ->
erlang:error(gproc_env, [App, Key, Scope]);
try_alternative(inherit, App, Key, Scope) ->
case get('$ancestors') of
- [P|_] ->
- lookup_env(Scope, App, Key, P);
- _ ->
- undefined
+ [P|_] ->
+ lookup_env(Scope, App, Key, P);
+ _ ->
+ undefined
end;
try_alternative({inherit, P}, App, Key, Scope) when is_pid(P) ->
lookup_env(Scope, App, Key, P);
try_alternative({inherit, P}, App, Key, Scope) ->
case where(P) of
- undefined -> undefined;
- Pid when is_pid(Pid) ->
- lookup_env(Scope, App, Key, Pid)
+ undefined -> undefined;
+ Pid when is_pid(Pid) ->
+ lookup_env(Scope, App, Key, Pid)
end;
try_alternative(app_env, App, Key, _Scope) ->
case application:get_env(App, Key) of
- undefined -> undefined;
- {ok, undefined} -> undefined;
- {ok, Value} -> {ok, Value}
+ undefined -> undefined;
+ {ok, undefined} -> undefined;
+ {ok, Value} -> {ok, Value}
end;
try_alternative(os_env, _App, Key, _) ->
case os:getenv(os_env_key(Key)) of
- "" -> undefined;
- Val -> {ok, Val}
+ "" -> undefined;
+ Val -> {ok, Val}
end;
try_alternative({os_env, Key}, _, _, _) ->
case os:getenv(Key) of
- "" -> undefined;
- Val -> {ok, Val}
+ "" -> undefined;
+ Val -> {ok, Val}
end;
try_alternative(init_arg, _, Key, _) ->
case init:get_argument(Key) of
- {ok, [[Value]]} ->
- {ok, Value};
- error ->
- undefined
+ {ok, [[Value]]} ->
+ {ok, Value};
+ error ->
+ undefined
end;
try_alternative({mnesia,Type,Key,Pos}, _, _, _) ->
case mnesia:activity(Type, fun() -> mnesia:read(Key) end) of
- [] -> undefined;
- [Found] ->
- {ok, element(Pos, Found)}
+ [] -> undefined;
+ [Found] ->
+ {ok, element(Pos, Found)}
end.
os_env_key(Key) ->
@@ -466,10 +466,10 @@ os_env_key(Key) ->
lookup_env(Scope, App, Key, P) ->
case ets:lookup(?TAB, {{p, Scope, {gproc_env, App, Key}}, P}) of
- [] ->
- undefined;
- [{_, _, Value}] ->
- {ok, Value}
+ [] ->
+ undefined;
+ [{_, _, Value}] ->
+ {ok, Value}
end.
cache_env(Scope, App, Key, Value) ->
@@ -477,10 +477,10 @@ cache_env(Scope, App, Key, Value) ->
update_cached_env(Scope, App, Key, Value) ->
case lookup_env(Scope, App, Key, self()) of
- undefined ->
- cache_env(Scope, App, Key, Value);
- {ok, _} ->
- set_value({p, Scope, {gproc_env, App, Key}}, Value)
+ undefined ->
+ cache_env(Scope, App, Key, Value);
+ {ok, _} ->
+ set_value({p, Scope, {gproc_env, App, Key}}, Value)
end.
is_valid_set_strategy([os_env|T], Value) ->
@@ -498,29 +498,29 @@ is_valid_set_strategy(_, _) ->
set_strategy([H|T], App, Key, Value) ->
case H of
- app_env ->
- application:set_env(App, Key, Value);
- os_env ->
- os:putenv(os_env_key(Key), Value);
- {os_env, ENV} ->
- os:putenv(ENV, Value);
- {mnesia,Type,Oid,Pos} ->
- mnesia:activity(
- Type,
- fun() ->
- Rec = case mnesia:read(Oid) of
- [] ->
- {Tab,K} = Oid,
- Tag = mnesia:table_info(Tab, record_name),
- Attrs = mnesia:table_info(Tab, attributes),
- list_to_tuple(
- [Tag,K |
- [undefined || _ <- tl(Attrs)]]);
- [Old] ->
- Old
- end,
- mnesia:write(setelement(Pos, Rec, Value))
- end)
+ app_env ->
+ application:set_env(App, Key, Value);
+ os_env ->
+ os:putenv(os_env_key(Key), Value);
+ {os_env, ENV} ->
+ os:putenv(ENV, Value);
+ {mnesia,Type,Oid,Pos} ->
+ mnesia:activity(
+ Type,
+ fun() ->
+ Rec = case mnesia:read(Oid) of
+ [] ->
+ {Tab,K} = Oid,
+ Tag = mnesia:table_info(Tab, record_name),
+ Attrs = mnesia:table_info(Tab, attributes),
+ list_to_tuple(
+ [Tag,K |
+ [undefined || _ <- tl(Attrs)]]);
+ [Old] ->
+ Old
+ end,
+ mnesia:write(setelement(Pos, Rec, Value))
+ end)
end,
set_strategy(T, App, Key, Value);
set_strategy([], _, _, Value) ->
@@ -528,11 +528,11 @@ set_strategy([], _, _, Value) ->
is_string(S) ->
try begin _ = iolist_to_binary(S),
- true
- end
+ true
+ end
catch
- error:_ ->
- false
+ error:_ ->
+ false
end.
%% @spec reg(Key::key()) -> true
@@ -556,11 +556,11 @@ await(Key) ->
%% Timeout = integer() | infinity
%%
%% @doc Wait for a local name to be registered.
-%% The function raises an exception if the timeout expires. Timeout must be
+%% The function raises an exception if the timeout expires. Timeout must be
%% either an interger &gt; 0 or 'infinity'.
%% A small optimization: we first perform a lookup, to see if the name
-%% is already registered. This way, the cost of the operation will be
-%% roughly the same as of where/1 in the case where the name is already
+%% is already registered. This way, the cost of the operation will be
+%% roughly the same as of where/1 in the case where the name is already
%% registered (the difference: await/2 also returns the value).
%% @end
%%
@@ -594,10 +594,10 @@ request_wait({n,C,_} = Key, Timeout) when C==l; C==g ->
end,
receive
{gproc, WRef, registered, {_K, Pid, V}} ->
- case TRef of
- no_timer -> ignore;
- _ -> erlang:cancel_timer(TRef)
- end,
+ _ = case TRef of
+ no_timer -> ignore;
+ _ -> erlang:cancel_timer(TRef)
+ end,
{Pid, V};
{timeout, TRef, gproc_timeout} ->
cancel_wait(Key, WRef),
@@ -627,7 +627,7 @@ cancel_wait({_,g,_} = Key, Ref) ->
cancel_wait({_,l,_} = Key, Ref) ->
cast({cancel_wait, self(), Key, Ref}, l),
ok.
-
+
%% @spec reg(Key::key(), Value) -> true
%%
@@ -652,7 +652,7 @@ reg(_, _) ->
%% @spec mreg(type(), scope(), [{Key::any(), Value::any()}]) -> true
%%
%% @doc Register multiple {Key,Value} pairs of a given type and scope.
-%%
+%%
%% This function is more efficient than calling {@link reg/2} repeatedly.
%% It is also atomic in regard to unique names; either all names are registered
%% or none are.
@@ -674,7 +674,7 @@ mreg(_, _, _) ->
%% @spec munreg(type(), scope(), [Key::any()]) -> true
%%
%% @doc Unregister multiple Key items of a given type and scope.
-%%
+%%
%% This function is usually more efficient than calling {@link unreg/1}
%% repeatedly.
%% @end
@@ -694,14 +694,14 @@ munreg(_, _, _) ->
existing(T,Scope,L) ->
Keys = if T==p; T==c ->
- [{{T,Scope,K}, self()} || K <- L];
- T==a; T==n ->
- [{{T,Scope,K}, T} || K <- L]
- end,
+ [{{T,Scope,K}, self()} || K <- L];
+ T==a; T==n ->
+ [{{T,Scope,K}, T} || K <- L]
+ end,
_ = [case ets:member(?TAB, K) of
- false -> erlang:error(badarg);
- true -> true
- end || K <- Keys],
+ false -> erlang:error(badarg);
+ true -> true
+ end || K <- Keys],
L.
@@ -720,7 +720,7 @@ unreg(Key) ->
case ets:member(?TAB, {Key,self()}) of
true ->
_ = gproc_lib:remove_reg(Key, self()),
- true;
+ true;
false ->
erlang:error(badarg)
end
@@ -779,7 +779,7 @@ local_munreg(T, L) when T==p; T==c ->
%% @spec (Key :: key(), Value) -> true
%% @doc Sets the value of the registeration entry given by Key
-%%
+%%
%% Key is assumed to exist and belong to the calling process.
%% If it doesn't, this function will exit.
%%
@@ -852,7 +852,7 @@ lookup_value({T,_,_} = Key) ->
%% @spec (Key::key()) -> pid()
%%
%% @doc Returns the pid registered as Key
-%%
+%%
%% The type of registration entry must be either name or aggregated counter.
%% Otherwise this function will exit. Use {@link lookup_pids/1} in these
%% cases.
@@ -926,7 +926,7 @@ lookup_values({T,_,_} = Key) ->
%% @doc Updates the counter registered as Key for the current process.
%%
%% This function works like ets:update_counter/3
-%% (see [http://www.erlang.org/doc/man/ets.html#update_counter-3]), but
+%% (see [http://www.erlang.org/doc/man/ets.html#update_counter-3]), but
%% will fail if the type of object referred to by Key is not a counter.
%% @end
%%
@@ -946,14 +946,14 @@ update_counter(_, _) ->
%% from one process to another, and returns the pid of the new owner.
%%
%% `To' must be either a pid or a unique name (name or aggregated counter), but
-%% does not necessarily have to resolve to an existing process. If there is
+%% does not necessarily have to resolve to an existing process. If there is
%% no process registered with the `To' key, `give_away/2' returns `undefined',
%% and the `From' key is effectively unregistered.
%%
%% It is allowed to give away a key to oneself, but of course, this operation
%% will have no effect.
%%
-%% Fails with `badarg' if the calling process does not have a `From' key
+%% Fails with `badarg' if the calling process does not have a `From' key
%% registered.
%% @end
give_away({_,l,_} = Key, ToPid) when is_pid(ToPid), node(ToPid) == node() ->
@@ -968,9 +968,9 @@ give_away({_,g,_} = Key, To) ->
%%
%% @doc Sends a message to the process, or processes, corresponding to Key.
%%
-%% If Key belongs to a unique object (name or aggregated counter), this
+%% If Key belongs to a unique object (name or aggregated counter), this
%% function will send a message to the corresponding process, or fail if there
-%% is no such process. If Key is for a non-unique object type (counter or
+%% is no such process. If Key is for a non-unique object type (counter or
%% property), Msg will be send to all processes that have such an object.
%% @end
%%
@@ -1081,8 +1081,8 @@ step(Key, S, T) ->
%% ProcessInfo = [{gproc, [{Key,Value}]} | ProcessInfo]
%%
%% @doc Similar to `process_info(Pid)' but with additional gproc info.
-%%
-%% Returns the same information as process_info(Pid), but with the
+%%
+%% Returns the same information as process_info(Pid), but with the
%% addition of a `gproc' information item, containing the `{Key,Value}'
%% pairs registered to the process.
%% @end
@@ -1095,7 +1095,7 @@ info(Pid) when is_pid(Pid) ->
%% @doc Similar to process_info(Pid, Item), but with additional gproc info.
%%
%% For `Item = gproc', this function returns a list of `{Key, Value}' pairs
-%% registered to the process Pid. For other values of Item, it returns the
+%% registered to the process Pid. For other values of Item, it returns the
%% same as [http://www.erlang.org/doc/man/erlang.html#process_info-2].
%% @end
info(Pid, ?MODULE) ->
@@ -1132,7 +1132,7 @@ handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) ->
NewWaiters ->
ets:insert(?TAB, {K, NewWaiters}),
case lists:keymember(Pid, 1, NewWaiters) of
- true ->
+ true ->
%% should be extremely unlikely
ok;
false ->
@@ -1149,7 +1149,7 @@ handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) ->
handle_call({reg, {_T,l,_} = Key, Val}, {Pid,_}, S) ->
case try_insert_reg(Key, Val, Pid) of
true ->
- gproc_lib:ensure_monitor(Pid,l),
+ _ = gproc_lib:ensure_monitor(Pid,l),
{reply, true, S};
false ->
{reply, badarg, S}
@@ -1214,13 +1214,13 @@ handle_info(_, S) ->
%% @hidden
code_change(_FromVsn, S, _Extra) ->
%% We have changed local monitor markers from {Pid} to {Pid,l}.
- case ets:select(?TAB, [{{'$1'},[],['$1']}]) of
- [] ->
- ok;
- Pids ->
- ets:insert(?TAB, [{P,l} || P <- Pids]),
- ets:select_delete(?TAB, [{{'_'},[],[true]}])
- end,
+ _ = case ets:select(?TAB, [{{'$1'},[],['$1']}]) of
+ [] ->
+ ok;
+ Pids ->
+ ets:insert(?TAB, [{P,l} || P <- Pids]),
+ ets:select_delete(?TAB, [{{'_'},[],[true]}])
+ end,
{ok, S}.
%% @hidden
@@ -1281,7 +1281,7 @@ try_insert_reg({T,l,_} = Key, Val, Pid) ->
audit_process(Pid) when is_pid(Pid) ->
ok = gen_server:call(gproc, {audit_process, Pid}, infinity).
-
+
-spec process_is_down(pid()) -> ok.
@@ -1289,7 +1289,7 @@ process_is_down(Pid) when is_pid(Pid) ->
%% delete the monitor marker
%% io:fwrite(user, "process_is_down(~p) - ~p~n", [Pid,ets:tab2list(?TAB)]),
ets:delete(?TAB, {Pid,l}),
- Revs = ets:select(?TAB, [{{{Pid,'$1'},r},
+ Revs = ets:select(?TAB, [{{{Pid,'$1'},r},
[{'==',{element,2,'$1'},l}], ['$1']}]),
lists:foreach(
fun({n,l,_}=K) ->
@@ -1313,7 +1313,7 @@ process_is_down(Pid) when is_pid(Pid) ->
[{_, _, Value}] = ets:lookup(?TAB, Key),
ets:delete(?TAB, Key),
gproc_lib:update_aggr_counter(l, C, -Value);
- ({a,l,_} = K) ->
+ ({a,l,_} = K) ->
ets:delete(?TAB, {K,a});
({p,_,_} = K) ->
ets:delete(?TAB, {K, Pid})
@@ -1335,7 +1335,7 @@ do_give_away({T,l,_} = K, To, Pid) when T==n; T==a ->
ets:insert(?TAB, [{Key, ToPid, Value},
{{ToPid, K}, r}]),
ets:delete(?TAB, {Pid, K}),
- gproc_lib:ensure_monitor(ToPid, l),
+ _ = gproc_lib:ensure_monitor(ToPid, l),
ToPid;
undefined ->
_ = gproc_lib:remove_reg(K, Pid),
@@ -1359,7 +1359,7 @@ do_give_away({T,l,_} = K, To, Pid) when T==c; T==p ->
{{ToPid, K}, r}]),
ets:delete(?TAB, {Pid, K}),
ets:delete(?TAB, Key),
- gproc_lib:ensure_monitor(ToPid, l),
+ _ = gproc_lib:ensure_monitor(ToPid, l),
ToPid
end;
undefined ->
@@ -1369,9 +1369,9 @@ do_give_away({T,l,_} = K, To, Pid) when T==c; T==p ->
_ ->
badarg
end.
-
-pid_to_give_away_to(P) when is_pid(P), node(P) == node() ->
+
+pid_to_give_away_to(P) when is_pid(P), node(P) == node() ->
P;
pid_to_give_away_to({T,l,_} = Key) when T==n; T==a ->
case ets:lookup(?TAB, {Key, T}) of
@@ -1403,7 +1403,7 @@ set_monitors() ->
set_monitors('$end_of_table') ->
ok;
set_monitors({Pids, Cont}) ->
- [erlang:monitor(process,Pid) || Pid <- Pids],
+ _ = [erlang:monitor(process,Pid) || Pid <- Pids],
set_monitors(ets:select(Cont)).
@@ -1490,7 +1490,7 @@ headpat(T, C, V1,V2,V3) ->
{{{Kp,K2},V2,V3}, Vars}.
%% l(L) -> L.
-
+
subst(X, '_', _F, Vs) ->
View
545 src/gproc_dist.erl
@@ -23,32 +23,32 @@
-behaviour(gen_leader).
-export([start_link/0, start_link/1,
- reg/1, reg/2, unreg/1,
- mreg/2,
- munreg/2,
- set_value/2,
- give_away/2,
- update_counter/2]).
+ reg/1, reg/2, unreg/1,
+ mreg/2,
+ munreg/2,
+ set_value/2,
+ give_away/2,
+ update_counter/2]).
-export([leader_call/1,
- leader_cast/1,
- sync/0,
- get_leader/0]).
+ leader_cast/1,
+ sync/0,
+ get_leader/0]).
%%% internal exports
-export([init/1,
- handle_cast/3,
- handle_call/4,
- handle_info/2,
- handle_leader_call/4,
- handle_leader_cast/3,
- handle_DOWN/3,
+ handle_cast/3,
+ handle_call/4,
+ handle_info/2,
+ handle_leader_call/4,
+ handle_leader_cast/3,
+ handle_DOWN/3,
elected/2, % original version
- elected/3,
- surrendered/3,
- from_leader/3,
- code_change/4,
- terminate/2]).
+ elected/3,
+ surrendered/3,
+ from_leader/3,
+ code_change/4,
+ terminate/2]).
-include("gproc.hrl").
@@ -57,8 +57,10 @@
-record(state, {
always_broadcast = false,
is_leader,
- sync_requests = []}).
+ sync_requests = []}).
+%% ==========================================================
+%% Start functions
start_link() ->
start_link({[node()|nodes()], []}).
@@ -70,9 +72,9 @@ start_link(Nodes) when is_list(Nodes) ->
start_link({Nodes, Opts}) ->
gen_leader:start_link(
?SERVER, Nodes, Opts, ?MODULE, [], []).
-
-%% ?SERVER, Nodes, [],?MODULE, [], [{debug,[trace]}]).
+%% ==========================================================
+%% API
%% {@see gproc:reg/1}
%%
@@ -103,19 +105,17 @@ munreg(T, Keys) ->
if is_list(Keys) -> leader_call({munreg, T, g, Keys, self()});
true -> erlang:error(badarg)
end.
-
unreg({_,g,_} = Key) ->
leader_call({unreg, Key, self()});
unreg(_) ->
erlang:error(badarg).
-
set_value({T,g,_} = Key, Value) when T==a; T==c ->
if is_integer(Value) ->
- leader_call({set, Key, Value});
+ leader_call({set, Key, Value});
true ->
- erlang:error(badarg)
+ erlang:error(badarg)
end;
set_value({_,g,_} = Key, Value) ->
leader_call({set, Key, Value, self()});
@@ -153,8 +153,8 @@ sync() ->
get_leader() ->
gen_leader:call(?MODULE, get_leader).
-%%% ==========================================================
-
+%% ==========================================================
+%% Server-side
handle_cast(_Msg, S, _) ->
{stop, unknown_cast, S}.
@@ -166,9 +166,6 @@ handle_call(_, _, S, _) ->
handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
leader_cast({pid_is_DOWN, Pid}),
-%% ets:select_delete(?TAB, [{{{Pid,'_'}}, [], [true]}]),
-%% ets:delete(?TAB, Pid),
-%% lists:foreach(fun(Key) -> gproc_lib:remove_reg_1(Key, Pid) end, Keys),
{ok, S};
handle_info(_, S) ->
{ok, S}.
@@ -214,47 +211,45 @@ handle_DOWN(Node, S, _E) ->
Broadcast ->
{ok, Broadcast, S1}
end.
-%% ets:select_delete(?TAB, [{Head, Gs, [true]}]),
-%% {ok, [{delete, Globs}], S}.
check_sync_requests(Node, #state{sync_requests = SReqs} = S) ->
SReqs1 = lists:flatmap(
- fun({From, Ns}) ->
- case Ns -- [Node] of
- [] ->
- gen_leader:reply(From, {leader, reply, true}),
- [];
- Ns1 ->
- [{From, Ns1}]
- end
- end, SReqs),
+ fun({From, Ns}) ->
+ case Ns -- [Node] of
+ [] ->
+ gen_leader:reply(From, {leader, reply, true}),
+ [];
+ Ns1 ->
+ [{From, Ns1}]
+ end
+ end, SReqs),
S#state{sync_requests = SReqs1}.
handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
case gen_leader:alive(E) -- [node()] of
- [] ->
- {reply, true, S};
- Alive ->
- gen_leader:broadcast({from_leader, {sync, From}}, Alive, E),
- {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
+ [] ->
+ {reply, true, S};
+ Alive ->
+ gen_leader:broadcast({from_leader, {sync, From}}, Alive, E),
+ {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
end;
handle_leader_call({reg, {C,g,Name} = K, Value, Pid}, _From, S, _E) ->
case gproc_lib:insert_reg(K, Value, Pid, g) of
- false ->
- {reply, badarg, S};
- true ->
- gproc_lib:ensure_monitor(Pid,g),
- Vals =
- if C == a ->
- ets:lookup(?TAB, {K,a});
- C == c ->
+ false ->
+ {reply, badarg, S};
+ true ->
+ _ = gproc_lib:ensure_monitor(Pid,g),
+ Vals =
+ if C == a ->
+ ets:lookup(?TAB, {K,a});
+ C == c ->
[{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})];
- C == n ->
- [{{K,n},Pid,Value}];
- true ->
- [{{K,Pid},Pid,Value}]
- end,
- {reply, true, [{insert, Vals}], S}
+ C == n ->
+ [{{K,n},Pid,Value}];
+ true ->
+ [{{K,Pid},Pid,Value}]
+ end,
+ {reply, true, [{insert, Vals}], S}
end;
handle_leader_call({update_counter, {c,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
when is_integer(Incr) ->
@@ -267,102 +262,102 @@ handle_leader_call({update_counter, {c,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
end;
handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
Key = if T == n; T == a -> {K,T};
- true -> {K, Pid}
- end,
+ true -> {K, Pid}
+ end,
case ets:member(?TAB, Key) of
- true ->
- gproc_lib:remove_reg(K, Pid),
- if T == c ->
- case ets:lookup(?TAB, {{a,g,Name},a}) of
- [Aggr] ->
- %% updated by remove_reg/2
- {reply, true, [{delete,[Key, {Pid,K}]},
- {insert, [Aggr]}], S};
- [] ->
- {reply, true, [{delete, [Key, {Pid,K}]}], S}
- end;
- true ->
- {reply, true, [{delete, [Key]}], S}
- end;
- false ->
- {reply, badarg, S}
+ true ->
+ _ = gproc_lib:remove_reg(K, Pid),
+ if T == c ->
+ case ets:lookup(?TAB, {{a,g,Name},a}) of
+ [Aggr] ->
+ %% updated by remove_reg/2
+ {reply, true, [{delete,[Key, {Pid,K}]},
+ {insert, [Aggr]}], S};
+ [] ->
+ {reply, true, [{delete, [Key, {Pid,K}]}], S}
+ end;
+ true ->
+ {reply, true, [{delete, [Key]}], S}
+ end;
+ false ->
+ {reply, badarg, S}
end;
handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
when T == a; T == n ->
Key = {K, T},
case ets:lookup(?TAB, Key) of
- [{_, Pid, Value}] ->
- case pid_to_give_away_to(To) of
- Pid ->
- {reply, Pid, S};
- ToPid when is_pid(ToPid) ->
- ets:insert(?TAB, [{Key, ToPid, Value},
- {{ToPid,K}, r}]),
- gproc_lib:ensure_monitor(ToPid, g),
- {reply, ToPid, [{delete, [Key, {Pid,K}]},
- {insert, [{Key, ToPid, Value}]}], S};
- undefined ->
- ets:delete(?TAB, Key),
- ets:delete(?TAB, {Pid, K}),
- {reply, undefined, [{delete, [Key, {Pid,K}]}], S}
- end;
- _ ->
- {reply, badarg, S}
+ [{_, Pid, Value}] ->
+ case pid_to_give_away_to(To) of
+ Pid ->
+ {reply, Pid, S};
+ ToPid when is_pid(ToPid) ->
+ ets:insert(?TAB, [{Key, ToPid, Value},
+ {{ToPid,K}, r}]),
+ _ = gproc_lib:ensure_monitor(ToPid, g),
+ {reply, ToPid, [{delete, [Key, {Pid,K}]},
+ {insert, [{Key, ToPid, Value}]}], S};
+ undefined ->
+ ets:delete(?TAB, Key),
+ ets:delete(?TAB, {Pid, K}),
+ {reply, undefined, [{delete, [Key, {Pid,K}]}], S}
+ end;
+ _ ->
+ {reply, badarg, S}
end;
handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
if T==p; T==n ->
- try gproc_lib:insert_many(T, g, L, Pid) of
- {true,Objs} -> {reply, true, [{insert,Objs}], S};
- false -> {reply, badarg, S}
- catch
- error:_ -> {reply, badarg, S}
- end;
+ try gproc_lib:insert_many(T, g, L, Pid) of
+ {true,Objs} -> {reply, true, [{insert,Objs}], S};
+ false -> {reply, badarg, S}
+ catch
+ error:_ -> {reply, badarg, S}
+ end;
true -> {reply, badarg, S}
end;
handle_leader_call({munreg, T, g, L, Pid}, _From, S, _E) ->
try gproc_lib:remove_many(T, g, L, Pid) of
- [] ->
- {reply, true, S};
- Objs ->
- {reply, true, [{delete, Objs}], S}
+ [] ->
+ {reply, true, S};
+ Objs ->
+ {reply, true, [{delete, Objs}], S}
catch
- error:_ -> {reply, badarg, S}
+ error:_ -> {reply, badarg, S}
end;
handle_leader_call({set,{T,g,N} =K,V,Pid}, _From, S, _E) ->
if T == a ->
- if is_integer(V) ->
- case gproc_lib:do_set_value(K, V, Pid) of
- true -> {reply, true, [{insert,[{{K,T},Pid,V}]}], S};
- false -> {reply, badarg, S}
- end
- end;
+ if is_integer(V) ->
+ case gproc_lib:do_set_value(K, V, Pid) of
+ true -> {reply, true, [{insert,[{{K,T},Pid,V}]}], S};
+ false -> {reply, badarg, S}
+ end
+ end;
T == c ->
- try gproc_lib:do_set_counter_value(K, V, Pid),
- AKey = {{a,g,N},a},
- Aggr = ets:lookup(?TAB, AKey), % may be []
- {reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
- catch
- error:_ ->
- {reply, badarg, S}
- end;
+ try gproc_lib:do_set_counter_value(K, V, Pid),
+ AKey = {{a,g,N},a},
+ Aggr = ets:lookup(?TAB, AKey), % may be []
+ {reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
+ catch
+ error:_ ->
+ {reply, badarg, S}
+ end;
true ->
- case gproc_lib:do_set_value(K, V, Pid) of
- true ->
- Obj = if T==n -> {{K, T}, Pid, V};
- true -> {{K, Pid}, Pid, V}
- end,
- {reply, true, [{insert,[Obj]}], S};
- false ->
- {reply, badarg, S}
- end
+ case gproc_lib:do_set_value(K, V, Pid) of
+ true ->
+ Obj = if T==n -> {{K, T}, Pid, V};
+ true -> {{K, Pid}, Pid, V}
+ end,
+ {reply, true, [{insert,[Obj]}], S};
+ false ->
+ {reply, badarg, S}
+ end
end;
handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
%% The pid in _From is of the gen_leader instance that forwarded the
%% call - not of the client. This is why the Pid is explicitly passed.
%% case gproc_lib:await(Key, {Pid,Ref}) of
case gproc_lib:await(Key, Pid, From) of
- {reply, {Ref, {K, P, V}}} ->
- {reply, {Ref, {K, P, V}}, S};
+ {reply, {Ref, {K, P, V}}} ->
+ {reply, {Ref, {K, P, V}}, S};
{reply, Reply, Insert} ->
{reply, Reply, [{insert, Insert}], S}
end;
@@ -372,21 +367,21 @@ handle_leader_call(_, _, S, _E) ->
handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
#state{sync_requests = SReqs} = S,
case lists:keyfind(Ref, 1, SReqs) of
- false ->
- %% This should never happen, except perhaps if the leader who
- %% received the sync request died, and the new leader gets the
- %% sync reply. In that case, we trust that the client has been notified
- %% anyway, and ignore the message.
- {ok, S};
- {_, Ns} ->
- case lists:delete(Node, Ns) of
- [] ->
- gen_leader:reply(Ref, {leader, reply, true}),
- {ok, S#state{sync_requests = lists:keydelete(Ref,1,SReqs)}};
- Ns1 ->
- SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
- {ok, S#state{sync_requests = SReqs1}}
- end
+ false ->
+ %% This should never happen, except perhaps if the leader who
+ %% received the sync request died, and the new leader gets the
+ %% sync reply. In that case, we trust that the client has been notified
+ %% anyway, and ignore the message.
+ {ok, S};
+ {_, Ns} ->
+ case lists:delete(Node, Ns) of
+ [] ->
+ gen_leader:reply(Ref, {leader, reply, true}),
+ {ok, S#state{sync_requests = lists:keydelete(Ref,1,SReqs)}};
+ Ns1 ->
+ SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
+ {ok, S#state{sync_requests = SReqs1}}
+ end
end;
handle_leader_cast({add_globals, Missing}, S, _E) ->
%% This is an audit message: a peer (non-leader) had info about granted
@@ -399,13 +394,13 @@ handle_leader_cast({remove_globals, Globals}, S, _E) ->
{ok, S};
handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
Globals = ets:select(?TAB, [{{{Pid,'$1'},r},
- [{'==',{element,2,'$1'},g}],[{{'$1',Pid}}]}]),
+ [{'==',{element,2,'$1'},g}],[{{'$1',Pid}}]}]),
ets:delete(?TAB, {Pid,g}),
case process_globals(Globals) of
- [] ->
- {ok, S};
- Broadcast ->
- {ok, Broadcast, S}
+ [] ->
+ {ok, S};
+ Broadcast ->
+ {ok, Broadcast, S}
end.
process_globals(Globals) ->
@@ -427,69 +422,58 @@ process_globals(Globals) ->
[{Op,Objs} || {Op,Objs} <- [{insert,Modified},
{delete,Globals}], Objs =/= []].
-
code_change(_FromVsn, S, _Extra, _E) ->
{ok, S}.
terminate(_Reason, _S) ->
ok.
-
-
from_leader({sync, Ref}, S, _E) ->
gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
{ok, S};
from_leader(Ops, S, _E) ->
lists:foreach(
fun({delete, Globals}) ->
- delete_globals(Globals);
- ({insert, Globals}) ->
- ets:insert(?TAB, Globals),
- lists:foreach(
- fun({{{_,g,_}=Key,_}, P, _}) ->
- ets:insert(?TAB, {{P,Key},r}),
- gproc_lib:ensure_monitor(P,g);
+ delete_globals(Globals);
+ ({insert, Globals}) ->
+ ets:insert(?TAB, Globals),
+ lists:foreach(
+ fun({{{_,g,_}=Key,_}, P, _}) ->
+ ets:insert(?TAB, {{P,Key},r}),
+ gproc_lib:ensure_monitor(P,g);
({{P,_K},r}) ->
gproc_lib:ensure_monitor(P,g);
(_) ->
skip
- end, Globals)
+ end, Globals)
end, Ops),
{ok, S}.
delete_globals(Globals) ->
lists:foreach(
fun({{_,g,_},T} = K) when is_atom(T) ->
- ets:delete(?TAB, K);
- ({Key, Pid}) when is_pid(Pid) ->
+ ets:delete(?TAB, K);
+ ({Key, Pid}) when is_pid(Pid) ->
K = ets_key(Key,Pid),
- ets:delete(?TAB, K),
- ets:delete(?TAB, {Pid, Key});
- ({Pid, K}) when is_pid(Pid) ->
- ets:delete(?TAB, {Pid, K})
- %% case node(Pid) =:= node() of
- %% true ->
- %% ets:delete(?TAB, {Pid,g});
- %% _ -> ok
- %% end
+ ets:delete(?TAB, K),
+ ets:delete(?TAB, {Pid, Key});
+ ({Pid, K}) when is_pid(Pid) ->
+ ets:delete(?TAB, {Pid, K})
end, Globals).
ets_key({T,_,_} = K, _) when T==n; T==a ->
{K, T};
ets_key(K, Pid) ->
{K, Pid}.
-
leader_call(Req) ->
case gen_leader:leader_call(?MODULE, Req) of
- badarg -> erlang:error(badarg, Req);
- Reply -> Reply
+ badarg -> erlang:error(badarg, Req);
+ Reply -> Reply
end.
leader_cast(Msg) ->
gen_leader:leader_cast(?MODULE, Msg).
-
-
init(Opts) ->
S0 = #state{},
@@ -497,44 +481,42 @@ init(Opts) ->
S0#state.always_broadcast),
{ok, #state{always_broadcast = AlwaysBcast}}.
-
surrendered_1(Globs) ->
My_local_globs =
- ets:select(?TAB, [{{{{'_',g,'_'},'_'},'$1', '_'},
- [{'==', {node,'$1'}, node()}],
- ['$_']}]),
+ ets:select(?TAB, [{{{{'_',g,'_'},'_'},'$1', '_'},
+ [{'==', {node,'$1'}, node()}],
+ ['$_']}]),
%% remove all remote globals - we don't have monitors on them.
ets:select_delete(?TAB, [{{{{'_',g,'_'},'_'}, '$1', '_'},
- [{'=/=', {node,'$1'}, node()}],
- [true]}]),
+ [{'=/=', {node,'$1'}, node()}],
+ [true]}]),
%% insert new non-local globals, collect the leader's version of
%% what my globals are
Ldr_local_globs =
- lists:foldl(
- fun({{Key,_}=K, Pid, V}, Acc) when node(Pid) =/= node() ->
- ets:insert(?TAB, [{K, Pid, V}, {{Pid,Key}}]),
- Acc;
- ({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
- [Obj|Acc]
- end, [], Globs),
+ lists:foldl(
+ fun({{Key,_}=K, Pid, V}, Acc) when node(Pid) =/= node() ->
+ ets:insert(?TAB, [{K, Pid, V}, {{Pid,Key}}]),
+ Acc;
+ ({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
+ [Obj|Acc]
+ end, [], Globs),
case [{K,P,V} || {K,P,V} <- My_local_globs,
- not(lists:keymember(K, 1, Ldr_local_globs))] of
- [] ->
- %% phew! We have the same picture
- ok;
- [_|_] = Missing ->
- %% This is very unlikely, I think
- leader_cast({add_globals, Missing})
+ not(lists:keymember(K, 1, Ldr_local_globs))] of
+ [] ->
+ %% phew! We have the same picture
+ ok;
+ [_|_] = Missing ->
+ %% This is very unlikely, I think
+ leader_cast({add_globals, Missing})
end,
case [{K,P} || {K,P,_} <- Ldr_local_globs,
- not(lists:keymember(K, 1, My_local_globs))] of
- [] ->
- ok;
- [_|_] = Remove ->
- leader_cast({remove_globals, Remove})
+ not(lists:keymember(K, 1, My_local_globs))] of
+ [] ->
+ ok;
+ [_|_] = Remove ->
+ leader_cast({remove_globals, Remove})
end.
-
update_aggr_counter({c,g,Ctr}, Incr) ->
Key = {{a,g,Ctr},a},
case ets:lookup(?TAB, Key) of
@@ -555,154 +537,3 @@ pid_to_give_away_to({T,g,_} = Key) when T==n; T==a ->
_ ->
undefined
end.
-
-%% -ifdef(TEST).
-
-%% dist_test_() ->
-%% {timeout, 60,
-%% [{foreach,
-%% fun() ->
-%% Ns = start_slaves([n1, n2]),
-%% %% dbg:tracer(),
-%% %% [dbg:n(N) || N <- Ns],
-%% %% dbg:tpl(gproc_dist, x),
-%% %% dbg:p(all,[c]),
-%% Ns
-%% end,
-%% fun(Ns) ->
-%% [rpc:call(N, init, stop, []) || N <- Ns]
-%% end,
-%% [
-%% {with, [fun(Ns) -> {in_parallel, [fun(X) -> t_simple_reg(X) end,
-%% fun(X) -> t_await_reg(X) end,
-%% fun(X) -> t_give_away(X) end]
-%% }
-%% end]}
-%% ]}
-%% ]}.
-
-%% -define(T_NAME, {n, g, {?MODULE, ?LINE}}).
-
-%% t_simple_reg([H|_] = Ns) ->
-%% ?debugMsg(t_simple_reg),
-%% Name = ?T_NAME,
-%% P = t_spawn_reg(H, Name),
-%% ?assertMatch(ok, t_lookup_everywhere(Name, Ns, P)),
-%% ?assertMatch(true, t_call(P, {apply, gproc, unreg, [Name]})),
-%% ?assertMatch(ok, t_lookup_everywhere(Name, Ns, undefined)),
-%% ?assertMatch(ok, t_call(P, die)).
-
-%% t_await_reg([A,B|_]) ->
-%% ?debugMsg(t_await_reg),
-%% Name = ?T_NAME,
-%% P = t_spawn(A),
-%% P ! {self(), {apply, gproc, await, [Name]}},
-%% P1 = t_spawn_reg(B, Name),
-%% ?assert(P1 == receive
-%% {P, Res} ->
-%% element(1, Res)
-%% end),
-%% ?assertMatch(ok, t_call(P, die)),
-%% ?assertMatch(ok, t_call(P1, die)).
-
-%% t_give_away([A,B|_] = Ns) ->
-%% ?debugMsg(t_give_away),
-%% Na = ?T_NAME,
-%% Nb = ?T_NAME,
-%% Pa = t_spawn_reg(A, Na),
-%% Pb = t_spawn_reg(B, Nb),
-%% ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
-%% ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
-%% %% ?debugHere,
-%% ?assertMatch(Pb, t_call(Pa, {apply, {gproc, give_away, [Na, Nb]}})),
-%% ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pb)),
-%% %% ?debugHere,
-%% ?assertMatch(Pa, t_call(Pa, {apply, {gproc, give_away, [Na, Pa]}})),
-%% ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
-%% %% ?debugHere,
-%% ?assertMatch(ok, t_call(Pa, die)),
-%% ?assertMatch(ok, t_call(Pb, die)).
-
-%% t_sleep() ->
-%% timer:sleep(1000).
-
-%% t_lookup_everywhere(Key, Nodes, Exp) ->
-%% t_lookup_everywhere(Key, Nodes, Exp, 3).
-
-%% t_lookup_everywhere(Key, _, Exp, 0) ->
-%% {lookup_failed, Key, Exp};
-%% t_lookup_everywhere(Key, Nodes, Exp, I) ->
-%% Expected = [{N, Exp} || N <- Nodes],
-%% Found = [{N,rpc:call(N, gproc, where, [Key])} || N <- Nodes],
-%% if Expected =/= Found ->
-%% ?debugFmt("lookup ~p failed (~p), retrying...~n", [Key, Found]),
-%% t_sleep(),
-%% t_lookup_everywhere(Key, Nodes, Exp, I-1);
-%% true ->
-%% ok
-%% end.
-
-
-%% t_spawn(Node) ->
-%% Me = self(),
-%% P = spawn(Node, fun() ->
-%% Me ! {self(), ok},
-%% t_loop()
-%% end),
-%% receive
-%% {P, ok} -> P
-%% end.
-
-%% t_spawn_reg(Node, Name) ->
-%% Me = self(),
-%% spawn(Node, fun() ->
-%% ?assertMatch(true, gproc:reg(Name)),
-%% Me ! {self(), ok},
-%% t_loop()
-%% end),
-%% receive
-%% {P, ok} -> P
-%% end.
-
-%% t_call(P, Req) ->
-%% P ! {self(), Req},
-%% receive
-%% {P, Res} ->
-%% Res
-%% end.
-
-%% t_loop() ->
-%% receive
-%% {From, die} ->
-%% From ! {self(), ok};
-%% {From, {apply, M, F, A}} ->
-%% From ! {self(), apply(M, F, A)},
-%% t_loop()
-%% end.
-
-%% start_slaves(Ns) ->
-%% [H|T] = Nodes = [start_slave(N) || N <- Ns],
-%% %% ?debugVal([pong = rpc:call(H, net, ping, [N]) || N <- T]),
-%% %% ?debugVal(rpc:multicall(Nodes, application, start, [gproc])),
-%% Nodes.
-
-%% start_slave(Name) ->
-%% case node() of
-%% nonode@nohost ->
-%% os:cmd("epmd -daemon"),
-%% {ok, _} = net_kernel:start([gproc_master, shortnames]);
-%% _ ->
-%% ok
-%% end,
-%% {ok, Node} = slave:start(
-%% host(), Name,
-%% "-pa . -pz ../ebin -pa ../deps/gen_leader/ebin "
-%% "-gproc gproc_dist all"),
-%% %% io:fwrite(user, "Slave node: ~p~n", [Node]),
-%% Node.
-
-%% host() ->
-%% [Name, Host] = re:split(atom_to_list(node()), "@", [{return, list}]),
-%% list_to_atom(Host).
-
-%% -endif.
View
6 src/gproc_init.erl
@@ -35,12 +35,12 @@ soft_reset() ->
hard_reset() ->
%% exit normal {n,'_','_'}
- [ exit(Pid,normal) || Pid <- gproc:lookup_pids({n,'_','_'}),
- (node(Pid) =/= node()) orelse is_process_alive(Pid) ],
+ _ = [ exit(Pid,normal) || Pid <- gproc:lookup_pids({n,'_','_'}),
+ (node(Pid) =/= node()) orelse is_process_alive(Pid) ],
%% kill via supervisor
ok = supervisor:terminate_child(gproc_sup, gproc),
%% delete ets table
- [ ets:delete(Tab) || Tab <- ets:all(), Tab =:= gproc ],
+ _ = [ ets:delete(Tab) || Tab <- ets:all(), Tab =:= gproc ],
%% restart via supervisor
{ok,_} = supervisor:restart_child(gproc_sup, gproc),
ok.
View
60 src/gproc_lib.erl
@@ -22,15 +22,15 @@
-module(gproc_lib).
-export([await/3,
- do_set_counter_value/3,
- do_set_value/3,
- ensure_monitor/2,
- insert_many/4,
- insert_reg/4,
- remove_many/4,
- remove_reg/2,
- update_aggr_counter/3,
- update_counter/3]).
+ do_set_counter_value/3,
+ do_set_value/3,
+ ensure_monitor/2,
+ insert_many/4,
+ insert_reg/4,
+ remove_many/4,
+ remove_reg/2,
+ update_aggr_counter/3,
+ update_counter/3]).
-include("gproc.hrl").
@@ -92,7 +92,7 @@ insert_many(T, Scope, KVL, Pid) ->
true ->
RevObjs = mk_reg_rev_objs(T, Scope, Pid, KVL),
ets:insert(?TAB, RevObjs),
- gproc_lib:ensure_monitor(Pid, Scope),
+ _ = gproc_lib:ensure_monitor(Pid, Scope),
{true, Objs};
false ->
Existing = [{Obj, ets:lookup(?TAB, K)} || {K,_,_} = Obj <- Objs],
@@ -108,13 +108,13 @@ insert_many(T, Scope, KVL, Pid) ->
false ->
%% possibly waiters, but they are handled in next step
insert_objects(Existing),
- gproc_lib:ensure_monitor(Pid, Scope),
+ _ = gproc_lib:ensure_monitor(Pid, Scope),
{true, Objs}
end
end.
-spec insert_objects([{key(), pid(), any()}]) -> ok.
-
+
insert_objects(Objs) ->
lists:foreach(
fun({{{Id,_} = _K, Pid, V} = Obj, Existing}) ->
@@ -132,25 +132,25 @@ await({T,C,_} = Key, WPid, {_Pid, Ref} = From) ->
case ets:lookup(?TAB, {Key,T}) of
[{_, P, Value}] ->
%% for symmetry, we always reply with Ref and then send a message
- if C == g ->
- %% in the global case, we bundle the reply, since otherwise
- %% the messages can pass each other
- {reply, {Ref, {Key, P, Value}}};
- true ->
- gen_server:reply(From, Ref),
- WPid ! {gproc, Ref, registered, {Key, P, Value}},
- noreply
- end;
+ if C == g ->
+ %% in the global case, we bundle the reply, since otherwise
+ %% the messages can pass each other
+ {reply, {Ref, {Key, P, Value}}};
+ true ->
+ gen_server:reply(From, Ref),
+ WPid ! {gproc, Ref, registered, {Key, P, Value}},
+ noreply
+ end;
[{K, Waiters}] ->
NewWaiters = [{WPid,Ref} | Waiters],
W = {K, NewWaiters},
ets:insert(?TAB, [W, Rev]),
- gproc_lib:ensure_monitor(WPid,C),
+ _ = gproc_lib:ensure_monitor(WPid,C),
{reply, Ref, [W,Rev]};
[] ->
W = {{Key,T}, [{WPid,Ref}]},
ets:insert(?TAB, [W, Rev]),
- gproc_lib:ensure_monitor(WPid,C),
+ _ = gproc_lib:ensure_monitor(WPid,C),
{reply, Ref, [W,Rev]}
end.
@@ -170,10 +170,10 @@ maybe_waiters(K, Pid, Value, T, Info) ->
-spec notify_waiters([{pid(), reference()}], key(), pid(), any()) -> ok.
notify_waiters(Waiters, K, Pid, V) ->
- [begin
- P ! {gproc, Ref, registered, {K, Pid, V}},
- ets:delete(?TAB, {P, K})
- end || {P, Ref} <- Waiters],
+ _ = [begin
+ P ! {gproc, Ref, registered, {K, Pid, V}},
+ ets:delete(?TAB, {P, K})
+ end || {P, Ref} <- Waiters],
ok.
@@ -208,9 +208,9 @@ remove_reg(Key, Pid) ->
remove_many(T, Scope, L, Pid) ->
lists:flatmap(fun(K) ->
- Key = {T, Scope, K},
- remove_reg(Key, Pid)
- end, L).
+ Key = {T, Scope, K},
+ remove_reg(Key, Pid)
+ end, L).
remove_reg_1({c,_,_} = Key, Pid) ->
remove_counter_1(Key, ets:lookup_element(?TAB, Reg = {Key,Pid}, 3), Pid),
View
12 src/gproc_sup.erl
@@ -42,12 +42,12 @@ init(_Args) ->
permanent, 2000, worker, [gproc]},
Dist = case application:get_env(gproc_dist) of
- undefined -> [];
- {ok, false} -> [];
- {ok, Env} ->
- [{gproc_dist, {gproc_dist, start_link, [Env]},
- permanent, 2000, worker, [gproc_dist]}]
- end,
+ undefined -> [];
+ {ok, false} -> [];
+ {ok, Env} ->
+ [{gproc_dist, {gproc_dist, start_link, [Env]},
+ permanent, 2000, worker, [gproc_dist]}]
+ end,
{ok,{{one_for_one, 15, 60}, [GProc | Dist]}}.
Please sign in to comment.
Something went wrong with that request. Please try again.