diff --git a/.github/workflows/test-make-tests.yaml b/.github/workflows/test-make-tests.yaml index 5fa4c6e43d48..a4ffd93c453c 100644 --- a/.github/workflows/test-make-tests.yaml +++ b/.github/workflows/test-make-tests.yaml @@ -25,6 +25,7 @@ jobs: - parallel-ct-set-2 - parallel-ct-set-3 - parallel-ct-set-4 + - ct-amqp_client - ct-clustering_management - eunit ct-dead_lettering - ct-feature_flags diff --git a/deps/amqp10_client/src/amqp10_client_connection.erl b/deps/amqp10_client/src/amqp10_client_connection.erl index 764846a21ac4..89a3396d85c1 100644 --- a/deps/amqp10_client/src/amqp10_client_connection.erl +++ b/deps/amqp10_client/src/amqp10_client_connection.erl @@ -287,6 +287,20 @@ open_sent({call, From}, begin_session, #state{pending_session_reqs = PendingSessionReqs} = State) -> State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, {keep_state, State1}; +open_sent(_EvtType, {close, Reason}, State) -> + case send_close(State, Reason) of + ok -> + %% "After writing this frame the peer SHOULD continue to read from the connection + %% until it receives the partner's close frame (in order to guard against + %% erroneously or maliciously implemented partners, a peer SHOULD implement a + %% timeout to give its partner a reasonable time to receive and process the close + %% before giving up and simply closing the underlying transport mechanism)." [ยง2.4.3] + {next_state, close_sent, State, {state_timeout, ?TIMEOUT, received_no_close_frame}}; + {error, closed} -> + {stop, normal, State}; + Error -> + {stop, Error, State} + end; open_sent(info, {'DOWN', MRef, process, _, _}, #state{reader_m_ref = MRef}) -> {stop, {shutdown, reader_down}}. @@ -345,7 +359,10 @@ close_sent(_EvtType, #'v1_0.close'{} = Close, #state{config = Config}) -> ok = notify_closed(Config, Close), {stop, normal}; close_sent(state_timeout, received_no_close_frame, _Data) -> - {stop, normal}. + {stop, normal}; +close_sent(_EvtType, #'v1_0.open'{}, _Data) -> + %% Transition from CLOSE_PIPE to CLOSE_SENT in figure 2.23. + keep_state_and_data. set_other_procs0(OtherProcs, State) -> #{sessions_sup := SessionsSup, diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 5aebf56a99f5..828ce2fc6357 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -175,7 +175,8 @@ bats: $(BATS) tests:: bats -SLOW_CT_SUITES := backing_queue \ +SLOW_CT_SUITES := amqp_client \ + backing_queue \ channel_interceptor \ cluster \ cluster_rename \ @@ -257,7 +258,7 @@ define ct_master.erl halt(0) endef -PARALLEL_CT_SET_1_A = amqp_client unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking +PARALLEL_CT_SET_1_A = unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit @@ -282,7 +283,7 @@ PARALLEL_CT_SET_2 = $(sort $(PARALLEL_CT_SET_2_A) $(PARALLEL_CT_SET_2_B) $(PARAL PARALLEL_CT_SET_3 = $(sort $(PARALLEL_CT_SET_3_A) $(PARALLEL_CT_SET_3_B) $(PARALLEL_CT_SET_3_C) $(PARALLEL_CT_SET_3_D)) PARALLEL_CT_SET_4 = $(sort $(PARALLEL_CT_SET_4_A) $(PARALLEL_CT_SET_4_B) $(PARALLEL_CT_SET_4_C) $(PARALLEL_CT_SET_4_D)) -SEQUENTIAL_CT_SUITES = clustering_management dead_lettering feature_flags metadata_store_clustering quorum_queue rabbit_stream_queue +SEQUENTIAL_CT_SUITES = amqp_client clustering_management dead_lettering feature_flags metadata_store_clustering quorum_queue rabbit_stream_queue PARALLEL_CT_SUITES = $(PARALLEL_CT_SET_1) $(PARALLEL_CT_SET_2) $(PARALLEL_CT_SET_3) $(PARALLEL_CT_SET_4) ifeq ($(filter-out $(SEQUENTIAL_CT_SUITES) $(PARALLEL_CT_SUITES),$(CT_SUITES)),) diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 8beb7a6d458f..6e75e9a8f1fe 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -11,6 +11,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("amqp10_common/include/amqp10_framing.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile([nowarn_export_all, export_all]). @@ -355,10 +356,9 @@ init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). end_per_testcase(Testcase, Config) -> - %% Assert that every testcase cleaned up. - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), - eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))), - %% Wait for sessions to terminate before starting the next test case. + %% Clean up any queues, connections, and sessions. + rpc(Config, ?MODULE, delete_queues, []), + ok = rpc(Config, rabbit_networking, close_all_connections, [<<"test finished">>]), eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, []))), %% Assert that global counters count correctly. eventually(?_assertMatch(#{publishers := 0, @@ -586,7 +586,7 @@ modified_quorum_queue(Config) -> ok = amqp10_client:settle_msg(Receiver1, M2e, modified), %% Test that we can consume via AMQP 0.9.1 - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), {#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>, props = #'P_basic'{headers = Headers}} @@ -597,7 +597,7 @@ modified_quorum_queue(Config) -> lists:keysearch(<<"x-other">>, 1, Headers)), ?assertEqual({value, {<<"x-delivery-count">>, long, 5}}, lists:keysearch(<<"x-delivery-count">>, 1, Headers)), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = amqp10_client:detach_link(Receiver1), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), @@ -1343,7 +1343,7 @@ amqp_amqpl(QType, Config) -> ok = amqp10_client:detach_link(Sender), flush(detached), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{global = false, prefetch_count = 100}), CTag = <<"my-tag">>, @@ -1426,7 +1426,7 @@ amqp_amqpl(QType, Config) -> after 30000 -> ct:fail({missing_deliver, ?LINE}) end, - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = close_connection_sync(Connection). @@ -1435,7 +1435,7 @@ message_headers_conversion(Config) -> QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), %% declare a quorum queue - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), amqp_channel:call(Ch, #'queue.declare'{ queue = QName, durable = true, @@ -1447,7 +1447,7 @@ message_headers_conversion(Config) -> amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address), amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = delete_queue(Session, QName), ok = close_connection_sync(Connection). @@ -1553,11 +1553,11 @@ multiple_sessions(Config) -> ok = amqp10_client:flow_link_credit(Receiver2, NMsgsPerReceiver, never), flush("receiver attached"), - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), [#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName, exchange = <<"amq.fanout">>}) || QName <- Qs], - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), %% Send on each session. TargetAddr = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), @@ -1613,13 +1613,13 @@ server_closes_link_stream(Config) -> server_closes_link(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, durable = true, arguments = [{<<"x-queue-type">>, longstr, QType}]}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -1694,7 +1694,7 @@ server_closes_link_exchange(Settled, Config) -> XName = atom_to_binary(?FUNCTION_NAME), QName = <<"my queue">>, RoutingKey = <<"my routing key">>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = XName}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName, @@ -1736,7 +1736,7 @@ server_closes_link_exchange(Settled, Config) -> ?assertMatch(#{publishers := 0}, get_global_counters(Config)), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = end_session_sync(Session), ok = close_connection_sync(Connection). @@ -1748,13 +1748,13 @@ link_target_quorum_queue_deleted(Config) -> link_target_queue_deleted(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, durable = true, arguments = [{<<"x-queue-type">>, longstr, QType}]}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -1809,7 +1809,7 @@ target_queues_deleted_accepted(Config) -> Q2 = <<"q2">>, Q3 = <<"q3">>, QNames = [Q1, Q2, Q3], - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), [begin #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName, @@ -1858,7 +1858,7 @@ target_queues_deleted_accepted(Config) -> ?assertEqual(#'queue.delete_ok'{message_count = 2}, amqp_channel:call(Ch, #'queue.delete'{queue = Q1})), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ?assert(rpc(Config, meck, validate, [Mod])), ok = rpc(Config, meck, unload, [Mod]), ok = end_session_sync(Session), @@ -1943,7 +1943,7 @@ sync_get_unsettled_stream(Config) -> sync_get_unsettled(QType, Config) -> SenderSettleMode = unsettled, QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, @@ -2032,7 +2032,7 @@ sync_get_unsettled(QType, Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). sync_get_unsettled_2_classic_queue(Config) -> sync_get_unsettled_2(<<"classic">>, Config). @@ -2047,7 +2047,7 @@ sync_get_unsettled_2_stream(Config) -> sync_get_unsettled_2(QType, Config) -> SenderSettleMode = unsettled, QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, @@ -2122,7 +2122,7 @@ sync_get_unsettled_2(QType, Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). sync_get_settled_classic_queue(Config) -> sync_get_settled(<<"classic">>, Config). @@ -2137,7 +2137,7 @@ sync_get_settled_stream(Config) -> sync_get_settled(QType, Config) -> SenderSettleMode = settled, QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, @@ -2202,7 +2202,7 @@ sync_get_settled(QType, Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). timed_get_classic_queue(Config) -> timed_get(<<"classic">>, Config). @@ -2216,7 +2216,7 @@ timed_get_stream(Config) -> %% Synchronous get with a timeout, figure 2.44. timed_get(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, @@ -2274,7 +2274,7 @@ timed_get(QType, Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). stop_classic_queue(Config) -> stop(<<"classic">>, Config). @@ -2287,7 +2287,7 @@ stop_stream(Config) -> %% Test stopping a link, figure 2.46. stop(QType, Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), QName = atom_to_binary(?FUNCTION_NAME), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ @@ -2353,7 +2353,7 @@ stop(QType, Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). consumer_priority_classic_queue(Config) -> consumer_priority(<<"classic">>, Config). @@ -2831,7 +2831,7 @@ detach_requeues_one_session_quorum_queue(Config) -> detach_requeue_one_session(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, @@ -2909,7 +2909,7 @@ detach_requeue_one_session(QType, Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection), #'queue.delete_ok'{message_count = 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). detach_requeues_drop_head_classic_queue(Config) -> QName1 = <<"q1">>, @@ -3079,7 +3079,7 @@ detach_requeues_two_connections(QType, Config) -> resource_alarm_before_session_begin(Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -3130,11 +3130,11 @@ resource_alarm_before_session_begin(Config) -> ok = end_session_sync(Session1), ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). resource_alarm_after_session_begin(Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), Address = rabbitmq_amqp_address:queue(QName), OpnConf = connection_config(Config), @@ -3197,13 +3197,13 @@ resource_alarm_after_session_begin(Config) -> ok = close_connection_sync(Connection1), ok = close_connection_sync(Connection2), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). %% Test case for %% https://github.com/rabbitmq/rabbitmq-server/issues/12816 resource_alarm_send_many(Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), Address = rabbitmq_amqp_address:queue(QName), OpnConf = connection_config(Config), @@ -3233,7 +3233,7 @@ resource_alarm_send_many(Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). auth_attempt_metrics(Config) -> open_and_close_connection(Config), @@ -3266,7 +3266,7 @@ max_message_size_client_to_server(Config) -> ok = rpc(Config, persistent_term, put, [max_message_size, MaxMessageSize]), QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), Address = rabbitmq_amqp_address:queue(QName), OpnConf = connection_config(Config), @@ -3290,12 +3290,12 @@ max_message_size_client_to_server(Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]). max_message_size_server_to_client(Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), Address = rabbitmq_amqp_address:queue(QName), OpnConf = connection_config(Config), @@ -3344,13 +3344,13 @@ max_message_size_server_to_client(Config) -> ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). last_queue_confirms(Config) -> ClassicQ = <<"my classic queue">>, QuorumQ = <<"my quorum queue">>, Qs = [ClassicQ, QuorumQ], - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{queue = ClassicQ}), #'queue.declare_ok'{} = amqp_channel:call( @@ -3416,13 +3416,13 @@ last_queue_confirms(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = ClassicQ})), ?assertEqual(#'queue.delete_ok'{message_count = 2}, amqp_channel:call(Ch, #'queue.delete'{queue = QuorumQ})), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). target_queue_deleted(Config) -> ClassicQ = <<"my classic queue">>, QuorumQ = <<"my quorum queue">>, Qs = [ClassicQ, QuorumQ], - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{queue = ClassicQ}), #'queue.declare_ok'{} = amqp_channel:call( @@ -3488,11 +3488,12 @@ target_queue_deleted(Config) -> ok = close_connection_sync(Connection), ?assertEqual(#'queue.delete_ok'{message_count = 2}, amqp_channel:call(Ch, #'queue.delete'{queue = QuorumQ})), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). target_classic_queue_down(Config) -> ClassicQueueNode = 2, - Ch = rabbit_ct_client_helpers:open_channel(Config, ClassicQueueNode), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, ClassicQueueNode), QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), #'queue.declare_ok'{} = amqp_channel:call( @@ -3500,7 +3501,7 @@ target_classic_queue_down(Config) -> queue = QName, durable = true, arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]}), - ok = rabbit_ct_client_helpers:close_channels_and_connection(Config, ClassicQueueNode), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -3578,7 +3579,8 @@ async_notify_unsettled_stream(Config) -> %% Test asynchronous notification, figure 2.45. async_notify(SenderSettleMode, QType, Config) -> %% Place queue leader on the old node. - Ch = rabbit_ct_client_helpers:open_channel(Config, 1), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, 1), QName = atom_to_binary(?FUNCTION_NAME), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ @@ -3635,7 +3637,7 @@ async_notify(SenderSettleMode, QType, Config) -> end, #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = end_session_sync(Session), ok = close_connection_sync(Connection). @@ -3643,7 +3645,7 @@ async_notify(SenderSettleMode, QType, Config) -> %% (slow queue) does not impact other link receivers (fast queues) on the **same** session. %% (This is unlike AMQP legacy where a single slow queue will block the entire connection.) link_flow_control(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), CQ = <<"cq">>, QQ = <<"qq">>, #'queue.declare_ok'{} = amqp_channel:call( @@ -3656,6 +3658,7 @@ link_flow_control(Config) -> queue = QQ, durable = true, arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), @@ -3743,7 +3746,8 @@ quorum_queue_on_new_node(Config) -> %% In mixed version tests, run the queue leader with old code %% and queue client with new code, or vice versa. queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, QueueLeaderNode), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, QueueLeaderNode), QName = atom_to_binary(?FUNCTION_NAME), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{queue = QName, @@ -3812,7 +3816,7 @@ queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) ExpectedReadyMsgs = 0, ?assertEqual(#'queue.delete_ok'{message_count = ExpectedReadyMsgs}, amqp_channel:call(Ch, #'queue.delete'{queue = QName})), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = close_connection_sync(Connection). maintenance(Config) -> @@ -3860,11 +3864,14 @@ leader_transfer_stream_credit_batches(Config) -> leader_transfer_credit(QName, QType, Credit, Config) -> %% Create queue with leader on node 1. {_, _, LinkPair1} = Init = init(1, Config), - {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue( - LinkPair1, - QName, - #{arguments => #{<<"x-queue-type">> => {utf8, QType}, - <<"x-queue-leader-locator">> => {utf8, <<"client-local">>}}}), + ?awaitMatch( + {ok, #{type := QType}}, + rabbitmq_amqp_client:declare_queue( + LinkPair1, + QName, + #{arguments => #{<<"x-queue-type">> => {utf8, QType}, + <<"x-queue-leader-locator">> => {utf8, <<"client-local">>}}}), + 60000), ok = close(Init), OpnConf = connection_config(0, Config), @@ -4009,7 +4016,7 @@ global_counters(Config) -> messages_redelivered_total := QQRedelivered0, messages_acknowledged_total := QQAcknowledged0} = get_global_counters(Config, rabbit_quorum_queue), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), CQ = <<"my classic queue">>, QQ = <<"my quorum queue">>, CQAddress = rabbitmq_amqp_address:queue(CQ), @@ -4134,7 +4141,7 @@ global_counters(Config) -> %% m4 was returned ?assertEqual(UnroutableReturned1 + 1, UnroutableReturned2), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = amqp10_client:detach_link(Sender), ok = end_session_sync(Session), ok = close_connection_sync(Connection). @@ -4142,12 +4149,12 @@ global_counters(Config) -> stream_bloom_filter(Config) -> Stream = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(Stream), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), amqp_channel:call(Ch, #'queue.declare'{ queue = Stream, durable = true, arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -4274,7 +4281,7 @@ available_messages_stream(Config) -> available_messages(QType, Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = QName, @@ -4366,7 +4373,7 @@ available_messages(QType, Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). incoming_message_interceptors(Config) -> Key = ?FUNCTION_NAME, @@ -4433,7 +4440,7 @@ trace(Q, QType, Config) -> RoutingKey = <<"my routing key">>, Payload = <<"my payload">>, CorrelationId = <<"my correlation ๐Ÿ‘€"/utf8>>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{ queue = Q, @@ -4512,6 +4519,7 @@ trace(Q, QType, Config) -> timer:sleep(20), ?assertMatch(#'basic.get_empty'{}, amqp_channel:call(Ch, #'basic.get'{queue = TraceQ})), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver), @@ -4556,9 +4564,9 @@ user_id(Config) -> message_ttl(Config) -> QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), @@ -4606,43 +4614,45 @@ plugin(Config) -> idle_time_out_on_server(Config) -> App = rabbit, Par = heartbeat, - {ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]), - %% Configure RabbitMQ to use an idle-time-out of 1 second. - ok = rpc(Config, application, set_env, [App, Par, 1]), - - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - receive {amqp10_event, {connection, Connection, opened}} -> ok - after 30000 -> ct:fail({missing_event, ?LINE}) - end, - - %% Mock the server socket to not have received any bytes. - rabbit_ct_broker_helpers:setup_meck(Config), Mod = rabbit_net, - ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]), - ok = rpc(Config, meck, expect, [Mod, getstat, fun(_Sock, [recv_oct]) -> - {ok, [{recv_oct, 999}]}; - (Sock, Opts) -> - meck:passthrough([Sock, Opts]) - end]), - - %% The server "SHOULD try to gracefully close the connection using a close - %% frame with an error explaining why" [2.4.5]. - %% Since we chose a heartbeat value of 1 second, the server should easily - %% close the connection within 5 seconds. - receive - {amqp10_event, - {connection, Connection, - {closed, - {resource_limit_exceeded, - <<"no frame received from client within idle timeout threshold">>}}}} -> ok - after 30000 -> - ct:fail({missing_event, ?LINE}) - end, - - ?assert(rpc(Config, meck, validate, [Mod])), - ok = rpc(Config, meck, unload, [Mod]), - ok = rpc(Config, application, set_env, [App, Par, DefaultVal]). + {ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]), + try + %% Configure RabbitMQ to use an idle-time-out of 1 second. + ok = rpc(Config, application, set_env, [App, Par, 1]), + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + receive {amqp10_event, {connection, Connection, opened}} -> ok + after 30000 -> ct:fail({missing_event, ?LINE}) + end, + + %% Mock the server socket to not have received any bytes. + rabbit_ct_broker_helpers:setup_meck(Config), + ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]), + ok = rpc(Config, meck, expect, [Mod, getstat, fun(_Sock, [recv_oct]) -> + {ok, [{recv_oct, 999}]}; + (Sock, Opts) -> + meck:passthrough([Sock, Opts]) + end]), + + %% The server "SHOULD try to gracefully close the connection using a close + %% frame with an error explaining why" [2.4.5]. + %% Since we chose a heartbeat value of 1 second, the server should easily + %% close the connection within 5 seconds. + receive + {amqp10_event, + {connection, Connection, + {closed, + {resource_limit_exceeded, + <<"no frame received from client within idle timeout threshold">>}}}} -> ok + after 30000 -> + ct:fail({missing_event, ?LINE}) + end + after + ?assert(rpc(Config, meck, validate, [Mod])), + ok = rpc(Config, meck, unload, [Mod]), + ok = rpc(Config, application, set_env, [App, Par, DefaultVal]) + end. %% Test that the idle timeout threshold is exceeded on the client %% when no frames are sent from server to client. @@ -4741,7 +4751,7 @@ credential_expires(Config) -> %% Attaching to an exclusive source queue should fail. attach_to_exclusive_queue(Config) -> QName = <<"my queue">>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call( Ch, #'queue.declare'{queue = QName, durable = true, @@ -4764,7 +4774,7 @@ attach_to_exclusive_queue(Config) -> ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch). + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). dynamic_target_short_link_name(Config) -> OpnConf0 = connection_config(Config), @@ -5436,12 +5446,15 @@ dead_letter_into_stream(Config) -> <<"x-dead-letter-exchange">> => {utf8, <<>>}, <<"x-dead-letter-routing-key">> => {utf8, QName1} }}), - {ok, #{type := <<"stream">>}} = rabbitmq_amqp_client:declare_queue( - LinkPair1, - QName1, - #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}, - <<"x-initial-cluster-size">> => {ulong, 1} - }}), + ?awaitMatch( + {ok, #{type := <<"stream">>}}, + rabbitmq_amqp_client:declare_queue( + LinkPair1, + QName1, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}, + <<"x-initial-cluster-size">> => {ulong, 1} + }}), + 60000), {ok, Receiver} = amqp10_client:attach_receiver_link( Session1, <<"receiver">>, <<"/amq/queue/", QName1/binary>>, settled, configuration, @@ -5874,9 +5887,9 @@ receive_many_auto_flow(QType, Config) -> %% incoming-window being closed. incoming_window_closed_transfer_flow_order(Config) -> QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), Address = rabbitmq_amqp_address:queue(QName), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf),