From 60ae6c094370999893a78ee99bda3de17effd7ef Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Wed, 8 May 2024 21:20:38 +0000 Subject: [PATCH 1/2] cluster wide queue limit --- deps/rabbit/BUILD.bazel | 8 ++ deps/rabbit/app.bzl | 9 ++ deps/rabbit/priv/schema/rabbit.schema | 15 +++ deps/rabbit/src/rabbit_queue_type.erl | 20 ++- deps/rabbit/test/cluster_limit_SUITE.erl | 151 +++++++++++++++++++++++ 5 files changed, 201 insertions(+), 2 deletions(-) create mode 100644 deps/rabbit/test/cluster_limit_SUITE.erl diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 0ac15abb9e9..2879a225b57 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -337,6 +337,14 @@ rabbitmq_integration_suite( ], ) +rabbitmq_integration_suite( + name = "cluster_limit_SUITE", + size = "medium", + additional_beam = [ + ":test_queue_utils_beam", + ], +) + rabbitmq_integration_suite( name = "clustering_management_SUITE", size = "large", diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 177b49355c2..c9adb8e3bbf 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -2091,6 +2091,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"): deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], ) + erlang_bytecode( + name = "cluster_limit_SUITE_beam_files", + testonly = True, + srcs = ["test/cluster_limit_SUITE.erl"], + outs = ["test/cluster_limit_SUITE.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], + ) erlang_bytecode( name = "metadata_store_clustering_SUITE_beam_files", testonly = True, diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 53d4556094c..e82dcd45559 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -1437,6 +1437,21 @@ fun(Conf) -> end}. +{mapping, "cluster_queue_limit", "rabbit.cluster_queue_limit", + [{datatype, [{atom, infinity}, integer]}]}. + +{translation, "rabbit.cluster_queue_limit", + fun(Conf) -> + case cuttlefish:conf_get("cluster_queue_limit", Conf, undefined) of + undefined -> cuttlefish:unset(); + infinity -> infinity; + Val when is_integer(Val) andalso Val > 0 -> Val; + _ -> cuttlefish:invalid("should be positive integer or 'infinity'") + end + end +}. + + %% Interval (in milliseconds) at which we send keepalive messages %% to other cluster members. Note that this is not the same thing %% as net_ticktime; missed keepalive messages will not cause nodes diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 286a1a1321a..e193c30d7e7 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -777,8 +777,8 @@ known_queue_type_names() -> {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. check_queue_limits(Q) -> maybe - %% Prepare for more checks - ok ?= check_vhost_queue_limit(Q) + ok ?= check_vhost_queue_limit(Q), + ok ?= check_cluster_queue_limit(Q) end. check_vhost_queue_limit(Q) -> @@ -792,4 +792,20 @@ check_vhost_queue_limit(Q) -> "cannot declare queue '~ts': " "queue limit in vhost '~ts' (~tp) is reached", [QueueName, VHost, Limit]} + +check_cluster_queue_limit(Q) -> + #resource{name = QueueName} = amqqueue:get_name(Q), + case rabbit_misc:get_env(rabbit, cluster_queue_limit, infinity) of + infinity -> + ok; + Limit -> + case rabbit_db_queue:count() >= Limit of + true -> + {protocol_error, precondition_failed, + "cannot declare queue '~ts': " + "queue limit in cluster (~tp) is reached", + [QueueName, Limit]}; + false -> + ok + end end. diff --git a/deps/rabbit/test/cluster_limit_SUITE.erl b/deps/rabbit/test/cluster_limit_SUITE.erl new file mode 100644 index 00000000000..0073281b502 --- /dev/null +++ b/deps/rabbit/test/cluster_limit_SUITE.erl @@ -0,0 +1,151 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + + +-module(cluster_limit_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + +-compile([nowarn_export_all, export_all]). + + +all() -> + [ + {group, clustered} + ]. + +groups() -> + [ + {clustered, [], + [ + {size_2, [], [queue_limit]} + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config0) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{quorum_tick_interval, 1000}, + {cluster_queue_limit, 3}]}), + rabbit_ct_helpers:run_setup_steps(Config1, []). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). +init_per_group(clustered, Config) -> + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]); +init_per_group(Group, Config) -> + ClusterSize = case Group of + size_2 -> 2 + end, + IsMixed = rabbit_ct_helpers:is_mixed_versions(), + case ClusterSize of + 2 when IsMixed -> + {skip, "cluster size 2 isn't mixed versions compatible"}; + _ -> + Config1 = rabbit_ct_helpers:set_config(Config, + [{rmq_nodes_count, ClusterSize}, + {rmq_nodename_suffix, Group}, + {tcp_ports_base}]), + Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]), + rabbit_ct_helpers:run_steps(Config1b, + [fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps()) + end. + +end_per_group(clustered, Config) -> + Config; +end_per_group(_, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), + Q = rabbit_data_coercion:to_binary(Testcase), + Config2 = rabbit_ct_helpers:set_config(Config1, + [{queue_name, Q}, + {alt_queue_name, <>}, + {alt_2_queue_name, <>}, + {over_limit_queue_name, <>} + ]), + rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). + +merge_app_env(Config) -> + rabbit_ct_helpers:merge_app_env( + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, [{core_metrics_gc_interval, 100}]}), + {ra, [{min_wal_roll_over_interval, 30000}]}). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +queue_limit(Config) -> + [Server0, Server1] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q1 = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q1, 0, 0}, + declare(Ch, Q1)), + + Q2 = ?config(alt_queue_name, Config), + ?assertEqual({'queue.declare_ok', Q2, 0, 0}, + declare(Ch, Q2)), + + Q3 = ?config(alt_2_queue_name, Config), + ?assertEqual({'queue.declare_ok', Q3, 0, 0}, + declare(Ch, Q3)), + Q4 = ?config(over_limit_queue_name, Config), + ExpectedError = list_to_binary(io_lib:format("PRECONDITION_FAILED - cannot declare queue '~s': queue limit in cluster (3) is reached", [Q4])), + ?assertExit( + {{shutdown, {server_initiated_close, 406, ExpectedError}}, _}, + declare(Ch, Q4)), + + %% Trying the second server, in the cluster, but no queues on it, + %% but should still fail as the limit is cluster wide. + ?assertExit( + {{shutdown, {server_initiated_close, 406, ExpectedError}}, _}, + declare(Ch2, Q4)), + + %Trying other types of queues + ChQQ = rabbit_ct_client_helpers:open_channel(Config, Server0), + ChStream = rabbit_ct_client_helpers:open_channel(Config, Server1), + ?assertExit( + {{shutdown, {server_initiated_close, 406, ExpectedError}}, _}, + declare(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertExit( + {{shutdown, {server_initiated_close, 406, ExpectedError}}, _}, + declare(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), + ok. + +declare(Ch, Q) -> + declare(Ch, Q, []). + +declare(Ch, Q, Args) -> + amqp_channel:call(Ch, #'queue.declare'{queue = Q, + durable = true, + auto_delete = false, + arguments = Args}). + +delete_queues() -> + [rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) + || Q <- rabbit_amqqueue:list()]. From 0d68c79e00a6d12225ec7b2433deba1c3d7635ed Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Tue, 21 May 2024 17:57:42 +0000 Subject: [PATCH 2/2] Fix broken code --- deps/rabbit/src/rabbit_queue_type.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index e193c30d7e7..6dd96f74cfd 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -792,6 +792,7 @@ check_vhost_queue_limit(Q) -> "cannot declare queue '~ts': " "queue limit in vhost '~ts' (~tp) is reached", [QueueName, VHost, Limit]} + end. check_cluster_queue_limit(Q) -> #resource{name = QueueName} = amqqueue:get_name(Q),