Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster wide queue limit - main #11212

Merged
merged 2 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,14 @@ rabbitmq_integration_suite(
],
)

rabbitmq_integration_suite(
name = "cluster_limit_SUITE",
size = "medium",
additional_beam = [
":test_queue_utils_beam",
],
)

rabbitmq_integration_suite(
name = "clustering_management_SUITE",
size = "large",
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2091,6 +2091,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)

erlang_bytecode(
name = "cluster_limit_SUITE_beam_files",
testonly = True,
srcs = ["test/cluster_limit_SUITE.erl"],
outs = ["test/cluster_limit_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "metadata_store_clustering_SUITE_beam_files",
testonly = True,
Expand Down
15 changes: 15 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,21 @@ fun(Conf) ->
end}.


{mapping, "cluster_queue_limit", "rabbit.cluster_queue_limit",
[{datatype, [{atom, infinity}, integer]}]}.

{translation, "rabbit.cluster_queue_limit",
fun(Conf) ->
case cuttlefish:conf_get("cluster_queue_limit", Conf, undefined) of
undefined -> cuttlefish:unset();
infinity -> infinity;
Val when is_integer(Val) andalso Val > 0 -> Val;
_ -> cuttlefish:invalid("should be positive integer or 'infinity'")
end
end
}.


%% Interval (in milliseconds) at which we send keepalive messages
%% to other cluster members. Note that this is not the same thing
%% as net_ticktime; missed keepalive messages will not cause nodes
Expand Down
21 changes: 19 additions & 2 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -777,8 +777,8 @@ known_queue_type_names() ->
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
check_queue_limits(Q) ->
maybe
%% Prepare for more checks
ok ?= check_vhost_queue_limit(Q)
ok ?= check_vhost_queue_limit(Q),
ok ?= check_cluster_queue_limit(Q)
end.

check_vhost_queue_limit(Q) ->
Expand All @@ -793,3 +793,20 @@ check_vhost_queue_limit(Q) ->
"queue limit in vhost '~ts' (~tp) is reached",
[QueueName, VHost, Limit]}
end.

check_cluster_queue_limit(Q) ->
#resource{name = QueueName} = amqqueue:get_name(Q),
case rabbit_misc:get_env(rabbit, cluster_queue_limit, infinity) of
infinity ->
ok;
Limit ->
case rabbit_db_queue:count() >= Limit of
true ->
{protocol_error, precondition_failed,
"cannot declare queue '~ts': "
"queue limit in cluster (~tp) is reached",
[QueueName, Limit]};
false ->
ok
end
end.
151 changes: 151 additions & 0 deletions deps/rabbit/test/cluster_limit_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.


-module(cluster_limit_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile([nowarn_export_all, export_all]).


all() ->
[
{group, clustered}
].

groups() ->
[
{clustered, [],
[
{size_2, [], [queue_limit]}
]}
].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

init_per_suite(Config0) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:merge_app_env(
Config0, {rabbit, [{quorum_tick_interval, 1000},
{cluster_queue_limit, 3}]}),
rabbit_ct_helpers:run_setup_steps(Config1, []).

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(clustered, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]);
init_per_group(Group, Config) ->
ClusterSize = case Group of
size_2 -> 2
end,
IsMixed = rabbit_ct_helpers:is_mixed_versions(),
case ClusterSize of
2 when IsMixed ->
{skip, "cluster size 2 isn't mixed versions compatible"};
_ ->
Config1 = rabbit_ct_helpers:set_config(Config,
[{rmq_nodes_count, ClusterSize},
{rmq_nodename_suffix, Group},
{tcp_ports_base}]),
Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
rabbit_ct_helpers:run_steps(Config1b,
[fun merge_app_env/1 ] ++
rabbit_ct_broker_helpers:setup_steps())
end.

end_per_group(clustered, Config) ->
Config;
end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
Q = rabbit_data_coercion:to_binary(Testcase),
Config2 = rabbit_ct_helpers:set_config(Config1,
[{queue_name, Q},
{alt_queue_name, <<Q/binary, "_alt">>},
{alt_2_queue_name, <<Q/binary, "_alt_2">>},
{over_limit_queue_name, <<Q/binary, "_over_limit">>}
]),
rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).

merge_app_env(Config) ->
rabbit_ct_helpers:merge_app_env(
rabbit_ct_helpers:merge_app_env(Config,
{rabbit, [{core_metrics_gc_interval, 100}]}),
{ra, [{min_wal_roll_over_interval, 30000}]}).

end_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:run_steps(
Config,
rabbit_ct_client_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, Testcase).


%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------

queue_limit(Config) ->
[Server0, Server1] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q1 = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q1, 0, 0},
declare(Ch, Q1)),

Q2 = ?config(alt_queue_name, Config),
?assertEqual({'queue.declare_ok', Q2, 0, 0},
declare(Ch, Q2)),

Q3 = ?config(alt_2_queue_name, Config),
?assertEqual({'queue.declare_ok', Q3, 0, 0},
declare(Ch, Q3)),
Q4 = ?config(over_limit_queue_name, Config),
ExpectedError = list_to_binary(io_lib:format("PRECONDITION_FAILED - cannot declare queue '~s': queue limit in cluster (3) is reached", [Q4])),
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(Ch, Q4)),

%% Trying the second server, in the cluster, but no queues on it,
%% but should still fail as the limit is cluster wide.
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(Ch2, Q4)),

%Trying other types of queues
ChQQ = rabbit_ct_client_helpers:open_channel(Config, Server0),
ChStream = rabbit_ct_client_helpers:open_channel(Config, Server1),
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
ok.

declare(Ch, Q) ->
declare(Ch, Q, []).

declare(Ch, Q, Args) ->
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
durable = true,
auto_delete = false,
arguments = Args}).

delete_queues() ->
[rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
|| Q <- rabbit_amqqueue:list()].