diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index f26e0f77093c..888e4dbabf09 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -50,6 +50,7 @@ {handshake_timeout, 10000}, {reverse_dns_lookups, false}, {cluster_partition_handling, ignore}, + {cluster_keepalive_interval, 10000}, {tcp_listen_options, [binary, {packet, raw}, {reuseaddr, true}, diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index e1ff9ffdd284..5fc04ec9182e 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -39,7 +39,8 @@ -define(RABBIT_UP_RPC_TIMEOUT, 2000). -define(RABBIT_DOWN_PING_INTERVAL, 1000). --record(state, {monitors, partitions, subscribers, down_ping_timer, autoheal}). +-record(state, {monitors, partitions, subscribers, down_ping_timer, + keepalive_timer, autoheal}). %%---------------------------------------------------------------------------- @@ -249,10 +250,10 @@ init([]) -> process_flag(trap_exit, true), net_kernel:monitor_nodes(true, [nodedown_reason]), {ok, _} = mnesia:subscribe(system), - {ok, #state{monitors = pmon:new(), - subscribers = pmon:new(), - partitions = [], - autoheal = rabbit_autoheal:init()}}. + {ok, ensure_keepalive_timer(#state{monitors = pmon:new(), + subscribers = pmon:new(), + partitions = [], + autoheal = rabbit_autoheal:init()})}. handle_call(partitions, _From, State = #state{partitions = Partitions}) -> {reply, Partitions, State}; @@ -279,6 +280,7 @@ handle_cast({node_up, Node, NodeType}, {noreply, State#state{ monitors = pmon:monitor({rabbit, Node}, Monitors)}} end; + handle_cast({joined_cluster, Node, NodeType}, State) -> {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), write_cluster_status({add_node(Node, AllNodes), @@ -288,13 +290,19 @@ handle_cast({joined_cluster, Node, NodeType}, State) -> end, RunningNodes}), {noreply, State}; + handle_cast({left_cluster, Node}, State) -> {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes), del_node(Node, RunningNodes)}), {noreply, State}; + handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) -> {noreply, State#state{subscribers = pmon:monitor(Pid, Subscribers)}}; + +handle_cast(keepalive, State) -> + {noreply, State}; + handle_cast(_Msg, State) -> {noreply, State}. @@ -340,7 +348,7 @@ handle_info({autoheal_msg, Msg}, State = #state{autoheal = AState, AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions), {noreply, State#state{autoheal = AState1}}; -handle_info(ping_nodes, State) -> +handle_info(ping_down_nodes, State) -> %% We ping nodes when some are down to ensure that we find out %% about healed partitions quickly. We ping all nodes rather than %% just the ones we know are down for simplicity; it's not expensive @@ -354,14 +362,21 @@ handle_info(ping_nodes, State) -> ping_all(), case all_nodes_up() of true -> ok; - false -> Self ! ping_again + false -> Self ! ping_down_nodes_again end end), {noreply, State1}; -handle_info(ping_again, State) -> +handle_info(ping_down_nodes_again, State) -> {noreply, ensure_ping_timer(State)}; +handle_info(ping_up_nodes, State) -> + %% In this case we need to ensure that we ping "quickly" - + %% i.e. only nodes that we know to be up. + Nodes = alive_nodes(rabbit_mnesia:cluster_nodes(all)) -- [node()], + [gen_server2:cast({?MODULE, N}, keepalive) || N <- Nodes], + {noreply, ensure_keepalive_timer(State#state{keepalive_timer = undefined})}; + handle_info(_Info, State) -> {noreply, State}. @@ -457,7 +472,13 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions, ensure_ping_timer(State) -> rabbit_misc:ensure_timer( - State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, ping_nodes). + State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, + ping_down_nodes). + +ensure_keepalive_timer(State) -> + {ok, Interval} = application:get_env(rabbit, cluster_keepalive_interval), + rabbit_misc:ensure_timer( + State, #state.keepalive_timer, Interval, ping_up_nodes). handle_live_rabbit(Node) -> ok = rabbit_amqqueue:on_node_up(Node),