Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
0c77f12
implement rabbit_amqqueue:delete_with/{4,6} api
Ayanda-D Sep 6, 2023
36a0e40
update channel to use delete_with on queue.delete handling
Ayanda-D Sep 6, 2023
e85cd94
update ctl delete_queue command to use delete_with api
Ayanda-D Sep 6, 2023
80c4a06
make kill_queue/{2,3} and kill_queue_hard/{2,3} from crashing_queues_…
Ayanda-D Sep 6, 2023
d624371
fix unrelated typo
Ayanda-D Sep 6, 2023
c706f90
ctl delete_queue command test case for a crashed queue
Ayanda-D Sep 6, 2023
7b650b3
refactor and move remote_sup_child/2 to rabbit_misc
Ayanda-D Sep 6, 2023
6d5c922
use queue resource names for new queue operations (changes the api sp…
Ayanda-D Sep 7, 2023
36031c7
formatting
Ayanda-D Sep 7, 2023
251153a
be extra defensive on the kill_queue/2 api
Ayanda-D Sep 7, 2023
2bb48bb
make rabbit_control_misc:await_state/4 api more consistent on the que…
Ayanda-D Sep 7, 2023
07e41e8
use descriptive macros instead of magic numbers in await_new_pid/3 an…
Ayanda-D Sep 7, 2023
55bdfb2
improve error handling for non-existent queues in pid_or_crashed/2 ap…
Ayanda-D Sep 7, 2023
664b8f8
add test for ctl delete_queue for a stopped queue
Ayanda-D Sep 7, 2023
64c3cb4
new rabbit_amqqueue_control for queue related control operations and …
Ayanda-D Sep 8, 2023
64164cf
try revert extra_apps generated and added on bazel run gazelle to fix…
Ayanda-D Sep 8, 2023
ba2c3b4
formatting and ensure use sup specs in rabbit_types
Ayanda-D Sep 11, 2023
0178380
Resolve a conflict (#9365)
michaelklishin Sep 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqqueue.erl",
"src/rabbit_amqqueue_control.erl",
"src/rabbit_amqqueue_process.erl",
"src/rabbit_amqqueue_sup.erl",
"src/rabbit_amqqueue_sup_sup.erl",
Expand Down Expand Up @@ -284,6 +285,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqqueue.erl",
"src/rabbit_amqqueue_control.erl",
"src/rabbit_amqqueue_process.erl",
"src/rabbit_amqqueue_sup.erl",
"src/rabbit_amqqueue_sup_sup.erl",
Expand Down Expand Up @@ -536,6 +538,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqqueue.erl",
"src/rabbit_amqqueue_control.erl",
"src/rabbit_amqqueue_process.erl",
"src/rabbit_amqqueue_sup.erl",
"src/rabbit_amqqueue_sup_sup.erl",
Expand Down
109 changes: 108 additions & 1 deletion deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@
-export([delete_crashed/1,
delete_crashed/2,
delete_crashed_internal/2]).

-export([delete_with/4, delete_with/6]).
-export([pid_of/1, pid_of/2]).
-export([pid_or_crashed/2]).
-export([mark_local_durable_queues_stopped/1]).

-export([rebalance/3]).
Expand All @@ -70,6 +71,8 @@

-export([prepend_extra_bcc/1]).

-export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]).

%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2,
Expand Down Expand Up @@ -108,6 +111,7 @@
-define(CONSUMER_INFO_KEYS,
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
active, activity_status, arguments]).
-define(KILL_QUEUE_DELAY_INTERVAL, 100).

warn_file_limit() ->
DurableQueues = find_recoverable_queues(),
Expand Down Expand Up @@ -1581,6 +1585,51 @@ delete_immediately_by_resource(Resources) ->
delete(Q, IfUnused, IfEmpty, ActingUser) ->
rabbit_queue_type:delete(Q, IfUnused, IfEmpty, ActingUser).

-spec delete_with(amqqueue:amqqueue() | name(), boolean(), boolean(), rabbit_types:username()) ->
rabbit_types:ok(integer()) | rabbit_misc:channel_or_connection_exit().
delete_with(QueueName, IfUnused, IfEmpty, ActingUser) ->
delete_with(QueueName, undefined, IfUnused, IfEmpty, ActingUser, false).

-spec delete_with(amqqueue:amqqueue() | name(), pid() | undefined, boolean(), boolean(), rabbit_types:username(), boolean()) ->
rabbit_types:ok(integer()) | rabbit_misc:channel_or_connection_exit().
delete_with(AMQQueue, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) when ?is_amqqueue(AMQQueue) ->
QueueName = amqqueue:get_name(AMQQueue),
delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive);
delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) when is_record(QueueName, resource) ->
case with(
QueueName,
fun (Q) ->
if CheckExclusive ->
check_exclusive_access(Q, ConnPid);
true ->
ok
end,
rabbit_queue_type:delete(Q, IfUnused, IfEmpty, Username)
end,
fun (not_found) ->
{ok, 0};
({absent, Q, crashed}) ->
_ = delete_crashed(Q, Username),
{ok, 0};
({absent, Q, stopped}) ->
_ = delete_crashed(Q, Username),
{ok, 0};
({absent, Q, Reason}) ->
absent(Q, Reason)
end) of
{error, in_use} ->
rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(QueueName)]);
{error, not_empty} ->
rabbit_misc:precondition_failed("~ts not empty", [rabbit_misc:rs(QueueName)]);
{error, {exit, _, _}} ->
%% rabbit_amqqueue:delete()/delegate:invoke might return {error, {exit, _, _}}
{ok, 0};
{ok, Count} ->
{ok, Count};
{protocol_error, Type, Reason, ReasonArgs} ->
rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
end.

