Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #86 from uwiger/uw-pool-fixes

various fixes to gproc_pool
  • Loading branch information...
commit 7655bddc4397d0cc3f7c4d95e7b96790605946fe 2 parents fa53703 + d5443b9
@uwiger authored
Showing with 138 additions and 32 deletions.
  1. +20 −8 src/gproc.erl
  2. +15 −3 src/gproc_lib.erl
  3. +103 −21 src/gproc_pool.erl
View
28 src/gproc.erl
@@ -1667,7 +1667,7 @@ update_counters1(_) ->
%% @spec (Key) -> {ValueBefore, ValueAfter}
-%% Key = {c, Scope, Name}
+%% Key = {c, Scope, Name} | {n, Scope, Name}
%% Scope = l | g
%% ValueBefore = integer()
%% ValueAfter = integer()
@@ -1678,22 +1678,34 @@ update_counters1(_) ->
%% initial value. The reset operation is done using {@link update_counter/2},
%% which allows for concurrent calls to {@link update_counter/2} without losing
%% updates. Aggregated counters are updated accordingly.
+%%
+%% If `Key' refers to a unique name, the operation will depend on the value
+%% part of the registration being an integer(). While non-integer values are
+%% not permitted at all for counter objects, it is the user's responsibility to
+%% ensure that a name, on which `reset_counter/1' is to be performed, has the
+%% appropriate value type.
%% @end
%%
reset_counter(Key) ->
?CATCH_GPROC_ERROR(reset_counter1(Key), [Key]).
-reset_counter1({c,g,_} = Key) ->
+reset_counter1({T,g,_} = Key) when T==c; T==n ->
?CHK_DIST,
gproc_dist:reset_counter(Key);
+reset_counter1({n,l,_} = Key) ->
+ [{_, Pid, Current}] = ets:lookup(?TAB, {Key, n}),
+ {Current, update_counter(Key, get_initial(Pid, Key) - Current)};
reset_counter1({c,l,_} = Key) ->
Current = ets:lookup_element(?TAB, {Key, self()}, 3),
- Initial = case ets:lookup(?TAB, {self(), Key}) of
- [{_, r}] -> 0;
- [{_, Opts}] ->
- proplists:get_value(initial, Opts, 0)
- end,
- {Current, update_counter(Key, Initial - Current)}.
+ {Current, update_counter(Key, get_initial(self(), Key) - Current)}.
+
+get_initial(Pid, Key) ->
+ case ets:lookup(?TAB, {Pid, Key}) of
+ [{_, r}] -> 0;
+ [{_, Opts}] ->
+ proplists:get_value(initial, Opts, 0)
+ end.
+
%% @spec (Key::key(), Incr) -> integer() | [integer()]
%% Incr = IncrVal | UpdateOp | [UpdateOp]
View
18 src/gproc_lib.erl
@@ -460,14 +460,22 @@ do_set_counter_value({_,C,N} = Key, Value, Pid) ->
update_counter({T,l,Ctr} = Key, Incr, Pid) when is_integer(Incr), T==c;
is_integer(Incr), T==n ->
Res = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}),
- update_aggr_counter(l, Ctr, Incr),
+ if T==c ->
+ update_aggr_counter(l, Ctr, Incr);
+ true ->
+ ok
+ end,
Res;
update_counter({T,l,Ctr} = Key, {Incr, Threshold, SetValue}, Pid)
when is_integer(Incr), is_integer(Threshold), is_integer(SetValue), T==c;
is_integer(Incr), is_integer(Threshold), is_integer(SetValue), T==n ->
[Prev, New] = ets:update_counter(?TAB, {Key, Pid},
[{3, 0}, {3, Incr, Threshold, SetValue}]),
- update_aggr_counter(l, Ctr, New - Prev),
+ if T==c ->
+ update_aggr_counter(l, Ctr, New - Prev);
+ true ->
+ ok
+ end,
New;
update_counter({T,l,Ctr} = Key, Ops, Pid) when is_list(Ops), T==c;
is_list(Ops), T==n ->
@@ -477,7 +485,11 @@ update_counter({T,l,Ctr} = Key, Ops, Pid) when is_list(Ops), T==c;
[];
[Prev | Rest] ->
[New | _] = lists:reverse(Rest),
- update_aggr_counter(l, Ctr, New - Prev),
+ if T==c ->
+ update_aggr_counter(l, Ctr, New - Prev);
+ true ->
+ ok
+ end,
Rest
end;
update_counter(_, _, _) ->
View
124 src/gproc_pool.erl
@@ -73,6 +73,7 @@
pick_worker/1, % (Pool)
pick_worker/2, % (Pool, Value)
claim/2, % (Pool, Fun)
+ claim/3, % (Pool, Fun, Wait)
log/1, % (WorkerId)
randomize/1]). % (Pool)
@@ -85,7 +86,8 @@
code_change/3]).
-export([test/1, test/3, ptest/4, test_run/2, test_run1/2, test_run2/2,
- test_run0/2, setup_test_pool/3, remove_test_pool/1]).
+ test_run0/2, setup_test_pool/3, setup_test_pool/4,
+ remove_test_pool/1]).
-define(POOL(Pool), {p,l,{?MODULE,Pool}}).
-define(POOL_CUR(Pool), {c,l,{?MODULE,Pool,cur}}).
@@ -406,8 +408,15 @@ ret(Name, pid) ->
false
end.
+%% @equiv claim(Pool, F, nowait)
+claim(Pool, F) when is_function(F, 2) ->
+ claim(Pool, F, nowait).
-%% @spec claim(Pool::any(), Fun::function()) -> {true, Res} | false
+%% @spec claim(Pool, Fun, Wait) -> {true, Res} | false
+%% Pool = any()
+%% Fun = function()
+%% Wait = nowait | {busy_wait, integer()}
+%%
%% @doc Picks the first available worker in the pool and applies `Fun'.
%%
%% A `claim' pool allows the caller to "claim" a worker during a short span
@@ -417,21 +426,49 @@ ret(Name, pid) ->
%% The gproc name of the worker serves as a mutex, where its value is 0 (zero)
%% if the worker is free, and 1 (one) if it is busy. The mutex operation is
%% implemented using `gproc:update_counter/2'.
+%%
+%% `Wait == nowait' means that the call will return `false' immediately if
+%% there is no available worker.
+%%
+%% `Wait == {busy_wait, Timeout}' will keep repeating the claim attempt
+%% for `Timeout' milliseconds. If still no worker is available, it will
+%% return `false'.
%% @end
-claim(Pool, F) when is_function(F, 2) ->
+claim(Pool, F, Wait) ->
case gproc:get_value(?POOL(Pool), shared) of
- {0, _} -> false;
- {_, claim} ->
- claim_(Pool, F);
- _ ->
- error(badarg)
+ {0, _} -> false;
+ {_, claim} ->
+ W = setup_wait(Wait, Pool),
+ claim_w(Pool, F, W);
+ _ ->
+ error(badarg)
+ end.
+
+claim_w(_Pool, _F, timeout) ->
+ false;
+claim_w(Pool, F, W) ->
+ case claim_(Pool, F) of
+ false ->
+ claim_w(Pool, F, do_wait(W));
+ Other ->
+ clear_wait(W),
+ Other
end.
+%% Define how many workers to select in each chunk. We want to strike
+%% a good compromise between the cost of succeeding on the first try
+%% (likely a common event) and the cost of retrying. In my measurements,
+%% if a chunk size of 1 costs ca 30 us (on my Macbook), a chunk size of 5
+%% adds only ca 20% to the cost, i.e. a few us.
+-define(CLAIM_CHUNK, 5).
+
claim_(Pool, F) ->
- case gproc:select({l,n}, [{ {{n,l,[?MODULE,Pool,'$1','_']}, '_', 0}, [],
- [{{ {element,1,'$_'}, '$1' }}]}], 1) of
- {[{K, Pid}], Cont} ->
- case try_claim(K, Pid, F) of
+ %% Sorry, but we use ets:select/3 here in order to shave off a few us.
+ case ets:select(gproc, [{ {{{n,l,[?MODULE,Pool,'_','_']},n}, '$1', 0}, [],
+ [{{ {element,1,{element,1,'$_'}}, '$1' }}]}],
+ ?CLAIM_CHUNK) of
+ {[_|_] = Workers, Cont} ->
+ case try_claim(Workers, F) of
{true, _} = True ->
True;
false ->
@@ -441,10 +478,12 @@ claim_(Pool, F) ->
false
end.
+claim_cont('$end_of_table', _) ->
+ false;
claim_cont(Cont, F) ->
- case gproc:select(Cont) of
- {[{K, Pid}], Cont1} ->
- case try_claim(K, Pid, F) of
+ case ets:select(Cont) of
+ {[_|_] = Workers, Cont1} ->
+ case try_claim(Workers, F) of
{true, _} = True ->
True;
false ->
@@ -454,6 +493,16 @@ claim_cont(Cont, F) ->
false
end.
+try_claim([], _) ->
+ false;
+try_claim([{K,Pid}|T], F) ->
+ case try_claim(K, Pid, F) of
+ false ->
+ try_claim(T, F);
+ Other ->
+ Other
+ end.
+
try_claim(K, Pid, F) ->
case gproc:update_counter(K, [0, {1, 1, 1}]) of
[0, 1] ->
@@ -461,13 +510,39 @@ try_claim(K, Pid, F) ->
try Res = F(K, Pid),
{true, Res}
after
- gproc:set_value(K, 0)
+ gproc:reset_counter(K)
end;
[1, 1] ->
%% no
false
end.
+setup_wait(nowait, _) ->
+ nowait;
+setup_wait({busy_wait, MS}, Pool) ->
+ Ref = erlang:send_after(MS, self(), {claim, Pool}),
+ {busy_wait, Ref}.
+
+do_wait(nowait) ->
+ timeout;
+do_wait({busy_wait, Ref} = W) ->
+ %% Yielding here serves two purposes:
+ %% 1) Increase the chance that whoever's before us can finish
+ %% 2) The value of read_timer/1 only refreshes after yield (so I've heard)
+ erlang:yield(),
+ case erlang:read_timer(Ref) of
+ false ->
+ erlang:cancel_timer(Ref),
+ timeout;
+ _ ->
+ W
+ end.
+
+clear_wait(nowait) ->
+ ok;
+clear_wait({busy_wait, Ref}) ->
+ erlang:cancel_timer(Ref),
+ ok.
%% @spec log(GprocKey) -> integer()
%% @doc Update a counter associated with a worker name.
@@ -928,14 +1003,18 @@ f(_) ->
%% @private
-setup_test_pool(P, Type0, Opts) ->
+setup_test_pool(P, Type, Opts) ->
+ setup_test_pool(P, Type, Opts, test_workers()).
+
+setup_test_pool(P, Type0, Opts, Workers) ->
Type = case Type0 of {_, T} -> T; T when is_atom(T) -> T end,
new(P, Type, Opts),
[begin R = add_worker(P, W),
io:fwrite("add_worker(~p, ~p) -> ~p; Ws = ~p~n",
[P, W, R, get_workers_(?POOL(P))]),
connect_worker(P, W)
- end || W <- test_workers()].
+ end || W <- Workers].
+
%% @private
remove_test_pool(P) ->
@@ -978,11 +1057,14 @@ test_run1(_, _, S, M) ->
%% @private
test_run2(N, P) ->
- test_run2(N, P, fun(K,_) -> log(K) end, 0, 0).
+ test_run2(N, P, fun(K,_) ->
+ R = log(K),
+ timer:sleep(crypto:rand_uniform(1,50)),
+ R
+ end, 0, 0).
test_run2(N, P, F, S, M) when N > 0 ->
- {T, {true, _}} = timer:tc(?MODULE, claim, [P, F]),
- timer:sleep(crypto:rand_uniform(1,50)),
+ {T, {true, _}} = timer:tc(?MODULE, claim, [P, F, {busy_wait, 5000}]),
test_run2(N-1, P, F, S+T, M+1);
test_run2(_, _, _, S, M) ->
S/M.
Please sign in to comment.
Something went wrong with that request. Please try again.