Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test-make-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
- parallel-ct-set-2
- parallel-ct-set-3
- parallel-ct-set-4
- ct-amqp_client
- ct-clustering_management
- eunit ct-dead_lettering
- ct-feature_flags
Expand Down
34 changes: 34 additions & 0 deletions deps/amqp10_client/src/amqp10_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,25 @@ open_sent({call, From}, begin_session,
#state{pending_session_reqs = PendingSessionReqs} = State) ->
State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
{keep_state, State1};
<<<<<<< HEAD
open_sent(info, {'DOWN', MRef, _, _, _},
=======
open_sent(_EvtType, {close, Reason}, State) ->
case send_close(State, Reason) of
ok ->
%% "After writing this frame the peer SHOULD continue to read from the connection
%% until it receives the partner's close frame (in order to guard against
%% erroneously or maliciously implemented partners, a peer SHOULD implement a
%% timeout to give its partner a reasonable time to receive and process the close
%% before giving up and simply closing the underlying transport mechanism)." [§2.4.3]
{next_state, close_sent, State, {state_timeout, ?TIMEOUT, received_no_close_frame}};
{error, closed} ->
{stop, normal, State};
Error ->
{stop, Error, State}
end;
open_sent(info, {'DOWN', MRef, process, _, _},
>>>>>>> 77e363627 (amqp10_client: Handle `close` message in the `open_sent` state)
#state{reader_m_ref = MRef}) ->
{stop, {shutdown, reader_down}}.

Expand Down Expand Up @@ -324,6 +342,7 @@ close_sent(_EvtType, heartbeat, State) ->
{next_state, close_sent, State};
close_sent(_EvtType, {'EXIT', _Pid, shutdown}, State) ->
%% monitored processes may exit during closure
<<<<<<< HEAD
{next_state, close_sent, State};
close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _},
#state{reader = ReaderPid} = State) ->
Expand All @@ -333,6 +352,21 @@ close_sent(_EvtType, #'v1_0.close'{}, State) ->
%% TODO: we should probably set up a timer before this to ensure
%% we close down event if no reply is received
{stop, normal, State}.
=======
keep_state_and_data;
close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _Reason},
#state{reader = ReaderPid}) ->
%% if the reader exits we probably won't receive a close frame
{stop, normal};
close_sent(_EvtType, #'v1_0.close'{} = Close, #state{config = Config}) ->
ok = notify_closed(Config, Close),
{stop, normal};
close_sent(state_timeout, received_no_close_frame, _Data) ->
{stop, normal};
close_sent(_EvtType, #'v1_0.open'{}, _Data) ->
%% Transition from CLOSE_PIPE to CLOSE_SENT in figure 2.23.
keep_state_and_data.
>>>>>>> 65576863f (amqp10_client: Fix crash in close_sent)

set_other_procs0(OtherProcs, State) ->
#{sessions_sup := SessionsSup,
Expand Down
10 changes: 8 additions & 2 deletions deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ bats: $(BATS)

tests:: bats

SLOW_CT_SUITES := backing_queue \
SLOW_CT_SUITES := amqp_client \
backing_queue \
channel_interceptor \
cluster \
cluster_rename \
Expand Down Expand Up @@ -257,8 +258,13 @@ define ct_master.erl
halt(0)
endef

<<<<<<< HEAD
PARALLEL_CT_SET_1_A = amqp_client unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
=======
PARALLEL_CT_SET_1_A = unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
>>>>>>> 2c6619104 (amqp_client_SUITE: Use a dedicated CI job for this testsuite)
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit

Expand All @@ -282,7 +288,7 @@ PARALLEL_CT_SET_2 = $(sort $(PARALLEL_CT_SET_2_A) $(PARALLEL_CT_SET_2_B) $(PARAL
PARALLEL_CT_SET_3 = $(sort $(PARALLEL_CT_SET_3_A) $(PARALLEL_CT_SET_3_B) $(PARALLEL_CT_SET_3_C) $(PARALLEL_CT_SET_3_D))
PARALLEL_CT_SET_4 = $(sort $(PARALLEL_CT_SET_4_A) $(PARALLEL_CT_SET_4_B) $(PARALLEL_CT_SET_4_C) $(PARALLEL_CT_SET_4_D))

SEQUENTIAL_CT_SUITES = clustering_management dead_lettering feature_flags metadata_store_clustering quorum_queue rabbit_stream_queue
SEQUENTIAL_CT_SUITES = amqp_client clustering_management dead_lettering feature_flags metadata_store_clustering quorum_queue rabbit_stream_queue
PARALLEL_CT_SUITES = $(PARALLEL_CT_SET_1) $(PARALLEL_CT_SET_2) $(PARALLEL_CT_SET_3) $(PARALLEL_CT_SET_4)

ifeq ($(filter-out $(SEQUENTIAL_CT_SUITES) $(PARALLEL_CT_SUITES),$(CT_SUITES)),)
Expand Down
Loading
Loading