diff --git a/deps/rabbit/test/queue_parallel_SUITE.erl b/deps/rabbit/test/queue_parallel_SUITE.erl index 0ce25e5b18c5..e89e44ae9ac4 100644 --- a/deps/rabbit/test/queue_parallel_SUITE.erl +++ b/deps/rabbit/test/queue_parallel_SUITE.erl @@ -16,8 +16,6 @@ -define(TIMEOUT, 30_000). --import(quorum_queue_utils, [wait_for_messages/2]). - all() -> [ {group, parallel_tests} @@ -172,18 +170,18 @@ publish(Config) -> QName = ?config(queue_name, Config), declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + wait_for_messages(Config, QName, 1, 1, 0). consume(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), QName = ?config(queue_name, Config), declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + wait_for_messages(Config, QName, 1, 1, 0). consume_first_empty(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -191,7 +189,7 @@ consume_first_empty(Config) -> declare_queue(Ch, Config, QName), consume_empty(Ch, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), consume(Ch, QName, true, [<<"msg1">>]), rabbit_ct_client_helpers:close_channel(Ch). @@ -206,11 +204,11 @@ consume_and_autoack(Config) -> QName = ?config(queue_name, Config), declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), consume(Ch, QName, true, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, QName, 0, 0, 0). subscribe(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -222,14 +220,14 @@ subscribe(Config) -> amqp_channel:call(Ch, #'basic.qos'{global = false, prefetch_count = 10})), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), CArgs = ?config(consumer_args, Config), subscribe(Ch, QName, false, CArgs), receive_basic_deliver(false), rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + wait_for_messages(Config, QName, 1, 1, 0). subscribe_consumers(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -264,13 +262,13 @@ subscribe_with_autoack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), subscribe(Ch, QName, true, CArgs), receive_basic_deliver(false), receive_basic_deliver(false), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, QName, 0, 0, 0). consume_and_ack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -278,11 +276,11 @@ consume_and_ack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -292,12 +290,12 @@ consume_and_multiple_ack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = true}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -308,13 +306,13 @@ subscribe_and_ack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), subscribe(Ch, QName, false, CArgs), receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end, rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -326,16 +324,16 @@ subscribe_and_multiple_ack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), subscribe(Ch, QName, false, CArgs), receive_basic_deliver(false), receive_basic_deliver(false), receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = true}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end, rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -347,7 +345,7 @@ trigger_message_store_compaction(Config) -> N = 12000, [publish(Ch, QName, [binary:copy(<<"a">>, 5000)]) || _ <- lists:seq(1, N)], - wait_for_messages(Config, [[QName, <<"12000">>, <<"12000">>, <<"0">>]]), + wait_for_messages(Config, QName, 12000, 12000, 0), AllDTags = rabbit_ct_client_helpers:consume_without_acknowledging(Ch, QName, N), ToAck = lists:filter(fun (I) -> I > 500 andalso I < 11200 end, AllDTags), @@ -368,14 +366,14 @@ subscribe_and_requeue_multiple_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), subscribe(Ch, QName, false, CArgs), receive_basic_deliver(false), receive_basic_deliver(false), receive {#'basic.deliver'{delivery_tag = DeliveryTag, redelivered = false}, _} -> - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = true}), @@ -384,10 +382,10 @@ subscribe_and_requeue_multiple_nack(Config) -> receive {#'basic.deliver'{delivery_tag = DeliveryTag1, redelivered = true}, _} -> - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, multiple = true}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end end, rabbit_ct_client_helpers:close_channel(Ch), @@ -399,13 +397,13 @@ consume_and_requeue_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), + wait_for_messages(Config, QName, 2, 1, 1), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -415,13 +413,13 @@ consume_and_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -431,13 +429,13 @@ consume_and_requeue_multiple_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = true}), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -447,13 +445,13 @@ consume_and_multiple_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -464,21 +462,21 @@ subscribe_and_requeue_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), subscribe(Ch, QName, false, CArgs), receive {#'basic.deliver'{delivery_tag = DeliveryTag, redelivered = false}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}), receive {#'basic.deliver'{delivery_tag = DeliveryTag1, redelivered = true}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end end, rabbit_ct_client_helpers:close_channel(Ch), @@ -491,16 +489,16 @@ subscribe_and_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), subscribe(Ch, QName, false, CArgs), receive {#'basic.deliver'{delivery_tag = DeliveryTag, redelivered = false}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end, rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -512,18 +510,18 @@ subscribe_and_multiple_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), subscribe(Ch, QName, false, CArgs), receive_basic_deliver(false), receive_basic_deliver(false), receive {#'basic.deliver'{delivery_tag = DeliveryTag, redelivered = false}, _} -> - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end, rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -537,24 +535,24 @@ basic_cancel(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), CTag = atom_to_binary(?FUNCTION_NAME, utf8), subscribe(Ch, QName, false, CTag, CArgs), receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), ?assertEqual([], lists:filter(fun(Props) -> Resource = proplists:get_value(queue_name, Props), QName == Resource#resource.name end, Consumers)), publish(Ch, QName, [<<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"2">>, <<"1">>]]), + wait_for_messages(Config, QName, 3, 2, 1), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]) + wait_for_messages(Config, QName, 2, 2, 0) after 5000 -> exit(basic_deliver_timeout) end, @@ -567,11 +565,11 @@ purge(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), [_] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), + wait_for_messages(Config, QName, 2, 1, 1), {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -581,9 +579,9 @@ purge_no_consumer(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), {'queue.purge_ok', 2} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -593,11 +591,11 @@ basic_recover(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), [_] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -678,8 +676,8 @@ extra_bcc_option(Config) -> }), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), - wait_for_messages(Config, [[ExtraBCC, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), + wait_for_messages(Config, ExtraBCC, 3, 3, 0), delete_queue(Ch, QName), delete_queue(Ch, ExtraBCC). @@ -708,9 +706,9 @@ extra_bcc_option_multiple_1(Config) -> exchange = Exchange}), publish(Ch, <<"ignore">>, [<<"msg">>], Exchange), - wait_for_messages(Config, [[QName1, <<"1">>, <<"1">>, <<"0">>]]), - wait_for_messages(Config, [[QName2, <<"1">>, <<"1">>, <<"0">>]]), - wait_for_messages(Config, [[ExtraBCC, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName1, 1, 1, 0), + wait_for_messages(Config, QName2, 1, 1, 0), + wait_for_messages(Config, ExtraBCC, 1, 1, 0), delete_exchange(Ch, Exchange), delete_queue(Ch, QName1), @@ -741,9 +739,9 @@ extra_bcc_option_multiple_2(Config) -> end, [QName1, QName2]), publish(Ch, <<"ignore">>, [<<"msg">>], Exchange), - wait_for_messages(Config, [[QName1, <<"1">>, <<"1">>, <<"0">>]]), - wait_for_messages(Config, [[QName2, <<"1">>, <<"1">>, <<"0">>]]), - wait_for_messages(Config, [[ExtraBCC, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName1, 1, 1, 0), + wait_for_messages(Config, QName2, 1, 1, 0), + wait_for_messages(Config, ExtraBCC, 1, 1, 0), delete_exchange(Ch, Exchange), delete_queue(Ch, QName1), @@ -837,3 +835,33 @@ set_queue_options1(QName, Options) -> fun(Q) -> amqqueue:set_options(Q, Options) end). + +%% Some CI runs fail with `Failed to create dirty cpu scheduler thread..` +%% when running the ctl commands. Let's try with an rpc call to avoid +%% configuration changes which are less portable +wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack) -> + wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack, 60). + +wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack, 0) -> + Infos = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, info_all, [<<"/">>, [name, messages, messages_ready, messages_unacknowledged]]), + Resource = rabbit_misc:r(<<"/">>, queue, QName), + [QInfo] = lists:filter(fun(Info) -> + lists:member({name, Resource}, Info) + end, Infos), + ?assertEqual([{name, Resource}, {messages, Msgs}, {messages_ready, MsgsReady}, + {messages_unacknowledged, MsgsUnack}], + QInfo); +wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack, N) -> + Infos = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, info_all, [<<"/">>, [name, messages, messages_ready, messages_unacknowledged]]), + Resource = rabbit_misc:r(<<"/">>, queue, QName), + [QInfo] = lists:filter(fun(Info) -> + lists:member({name, Resource}, Info) + end, Infos), + case QInfo of + [{name, Resource}, {messages, Msgs}, {messages_ready, MsgsReady}, + {messages_unacknowledged, MsgsUnack}] -> + ok; + _ -> + timer:sleep(500), + wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack, N - 1) + end.