From d70e529d9ab104e805ceb7829cad74c3c76329c7 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Sat, 18 May 2024 11:11:12 +0200 Subject: [PATCH 1/2] Introduce outbound RabbitMQ internal AMQP flow control MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What? Introduce RabbitMQ internal flow control for messages sent to AMQP clients. Prior this PR, when an AMQP client granted a large amount of link credit (e.g. 100k) to the sending queue, the sending queue sent that amount of messages to the session process no matter what. This becomes problematic for memory usage when the session process cannot send out messages fast enough to the AMQP client, especially if 1. The writer proc cannot send fast enough. This can happen when the AMQP client does not receive fast enough and causes TCP back-pressure to the server. Or 2. The server session proc is limited by remote-incoming-window. Both scenarios are now added as test cases. Tests * tcp_back_pressure_rabbitmq_internal_flow_quorum_queue * tcp_back_pressure_rabbitmq_internal_flow_classic_queue cover scenario 1. Tests * incoming_window_closed_rabbitmq_internal_flow_quorum_queue * incoming_window_closed_rabbitmq_internal_flow_classic_queue cover scenario 2. This PR sends messages from queues to AMQP clients in a more controlled manner. To illustrate: ``` make run-broker PLUGINS="rabbitmq_management" RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 4" observer_cli:start() mq ``` where `mq` sorts by message queue length. Create a stream: ``` deps/rabbitmq_management/bin/rabbitmqadmin declare queue name=s1 queue_type=stream durable=true ``` Next, send and receive from the Stream via AMQP. Grant a large number of link credit to the sending stream: ``` docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest bash-5.1# quiver --version quiver 0.4.0-SNAPSHOT bash-5.1# quiver //host.docker.internal//queue/s1 --durable -d 30s --credit 100000 ``` **Before** to this PR: ``` RESULTS Count ............................................... 100,696 messages Duration ............................................... 30.0 seconds Sender rate ......................................... 120,422 messages/s Receiver rate ......................................... 3,363 messages/s End-to-end rate ....................................... 3,359 messages/s ``` We observe that all 100k link credit worth of messages are buffered in the writer proc's mailbox: ``` |No | Pid | MsgQueue |Name or Initial Call | Memory | Reductions |Current Function | |1 |<0.845.0> |100001 |rabbit_amqp_writer:init/1 | 126.0734 MB| 466633491 |prim_inet:send/5 | ``` **After** to this PR: ``` RESULTS Count ............................................. 2,973,440 messages Duration ............................................... 30.0 seconds Sender rate ......................................... 123,322 messages/s Receiver rate ........................................ 99,250 messages/s End-to-end rate ...................................... 99,148 messages/s ``` We observe that the message queue lengths of both writer and session procs are low. ## How? Our goal is to have queues send out messages in a controlled manner without overloading RabbitMQ itself. We want RabbitMQ internal flow control between: ``` AMQP writer proc <--- session proc <--- queue proc ``` A similar concept exists for classic queues sending via AMQP 0.9.1. We want an approach that applies to AMQP and works generic for all queue types. For the interaction between AMQP writer proc and session proc we use a simple credit based approach reusing module `credit_flow`. For the interaction between session proc and queue proc, the following options exist: ### Option 1 The session process provides expliclity feedback to the queue after it has sent N messages. This approach is implemented in https://github.com/ansd/rabbitmq-server/tree/amqp-flow-control-poc-1 and works well. A new `rabbit_queue_type:sent/4` API was added which lets the queue proc know that it can send further messages to the session proc. Pros: * Will work equally well for AMQP 0.9.1, e.g. when quorum queues send messages in auto ack mode to AMQP 0.9.1 clients. * Simple for the session proc Cons: * Sligthly added complexity in every queue type implementation * Multiple Ra commands (settle, credit, sent) to decide when a quorum queue sends more messages. ### Option 2 A dual link approach where two AMQP links exists between ``` AMQP client <---link--> session proc <---link---> queue proc ``` When the client grants a large amount of credits, the session proc will top up credits to the queue proc periodically in smaller batches. Pros: * No queue type modifications required. * Re-uses AMQP link flow control Cons: * Significant added complexity in the session proc. A client can dynamically decrease or increase credits and dynamically change the drain mode while the session tops up credit to the queue. ### Option 3 Credit is a 32 bit unsigned integer. The spec mandates that the receiver independently chooses a credit. Nothing in the spec prevents the receiver to choose a credit of 1 billion. However the credit value is merely a **maximum**: > The link-credit variable defines the current maximum legal amount that the delivery-count can be increased by. Therefore, the server is not required to send all available messages to this receiver. For delivery-count: > Only the sender MAY independently modify this field. "independently" could be interpreted as the sender could add to the delivery-count irrespective of what the client chose for drain and link-credit. Option 3: The queue proc could at credit time already consume credit and advance the delivery-count if credit is too large before checking out any messages. For example if credit is 100k, but the queue only wants to send 1k, the queue could consume 99k of credits and advance the delivery-count, and subsequently send maximum 1k messages. If the queue advanced the delivery-count, RabbitMQ must send a FLOW to the receiver, otherwise the receiver wouldn’t know that it ran out of link-credit. Pros: * Very simple Cons: * Possibly unexpected behaviour for receiving AMQP clients * Possibly poor end-to-end throughput in auto-ack mode because the queue would send a batch of messages followed by a FLOW containing the advanced delivery-count. Only therafter the client will learn that it ran out of credits and top-up again. This feels like synchronously pulling a batch of messages. In contrast, option 2 sends out more messages as soon as the previous messages left RabbitMQ without requiring again a credit top up from the receiver. * drain mode with large credits requires the queue to send all available messages and only thereafter advance the delivery-count. Therefore, drain mode breaks option 3 somewhat. ### Option 4 Session proc drops message payload when its outgoing-pending queue gets too large and re-reads payloads from the queue once the message can be sent (see `get_checked_out` Ra command for quorum queues). Cons: * Would need to be implemented for every queue type, especially classic queues * Doesn't limit the amount of message metadata in the session proc's outgoing-pending queue ### Decision: Option 2 This commit implements option 2 to avoid any queue type modification. At most one credit request is in-flight between session process and queue process for a given queue consumer. If the AMQP client sends another FLOW in between, the session proc stashes the FLOW until it processes the previous credit reply. A delivery is only sent from the outgoing-pending queue if the session proc is not blocked by 1. writer proc, or 2. remote-incoming-window The credit reply is placed into the outgoing-pending queue. This ensures that the session proc will only top up the next batch of credits if sufficient messages were sent out to the writer proc. A future commit could additionally have each queue limit the number of unacked messages for a given AMQP consumer, or alternatively make use of session outgoing-window. --- .../src/amqp10_client_frame_reader.erl | 12 +- .../src/amqp10_client_session.erl | 8 +- deps/rabbit/src/rabbit_amqp_management.erl | 21 +- deps/rabbit/src/rabbit_amqp_session.erl | 678 +++++++++++++++--- deps/rabbit/src/rabbit_amqp_writer.erl | 59 +- deps/rabbit/src/rabbit_amqqueue_process.erl | 7 +- deps/rabbit/src/rabbit_classic_queue.erl | 6 +- deps/rabbit/src/rabbit_fifo.hrl | 2 +- deps/rabbit/src/rabbit_fifo_client.erl | 41 +- deps/rabbit/src/rabbit_queue_type.erl | 19 +- deps/rabbit/src/rabbit_quorum_queue.erl | 6 +- deps/rabbit/src/rabbit_stream_queue.erl | 32 +- deps/rabbit/test/amqp_client_SUITE.erl | 673 ++++++++++++++--- deps/rabbit/test/rabbit_fifo_int_SUITE.erl | 19 +- deps/rabbit_common/src/credit_flow.erl | 2 +- deps/rabbit_common/src/rabbit_misc.erl | 13 +- .../src/rabbit_mqtt_qos0_queue.erl | 6 +- .../test/shovel_test_utils.erl | 4 +- 18 files changed, 1271 insertions(+), 337 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_frame_reader.erl b/deps/amqp10_client/src/amqp10_client_frame_reader.erl index 96ded5751794..05d8823999b1 100644 --- a/deps/amqp10_client/src/amqp10_client_frame_reader.erl +++ b/deps/amqp10_client/src/amqp10_client_frame_reader.erl @@ -286,14 +286,18 @@ handle_input(StateName, Data, State) -> defer_heartbeat_timer(State = #state{heartbeat_timer_ref = TRef, connection_config = #{idle_time_out := T}}) - when is_number(T) andalso T > 0 -> + when is_integer(T) andalso T > 0 -> _ = case TRef of - undefined -> ok; - _ -> _ = erlang:cancel_timer(TRef) + undefined -> + ok; + _ -> + erlang:cancel_timer(TRef, [{async, true}, + {info, false}]) end, NewTRef = erlang:send_after(T * 2, self(), heartbeat), State#state{heartbeat_timer_ref = NewTRef}; -defer_heartbeat_timer(State) -> State. +defer_heartbeat_timer(State) -> + State. route_frame(Channel, FrameType, {Performative, Payload} = Frame, State0) -> {DestinationPid, State} = find_destination(Channel, FrameType, Performative, diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index d7617e43fec3..b66308a826b2 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -902,9 +902,10 @@ handle_link_flow(#'v1_0.flow'{delivery_count = MaybeTheirDC, handle_link_flow(#'v1_0.flow'{delivery_count = TheirDC, link_credit = {uint, TheirCredit}, available = Available, - drain = Drain}, + drain = Drain0}, Link0 = #link{role = receiver}) -> - Link = case Drain andalso TheirCredit =< 0 of + Drain = default(Drain0, false), + Link = case Drain andalso TheirCredit =:= 0 of true -> notify_credit_exhausted(Link0), Link0#link{delivery_count = unpack(TheirDC), @@ -1212,6 +1213,9 @@ boolean_to_role(?AMQP_ROLE_SENDER) -> boolean_to_role(?AMQP_ROLE_RECEIVER) -> receiver. +default(undefined, Default) -> Default; +default(Thing, _Default) -> Thing. + format_status(Status = #{data := Data0}) -> #state{channel = Channel, remote_channel = RemoteChannel, diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index 29049e4c58aa..f15712b43224 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -8,6 +8,9 @@ -import(rabbit_amqp_session, [check_resource_access/4, check_read_permitted_on_topic/4]). +-import(rabbit_misc, + [queue_resource/2, + exchange_resource/2]). -type permission_caches() :: {rabbit_amqp_session:permission_cache(), rabbit_amqp_session:topic_permission_cache()}. @@ -77,7 +80,7 @@ handle_http_req(<<"GET">>, _ConnPid, PermCaches) -> QNameBin = uri_string:unquote(QNameBinQuoted), - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), case rabbit_amqqueue:with( QName, fun(Q) -> @@ -118,7 +121,7 @@ handle_http_req(HttpMethod = <<"PUT">>, _ -> ok end, ok = prohibit_cr_lf(QNameBin), - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), ok = prohibit_reserved_amq(QName), PermCache1 = check_resource_access(QName, configure, User, PermCache0), rabbit_core_metrics:queue_declared(QName), @@ -192,7 +195,7 @@ handle_http_req(<<"PUT">>, catch exit:#amqp_error{explanation = Explanation} -> throw(<<"400">>, Explanation, []) end, - XName = rabbit_misc:r(Vhost, exchange, XNameBin), + XName = exchange_resource(Vhost, XNameBin), ok = prohibit_default_exchange(XName), PermCache = check_resource_access(XName, configure, User, PermCache0), X = case rabbit_exchange:lookup(XName) of @@ -223,7 +226,7 @@ handle_http_req(<<"DELETE">>, ConnPid, {PermCache0, TopicPermCache}) -> QNameBin = uri_string:unquote(QNameBinQuoted), - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), PermCache = check_resource_access(QName, read, User, PermCache0), try rabbit_amqqueue:with_exclusive_access_or_die( QName, ConnPid, @@ -251,7 +254,7 @@ handle_http_req(<<"DELETE">>, ConnPid, {PermCache0, TopicPermCache}) -> QNameBin = uri_string:unquote(QNameBinQuoted), - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), ok = prohibit_cr_lf(QNameBin), PermCache = check_resource_access(QName, configure, User, PermCache0), try rabbit_amqqueue:delete_with(QName, ConnPid, false, false, Username, true) of @@ -271,7 +274,7 @@ handle_http_req(<<"DELETE">>, _ConnPid, {PermCache0, TopicPermCache}) -> XNameBin = uri_string:unquote(XNameBinQuoted), - XName = rabbit_misc:r(Vhost, exchange, XNameBin), + XName = exchange_resource(Vhost, XNameBin), ok = prohibit_cr_lf(XNameBin), ok = prohibit_default_exchange(XName), ok = prohibit_reserved_amq(XName), @@ -296,7 +299,7 @@ handle_http_req(<<"POST">>, #{destination_exchange := Bin} -> {exchange, Bin} end, - SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin), + SrcXName = exchange_resource(Vhost, SrcXNameBin), DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin), PermCaches = binding_checks(SrcXName, DstName, BindingKey, User, PermCaches0), Binding = #binding{source = SrcXName, @@ -319,7 +322,7 @@ handle_http_req(<<"DELETE">>, DstNameBin, BindingKey, ArgsHash} = decode_binding_path_segment(BindingSegment), - SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin), + SrcXName = exchange_resource(Vhost, SrcXNameBin), DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin), PermCaches = binding_checks(SrcXName, DstName, BindingKey, User, PermCaches0), Bindings = rabbit_binding:list_for_source_and_destination(SrcXName, DstName), @@ -351,7 +354,7 @@ handle_http_req(<<"GET">>, "missing 'dste' or 'dstq' in query: ~tp", QueryMap) end, - SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin), + SrcXName = exchange_resource(Vhost, SrcXNameBin), DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin), Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstName), Bindings = [B || B = #binding{key = K} <- Bindings0, K =:= Key], diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 4287e6cb2e88..a61dcb64e970 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -53,12 +53,19 @@ %% 400 for classic queues %% If link target is a queue (rather than an exchange), we could use one of these depending %% on target queue type. For the time being just use a static value that's something in between. -%% An even better approach in future would be to dynamically grow (or shrink) the link credit -%% we grant depending on how fast target queue(s) actually confirm messages. +%% In future, we could dynamically grow (or shrink) the link credit we grant depending on how fast +%% target queue(s) actually confirm messages: see paper "Credit-Based Flow Control for ATM Networks" +%% from 1995, section 4.2 "Static vs. adaptive credit control" for pros and cons. -define(LINK_CREDIT_RCV, 128). -define(MANAGEMENT_LINK_CREDIT_RCV, 8). -define(MANAGEMENT_NODE_ADDRESS, <<"/management">>). -define(DEFAULT_EXCHANGE_NAME, <<>>). +%% This is the maximum credit we grant to a sending queue. +%% Only when we sent sufficient messages to the writer proc, we will again grant credits +%% to the sending queue. We have this limit in place to ensure that our session proc won't be flooded +%% with messages by the sending queue, especially if we are throttled sending messages to the client +%% either by the writer proc or by remote-incoming window (i.e. session flow control). +-define(LINK_CREDIT_RCV_FROM_QUEUE_MAX, 256). -export([start_link/8, process_frame/2, @@ -81,6 +88,9 @@ [add/2, diff/2, compare/2]). +-import(rabbit_misc, + [queue_resource/2, + exchange_resource/2]). -type permission_cache() :: [{rabbit_types:r(exchange | queue), rabbit_types:permission_atom()}]. @@ -113,7 +123,7 @@ -record(management_link, { name :: binary(), delivery_count :: sequence_no(), - credit :: non_neg_integer(), + credit :: rabbit_queue_type:credit(), max_message_size :: unlimited | pos_integer() }). @@ -129,7 +139,7 @@ %% queue_name_bin is only set if the link target address refers to a queue. queue_name_bin :: undefined | rabbit_misc:resource_name(), delivery_count :: sequence_no(), - credit :: non_neg_integer(), + credit :: rabbit_queue_type:credit(), %% TRANSFER delivery IDs published to queues but not yet confirmed by queues incoming_unconfirmed_map = #{} :: #{delivery_number() => {#{rabbit_amqqueue:name() := ok}, @@ -138,6 +148,32 @@ multi_transfer_msg :: undefined | #multi_transfer_msg{} }). +%% A credit request from the client (receiver) as sent in the FLOW frame. +-record(credit_req, { + delivery_count :: sequence_no(), + credit :: rabbit_queue_type:credit(), + drain :: boolean(), + echo :: boolean() + }). + +%% Link flow control state for link between client (receiver) and us (sender). +-record(client_flow_ctl, { + delivery_count :: sequence_no(), + credit :: rabbit_queue_type:credit(), + echo :: boolean() + }). + +%% Link flow control state for link between us (receiver) and queue (sender). +-record(queue_flow_ctl, { + delivery_count :: sequence_no(), + %% We cap the actual credit we grant to the sending queue. + credit :: 0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX, + %% Credit as desired by the receiving client. If larger than + %% LINK_CREDIT_RCV_FROM_QUEUE_MAX, we will top up in batches to the sending queue. + desired_credit :: rabbit_queue_type:credit(), + drain :: boolean() + }). + -record(outgoing_link, { %% Although the source address of a link might be an exchange name and binding key %% or a topic filter, an outgoing link will always consume from a queue. @@ -145,10 +181,23 @@ queue_type :: rabbit_queue_type:queue_type(), send_settled :: boolean(), max_message_size :: unlimited | pos_integer(), + + %% When feature flag credit_api_v2 becomes required, + %% the following 2 fields should be deleted. + credit_api_version :: 1 | 2, %% When credit API v1 is used, our session process holds the delivery-count - %% When credit API v2 is used, the queue type implementation holds the delivery-count - %% When feature flag credit_api_v2 becomes required, this field should be deleted. - delivery_count :: {credit_api_v1, sequence_no()} | credit_api_v2 + delivery_count :: sequence_no() | credit_api_v2, + %% We use a dual link approach for messages we send to the client. + %% We hold link flow control state for the link to the receiving + %% client and for the link to the sending queue. + client_flow_ctl :: #client_flow_ctl{} | credit_api_v1, + queue_flow_ctl :: #queue_flow_ctl{} | credit_api_v1, + %% True if we sent a credit request to the sending queue + %% but haven't processed the corresponding credit reply yet. + credit_req_in_flight :: boolean() | credit_api_v1, + %% While credit_req_in_flight is true, we stash the + %% latest credit request from the receiving client. + stashed_credit_req :: none | #credit_req{} | credit_api_v1 }). -record(outgoing_unsettled, { @@ -248,6 +297,7 @@ %% and advanced delivery count. Otherwise, we would violate the AMQP protocol spec. outgoing_pending = queue:new() :: queue:queue(#pending_delivery{} | #pending_management_delivery{} | + rabbit_queue_type:credit_reply_action() | #'v1_0.flow'{}), %% The link or session endpoint assigns each message a unique delivery-id @@ -376,6 +426,10 @@ handle_call(Msg, _From, State) -> handle_info(timeout, State) -> noreply(State); +handle_info({bump_credit, Msg}, State) -> + %% We are receiving credit from the writer proc. + credit_flow:handle_bump_msg(Msg), + noreply(State); handle_info({{'DOWN', QName}, _MRef, process, QPid, Reason}, #state{queue_states = QStates0, stashed_eol = Eol} = State0) -> @@ -947,16 +1001,16 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, user = User = #user{username = Username}, reader_pid = ReaderPid}}) -> ok = validate_attach(Attach), - {SndSettled, - EffectiveSndSettleMode} = case SndSettleMode of - ?V_1_0_SENDER_SETTLE_MODE_SETTLED -> - {true, SndSettleMode}; - _ -> - %% In the future, we might want to support sender settle - %% mode mixed where we would expect a settlement from the - %% client only for durable messages. - {false, ?V_1_0_SENDER_SETTLE_MODE_UNSETTLED} - end, + {SndSettled, EffectiveSndSettleMode} = + case SndSettleMode of + ?V_1_0_SENDER_SETTLE_MODE_SETTLED -> + {true, SndSettleMode}; + _ -> + %% In the future, we might want to support sender settle + %% mode mixed where we would expect a settlement from the + %% client only for durable messages. + {false, ?V_1_0_SENDER_SETTLE_MODE_UNSETTLED} + end, case ensure_source(Source, Vhost, User, PermCache0, TopicPermCache0) of {error, Reason} -> protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Attach rejected: ~tp", [Reason]); @@ -984,14 +1038,32 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, %% all consumers will use credit API v2. %% Streams always use credit API v2 since the stream client (rabbit_stream_queue) holds the link %% flow control state. Hence, credit API mixed version isn't an issue for streams. - {Mode, - DeliveryCount} = case rabbit_feature_flags:is_enabled(credit_api_v2) orelse - QType =:= rabbit_stream_queue of - true -> - {{credited, ?INITIAL_DELIVERY_COUNT}, credit_api_v2}; - false -> - {{credited, credit_api_v1}, {credit_api_v1, ?INITIAL_DELIVERY_COUNT}} - end, + {CreditApiVsn, Mode, DeliveryCount, ClientFlowCtl, + QueueFlowCtl, CreditReqInFlight, StashedCreditReq} = + case rabbit_feature_flags:is_enabled(credit_api_v2) orelse + QType =:= rabbit_stream_queue of + true -> + {2, + {credited, ?INITIAL_DELIVERY_COUNT}, + credit_api_v2, + #client_flow_ctl{delivery_count = ?INITIAL_DELIVERY_COUNT, + credit = 0, + echo = false}, + #queue_flow_ctl{delivery_count = ?INITIAL_DELIVERY_COUNT, + credit = 0, + desired_credit = 0, + drain = false}, + false, + none}; + false -> + {1, + {credited, credit_api_v1}, + ?INITIAL_DELIVERY_COUNT, + credit_api_v1, + credit_api_v1, + credit_api_v1, + credit_api_v1} + end, Spec = #{no_ack => SndSettled, channel_pid => self(), limiter_pid => none, @@ -1020,11 +1092,17 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, %% Echo back that we will respect the client's requested max-message-size. max_message_size = MaybeMaxMessageSize}, MaxMessageSize = max_message_size(MaybeMaxMessageSize), - Link = #outgoing_link{queue_name_bin = QNameBin, - queue_type = QType, - send_settled = SndSettled, - max_message_size = MaxMessageSize, - delivery_count = DeliveryCount}, + Link = #outgoing_link{ + queue_name_bin = QNameBin, + queue_type = QType, + send_settled = SndSettled, + max_message_size = MaxMessageSize, + credit_api_version = CreditApiVsn, + delivery_count = DeliveryCount, + client_flow_ctl = ClientFlowCtl, + queue_flow_ctl = QueueFlowCtl, + credit_req_in_flight = CreditReqInFlight, + stashed_credit_req = StashedCreditReq}, OutgoingLinks = OutgoingLinks0#{HandleInt => Link}, State1 = State0#state{queue_states = QStates, outgoing_links = OutgoingLinks, @@ -1138,10 +1216,10 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, {QStates, Unsettled, OutgoingLinks} = case maps:take(HandleInt, OutgoingLinks0) of {#outgoing_link{queue_name_bin = QNameBin}, OutgoingLinks1} -> - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), case rabbit_amqqueue:lookup(QName) of {ok, Q} -> - %%TODO Consider adding a new rabbit_queue_type:remove_consumer API that - from the point of view of + %%TODO Add a new rabbit_queue_type:remove_consumer API that - from the point of view of %% the queue process - behaves as if our session process terminated: All messages checked out %% to this consumer should be re-queued automatically instead of us requeueing them here after cancelling %% consumption. @@ -1154,6 +1232,8 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, %% first detaching and then re-attaching to the same session with the same link handle (the handle %% becomes available for re-use once a link is closed): This will result in the same consumer tag, %% and we ideally disallow "updating" an AMQP consumer. + %% If such an API is not added, we also must return messages in the outgoing_pending queue + %% which haven't made it to the outgoing_unsettled map yet. case rabbit_queue_type:cancel(Q, Ctag, undefined, Username, QStates0) of {ok, QStates1} -> {Unsettled1, MsgIds} = remove_link_from_outgoing_unsettled_map(Ctag, Unsettled0), @@ -1286,84 +1366,406 @@ handle_control(Frame, _State) -> "Unexpected frame ~tp", [amqp10_framing:pprint(Frame)]). -send_pending(#state{remote_incoming_window = Space, - outgoing_pending = Buf0, - cfg = #cfg{writer_pid = WriterPid, - channel_num = Ch}} = State0) -> +send_pending(#state{remote_incoming_window = RemoteIncomingWindow, + outgoing_pending = Buf0 + } = State) -> case queue:out(Buf0) of {empty, _} -> - State0; + State; + {{value, CreditReply}, Buf} + when element(1, CreditReply) =:= credit_reply -> + State1 = State#state{outgoing_pending = Buf}, + State2 = handle_credit_reply(CreditReply, State1), + send_pending(State2); {{value, #'v1_0.flow'{} = Flow0}, Buf} -> - Flow = session_flow_fields(Flow0, State0), + #cfg{writer_pid = WriterPid, + channel_num = Ch} = State#state.cfg, + State1 = State#state{outgoing_pending = Buf}, + Flow = session_flow_fields(Flow0, State1), rabbit_amqp_writer:send_command(WriterPid, Ch, Flow), - send_pending(State0#state{outgoing_pending = Buf}); - {{value, #pending_delivery{ - frames = Frames, - queue_pid = QPid, - outgoing_unsettled = #outgoing_unsettled{queue_name = QName} - } = Pending}, Buf1} - when Space > 0 -> + send_pending(State1); + {{value, Delivery}, Buf} -> + case RemoteIncomingWindow =:= 0 orelse + credit_flow:blocked() of + true -> + State; + false -> + {NewRemoteIncomingWindow, State1} = + send_pending_delivery(Delivery, Buf, State), + NumTransfersSent = RemoteIncomingWindow - NewRemoteIncomingWindow, + State2 = session_flow_control_sent_transfers(NumTransfersSent, State1), + %% Recurse to possibly send FLOW frames. + send_pending(State2) + end + end. + +handle_credit_reply(Action = {credit_reply, Ctag, _DeliveryCount, _Credit, _Available, Drain}, + State = #state{outgoing_links = OutgoingLinks}) -> + Handle = ctag_to_handle(Ctag), + case OutgoingLinks of + #{Handle := Link = #outgoing_link{queue_flow_ctl = QFC, + credit_req_in_flight = CreditReqInFlight}} -> + %% Assert that we expect a credit reply for this consumer. + true = CreditReqInFlight, + %% Assert that "The sender's value is always the last known value indicated by the receiver." + Drain = QFC#queue_flow_ctl.drain, + handle_credit_reply0(Action, Handle, Link, State); + _ -> + %% Ignore credit reply for a detached link. + State + end. + +handle_credit_reply0( + {credit_reply, Ctag, DeliveryCount, Credit, Available, _Drain = false}, + Handle, + #outgoing_link{ + client_flow_ctl = #client_flow_ctl{ + delivery_count = CDeliveryCount, + credit = CCredit, + echo = CEcho + }, + queue_flow_ctl = #queue_flow_ctl{ + delivery_count = QDeliveryCount, + credit = QCredit, + desired_credit = DesiredCredit + } = QFC, + stashed_credit_req = StashedCreditReq + } = Link0, + #state{outgoing_links = OutgoingLinks, + queue_states = QStates0 + } = S0) -> + + %% Assert that flow control state between us and the queue is in sync. + QCredit = Credit, + QDeliveryCount = DeliveryCount, + + case StashedCreditReq of + #credit_req{} -> + %% We prioritise the stashed client request over finishing the current + %% top-up rounds because the latest link state from the client applies. + S = pop_credit_req(Handle, Ctag, Link0, S0), + echo(CEcho, Handle, CDeliveryCount, CCredit, Available, S), + S; + none when QCredit =:= 0 andalso + DesiredCredit > 0 -> + %% Provide queue next batch of credits. + CappedCredit = cap_credit(DesiredCredit), + QName = queue_resource(S0#state.cfg#cfg.vhost, + Link0#outgoing_link.queue_name_bin), + {ok, QStates, Actions} = + rabbit_queue_type:credit( + QName, Ctag, DeliveryCount, CappedCredit, false, QStates0), + Link = Link0#outgoing_link{ + queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit} + }, + S = S0#state{queue_states = QStates, + outgoing_links = OutgoingLinks#{Handle := Link}}, + handle_queue_actions(Actions, S); + none -> + Link = Link0#outgoing_link{credit_req_in_flight = false}, + S = S0#state{outgoing_links = OutgoingLinks#{Handle := Link}}, + echo(CEcho, Handle, CDeliveryCount, DesiredCredit, Available, S), + S + end; +handle_credit_reply0( + {credit_reply, Ctag, DeliveryCount, Credit, Available, _Drain = true}, + Handle, + Link0 = #outgoing_link{ + queue_name_bin = QNameBin, + client_flow_ctl = #client_flow_ctl{ + delivery_count = CDeliveryCount0 } = CFC, + queue_flow_ctl = #queue_flow_ctl{ + delivery_count = QDeliveryCount0, + desired_credit = DesiredCredit + } = QFC, + stashed_credit_req = StashedCreditReq}, + S0 = #state{cfg = #cfg{writer_pid = Writer, + vhost = Vhost, + channel_num = ChanNum}, + outgoing_links = OutgoingLinks, + queue_states = QStates0}) -> + %% If the queue sent us a drain credit_reply, + %% the queue must have consumed all our granted credit. + 0 = Credit, + + case DeliveryCount =:= QDeliveryCount0 andalso + DesiredCredit > 0 of + true -> + %% We're in drain mode. The queue did not advance its delivery-count which means + %% it might still have messages available for us. We also desire more messages. + %% Therefore, we do the next round of credit top-up. We prioritise finishing + %% the current drain credit top-up rounds over a stashed credit request because + %% this is easier to reason about and the queue will reply promptly meaning + %% the stashed request will be processed soon enough. + CappedCredit = cap_credit(DesiredCredit), + Link = Link0#outgoing_link{queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit}}, + + QName = queue_resource(Vhost, QNameBin), + {ok, QStates, Actions} = + rabbit_queue_type:credit( + QName, Ctag, DeliveryCount, CappedCredit, true, QStates0), + S = S0#state{queue_states = QStates, + outgoing_links = OutgoingLinks#{Handle := Link}}, + handle_queue_actions(Actions, S); + false -> + %% We're in drain mode. + %% The queue either advanced its delivery-count which means it has + %% no more messages available for us, or we do not desire more messages. + %% Therefore, we're done with draining and we "the sender will (after sending + %% all available messages) advance the delivery-count as much as possible, + %% consuming all link-credit, and send the flow state to the receiver." + CDeliveryCount = add(CDeliveryCount0, DesiredCredit), + Flow0 = #'v1_0.flow'{handle = ?UINT(Handle), + delivery_count = ?UINT(CDeliveryCount), + link_credit = ?UINT(0), + drain = true, + available = ?UINT(Available)}, + Flow = session_flow_fields(Flow0, S0), + rabbit_amqp_writer:send_command(Writer, ChanNum, Flow), + Link = Link0#outgoing_link{ + client_flow_ctl = CFC#client_flow_ctl{ + delivery_count = CDeliveryCount, + credit = 0}, + queue_flow_ctl = QFC#queue_flow_ctl{ + delivery_count = DeliveryCount, + credit = 0, + desired_credit = 0, + drain = false}, + credit_req_in_flight = false + }, + S = S0#state{outgoing_links = OutgoingLinks#{Handle := Link}}, + case StashedCreditReq of + none -> + S; + #credit_req{} -> + pop_credit_req(Handle, Ctag, Link, S) + end + end. + +pop_credit_req( + Handle, Ctag, + Link0 = #outgoing_link{ + queue_name_bin = QNameBin, + client_flow_ctl = #client_flow_ctl{ + delivery_count = CDeliveryCount + } = CFC, + queue_flow_ctl = #queue_flow_ctl{ + delivery_count = QDeliveryCount + } = QFC, + stashed_credit_req = #credit_req{ + delivery_count = DeliveryCountRcv, + credit = LinkCreditRcv, + drain = Drain, + echo = Echo + }}, + S0 = #state{cfg = #cfg{vhost = Vhost}, + outgoing_links = OutgoingLinks, + queue_states = QStates0}) -> + LinkCreditSnd = amqp10_util:link_credit_snd( + DeliveryCountRcv, LinkCreditRcv, CDeliveryCount), + CappedCredit = cap_credit(LinkCreditSnd), + QName = queue_resource(Vhost, QNameBin), + {ok, QStates, Actions} = + rabbit_queue_type:credit( + QName, Ctag, QDeliveryCount, CappedCredit, Drain, QStates0), + Link = Link0#outgoing_link{ + client_flow_ctl = CFC#client_flow_ctl{ + credit = LinkCreditSnd, + echo = Echo}, + queue_flow_ctl = QFC#queue_flow_ctl{ + credit = CappedCredit, + desired_credit = LinkCreditSnd, + drain = Drain + }, + credit_req_in_flight = true, + stashed_credit_req = none + }, + S = S0#state{queue_states = QStates, + outgoing_links = OutgoingLinks#{Handle := Link}}, + handle_queue_actions(Actions, S). + +echo(Echo, HandleInt, DeliveryCount, LinkCredit, Available, State) -> + case Echo of + true -> + Flow0 = #'v1_0.flow'{handle = ?UINT(HandleInt), + delivery_count = ?UINT(DeliveryCount), + link_credit = ?UINT(LinkCredit), + available = ?UINT(Available)}, + Flow = session_flow_fields(Flow0, State), + #cfg{writer_pid = Writer, + channel_num = Channel} = State#state.cfg, + rabbit_amqp_writer:send_command(Writer, Channel, Flow); + false -> + ok + end. + +send_pending_delivery(#pending_delivery{ + frames = Frames, + queue_pid = QPid, + outgoing_unsettled = #outgoing_unsettled{consumer_tag = Ctag, + queue_name = QName} + } = Pending, + Buf0, + #state{remote_incoming_window = Space, + outgoing_links = OutgoingLinks, + queue_states = QStates, + cfg = #cfg{writer_pid = WriterPid, + channel_num = Ch} + } = State0) -> + Handle = ctag_to_handle(Ctag), + case is_map_key(Handle, OutgoingLinks) of + true -> SendFun = case QPid of credit_api_v2 -> send_fun(WriterPid, Ch); _ -> - case rabbit_queue_type:module(QName, State0#state.queue_states) of + case rabbit_queue_type:module(QName, QStates) of {ok, rabbit_classic_queue} -> %% Classic queue client and classic queue process that %% communicate via credit API v1 use RabbitMQ internal %% credit flow control. fun(Transfer, Sections) -> rabbit_amqp_writer:send_command_and_notify( - WriterPid, Ch, QPid, self(), Transfer, Sections) + WriterPid, QPid, Ch, Transfer, Sections) end; {ok, _QType} -> send_fun(WriterPid, Ch) end end, - {NumTransfersSent, Buf, State1} = case send_frames(SendFun, Frames, Space) of {sent_all, SpaceLeft} -> - {Space - SpaceLeft, - Buf1, - record_outgoing_unsettled(Pending, State0)}; - {sent_some, Rest} -> - {Space, - queue:in_r(Pending#pending_delivery{frames = Rest}, Buf1), - State0} - end, - State2 = session_flow_control_sent_transfers(NumTransfersSent, State1), - State = State2#state{outgoing_pending = Buf}, - send_pending(State); - {{value, Pending = #pending_management_delivery{frames = Frames}}, Buf1} - when Space > 0 -> - SendFun = send_fun(WriterPid, Ch), - {NumTransfersSent, Buf} = - case send_frames(SendFun, Frames, Space) of - {sent_all, SpaceLeft} -> - {Space - SpaceLeft, Buf1}; - {sent_some, Rest} -> - {Space, queue:in_r(Pending#pending_management_delivery{frames = Rest}, Buf1)} - end, - State1 = session_flow_control_sent_transfers(NumTransfersSent, State0), - State = State1#state{outgoing_pending = Buf}, - send_pending(State); - _ when Space =:= 0 -> - State0 + State1 = State0#state{outgoing_pending = Buf0}, + State = sent_pending_delivery(Pending, Handle, State1), + {SpaceLeft, State}; + {sent_some, SpaceLeft, Rest} -> + Buf = queue:in_r(Pending#pending_delivery{frames = Rest}, Buf0), + State = State0#state{outgoing_pending = Buf}, + {SpaceLeft, State} + end; + false -> + %% Link got detached. Either the client closed the link in which case the queue + %% already requeued all checked out messages or the queue doesn't exist anymore + %% in which case there is no point in requeuing this message. + %% Therefore, ignore (drop) this delivery. + State = State0#state{outgoing_pending = Buf0}, + {Space, State} + end; +send_pending_delivery(#pending_management_delivery{frames = Frames} = Pending, + Buf0, + #state{remote_incoming_window = Space, + cfg = #cfg{writer_pid = WriterPid, + channel_num = Ch} + } = State0) -> + SendFun = send_fun(WriterPid, Ch), + case send_frames(SendFun, Frames, Space) of + {sent_all, SpaceLeft} -> + State = State0#state{outgoing_pending = Buf0}, + {SpaceLeft, State}; + {sent_some, SpaceLeft, Rest} -> + Buf = queue:in_r(Pending#pending_management_delivery{frames = Rest}, Buf0), + State = State0#state{outgoing_pending = Buf}, + {SpaceLeft, State} end. send_frames(_, [], SpaceLeft) -> {sent_all, SpaceLeft}; -send_frames(_, Rest, 0) -> - {sent_some, Rest}; -send_frames(SendFun, [[Transfer, Sections] | Rest], SpaceLeft) -> - SendFun(Transfer, Sections), - send_frames(SendFun, Rest, SpaceLeft - 1). +send_frames(_, Rest, SpaceLeft = 0) -> + {sent_some, SpaceLeft, Rest}; +send_frames(SendFun, [[Transfer, Sections] | Rest] = Frames, SpaceLeft) -> + case SendFun(Transfer, Sections) of + ok -> + send_frames(SendFun, Rest, SpaceLeft - 1); + {error, blocked} -> + {sent_some, SpaceLeft, Frames} + end. send_fun(WriterPid, Ch) -> fun(Transfer, Sections) -> rabbit_amqp_writer:send_command(WriterPid, Ch, Transfer, Sections) end. +sent_pending_delivery( + Pending = #pending_delivery{ + outgoing_unsettled = #outgoing_unsettled{ + consumer_tag = Ctag, + queue_name = QName}}, + Handle, + S0 = #state{outgoing_links = OutgoingLinks0, + queue_states = QStates0}) -> + + #outgoing_link{ + credit_api_version = CreditApiVsn, + client_flow_ctl = CFC0, + queue_flow_ctl = QFC0, + credit_req_in_flight = CreditReqInFlight0 + } = Link0 = maps:get(Handle, OutgoingLinks0), + + S = case CreditApiVsn of + 2 -> + #client_flow_ctl{ + delivery_count = CDeliveryCount0, + credit = CCredit0 + } = CFC0, + #queue_flow_ctl{ + delivery_count = QDeliveryCount0, + credit = QCredit0, + desired_credit = DesiredCredit0 + } = QFC0, + + CDeliveryCount = add(CDeliveryCount0, 1), + %% Even though the spec mandates + %% "If the link-credit is less than or equal to zero, i.e., + %% the delivery-count is the same as or greater than the + %% delivery-limit, a sender MUST NOT send more messages." + %% we forced the message through to be sent to the client. + %% Due to our dual link approach, we don't want to buffer any + %% messages in the session if the receiving client dynamically + %% decreased link credit. The alternative is to requeue messages. + %% "the receiver MAY either handle the excess messages normally + %% or detach the link with a transfer-limit-exceeded error code." + CCredit = max(0, CCredit0 - 1), + + QDeliveryCount = add(QDeliveryCount0, 1), + QCredit1 = max(0, QCredit0 - 1), + DesiredCredit = max(0, DesiredCredit0 - 1), + + {QCredit, CreditReqInFlight, QStates, Actions} = + case QCredit1 =:= 0 andalso + DesiredCredit > 0 andalso + not CreditReqInFlight0 of + true -> + %% assertion + none = Link0#outgoing_link.stashed_credit_req, + %% Provide queue next batch of credits. + CappedCredit = cap_credit(DesiredCredit), + {ok, QStates1, Actions0} = + rabbit_queue_type:credit( + QName, Ctag, QDeliveryCount, CappedCredit, + QFC0#queue_flow_ctl.drain, QStates0), + {CappedCredit, true, QStates1, Actions0}; + false -> + {QCredit1, CreditReqInFlight0, QStates0, []} + end, + + CFC = CFC0#client_flow_ctl{ + delivery_count = CDeliveryCount, + credit = CCredit}, + QFC = QFC0#queue_flow_ctl{ + delivery_count = QDeliveryCount, + credit = QCredit, + desired_credit = DesiredCredit}, + Link = Link0#outgoing_link{client_flow_ctl = CFC, + queue_flow_ctl = QFC, + credit_req_in_flight = CreditReqInFlight}, + OutgoingLinks = OutgoingLinks0#{Handle := Link}, + S1 = S0#state{outgoing_links = OutgoingLinks, + queue_states = QStates}, + handle_queue_actions(Actions, S1); + 1 -> + S0 + end, + record_outgoing_unsettled(Pending, S). + record_outgoing_unsettled(#pending_delivery{queue_ack_required = true, delivery_id = DeliveryId, outgoing_unsettled = Unsettled}, @@ -1554,30 +1956,23 @@ handle_queue_actions(Actions, State) -> lists:foldl(fun(Msg, S) -> handle_deliver(CTag, AckRequired, Msg, S) end, S0, Msgs); - ({credit_reply, Ctag, DeliveryCount, Credit, Available, Drain}, + ({credit_reply, _Ctag, _DeliveryCount, _Credit, _Available, _Drain} = Action, S = #state{outgoing_pending = Pending}) -> %% credit API v2 - Handle = ctag_to_handle(Ctag), - Flow = #'v1_0.flow'{ - handle = ?UINT(Handle), - delivery_count = ?UINT(DeliveryCount), - link_credit = ?UINT(Credit), - available = ?UINT(Available), - drain = Drain}, - S#state{outgoing_pending = queue:in(Flow, Pending)}; + S#state{outgoing_pending = queue:in(Action, Pending)}; ({credit_reply_v1, Ctag, Credit0, Available, Drain}, S0 = #state{outgoing_links = OutgoingLinks0, outgoing_pending = Pending}) -> %% credit API v1 %% Delete this branch when feature flag credit_api_v2 becomes required. Handle = ctag_to_handle(Ctag), - Link = #outgoing_link{delivery_count = {credit_api_v1, Count0}} = maps:get(Handle, OutgoingLinks0), + Link = #outgoing_link{delivery_count = Count0} = maps:get(Handle, OutgoingLinks0), {Count, Credit, S} = case Drain of true -> Count1 = add(Count0, Credit0), OutgoingLinks = maps:update( Handle, - Link#outgoing_link{delivery_count = {credit_api_v1, Count1}}, + Link#outgoing_link{delivery_count = Count1}, OutgoingLinks0), S1 = S0#state{outgoing_links = OutgoingLinks}, {Count1, 0, S1}; @@ -1617,6 +2012,7 @@ handle_deliver(ConsumerTag, AckRequired, #{Handle := #outgoing_link{queue_type = QType, send_settled = SendSettled, max_message_size = MaxMessageSize, + credit_api_version = CreditApiVsn, delivery_count = DelCount} = Link0} -> Dtag = delivery_tag(MsgId, SendSettled), Transfer = #'v1_0.transfer'{ @@ -1633,11 +2029,12 @@ handle_deliver(ConsumerTag, AckRequired, messages_delivered(Redelivered, QType), rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, Trace), {OutgoingLinks, QPid - } = case DelCount of - credit_api_v2 -> + } = case CreditApiVsn of + 2 -> {OutgoingLinks0, credit_api_v2}; - {credit_api_v1, C} -> - Link = Link0#outgoing_link{delivery_count = {credit_api_v1, add(C, 1)}}, + 1 -> + DelCount = Link0#outgoing_link.delivery_count, + Link = Link0#outgoing_link{delivery_count = add(DelCount, 1)}, OutgoingLinks1 = maps:update(Handle, Link, OutgoingLinks0), {OutgoingLinks1, QPid0} end, @@ -1943,7 +2340,7 @@ lookup_target(to, to, Mc, Vhost, User, PermCache0) -> {utf8, String} -> case parse_target_v2_string(String) of {ok, XNameBin, RKey, _} -> - XName = rabbit_misc:r(Vhost, exchange, XNameBin), + XName = exchange_resource(Vhost, XNameBin), PermCache = check_resource_access(XName, write, User, PermCache0), case rabbit_exchange:lookup(XName) of {ok, X} -> @@ -2072,13 +2469,13 @@ ensure_source_v1(Address, case rabbit_routing_parser:parse_routing(Src) of {"", QNameList} -> true = string:equal(QNameList, QNameBin), - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), {ok, QName, PermCache1, TopicPermCache0}; {XNameList, RoutingKeyList} -> RoutingKey = unicode:characters_to_binary(RoutingKeyList), XNameBin = unicode:characters_to_binary(XNameList), - XName = rabbit_misc:r(Vhost, exchange, XNameBin), - QName = rabbit_misc:r(Vhost, queue, QNameBin), + XName = exchange_resource(Vhost, XNameBin), + QName = queue_resource(Vhost, QNameBin), Binding = #binding{source = XName, destination = QName, key = RoutingKey}, @@ -2101,7 +2498,7 @@ ensure_source_v1(Address, %% The only possible v2 source address format is: %% /queue/:queue ensure_source_v2(<<"/queue/", QNameBin/binary>>, Vhost, PermCache, TopicPermCache) -> - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), ok = exit_if_absent(QName), {ok, QName, PermCache, TopicPermCache}; ensure_source_v2(Address, _, _, _) -> @@ -2148,7 +2545,7 @@ try_target_v2(Address, Vhost, User, PermCache) -> end. check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) -> - XName = rabbit_misc:r(Vhost, exchange, XNameBin), + XName = exchange_resource(Vhost, XNameBin), PermCache = check_resource_access(XName, write, User, PermCache0), case rabbit_exchange:lookup(XName) of {ok, X} -> @@ -2277,31 +2674,73 @@ handle_outgoing_mgmt_link_flow_control( handle_outgoing_link_flow_control( #outgoing_link{queue_name_bin = QNameBin, - delivery_count = MaybeDeliveryCountSnd}, + credit_api_version = CreditApiVsn, + client_flow_ctl = CFC, + queue_flow_ctl = QFC, + credit_req_in_flight = CreditReqInFlight + } = Link0, #'v1_0.flow'{handle = ?UINT(HandleInt), delivery_count = MaybeDeliveryCountRcv, link_credit = ?UINT(LinkCreditRcv), drain = Drain0, echo = Echo0}, - State0 = #state{queue_states = QStates0, - cfg = #cfg{vhost = Vhost}}) -> - QName = rabbit_misc:r(Vhost, queue, QNameBin), + #state{outgoing_links = OutgoingLinks, + queue_states = QStates0, + cfg = #cfg{vhost = Vhost} + } = State0) -> + QName = queue_resource(Vhost, QNameBin), Ctag = handle_to_ctag(HandleInt), DeliveryCountRcv = delivery_count_rcv(MaybeDeliveryCountRcv), Drain = default(Drain0, false), Echo = default(Echo0, false), - case MaybeDeliveryCountSnd of - credit_api_v2 -> - {ok, QStates, Actions} = rabbit_queue_type:credit( - QName, Ctag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo, QStates0), - State1 = State0#state{queue_states = QStates}, - State = handle_queue_actions(Actions, State1), - %% We'll handle the credit_reply queue event async later - %% thanks to the queue event containing the consumer tag. - State; - {credit_api_v1, DeliveryCountSnd} -> - LinkCreditSnd = amqp10_util:link_credit_snd(DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd), - {ok, QStates, Actions} = rabbit_queue_type:credit_v1(QName, Ctag, LinkCreditSnd, Drain, QStates0), + case CreditApiVsn of + 2 -> + case CreditReqInFlight of + false -> + DesiredCredit = amqp10_util:link_credit_snd( + DeliveryCountRcv, + LinkCreditRcv, + CFC#client_flow_ctl.delivery_count), + CappedCredit = cap_credit(DesiredCredit), + Link = Link0#outgoing_link{ + credit_req_in_flight = true, + client_flow_ctl = CFC#client_flow_ctl{ + credit = DesiredCredit, + echo = Echo}, + queue_flow_ctl = QFC#queue_flow_ctl{ + credit = CappedCredit, + desired_credit = DesiredCredit, + drain = Drain}}, + {ok, QStates, Actions} = rabbit_queue_type:credit( + QName, Ctag, + QFC#queue_flow_ctl.delivery_count, + CappedCredit, Drain, QStates0), + State = State0#state{queue_states = QStates, + outgoing_links = OutgoingLinks#{HandleInt := Link}}, + handle_queue_actions(Actions, State); + true -> + %% A credit request is currently in-flight. Let's first process its reply + %% before sending the next request. This ensures our outgoing_pending + %% queue won't contain lots of credit replies for the same consumer + %% when the client floods us with credit requests, but closed its incoming-window. + %% Processing one credit top up at a time between us and the queue is also easier + %% to reason about. Therefore, we stash the new request. If there is already a + %% stashed request, we replace it because the latest flow control state from the + %% client applies. + Link = Link0#outgoing_link{ + stashed_credit_req = #credit_req{ + delivery_count = DeliveryCountRcv, + credit = LinkCreditRcv, + drain = Drain, + echo = Echo}}, + State0#state{outgoing_links = OutgoingLinks#{HandleInt := Link}} + end; + 1 -> + DeliveryCountSnd = Link0#outgoing_link.delivery_count, + LinkCreditSnd = amqp10_util:link_credit_snd( + DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd), + {ok, QStates, Actions} = rabbit_queue_type:credit_v1( + QName, Ctag, LinkCreditSnd, Drain, QStates0), State1 = State0#state{queue_states = QStates}, State = handle_queue_actions(Actions, State1), process_credit_reply_sync(Ctag, QName, LinkCreditSnd, State) @@ -2610,7 +3049,7 @@ declare_queue(QNameBin, User = #user{username = Username}, TerminusDurability, PermCache0) -> - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), PermCache = check_resource_access(QName, configure, User, PermCache0), rabbit_core_metrics:queue_declared(QName), Q0 = amqqueue:new(QName, @@ -2931,6 +3370,11 @@ not_found(Resource) -> address_v1_permitted() -> rabbit_deprecated_features:is_permitted(amqp_address_v1). +-spec cap_credit(rabbit_queue_type:credit()) -> + 0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX. +cap_credit(DesiredCredit) -> + min(DesiredCredit, ?LINK_CREDIT_RCV_FROM_QUEUE_MAX). + format_status( #{state := #state{cfg = Cfg, outgoing_pending = OutgoingPending, diff --git a/deps/rabbit/src/rabbit_amqp_writer.erl b/deps/rabbit/src/rabbit_amqp_writer.erl index 2395b8c772ed..bd42fc1c2de2 100644 --- a/deps/rabbit/src/rabbit_amqp_writer.erl +++ b/deps/rabbit/src/rabbit_amqp_writer.erl @@ -15,7 +15,7 @@ send_command/3, send_command/4, send_command_sync/3, - send_command_and_notify/6, + send_command_and_notify/5, internal_send_command/3]). %% gen_server callbacks @@ -31,7 +31,8 @@ reader :: rabbit_types:connection(), pending :: iolist(), %% This field is just an optimisation to minimize the cost of erlang:iolist_size/1 - pending_size :: non_neg_integer() + pending_size :: non_neg_integer(), + monitored_sessions :: #{pid() => true} }). -define(HIBERNATE_AFTER, 6_000). @@ -62,10 +63,10 @@ send_command(Writer, ChannelNum, Performative) -> -spec send_command(pid(), rabbit_types:channel_number(), performative(), - payload()) -> ok. + payload()) -> ok | {error, blocked}. send_command(Writer, ChannelNum, Performative, Payload) -> - Request = {send_command, ChannelNum, Performative, Payload}, - gen_server:cast(Writer, Request). + Request = {send_command, self(), ChannelNum, Performative, Payload}, + maybe_send(Writer, Request). -spec send_command_sync(pid(), rabbit_types:channel_number(), @@ -76,14 +77,13 @@ send_command_sync(Writer, ChannelNum, Performative) -> %% Delete this function when feature flag credit_api_v2 becomes required. -spec send_command_and_notify(pid(), - rabbit_types:channel_number(), - pid(), pid(), + rabbit_types:channel_number(), performative(), - payload()) -> ok. -send_command_and_notify(Writer, ChannelNum, QueuePid, SessionPid, Performative, Payload) -> - Request = {send_command_and_notify, ChannelNum, QueuePid, SessionPid, Performative, Payload}, - gen_server:cast(Writer, Request). + payload()) -> ok | {error, blocked}. +send_command_and_notify(Writer, QueuePid, ChannelNum, Performative, Payload) -> + Request = {send_command_and_notify, QueuePid, self(), ChannelNum, Performative, Payload}, + maybe_send(Writer, Request). -spec internal_send_command(rabbit_net:socket(), performative(), @@ -101,19 +101,22 @@ init({Sock, MaxFrame, ReaderPid}) -> max_frame_size = MaxFrame, reader = ReaderPid, pending = [], - pending_size = 0}, + pending_size = 0, + monitored_sessions = #{}}, process_flag(message_queue_data, off_heap), {ok, State}. handle_cast({send_command, ChannelNum, Performative}, State0) -> State = internal_send_command_async(ChannelNum, Performative, State0), no_reply(State); -handle_cast({send_command, ChannelNum, Performative, Payload}, State0) -> - State = internal_send_command_async(ChannelNum, Performative, Payload, State0), +handle_cast({send_command, SessionPid, ChannelNum, Performative, Payload}, State0) -> + State1 = internal_send_command_async(ChannelNum, Performative, Payload, State0), + State = credit_flow_ack(SessionPid, State1), no_reply(State); %% Delete below function clause when feature flag credit_api_v2 becomes required. -handle_cast({send_command_and_notify, ChannelNum, QueuePid, SessionPid, Performative, Payload}, State0) -> - State = internal_send_command_async(ChannelNum, Performative, Payload, State0), +handle_cast({send_command_and_notify, QueuePid, SessionPid, ChannelNum, Performative, Payload}, State0) -> + State1 = internal_send_command_async(ChannelNum, Performative, Payload, State0), + State = credit_flow_ack(SessionPid, State1), rabbit_amqqueue:notify_sent(QueuePid, SessionPid), no_reply(State). @@ -125,6 +128,11 @@ handle_call({send_command, ChannelNum, Performative}, _From, State0) -> handle_info(timeout, State0) -> State = flush(State0), {noreply, State}; +handle_info({{'DOWN', session}, _MRef, process, SessionPid, _Reason}, + State0 = #state{monitored_sessions = Sessions}) -> + credit_flow:peer_down(SessionPid), + State = State0#state{monitored_sessions = maps:remove(SessionPid, Sessions)}, + no_reply(State); %% Delete below function clause when feature flag credit_api_v2 becomes required. handle_info({'DOWN', _MRef, process, QueuePid, _Reason}, State) -> rabbit_amqqueue:notify_sent_queue_down(QueuePid), @@ -154,6 +162,25 @@ format_status(Status) -> no_reply(State) -> {noreply, State, 0}. +maybe_send(Writer, Request) -> + case credit_flow:blocked() of + false -> + credit_flow:send(Writer), + gen_server:cast(Writer, Request); + true -> + {error, blocked} + end. + +credit_flow_ack(SessionPid, State = #state{monitored_sessions = Sessions}) -> + credit_flow:ack(SessionPid), + case is_map_key(SessionPid, Sessions) of + true -> + State; + false -> + _MonitorRef = monitor(process, SessionPid, [{tag, {'DOWN', session}}]), + State#state{monitored_sessions = maps:put(SessionPid, true, Sessions)} + end. + internal_send_command_async(Channel, Performative, State = #state{pending = Pending, pending_size = PendingSize}) -> diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index a26d71ca70b9..5380f5b5f751 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -1633,8 +1633,8 @@ handle_cast({credit, SessionPid, CTag, Credit, Drain}, %% Behave like non-native AMQP 1.0: Send send_credit_reply before deliveries. rabbit_classic_queue:send_credit_reply_credit_api_v1( SessionPid, amqqueue:get_name(Q), BQ:len(BQS0)), - handle_cast({credit, SessionPid, CTag, credit_api_v1, Credit, Drain, false}, State); -handle_cast({credit, SessionPid, CTag, DeliveryCountRcv, Credit, Drain, Echo}, + handle_cast({credit, SessionPid, CTag, credit_api_v1, Credit, Drain}, State); +handle_cast({credit, SessionPid, CTag, DeliveryCountRcv, Credit, Drain}, #q{consumers = Consumers0, q = Q} = State0) -> QName = amqqueue:get_name(Q), @@ -1666,8 +1666,7 @@ handle_cast({credit, SessionPid, CTag, DeliveryCountRcv, Credit, Drain, Echo}, rabbit_classic_queue:send_credit_reply( SessionPid, QName, CTag, AdvancedDeliveryCount, 0, Avail, Drain); {PostDeliveryCountSnd, PostCred} - when is_integer(PostDeliveryCountSnd) andalso - Echo -> + when is_integer(PostDeliveryCountSnd) -> %% credit API v2 Avail = BQ:len(PostBQS), rabbit_classic_queue:send_credit_reply( diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 42dbcbb096f0..4024de5ee476 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -42,7 +42,7 @@ deliver/3, settle/5, credit_v1/5, - credit/7, + credit/6, dequeue/5, info/2, state_info/1, @@ -306,8 +306,8 @@ credit_v1(_QName, Ctag, LinkCreditSnd, Drain, #?STATE{pid = QPid} = State) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [Request]}), {State, []}. -credit(_QName, Ctag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo, #?STATE{pid = QPid} = State) -> - Request = {credit, self(), Ctag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo}, +credit(_QName, Ctag, DeliveryCountRcv, LinkCreditRcv, Drain, #?STATE{pid = QPid} = State) -> + Request = {credit, self(), Ctag, DeliveryCountRcv, LinkCreditRcv, Drain}, delegate:invoke_no_result(QPid, {gen_server2, cast, [Request]}), {State, []}. diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index 65f2db8a601f..076c3c80e01e 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -119,7 +119,7 @@ checked_out = #{} :: #{msg_id() => msg()}, %% max number of messages that can be sent %% decremented for each delivery - credit = 0 : non_neg_integer(), + credit = 0 :: non_neg_integer(), %% AMQP 1.0 §2.6.7 delivery_count :: rabbit_queue_type:delivery_count() }). diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index f1bea8db3498..70ced853751e 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -23,7 +23,7 @@ return/3, discard/3, credit_v1/4, - credit/6, + credit/5, handle_ra_event/4, untracked_enqueue/2, purge/1, @@ -43,10 +43,6 @@ -record(consumer, {last_msg_id :: seq() | -1 | undefined, ack = false :: boolean(), - %% 'echo' field from latest FLOW, see AMQP 1.0 §2.7.4 - %% Quorum queue server will always echo back to us, - %% but we only emit a credit_reply if Echo=true - echo :: boolean(), %% Remove this field when feature flag credit_api_v2 becomes required. delivery_count :: {credit_api_v1, rabbit_queue_type:delivery_count()} | credit_api_v2 }). @@ -369,7 +365,6 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta, fun (C) -> C#consumer{ack = Ack} end, #consumer{last_msg_id = LastMsgId, ack = Ack, - echo = false, delivery_count = DeliveryCount}, CDels0), {ok, State0#state{leader = Leader, @@ -397,12 +392,9 @@ query_single_active_consumer(#state{leader = Leader}) -> state()) -> {state(), rabbit_queue_type:actions()}. credit_v1(ConsumerTag, Credit, Drain, - #state{consumer_deliveries = CDels} = State0) -> - ConsumerId = consumer_id(ConsumerTag), + State = #state{consumer_deliveries = CDels}) -> #consumer{delivery_count = {credit_api_v1, Count}} = maps:get(ConsumerTag, CDels), - ServerId = pick_server(State0), - Cmd = rabbit_fifo:make_credit(ConsumerId, Credit, Count, Drain), - {send_command(ServerId, undefined, Cmd, normal, State0), []}. + credit(ConsumerTag, Count, Credit, Drain, State). %% @doc Provide credit to the queue %% @@ -417,18 +409,12 @@ credit_v1(ConsumerTag, Credit, Drain, rabbit_queue_type:delivery_count(), rabbit_queue_type:credit(), Drain :: boolean(), - Echo :: boolean(), state()) -> {state(), rabbit_queue_type:actions()}. -credit(ConsumerTag, DeliveryCount, Credit, Drain, Echo, - #state{consumer_deliveries = CDels0} = State0) -> +credit(ConsumerTag, DeliveryCount, Credit, Drain, State) -> ConsumerId = consumer_id(ConsumerTag), - ServerId = pick_server(State0), + ServerId = pick_server(State), Cmd = rabbit_fifo:make_credit(ConsumerId, Credit, DeliveryCount, Drain), - CDels = maps:update_with(ConsumerTag, - fun(C) -> C#consumer{echo = Echo} end, - CDels0), - State = State0#state{consumer_deliveries = CDels}, {send_command(ServerId, undefined, Cmd, normal, State), []}. %% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag @@ -594,21 +580,10 @@ handle_ra_event(QName, From, {applied, Seqs}, end; handle_ra_event(QName, From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> handle_delivery(QName, From, Del, State0); -handle_ra_event(_QName, _From, - {machine, {credit_reply_v1, _CTag, _Credit, _Available, _Drain = false} = Action}, - State) -> +handle_ra_event(_QName, _From, {machine, Action}, State) + when element(1, Action) =:= credit_reply orelse + element(1, Action) =:= credit_reply_v1 -> {ok, State, [Action]}; -handle_ra_event(_QName, _From, - {machine, {credit_reply, CTag, _DeliveryCount, _Credit, _Available, Drain} = Action}, - #state{consumer_deliveries = CDels} = State) -> - Actions = case CDels of - #{CTag := #consumer{echo = Echo}} - when Echo orelse Drain -> - [Action]; - _ -> - [] - end, - {ok, State, Actions}; handle_ra_event(_QName, _, {machine, {queue_status, Status}}, #state{} = State) -> %% just set the queue status diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index de7e908fd66f..3ea45e454d8c 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -47,7 +47,7 @@ deliver/4, settle/5, credit_v1/5, - credit/7, + credit/6, dequeue/5, fold_state/3, is_policy_applicable/2, @@ -85,6 +85,9 @@ -define(QUEUE_MODULES, [rabbit_classic_queue, rabbit_quorum_queue, rabbit_stream_queue]). -define(KNOWN_QUEUE_TYPES, [<<"classic">>, <<"quorum">>, <<"stream">>]). +-type credit_reply_action() :: {credit_reply, rabbit_types:ctag(), delivery_count(), credit(), + Available :: non_neg_integer(), Drain :: boolean()}. + %% anything that the host process needs to do on behalf of the queue type session -type action() :: %% indicate to the queue type module that a message has been delivered @@ -92,9 +95,7 @@ {settled, queue_name(), [correlation()]} | {deliver, rabbit_types:ctag(), boolean(), [rabbit_amqqueue:qmsg()]} | {block | unblock, QueueName :: term()} | - %% credit API v2 - {credit_reply, rabbit_types:ctag(), delivery_count(), credit(), - Available :: non_neg_integer(), Drain :: boolean()} | + credit_reply_action() | %% credit API v1 {credit_reply_v1, rabbit_types:ctag(), credit(), Available :: non_neg_integer(), Drain :: boolean()}. @@ -135,6 +136,7 @@ consume_mode/0, consume_spec/0, delivery_options/0, + credit_reply_action/0, action/0, actions/0, settle_op/0, @@ -222,9 +224,8 @@ -callback credit_v1(queue_name(), rabbit_types:ctag(), credit(), Drain :: boolean(), queue_state()) -> {queue_state(), actions()}. -%% credit API v2 -callback credit(queue_name(), rabbit_types:ctag(), delivery_count(), credit(), - Drain :: boolean(), Echo :: boolean(), queue_state()) -> + Drain :: boolean(), queue_state()) -> {queue_state(), actions()}. -callback dequeue(queue_name(), NoAck :: boolean(), LimiterPid :: pid(), @@ -675,12 +676,12 @@ credit_v1(QName, CTag, LinkCreditSnd, Drain, Ctxs) -> {ok, set_ctx(QName, Ctx#ctx{state = State}, Ctxs), Actions}. %% credit API v2 --spec credit(queue_name(), rabbit_types:ctag(), delivery_count(), credit(), boolean(), boolean(), state()) -> +-spec credit(queue_name(), rabbit_types:ctag(), delivery_count(), credit(), boolean(), state()) -> {ok, state(), actions()}. -credit(QName, CTag, DeliveryCount, Credit, Drain, Echo, Ctxs) -> +credit(QName, CTag, DeliveryCount, Credit, Drain, Ctxs) -> #ctx{state = State0, module = Mod} = Ctx = get_ctx(QName, Ctxs), - {State, Actions} = Mod:credit(QName, CTag, DeliveryCount, Credit, Drain, Echo, State0), + {State, Actions} = Mod:credit(QName, CTag, DeliveryCount, Credit, Drain, State0), {ok, set_ctx(QName, Ctx#ctx{state = State}, Ctxs), Actions}. -spec dequeue(amqqueue:amqqueue(), boolean(), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index e91a4bff19aa..58dae5c6562f 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -25,7 +25,7 @@ delete_immediately/1]). -export([state_info/1, info/2, stat/1, infos/1, infos/2]). -export([settle/5, dequeue/5, consume/3, cancel/5]). --export([credit_v1/5, credit/7]). +-export([credit_v1/5, credit/6]). -export([purge/1]). -export([stateless_deliver/2, deliver/3]). -export([dead_letter_publish/5]). @@ -810,8 +810,8 @@ settle(_QName, discard, CTag, MsgIds, QState) -> credit_v1(_QName, CTag, Credit, Drain, QState) -> rabbit_fifo_client:credit_v1(quorum_ctag(CTag), Credit, Drain, QState). -credit(_QName, CTag, DeliveryCount, Credit, Drain, Echo, QState) -> - rabbit_fifo_client:credit(quorum_ctag(CTag), DeliveryCount, Credit, Drain, Echo, QState). +credit(_QName, CTag, DeliveryCount, Credit, Drain, QState) -> + rabbit_fifo_client:credit(quorum_ctag(CTag), DeliveryCount, Credit, Drain, QState). -spec dequeue(rabbit_amqqueue:name(), NoAck :: boolean(), pid(), rabbit_types:ctag(), rabbit_fifo_client:state()) -> diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 7444553f3271..eaadae002579 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -24,7 +24,7 @@ deliver/3, settle/5, credit_v1/5, - credit/7, + credit/6, dequeue/5, info/2, queue_length/1, @@ -101,6 +101,7 @@ }). -import(rabbit_queue_type_util, [args_policy_lookup/3]). +-import(rabbit_misc, [queue_resource/2]). -type client() :: #stream_client{}. @@ -467,7 +468,7 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0, credit_v1(_, _, _, _, _) -> erlang:error(credit_v1_unsupported). -credit(QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo, +credit(QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain, #stream_client{readers = Readers, name = Name, local_pid = LocalPid} = State0) -> @@ -479,27 +480,20 @@ credit(QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo, {Str2 = #stream{delivery_count = DeliveryCount, credit = Credit, ack = Ack}, Msgs} = stream_entries(QName, Name, LocalPid, Str1), - DrainedInsufficientMsgs = Drain andalso Credit > 0, - Str = case DrainedInsufficientMsgs of + Str = case Drain andalso Credit > 0 of true -> Str2#stream{delivery_count = serial_number:add(DeliveryCount, Credit), credit = 0}; false -> Str2 end, - DeliverActions = deliver_actions(CTag, Ack, Msgs), State = State0#stream_client{readers = maps:update(CTag, Str, Readers)}, - Actions = case Echo orelse DrainedInsufficientMsgs of - true -> - DeliverActions ++ [{credit_reply, - CTag, - Str#stream.delivery_count, - Str#stream.credit, - available_messages(Str), - Drain}]; - false -> - DeliverActions - end, + Actions = deliver_actions(CTag, Ack, Msgs) ++ [{credit_reply, + CTag, + Str#stream.delivery_count, + Str#stream.credit, + available_messages(Str), + Drain}], {State, Actions}; _ -> {State0, []} @@ -973,7 +967,7 @@ set_retention_policy(Name, VHost, Policy) -> {error, _} = E -> E; MaxAge -> - QName = rabbit_misc:r(VHost, queue, Name), + QName = queue_resource(VHost, Name), Fun = fun(Q) -> Conf = amqqueue:get_type_state(Q), amqqueue:set_type_state(Q, Conf#{max_age => MaxAge}) @@ -1004,7 +998,7 @@ restart_stream(VHost, Queue, Options) add_replica(VHost, Name, Node) -> - QName = rabbit_misc:r(VHost, queue, Name), + QName = queue_resource(VHost, Name), case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; @@ -1022,7 +1016,7 @@ add_replica(VHost, Name, Node) -> end. delete_replica(VHost, Name, Node) -> - QName = rabbit_misc:r(VHost, queue, Name), + QName = queue_resource(VHost, Name), case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 3503c6be8f97..1ef79d0287d6 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -51,6 +51,9 @@ groups() -> roundtrip_with_drain_classic_queue, roundtrip_with_drain_quorum_queue, roundtrip_with_drain_stream, + drain_many_classic_queue, + drain_many_quorum_queue, + drain_many_stream, amqp_stream_amqpl, amqp_quorum_queue_amqpl, message_headers_conversion, @@ -85,7 +88,6 @@ groups() -> resource_alarm_after_session_begin, max_message_size_client_to_server, max_message_size_server_to_client, - receive_transfer_flow_order, global_counters, stream_filtering, available_messages_classic_queue, @@ -106,7 +108,20 @@ groups() -> classic_priority_queue, dead_letter_headers_exchange, dead_letter_reject, - immutable_bare_message + immutable_bare_message, + receive_many_made_available_over_time_classic_queue, + receive_many_made_available_over_time_quorum_queue, + receive_many_made_available_over_time_stream, + receive_many_auto_flow_classic_queue, + receive_many_auto_flow_quorum_queue, + receive_many_auto_flow_stream, + incoming_window_closed_transfer_flow_order, + incoming_window_closed_stop_link, + incoming_window_closed_close_link, + incoming_window_closed_rabbitmq_internal_flow_classic_queue, + incoming_window_closed_rabbitmq_internal_flow_quorum_queue, + tcp_back_pressure_rabbitmq_internal_flow_classic_queue, + tcp_back_pressure_rabbitmq_internal_flow_quorum_queue ]}, {cluster_size_3, [shuffle], @@ -170,6 +185,7 @@ end_per_group(_, Config) -> init_per_testcase(T, Config) when T =:= message_headers_conversion orelse T =:= roundtrip_with_drain_quorum_queue orelse + T =:= drain_many_quorum_queue orelse T =:= timed_get_quorum_queue orelse T =:= available_messages_quorum_queue -> case rpc(Config, rabbit_feature_flags, is_enabled, [credit_api_v2]) of @@ -179,6 +195,21 @@ init_per_testcase(T, Config) {skip, "Receiving with drain from quorum queues in credit API v1 have a known " "bug that they reply with send_drained before delivering the message."} end; +init_per_testcase(T, Config) + when T =:= incoming_window_closed_close_link orelse + T =:= incoming_window_closed_rabbitmq_internal_flow_classic_queue orelse + T =:= incoming_window_closed_rabbitmq_internal_flow_quorum_queue orelse + T =:= tcp_back_pressure_rabbitmq_internal_flow_classic_queue orelse + T =:= tcp_back_pressure_rabbitmq_internal_flow_quorum_queue -> + %% The new RabbitMQ internal flow control + %% writer proc <- session proc <- queue proc + %% is only available with credit API v2. + case rpc(Config, rabbit_feature_flags, is_enabled, [credit_api_v2]) of + true -> + rabbit_ct_helpers:testcase_started(Config, T); + false -> + {skip, "Feature flag credit_api_v2 is disabled"} + end; init_per_testcase(T = immutable_bare_message, Config) -> case rpc(Config, rabbit_feature_flags, is_enabled, [message_containers_store_amqp_v1]) of true -> @@ -274,7 +305,7 @@ reliable_send_receive(QType, Outcome, Config) -> True = {boolean, true}, Msg2 = amqp10_msg:set_headers(#{durable => True}, Msg1), ok = amqp10_client:send_msg(Sender, Msg2), - ok = wait_for_settlement(DTag1), + ok = wait_for_accepted(DTag1), ok = amqp10_client:detach_link(Sender), ok = end_session_sync(Session), @@ -625,7 +656,7 @@ roundtrip_with_drain(Config, QueueType, QName) end, OutMsg2 = amqp10_msg:new(<<"tag-2">>, <<"my-body2">>, false), ok = amqp10_client:send_msg(Sender, OutMsg2), - ok = wait_for_settlement(<<"tag-2">>), + ok = wait_for_accepted(<<"tag-2">>), %% no delivery should be made at this point receive {amqp10_msg, _, _} -> ct:fail(unexpected_delivery) @@ -638,6 +669,85 @@ roundtrip_with_drain(Config, QueueType, QName) ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = amqp10_client:close_connection(Connection). +drain_many_classic_queue(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + drain_many(Config, <<"classic">>, QName). + +drain_many_quorum_queue(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + drain_many(Config, <<"quorum">>, QName). + +drain_many_stream(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + drain_many(Config, <<"stream">>, QName). + +drain_many(Config, QueueType, QName) + when is_binary(QueueType) -> + Address = <<"/queue/", QName/binary>>, + {Connection, Session, LinkPair} = init(Config), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QueueType}}}, + {ok, #{type := QueueType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"test-sender">>, Address), + wait_for_credit(Sender), + + Num = 5000, + ok = send_messages(Sender, Num, false), + ok = wait_for_accepts(Num), + + TerminusDurability = none, + Filter = consume_from_first(QueueType), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"test-receiver">>, Address, settled, + TerminusDurability, Filter), + + ok = amqp10_client:flow_link_credit(Receiver, Num - 1, never, true), + ?assertEqual(Num - 1, count_received_messages(Receiver)), + flush("drained 1"), + + ok = amqp10_client:flow_link_credit(Receiver, Num, never, true), + receive_messages(Receiver, 1), + flush("drained 2"), + + ok = send_messages(Sender, Num, false), + ok = wait_for_accepts(Num), + receive {amqp10_msg, _, _} = Unexpected1 -> ct:fail({unexpected, Unexpected1}) + after 10 -> ok + end, + + %% Let's send a couple of FLOW frames in sequence. + ok = amqp10_client:flow_link_credit(Receiver, 0, never, false), + ok = amqp10_client:flow_link_credit(Receiver, 1, never, false), + ok = amqp10_client:flow_link_credit(Receiver, Num div 2, never, false), + ok = amqp10_client:flow_link_credit(Receiver, Num, never, false), + ok = amqp10_client:flow_link_credit(Receiver, Num, never, true), + %% Eventually, we should receive all messages. + receive_messages(Receiver, Num), + flush("drained 3"), + + ok = send_messages(Sender, 1, false), + ok = wait_for_accepts(1), + %% Our receiver shouldn't have any credit left to consume this message. + receive {amqp10_msg, _, _} = Unexpected2 -> ct:fail({unexpected, Unexpected2}) + after 30 -> ok + end, + + %% Grant a huge amount of credits. + ok = amqp10_client:flow_link_credit(Receiver, 2_000_000_000, never, true), + %% We expect the server to send us the last message and + %% to advance the delivery-count promptly. + receive {amqp10_msg, _, _} -> ok + after 2000 -> ct:fail({missing_delivery, ?LINE}) + end, + receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok + after 300 -> ct:fail("expected credit_exhausted") + end, + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:close_connection(Connection). + amqp_stream_amqpl(Config) -> amqp_amqpl(<<"stream">>, Config). @@ -993,7 +1103,7 @@ server_closes_link(QType, Config) -> DTag = <<0>>, Body = <<"body">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, Body, false)), - ok = wait_for_settlement(DTag), + ok = wait_for_accepted(DTag), receive {amqp10_msg, Receiver, Msg} -> ?assertEqual([Body], amqp10_msg:body(Msg)) @@ -1005,7 +1115,7 @@ server_closes_link(QType, Config) -> eventually(?_assertEqual( 1, begin - #{outgoing_unsettled_map := UnsettledMap} = gen_server_state(SessionPid), + #{outgoing_unsettled_map := UnsettledMap} = formatted_state(SessionPid), maps:size(UnsettledMap) end)), @@ -1030,7 +1140,7 @@ server_closes_link(QType, Config) -> eventually(?_assertEqual( 0, begin - #{outgoing_unsettled_map := UnsettledMap} = gen_server_state(SessionPid), + #{outgoing_unsettled_map := UnsettledMap} = formatted_state(SessionPid), maps:size(UnsettledMap) end)), @@ -1058,7 +1168,7 @@ server_closes_link_exchange(Config) -> DTag1 = <<1>>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), - ok = wait_for_settlement(DTag1), + ok = wait_for_accepted(DTag1), %% Server closes the link endpoint due to some AMQP 1.0 external condition: %% In this test, the external condition is that an AMQP 0.9.1 client deletes the exchange. @@ -1109,7 +1219,7 @@ link_target_queue_deleted(QType, Config) -> flush(credited), DTag1 = <<1>>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), - ok = wait_for_settlement(DTag1), + ok = wait_for_accepted(DTag1), %% Mock delivery to the target queue to do nothing. rabbit_ct_broker_helpers:setup_meck(Config, [?MODULE]), @@ -1169,7 +1279,7 @@ target_queues_deleted_accepted(Config) -> DTag1 = <<1>>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), - ok = wait_for_settlement(DTag1), + ok = wait_for_accepted(DTag1), %% Mock to deliver only to q1. rabbit_ct_broker_helpers:setup_meck(Config, [?MODULE]), @@ -1868,7 +1978,7 @@ detach_requeues(Config) -> eventually(?_assertEqual( 0, begin - #{outgoing_unsettled_map := UnsettledMap} = gen_server_state(SessionPid), + #{outgoing_unsettled_map := UnsettledMap} = formatted_state(SessionPid), maps:size(UnsettledMap) end)), @@ -1925,7 +2035,7 @@ resource_alarm_before_session_begin(Config) -> %% Hence, RabbitMQ should open its incoming window allowing our client to send TRANSFERs. ?assertEqual(ok, amqp10_client:send_msg(Sender, Msg1)), - ok = wait_for_settlement(Tag1), + ok = wait_for_accepted(Tag1), ok = amqp10_client:detach_link(Sender), ok = end_session_sync(Session1), @@ -1987,7 +2097,7 @@ resource_alarm_after_session_begin(Config) -> %% Now, we should be able to send again. ?assertEqual(ok, amqp10_client:send_msg(Sender, Msg4)), - ok = wait_for_settlement(<<"t4">>), + ok = wait_for_accepted(<<"t4">>), ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver1), @@ -2043,7 +2153,7 @@ max_message_size_client_to_server(Config) -> PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 10), ?assertEqual(ok, amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, PayloadSmallEnough, false))), - ok = wait_for_settlement(<<"t1">>), + ok = wait_for_accepted(<<"t1">>), PayloadTooLarge = binary:copy(<<0>>, MaxMessageSize + 1), ?assertEqual({error, message_size_exceeded}, @@ -2074,7 +2184,7 @@ max_message_size_server_to_client(Config) -> %% e.g. message annotations with routing key and exchange name. PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 200), ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, PayloadSmallEnough, false)), - ok = wait_for_settlement(<<"t1">>), + ok = wait_for_accepted(<<"t1">>), AttachArgs = #{max_message_size => MaxMessageSize, name => <<"test-receiver">>, @@ -2091,7 +2201,7 @@ max_message_size_server_to_client(Config) -> %% The sending link has no maximum message size set. %% Hence, sending this large message from client to server should work. ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, PayloadTooLarge, false)), - ok = wait_for_settlement(<<"t2">>), + ok = wait_for_accepted(<<"t2">>), %% However, the receiving link has a maximum message size set. %% Hence, when the server attempts to deliver this large message, @@ -2111,59 +2221,6 @@ max_message_size_server_to_client(Config) -> #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). -%% This test ensures that the server sends us TRANSFER and FLOW frames in the correct order -%% even if the server is temporarily not allowed to send us any TRANSFERs due to our incoming -%% window being closed. -receive_transfer_flow_order(Config) -> - QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch), - Address = <<"/amq/queue/", QName/binary>>, - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), - ok = wait_for_credit(Sender), - DTag = <<"my tag">>, - Body = <<"my body">>, - ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, Body, false)), - ok = wait_for_settlement(DTag), - ok = amqp10_client:detach_link(Sender), - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), - receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail(missing_attached) - end, - flush(receiver_attached), - - %% Close our incoming window. - gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 0}}}), - - ok = amqp10_client:flow_link_credit(Receiver, 2, never, true), - %% Given our incoming window is closed, we shouldn't receive the TRANSFER yet, and therefore - %% must not yet receive the FLOW that comes thereafter with drain=true, credit=0, and advanced delivery-count. - receive Unexpected -> ct:fail({unexpected, Unexpected}) - after 300 -> ok - end, - - %% Open our incoming window - gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}), - %% Important: We should first receive the TRANSFER, - %% and only thereafter the FLOW (and hence the credit_exhausted notification). - receive First -> - {amqp10_msg, Receiver, Msg} = First, - ?assertEqual([Body], amqp10_msg:body(Msg)) - after 5000 -> ct:fail("timeout receiving message") - end, - receive Second -> - ?assertEqual({amqp10_event, {link, Receiver, credit_exhausted}}, Second) - after 5000 -> ct:fail("timeout receiving credit_exhausted") - end, - - ok = delete_queue(Session, QName), - ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). - last_queue_confirms(Config) -> ClassicQ = <<"my classic queue">>, QuorumQ = <<"my quorum queue">>, @@ -2329,7 +2386,7 @@ target_classic_queue_down(Config) -> DTag1 = <<"t1">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), - ok = wait_for_settlement(DTag1), + ok = wait_for_accepted(DTag1), {ok, Msg1} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)), @@ -2361,7 +2418,7 @@ target_classic_queue_down(Config) -> end, DTag3 = <<"t3">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag3, <<"m3">>, false)), - ok = wait_for_settlement(DTag3), + ok = wait_for_accepted(DTag3), {ok, Msg3} = amqp10_client:get_msg(Receiver2), ?assertEqual([<<"m3">>], amqp10_msg:body(Msg3)), @@ -2422,7 +2479,7 @@ async_notify(SenderSettleMode, QType, Config) -> ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Bin, Bin, false)) end || N <- lists:seq(1, NumMsgs)], %% Wait for last message to be confirmed. - ok = wait_for_settlement(integer_to_binary(NumMsgs)), + ok = wait_for_accepted(integer_to_binary(NumMsgs)), flush(settled), ok = detach_link_sync(Sender), @@ -2619,7 +2676,7 @@ queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) Tag = <<"tag">>, Body = <<"body">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Tag, Body, false)), - ok = wait_for_settlement(Tag), + ok = wait_for_accepted(Tag), ok = amqp10_client:flow_link_credit(Receiver, 999, never, true), [Msg] = receive_messages(Receiver, 1), ?assertEqual([Body], amqp10_msg:body(Msg)), @@ -2739,7 +2796,7 @@ global_counters(Config) -> {ok, QQReceiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver-qq">>, QQAddress, unsettled), ok = amqp10_client:send_msg(CQSender, amqp10_msg:new(<<0>>, <<"m0">>, true)), ok = amqp10_client:send_msg(QQSender, amqp10_msg:new(<<1>>, <<"m1">>, false)), - ok = wait_for_settlement(<<1>>), + ok = wait_for_accepted(<<1>>), {ok, Msg0} = amqp10_client:get_msg(CQReceiver), ?assertEqual([<<"m0">>], amqp10_msg:body(Msg0)), @@ -2779,7 +2836,7 @@ global_counters(Config) -> %% Test re-delivery. ok = amqp10_client:send_msg(QQSender, amqp10_msg:new(<<2>>, <<"m2">>, false)), - ok = wait_for_settlement(<<2>>), + ok = wait_for_accepted(<<2>>), {ok, Msg2a} = amqp10_client:get_msg(QQReceiver), ?assertEqual([<<"m2">>], amqp10_msg:body(Msg2a)), %% Releasing causes the message to be requeued. @@ -2883,7 +2940,7 @@ stream_filtering(Config) -> Msg1 = amqp10_msg:set_application_properties(AppProps, Msg0), Msg2 = amqp10_msg:set_message_annotations(Anns, Msg1), ok = amqp10_client:send_msg(Sender, Msg2), - ok = wait_for_settlement(DTag) + ok = wait_for_accepted(DTag) end, lists:seq(1, WaveCount)) end, @@ -3010,12 +3067,12 @@ available_messages(QType, Config) -> ok = wait_for_accepts(4), OutputHandle = element(4, Receiver), - Flow = #'v1_0.flow'{ - %% Grant 1 credit to the sending queue. - link_credit = {uint, 1}, - %% Request sending queue to send us a FLOW including available messages. - echo = true}, - ok = amqp10_client_session:flow(Session, OutputHandle, Flow, never), + Flow0 = #'v1_0.flow'{ + %% Grant 1 credit to the sending queue. + link_credit = {uint, 1}, + %% Request sending queue to send us a FLOW including available messages. + echo = true}, + ok = amqp10_client_session:flow(Session, OutputHandle, Flow0, never), receive_messages(Receiver, 1), receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok after 5000 -> ct:fail({missing_event, ?LINE}) @@ -3041,6 +3098,29 @@ available_messages(QType, Config) -> end, ?assertEqual(0, get_available_messages(Receiver)), + ok = send_messages(Sender, 5000, false), + %% We know that Streams only return an approximation for available messages. + %% The committed Stream offset is queried by chunk ID. + %% So, we wait here a bit such that the 4th message goes into its own new chunk. + timer:sleep(50), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"last dtag">>, <<"last msg">>, false)), + ok = wait_for_accepts(5001), + + Flow1 = #'v1_0.flow'{ + link_credit = {uint, 0}, + echo = false}, + Flow2 = #'v1_0.flow'{ + link_credit = {uint, 1}, + echo = true}, + %% Send both FLOW frames in sequence. + ok = amqp10_client_session:flow(Session, OutputHandle, Flow1, never), + ok = amqp10_client_session:flow(Session, OutputHandle, Flow2, never), + receive_messages(Receiver, 1), + receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + ?assertEqual(5000, get_available_messages(Receiver)), + ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver), ok = end_session_sync(Session), @@ -3073,7 +3153,7 @@ incoming_message_interceptors(Config) -> NowMillis = os:system_time(millisecond), Tag = <<"tag">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Tag, <<"body">>)), - ok = wait_for_settlement(Tag), + ok = wait_for_accepted(Tag), {ok, Msg1} = amqp10_client:get_msg(StreamReceiver), {ok, Msg2} = amqp10_client:get_msg(QQReceiver), @@ -3608,7 +3688,7 @@ dead_letter_reject(Config) -> Body = <<"my body">>, M = amqp10_msg:new(Tag, Body), ok = amqp10_client:send_msg(Sender, M), - ok = wait_for_settlement(Tag), + ok = wait_for_accepted(Tag), {ok, Msg1} = amqp10_client:get_msg(Receiver), ok = amqp10_client:settle_msg(Receiver, Msg1, rejected), @@ -3688,7 +3768,9 @@ dead_letter_into_stream(Config) -> <<"x-initial-cluster-size">> => {ulong, 1} }}), {ok, Receiver} = amqp10_client:attach_receiver_link( - Session1, <<"receiver">>, <<"/queue/", QName1/binary>>, settled), + Session1, <<"receiver">>, <<"/queue/", QName1/binary>>, + settled, configuration, + #{<<"rabbitmq:stream-offset-spec">> => <<"first">>}), {ok, Sender} = amqp10_client:attach_sender_link( Session0, <<"sender">>, <<"/queue/", QName0/binary>>), wait_for_credit(Sender), @@ -3808,7 +3890,7 @@ footer_checksum(FooterOpt, Config) -> "false" => false}, amqp10_msg:new(<<"t1">>, <<"m1">>)))))), ok = amqp10_client:send_msg(Sender, M1), - ok = wait_for_settlement(<<"t1">>), + ok = wait_for_accepted(<<"t1">>), {ok, Msg1} = amqp10_client:get_msg(Receiver), ?assertEqual(<<"m1">>, amqp10_msg:body_bin(Msg1)), @@ -3821,7 +3903,7 @@ footer_checksum(FooterOpt, Config) -> "false" => false}, amqp10_msg:new(<<"t2">>, <<"m2">>)), ok = amqp10_client:send_msg(Sender, M2), - ok = wait_for_settlement(<<"t2">>), + ok = wait_for_accepted(<<"t2">>), {ok, Msg2} = amqp10_client:get_msg(Receiver), ?assertEqual(<<"m2">>, amqp10_msg:body_bin(Msg2)), @@ -3829,7 +3911,7 @@ footer_checksum(FooterOpt, Config) -> %% bare message consists of single data section M3 = amqp10_msg:new(<<"t3">>, <<"m3">>), ok = amqp10_client:send_msg(Sender, M3), - ok = wait_for_settlement(<<"t3">>), + ok = wait_for_accepted(<<"t3">>), {ok, Msg3} = amqp10_client:get_msg(Receiver), ?assertEqual(<<"m3">>, amqp10_msg:body_bin(Msg3)), @@ -3838,7 +3920,7 @@ footer_checksum(FooterOpt, Config) -> M4 = amqp10_msg:new(<<"t4">>, [#'v1_0.data'{content = <<"m4 a">>}, #'v1_0.data'{content = <<"m4 b">>}]), ok = amqp10_client:send_msg(Sender, M4), - ok = wait_for_settlement(<<"t4">>), + ok = wait_for_accepted(<<"t4">>), {ok, Msg4} = amqp10_client:get_msg(Receiver), ?assertEqual([<<"m4 a">>, <<"m4 b">>], amqp10_msg:body(Msg4)), @@ -3851,7 +3933,7 @@ footer_checksum(FooterOpt, Config) -> %% Our serialiser uses 1 byte boolean encoding #'v1_0.amqp_sequence'{content = [true, false, undefined]}]), ok = amqp10_client:send_msg(Sender, M5), - ok = wait_for_settlement(<<"t5">>), + ok = wait_for_accepted(<<"t5">>), {ok, Msg5} = amqp10_client:get_msg(Receiver), ?assertEqual([#'v1_0.amqp_sequence'{content = [{ubyte, 255}]}, @@ -3876,7 +3958,7 @@ footer_checksum(FooterOpt, Config) -> {{symbol, <<"x-opt-carrot">>}, {char, $🥕}} ]}]), ok = amqp10_client:send_msg(Sender, M6), - ok = wait_for_settlement(<<"t6">>), + ok = wait_for_accepted(<<"t6">>), {ok, Msg6} = amqp10_client:get_msg(Receiver), ?assertEqual([<<"m6 a">>, <<"m6 b">>], amqp10_msg:body(Msg6)), @@ -3901,6 +3983,382 @@ footer_checksum(FooterOpt, Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). +receive_many_made_available_over_time_classic_queue(Config) -> + receive_many_made_available_over_time(<<"classic">>, Config). + +receive_many_made_available_over_time_quorum_queue(Config) -> + receive_many_made_available_over_time(<<"quorum">>, Config). + +receive_many_made_available_over_time_stream(Config) -> + receive_many_made_available_over_time(<<"stream">>, Config). + +%% This test grants many credits to the queue once while +%% messages are being made available at the source over time. +receive_many_made_available_over_time(QType, Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = <<"/queue/", QName/binary>>, + {Connection, Session, LinkPair} = init(Config), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, + {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"test-sender">>, Address), + wait_for_credit(Sender), + + %% Send first batch of messages. + ok = send_messages(Sender, 10, false), + ok = wait_for_accepts(10), + Filter = consume_from_first(QType), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, + settled, configuration, Filter), + flush(attached), + %% Grant many credits to the queue once. + ok = amqp10_client:flow_link_credit(Receiver, 5000, never), + %% We expect to receive the first batch of messages. + receive_messages(Receiver, 10), + + %% Make next batch of messages available. + ok = send_messages(Sender, 2990, false), + ok = wait_for_accepts(2990), + %% We expect to receive this batch of messages. + receive_messages(Receiver, 2990), + + %% Make next batch of messages available. + ok = send_messages(Sender, 1999, false), + ok = wait_for_accepts(1999), + %% We expect to receive this batch of messages. + receive_messages(Receiver, 1999), + + %% Make next batch of messages available. + ok = send_messages(Sender, 2, false), + ok = wait_for_accepts(2), + %% At this point, we only have 2 messages in the queue, but only 1 credit left. + ?assertEqual(1, count_received_messages(Receiver)), + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:close_connection(Connection). + +receive_many_auto_flow_classic_queue(Config) -> + receive_many_auto_flow(<<"classic">>, Config). + +receive_many_auto_flow_quorum_queue(Config) -> + receive_many_auto_flow(<<"quorum">>, Config). + +receive_many_auto_flow_stream(Config) -> + receive_many_auto_flow(<<"stream">>, Config). + +receive_many_auto_flow(QType, Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = <<"/queue/", QName/binary>>, + {Connection, Session, LinkPair} = init(Config), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, + {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"test-sender">>, Address), + wait_for_credit(Sender), + + %% Send many messages. + Num = 10_000, + ok = send_messages(Sender, Num, false), + ok = wait_for_accepts(Num), + + Filter = consume_from_first(QType), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, + settled, configuration, Filter), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail(missing_attached) + end, + flush(receiver_attached), + + %% Let's auto top up relatively often, but in large batches. + ok = amqp10_client:flow_link_credit(Receiver, 1300, 1200), + receive_messages(Receiver, Num), + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:close_connection(Connection). + +%% This test ensures that the server sends us TRANSFER and FLOW frames in the correct order +%% even if the server is temporarily not allowed to send us any TRANSFERs due to our session +%% incoming-window being closed. +incoming_window_closed_transfer_flow_order(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Ch = rabbit_ct_client_helpers:open_channel(Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), + ok = rabbit_ct_client_helpers:close_channel(Ch), + Address = <<"/amq/queue/", QName/binary>>, + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + DTag = <<"my tag">>, + Body = <<"my body">>, + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, Body, false)), + ok = wait_for_accepted(DTag), + ok = amqp10_client:detach_link(Sender), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail(missing_attached) + end, + flush(receiver_attached), + + ok = close_incoming_window(Session), + ok = amqp10_client:flow_link_credit(Receiver, 2, never, true), + %% Given our incoming window is closed, we shouldn't receive the TRANSFER yet, and therefore + %% must not yet receive the FLOW that comes thereafter with drain=true, credit=0, and advanced delivery-count. + receive Unexpected -> ct:fail({unexpected, Unexpected}) + after 300 -> ok + end, + + %% Open our incoming window + gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}), + %% Important: We should first receive the TRANSFER, + %% and only thereafter the FLOW (and hence the credit_exhausted notification). + receive First -> + {amqp10_msg, Receiver, Msg} = First, + ?assertEqual([Body], amqp10_msg:body(Msg)) + after 5000 -> ct:fail("timeout receiving message") + end, + receive Second -> + ?assertEqual({amqp10_event, {link, Receiver, credit_exhausted}}, Second) + after 5000 -> ct:fail("timeout receiving credit_exhausted") + end, + + ok = delete_queue(Session, QName), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +incoming_window_closed_stop_link(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = <<"/queue/", QName/binary>>, + + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{transfer_limit_margin => -1}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>, false)), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, <<"m2">>, false)), + ok = wait_for_accepted(<<"t1">>), + ok = wait_for_accepted(<<"t2">>), + ok = amqp10_client:detach_link(Sender), + + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail(missing_attached) + end, + flush(receiver_attached), + + ok = close_incoming_window(Session), + %% We first grant a credit in drain mode. + ok = amqp10_client:flow_link_credit(Receiver, 1, never, true), + %% Then, we change our mind and stop the link. + ok = amqp10_client:stop_receiver_link(Receiver), + %% Given our incoming window is closed, we shouldn't receive any TRANSFER. + receive {amqp10_msg, _, _} = Unexp1 -> ct:fail({?LINE, unexpected_msg, Unexp1}) + after 10 -> ok + end, + + %% Open our incoming window + gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}), + + %% Since we decreased link credit dynamically, we may or may not receive the 1st message. + receive {amqp10_msg, Receiver, Msg1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)) + after 500 -> ok + end, + %% We must not receive the 2nd message. + receive {amqp10_msg, _, _} = Unexp2 -> ct:fail({?LINE, unexpected_msg, Unexp2}) + after 5 -> ok + end, + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +%% Test that we can close a link while our session incoming-window is closed. +incoming_window_closed_close_link(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = <<"/queue/", QName/binary>>, + + {Connection, Session, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + DTag = <<"my tag">>, + Body = <<"my body">>, + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, Body, false)), + ok = wait_for_accepted(DTag), + ok = amqp10_client:detach_link(Sender), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail(missing_attached) + end, + flush(receiver_attached), + + ok = close_incoming_window(Session), + ok = amqp10_client:flow_link_credit(Receiver, 2, never, true), + %% Given our incoming window is closed, we shouldn't receive the TRANSFER yet, and therefore + %% must not yet receive the FLOW that comes thereafter with drain=true, credit=0, and advanced delivery-count. + receive Unexpected1 -> ct:fail({unexpected, Unexpected1}) + after 300 -> ok + end, + %% Close the link while our session incoming-window is closed. + ok = detach_link_sync(Receiver), + %% Open our incoming window. + gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}), + %% Given that both endpoints have now destroyed the link, we do not + %% expect to receive any TRANSFER or FLOW frame referencing the destroyed link. + receive Unexpected2 -> ct:fail({unexpected, Unexpected2}) + after 300 -> ok + end, + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +incoming_window_closed_rabbitmq_internal_flow_classic_queue(Config) -> + incoming_window_closed_rabbitmq_internal_flow(<<"classic">>, Config). + +incoming_window_closed_rabbitmq_internal_flow_quorum_queue(Config) -> + incoming_window_closed_rabbitmq_internal_flow(<<"quorum">>, Config). + +incoming_window_closed_rabbitmq_internal_flow(QType, Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = <<"/queue/", QName/binary>>, + + {Connection, Session, LinkPair} = init(Config), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, + {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + %% Send many messages. + Num = 5_000, + ok = send_messages(Sender, Num, false), + ok = wait_for_accepts(Num), + ok = detach_link_sync(Sender), + + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail(missing_attached) + end, + flush(receiver_attached), + + ok = close_incoming_window(Session), + %% Grant all link credit at once. + ok = amqp10_client:flow_link_credit(Receiver, Num, never), + %% Given our incoming window is closed, we shouldn't receive any TRANSFER yet. + receive Unexpected -> ct:fail({unexpected, Unexpected}) + after 200 -> ok + end, + + %% Here, we do a bit of white box testing: We assert that RabbitMQ has some form of internal + %% flow control by checking that the queue did not send all its messages to the server session + %% process. In other words, there should be ready messages in the queue. + MsgsReady = ready_messages(QName, Config), + ?assert(MsgsReady > 0), + + %% Open our incoming window. + gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, Num}}}), + receive_messages(Receiver, Num), + + ok = detach_link_sync(Receiver), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +tcp_back_pressure_rabbitmq_internal_flow_classic_queue(Config) -> + tcp_back_pressure_rabbitmq_internal_flow(<<"classic">>, Config). + +tcp_back_pressure_rabbitmq_internal_flow_quorum_queue(Config) -> + tcp_back_pressure_rabbitmq_internal_flow(<<"quorum">>, Config). + +%% Test that RabbitMQ can handle clients that do not receive fast enough +%% causing TCP back-pressure to the server. RabbitMQ's internal flow control +%% writer proc <- session proc <- queue proc +%% should be able to protect the server by having the queue not send out all messages at once. +tcp_back_pressure_rabbitmq_internal_flow(QType, Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = <<"/queue/", QName/binary>>, + + OpnConf0 = connection_config(Config), + %% We also want to test the code path where large messages are split into multiple transfer frames. + OpnConf = OpnConf0#{max_frame_size => 600}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, + {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + %% Send many messages. + %% The messages should be somewhat large to fill up buffers causing TCP back-pressure. + BodySuffix = binary:copy(<<"x">>, 1000), + Num = 5000, + ok = send_messages(Sender, Num, false, BodySuffix), + ok = wait_for_accepts(Num), + ok = detach_link_sync(Sender), + + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail(missing_attached) + end, + flush(receiver_attached), + + {_GenStatemState, + #{reader := ReaderPid, + socket := {tcp, Socket}}} = formatted_state(Session), + + %% Provoke TCP back-pressure from client to server by using very small buffers. + ok = inet:setopts(Socket, [{recbuf, 256}, + {buffer, 256}]), + %% Suspend the receiving client such that it stops reading from its socket + %% causing TCP back-pressure to the server being applied. + true = erlang:suspend_process(ReaderPid), + + ok = amqp10_client:flow_link_credit(Receiver, Num, never), + %% We give the queue time to send messages to the session proc and writer proc. + timer:sleep(1000), + + %% Here, we do a bit of white box testing: We assert that RabbitMQ has some form of internal + %% flow control by checking that the queue sent some but, more importantly, not all its + %% messages to the server session and writer processes. In other words, there should be + %% ready messages in the queue. + MsgsReady = ready_messages(QName, Config), + ?assert(MsgsReady > 0), + ?assert(MsgsReady < Num), + + %% Use large buffers. This will considerably speed up receiving all messages (on Linux). + ok = inet:setopts(Socket, [{recbuf, 65536}, + {buffer, 65536}]), + %% When we resume the receiving client, we expect to receive all messages. + true = erlang:resume_process(ReaderPid), + receive_messages(Receiver, Num), + + ok = detach_link_sync(Receiver), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + %% internal %% @@ -4008,7 +4466,7 @@ wait_for_connection_close(Connection) -> ct:fail({connection_close_timeout, Connection}) end. -wait_for_settlement(Tag) -> +wait_for_accepted(Tag) -> wait_for_settlement(Tag, accepted). wait_for_settlement(Tag, State) -> @@ -4076,18 +4534,22 @@ count_received_messages0(Receiver, Count) -> receive {amqp10_msg, Receiver, _Msg} -> count_received_messages0(Receiver, Count + 1) - after 200 -> + after 1000 -> Count end. -send_messages(_Sender, 0, _Settled) -> - ok; send_messages(Sender, Left, Settled) -> + send_messages(Sender, Left, Settled, <<>>). + +send_messages(_, 0, _, _) -> + ok; +send_messages(Sender, Left, Settled, BodySuffix) -> Bin = integer_to_binary(Left), - Msg = amqp10_msg:new(Bin, Bin, Settled), + Body = <>, + Msg = amqp10_msg:new(Bin, Body, Settled), case amqp10_client:send_msg(Sender, Msg) of ok -> - send_messages(Sender, Left - 1, Settled); + send_messages(Sender, Left - 1, Settled, BodySuffix); {error, insufficient_credit} -> ok = wait_for_credit(Sender), %% The credited event we just processed could have been received some time ago, @@ -4099,7 +4561,7 @@ send_messages(Sender, Left, Settled) -> %% but do not process the credited event in our mailbox. %% So, we must be defensive here and assume that the next amqp10_client:send/2 call might return {error, insufficient_credit} %% again causing us then to really wait to receive a credited event (instead of just processing an old credited event). - send_messages(Sender, Left, Settled) + send_messages(Sender, Left, Settled, BodySuffix) end. assert_link_credit_runs_out(_Sender, 0) -> @@ -4152,9 +4614,9 @@ consume_from_first(<<"stream">>) -> consume_from_first(_) -> #{}. -%% Return the formatted state of a gen_server via sys:get_status/1. +%% Return the formatted state of a gen_server or gen_statem via sys:get_status/1. %% (sys:get_state/1 is unformatted) -gen_server_state(Pid) -> +formatted_state(Pid) -> {status, _, _, L0} = sys:get_status(Pid, 20_000), L1 = lists:last(L0), {data, L2} = lists:last(L1), @@ -4180,6 +4642,14 @@ get_available_messages({link_ref, receiver, Session, OutputHandle}) -> {ok, Available} = maps:find(available, Link), Available. +ready_messages(QName, Config) + when is_binary(QName) -> + {ok, Q} = rpc(Config, rabbit_amqqueue, lookup, [QName, <<"/">>]), + {ok, MsgsReady, _ConsumerCount} = rpc(Config, rabbit_queue_type, stat, [Q]), + ?assert(is_integer(MsgsReady)), + ct:pal("Queue ~s has ~b ready messages.", [QName, MsgsReady]), + MsgsReady. + ra_name(Q) -> binary_to_atom(<<"%2F_", Q/binary>>). @@ -4212,3 +4682,6 @@ find_event(Type, Props, Events) when is_list(Props), is_list(Events) -> lists:keymember(Key, 1, EventProps) end, Props) end, Events). + +close_incoming_window(Session) -> + gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 0}}}). diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl index e2df8e370e0c..cc5844076982 100644 --- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl @@ -508,32 +508,29 @@ credit_api_v2(Config) -> (D, _) -> error({unexpected_delivery, D}) end), %% Grant 1 credit. - {F6, []} = rabbit_fifo_client:credit(CTag, DC0, 1, false, _Echo0 = true, F5), + {F6, []} = rabbit_fifo_client:credit(CTag, DC0, 1, false, F5), %% We expect exactly 1 message due to 1 credit being granted. {[{_, _, _, _, m1}], - %% We expect a credit_reply action due to echo=true + %% We always expect a credit_reply action. [{credit_reply, CTag, DC1, _Credit0 = 0, _Available0 = 1, _Drain0 = false}], F7} = process_ra_events(receive_ra_events(), ClusterName, F6), %% Again, grant 1 credit. %% However, because we still use the initial delivery count DC0, rabbit_fifo - %% wont' send us a new message since it already sent us m1 for that old delivery-count. + %% won't send us a new message since it already sent us m1 for that old delivery-count. %% In other words, this credit top up simulates in-flight deliveries. - {F8, []} = rabbit_fifo_client:credit(CTag, DC0, 1, false, _Echo1 = true, F7), + {F8, []} = rabbit_fifo_client:credit(CTag, DC0, 1, false, F7), {_NoMessages = [], - %% We still expect a credit_reply action due to echo=true [{credit_reply, CTag, DC1, _Credit1 = 0, _Available1 = 1, _Drain1 = false}], F9} = process_ra_events(receive_ra_events(), ClusterName, F8), %% Grant 4 credits and drain. - {F10, []} = rabbit_fifo_client:credit(CTag, DC1, 4, true, _Echo2 = false, F9), + {F10, []} = rabbit_fifo_client:credit(CTag, DC1, 4, true, F9), %% rabbit_fifo should advance the delivery-count as much as possible %% consuming all credits due to drain=true and insufficient messages in the queue. DC2 = DC1 + 4, %% We expect to receive m2 which is the only message in the queue. {[{_, _, _, _, m2}], - %% Even though echo=false, we still expect a credit_reply action due - %% drain=true and insufficient messages in the queue. [{credit_reply, CTag, DC2, _Credit2 = 0, _Available2 = 0, _Drain2 = true}], F11} = process_ra_events(receive_ra_events(), ClusterName, F10), flush(), @@ -548,11 +545,11 @@ credit_api_v2(Config) -> end), %% Grant 10 credits and receive the last message. - {F14, []} = rabbit_fifo_client:credit(CTag, DC2, 10, false, _Echo = false, F13), + {F14, []} = rabbit_fifo_client:credit(CTag, DC2, 10, false, F13), + DC3 = DC2 + 1, ?assertMatch( {[{_, _, _, _, m3}], - %% Due to echo=false, we don't expect a credit_reply action. - _NoCreditReplyAction = [], + [{credit_reply, CTag, DC3, _Credit3 = 9, _Available3 = 0, _Drain3 = false}], _F15}, process_ra_events(receive_ra_events(), ClusterName, F14)). untracked_enqueue(Config) -> diff --git a/deps/rabbit_common/src/credit_flow.erl b/deps/rabbit_common/src/credit_flow.erl index 5cef875835ec..5681f0ed1fa9 100644 --- a/deps/rabbit_common/src/credit_flow.erl +++ b/deps/rabbit_common/src/credit_flow.erl @@ -156,7 +156,7 @@ state_delayed(BlockedAt) -> B -> Now = erlang:monotonic_time(), Diff = erlang:convert_time_unit(Now - B, native, - micro_seconds), + microsecond), case Diff < ?STATE_CHANGE_INTERVAL of true -> flow; false -> running diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index b5d817cbf549..6324165976e4 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -24,7 +24,8 @@ precondition_failed/1, precondition_failed/2]). -export([type_class/1, assert_args_equivalence/4, assert_field_equivalence/4]). -export([table_lookup/2, set_table_value/4, amqp_table/1, to_amqp_table/1]). --export([r/3, r/2, r_arg/4, rs/1]). +-export([r/3, r/2, r_arg/4, rs/1, + queue_resource/2, exchange_resource/2]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). -export([start_cover/1]). @@ -437,6 +438,16 @@ rs(#resource{virtual_host = VHostPath, kind = topic, name = Name}) -> rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> format("~ts '~ts' in vhost '~ts'", [Kind, Name, VHostPath]). +-spec queue_resource(rabbit_types:vhost(), resource_name()) -> + rabbit_types:r(queue). +queue_resource(VHostPath, Name) -> + r(VHostPath, queue, Name). + +-spec exchange_resource(rabbit_types:vhost(), resource_name()) -> + rabbit_types:r(exchange). +exchange_resource(VHostPath, Name) -> + r(VHostPath, exchange, Name). + enable_cover() -> enable_cover(["."]). enable_cover(Dirs) -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl index 73885ddba2e2..9e80e4043064 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl @@ -52,7 +52,7 @@ handle_event/3, settle/5, credit_v1/5, - credit/7, + credit/6, dequeue/5, state_info/1 ]). @@ -281,8 +281,8 @@ settle(A1,A2,A3,A4,A5) -> credit_v1(A1,A2,A3,A4,A5) -> ?UNSUPPORTED([A1,A2,A3,A4,A5]). -credit(A1,A2,A3,A4,A5,A6,A7) -> - ?UNSUPPORTED([A1,A2,A3,A4,A5,A6,A7]). +credit(A1,A2,A3,A4,A5,A6) -> + ?UNSUPPORTED([A1,A2,A3,A4,A5,A6]). dequeue(A1,A2,A3,A4,A5) -> ?UNSUPPORTED([A1,A2,A3,A4,A5]). diff --git a/deps/rabbitmq_shovel/test/shovel_test_utils.erl b/deps/rabbitmq_shovel/test/shovel_test_utils.erl index a5589290fa90..b487a49a1a08 100644 --- a/deps/rabbitmq_shovel/test/shovel_test_utils.erl +++ b/deps/rabbitmq_shovel/test/shovel_test_utils.erl @@ -49,7 +49,9 @@ await_shovel(Config, Node, Name) -> ?MODULE, await_shovel1, [Config, Name]). await_shovel1(_Config, Name) -> - await(fun () -> lists:member(Name, shovels_from_status()) end). + await(fun() -> + lists:member(Name, shovels_from_status()) + end, 30_000). shovels_from_status() -> S = rabbit_shovel_status:status(), From cf3c8baa11efbf07e83192c3340af8d807a8eea8 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 5 Jun 2024 13:26:20 +0200 Subject: [PATCH 2/2] Avoid frequent #resource{} record creation Avoid #resource{} record creation in every queue type interaction by storing the queue #resource{} in the session state. --- deps/rabbit/src/rabbit_amqp_session.erl | 34 +++++++++---------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index a61dcb64e970..d8aaa142b188 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -177,7 +177,7 @@ -record(outgoing_link, { %% Although the source address of a link might be an exchange name and binding key %% or a topic filter, an outgoing link will always consume from a queue. - queue_name_bin :: rabbit_misc:resource_name(), + queue_name :: rabbit_amqqueue:name(), queue_type :: rabbit_queue_type:queue_type(), send_settled :: boolean(), max_message_size :: unlimited | pos_integer(), @@ -670,7 +670,7 @@ handle_stashed_down(#state{stashed_down = QNames, %% (This roughly corresponds to consumer_cancel_notify sent from server to client in AMQP 0.9.1.) {DetachFrames, OutgoingLinks} = lists:foldl(fun(#resource{name = QNameBinDown}, Acc = {_, OutgoingLinks1}) -> - maps:fold(fun(Handle, Link = #outgoing_link{queue_name_bin = QNameBin}, {Frames0, Links0}) + maps:fold(fun(Handle, Link = #outgoing_link{queue_name = #resource{name = QNameBin}}, {Frames0, Links0}) when QNameBin =:= QNameBinDown -> Detach = detach(Handle, Link, ?V_1_0_AMQP_ERROR_ILLEGAL_STATE), Frames = [Detach | Frames0], @@ -781,7 +781,7 @@ destroy_incoming_link(Handle, Link = #incoming_link{queue_name_bin = QNameBin}, destroy_incoming_link(_, _, _, Acc) -> Acc. -destroy_outgoing_link(Handle, Link = #outgoing_link{queue_name_bin = QNameBin}, QNameBin, {Frames, Unsettled0, Links}) -> +destroy_outgoing_link(Handle, Link = #outgoing_link{queue_name = #resource{name = QNameBin}}, QNameBin, {Frames, Unsettled0, Links}) -> {Unsettled, _RemovedMsgIds} = remove_link_from_outgoing_unsettled_map(Handle, Unsettled0), {[detach(Handle, Link, ?V_1_0_AMQP_ERROR_RESOURCE_DELETED) | Frames], Unsettled, @@ -1093,7 +1093,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, max_message_size = MaybeMaxMessageSize}, MaxMessageSize = max_message_size(MaybeMaxMessageSize), Link = #outgoing_link{ - queue_name_bin = QNameBin, + queue_name = queue_resource(Vhost, QNameBin), queue_type = QType, send_settled = SndSettled, max_message_size = MaxMessageSize, @@ -1207,16 +1207,13 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, incoming_links = IncomingLinks, outgoing_links = OutgoingLinks0, outgoing_unsettled_map = Unsettled0, - cfg = #cfg{ - vhost = Vhost, - user = #user{username = Username}}}) -> + cfg = #cfg{user = #user{username = Username}}}) -> Ctag = handle_to_ctag(HandleInt), %% TODO delete queue if closed flag is set to true? see 2.6.6 %% TODO keep the state around depending on the lifetime {QStates, Unsettled, OutgoingLinks} = case maps:take(HandleInt, OutgoingLinks0) of - {#outgoing_link{queue_name_bin = QNameBin}, OutgoingLinks1} -> - QName = queue_resource(Vhost, QNameBin), + {#outgoing_link{queue_name = QName}, OutgoingLinks1} -> case rabbit_amqqueue:lookup(QName) of {ok, Q} -> %%TODO Add a new rabbit_queue_type:remove_consumer API that - from the point of view of @@ -1448,10 +1445,9 @@ handle_credit_reply0( S; none when QCredit =:= 0 andalso DesiredCredit > 0 -> + QName = Link0#outgoing_link.queue_name, %% Provide queue next batch of credits. CappedCredit = cap_credit(DesiredCredit), - QName = queue_resource(S0#state.cfg#cfg.vhost, - Link0#outgoing_link.queue_name_bin), {ok, QStates, Actions} = rabbit_queue_type:credit( QName, Ctag, DeliveryCount, CappedCredit, false, QStates0), @@ -1471,7 +1467,7 @@ handle_credit_reply0( {credit_reply, Ctag, DeliveryCount, Credit, Available, _Drain = true}, Handle, Link0 = #outgoing_link{ - queue_name_bin = QNameBin, + queue_name = QName, client_flow_ctl = #client_flow_ctl{ delivery_count = CDeliveryCount0 } = CFC, queue_flow_ctl = #queue_flow_ctl{ @@ -1480,7 +1476,6 @@ handle_credit_reply0( } = QFC, stashed_credit_req = StashedCreditReq}, S0 = #state{cfg = #cfg{writer_pid = Writer, - vhost = Vhost, channel_num = ChanNum}, outgoing_links = OutgoingLinks, queue_states = QStates0}) -> @@ -1500,7 +1495,6 @@ handle_credit_reply0( CappedCredit = cap_credit(DesiredCredit), Link = Link0#outgoing_link{queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit}}, - QName = queue_resource(Vhost, QNameBin), {ok, QStates, Actions} = rabbit_queue_type:credit( QName, Ctag, DeliveryCount, CappedCredit, true, QStates0), @@ -1545,7 +1539,7 @@ handle_credit_reply0( pop_credit_req( Handle, Ctag, Link0 = #outgoing_link{ - queue_name_bin = QNameBin, + queue_name = QName, client_flow_ctl = #client_flow_ctl{ delivery_count = CDeliveryCount } = CFC, @@ -1558,13 +1552,11 @@ pop_credit_req( drain = Drain, echo = Echo }}, - S0 = #state{cfg = #cfg{vhost = Vhost}, - outgoing_links = OutgoingLinks, + S0 = #state{outgoing_links = OutgoingLinks, queue_states = QStates0}) -> LinkCreditSnd = amqp10_util:link_credit_snd( DeliveryCountRcv, LinkCreditRcv, CDeliveryCount), CappedCredit = cap_credit(LinkCreditSnd), - QName = queue_resource(Vhost, QNameBin), {ok, QStates, Actions} = rabbit_queue_type:credit( QName, Ctag, QDeliveryCount, CappedCredit, Drain, QStates0), @@ -2673,7 +2665,7 @@ handle_outgoing_mgmt_link_flow_control( State#state{outgoing_management_links = Links}. handle_outgoing_link_flow_control( - #outgoing_link{queue_name_bin = QNameBin, + #outgoing_link{queue_name = QName, credit_api_version = CreditApiVsn, client_flow_ctl = CFC, queue_flow_ctl = QFC, @@ -2685,10 +2677,8 @@ handle_outgoing_link_flow_control( drain = Drain0, echo = Echo0}, #state{outgoing_links = OutgoingLinks, - queue_states = QStates0, - cfg = #cfg{vhost = Vhost} + queue_states = QStates0 } = State0) -> - QName = queue_resource(Vhost, QNameBin), Ctag = handle_to_ctag(HandleInt), DeliveryCountRcv = delivery_count_rcv(MaybeDeliveryCountRcv), Drain = default(Drain0, false),