Skip to content

Commit

Permalink
cluster wide queue limit
Browse files Browse the repository at this point in the history
  • Loading branch information
SimonUnge authored and michaelklishin committed May 22, 2024
1 parent 9c30562 commit 909b964
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 2 deletions.
8 changes: 8 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,14 @@ rabbitmq_integration_suite(
],
)

rabbitmq_integration_suite(
name = "cluster_limit_SUITE",
size = "medium",
additional_beam = [
":test_queue_utils_beam",
],
)

rabbitmq_integration_suite(
name = "clustering_management_SUITE",
size = "large",
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2079,6 +2079,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "cluster_limit_SUITE_beam_files",
testonly = True,
srcs = ["test/cluster_limit_SUITE.erl"],
outs = ["test/cluster_limit_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "message_containers_SUITE_beam_files",
testonly = True,
Expand Down
15 changes: 15 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,21 @@ fun(Conf) ->
end}.


{mapping, "cluster_queue_limit", "rabbit.cluster_queue_limit",
[{datatype, [{atom, infinity}, integer]}]}.

{translation, "rabbit.cluster_queue_limit",
fun(Conf) ->
case cuttlefish:conf_get("cluster_queue_limit", Conf, undefined) of
undefined -> cuttlefish:unset();
infinity -> infinity;
Val when is_integer(Val) andalso Val > 0 -> Val;
_ -> cuttlefish:invalid("should be positive integer or 'infinity'")
end
end
}.


%% Interval (in milliseconds) at which we send keepalive messages
%% to other cluster members. Note that this is not the same thing
%% as net_ticktime; missed keepalive messages will not cause nodes
Expand Down
21 changes: 19 additions & 2 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -742,8 +742,8 @@ known_queue_type_names() ->
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
check_queue_limits(Q) ->
maybe
%% Prepare for more checks
ok ?= check_vhost_queue_limit(Q)
ok ?= check_vhost_queue_limit(Q),
ok ?= check_cluster_queue_limit(Q)
end.

check_vhost_queue_limit(Q) ->
Expand All @@ -758,3 +758,20 @@ check_vhost_queue_limit(Q) ->
"queue limit in vhost '~ts' (~tp) is reached",
[QueueName, VHost, Limit]}
end.

check_cluster_queue_limit(Q) ->
#resource{name = QueueName} = amqqueue:get_name(Q),
case rabbit_misc:get_env(rabbit, cluster_queue_limit, infinity) of
infinity ->
ok;
Limit ->
case rabbit_db_queue:count() >= Limit of
true ->
{protocol_error, precondition_failed,
"cannot declare queue '~ts': "
"queue limit in cluster (~tp) is reached",
[QueueName, Limit]};
false ->
ok
end
end.
151 changes: 151 additions & 0 deletions deps/rabbit/test/cluster_limit_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.


-module(cluster_limit_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile([nowarn_export_all, export_all]).


all() ->
[
{group, clustered}
].

groups() ->
[
{clustered, [],
[
{size_2, [], [queue_limit]}
]}
].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

init_per_suite(Config0) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:merge_app_env(
Config0, {rabbit, [{quorum_tick_interval, 1000},
{cluster_queue_limit, 3}]}),
rabbit_ct_helpers:run_setup_steps(Config1, []).

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(clustered, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]);
init_per_group(Group, Config) ->
ClusterSize = case Group of
size_2 -> 2
end,
IsMixed = rabbit_ct_helpers:is_mixed_versions(),
case ClusterSize of
2 when IsMixed ->
{skip, "cluster size 2 isn't mixed versions compatible"};
_ ->
Config1 = rabbit_ct_helpers:set_config(Config,
[{rmq_nodes_count, ClusterSize},
{rmq_nodename_suffix, Group},
{tcp_ports_base}]),
Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
rabbit_ct_helpers:run_steps(Config1b,
[fun merge_app_env/1 ] ++
rabbit_ct_broker_helpers:setup_steps())
end.

end_per_group(clustered, Config) ->
Config;
end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
Q = rabbit_data_coercion:to_binary(Testcase),
Config2 = rabbit_ct_helpers:set_config(Config1,
[{queue_name, Q},
{alt_queue_name, <<Q/binary, "_alt">>},
{alt_2_queue_name, <<Q/binary, "_alt_2">>},
{over_limit_queue_name, <<Q/binary, "_over_limit">>}
]),
rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).

merge_app_env(Config) ->
rabbit_ct_helpers:merge_app_env(
rabbit_ct_helpers:merge_app_env(Config,
{rabbit, [{core_metrics_gc_interval, 100}]}),
{ra, [{min_wal_roll_over_interval, 30000}]}).

end_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:run_steps(
Config,
rabbit_ct_client_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, Testcase).


%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------

queue_limit(Config) ->
[Server0, Server1] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q1 = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q1, 0, 0},
declare(Ch, Q1)),

Q2 = ?config(alt_queue_name, Config),
?assertEqual({'queue.declare_ok', Q2, 0, 0},
declare(Ch, Q2)),

Q3 = ?config(alt_2_queue_name, Config),
?assertEqual({'queue.declare_ok', Q3, 0, 0},
declare(Ch, Q3)),
Q4 = ?config(over_limit_queue_name, Config),
ExpectedError = list_to_binary(io_lib:format("PRECONDITION_FAILED - cannot declare queue '~s': queue limit in cluster (3) is reached", [Q4])),
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(Ch, Q4)),

%% Trying the second server, in the cluster, but no queues on it,
%% but should still fail as the limit is cluster wide.
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(Ch2, Q4)),

%Trying other types of queues
ChQQ = rabbit_ct_client_helpers:open_channel(Config, Server0),
ChStream = rabbit_ct_client_helpers:open_channel(Config, Server1),
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
ok.

declare(Ch, Q) ->
declare(Ch, Q, []).

declare(Ch, Q, Args) ->
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
durable = true,
auto_delete = false,
arguments = Args}).

delete_queues() ->
[rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
|| Q <- rabbit_amqqueue:list()].

0 comments on commit 909b964

Please sign in to comment.