%% delete_crashed* INCLUDED FOR BACKWARDS COMPATBILITY REASONS
delete_crashed(Q) when ?amqqueue_is_classic(Q) ->
rabbit_classic_queue:delete_crashed(Q).
Expand Down Expand Up @@ -2039,3 +2088,61 @@ get_bcc_queue(Q, BCCName) ->
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
BCCQueueName = rabbit_misc:r(VHost, queue, BCCName),
rabbit_amqqueue:lookup(BCCQueueName).

-spec kill_queue_hard(node(), name()) -> ok.
kill_queue_hard(Node, QRes = #resource{kind = queue}) ->
kill_queue_hard(Node, QRes, boom).

-spec kill_queue_hard(node(), name(), atom()) -> ok.
kill_queue_hard(Node, QRes = #resource{kind = queue}, Reason) ->
case kill_queue(Node, QRes, Reason) of
crashed -> ok;
stopped -> ok;
NewPid when is_pid(NewPid) ->
timer:sleep(?KILL_QUEUE_DELAY_INTERVAL),
kill_queue_hard(Node, QRes, Reason);
Error -> Error
end.

-spec kill_queue(node(), name()) -> pid() | crashed | stopped | rabbit_types:error(term()).
kill_queue(Node, QRes = #resource{kind = queue}) ->
kill_queue(Node, QRes, boom).

-spec kill_queue(node(), name(), atom()) -> pid() | crashed | stopped | rabbit_types:error(term()).
kill_queue(Node, QRes = #resource{kind = queue}, Reason = shutdown) ->
Pid1 = pid_or_crashed(Node, QRes),
exit(Pid1, Reason),
rabbit_amqqueue_control:await_state(Node, QRes, stopped),
stopped;
kill_queue(Node, QRes = #resource{kind = queue}, Reason) ->
case pid_or_crashed(Node, QRes) of
Pid1 when is_pid(Pid1) ->
exit(Pid1, Reason),
rabbit_amqqueue_control:await_new_pid(Node, QRes, Pid1);
crashed ->
crashed;
Error ->
Error
end.

-spec pid_or_crashed(node(), name()) -> pid() | crashed | rabbit_types:error(term()).
pid_or_crashed(Node, QRes = #resource{virtual_host = VHost, kind = queue}) ->
case rpc:call(Node, rabbit_amqqueue, lookup, [QRes]) of
{ok, Q} ->
QPid = amqqueue:get_pid(Q),
State = amqqueue:get_state(Q),
case State of
crashed ->
case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of
{error, {queue_supervisor_not_found, _}} -> {error, no_sup};
{ok, SPid} ->
case rabbit_misc:remote_sup_child(Node, SPid) of
{ok, _} -> QPid; %% restarting
{error, no_child} -> crashed %% given up
end
end;
_ -> QPid
end;
Error = {error, _} -> Error;
Reason -> {error, Reason}
end.
57 changes: 57 additions & 0 deletions deps/rabbit/src/rabbit_amqqueue_control.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_amqqueue_control).

-export([await_new_pid/3, await_state/3, await_state/4]).

-define(DEFAULT_AWAIT_STATE_TIMEOUT, 30000).
-define(AWAIT_NEW_PID_DELAY_INTERVAL, 10).
-define(AWAIT_STATE_DELAY_INTERVAL, 100).
-define(AWAIT_STATE_DELAY_TIME_DELTA, 100).

-include_lib("rabbit_common/include/resource.hrl").

-spec await_new_pid(node(), rabbit_amqqueue:name(), pid()) -> pid().
await_new_pid(Node, QRes = #resource{kind = queue}, OldPid) ->
case rabbit_amqqueue:pid_or_crashed(Node, QRes) of
OldPid -> timer:sleep(?AWAIT_NEW_PID_DELAY_INTERVAL),
await_new_pid(Node, QRes, OldPid);
New -> New
end.

-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom()) -> 'ok'.
await_state(Node, QName, State) when is_binary(QName) ->
QRes = rabbit_misc:r(<<"/">>, queue, QName),
await_state(Node, QRes, State);
await_state(Node, QRes = #resource{kind = queue}, State) ->
await_state(Node, QRes, State, ?DEFAULT_AWAIT_STATE_TIMEOUT).

-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom(), integer()) -> 'ok'.
await_state(Node, QName, State, Time) when is_binary(QName) ->
QRes = rabbit_misc:r(<<"/">>, queue, QName),
await_state(Node, QRes, State, Time);
await_state(Node, QRes = #resource{kind = queue}, State, Time) ->
case state(Node, QRes) of
State ->
ok;
Other ->
case Time of
0 -> exit({timeout_awaiting_state, State, Other});
_ -> timer:sleep(?AWAIT_STATE_DELAY_INTERVAL),
await_state(Node, QRes, State, Time - ?AWAIT_STATE_DELAY_TIME_DELTA)
end
end.

state(Node, QRes = #resource{virtual_host = VHost, kind = queue}) ->
Infos = rpc:call(Node, rabbit_amqqueue, info_all, [VHost, [name, state]]),
fetch_state(QRes, Infos).

fetch_state(_QRes, []) -> undefined;
fetch_state(QRes, [[{name, QRes}, {state, State}] | _]) -> State;
fetch_state(QRes, [[{name, _}, {state, _State}] | Rem]) ->
fetch_state(QRes, Rem).
Loading