Skip to content

Commit

Permalink
Merge pull request #11496 from rabbitmq/speed-up-rabbit-managements-t…
Browse files Browse the repository at this point in the history
…ests

Speed up rabbit managements tests
  • Loading branch information
michaelklishin committed Jun 21, 2024
2 parents e316feb + db91415 commit 0f1804e
Show file tree
Hide file tree
Showing 8 changed files with 632 additions and 481 deletions.
9 changes: 5 additions & 4 deletions deps/rabbitmq_ct_helpers/src/rabbit_mgmt_test_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ merge_stats_app_env(Config, Interval, SampleInterval) ->
Config1 = rabbit_ct_helpers:merge_app_env(
Config, {rabbit, [{collect_statistics_interval, Interval}]}),
rabbit_ct_helpers:merge_app_env(
Config1, {rabbitmq_management_agent, [{sample_retention_policies,
[{global, [{605, SampleInterval}]},
{basic, [{605, SampleInterval}]},
{detailed, [{10, SampleInterval}]}] }]}).
Config1, {rabbitmq_management_agent,
[{sample_retention_policies,
[{global, [{605, SampleInterval}]},
{basic, [{605, SampleInterval}]},
{detailed, [{10, SampleInterval}]}] }]}).
http_get_from_node(Config, Node, Path) ->
{ok, {{_HTTP, CodeAct, _}, Headers, ResBody}} =
req(Config, Node, get, Path, [auth_header("guest", "guest")]),
Expand Down
164 changes: 86 additions & 78 deletions deps/rabbitmq_management/test/clustering_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit_common/include/rabbit_core_metrics.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-import(rabbit_ct_broker_helpers, [get_node_config/3, restart_node/2]).
-import(rabbit_mgmt_test_util, [http_get/2, http_put/4, http_post/4, http_delete/3, http_delete/4]).
Expand All @@ -20,6 +21,8 @@
-compile(nowarn_export_all).
-compile(export_all).

-define(STATS_INTERVAL, 250).

