Skip to content

Commit

Permalink
Acquire locks when starting mirrored supervisor children
Browse files Browse the repository at this point in the history
Unlike pg2, pg in Erlang 24 is eventually consistent. So this
reintroduces some of the same kind of locking mirrored_supervisor
used to rely on implicitly via pg2.

Per discussion with @lhoguin.

Closes #3260.

References #3132, #3154.
  • Loading branch information
michaelklishin committed Aug 4, 2021
1 parent 9a0f4b1 commit 22add3c
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 20 deletions.
41 changes: 21 additions & 20 deletions deps/rabbit_common/src/mirrored_supervisor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ handle_call({init, Overall}, _From,
tx_fun = TxFun,
initial_childspecs = ChildSpecs}) ->
process_flag(trap_exit, true),
LockId = mirrored_supervisor_locks:lock(Group),
ok = pg:join(Group, Overall),
rabbit_log:debug("Mirrored supervisor: initializing, overall supervisor ~p joined group ~p", [Overall, Group]),
Rest = pg:get_members(Group) -- [Overall],
Expand All @@ -295,8 +296,9 @@ handle_call({init, Overall}, _From,
Delegate = delegate(Overall),
erlang:monitor(process, Delegate),
State1 = State#state{overall = Overall, delegate = Delegate},
case errors([maybe_start(Group, TxFun, Overall, Delegate, S)
|| S <- ChildSpecs]) of
Results = [maybe_start(Group, TxFun, Overall, Delegate, S) || S <- ChildSpecs],
mirrored_supervisor_locks:unlock(LockId),
case errors(Results) of
[] -> {reply, ok, State1};
Errors -> {stop, {shutdown, Errors}, State1}
end;
Expand All @@ -306,21 +308,24 @@ handle_call({start_child, ChildSpec}, _From,
delegate = Delegate,
group = Group,
tx_fun = TxFun}) ->
LockId = mirrored_supervisor_locks:lock(Group),
rabbit_log:debug("Mirrored supervisor: asked to consider starting a child, group: ~p", [Group]),
{reply, case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of
already_in_mnesia ->
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
" overall ~p returned 'record already present'", [Group, Overall]),
{error, already_present};
{already_in_mnesia, Pid} ->
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
" overall ~p returned 'already running: ~p'", [Group, Overall, Pid]),
{error, {already_started, Pid}};
Else ->
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
" overall ~p returned ~p", [Group, Overall, Else]),
Else
end, State};
Result = case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of
already_in_mnesia ->
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
" overall ~p returned 'record already present'", [Group, Overall]),
{error, already_present};
{already_in_mnesia, Pid} ->
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
" overall ~p returned 'already running: ~p'", [Group, Overall, Pid]),
{error, {already_started, Pid}};
Else ->
rabbit_log:debug("Mirrored supervisor: maybe_start for group ~p,"
" overall ~p returned ~p", [Group, Overall, Else]),
Else
end,
mirrored_supervisor_locks:unlock(LockId),
{reply, Result, State};

handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
group = Group,
Expand Down Expand Up @@ -457,11 +462,9 @@ delete(Group, Id) ->
ok = mnesia:delete({?TABLE, {Group, Id}}).

start(Delegate, ChildSpec) ->
rabbit_log:debug("Mirrored supervisor: asked to start with delegate: ~p, child spec: ~p", [Delegate, ChildSpec]),
apply(?SUPERVISOR, start_child, [Delegate, ChildSpec]).

stop(Group, TxFun, Delegate, Id) ->
rabbit_log:debug("Mirrored supervisor: asked to stop, group: ~p, child ID: ~p", [Group, Id]),
try TxFun(fun() -> check_stop(Group, Delegate, Id) end) of
deleted -> apply(?SUPERVISOR, delete_child, [Delegate, Id]);
running -> {error, running}
Expand All @@ -470,8 +473,6 @@ stop(Group, TxFun, Delegate, Id) ->
end.

check_stop(Group, Delegate, Id) ->
rabbit_log:debug("Mirrored supervisor: checking if child ~p in group ~p should be stopped: ~p",
[Id, Group, child(Delegate, Id)]),
case child(Delegate, Id) of
undefined -> delete(Group, Id),
deleted;
Expand Down
33 changes: 33 additions & 0 deletions deps/rabbit_common/src/mirrored_supervisor_locks.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

%% Thin wrapper around the `global' lock API, used by mirrored_supervisor to
%% take a lock for a supervisor group before starting children and to release
%% it afterwards.
-module(mirrored_supervisor_locks).

-export([lock/1, unlock/1]).

%% Atom that tags every lock id this module passes to `global', so that these
%% locks cannot be confused with locks taken by other modules.
-define(KEY_PREFIX, mirrored_supervisor).

%%
%% API
%%

%% Acquire the cluster-wide lock guarding child start-up for `Group'.
%%
%% `global' lock ids have the shape {ResourceId, LockRequesterId}: requests
%% that share a LockRequesterId never exclude each other. The resource is
%% therefore {?KEY_PREFIX, Group} and the requester is the calling process,
%% so that members of the same group are serialised while unrelated groups
%% do not contend with each other.
%%
%% The lock is requested on the local node as well as on all connected nodes.
%% Locking only `nodes()' would let two peers on different nodes lock disjoint
%% node sets and both succeed (and would be a no-op on a single node).
%%
%% Returns an opaque lock id to hand to unlock/1, or `undefined' if the lock
%% could not be acquired within the retry budget. unlock/1 must be called
%% from the same process that called lock/1.
-spec lock(Group) -> Group | undefined when Group :: term().
lock(Group) ->
    %% about 300s, same as rabbit_nodes:lock_retries/0 default
    case global:set_lock(lock_id(Group), lock_nodes(), 80) of
        true  -> Group;
        false -> undefined
    end.

%% Release a lock previously obtained with lock/1.
%%
%% `undefined' (the value lock/1 returns when acquisition failed) is accepted
%% and ignored so callers can pass the result of lock/1 unconditionally.
%% Releasing on a node that does not hold the lock (for example one that
%% connected after lock/1 ran) is harmless.
-spec unlock(LockId :: term()) -> ok.
unlock(undefined) ->
    ok;
unlock(Group) ->
    global:del_lock(lock_id(Group), lock_nodes()),
    ok.

%% Lock id in the {ResourceId, LockRequesterId} form expected by `global'.
lock_id(Group) ->
    {{?KEY_PREFIX, Group}, self()}.

%% Nodes the lock is set on and released from: this node plus its peers.
lock_nodes() ->
    [node() | nodes()].

0 comments on commit 22add3c

Please sign in to comment.