Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions deps/rabbitmq_cli/lib/rabbitmq/cli/core/users.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Core.Users do
# Defined here to not drag in rabbit.hrl and Erlang compilation in an Elixir
# sub-project
@internal_user "rmq-internal"
@cli_user "cli-user"

def internal_user do
@internal_user
end

def cli_user do
@cli_user
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Ctl.Commands.DeleteQueueCommand do
alias RabbitMQ.CLI.Core.DocGuide
alias RabbitMQ.CLI.Core.{DocGuide, Users}

@behaviour RabbitMQ.CLI.CommandBehaviour

def switches(), do: [if_empty: :boolean, if_unused: :boolean, timeout: :integer]
def switches(), do: [if_empty: :boolean, if_unused: :boolean, force: :boolean, timeout: :integer]
def aliases(), do: [e: :if_empty, u: :if_unused, t: :timeout]

def merge_defaults(args, opts) do
{
args,
Map.merge(%{if_empty: false, if_unused: false, vhost: "/"}, opts)
Map.merge(%{if_empty: false, if_unused: false, force: false, vhost: "/"}, opts)
}
end

Expand Down Expand Up @@ -46,37 +46,49 @@ defmodule RabbitMQ.CLI.Ctl.Commands.DeleteQueueCommand do
vhost: vhost,
if_empty: if_empty,
if_unused: if_unused,
force: force,
timeout: timeout
}) do
## Generate queue resource name from queue name and vhost
queue_resource = :rabbit_misc.r(vhost, :queue, qname)
user = if force, do: Users.internal_user, else: Users.cli_user
## Lookup a queue on broker node using resource name
case :rabbit_misc.rpc_call(node, :rabbit_amqqueue, :lookup, [queue_resource]) do
{:ok, queue} ->
## Delete queue
:rabbit_misc.rpc_call(
node,
:rabbit_amqqueue,
:delete_with,
[queue, if_unused, if_empty, "cli_user"],
timeout
)
case :rabbit_misc.rpc_call(node,
:rabbit_amqqueue,
:delete_with,
[queue, if_unused, if_empty, user],
timeout
) do
{:ok, _} = ok -> ok

{:badrpc, {:EXIT, {:amqp_error, :resource_locked, _, :none}}} ->
{:error, :protected}

other_error -> other_error
end

{:error, _} = error ->
error
end
end

def output({:error, :protected}, _options) do
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "The queue is locked or protected from deletion"}
end

def output({:error, :not_found}, _options) do
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "Queue not found"}
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "No such queue was found"}
end

def output({:error, :not_empty}, _options) do
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "Queue is not empty"}
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "The queue is not empty"}
end

def output({:error, :in_use}, _options) do
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "Queue is in use"}
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "The queue is in use"}
end

def output({:ok, qlen}, _options) do
Expand All @@ -103,14 +115,15 @@ defmodule RabbitMQ.CLI.Ctl.Commands.DeleteQueueCommand do
Enum.join(Enum.concat([if_empty_str, if_unused_str]), "and ") <> "..."
end

def usage(), do: "delete_queue [--vhost <vhost>] <queue_name> [--if-empty|-e] [--if-unused|-u]"
def usage(), do: "delete_queue [--vhost <vhost>] <queue_name> [--if-empty|-e] [--if-unused|-u] [--force]"

def usage_additional() do
[
["--vhost", "Virtual host name"],
["<queue_name>", "name of the queue to delete"],
["--if-empty", "delete the queue if it is empty (has no messages ready for delivery)"],
["--if-unused", "delete the queue only if it has no consumers"]
["--if-unused", "delete the queue only if it has no consumers"],
["--force", "delete the queue even if it is protected"]
]
end

Expand Down
34 changes: 27 additions & 7 deletions deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ defmodule DeleteQueueCommandTest do
vhost: @vhost,
timeout: context[:test_timeout],
if_empty: false,
if_unused: false
if_unused: false,
force: false
}}
end

test "merge_defaults: defaults can be overridden" do
assert @command.merge_defaults([], %{}) ==
{[], %{vhost: "/", if_empty: false, if_unused: false}}
{[], %{vhost: "/", if_empty: false, if_unused: false, force: false}}

assert @command.merge_defaults([], %{vhost: "non_default", if_empty: true}) ==
{[], %{vhost: "non_default", if_empty: true, if_unused: false}}
{[], %{vhost: "non_default", if_empty: true, if_unused: false, force: false}}
end

test "validate: providing no queue name fails validation", context do
Expand Down Expand Up @@ -76,6 +77,25 @@ defmodule DeleteQueueCommandTest do
{:error, :not_found} = lookup_queue(q, @vhost)
end

@tag test_timeout: 30000
test "run: protected queue can be deleted only with --force", context do
add_vhost(@vhost)
set_permissions(@user, @vhost, [".*", ".*", ".*"])
on_exit(context, fn -> delete_vhost(@vhost) end)

q = "foo"
n = 20

declare_internal_queue(q, @vhost)
publish_messages(@vhost, q, n)

assert @command.run([q], context[:opts]) == {:error, :protected}
{:ok, _queue} = lookup_queue(q, @vhost)

assert @command.run([q], %{context[:opts] | force: true}) == {:ok, n}
{:error, :not_found} = lookup_queue(q, @vhost)
end

@tag test_timeout: 30000
test "run: request to an existing crashed queue on active node succeeds", context do
add_vhost(@vhost)
Expand Down Expand Up @@ -135,7 +155,7 @@ defmodule DeleteQueueCommandTest do

test "defaults to vhost /" do
assert @command.merge_defaults(["foo"], %{bar: "baz"}) ==
{["foo"], %{bar: "baz", vhost: "/", if_unused: false, if_empty: false}}
{["foo"], %{bar: "baz", vhost: "/", if_unused: false, if_empty: false, force: false}}
end

test "validate: with extra arguments returns an arg count error" do
Expand All @@ -152,13 +172,13 @@ defmodule DeleteQueueCommandTest do
end

test "banner informs that vhost's queue is deleted" do
assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: false, if_unused: false}) ==
assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: false, if_unused: false, force: false}) ==
"Deleting queue 'my-q' on vhost '/foo' ..."

assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: true, if_unused: false}) ==
assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: true, if_unused: false, force: false}) ==
"Deleting queue 'my-q' on vhost '/foo' if queue is empty ..."

assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: true, if_unused: true}) ==
assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: true, if_unused: true, force: false}) ==
"Deleting queue 'my-q' on vhost '/foo' if queue is empty and if queue is unused ..."
end
end
28 changes: 28 additions & 0 deletions deps/rabbitmq_cli/test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,34 @@ defmodule TestHelper do
])
end

def declare_internal_queue(
name,
vhost,
durable \\ false,
auto_delete \\ false,
args \\ [],
owner \\ :none
) do
queue_name = :rabbit_misc.r(vhost, :queue, name)

amqqueue = :amqqueue.new(
queue_name,
:none,
durable,
auto_delete,
owner,
args,
vhost,
%{})

internal_amqqueue = :amqqueue.make_internal(amqqueue)

:rpc.call(get_rabbit_hostname(), :rabbit_queue_type, :declare, [
internal_amqqueue,
get_rabbit_hostname()
])
end

def declare_stream(name, vhost) do
declare_queue(name, vhost, true, false, [{"x-queue-type", :longstr, "stream"}])
end
Expand Down