From 4f3d5e5e132ad1cf7b4417cc40f6991c81f1a5b1 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 4 Aug 2021 15:56:45 +0300 Subject: [PATCH 1/4] Mirrored supervisor: more logging at debug level --- .../rabbit_common/src/mirrored_supervisor.erl | 39 +++++++++++++++---- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/deps/rabbit_common/src/mirrored_supervisor.erl b/deps/rabbit_common/src/mirrored_supervisor.erl index 36b8bbf33a4d..6c691bd517b9 100644 --- a/deps/rabbit_common/src/mirrored_supervisor.erl +++ b/deps/rabbit_common/src/mirrored_supervisor.erl @@ -304,10 +304,20 @@ handle_call({start_child, ChildSpec}, _From, delegate = Delegate, group = Group, tx_fun = TxFun}) -> + rabbit_log:debug("Mirrored supervisor: asked to consider starting a child, group: ~p", [Group]), {reply, case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of - already_in_mnesia -> {error, already_present}; - {already_in_mnesia, Pid} -> {error, {already_started, Pid}}; - Else -> Else + already_in_mnesia -> + rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p," + " overall ~p returned 'record already present'", [Group, Overall]), + {error, already_present}; + {already_in_mnesia, Pid} -> + rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p," + " overall ~p returned 'already running: ~p'", [Group, Overall, Pid]), + {error, {already_started, Pid}}; + Else -> + rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p," + " overall ~p returned ~p", [Group, Overall, Else]), + Else end, State}; handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, @@ -384,17 +394,31 @@ tell_all_peers_to_die(Group, Reason) -> [cast(P, {die, Reason}) || P <- pg:get_members(Group) -- [self()]]. maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) -> + rabbit_log:debug("Mirrored supervisor: asked to consider starting, group: ~p", [Group]), try TxFun(fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of - start -> start(Delegate, ChildSpec); - undefined -> already_in_mnesia; - Pid -> {already_in_mnesia, Pid} + start -> + rabbit_log:debug("Mirrored supervisor: check_start for group ~p," + " overall ~p returned 'do start'", [Group, Overall]), + start(Delegate, ChildSpec); + undefined -> + rabbit_log:debug("Mirrored supervisor: check_start for group ~p," + " overall ~p returned 'undefined'", [Group, Overall]), + already_in_mnesia; + Pid -> + rabbit_log:debug("Mirrored supervisor: check_start for group ~p," + " overall ~p returned 'already running (~p)'", [Group, Overall, Pid]), + {already_in_mnesia, Pid} catch %% If we are torn down while in the transaction... {error, E} -> {error, E} end. check_start(Group, Overall, Delegate, ChildSpec) -> - case mnesia:wread({?TABLE, {Group, id(ChildSpec)}}) of + rabbit_log:debug("Mirrored supervisor: check_start for group ~p, id: ~p", [Group, id(ChildSpec)]), + ReadResult = mnesia:wread({?TABLE, {Group, id(ChildSpec)}}), + rabbit_log:debug("Mirrored supervisor: check_start table ~s read for key ~p returned ~p", + [?TABLE, {Group, id(ChildSpec)}, ReadResult]), + case ReadResult of [] -> _ = write(Group, Overall, ChildSpec), start; [S] -> #mirrored_sup_childspec{key = {Group, Id}, @@ -427,6 +451,7 @@ start(Delegate, ChildSpec) -> apply(?SUPERVISOR, start_child, [Delegate, ChildSpec]). stop(Group, TxFun, Delegate, Id) -> + rabbit_log:debug("Mirrored supervisor: asked to stop, group: ~p, child ID: ~p", [Group, Id]), try TxFun(fun() -> check_stop(Group, Delegate, Id) end) of deleted -> apply(?SUPERVISOR, delete_child, [Delegate, Id]); running -> {error, running} From 9a0f4b17cdff7ec9efb5fc88ac92cc807ba4b2bd Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 4 Aug 2021 16:58:44 +0300 Subject: [PATCH 2/4] More mirrored supervisor logging --- .../rabbit_common/src/mirrored_supervisor.erl | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/deps/rabbit_common/src/mirrored_supervisor.erl b/deps/rabbit_common/src/mirrored_supervisor.erl index 6c691bd517b9..204a34e46386 100644 --- a/deps/rabbit_common/src/mirrored_supervisor.erl +++ b/deps/rabbit_common/src/mirrored_supervisor.erl @@ -278,12 +278,14 @@ handle_call({init, Overall}, _From, initial_childspecs = ChildSpecs}) -> process_flag(trap_exit, true), ok = pg:join(Group, Overall), - rabbit_log:debug("Mirrored supervisor: initializing, joined group ~p", [Group]), + rabbit_log:debug("Mirrored supervisor: initializing, overall supervisor ~p joined group ~p", [Overall, Group]), Rest = pg:get_members(Group) -- [Overall], Nodes = [node(M) || M <- Rest], rabbit_log:debug("Mirrored supervisor: known group ~p members: ~p on nodes ~p", [Group, Rest, Nodes]), case Rest of - [] -> TxFun(fun() -> delete_all(Group) end); + [] -> + rabbit_log:debug("Mirrored supervisor: no known peer members in group ~p, will delete all child records for it", [Group]), + TxFun(fun() -> delete_all(Group) end); _ -> ok end, [begin @@ -414,7 +416,8 @@ maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) -> end. check_start(Group, Overall, Delegate, ChildSpec) -> - rabbit_log:debug("Mirrored supervisor: check_start for group ~p, id: ~p", [Group, id(ChildSpec)]), + rabbit_log:debug("Mirrored supervisor: check_start for group ~p, id: ~p, overall: ~p", + [Group, id(ChildSpec), Overall]), ReadResult = mnesia:wread({?TABLE, {Group, id(ChildSpec)}}), rabbit_log:debug("Mirrored supervisor: check_start table ~s read for key ~p returned ~p", [?TABLE, {Group, id(ChildSpec)}, ReadResult]), @@ -424,12 +427,19 @@ check_start(Group, Overall, Delegate, ChildSpec) -> [S] -> #mirrored_sup_childspec{key = {Group, Id}, mirroring_pid = Pid} = S, case Overall of - Pid -> child(Delegate, Id); - _ -> case supervisor(Pid) of - dead -> _ = write(Group, Overall, ChildSpec), - start; - Delegate0 -> child(Delegate0, Id) - end + Pid -> + rabbit_log:debug("Mirrored supervisor: overall matched mirrored pid ~p", [Pid]), + child(Delegate, Id); + _ -> + rabbit_log:debug("Mirrored supervisor: overall ~p did not match mirrored pid ~p", [Overall, Pid]), + rabbit_log:debug("Mirrored supervisor: supervisor(~p) returned ~p", [Pid, supervisor(Pid)]), + case supervisor(Pid) of + dead -> + _ = write(Group, Overall, ChildSpec), + start; + Delegate0 -> + child(Delegate0, Id) + end end end. @@ -460,6 +470,8 @@ stop(Group, TxFun, Delegate, Id) -> end. check_stop(Group, Delegate, Id) -> + rabbit_log:debug("Mirrored supervisor: checking if child ~p in group ~p should be stopped: ~p", + [Id, Group, child(Delegate, Id)]), case child(Delegate, Id) of undefined -> delete(Group, Id), deleted; From 22add3c8caced8ba3c8871ac2779d8ec3a3f047e Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 5 Aug 2021 00:37:17 +0300 Subject: [PATCH 3/4] Acquire locks when starting mirrored supervisor children Unlike pg2, pg in Erlang 24 is eventually consistent. So this reintroduces some of the same kind of locking mirrored_supervisor used to rely on implicitly via pg2. Per discussion with @lhoguin. Closes #3260. References #3132, #3154. --- .../rabbit_common/src/mirrored_supervisor.erl | 41 ++++++++++--------- .../src/mirrored_supervisor_locks.erl | 33 +++++++++++++++ 2 files changed, 54 insertions(+), 20 deletions(-) create mode 100644 deps/rabbit_common/src/mirrored_supervisor_locks.erl diff --git a/deps/rabbit_common/src/mirrored_supervisor.erl b/deps/rabbit_common/src/mirrored_supervisor.erl index 204a34e46386..495fb4c74920 100644 --- a/deps/rabbit_common/src/mirrored_supervisor.erl +++ b/deps/rabbit_common/src/mirrored_supervisor.erl @@ -277,6 +277,7 @@ handle_call({init, Overall}, _From, tx_fun = TxFun, initial_childspecs = ChildSpecs}) -> process_flag(trap_exit, true), + LockId = mirrored_supervisor_locks:lock(Group), ok = pg:join(Group, Overall), rabbit_log:debug("Mirrored supervisor: initializing, overall supervisor ~p joined group ~p", [Overall, Group]), Rest = pg:get_members(Group) -- [Overall], @@ -295,8 +296,9 @@ handle_call({init, Overall}, _From, Delegate = delegate(Overall), erlang:monitor(process, Delegate), State1 = State#state{overall = Overall, delegate = Delegate}, - case errors([maybe_start(Group, TxFun, Overall, Delegate, S) - || S <- ChildSpecs]) of + Results = [maybe_start(Group, TxFun, Overall, Delegate, S) || S <- ChildSpecs], + mirrored_supervisor_locks:unlock(LockId), + case errors(Results) of [] -> {reply, ok, State1}; Errors -> {stop, {shutdown, Errors}, State1} end; @@ -306,21 +308,24 @@ handle_call({start_child, ChildSpec}, _From, delegate = Delegate, group = Group, tx_fun = TxFun}) -> + LockId = mirrored_supervisor_locks:lock(Group), rabbit_log:debug("Mirrored supervisor: asked to consider starting a child, group: ~p", [Group]), - {reply, case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of - already_in_mnesia -> - rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p," - " overall ~p returned 'record already present'", [Group, Overall]), - {error, already_present}; - {already_in_mnesia, Pid} -> - rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p," - " overall ~p returned 'already running: ~p'", [Group, Overall, Pid]), - {error, {already_started, Pid}}; - Else -> - rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p," - " overall ~p returned ~p", [Group, Overall, Else]), - Else - end, State}; + Result = case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of + already_in_mnesia -> + rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p," + " overall ~p returned 'record already present'", [Group, Overall]), + {error, already_present}; + {already_in_mnesia, Pid} -> + rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p," + " overall ~p returned 'already running: ~p'", [Group, Overall, Pid]), + {error, {already_started, Pid}}; + Else -> + rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p," + " overall ~p returned ~p", [Group, Overall, Else]), + Else + end, + mirrored_supervisor_locks:unlock(LockId), + {reply, Result, State}; handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, group = Group, @@ -457,11 +462,9 @@ delete(Group, Id) -> ok = mnesia:delete({?TABLE, {Group, Id}}). start(Delegate, ChildSpec) -> - rabbit_log:debug("Mirrored supervisor: asked to start with delegate: ~p, child spec: ~p", [Delegate, ChildSpec]), apply(?SUPERVISOR, start_child, [Delegate, ChildSpec]). stop(Group, TxFun, Delegate, Id) -> - rabbit_log:debug("Mirrored supervisor: asked to stop, group: ~p, child ID: ~p", [Group, Id]), try TxFun(fun() -> check_stop(Group, Delegate, Id) end) of deleted -> apply(?SUPERVISOR, delete_child, [Delegate, Id]); running -> {error, running} @@ -470,8 +473,6 @@ stop(Group, TxFun, Delegate, Id) -> end. check_stop(Group, Delegate, Id) -> - rabbit_log:debug("Mirrored supervisor: checking if child ~p in group ~p should be stopped: ~p", - [Id, Group, child(Delegate, Id)]), case child(Delegate, Id) of undefined -> delete(Group, Id), deleted; diff --git a/deps/rabbit_common/src/mirrored_supervisor_locks.erl b/deps/rabbit_common/src/mirrored_supervisor_locks.erl new file mode 100644 index 000000000000..fc66a9eef3bf --- /dev/null +++ b/deps/rabbit_common/src/mirrored_supervisor_locks.erl @@ -0,0 +1,33 @@ + %% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(mirrored_supervisor_locks). + +-export([lock/1, unlock/1]). + + -define(KEY_PREFIX, mirrored_supervisor). + +%% +%% API +%% + +lock(Group) -> + Nodes = nodes(), + %% about 300s, same as rabbit_nodes:lock_retries/0 default + LockId = case global:set_lock({?KEY_PREFIX, Group}, Nodes, 80) of + true -> Group; + false -> undefined + end, + LockId. + +unlock(LockId) -> + Nodes = nodes(), + case LockId of + undefined -> ok; + Value -> global:del_lock({?KEY_PREFIX, Value}, Nodes) + end, + ok. From c84115f10233d77bb07663f24d3c197c40d7bedf Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 5 Aug 2021 11:31:35 +0300 Subject: [PATCH 4/4] Warn when lock could not be acquired successfully --- deps/rabbit_common/src/mirrored_supervisor.erl | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/deps/rabbit_common/src/mirrored_supervisor.erl b/deps/rabbit_common/src/mirrored_supervisor.erl index 495fb4c74920..2ef2bfda6ec7 100644 --- a/deps/rabbit_common/src/mirrored_supervisor.erl +++ b/deps/rabbit_common/src/mirrored_supervisor.erl @@ -278,6 +278,7 @@ handle_call({init, Overall}, _From, initial_childspecs = ChildSpecs}) -> process_flag(trap_exit, true), LockId = mirrored_supervisor_locks:lock(Group), + maybe_log_lock_acquisition_failure(LockId, Group), ok = pg:join(Group, Overall), rabbit_log:debug("Mirrored supervisor: initializing, overall supervisor ~p joined group ~p", [Overall, Group]), Rest = pg:get_members(Group) -- [Overall], @@ -309,6 +310,7 @@ handle_call({start_child, ChildSpec}, _From, group = Group, tx_fun = TxFun}) -> LockId = mirrored_supervisor_locks:lock(Group), + maybe_log_lock_acquisition_failure(LockId, Group), rabbit_log:debug("Mirrored supervisor: asked to consider starting a child, group: ~p", [Group]), Result = case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of already_in_mnesia -> @@ -549,3 +551,8 @@ restore_child_order(ChildSpecs, ChildOrder) -> proplists:get_value(id(A), ChildOrder) < proplists:get_value(id(B), ChildOrder) end, ChildSpecs). + +maybe_log_lock_acquisition_failure(undefined = _LockId, Group) -> + rabbit_log:warning("Mirrored supervisor: could not acquire lock for group ~s", [Group]); +maybe_log_lock_acquisition_failure(_, _) -> + ok.