diff --git a/deps/rabbit/test/queue_parallel_SUITE.erl b/deps/rabbit/test/queue_parallel_SUITE.erl index 63b5e2a16e0c..4837a9678b25 100644 --- a/deps/rabbit/test/queue_parallel_SUITE.erl +++ b/deps/rabbit/test/queue_parallel_SUITE.erl @@ -16,8 +16,6 @@ -define(TIMEOUT, 30_000). --import(quorum_queue_utils, [wait_for_messages/2]). - all() -> [ {group, parallel_tests} @@ -183,18 +181,18 @@ publish(Config) -> QName = ?config(queue_name, Config), declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + wait_for_messages(Config, QName, 1, 1, 0). consume(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), QName = ?config(queue_name, Config), declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + wait_for_messages(Config, QName, 1, 1, 0). consume_first_empty(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -202,7 +200,7 @@ consume_first_empty(Config) -> declare_queue(Ch, Config, QName), consume_empty(Ch, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), consume(Ch, QName, true, [<<"msg1">>]), rabbit_ct_client_helpers:close_channel(Ch). @@ -217,11 +215,11 @@ consume_and_autoack(Config) -> QName = ?config(queue_name, Config), declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), consume(Ch, QName, true, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, QName, 0, 0, 0). subscribe(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -233,14 +231,14 @@ subscribe(Config) -> amqp_channel:call(Ch, #'basic.qos'{global = false, prefetch_count = 10})), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), CArgs = ?config(consumer_args, Config), subscribe(Ch, QName, false, CArgs), receive_basic_deliver(false), rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + wait_for_messages(Config, QName, 1, 1, 0). subscribe_consumers(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -275,13 +273,13 @@ subscribe_with_autoack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), subscribe(Ch, QName, true, CArgs), receive_basic_deliver(false), receive_basic_deliver(false), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, QName, 0, 0, 0). consume_and_ack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -289,11 +287,11 @@ consume_and_ack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -303,12 +301,12 @@ consume_and_multiple_ack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = true}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -319,13 +317,13 @@ subscribe_and_ack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), subscribe(Ch, QName, false, CArgs), receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end, rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -337,16 +335,16 @@ subscribe_and_multiple_ack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), subscribe(Ch, QName, false, CArgs), receive_basic_deliver(false), receive_basic_deliver(false), receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = true}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end, rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -358,7 +356,7 @@ trigger_message_store_compaction(Config) -> N = 12000, [publish(Ch, QName, [binary:copy(<<"a">>, 5000)]) || _ <- lists:seq(1, N)], - wait_for_messages(Config, [[QName, <<"12000">>, <<"12000">>, <<"0">>]]), + wait_for_messages(Config, QName, 12000, 12000, 0), AllDTags = rabbit_ct_client_helpers:consume_without_acknowledging(Ch, QName, N), ToAck = lists:filter(fun (I) -> I > 500 andalso I < 11200 end, AllDTags), @@ -379,14 +377,14 @@ subscribe_and_requeue_multiple_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), subscribe(Ch, QName, false, CArgs), receive_basic_deliver(false), receive_basic_deliver(false), receive {#'basic.deliver'{delivery_tag = DeliveryTag, redelivered = false}, _} -> - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = true}), @@ -395,10 +393,10 @@ subscribe_and_requeue_multiple_nack(Config) -> receive {#'basic.deliver'{delivery_tag = DeliveryTag1, redelivered = true}, _} -> - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, multiple = true}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end end, rabbit_ct_client_helpers:close_channel(Ch), @@ -410,13 +408,13 @@ consume_and_requeue_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), + wait_for_messages(Config, QName, 2, 1, 1), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -426,13 +424,13 @@ consume_and_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -442,13 +440,13 @@ consume_and_requeue_multiple_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = true}), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -458,13 +456,13 @@ consume_and_multiple_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -475,21 +473,21 @@ subscribe_and_requeue_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), subscribe(Ch, QName, false, CArgs), receive {#'basic.deliver'{delivery_tag = DeliveryTag, redelivered = false}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}), receive {#'basic.deliver'{delivery_tag = DeliveryTag1, redelivered = true}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end end, rabbit_ct_client_helpers:close_channel(Ch), @@ -502,16 +500,16 @@ subscribe_and_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), subscribe(Ch, QName, false, CArgs), receive {#'basic.deliver'{delivery_tag = DeliveryTag, redelivered = false}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end, rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -523,18 +521,18 @@ subscribe_and_multiple_nack(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), subscribe(Ch, QName, false, CArgs), receive_basic_deliver(false), receive_basic_deliver(false), receive {#'basic.deliver'{delivery_tag = DeliveryTag, redelivered = false}, _} -> - wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + wait_for_messages(Config, QName, 3, 0, 3), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + wait_for_messages(Config, QName, 0, 0, 0) end, rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -548,24 +546,24 @@ basic_cancel(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), CTag = atom_to_binary(?FUNCTION_NAME, utf8), subscribe(Ch, QName, false, CTag, CArgs), receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), ?assertEqual([], lists:filter(fun(Props) -> Resource = proplists:get_value(queue_name, Props), QName == Resource#resource.name end, Consumers)), publish(Ch, QName, [<<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"2">>, <<"1">>]]), + wait_for_messages(Config, QName, 3, 2, 1), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]) + wait_for_messages(Config, QName, 2, 2, 0) after 5000 -> exit(basic_deliver_timeout) end, @@ -578,11 +576,11 @@ purge(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), [_] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), + wait_for_messages(Config, QName, 2, 1, 1), {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -592,9 +590,9 @@ purge_no_consumer(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + wait_for_messages(Config, QName, 2, 2, 0), {'queue.purge_ok', 2} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, QName, 0, 0, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -604,11 +602,11 @@ basic_recover(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), [_] = consume(Ch, QName, [<<"msg1">>]), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + wait_for_messages(Config, QName, 1, 0, 1), amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName, 1, 1, 0), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -689,8 +687,8 @@ extra_bcc_option(Config) -> }), publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), - wait_for_messages(Config, [[ExtraBCC, <<"3">>, <<"3">>, <<"0">>]]), + wait_for_messages(Config, QName, 3, 3, 0), + wait_for_messages(Config, ExtraBCC, 3, 3, 0), delete_queue(Ch, QName), delete_queue(Ch, ExtraBCC). @@ -719,9 +717,9 @@ extra_bcc_option_multiple_1(Config) -> exchange = Exchange}), publish(Ch, <<"ignore">>, [<<"msg">>], Exchange), - wait_for_messages(Config, [[QName1, <<"1">>, <<"1">>, <<"0">>]]), - wait_for_messages(Config, [[QName2, <<"1">>, <<"1">>, <<"0">>]]), - wait_for_messages(Config, [[ExtraBCC, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName1, 1, 1, 0), + wait_for_messages(Config, QName2, 1, 1, 0), + wait_for_messages(Config, ExtraBCC, 1, 1, 0), delete_exchange(Ch, Exchange), delete_queue(Ch, QName1), @@ -752,9 +750,9 @@ extra_bcc_option_multiple_2(Config) -> end, [QName1, QName2]), publish(Ch, <<"ignore">>, [<<"msg">>], Exchange), - wait_for_messages(Config, [[QName1, <<"1">>, <<"1">>, <<"0">>]]), - wait_for_messages(Config, [[QName2, <<"1">>, <<"1">>, <<"0">>]]), - wait_for_messages(Config, [[ExtraBCC, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, QName1, 1, 1, 0), + wait_for_messages(Config, QName2, 1, 1, 0), + wait_for_messages(Config, ExtraBCC, 1, 1, 0), delete_exchange(Ch, Exchange), delete_queue(Ch, QName1), @@ -848,3 +846,33 @@ set_queue_options1(QName, Options) -> fun(Q) -> amqqueue:set_options(Q, Options) end). + +%% Some CI runs fail with `Failed to create dirty cpu scheduler thread..` +%% when running the ctl commands. Let's try with an rpc call to avoid +%% configuration changes which are less portable +wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack) -> + wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack, 60). + +wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack, 0) -> + Infos = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, info_all, [<<"/">>, [name, messages, messages_ready, messages_unacknowledged]]), + Resource = rabbit_misc:r(<<"/">>, queue, QName), + [QInfo] = lists:filter(fun(Info) -> + lists:member({name, Resource}, Info) + end, Infos), + ?assertEqual([{name, Resource}, {messages, Msgs}, {messages_ready, MsgsReady}, + {messages_unacknowledged, MsgsUnack}], + QInfo); +wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack, N) -> + Infos = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, info_all, [<<"/">>, [name, messages, messages_ready, messages_unacknowledged]]), + Resource = rabbit_misc:r(<<"/">>, queue, QName), + [QInfo] = lists:filter(fun(Info) -> + lists:member({name, Resource}, Info) + end, Infos), + case QInfo of + [{name, Resource}, {messages, Msgs}, {messages_ready, MsgsReady}, + {messages_unacknowledged, MsgsUnack}] -> + ok; + _ -> + timer:sleep(500), + wait_for_messages(Config, QName, Msgs, MsgsReady, MsgsUnack, N - 1) + end.