Skip to content

Commit

Permalink
Merge bug26394
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon MacMullen committed Sep 29, 2014
2 parents d0ec465 + 44b68e1 commit d73fafd
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 9 deletions.
1 change: 1 addition & 0 deletions ebin/rabbit_app.in
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
{handshake_timeout, 10000},
{reverse_dns_lookups, false},
{cluster_partition_handling, ignore},
{cluster_keepalive_interval, 10000},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
Expand Down
39 changes: 30 additions & 9 deletions src/rabbit_node_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
-define(RABBIT_DOWN_PING_INTERVAL, 1000).

-record(state, {monitors, partitions, subscribers, down_ping_timer, autoheal}).
-record(state, {monitors, partitions, subscribers, down_ping_timer,
keepalive_timer, autoheal}).

%%----------------------------------------------------------------------------

Expand Down Expand Up @@ -249,10 +250,10 @@ init([]) ->
process_flag(trap_exit, true),
net_kernel:monitor_nodes(true, [nodedown_reason]),
{ok, _} = mnesia:subscribe(system),
{ok, #state{monitors = pmon:new(),
subscribers = pmon:new(),
partitions = [],
autoheal = rabbit_autoheal:init()}}.
{ok, ensure_keepalive_timer(#state{monitors = pmon:new(),
subscribers = pmon:new(),
partitions = [],
autoheal = rabbit_autoheal:init()})}.

handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
{reply, Partitions, State};
Expand All @@ -279,6 +280,7 @@ handle_cast({node_up, Node, NodeType},
{noreply, State#state{
monitors = pmon:monitor({rabbit, Node}, Monitors)}}
end;

handle_cast({joined_cluster, Node, NodeType}, State) ->
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({add_node(Node, AllNodes),
Expand All @@ -288,13 +290,19 @@ handle_cast({joined_cluster, Node, NodeType}, State) ->
end,
RunningNodes}),
{noreply, State};

handle_cast({left_cluster, Node}, State) ->
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes),
del_node(Node, RunningNodes)}),
{noreply, State};

handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) ->
{noreply, State#state{subscribers = pmon:monitor(Pid, Subscribers)}};

handle_cast(keepalive, State) ->
{noreply, State};

handle_cast(_Msg, State) ->
{noreply, State}.

Expand Down Expand Up @@ -340,7 +348,7 @@ handle_info({autoheal_msg, Msg}, State = #state{autoheal = AState,
AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions),
{noreply, State#state{autoheal = AState1}};

handle_info(ping_nodes, State) ->
handle_info(ping_down_nodes, State) ->
%% We ping nodes when some are down to ensure that we find out
%% about healed partitions quickly. We ping all nodes rather than
%% just the ones we know are down for simplicity; it's not expensive
Expand All @@ -354,14 +362,21 @@ handle_info(ping_nodes, State) ->
ping_all(),
case all_nodes_up() of
true -> ok;
false -> Self ! ping_again
false -> Self ! ping_down_nodes_again
end
end),
{noreply, State1};

handle_info(ping_again, State) ->
handle_info(ping_down_nodes_again, State) ->
{noreply, ensure_ping_timer(State)};

handle_info(ping_up_nodes, State) ->
%% In this case we need to ensure that we ping "quickly" -
%% i.e. only nodes that we know to be up.
Nodes = alive_nodes(rabbit_mnesia:cluster_nodes(all)) -- [node()],
[gen_server2:cast({?MODULE, N}, keepalive) || N <- Nodes],
{noreply, ensure_keepalive_timer(State#state{keepalive_timer = undefined})};

handle_info(_Info, State) ->
{noreply, State}.

Expand Down Expand Up @@ -457,7 +472,13 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,

ensure_ping_timer(State) ->
rabbit_misc:ensure_timer(
State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, ping_nodes).
State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL,
ping_down_nodes).

ensure_keepalive_timer(State) ->
{ok, Interval} = application:get_env(rabbit, cluster_keepalive_interval),
rabbit_misc:ensure_timer(
State, #state.keepalive_timer, Interval, ping_up_nodes).

handle_live_rabbit(Node) ->
ok = rabbit_amqqueue:on_node_up(Node),
Expand Down

0 comments on commit d73fafd

Please sign in to comment.