From cdb0a5277513496ff419fd8f0744638d8444161c Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 5 Aug 2021 15:33:06 +0300 Subject: [PATCH] Merge pull request #3263 from rabbitmq/rabbitmq-server-3260 Partially reintroduce locking to mirrored_supervisor (cherry picked from commit cf2c6099b3ec2ec408d1fc36193a6c74ae023bfb) --- .../rabbit_common/src/mirrored_supervisor.erl | 85 ++++++++++++++----- .../src/mirrored_supervisor_locks.erl | 33 +++++++ 2 files changed, 98 insertions(+), 20 deletions(-) create mode 100644 deps/rabbit_common/src/mirrored_supervisor_locks.erl diff --git a/deps/rabbit_common/src/mirrored_supervisor.erl b/deps/rabbit_common/src/mirrored_supervisor.erl index 36b8bbf33a4d..2ef2bfda6ec7 100644 --- a/deps/rabbit_common/src/mirrored_supervisor.erl +++ b/deps/rabbit_common/src/mirrored_supervisor.erl @@ -277,13 +277,17 @@ handle_call({init, Overall}, _From, tx_fun = TxFun, initial_childspecs = ChildSpecs}) -> process_flag(trap_exit, true), + LockId = mirrored_supervisor_locks:lock(Group), + maybe_log_lock_acquisition_failure(LockId, Group), ok = pg:join(Group, Overall), - rabbit_log:debug("Mirrored supervisor: initializing, joined group ~p", [Group]), + rabbit_log:debug("Mirrored supervisor: initializing, overall supervisor ~p joined group ~p", [Overall, Group]), Rest = pg:get_members(Group) -- [Overall], Nodes = [node(M) || M <- Rest], rabbit_log:debug("Mirrored supervisor: known group ~p members: ~p on nodes ~p", [Group, Rest, Nodes]), case Rest of - [] -> TxFun(fun() -> delete_all(Group) end); + [] -> + rabbit_log:debug("Mirrored supervisor: no known peer members in group ~p, will delete all child records for it", [Group]), + TxFun(fun() -> delete_all(Group) end); _ -> ok end, [begin @@ -293,8 +297,9 @@ handle_call({init, Overall}, _From, Delegate = delegate(Overall), erlang:monitor(process, Delegate), State1 = State#state{overall = Overall, delegate = Delegate}, - case errors([maybe_start(Group, TxFun, Overall, Delegate, S) - || S <- ChildSpecs]) of + Results = [maybe_start(Group, TxFun, Overall, Delegate, S) || S <- ChildSpecs], + mirrored_supervisor_locks:unlock(LockId), + case errors(Results) of [] -> {reply, ok, State1}; Errors -> {stop, {shutdown, Errors}, State1} end; @@ -304,11 +309,25 @@ handle_call({start_child, ChildSpec}, _From, delegate = Delegate, group = Group, tx_fun = TxFun}) -> - {reply, case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of - already_in_mnesia -> {error, already_present}; - {already_in_mnesia, Pid} -> {error, {already_started, Pid}}; - Else -> Else - end, State}; + LockId = mirrored_supervisor_locks:lock(Group), + maybe_log_lock_acquisition_failure(LockId, Group), + rabbit_log:debug("Mirrored supervisor: asked to consider starting a child, group: ~p", [Group]), + Result = case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of + already_in_mnesia -> + rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p," + " overall ~p returned 'record already present'", [Group, Overall]), + {error, already_present}; + {already_in_mnesia, Pid} -> + rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p," + " overall ~p returned 'already running: ~p'", [Group, Overall, Pid]), + {error, {already_started, Pid}}; + Else -> + rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p," + " overall ~p returned ~p", [Group, Overall, Else]), + Else + end, + mirrored_supervisor_locks:unlock(LockId), + {reply, Result, State}; handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, group = Group, @@ -384,28 +403,50 @@ tell_all_peers_to_die(Group, Reason) -> [cast(P, {die, Reason}) || P <- pg:get_members(Group) -- [self()]]. maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) -> + rabbit_log:debug("Mirrored supervisor: asked to consider starting, group: ~p", [Group]), try TxFun(fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of - start -> start(Delegate, ChildSpec); - undefined -> already_in_mnesia; - Pid -> {already_in_mnesia, Pid} + start -> + rabbit_log:debug("Mirrored supervisor: check_start for group ~p," + " overall ~p returned 'do start'", [Group, Overall]), + start(Delegate, ChildSpec); + undefined -> + rabbit_log:debug("Mirrored supervisor: check_start for group ~p," + " overall ~p returned 'undefined'", [Group, Overall]), + already_in_mnesia; + Pid -> + rabbit_log:debug("Mirrored supervisor: check_start for group ~p," + " overall ~p returned 'already running (~p)'", [Group, Overall, Pid]), + {already_in_mnesia, Pid} catch %% If we are torn down while in the transaction... {error, E} -> {error, E} end. check_start(Group, Overall, Delegate, ChildSpec) -> - case mnesia:wread({?TABLE, {Group, id(ChildSpec)}}) of + rabbit_log:debug("Mirrored supervisor: check_start for group ~p, id: ~p, overall: ~p", + [Group, id(ChildSpec), Overall]), + ReadResult = mnesia:wread({?TABLE, {Group, id(ChildSpec)}}), + rabbit_log:debug("Mirrored supervisor: check_start table ~s read for key ~p returned ~p", + [?TABLE, {Group, id(ChildSpec)}, ReadResult]), + case ReadResult of [] -> _ = write(Group, Overall, ChildSpec), start; [S] -> #mirrored_sup_childspec{key = {Group, Id}, mirroring_pid = Pid} = S, case Overall of - Pid -> child(Delegate, Id); - _ -> case supervisor(Pid) of - dead -> _ = write(Group, Overall, ChildSpec), - start; - Delegate0 -> child(Delegate0, Id) - end + Pid -> + rabbit_log:debug("Mirrored supervisor: overall matched mirrored pid ~p", [Pid]), + child(Delegate, Id); + _ -> + rabbit_log:debug("Mirrored supervisor: overall ~p did not match mirrored pid ~p", [Overall, Pid]), + rabbit_log:debug("Mirrored supervisor: supervisor(~p) returned ~p", [Pid, supervisor(Pid)]), + case supervisor(Pid) of + dead -> + _ = write(Group, Overall, ChildSpec), + start; + Delegate0 -> + child(Delegate0, Id) + end end end. @@ -423,7 +464,6 @@ delete(Group, Id) -> ok = mnesia:delete({?TABLE, {Group, Id}}). start(Delegate, ChildSpec) -> - rabbit_log:debug("Mirrored supervisor: asked to start with delegate: ~p, child spec: ~p", [Delegate, ChildSpec]), apply(?SUPERVISOR, start_child, [Delegate, ChildSpec]). stop(Group, TxFun, Delegate, Id) -> @@ -511,3 +551,8 @@ restore_child_order(ChildSpecs, ChildOrder) -> proplists:get_value(id(A), ChildOrder) < proplists:get_value(id(B), ChildOrder) end, ChildSpecs). + +maybe_log_lock_acquisition_failure(undefined = _LockId, Group) -> + rabbit_log:warning("Mirrored supervisor: could not acquire lock for group ~s", [Group]); +maybe_log_lock_acquisition_failure(_, _) -> + ok. diff --git a/deps/rabbit_common/src/mirrored_supervisor_locks.erl b/deps/rabbit_common/src/mirrored_supervisor_locks.erl new file mode 100644 index 000000000000..fc66a9eef3bf --- /dev/null +++ b/deps/rabbit_common/src/mirrored_supervisor_locks.erl @@ -0,0 +1,33 @@ + %% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(mirrored_supervisor_locks). + +-export([lock/1, unlock/1]). + + -define(KEY_PREFIX, mirrored_supervisor). + +%% +%% API +%% + +lock(Group) -> + Nodes = nodes(), + %% about 300s, same as rabbit_nodes:lock_retries/0 default + LockId = case global:set_lock({?KEY_PREFIX, Group}, Nodes, 80) of + true -> Group; + false -> undefined + end, + LockId. + +unlock(LockId) -> + Nodes = nodes(), + case LockId of + undefined -> ok; + Value -> global:del_lock({?KEY_PREFIX, Value}, Nodes) + end, + ok.