Skip to content

Commit

Permalink
Merge pull request #3263 from rabbitmq/rabbitmq-server-3260
Browse files Browse the repository at this point in the history
Partially reintroduce locking to mirrored_supervisor

(cherry picked from commit cf2c609)
  • Loading branch information
michaelklishin committed Aug 5, 2021
1 parent 84597a2 commit cdb0a52
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 20 deletions.
85 changes: 65 additions & 20 deletions deps/rabbit_common/src/mirrored_supervisor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,17 @@ handle_call({init, Overall}, _From,
tx_fun = TxFun,
initial_childspecs = ChildSpecs}) ->
process_flag(trap_exit, true),
LockId = mirrored_supervisor_locks:lock(Group),
maybe_log_lock_acquisition_failure(LockId, Group),
ok = pg:join(Group, Overall),
rabbit_log:debug("Mirrored supervisor: initializing, joined group ~p", [Group]),
rabbit_log:debug("Mirrored supervisor: initializing, overall supervisor ~p joined group ~p", [Overall, Group]),
Rest = pg:get_members(Group) -- [Overall],
Nodes = [node(M) || M <- Rest],
rabbit_log:debug("Mirrored supervisor: known group ~p members: ~p on nodes ~p", [Group, Rest, Nodes]),
case Rest of
[] -> TxFun(fun() -> delete_all(Group) end);
[] ->
rabbit_log:debug("Mirrored supervisor: no known peer members in group ~p, will delete all child records for it", [Group]),
TxFun(fun() -> delete_all(Group) end);
_ -> ok
end,
[begin
Expand All @@ -293,8 +297,9 @@ handle_call({init, Overall}, _From,
Delegate = delegate(Overall),
erlang:monitor(process, Delegate),
State1 = State#state{overall = Overall, delegate = Delegate},
case errors([maybe_start(Group, TxFun, Overall, Delegate, S)
|| S <- ChildSpecs]) of
Results = [maybe_start(Group, TxFun, Overall, Delegate, S) || S <- ChildSpecs],
mirrored_supervisor_locks:unlock(LockId),
case errors(Results) of
[] -> {reply, ok, State1};
Errors -> {stop, {shutdown, Errors}, State1}
end;
Expand All @@ -304,11 +309,25 @@ handle_call({start_child, ChildSpec}, _From,
delegate = Delegate,
group = Group,
tx_fun = TxFun}) ->
{reply, case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of
already_in_mnesia -> {error, already_present};
{already_in_mnesia, Pid} -> {error, {already_started, Pid}};
Else -> Else
end, State};
LockId = mirrored_supervisor_locks:lock(Group),
maybe_log_lock_acquisition_failure(LockId, Group),
rabbit_log:debug("Mirrored supervisor: asked to consider starting a child, group: ~p", [Group]),
Result = case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of
already_in_mnesia ->
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
" overall ~p returned 'record already present'", [Group, Overall]),
{error, already_present};
{already_in_mnesia, Pid} ->
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
" overall ~p returned 'already running: ~p'", [Group, Overall, Pid]),
{error, {already_started, Pid}};
Else ->
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
" overall ~p returned ~p", [Group, Overall, Else]),
Else
end,
mirrored_supervisor_locks:unlock(LockId),
{reply, Result, State};

handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
group = Group,
Expand Down Expand Up @@ -384,28 +403,50 @@ tell_all_peers_to_die(Group, Reason) ->
[cast(P, {die, Reason}) || P <- pg:get_members(Group) -- [self()]].

maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) ->
rabbit_log:debug("Mirrored supervisor: asked to consider starting, group: ~p", [Group]),
try TxFun(fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of
start -> start(Delegate, ChildSpec);
undefined -> already_in_mnesia;
Pid -> {already_in_mnesia, Pid}
start ->
rabbit_log:debug("Mirrored supervisor: check_start for group ~p,"
" overall ~p returned 'do start'", [Group, Overall]),
start(Delegate, ChildSpec);
undefined ->
rabbit_log:debug("Mirrored supervisor: check_start for group ~p,"
" overall ~p returned 'undefined'", [Group, Overall]),
already_in_mnesia;
Pid ->
rabbit_log:debug("Mirrored supervisor: check_start for group ~p,"
" overall ~p returned 'already running (~p)'", [Group, Overall, Pid]),
{already_in_mnesia, Pid}
catch
%% If we are torn down while in the transaction...
{error, E} -> {error, E}
end.

check_start(Group, Overall, Delegate, ChildSpec) ->
case mnesia:wread({?TABLE, {Group, id(ChildSpec)}}) of
rabbit_log:debug("Mirrored supervisor: check_start for group ~p, id: ~p, overall: ~p",
[Group, id(ChildSpec), Overall]),
ReadResult = mnesia:wread({?TABLE, {Group, id(ChildSpec)}}),
rabbit_log:debug("Mirrored supervisor: check_start table ~s read for key ~p returned ~p",
[?TABLE, {Group, id(ChildSpec)}, ReadResult]),
case ReadResult of
[] -> _ = write(Group, Overall, ChildSpec),
start;
[S] -> #mirrored_sup_childspec{key = {Group, Id},
mirroring_pid = Pid} = S,
case Overall of
Pid -> child(Delegate, Id);
_ -> case supervisor(Pid) of
dead -> _ = write(Group, Overall, ChildSpec),
start;
Delegate0 -> child(Delegate0, Id)
end
Pid ->
rabbit_log:debug("Mirrored supervisor: overall matched mirrored pid ~p", [Pid]),
child(Delegate, Id);
_ ->
rabbit_log:debug("Mirrored supervisor: overall ~p did not match mirrored pid ~p", [Overall, Pid]),
rabbit_log:debug("Mirrored supervisor: supervisor(~p) returned ~p", [Pid, supervisor(Pid)]),
case supervisor(Pid) of
dead ->
_ = write(Group, Overall, ChildSpec),
start;
Delegate0 ->
child(Delegate0, Id)
end
end
end.

Expand All @@ -423,7 +464,6 @@ delete(Group, Id) ->
ok = mnesia:delete({?TABLE, {Group, Id}}).

start(Delegate, ChildSpec) ->
rabbit_log:debug("Mirrored supervisor: asked to start with delegate: ~p, child spec: ~p", [Delegate, ChildSpec]),
apply(?SUPERVISOR, start_child, [Delegate, ChildSpec]).

stop(Group, TxFun, Delegate, Id) ->
Expand Down Expand Up @@ -511,3 +551,8 @@ restore_child_order(ChildSpecs, ChildOrder) ->
proplists:get_value(id(A), ChildOrder)
< proplists:get_value(id(B), ChildOrder)
end, ChildSpecs).

maybe_log_lock_acquisition_failure(undefined = _LockId, Group) ->
rabbit_log:warning("Mirrored supervisor: could not acquire lock for group ~s", [Group]);
maybe_log_lock_acquisition_failure(_, _) ->
ok.
33 changes: 33 additions & 0 deletions deps/rabbit_common/src/mirrored_supervisor_locks.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(mirrored_supervisor_locks).

-export([lock/1, unlock/1]).

-define(KEY_PREFIX, mirrored_supervisor).

%%
%% API
%%

lock(Group) ->
Nodes = nodes(),
%% about 300s, same as rabbit_nodes:lock_retries/0 default
LockId = case global:set_lock({?KEY_PREFIX, Group}, Nodes, 80) of
true -> Group;
false -> undefined
end,
LockId.

unlock(LockId) ->
Nodes = nodes(),
case LockId of
undefined -> ok;
Value -> global:del_lock({?KEY_PREFIX, Value}, Nodes)
end,
ok.

0 comments on commit cdb0a52

Please sign in to comment.