all() ->
[
{group, non_parallel_tests}
Expand Down Expand Up @@ -62,11 +65,11 @@ groups() ->
%% -------------------------------------------------------------------

merge_app_env(Config) ->
Config1 = rabbit_ct_helpers:merge_app_env(Config,
{rabbit, [
{collect_statistics, fine},
{collect_statistics_interval, 500}
]}),
Config1 = rabbit_ct_helpers:merge_app_env(
Config, {rabbit, [
{collect_statistics, fine},
{collect_statistics_interval, ?STATS_INTERVAL}
]}),
rabbit_ct_helpers:merge_app_env(Config1,
{rabbitmq_management_agent, [
{rates_mode, detailed},
Expand Down Expand Up @@ -140,15 +143,17 @@ qq_replicas_delete(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
{ok, Chan} = amqp_connection:open_channel(Conn),
_ = queue_declare_quorum(Chan, <<"qq.23">>),
_ = wait_for_queue(Config, "/queues/%2F/qq.23"),

?awaitMatch(#{members := [_, _]}, http_get(Config, "/queues/%2F/qq.23"), 30000),

Nodename1 = rabbit_data_coercion:to_binary(get_node_config(Config, 1, nodename)),
Body = [{node, Nodename1}],
http_post(Config, "/queues/quorum/%2F/qq.23/replicas/add", Body, ?NO_CONTENT),
timer:sleep(1100),

http_delete(Config, "/queues/quorum/%2F/qq.23/replicas/delete", ?ACCEPTED, Body),
timer:sleep(1100),
?awaitMatch(#{members := [_]}, http_get(Config, "/queues/%2F/qq.23"), 30000),

http_post(Config, "/queues/quorum/%2F/qq.23/replicas/add", Body, ?NO_CONTENT),
?awaitMatch(#{members := [_, _]}, http_get(Config, "/queues/%2F/qq.23"), 30000),

http_delete(Config, "/queues/%2F/qq.23", ?NO_CONTENT),

Expand All @@ -161,16 +166,19 @@ qq_replicas_grow(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
{ok, Chan} = amqp_connection:open_channel(Conn),
_ = queue_declare_quorum(Chan, <<"qq.24">>),
_ = wait_for_queue(Config, "/queues/%2F/qq.24"),

Nodename1 = rabbit_data_coercion:to_list(get_node_config(Config, 1, nodename)),
Body = [
{strategy, <<"all">>},
{queue_pattern, <<"qq.24">>},
{vhost_pattern, <<".*">>}
],
http_post(Config, "/queues/quorum/replicas/on/" ++ Nodename1 ++ "/grow", Body, ?NO_CONTENT),
timer:sleep(1100),
?awaitMatch(#{members := [_, _]}, http_get(Config, "/queues/%2F/qq.24"), 30000),
http_delete(Config, "/queues/quorum/%2F/qq.24/replicas/delete", ?ACCEPTED,
[{node, Nodename1}]),
?awaitMatch(#{members := [_]}, http_get(Config, "/queues/%2F/qq.24"), 30000),

Body = [{strategy, <<"all">>},
{queue_pattern, <<"qq.24">>},
{vhost_pattern, <<".*">>}],
http_post(Config, "/queues/quorum/replicas/on/" ++ Nodename1 ++ "/grow",
Body, ?NO_CONTENT),
?awaitMatch(#{members := [_, _]}, http_get(Config, "/queues/%2F/qq.24"), 30000),

http_delete(Config, "/queues/%2F/qq.24", ?NO_CONTENT),

Expand All @@ -182,22 +190,17 @@ qq_replicas_grow(Config) ->
qq_replicas_shrink(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
{ok, Chan} = amqp_connection:open_channel(Conn),
_ = queue_declare_quorum(Chan, <<"qq.24">>),
_ = wait_for_queue(Config, "/queues/%2F/qq.24"),
_ = queue_declare_quorum(Chan, <<"qq.25">>),
?awaitMatch(#{members := [_, _]}, http_get(Config, "/queues/%2F/qq.25"), 30000),

?awaitMatch(#{members := [_, _]}, http_get(Config, "/queues/%2F/qq.25"), 30000),
Nodename1 = rabbit_data_coercion:to_list(get_node_config(Config, 1, nodename)),
Body = [
{strategy, <<"all">>},
{queue_pattern, <<"qq.24">>},
{vhost_pattern, <<".*">>}
],
http_post(Config, "/queues/quorum/replicas/on/" ++ Nodename1 ++ "/grow", Body, ?NO_CONTENT),
timer:sleep(1100),

http_delete(Config, "/queues/quorum/replicas/on/" ++ Nodename1 ++ "/shrink", ?ACCEPTED),
timer:sleep(1100),
http_delete(Config, "/queues/quorum/replicas/on/" ++ Nodename1 ++ "/shrink",
?ACCEPTED),
?awaitMatch(#{members := [_]}, http_get(Config, "/queues/%2F/qq.25"), 30000),

http_delete(Config, "/queues/%2F/qq.24", ?NO_CONTENT),
http_delete(Config, "/queues/%2F/qq.25", ?NO_CONTENT),

amqp_channel:close(Chan),
rabbit_ct_client_helpers:close_connection(Conn),
Expand All @@ -213,8 +216,7 @@ queue_on_other_node(Config) ->
{ok, Chan2} = amqp_connection:open_channel(?config(conn, Config)),
consume(Chan2, <<"some-queue">>),

timer:sleep(5100),
force_stats(),
force_stats(Config),
Res = http_get(Config, "/queues/%2F/some-queue"),

% assert some basic data is present
Expand Down Expand Up @@ -249,8 +251,7 @@ queue_with_multiple_consumers(Config) ->
amqp_channel:cast(Chan, #'basic.ack'{delivery_tag = T})
end,

timer:sleep(5100),
force_stats(),
force_stats(Config),

Res = http_get(Config, "/queues/%2F/multi-consumer-queue1"),
http_delete(Config, "/queues/%2F/multi-consumer-queue1", ?NO_CONTENT),
Expand Down Expand Up @@ -280,7 +281,7 @@ queue_consumer_cancelled(Config) ->

#'basic.cancel_ok'{} =
amqp_channel:call(Chan, #'basic.cancel'{consumer_tag = Tag}),
force_stats(),
force_stats(Config),
Res = http_get(Config, "/queues/%2F/some-queue"),

amqp_channel:close(Chan),
Expand All @@ -297,10 +298,10 @@ queue_consumer_channel_closed(Config) ->
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),

consume(Chan, <<"some-queue">>),
force_stats(), % ensure channel stats have been written
force_stats(Config), % ensure channel stats have been written

amqp_channel:close(Chan),
force_stats(),
force_stats(Config),

Res = http_get(Config, "/queues/%2F/some-queue"),
% assert there are no consumer details
Expand All @@ -321,8 +322,8 @@ queue(Config) ->
basic_get(Chan, <<"some-queue">>),
publish(Chan2, <<"some-queue">>),
basic_get(Chan2, <<"some-queue">>),
force_stats(),
timer:sleep(5100),
force_stats(Config),

Res = http_get(Config, "/queues/%2F/some-queue"),
% assert single queue is returned
[#{} | _] = maps:get(deliveries, Res),
Expand All @@ -337,7 +338,7 @@ queues_single(Config) ->
http_put(Config, "/queues/%2F/some-queue", none, [?CREATED, ?NO_CONTENT]),
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),

force_stats(),
force_stats(Config),
Res = http_get(Config, "/queues/%2F"),
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),

Expand All @@ -353,8 +354,7 @@ queues_multiple(Config) ->
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
_ = wait_for_queue(Config, "/queues/%2F/some-other-queue"),

force_stats(),
timer:sleep(5100),
force_stats(Config),

Res = http_get(Config, "/queues/%2F"),
[Q1, Q2 | _] = Res,
Expand All @@ -370,10 +370,10 @@ queues_multiple(Config) ->

queues_removed(Config) ->
http_put(Config, "/queues/%2F/some-queue", none, [?CREATED, ?NO_CONTENT]),
force_stats(),
force_stats(Config),
N = length(http_get(Config, "/queues/%2F")),
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
force_stats(),
force_stats(Config),
?assertEqual(N - 1, length(http_get(Config, "/queues/%2F"))),
ok.

Expand All @@ -387,8 +387,7 @@ channels_multiple_on_different_nodes(Config) ->
{ok, Chan2} = amqp_connection:open_channel(Conn2),
consume(Chan, <<"some-queue">>),

timer:sleep(5100),
force_stats(),
force_stats(Config),

Res = http_get(Config, "/channels"),
% assert two channels are present
Expand All @@ -409,13 +408,12 @@ channel_closed(Config) ->
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),

{ok, Chan2} = amqp_connection:open_channel(?config(conn, Config)),
force_stats(),
force_stats(Config),

consume(Chan2, <<"some-queue">>),
amqp_channel:close(Chan),

timer:sleep(5100),
force_stats(),
force_stats(Config),

Res = http_get(Config, "/channels"),
% assert one channel is present
Expand All @@ -432,8 +430,8 @@ channel(Config) ->
[{_, ChData}] = rabbit_ct_broker_helpers:rpc(Config, 0, ets, tab2list, [channel_created]),

ChName = uri_string:recompose(#{path => binary_to_list(pget(name, ChData))}),
timer:sleep(5100),
force_stats(),

force_stats(Config),
Res = http_get(Config, "/channels/" ++ ChName ),
% assert channel is non empty
#{} = Res,
Expand All @@ -452,8 +450,8 @@ channel_other_node(Config) ->
consume(Chan, Q),
publish(Chan, Q),

timer:sleep(5100),
force_stats(),
wait_for_collect_statistics_interval(),
force_stats(Config),

Res = http_get(Config, "/channels/" ++ ChName ),
% assert channel is non empty
Expand All @@ -476,8 +474,7 @@ channel_with_consumer_on_other_node(Config) ->
consume(Chan, Q),
publish(Chan, Q),

timer:sleep(5100),
force_stats(),
force_stats(Config),

Res = http_get(Config, "/channels/" ++ ChName),
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
Expand All @@ -498,8 +495,7 @@ channel_with_consumer_on_one_node(Config) ->
ChName = get_channel_name(Config, 0),
consume(Chan, Q),

timer:sleep(5100),
force_stats(),
force_stats(Config),

Res = http_get(Config, "/channels/" ++ ChName),
amqp_channel:close(Chan),
Expand All @@ -521,8 +517,7 @@ consumers(Config) ->
consume(Chan, <<"some-queue">>),
consume(Chan2, <<"some-queue">>),

timer:sleep(5100),
force_stats(),
force_stats(Config),
Res = http_get(Config, "/consumers"),

% assert there are two non-empty consumer records
Expand All @@ -547,8 +542,8 @@ connections(Config) ->
{ok, _Chan2} = amqp_connection:open_channel(Conn2),

%% channel count needs a bit longer for 2nd chan
timer:sleep(5100),
force_stats(),
wait_for_collect_statistics_interval(),
force_stats(Config),

Res = http_get(Config, "/connections"),

Expand All @@ -575,7 +570,7 @@ exchanges(Config) ->
consume(Chan, QName),
publish_to(Chan, XName, <<"some-key">>),

force_stats(),
force_stats(Config),
Res = http_get(Config, "/exchanges"),
[X] = [X || X <- Res, maps:get(name, X) =:= XName],

Expand All @@ -599,8 +594,8 @@ exchange(Config) ->
consume(Chan, QName),
publish_to(Chan, XName, <<"some-key">>),

force_stats(),
force_stats(),
force_stats(Config),
force_stats(Config),
Res = http_get(Config, "/exchanges/%2F/some-other-exchange"),

?assertEqual(<<"direct">>, maps:get(type, Res)),
Expand All @@ -619,8 +614,9 @@ vhosts(Config) ->
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
{ok, Chan2} = amqp_connection:open_channel(Conn2),
publish(Chan2, <<"some-queue">>),
timer:sleep(5100), % TODO force stat emission
force_stats(),

wait_for_collect_statistics_interval(),
force_stats(Config),
Res = http_get(Config, "/vhosts"),

http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
Expand All @@ -643,8 +639,9 @@ nodes(Config) ->

{ok, Chan2} = amqp_connection:open_channel(Conn),
publish(Chan2, <<"some-queue">>),
timer:sleep(5100), % TODO force stat emission
force_stats(),

wait_for_collect_statistics_interval(),
force_stats(Config),
Res = http_get(Config, "/nodes"),
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),

Expand Down Expand Up @@ -672,8 +669,8 @@ overview(Config) ->
{ok, Chan2} = amqp_connection:open_channel(Conn2),
publish(Chan, <<"queue-n1">>),
publish(Chan2, <<"queue-n2">>),
timer:sleep(5100), % TODO force stat emission
force_stats(), % channel count needs a bit longer for 2nd chan
wait_for_collect_statistics_interval(),
force_stats(Config), % channel count needs a bit longer for 2nd chan
Res = http_get(Config, "/overview"),

http_delete(Config, "/queues/%2F/queue-n1", ?NO_CONTENT),
Expand Down Expand Up @@ -833,16 +830,24 @@ dump_table(Config, Table) ->
Data0 = rabbit_ct_broker_helpers:rpc(Config, 1, ets, tab2list, [Table]),
ct:pal(?LOW_IMPORTANCE, "Node 1: Dump of table ~tp:~n~tp~n", [Table, Data0]).

force_stats() ->
force_all(),
timer:sleep(2000).
force_stats(Config) ->
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
force_all(Nodes),
ok.

force_all() ->
[begin
{rabbit_mgmt_external_stats, N} ! emit_update,
timer:sleep(125)
end || N <- [node() | nodes()]],
send_to_all_collectors(collect_metrics).
force_all(Nodes) ->
lists:append(
[begin
ExtStats = {rabbit_mgmt_external_stats, N},
ExtStats ! emit_update,
[ExtStats |
[begin
Name = {rabbit_mgmt_metrics_collector:name(Table), N},
Name ! collect_metrics,
Name
end
|| {Table, _} <- ?CORE_TABLES]]
end || N <- Nodes]).

send_to_all_collectors(Msg) ->
[begin
Expand All @@ -860,3 +865,6 @@ listener_proto(Proto) when is_atom(Proto) ->
%% rabbit:status/0 used this formatting before rabbitmq/rabbitmq-cli#340
listener_proto({Proto, _Port, _Interface}) ->
Proto.

wait_for_collect_statistics_interval() ->
timer:sleep(?STATS_INTERVAL * 2).
Loading

0 comments on commit 0f1804e

Please sign in to comment.