Skip to content

Commit

Permalink
Make rabbit_misc:is_process_alive() return false for nodes we are par…
Browse files Browse the repository at this point in the history
…titioned from; prevent prequeue:init/1 from entering an infinite loop on partition.
  • Loading branch information
Simon MacMullen committed Oct 9, 2014
1 parent 3dd5168 commit b0f2c46
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
5 changes: 2 additions & 3 deletions src/rabbit_misc.erl
Expand Up @@ -853,9 +853,8 @@ ntoab(IP) ->
%% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur
%% would be bad news.
is_process_alive(Pid) ->
Node = node(Pid),
lists:member(Node, [node() | nodes()]) andalso
rpc:call(Node, erlang, is_process_alive, [Pid]) =:= true.
rabbit_mnesia:on_running_node(Pid) andalso
rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true.

pget(K, P) -> proplists:get_value(K, P).
pget(K, P, D) -> proplists:get_value(K, P, D).
Expand Down
4 changes: 4 additions & 0 deletions src/rabbit_mnesia.erl
Expand Up @@ -27,6 +27,7 @@

status/0,
is_clustered/0,
on_running_node/1,
cluster_nodes/1,
node_type/0,
dir/0,
Expand Down Expand Up @@ -71,6 +72,7 @@
{'running_nodes', [node()]} |
{'partitions', [{node(), [node()]}]}]).
-spec(is_clustered/0 :: () -> boolean()).
-spec(on_running_node/1 :: (pid()) -> boolean()).
-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]).
-spec(node_type/0 :: () -> node_type()).
-spec(dir/0 :: () -> file:filename()).
Expand Down Expand Up @@ -336,6 +338,8 @@ is_running() -> mnesia:system_info(is_running) =:= yes.
is_clustered() -> AllNodes = cluster_nodes(all),
AllNodes =/= [] andalso AllNodes =/= [node()].

on_running_node(Pid) -> lists:member(node(Pid), cluster_nodes(running)).

cluster_nodes(WhichNodes) -> cluster_status(WhichNodes).

%% This function is the actual source of information, since it gets
Expand Down
7 changes: 4 additions & 3 deletions src/rabbit_prequeue.erl
Expand Up @@ -64,13 +64,14 @@ init(Q, slave) -> rabbit_mirror_queue_slave:init(Q);
init(#amqqueue{name = QueueName}, restart) ->
{ok, Q = #amqqueue{pid = QPid,
slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName),
Local = node(QPid) =:= node(),
LocalOrMasterDown = node(QPid) =:= node()
orelse not rabbit_mnesia:on_running_node(QPid),
Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)],
case rabbit_misc:is_process_alive(QPid) of
true -> false = Local, %% assertion
true -> false = LocalOrMasterDown, %% assertion
rabbit_mirror_queue_slave:go(self(), async),
rabbit_mirror_queue_slave:init(Q); %% [1]
false -> case Local andalso Slaves =:= [] of
false -> case LocalOrMasterDown andalso, Slaves =:= [] of
true -> crash_restart(Q); %% [2]
false -> timer:sleep(25),
init(Q, restart) %% [3]
Expand Down

0 comments on commit b0f2c46

Please sign in to comment.