From aeca23c69d5c5aea56c1649a365691622f06971c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Fri, 24 Jan 2025 15:38:11 +0100 Subject: [PATCH] amqp_client_SUITE: Fix several test flakes [How] 1. Use feature flags correctly: the code shouldn't test if a feature flag is enabled, assuming something else enabled it. It should enable it and react to an error. 2. Use `close_connection_sync/1` instead of the asynchronous `amqp10_client:close_connection/1` to make sure they are really closed. The wait in `end_per_testcase/2` was not enough apparently. 3. For the two testcases that flake the most for me, enclose the code in a try/after and make sure to close the connection at the end, regardless of the result. This should be done for all testcases because the testgroup use a single set of RabbitMQ nodes for all testcases, therefore testcases are supposed to clean up after them... --- deps/rabbit/test/amqp_client_SUITE.erl | 319 +++++++++++++------------ 1 file changed, 167 insertions(+), 152 deletions(-) diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index c49e93cb39fa..d23794cd9619 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -241,18 +241,18 @@ init_per_testcase(T, Config) T =:= drain_many_quorum_queue orelse T =:= timed_get_quorum_queue orelse T =:= available_messages_quorum_queue -> - case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of - true -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0') of + ok -> rabbit_ct_helpers:testcase_started(Config, T); - false -> + _ -> {skip, "Receiving with drain from quorum queues in credit API v1 have a known " "bug that they reply with send_drained before delivering the message."} end; init_per_testcase(single_active_consumer_drain_quorum_queue = T, Config) -> - case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of - true -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0') of + ok -> rabbit_ct_helpers:testcase_started(Config, T); - false -> + _ -> {skip, "Draining a SAC inactive quorum queue consumer with credit API v1 " "is known to be unsupported."} end; @@ -265,20 +265,20 @@ init_per_testcase(T, Config) %% The new RabbitMQ internal flow control %% writer proc <- session proc <- queue proc %% is only available with credit API v2. - case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of - true -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0') of + ok -> rabbit_ct_helpers:testcase_started(Config, T); - false -> + _ -> {skip, "Feature flag rabbitmq_4.0.0 is disabled"} end; init_per_testcase(T, Config) when T =:= modified_quorum_queue orelse T =:= modified_dead_letter_headers_exchange orelse T =:= modified_dead_letter_history -> - case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of - true -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0') of + ok -> rabbit_ct_helpers:testcase_started(Config, T); - false -> + _ -> {skip, "Feature flag rabbitmq_4.0.0 is disabled, but needed for " "the new #modify{} command being sent to quorum queues."} end; @@ -294,10 +294,10 @@ init_per_testcase(T, Config) %% In contrast, cancel API v2 in 4.x will requeue any unacked messages if the receiver detaches. %% We skip the single active consumer tests because these test cases assume that detaching a %% receiver link will requeue unacked messages. - case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of - true -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0') of + ok -> rabbit_ct_helpers:testcase_started(Config, T); - false -> + _ -> {skip, "Cancel API v2 is disabled due to feature flag rabbitmq_4.0.0 being disabled."} end; init_per_testcase(T, Config) @@ -305,35 +305,43 @@ init_per_testcase(T, Config) T =:= single_active_consumer_quorum_queue orelse T =:= detach_requeues_two_connections_quorum_queue -> %% Feature flag rabbitmq_4.0.0 enables the consumer removal API. - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0'), - rabbit_ct_helpers:testcase_started(Config, T); + case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0') of + ok -> + rabbit_ct_helpers:testcase_started(Config, T); + Skip -> + Skip + end; init_per_testcase(T, Config) when T =:= leader_transfer_quorum_queue_credit_single orelse T =:= leader_transfer_quorum_queue_credit_batches -> %% These test cases flake with feature flag 'rabbitmq_4.0.0' disabled. - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0'), - rabbit_ct_helpers:testcase_started(Config, T); + case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0') of + ok -> + rabbit_ct_helpers:testcase_started(Config, T); + Skip -> + Skip + end; init_per_testcase(T = immutable_bare_message, Config) -> - case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of - true -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0') of + ok -> rabbit_ct_helpers:testcase_started(Config, T); - false -> + _ -> {skip, "RabbitMQ is known to wrongfully modify the bare message with feature " "flag rabbitmq_4.0.0 disabled"} end; init_per_testcase(T = dead_letter_into_stream, Config) -> - case rpc(Config, rabbit_feature_flags, is_enabled, [message_containers_deaths_v2]) of - true -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2) of + ok -> rabbit_ct_helpers:testcase_started(Config, T); - false -> + _ -> {skip, "This test is known to fail with feature flag message_containers_deaths_v2 disabled " "due to missing feature https://github.com/rabbitmq/rabbitmq-server/issues/11173"} end; init_per_testcase(T = dead_letter_reject, Config) -> - case rpc(Config, rabbit_feature_flags, is_enabled, [message_containers_deaths_v2]) of - true -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2) of + ok -> rabbit_ct_helpers:testcase_started(Config, T); - false -> + _ -> {skip, "This test is known to fail with feature flag message_containers_deaths_v2 disabled " "due bug https://github.com/rabbitmq/rabbitmq-server/issues/11159"} end; @@ -412,7 +420,7 @@ reliable_send_receive(QType, Outcome, Config) -> ok = amqp10_client:detach_link(Sender), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), flush("post sender close"), {ok, Connection2} = amqp10_client:open_connection(OpnConf), @@ -429,7 +437,7 @@ reliable_send_receive(QType, Outcome, Config) -> ok = amqp10_client:detach_link(Receiver), ok = delete_queue(Session2, QName), ok = end_session_sync(Session2), - ok = amqp10_client:close_connection(Connection2). + ok = close_connection_sync(Connection2). %% We test the modified outcome with classic queues. %% We expect that classic queues implement field undeliverable-here incorrectly @@ -476,7 +484,7 @@ modified_classic_queue(Config) -> rabbitmq_amqp_client:delete_queue(LinkPair, QName)), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% We test the modified outcome with quorum queues. %% We expect that quorum queues implement field @@ -591,7 +599,7 @@ modified_quorum_queue(Config) -> {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% Test that a message can be routed based on the message-annotations %% provided in the modified outcome as described in @@ -723,7 +731,7 @@ modified_dead_letter_headers_exchange(Config) -> ok = rabbitmq_amqp_client:delete_exchange(LinkPair, AlternateXName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% Test that custom dead lettering event tracking works as described in %% https://rabbitmq.com/blog/2024/10/11/modified-outcome @@ -802,7 +810,7 @@ modified_dead_letter_history(Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q2), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% Tests that confirmations are returned correctly %% when sending many messages async to a quorum queue. @@ -835,7 +843,7 @@ sender_settle_mode_unsettled(Config) -> rabbitmq_amqp_client:delete_queue(LinkPair, QName)), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). sender_settle_mode_unsettled_fanout(Config) -> {Connection, Session, LinkPair} = init(Config), @@ -869,7 +877,7 @@ sender_settle_mode_unsettled_fanout(Config) -> || QName <- QNames], ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% Tests that confirmations are returned correctly %% when sending many messages async to a quorum queue where @@ -911,7 +919,7 @@ sender_settle_mode_mixed(Config) -> rabbitmq_amqp_client:delete_queue(LinkPair, QName)), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). invalid_transfer_settled_flag(Config) -> OpnConf = connection_config(Config), @@ -956,7 +964,7 @@ invalid_transfer_settled_flag(Config) -> ct:fail({missing_event, ?LINE}) end, - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). quorum_queue_rejects(Config) -> {Connection, Session, LinkPair} = init(Config), @@ -1002,7 +1010,7 @@ quorum_queue_rejects(Config) -> rabbitmq_amqp_client:delete_queue(LinkPair, QName)), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = amqp10_client:end_session(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). receiver_settle_mode_first(Config) -> QName = atom_to_binary(?FUNCTION_NAME), @@ -1090,7 +1098,7 @@ receiver_settle_mode_first(Config) -> ok = amqp10_client:detach_link(Receiver), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = amqp10_client:end_session(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). publishing_to_non_existing_queue_should_settle_with_released(Config) -> OpnConf = connection_config(Config), @@ -1109,7 +1117,7 @@ publishing_to_non_existing_queue_should_settle_with_released(Config) -> ok = wait_for_settlement(DTag1, released), ok = amqp10_client:detach_link(Sender), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), ok = flush("post sender close"). open_link_to_non_existing_destination_should_end_session(Config) -> @@ -1125,7 +1133,7 @@ open_link_to_non_existing_destination_should_end_session(Config) -> {ok, _} = amqp10_client:attach_sender_link( Session, SenderLinkName, Address), wait_for_session_end(Session), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), flush("post sender close") end || Address <- Addresses], ok. @@ -1188,7 +1196,7 @@ roundtrip_with_drain(Config, QueueType, QName) ok = amqp10_client:detach_link(Sender), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). drain_many_classic_queue(Config) -> QName = atom_to_binary(?FUNCTION_NAME), @@ -1267,7 +1275,7 @@ drain_many(Config, QueueType, QName) ok = amqp10_client:detach_link(Receiver), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). amqp_stream_amqpl(Config) -> amqp_amqpl(<<"stream">>, Config). @@ -1431,7 +1439,7 @@ amqp_amqpl(QType, Config) -> ok = rabbit_ct_client_helpers:close_channel(Ch), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). message_headers_conversion(Config) -> QName = atom_to_binary(?FUNCTION_NAME), @@ -1451,7 +1459,7 @@ message_headers_conversion(Config) -> ok = rabbit_ct_client_helpers:close_channel(Ch), ok = delete_queue(Session, QName), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) -> {ok, Sender} = create_amqp10_sender(Session, Address), @@ -1602,7 +1610,7 @@ multiple_sessions(Config) -> [ok = delete_queue(Session1, Q) || Q <- Qs], ok = end_session_sync(Session1), ok = end_session_sync(Session2), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). server_closes_link_classic_queue(Config) -> server_closes_link(<<"classic">>, Config). @@ -1684,7 +1692,7 @@ server_closes_link(QType, Config) -> end)), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). server_closes_link_exchange_settled(Config) -> server_closes_link_exchange(true, Config). @@ -1740,7 +1748,7 @@ server_closes_link_exchange(Settled, Config) -> #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). link_target_classic_queue_deleted(Config) -> link_target_queue_deleted(<<"classic">>, Config). @@ -1800,7 +1808,7 @@ link_target_queue_deleted(QType, Config) -> ?assert(rpc(Config, meck, validate, [Mod])), ok = rpc(Config, meck, unload, [Mod]), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). rabbit_queue_type_deliver_noop(_TargetQs, _Msg, _Opts, QTypeState) -> Actions = [], @@ -1864,7 +1872,7 @@ target_queues_deleted_accepted(Config) -> ?assert(rpc(Config, meck, validate, [Mod])), ok = rpc(Config, meck, unload, [Mod]), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). rabbit_queue_type_deliver_to_q1(Qs, Msg, Opts, QTypeState) -> %% Drop q2 and q3. @@ -2029,7 +2037,7 @@ sync_get_unsettled(QType, Config) -> ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -2119,7 +2127,7 @@ sync_get_unsettled_2(QType, Config) -> ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -2199,7 +2207,7 @@ sync_get_settled(QType, Config) -> ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -2271,7 +2279,7 @@ timed_get(QType, Config) -> ok = amqp10_client:detach_link(Receiver), ok = amqp10_client:detach_link(Sender), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -2350,7 +2358,7 @@ stop(QType, Config) -> ok = amqp10_client:detach_link(Receiver), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -2441,7 +2449,7 @@ consumer_priority(QType, Config) -> {ok, #{message_count := 1}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). single_active_consumer_priority_quorum_queue(Config) -> QType = <<"quorum">>, @@ -2577,7 +2585,7 @@ single_active_consumer_priority_quorum_queue(Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session1), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). single_active_consumer_classic_queue(Config) -> single_active_consumer(<<"classic">>, Config). @@ -2688,7 +2696,7 @@ single_active_consumer(QType, Config) -> {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). single_active_consumer_drain_classic_queue(Config) -> single_active_consumer_drain(<<"classic">>, Config). @@ -2805,7 +2813,7 @@ single_active_consumer_drain(QType, Config) -> rabbitmq_amqp_client:delete_queue(LinkPair, QName)), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% "A session endpoint can choose to unmap its output handle for a link. In this case, the endpoint MUST %% send a detach frame to inform the remote peer that the handle is no longer attached to the link endpoint. @@ -2914,7 +2922,7 @@ detach_requeue_one_session(QType, Config) -> ok = amqp10_client:detach_link(Receiver2), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), #'queue.delete_ok'{message_count = 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -2978,7 +2986,7 @@ detach_requeues_drop_head_classic_queue(Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). detach_requeues_two_connections_classic_queue(Config) -> detach_requeues_two_connections(<<"classic">>, Config). @@ -3083,8 +3091,8 @@ detach_requeues_two_connections(QType, Config) -> ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session0), ok = end_session_sync(Session1), - ok = amqp10_client:close_connection(Connection0), - ok = amqp10_client:close_connection(Connection1). + ok = close_connection_sync(Connection0), + ok = close_connection_sync(Connection1). resource_alarm_before_session_begin(Config) -> QName = atom_to_binary(?FUNCTION_NAME), @@ -3137,7 +3145,7 @@ resource_alarm_before_session_begin(Config) -> ok = amqp10_client:detach_link(Sender), ok = end_session_sync(Session1), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -3203,8 +3211,8 @@ resource_alarm_after_session_begin(Config) -> ok = amqp10_client:detach_link(Receiver3), ok = end_session_sync(Session1), ok = end_session_sync(Session2), - ok = amqp10_client:close_connection(Connection1), - ok = amqp10_client:close_connection(Connection2), + ok = close_connection_sync(Connection1), + ok = close_connection_sync(Connection2), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -3240,7 +3248,7 @@ resource_alarm_send_many(Config) -> timer:sleep(100), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -3297,7 +3305,7 @@ max_message_size_client_to_server(Config) -> ok = amqp10_client:detach_link(Sender), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]). @@ -3351,7 +3359,7 @@ max_message_size_server_to_client(Config) -> ct:fail("did not receive expected error") end, - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -3420,7 +3428,7 @@ last_queue_confirms(Config) -> ok = amqp10_client:detach_link(SenderClassicQ), ok = amqp10_client:detach_link(SenderFanout), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), ?assertEqual(#'queue.delete_ok'{message_count = 3}, amqp_channel:call(Ch, #'queue.delete'{queue = ClassicQ})), ?assertEqual(#'queue.delete_ok'{message_count = 2}, @@ -3494,7 +3502,7 @@ target_queue_deleted(Config) -> ok = amqp10_client:detach_link(Sender), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), ?assertEqual(#'queue.delete_ok'{message_count = 2}, amqp_channel:call(Ch, #'queue.delete'{queue = QuorumQ})), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -3560,7 +3568,7 @@ target_classic_queue_down(Config) -> ok = amqp10_client:detach_link(Receiver2), ok = delete_queue(Session, QName), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). async_notify_settled_classic_queue(Config) -> async_notify(settled, <<"classic">>, Config). @@ -3728,7 +3736,7 @@ link_flow_control(Config) -> ok = delete_queue(Session, QQ), ok = delete_queue(Session, CQ), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). classic_queue_on_old_node(Config) -> queue_and_client_different_nodes(1, 0, <<"classic">>, Config). @@ -3822,7 +3830,7 @@ queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) ?assertEqual(#'queue.delete_ok'{message_count = ExpectedReadyMsgs}, amqp_channel:call(Ch, #'queue.delete'{queue = QName})), ok = rabbit_ct_client_helpers:close_channel(Ch), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). maintenance(Config) -> {ok, C0} = amqp10_client:open_connection(connection_config(0, Config)), @@ -3880,38 +3888,41 @@ leader_transfer_credit(QName, QType, Credit, Config) -> OpnConf = connection_config(0, Config), {ok, Connection0} = amqp10_client:open_connection(OpnConf), - {ok, Session0} = amqp10_client:begin_session_sync(Connection0), - Address = rabbitmq_amqp_address:queue(QName), - {ok, Sender} = amqp10_client:attach_sender_link( - Session0, <<"test-sender">>, Address), - ok = wait_for_credit(Sender), - - NumMsgs = 30, - ok = send_messages(Sender, NumMsgs, false), - ok = wait_for_accepts(NumMsgs), - ok = detach_link_sync(Sender), - - %% Consume from a follower. - ok = wait_for_local_member(QType, QName, Config), - Filter = consume_from_first(QType), - {ok, Receiver} = amqp10_client:attach_receiver_link( - Session0, <<"receiver">>, Address, - settled, configuration, Filter), - flush(receiver_attached), - %% Top up credits very often during the leader change. - ok = amqp10_client:flow_link_credit(Receiver, Credit, Credit), - - %% After receiving the 1st message, let's move the leader away from node 1. - receive_messages(Receiver, 1), - ok = drain_node(Config, 1), - %% We expect to receive all remaining messages. - receive_messages(Receiver, NumMsgs - 1), - - ok = revive_node(Config, 1), - ok = amqp10_client:detach_link(Receiver), - ok = delete_queue(Session0, QName), - ok = end_session_sync(Session0), - ok = amqp10_client:close_connection(Connection0). + try + {ok, Session0} = amqp10_client:begin_session_sync(Connection0), + Address = rabbitmq_amqp_address:queue(QName), + {ok, Sender} = amqp10_client:attach_sender_link( + Session0, <<"test-sender">>, Address), + ok = wait_for_credit(Sender), + + NumMsgs = 30, + ok = send_messages(Sender, NumMsgs, false), + ok = wait_for_accepts(NumMsgs), + ok = detach_link_sync(Sender), + + %% Consume from a follower. + ok = wait_for_local_member(QType, QName, Config), + Filter = consume_from_first(QType), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session0, <<"receiver">>, Address, + settled, configuration, Filter), + flush(receiver_attached), + %% Top up credits very often during the leader change. + ok = amqp10_client:flow_link_credit(Receiver, Credit, Credit), + + %% After receiving the 1st message, let's move the leader away from node 1. + receive_messages(Receiver, 1), + ok = drain_node(Config, 1), + %% We expect to receive all remaining messages. + receive_messages(Receiver, NumMsgs - 1), + + ok = revive_node(Config, 1), + ok = amqp10_client:detach_link(Receiver), + ok = delete_queue(Session0, QName), + ok = end_session_sync(Session0) + after + ok = close_connection_sync(Connection0) + end. leader_transfer_quorum_queue_send(Config) -> QName = atom_to_binary(?FUNCTION_NAME), @@ -3937,21 +3948,25 @@ leader_transfer_send(QName, QType, Config) -> %% Send from a follower. OpnConf = connection_config(0, Config), {ok, Connection0} = amqp10_client:open_connection(OpnConf), - {ok, Session0} = amqp10_client:begin_session_sync(Connection0), - Address = rabbitmq_amqp_address:queue(QName), - {ok, Sender} = amqp10_client:attach_sender_link(Session0, <<"test-sender">>, Address), - ok = wait_for_credit(Sender), - - NumMsgs = 500, - ok = send_messages(Sender, NumMsgs, false), - ok = rabbit_ct_broker_helpers:kill_node(Config, 1), - ok = wait_for_accepts(NumMsgs), - - ok = rabbit_ct_broker_helpers:start_node(Config, 1), - ok = detach_link_sync(Sender), - ok = delete_queue(Session0, QName), - ok = end_session_sync(Session0), - ok = amqp10_client:close_connection(Connection0). + try + {ok, Session0} = amqp10_client:begin_session_sync(Connection0), + Address = rabbitmq_amqp_address:queue(QName), + {ok, Sender} = amqp10_client:attach_sender_link( + Session0, <<"test-sender">>, Address), + ok = wait_for_credit(Sender), + + NumMsgs = 500, + ok = send_messages(Sender, NumMsgs, false), + ok = rabbit_ct_broker_helpers:kill_node(Config, 1), + ok = wait_for_accepts(NumMsgs), + + ok = rabbit_ct_broker_helpers:start_node(Config, 1), + ok = detach_link_sync(Sender), + ok = delete_queue(Session0, QName), + ok = end_session_sync(Session0) + after + close_connection_sync(Connection0) + end. %% rabbitmqctl list_connections %% should list both AMQP 1.0 and AMQP 0.9.1 connections. @@ -4140,7 +4155,7 @@ global_counters(Config) -> ok = rabbit_ct_client_helpers:close_channel(Ch), ok = amqp10_client:detach_link(Sender), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). stream_bloom_filter(Config) -> Stream = atom_to_binary(?FUNCTION_NAME), @@ -4264,7 +4279,7 @@ stream_bloom_filter(Config) -> ok = amqp10_client:detach_link(AppleUnfilteredReceiver), ok = delete_queue(Session, Stream), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). available_messages_classic_queue(Config) -> available_messages(<<"classic">>, Config). @@ -4367,7 +4382,7 @@ available_messages(QType, Config) -> ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -4422,7 +4437,7 @@ incoming_message_interceptors(Config) -> {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QQName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), true = rpc(Config, persistent_term, erase, [Key]). trace_classic_queue(Config) -> @@ -4523,7 +4538,7 @@ trace(Q, QType, Config) -> [delete_queue(SessionSender, Q0) || Q0 <- Qs], ok = end_session_sync(SessionSender), ok = end_session_sync(SessionReceiver), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% https://www.rabbitmq.com/docs/validated-user-id user_id(Config) -> @@ -4556,7 +4571,7 @@ user_id(Config) -> ct:fail("did not receive expected error") end, - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). message_ttl(Config) -> QName = atom_to_binary(?FUNCTION_NAME), @@ -4594,7 +4609,7 @@ message_ttl(Config) -> ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver), ok = delete_queue(Session, QName), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% For backward compatibility, deployment tools should be able to %% enable and disable the deprecated no-op AMQP 1.0 plugin. @@ -4750,7 +4765,7 @@ attach_to_exclusive_queue(Config) -> after 30000 -> ct:fail({missing_event, ?LINE}) end, - ok = amqp10_client:close_connection(Connection), + ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -4809,7 +4824,7 @@ priority(QArgs, Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). dead_letter_headers_exchange(Config) -> {Connection, Session, LinkPair} = init(Config), @@ -4906,7 +4921,7 @@ dead_letter_headers_exchange(Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). dead_letter_reject(Config) -> {Connection, Session, LinkPair} = init(Config), @@ -5014,7 +5029,7 @@ dead_letter_reject(Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName3), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). dead_letter_reject_message_order_classic_queue(Config) -> dead_letter_reject_message_order(<<"classic">>, Config). @@ -5105,7 +5120,7 @@ dead_letter_reject_message_order(QType, Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). dead_letter_reject_many_message_order_classic_queue(Config) -> dead_letter_reject_many_message_order(<<"classic">>, Config). @@ -5194,7 +5209,7 @@ dead_letter_reject_many_message_order(QType, Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% Dead letter from a quorum queue into a stream. dead_letter_into_stream(Config) -> @@ -5266,8 +5281,8 @@ dead_letter_into_stream(Config) -> ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair1), ok = end_session_sync(Session0), ok = end_session_sync(Session1), - ok = amqp10_client:close_connection(Connection0), - ok = amqp10_client:close_connection(Connection1). + ok = close_connection_sync(Connection0), + ok = close_connection_sync(Connection1). accept_multiple_message_order_classic_queue(Config) -> accept_multiple_message_order(<<"classic">>, Config). @@ -5320,7 +5335,7 @@ accept_multiple_message_order(QType, Config) -> ?assertMatch({ok, #{message_count := 0}}, rabbitmq_amqp_client:delete_queue(LinkPair, QName)), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). release_multiple_message_order_classic_queue(Config) -> release_multiple_message_order(<<"classic">>, Config). @@ -5384,7 +5399,7 @@ release_multiple_message_order(QType, Config) -> ?assertMatch({ok, #{message_count := 0}}, rabbitmq_amqp_client:delete_queue(LinkPair, QName)), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% This test asserts the following ยง3.2 requirement: @@ -5548,7 +5563,7 @@ footer_checksum(FooterOpt, Config) -> {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). receive_many_made_available_over_time_classic_queue(Config) -> receive_many_made_available_over_time(<<"classic">>, Config). @@ -5605,7 +5620,7 @@ receive_many_made_available_over_time(QType, Config) -> ok = amqp10_client:detach_link(Receiver), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). receive_many_auto_flow_classic_queue(Config) -> receive_many_auto_flow(<<"classic">>, Config). @@ -5647,7 +5662,7 @@ receive_many_auto_flow(QType, Config) -> ok = amqp10_client:detach_link(Receiver), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% This test ensures that the server sends us TRANSFER and FLOW frames in the correct order %% even if the server is temporarily not allowed to send us any TRANSFERs due to our session @@ -5698,7 +5713,7 @@ incoming_window_closed_transfer_flow_order(Config) -> ok = delete_queue(Session, QName), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). incoming_window_closed_stop_link(Config) -> QName = atom_to_binary(?FUNCTION_NAME), @@ -5751,7 +5766,7 @@ incoming_window_closed_stop_link(Config) -> {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% Test that we can close a link while our session incoming-window is closed. incoming_window_closed_close_link(Config) -> @@ -5794,7 +5809,7 @@ incoming_window_closed_close_link(Config) -> {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). incoming_window_closed_rabbitmq_internal_flow_classic_queue(Config) -> incoming_window_closed_rabbitmq_internal_flow(<<"classic">>, Config). @@ -5847,7 +5862,7 @@ incoming_window_closed_rabbitmq_internal_flow(QType, Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). tcp_back_pressure_rabbitmq_internal_flow_classic_queue(Config) -> tcp_back_pressure_rabbitmq_internal_flow(<<"classic">>, Config). @@ -5924,7 +5939,7 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). session_max_per_connection(Config) -> App = rabbit, @@ -6036,7 +6051,7 @@ x_cc_annotation_exchange(Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% Test that x-cc routing keys work together with target address %% /exchanges/:exchange @@ -6072,7 +6087,7 @@ x_cc_annotation_exchange_routing_key_empty(Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% Test that x-cc routing keys work together with target address %% /queues/:queue @@ -6104,7 +6119,7 @@ x_cc_annotation_queue(Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% Test that x-cc routing keys work together with target address 'null' x_cc_annotation_null(Config) -> @@ -6187,7 +6202,7 @@ x_cc_annotation_null(Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). bad_x_cc_annotation_exchange(Config) -> OpnConf = connection_config(Config), @@ -6234,7 +6249,7 @@ bad_x_cc_annotation_exchange(Config) -> end, ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). + ok = close_connection_sync(Connection). %% internal %%