From 1eb57ab8583f40ca8b3770866ab37dea68220972 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Tue, 25 Jul 2023 18:17:16 +0200 Subject: [PATCH] Use rpcs instead of ctl commands to avoid CI failures (cherry picked from commit 48c1a8245b7c765bd1d25d6e6ae92574669d6538) # Conflicts: # deps/rabbit/test/queue_parallel_SUITE.erl --- deps/rabbit/test/queue_parallel_SUITE.erl | 185 +++++++++++++--------- 1 file changed, 110 insertions(+), 75 deletions(-) diff --git a/deps/rabbit/test/queue_parallel_SUITE.erl b/deps/rabbit/test/queue_parallel_SUITE.erl index 3c63a13d16c..234bda416c5 100644 --- a/deps/rabbit/test/queue_parallel_SUITE.erl +++ b/deps/rabbit/test/queue_parallel_SUITE.erl @@ -16,8 +16,6 @@ -define(TIMEOUT, 30_000). --import(quorum_queue_utils, [wait_for_messages/2]). - all() -> [ {group, parallel_tests} @@ -177,18 +175,18 @@ publish(Config) -> QName = ?config(queue_name, Config), declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + wait_for_messages(Config, QName, 1, 1, 0). consume(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), QName = ?config(queue_name, Config), declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + wait_for_messages(Config, QName, 1, 1, 0). consume_first_empty(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -196,7 +194,7 @@ consume_first_empty(Config) -> declare_queue(Ch, Config, QName), consume_empty(Ch, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), consume(Ch, QName, true, [<<"msg1">>]), rabbit_ct_client_helpers:close_channel(Ch). @@ -211,11 +209,11 @@ consume_and_autoack(Config) -> QName = ?config(queue_name, Config), declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), consume(Ch, QName, true, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, QName, 0, 0, 0). subscribe(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -227,14 +225,14 @@ subscribe(Config) -> amqp_channel:call(Ch, #'basic.qos'{global = false, prefetch_count = 10})), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), CArgs = ?config(consumer_args, Config), subscribe(Ch, QName, false, CArgs), receive_basic_deliver(false), rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + wait_for_messages(Config, QName, 1, 1, 0). subscribe_consumers(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -269,13 +267,13 @@ subscribe_with_autoack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), subscribe(Ch, QName, true, CArgs), receive_basic_deliver(false), receive_basic_deliver(false), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, QName, 0, 0, 0). consume_and_ack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -283,11 +281,11 @@ consume_and_ack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -297,12 +295,12 @@ consume_and_multiple_ack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = true}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -313,13 +311,13 @@ subscribe_and_ack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), subscribe(Ch, QName, false, CArgs), receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end, rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -331,16 +329,16 @@ subscribe_and_multiple_ack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), subscribe(Ch, QName, false, CArgs), receive_basic_deliver(false), receive_basic_deliver(false), receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = true}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end, rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -352,7 +350,7 @@ trigger_message_store_compaction(Config) -> N = 12000, [publish(Ch, QName, [binary:copy(<<"a">>, 5000)]) || _ <- lists:seq(1, N)], - wait_for_messages(Config, [[QName, <<"12000">>, <<"12000">>, <<"0">>]]), + wait_for_messages(Config, QName, 12000, 12000, 0), AllDTags = rabbit_ct_client_helpers:consume_without_acknowledging(Ch, QName, N), ToAck = lists:filter(fun (I) -> I > 500 andalso I < 11200 end, AllDTags), @@ -373,14 +371,14 @@ subscribe_and_requeue_multiple_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), subscribe(Ch, QName, false, CArgs), receive_basic_deliver(false), receive_basic_deliver(false), receive {#'basic.deliver'{delivery_tag = DeliveryTag, redelivered = false}, _} -> - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = true}), @@ -389,10 +387,10 @@ subscribe_and_requeue_multiple_nack(Config) -> receive {#'basic.deliver'{delivery_tag = DeliveryTag1, redelivered = true}, _} -> - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, multiple = true}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end end, rabbit_ct_client_helpers:close_channel(Ch), @@ -404,13 +402,13 @@ consume_and_requeue_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), + wait_for_messages(Config, QName, 2, 1, 1), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -420,13 +418,13 @@ consume_and_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -436,13 +434,13 @@ consume_and_requeue_multiple_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = true}), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -452,13 +450,13 @@ consume_and_multiple_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -469,21 +467,21 @@ subscribe_and_requeue_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), subscribe(Ch, QName, false, CArgs), receive {#'basic.deliver'{delivery_tag = DeliveryTag, redelivered = false}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}), receive {#'basic.deliver'{delivery_tag = DeliveryTag1, redelivered = true}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end end, rabbit_ct_client_helpers:close_channel(Ch), @@ -496,16 +494,16 @@ subscribe_and_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), subscribe(Ch, QName, false, CArgs), receive {#'basic.deliver'{delivery_tag = DeliveryTag, redelivered = false}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end, rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -517,18 +515,18 @@ subscribe_and_multiple_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), subscribe(Ch, QName, false, CArgs), receive_basic_deliver(false), receive_basic_deliver(false), receive {#'basic.deliver'{delivery_tag = DeliveryTag, redelivered = false}, _} -> - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end, rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -542,24 +540,24 @@ basic_cancel(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), CTag = atom_to_binary(?FUNCTION_NAME, utf8), subscribe(Ch, QName, false, CTag, CArgs), receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), ?assertEqual([], lists:filter(fun(Props) -> Resource = proplists:get_value(queue_name, Props), QName == Resource#resource.name end, Consumers)), publish(Ch, QName, [<<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"2">>, <<"1">>]]), + wait_for_messages(Config, QName, 3, 2, 1), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]) + wait_for_messages(Config, QName, 2, 2, 0) after 5000 -> exit(basic_deliver_timeout) end, @@ -572,11 +570,11 @@ purge(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), [_] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), + wait_for_messages(Config, QName, 2, 1, 1), {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -586,9 +584,9 @@ purge_no_consumer(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), {'queue.purge_ok', 2} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -598,11 +596,11 @@ basic_recover(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), [_] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -683,8 +681,8 @@ extra_bcc_option(Config) -> }), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), - wait_for_messages(Config, [[ExtraBCC, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), + wait_for_messages(Config, ExtraBCC, 3, 3, 0), delete_queue(Ch, QName), delete_queue(Ch, ExtraBCC). @@ -713,9 +711,9 @@ extra_bcc_option_multiple_1(Config) -> exchange = Exchange}), publish(Ch, <<"ignore">>, [<<"msg">>], Exchange), - wait_for_messages(Config, [[QName1, <<"1">>, <<"1">>, <<"0">>]]), - wait_for_messages(Config, [[QName2, <<"1">>, <<"1">>, <<"0">>]]), - wait_for_messages(Config, [[ExtraBCC, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName1, 1, 1, 0), + wait_for_messages(Config, QName2, 1, 1, 0), + wait_for_messages(Config, ExtraBCC, 1, 1, 0), delete_exchange(Ch, Exchange), delete_queue(Ch, QName1), @@ -746,9 +744,9 @@ extra_bcc_option_multiple_2(Config) -> end, [QName1, QName2]), publish(Ch, <<"ignore">>, [<<"msg">>], Exchange), - wait_for_messages(Config, [[QName1, <<"1">>, <<"1">>, <<"0">>]]), - wait_for_messages(Config, [[QName2, <<"1">>, <<"1">>, <<"0">>]]), - wait_for_messages(Config, [[ExtraBCC, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName1, 1, 1, 0), + wait_for_messages(Config, QName2, 1, 1, 0), + wait_for_messages(Config, ExtraBCC, 1, 1, 0), delete_exchange(Ch, Exchange), delete_queue(Ch, QName1), @@ -838,9 +836,46 @@ set_queue_options(Config, QName, Options) -> rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, set_queue_options1, [QName, Options]). set_queue_options1(QName, Options) -> +<<<<<<< HEAD rabbit_misc:execute_mnesia_transaction(fun() -> rabbit_amqqueue:update(rabbit_misc:r(<<"/">>, queue, QName), fun(Q) -> amqqueue:set_options(Q, Options) end) end). +======= + rabbit_amqqueue:update(rabbit_misc:r(<<"/">>, queue, QName), + fun(Q) -> + amqqueue:set_options(Q, Options) + end). + +%% Some CI runs fail with `Failed to create dirty cpu scheduler thread..` +%% when running the ctl commands. Let's try with an rpc call to avoid +%% configuration changes which are less portable +wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack) -> + wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack, 60). + +wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack, 0) -> + Infos = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, info_all, [<<"/">>, [name, messages, messages_ready, messages_unacknowledged]]), + Resource = rabbit_misc:r(<<"/">>, queue, QName), + [QInfo] = lists:filter(fun(Info) -> + lists:member({name, Resource}, Info) + end, Infos), + ?assertEqual([{name, Resource}, {messages, Msgs}, {messages_ready, MsgsReady}, + {messages_unacknowledged, MsgsUnack}], + QInfo); +wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack, N) -> + Infos = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, info_all, [<<"/">>, [name, messages, messages_ready, messages_unacknowledged]]), + Resource = rabbit_misc:r(<<"/">>, queue, QName), + [QInfo] = lists:filter(fun(Info) -> + lists:member({name, Resource}, Info) + end, Infos), + case QInfo of + [{name, Resource}, {messages, Msgs}, {messages_ready, MsgsReady}, + {messages_unacknowledged, MsgsUnack}] -> + ok; + _ -> + timer:sleep(500), + wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack, N - 1) + end. +>>>>>>> 48c1a8245b (Use rpcs instead of ctl commands to avoid CI failures)