diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/core/users.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/core/users.ex new file mode 100644 index 000000000000..7c584df0fb2c --- /dev/null +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/core/users.ex @@ -0,0 +1,20 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Core.Users do + # Defined here to not drag in rabbit.hrl and Erlang compilation in an Elixir + # sub-project + @internal_user "rmq-internal" + @cli_user "cli-user" + + def internal_user do + @internal_user + end + + def cli_user do + @cli_user + end +end \ No newline at end of file diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex index f8cdb87603a4..05807d774bd9 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/delete_queue_command.ex @@ -5,17 +5,17 @@ ## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. defmodule RabbitMQ.CLI.Ctl.Commands.DeleteQueueCommand do - alias RabbitMQ.CLI.Core.DocGuide + alias RabbitMQ.CLI.Core.{DocGuide, Users} @behaviour RabbitMQ.CLI.CommandBehaviour - def switches(), do: [if_empty: :boolean, if_unused: :boolean, timeout: :integer] + def switches(), do: [if_empty: :boolean, if_unused: :boolean, force: :boolean, timeout: :integer] def aliases(), do: [e: :if_empty, u: :if_unused, t: :timeout] def merge_defaults(args, opts) do { args, - Map.merge(%{if_empty: false, if_unused: false, vhost: "/"}, opts) + Map.merge(%{if_empty: false, if_unused: false, force: false, vhost: "/"}, opts) } end @@ -46,37 +46,49 @@ defmodule RabbitMQ.CLI.Ctl.Commands.DeleteQueueCommand do vhost: vhost, if_empty: if_empty, if_unused: if_unused, + force: force, timeout: timeout }) do ## Generate queue resource name from queue name and vhost queue_resource = :rabbit_misc.r(vhost, :queue, qname) + user = if force, do: Users.internal_user, else: Users.cli_user ## Lookup a queue on broker node using resource name case :rabbit_misc.rpc_call(node, :rabbit_amqqueue, :lookup, [queue_resource]) do {:ok, queue} -> ## Delete queue - :rabbit_misc.rpc_call( - node, - :rabbit_amqqueue, - :delete_with, - [queue, if_unused, if_empty, "cli_user"], - timeout - ) + case :rabbit_misc.rpc_call(node, + :rabbit_amqqueue, + :delete_with, + [queue, if_unused, if_empty, user], + timeout + ) do + {:ok, _} = ok -> ok + + {:badrpc, {:EXIT, {:amqp_error, :resource_locked, _, :none}}} -> + {:error, :protected} + + other_error -> other_error + end {:error, _} = error -> error end end + def output({:error, :protected}, _options) do + {:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "The queue is locked or protected from deletion"} + end + def output({:error, :not_found}, _options) do - {:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "Queue not found"} + {:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "No such queue was found"} end def output({:error, :not_empty}, _options) do - {:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "Queue is not empty"} + {:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "The queue is not empty"} end def output({:error, :in_use}, _options) do - {:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "Queue is in use"} + {:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "The queue is in use"} end def output({:ok, qlen}, _options) do @@ -103,14 +115,15 @@ defmodule RabbitMQ.CLI.Ctl.Commands.DeleteQueueCommand do Enum.join(Enum.concat([if_empty_str, if_unused_str]), "and ") <> "..." end - def usage(), do: "delete_queue [--vhost ] [--if-empty|-e] [--if-unused|-u]" + def usage(), do: "delete_queue [--vhost ] [--if-empty|-e] [--if-unused|-u] [--force]" def usage_additional() do [ ["--vhost", "Virtual host name"], ["", "name of the queue to delete"], ["--if-empty", "delete the queue if it is empty (has no messages ready for delivery)"], - ["--if-unused", "delete the queue only if it has no consumers"] + ["--if-unused", "delete the queue only if it has no consumers"], + ["--force", "delete the queue even if it is protected"] ] end diff --git a/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs b/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs index 6ff38e0d1a51..9c153e28eba1 100644 --- a/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/delete_queue_command_test.exs @@ -25,16 +25,17 @@ defmodule DeleteQueueCommandTest do vhost: @vhost, timeout: context[:test_timeout], if_empty: false, - if_unused: false + if_unused: false, + force: false }} end test "merge_defaults: defaults can be overridden" do assert @command.merge_defaults([], %{}) == - {[], %{vhost: "/", if_empty: false, if_unused: false}} + {[], %{vhost: "/", if_empty: false, if_unused: false, force: false}} assert @command.merge_defaults([], %{vhost: "non_default", if_empty: true}) == - {[], %{vhost: "non_default", if_empty: true, if_unused: false}} + {[], %{vhost: "non_default", if_empty: true, if_unused: false, force: false}} end test "validate: providing no queue name fails validation", context do @@ -76,6 +77,25 @@ defmodule DeleteQueueCommandTest do {:error, :not_found} = lookup_queue(q, @vhost) end + @tag test_timeout: 30000 + test "run: protected queue can be deleted only with --force", context do + add_vhost(@vhost) + set_permissions(@user, @vhost, [".*", ".*", ".*"]) + on_exit(context, fn -> delete_vhost(@vhost) end) + + q = "foo" + n = 20 + + declare_internal_queue(q, @vhost) + publish_messages(@vhost, q, n) + + assert @command.run([q], context[:opts]) == {:error, :protected} + {:ok, _queue} = lookup_queue(q, @vhost) + + assert @command.run([q], %{context[:opts] | force: true}) == {:ok, n} + {:error, :not_found} = lookup_queue(q, @vhost) + end + @tag test_timeout: 30000 test "run: request to an existing crashed queue on active node succeeds", context do add_vhost(@vhost) @@ -135,7 +155,7 @@ defmodule DeleteQueueCommandTest do test "defaults to vhost /" do assert @command.merge_defaults(["foo"], %{bar: "baz"}) == - {["foo"], %{bar: "baz", vhost: "/", if_unused: false, if_empty: false}} + {["foo"], %{bar: "baz", vhost: "/", if_unused: false, if_empty: false, force: false}} end test "validate: with extra arguments returns an arg count error" do @@ -152,13 +172,13 @@ defmodule DeleteQueueCommandTest do end test "banner informs that vhost's queue is deleted" do - assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: false, if_unused: false}) == + assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: false, if_unused: false, force: false}) == "Deleting queue 'my-q' on vhost '/foo' ..." - assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: true, if_unused: false}) == + assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: true, if_unused: false, force: false}) == "Deleting queue 'my-q' on vhost '/foo' if queue is empty ..." - assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: true, if_unused: true}) == + assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: true, if_unused: true, force: false}) == "Deleting queue 'my-q' on vhost '/foo' if queue is empty and if queue is unused ..." end end diff --git a/deps/rabbitmq_cli/test/test_helper.exs b/deps/rabbitmq_cli/test/test_helper.exs index d36d6746b87f..5bebf4d98e4d 100644 --- a/deps/rabbitmq_cli/test/test_helper.exs +++ b/deps/rabbitmq_cli/test/test_helper.exs @@ -302,6 +302,34 @@ defmodule TestHelper do ]) end + def declare_internal_queue( + name, + vhost, + durable \\ false, + auto_delete \\ false, + args \\ [], + owner \\ :none + ) do + queue_name = :rabbit_misc.r(vhost, :queue, name) + + amqqueue = :amqqueue.new( + queue_name, + :none, + durable, + auto_delete, + owner, + args, + vhost, + %{}) + + internal_amqqueue = :amqqueue.make_internal(amqqueue) + + :rpc.call(get_rabbit_hostname(), :rabbit_queue_type, :declare, [ + internal_amqqueue, + get_rabbit_hostname() + ]) + end + def declare_stream(name, vhost) do declare_queue(name, vhost, true, false, [{"x-queue-type", :longstr, "stream"}]) end