Skip to content

Commit

Permalink
shared counters; reset_counter
Browse files Browse the repository at this point in the history
  • Loading branch information
Ulf Wiger committed Aug 15, 2011
1 parent b1589f6 commit 8c1e685
Show file tree
Hide file tree
Showing 8 changed files with 510 additions and 42 deletions.
140 changes: 132 additions & 8 deletions doc/gproc.md

Large diffs are not rendered by default.

44 changes: 42 additions & 2 deletions doc/gproc_dist.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Class = n - unique name
| p - non-unique property
| c - counter
| a - aggregated counter
Scope = l | g (global or local).</td></tr><tr><td valign="top"><a href="#set_value-2">set_value/2</a></td><td></td></tr><tr><td valign="top"><a href="#start_link-0">start_link/0</a></td><td></td></tr><tr><td valign="top"><a href="#start_link-1">start_link/1</a></td><td></td></tr><tr><td valign="top"><a href="#surrendered-3">surrendered/3</a></td><td></td></tr><tr><td valign="top"><a href="#sync-0">sync/0</a></td><td>Synchronize with the gproc leader.</td></tr><tr><td valign="top"><a href="#terminate-2">terminate/2</a></td><td></td></tr><tr><td valign="top"><a href="#unreg-1">unreg/1</a></td><td></td></tr><tr><td valign="top"><a href="#update_counter-2">update_counter/2</a></td><td></td></tr></table>
Scope = l | g (global or local).</td></tr><tr><td valign="top"><a href="#reg_shared-2">reg_shared/2</a></td><td></td></tr><tr><td valign="top"><a href="#reset_counter-1">reset_counter/1</a></td><td></td></tr><tr><td valign="top"><a href="#set_value-2">set_value/2</a></td><td></td></tr><tr><td valign="top"><a href="#start_link-0">start_link/0</a></td><td></td></tr><tr><td valign="top"><a href="#start_link-1">start_link/1</a></td><td></td></tr><tr><td valign="top"><a href="#surrendered-3">surrendered/3</a></td><td></td></tr><tr><td valign="top"><a href="#sync-0">sync/0</a></td><td>Synchronize with the gproc leader.</td></tr><tr><td valign="top"><a href="#terminate-2">terminate/2</a></td><td></td></tr><tr><td valign="top"><a href="#unreg-1">unreg/1</a></td><td></td></tr><tr><td valign="top"><a href="#unreg_shared-1">unreg_shared/1</a></td><td></td></tr><tr><td valign="top"><a href="#update_counter-2">update_counter/2</a></td><td></td></tr><tr><td valign="top"><a href="#update_shared_counter-2">update_shared_counter/2</a></td><td></td></tr></table>



Expand Down Expand Up @@ -244,7 +244,27 @@ Class = n - unique name
| p - non-unique property
| c - counter
| a - aggregated counter
Scope = l | g (global or local)<a name="set_value-2"></a>
Scope = l | g (global or local)<a name="reg_shared-2"></a>

<h3>reg_shared/2</h3>





`reg_shared(Key, Value) -> any()`

<a name="reset_counter-1"></a>

<h3>reset_counter/1</h3>





`reset_counter(Key) -> any()`

<a name="set_value-2"></a>

<h3>set_value/2</h3>

Expand Down Expand Up @@ -329,6 +349,16 @@ the leader died.)<a name="terminate-2"></a>

`unreg(Key) -> any()`

<a name="unreg_shared-1"></a>

<h3>unreg_shared/1</h3>





`unreg_shared(Key) -> any()`

<a name="update_counter-2"></a>

<h3>update_counter/2</h3>
Expand All @@ -339,3 +369,13 @@ the leader died.)<a name="terminate-2"></a>

`update_counter(Key, Incr) -> any()`

<a name="update_shared_counter-2"></a>

<h3>update_shared_counter/2</h3>





`update_shared_counter(Key, Incr) -> any()`

2 changes: 1 addition & 1 deletion doc/gproc_lib.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ For a detailed description, see gproc/doc/erlang07-wiger.pdf.



