Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

gproc:nb_wait/1 now works even if waiting on self.

Thanks to @idubrov for spotting. The notify_waiters function would delete all
reverse entries, including the own pid, even if this pid was the one registering.

My solution was to lift the reverse-entry removal from the notify_waiters fn,
and ensuring that it didn't remove the reverse entry of whatever pid actually
registered the name. Test cases added, both for local and distributed case.
  • Loading branch information...
commit 27be35ea837dc259ee2b34752e1130615a9b713c 1 parent 669e1b2
Ulf Wiger authored
Showing with 60 additions and 11 deletions.
  1. +5 −1 src/gproc_lib.erl
  2. +40 −10 test/gproc_dist_tests.erl
  3. +15 −0 test/gproc_tests.erl
View
6 src/gproc_lib.erl
@@ -173,7 +173,11 @@ maybe_waiters(K, Pid, Value, T, Info) ->
notify_waiters(Waiters, K, Pid, V) ->
_ = [begin
P ! {gproc, Ref, registered, {K, Pid, V}},
- ets:delete(?TAB, {P, K})
+ case P of
+ Pid -> ignore;
+ _ ->
+ ets:delete(?TAB, {P, K})
+ end
end || {P, Ref} <- Waiters],
ok.
View
50 test/gproc_dist_tests.erl
@@ -50,6 +50,9 @@ dist_test_() ->
?debugVal(t_await_reg(Ns))
end,
fun() ->
+ ?debugVal(t_await_self(Ns))
+ end,
+ fun() ->
?debugVal(t_await_reg_exists(Ns))
end,
fun() ->
@@ -108,6 +111,24 @@ t_await_reg([A,B|_]) ->
?assertMatch(ok, t_call(P, die)),
?assertMatch(ok, t_call(P1, die)).
+t_await_self([A|_]) ->
+ Name = ?T_NAME,
+ P = t_spawn(A, false), % buffer unknowns
+ Ref = t_call(P, {apply, gproc, nb_wait, [Name]}),
+ ?assertMatch(ok, t_call(P, {selective, true})),
+ ?assertMatch(true, t_call(P, {apply, gproc, reg, [Name, some_value]})),
+ ?assertMatch({registered, {Name, P, some_value}},
+ t_call(P, {apply_fun, fun() ->
+ receive
+ {gproc, Ref, R, Wh} ->
+ {R, Wh}
+ after 10000 ->
+ timeout
+ end
+ end})),
+ ?assertMatch(ok, t_call(P, {selective, false})),
+ ?assertMatch(true, t_call(P, {apply, gproc, unreg, [Name]})).
+
t_await_reg_exists([A,B|_]) ->
Name = ?T_NAME,
P = t_spawn(A),
@@ -149,21 +170,20 @@ t_sync(Ns) ->
%% the other candidate doesn't respond too quickly.
t_sync_cand_dies([A,B|_]) ->
Leader = rpc:call(A, gproc_dist, get_leader, []),
- Other = case Leader of
+ Other = case Leader of
A -> B;
B -> A
end,
?assertMatch(ok, rpc:call(Other, sys, suspend, [gproc_dist])),
P = rpc:call(Other, erlang, whereis, [gproc_dist]),
Key = rpc:async_call(Leader, gproc_dist, sync, []),
- %% The overall timeout for gproc_dist:sync() is 5 seconds. Here, we should
+ %% The overall timeout for gproc_dist:sync() is 5 seconds. Here, we should
%% still be waiting.
?assertMatch(timeout, rpc:nb_yield(Key, 1000)),
exit(P, kill),
%% The leader should detect that the other candidate died and respond
%% immediately. Therefore, we should have our answer well within 1 sec.
?assertMatch({value, true}, rpc:nb_yield(Key, 1000)).
-
t_fail_node([A,B|_] = Ns) ->
Na = ?T_NAME,
@@ -178,8 +198,7 @@ t_fail_node([A,B|_] = Ns) ->
?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
?assertMatch(ok, t_call(Pa, die)),
?assertMatch(ok, t_call(Pb, die)).
-
-
+
t_sleep() ->
timer:sleep(500).
@@ -198,13 +217,15 @@ t_lookup_everywhere(Key, Nodes, Exp, I) ->
true ->
ok
end.
-
t_spawn(Node) ->
+ t_spawn(Node, false).
+
+t_spawn(Node, Selective) when is_boolean(Selective) ->
Me = self(),
P = spawn(Node, fun() ->
Me ! {self(), ok},
- t_loop()
+ t_loop(Selective)
end),
receive
{P, ok} -> P
@@ -244,13 +265,22 @@ t_call(P, Req) ->
end.
t_loop() ->
+ t_loop(false).
+
+t_loop(Selective) when is_boolean(Selective) ->
receive
{From, Ref, die} ->
From ! {self(), Ref, ok};
+ {From, Ref, {selective, Bool}} when is_boolean(Bool) ->
+ From ! {self(), Ref, ok},
+ t_loop(Bool);
{From, Ref, {apply, M, F, A}} ->
From ! {self(), Ref, apply(M, F, A)},
- t_loop();
- Other ->
+ t_loop(Selective);
+ {From, Ref, {apply_fun, F}} ->
+ From ! {self(), Ref, F()},
+ t_loop(Selective);
+ Other when not Selective ->
?debugFmt("got unknown msg: ~p~n", [Other]),
exit({unknown_msg, Other})
end.
@@ -259,7 +289,7 @@ start_slaves(Ns) ->
[H|T] = Nodes = [start_slave(N) || N <- Ns],
_ = [rpc:call(H, net, ping, [N]) || N <- T],
Nodes.
-
+
start_slave(Name) ->
case node() of
nonode@nohost ->
View
15 test/gproc_tests.erl
@@ -75,6 +75,8 @@ reg_test_() ->
, ?_test(t_is_clean())
, {spawn, ?_test(t_await())}
, ?_test(t_is_clean())
+ , {spawn, ?_test(t_await_self())}
+ , ?_test(t_is_clean())
, {spawn, ?_test(t_simple_mreg())}
, ?_test(t_is_clean())
, {spawn, ?_test(t_gproc_crash())}
@@ -142,6 +144,19 @@ t_await() ->
erlang:error(timeout)
end.
+t_await_self() ->
+ Me = self(),
+ Ref = gproc:nb_wait({n, l, t_await_self}),
+ ?assert(gproc:reg({n, l, t_await_self}, some_value) =:= true),
+ ?assertEqual(true, receive
+ {gproc, Ref, R, Wh} ->
+ {registered, {{n, l, t_await_self},
+ Me, some_value}} = {R, Wh},
+ true
+ after 10000 ->
+ timeout
+ end).
+
t_is_clean() ->
sys:get_status(gproc), % in order to synch
T = ets:tab2list(gproc),
Please sign in to comment.
Something went wrong with that request. Please try again.