<pre>insert_reg(K::<a href="#type-key">key()</a>, Value::any(), Pid::pid(), Scope::<a href="#type-scope">scope()</a>) -> boolean()</pre>
<pre>insert_reg(K::<a href="#type-key">key()</a>, Value::any(), Pid::pid() | shared, Scope::<a href="#type-scope">scope()</a>) -> boolean()</pre>
<br></br>


Expand Down
157 changes: 144 additions & 13 deletions src/gproc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@

-export([start_link/0,
reg/1, reg/2, unreg/1,
reg_shared/1, reg_shared/2, unreg_shared/1,
mreg/3,
munreg/3,
set_value/2,
get_value/1,
get_value/1, get_value/2,
where/1,
await/1, await/2,
nb_wait/1,
Expand All @@ -77,6 +78,8 @@
lookup_value/1,
lookup_values/1,
update_counter/2,
reset_counter/1,
update_shared_counter/2,
give_away/2,
goodbye/0,
send/2,
Expand Down Expand Up @@ -104,6 +107,7 @@
add_global_counter/2,
add_local_aggr_counter/1,
add_global_aggr_counter/1,
add_shared_local_counter/2,
lookup_local_name/1,
lookup_global_name/1,
lookup_local_properties/1,
Expand Down Expand Up @@ -200,6 +204,16 @@ add_local_counter(Name, Initial) when is_integer(Initial) ->
reg({c,l,Name}, Initial).


%% spec(Name::any(), Initial::integer()) -> true
%%
%% @doc Registers a local shared (unique) counter.
%% @equiv reg_shared({c,l,Name},Value)
%% @end
%%
add_shared_local_counter(Name, Initial) when is_integer(Initial) ->
reg_shared({c,l,Name}, Initial).


%% spec(Name::any(), Initial::integer()) -> true
%%
%% @doc Registers a global (non-unique) counter. @equiv reg({c,g,Name},Value)
Expand Down Expand Up @@ -671,6 +685,46 @@ reg({n,l,_} = Key, Value) ->
reg(_, _) ->
erlang:error(badarg).


%% @spec reg_shared(Key::key()) -> true
%%
%% @doc Register a resource, but don't tie it to a particular process.
%%
%% `reg_shared({c,l,C}) -> reg_shared({c,l,C}, 0).'
%% `reg_shared({a,l,A}) -> reg_shared({a,l,A}, undefined).'
%% @end
reg_shared({c,_,_} = Key) ->
reg_shared(Key, 0);
reg_shared({a,_,_} = Key) ->
reg_shared(Key, undefined).


%% @spec reg_shared(Key::key(), Value) -> true
%%
%% @doc Register a resource, but don't tie it to a particular process.
%%
%% Shared resources are all unique. They remain until explicitly unregistered
%% (using {@link unreg_shared/1}). The types of shared resources currently
%% supported are `counter' and `aggregated counter'. In listings and query
%% results, shared resources appear as other similar resources, except that
%% `Pid == shared'. To wit, update_counter({c,l,myCounter}, 1, shared) would
%% increment the shared counter `myCounter' with 1, provided it exists.
%%
%% A shared aggregated counter will track updates in exactly the same way as
%% an aggregated counter which is owned by a process.
%% @end
%%
reg_shared({_,g,_} = Key, Value) ->
%% anything global
?CHK_DIST,
gproc_dist:reg_shared(Key, Value);
reg_shared({a,l,_} = Key, undefined) ->
call({reg_shared, Key, undefined});
reg_shared({c,l,_} = Key, Value) when is_integer(Value) ->
call({reg_shared, Key, Value});
reg_shared(_, _) ->
erlang:error(badarg).

%% @spec mreg(type(), scope(), [{Key::any(), Value::any()}]) -> true
%%
%% @doc Register multiple {Key,Value} pairs of a given type and scope.
Expand Down Expand Up @@ -748,6 +802,21 @@ unreg(Key) ->
end
end.

%% @spec (Key:: key()) -> true
%%
%% @doc Unregister a shared resource.
%% @end
unreg_shared(Key) ->
case Key of
{_, g, _} ->
?CHK_DIST,
gproc_dist:unreg_shared(Key);
{T, l, _} when T == c;
T == a -> call({unreg_shared, Key});
_ ->
erlang:error(badarg)
end.

%% @equiv unreg/1
unregister_name(Key) ->
unreg(Key).
Expand Down Expand Up @@ -850,13 +919,20 @@ set_value(_, _) ->
erlang:error(badarg).

%% @spec (Key) -> Value
%% @doc Read the value stored with a key registered to the current process.
%% @doc Reads the value stored with a key registered to the current process.
%%
%% If no such key is registered to the current process, this function exits.
%% @end
get_value(Key) ->
get_value(Key, self()).

%% @spec (Key, Pid) -> Value
%% @doc Reads the value stored with a key registered to the process Pid.
%%
%% If `Pid == shared', the value of a shared key (see {@link reg_shared/1})
%% will be read.
%% @end
%%
get_value({T,_,_} = Key, Pid) when is_pid(Pid) ->
if T==n orelse T==a ->
case ets:lookup(?TAB, {Key, T}) of
Expand All @@ -866,6 +942,15 @@ get_value({T,_,_} = Key, Pid) when is_pid(Pid) ->
true ->
ets:lookup_element(?TAB, {Key, Pid}, 3)
end;
get_value({T,_,_} = K, shared) when T==c; T==a ->
Key = case T of
c -> {K, shared};
a -> {K, a}
end,
case ets:lookup(?TAB, Key) of
[{_, shared, Value}] -> Value;
_ -> erlang:error(badarg)
end;
get_value(_, _) ->
erlang:error(badarg).

Expand Down Expand Up @@ -978,6 +1063,40 @@ update_counter({c,g,_} = Key, Incr) when is_integer(Incr) ->
update_counter(_, _) ->
erlang:error(badarg).


%% @spec (Key) -> {ValueBefore, ValueAfter}
%% Key = {c, Scope, Name}
%% Scope = l | g
%% ValueBefore = integer()
%% ValueAfter = integer()
%%
%% @doc Reads and resets a counter in a "thread-safe" way
%%
%% This function reads the current value of a counter and then resets it to its
%% initial value. The reset operation is done using {@link update_counter/2},
%% which allows for concurrent calls to {@link update_counter/2} without losing
%% updates. Aggregated counters are updated accordingly.
%% @end
%%
reset_counter({c,g,_} = Key) ->
?CHK_DIST,
gproc_dist:reset_counter(Key);
reset_counter({c,l,_} = Key) ->
Current = ets:lookup_element(?TAB, {Key, self()}, 3),
Initial = case ets:lookup(?TAB, {self(), Key}) of
[{_, r}] -> 0;
[{_, Opts}] ->
proplists:get_value(initial, Opts, 0)
end,
{Current, update_counter(Key, Initial - Current)}.


update_shared_counter({c,g,_} = Key, Incr) ->
?CHK_DIST,
gproc_dist:update_shared_counter(Key, Incr);
update_shared_counter({c,l,_} = Key, Incr) ->
gproc_lib:update_counter(Key, Incr, shared).

%% @spec (From::key(), To::pid() | key()) -> undefined | pid()
%%
%% @doc Atomically transfers the key `From' to the process identified by `To'.
Expand Down Expand Up @@ -1150,7 +1269,7 @@ info(Pid) when is_pid(Pid) ->
%% same as [http://www.erlang.org/doc/man/erlang.html#process_info-2].
%% @end
info(Pid, ?MODULE) ->
Keys = ets:select(?TAB, [{ {{Pid,'$1'}, r}, [], ['$1'] }]),
Keys = ets:select(?TAB, [{ {{Pid,'$1'}, '_'}, [], ['$1'] }]),
{?MODULE, lists:zf(
fun(K) ->
try V = get_value(K, Pid),
Expand Down Expand Up @@ -1210,6 +1329,14 @@ handle_call({reg, {_T,l,_} = Key, Val}, {Pid,_}, S) ->
false ->
{reply, badarg, S}
end;
handle_call({reg_shared, {_T,l,_} = Key, Val}, _From, S) ->
case try_insert_reg(Key, Val, shared) of
%% case try_insert_shared(Key, Val) of
true ->
{reply, true, S};
false ->
{reply, badarg, S}
end;
handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
case ets:member(?TAB, {Pid,Key}) of
true ->
Expand All @@ -1218,6 +1345,9 @@ handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) ->
false ->
{reply, badarg, S}
end;
handle_call({unreg_shared, {_,l,_} = Key}, _, S) ->
_ = gproc_lib:remove_reg(Key, shared),
{reply, true, S};
handle_call({await, {_,l,_} = Key, Pid}, From, S) ->
%% Passing the pid explicitly is needed when leader_call is used,
%% since the Pid given as From in the leader is the local gen_leader
Expand Down Expand Up @@ -1307,9 +1437,6 @@ cast(Msg, l) ->
cast(Msg, g) ->
gproc_dist:leader_cast(Msg).




try_insert_reg({T,l,_} = Key, Val, Pid) ->
case gproc_lib:insert_reg(Key, Val, Pid, l) of
false ->
Expand All @@ -1332,6 +1459,10 @@ try_insert_reg({T,l,_} = Key, Val, Pid) ->
true
end.

%% try_insert_shared({c,l,_} = Key, Val) ->
%% ets:insert_new(?TAB, [{{Key,shared}, shared, Val}, {{shared, Key}, []}]);
%% try_insert_shared({a,l,_} = Key, Val) ->
%% ets:insert_new(?TAB, [{{Key, a}, shared, Val}, {{shared, Key}, []}]).

-spec audit_process(pid()) -> ok.

Expand All @@ -1349,7 +1480,7 @@ process_is_down(Pid) when is_pid(Pid) ->
false ->
ok;
true ->
Revs = ets:select(?TAB, [{{{Pid,'$1'},r},
Revs = ets:select(?TAB, [{{{Pid,'$1'}, '_'},
[{'==',{element,2,'$1'},l}], ['$1']}]),
lists:foreach(
fun({n,l,_}=K) ->
Expand Down Expand Up @@ -1395,7 +1526,7 @@ do_give_away({T,l,_} = K, To, Pid) when T==n; T==a ->
Pid;
ToPid when is_pid(ToPid) ->
ets:insert(?TAB, [{Key, ToPid, Value},
{{ToPid, K}, r}]),
{{ToPid, K}, []}]),
ets:delete(?TAB, {Pid, K}),
_ = gproc_lib:ensure_monitor(ToPid, l),
ToPid;
Expand All @@ -1418,7 +1549,7 @@ do_give_away({T,l,_} = K, To, Pid) when T==c; T==p ->
badarg;
false ->
ets:insert(?TAB, [{ToKey, ToPid, Value},
{{ToPid, K}, r}]),
{{ToPid, K}, []}]),
ets:delete(?TAB, {Pid, K}),
ets:delete(?TAB, Key),
_ = gproc_lib:ensure_monitor(ToPid, l),
Expand Down Expand Up @@ -1598,7 +1729,7 @@ is_var('$8') -> {true,8};
is_var('$9') -> {true,9};
is_var(X) when is_atom(X) ->
case atom_to_list(X) of
"$" ++ Tl ->
"\$" ++ Tl ->
try N = list_to_integer(Tl),
{true,N}
catch
Expand Down Expand Up @@ -1704,10 +1835,10 @@ qlc_lookup(_Scope, 1, Keys) ->
qlc_lookup(Scope, 2, Pids) ->
lists:flatmap(fun(Pid) ->
Found =
ets:select(?TAB, [{ {{Pid, rev_keypat(Scope)}, r},
[], ['$_']}]),
ets:select(?TAB, [{{{Pid, rev_keypat(Scope)}, '_'},
[], ['$_']}]),
lists:flatmap(
fun({{_,{T,_,_}=K}, r}) ->
fun({{_,{T,_,_}=K}, _}) ->
K2 = if T==n orelse T==a -> T;
true -> Pid
end,
Expand Down
Loading

0 comments on commit 8c1e685

Please sign in to comment.