diff --git a/deps/amqp10_client/src/amqp10_client_frame_reader.erl b/deps/amqp10_client/src/amqp10_client_frame_reader.erl index 96ded5751794..05d8823999b1 100644 --- a/deps/amqp10_client/src/amqp10_client_frame_reader.erl +++ b/deps/amqp10_client/src/amqp10_client_frame_reader.erl @@ -286,14 +286,18 @@ handle_input(StateName, Data, State) -> defer_heartbeat_timer(State = #state{heartbeat_timer_ref = TRef, connection_config = #{idle_time_out := T}}) - when is_number(T) andalso T > 0 -> + when is_integer(T) andalso T > 0 -> _ = case TRef of - undefined -> ok; - _ -> _ = erlang:cancel_timer(TRef) + undefined -> + ok; + _ -> + erlang:cancel_timer(TRef, [{async, true}, + {info, false}]) end, NewTRef = erlang:send_after(T * 2, self(), heartbeat), State#state{heartbeat_timer_ref = NewTRef}; -defer_heartbeat_timer(State) -> State. +defer_heartbeat_timer(State) -> + State. route_frame(Channel, FrameType, {Performative, Payload} = Frame, State0) -> {DestinationPid, State} = find_destination(Channel, FrameType, Performative, diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index d7617e43fec3..b66308a826b2 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -902,9 +902,10 @@ handle_link_flow(#'v1_0.flow'{delivery_count = MaybeTheirDC, handle_link_flow(#'v1_0.flow'{delivery_count = TheirDC, link_credit = {uint, TheirCredit}, available = Available, - drain = Drain}, + drain = Drain0}, Link0 = #link{role = receiver}) -> - Link = case Drain andalso TheirCredit =< 0 of + Drain = default(Drain0, false), + Link = case Drain andalso TheirCredit =:= 0 of true -> notify_credit_exhausted(Link0), Link0#link{delivery_count = unpack(TheirDC), @@ -1212,6 +1213,9 @@ boolean_to_role(?AMQP_ROLE_SENDER) -> boolean_to_role(?AMQP_ROLE_RECEIVER) -> receiver. +default(undefined, Default) -> Default; +default(Thing, _Default) -> Thing. + format_status(Status = #{data := Data0}) -> #state{channel = Channel, remote_channel = RemoteChannel, diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index 29049e4c58aa..f15712b43224 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -8,6 +8,9 @@ -import(rabbit_amqp_session, [check_resource_access/4, check_read_permitted_on_topic/4]). +-import(rabbit_misc, + [queue_resource/2, + exchange_resource/2]). -type permission_caches() :: {rabbit_amqp_session:permission_cache(), rabbit_amqp_session:topic_permission_cache()}. @@ -77,7 +80,7 @@ handle_http_req(<<"GET">>, _ConnPid, PermCaches) -> QNameBin = uri_string:unquote(QNameBinQuoted), - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), case rabbit_amqqueue:with( QName, fun(Q) -> @@ -118,7 +121,7 @@ handle_http_req(HttpMethod = <<"PUT">>, _ -> ok end, ok = prohibit_cr_lf(QNameBin), - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), ok = prohibit_reserved_amq(QName), PermCache1 = check_resource_access(QName, configure, User, PermCache0), rabbit_core_metrics:queue_declared(QName), @@ -192,7 +195,7 @@ handle_http_req(<<"PUT">>, catch exit:#amqp_error{explanation = Explanation} -> throw(<<"400">>, Explanation, []) end, - XName = rabbit_misc:r(Vhost, exchange, XNameBin), + XName = exchange_resource(Vhost, XNameBin), ok = prohibit_default_exchange(XName), PermCache = check_resource_access(XName, configure, User, PermCache0), X = case rabbit_exchange:lookup(XName) of @@ -223,7 +226,7 @@ handle_http_req(<<"DELETE">>, ConnPid, {PermCache0, TopicPermCache}) -> QNameBin = uri_string:unquote(QNameBinQuoted), - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), PermCache = check_resource_access(QName, read, User, PermCache0), try rabbit_amqqueue:with_exclusive_access_or_die( QName, ConnPid, @@ -251,7 +254,7 @@ handle_http_req(<<"DELETE">>, ConnPid, {PermCache0, TopicPermCache}) -> QNameBin = uri_string:unquote(QNameBinQuoted), - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), ok = prohibit_cr_lf(QNameBin), PermCache = check_resource_access(QName, configure, User, PermCache0), try rabbit_amqqueue:delete_with(QName, ConnPid, false, false, Username, true) of @@ -271,7 +274,7 @@ handle_http_req(<<"DELETE">>, _ConnPid, {PermCache0, TopicPermCache}) -> XNameBin = uri_string:unquote(XNameBinQuoted), - XName = rabbit_misc:r(Vhost, exchange, XNameBin), + XName = exchange_resource(Vhost, XNameBin), ok = prohibit_cr_lf(XNameBin), ok = prohibit_default_exchange(XName), ok = prohibit_reserved_amq(XName), @@ -296,7 +299,7 @@ handle_http_req(<<"POST">>, #{destination_exchange := Bin} -> {exchange, Bin} end, - SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin), + SrcXName = exchange_resource(Vhost, SrcXNameBin), DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin), PermCaches = binding_checks(SrcXName, DstName, BindingKey, User, PermCaches0), Binding = #binding{source = SrcXName, @@ -319,7 +322,7 @@ handle_http_req(<<"DELETE">>, DstNameBin, BindingKey, ArgsHash} = decode_binding_path_segment(BindingSegment), - SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin), + SrcXName = exchange_resource(Vhost, SrcXNameBin), DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin), PermCaches = binding_checks(SrcXName, DstName, BindingKey, User, PermCaches0), Bindings = rabbit_binding:list_for_source_and_destination(SrcXName, DstName), @@ -351,7 +354,7 @@ handle_http_req(<<"GET">>, "missing 'dste' or 'dstq' in query: ~tp", QueryMap) end, - SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin), + SrcXName = exchange_resource(Vhost, SrcXNameBin), DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin), Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstName), Bindings = [B || B = #binding{key = K} <- Bindings0, K =:= Key], diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 4287e6cb2e88..a61dcb64e970 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -53,12 +53,19 @@ %% 400 for classic queues %% If link target is a queue (rather than an exchange), we could use one of these depending %% on target queue type. For the time being just use a static value that's something in between. -%% An even better approach in future would be to dynamically grow (or shrink) the link credit -%% we grant depending on how fast target queue(s) actually confirm messages. +%% In future, we could dynamically grow (or shrink) the link credit we grant depending on how fast +%% target queue(s) actually confirm messages: see paper "Credit-Based Flow Control for ATM Networks" +%% from 1995, section 4.2 "Static vs. adaptive credit control" for pros and cons. -define(LINK_CREDIT_RCV, 128). -define(MANAGEMENT_LINK_CREDIT_RCV, 8). -define(MANAGEMENT_NODE_ADDRESS, <<"/management">>). -define(DEFAULT_EXCHANGE_NAME, <<>>). +%% This is the maximum credit we grant to a sending queue. +%% Only when we sent sufficient messages to the writer proc, we will again grant credits +%% to the sending queue. We have this limit in place to ensure that our session proc won't be flooded +%% with messages by the sending queue, especially if we are throttled sending messages to the client +%% either by the writer proc or by remote-incoming window (i.e. session flow control). +-define(LINK_CREDIT_RCV_FROM_QUEUE_MAX, 256). -export([start_link/8, process_frame/2, @@ -81,6 +88,9 @@ [add/2, diff/2, compare/2]). +-import(rabbit_misc, + [queue_resource/2, + exchange_resource/2]). -type permission_cache() :: [{rabbit_types:r(exchange | queue), rabbit_types:permission_atom()}]. @@ -113,7 +123,7 @@ -record(management_link, { name :: binary(), delivery_count :: sequence_no(), - credit :: non_neg_integer(), + credit :: rabbit_queue_type:credit(), max_message_size :: unlimited | pos_integer() }). @@ -129,7 +139,7 @@ %% queue_name_bin is only set if the link target address refers to a queue. queue_name_bin :: undefined | rabbit_misc:resource_name(), delivery_count :: sequence_no(), - credit :: non_neg_integer(), + credit :: rabbit_queue_type:credit(), %% TRANSFER delivery IDs published to queues but not yet confirmed by queues incoming_unconfirmed_map = #{} :: #{delivery_number() => {#{rabbit_amqqueue:name() := ok}, @@ -138,6 +148,32 @@ multi_transfer_msg :: undefined | #multi_transfer_msg{} }). +%% A credit request from the client (receiver) as sent in the FLOW frame. +-record(credit_req, { + delivery_count :: sequence_no(), + credit :: rabbit_queue_type:credit(), + drain :: boolean(), + echo :: boolean() + }). + +%% Link flow control state for link between client (receiver) and us (sender). +-record(client_flow_ctl, { + delivery_count :: sequence_no(), + credit :: rabbit_queue_type:credit(), + echo :: boolean() + }). + +%% Link flow control state for link between us (receiver) and queue (sender). +-record(queue_flow_ctl, { + delivery_count :: sequence_no(), + %% We cap the actual credit we grant to the sending queue. + credit :: 0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX, + %% Credit as desired by the receiving client. If larger than + %% LINK_CREDIT_RCV_FROM_QUEUE_MAX, we will top up in batches to the sending queue. + desired_credit :: rabbit_queue_type:credit(), + drain :: boolean() + }). + -record(outgoing_link, { %% Although the source address of a link might be an exchange name and binding key %% or a topic filter, an outgoing link will always consume from a queue. @@ -145,10 +181,23 @@ queue_type :: rabbit_queue_type:queue_type(), send_settled :: boolean(), max_message_size :: unlimited | pos_integer(), + + %% When feature flag credit_api_v2 becomes required, + %% the following 2 fields should be deleted. + credit_api_version :: 1 | 2, %% When credit API v1 is used, our session process holds the delivery-count - %% When credit API v2 is used, the queue type implementation holds the delivery-count - %% When feature flag credit_api_v2 becomes required, this field should be deleted. - delivery_count :: {credit_api_v1, sequence_no()} | credit_api_v2 + delivery_count :: sequence_no() | credit_api_v2, + %% We use a dual link approach for messages we send to the client. + %% We hold link flow control state for the link to the receiving + %% client and for the link to the sending queue. + client_flow_ctl :: #client_flow_ctl{} | credit_api_v1, + queue_flow_ctl :: #queue_flow_ctl{} | credit_api_v1, + %% True if we sent a credit request to the sending queue + %% but haven't processed the corresponding credit reply yet. + credit_req_in_flight :: boolean() | credit_api_v1, + %% While credit_req_in_flight is true, we stash the + %% latest credit request from the receiving client. + stashed_credit_req :: none | #credit_req{} | credit_api_v1 }). -record(outgoing_unsettled, { @@ -248,6 +297,7 @@ %% and advanced delivery count. Otherwise, we would violate the AMQP protocol spec. outgoing_pending = queue:new() :: queue:queue(#pending_delivery{} | #pending_management_delivery{} | + rabbit_queue_type:credit_reply_action() | #'v1_0.flow'{}), %% The link or session endpoint assigns each message a unique delivery-id @@ -376,6 +426,10 @@ handle_call(Msg, _From, State) -> handle_info(timeout, State) -> noreply(State); +handle_info({bump_credit, Msg}, State) -> + %% We are receiving credit from the writer proc. + credit_flow:handle_bump_msg(Msg), + noreply(State); handle_info({{'DOWN', QName}, _MRef, process, QPid, Reason}, #state{queue_states = QStates0, stashed_eol = Eol} = State0) -> @@ -947,16 +1001,16 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, user = User = #user{username = Username}, reader_pid = ReaderPid}}) -> ok = validate_attach(Attach), - {SndSettled, - EffectiveSndSettleMode} = case SndSettleMode of - ?V_1_0_SENDER_SETTLE_MODE_SETTLED -> - {true, SndSettleMode}; - _ -> - %% In the future, we might want to support sender settle - %% mode mixed where we would expect a settlement from the - %% client only for durable messages. - {false, ?V_1_0_SENDER_SETTLE_MODE_UNSETTLED} - end, + {SndSettled, EffectiveSndSettleMode} = + case SndSettleMode of + ?V_1_0_SENDER_SETTLE_MODE_SETTLED -> + {true, SndSettleMode}; + _ -> + %% In the future, we might want to support sender settle + %% mode mixed where we would expect a settlement from the + %% client only for durable messages. + {false, ?V_1_0_SENDER_SETTLE_MODE_UNSETTLED} + end, case ensure_source(Source, Vhost, User, PermCache0, TopicPermCache0) of {error, Reason} -> protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Attach rejected: ~tp", [Reason]); @@ -984,14 +1038,32 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, %% all consumers will use credit API v2. %% Streams always use credit API v2 since the stream client (rabbit_stream_queue) holds the link %% flow control state. Hence, credit API mixed version isn't an issue for streams. - {Mode, - DeliveryCount} = case rabbit_feature_flags:is_enabled(credit_api_v2) orelse - QType =:= rabbit_stream_queue of - true -> - {{credited, ?INITIAL_DELIVERY_COUNT}, credit_api_v2}; - false -> - {{credited, credit_api_v1}, {credit_api_v1, ?INITIAL_DELIVERY_COUNT}} - end, + {CreditApiVsn, Mode, DeliveryCount, ClientFlowCtl, + QueueFlowCtl, CreditReqInFlight, StashedCreditReq} = + case rabbit_feature_flags:is_enabled(credit_api_v2) orelse + QType =:= rabbit_stream_queue of + true -> + {2, + {credited, ?INITIAL_DELIVERY_COUNT}, + credit_api_v2, + #client_flow_ctl{delivery_count = ?INITIAL_DELIVERY_COUNT, + credit = 0, + echo = false}, + #queue_flow_ctl{delivery_count = ?INITIAL_DELIVERY_COUNT, + credit = 0, + desired_credit = 0, + drain = false}, + false, + none}; + false -> + {1, + {credited, credit_api_v1}, + ?INITIAL_DELIVERY_COUNT, + credit_api_v1, + credit_api_v1, + credit_api_v1, + credit_api_v1} + end, Spec = #{no_ack => SndSettled, channel_pid => self(), limiter_pid => none, @@ -1020,11 +1092,17 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, %% Echo back that we will respect the client's requested max-message-size. max_message_size = MaybeMaxMessageSize}, MaxMessageSize = max_message_size(MaybeMaxMessageSize), - Link = #outgoing_link{queue_name_bin = QNameBin, - queue_type = QType, - send_settled = SndSettled, - max_message_size = MaxMessageSize, - delivery_count = DeliveryCount}, + Link = #outgoing_link{ + queue_name_bin = QNameBin, + queue_type = QType, + send_settled = SndSettled, + max_message_size = MaxMessageSize, + credit_api_version = CreditApiVsn, + delivery_count = DeliveryCount, + client_flow_ctl = ClientFlowCtl, + queue_flow_ctl = QueueFlowCtl, + credit_req_in_flight = CreditReqInFlight, + stashed_credit_req = StashedCreditReq}, OutgoingLinks = OutgoingLinks0#{HandleInt => Link}, State1 = State0#state{queue_states = QStates, outgoing_links = OutgoingLinks, @@ -1138,10 +1216,10 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, {QStates, Unsettled, OutgoingLinks} = case maps:take(HandleInt, OutgoingLinks0) of {#outgoing_link{queue_name_bin = QNameBin}, OutgoingLinks1} -> - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), case rabbit_amqqueue:lookup(QName) of {ok, Q} -> - %%TODO Consider adding a new rabbit_queue_type:remove_consumer API that - from the point of view of + %%TODO Add a new rabbit_queue_type:remove_consumer API that - from the point of view of %% the queue process - behaves as if our session process terminated: All messages checked out %% to this consumer should be re-queued automatically instead of us requeueing them here after cancelling %% consumption. @@ -1154,6 +1232,8 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, %% first detaching and then re-attaching to the same session with the same link handle (the handle %% becomes available for re-use once a link is closed): This will result in the same consumer tag, %% and we ideally disallow "updating" an AMQP consumer. + %% If such an API is not added, we also must return messages in the outgoing_pending queue + %% which haven't made it to the outgoing_unsettled map yet. case rabbit_queue_type:cancel(Q, Ctag, undefined, Username, QStates0) of {ok, QStates1} -> {Unsettled1, MsgIds} = remove_link_from_outgoing_unsettled_map(Ctag, Unsettled0), @@ -1286,84 +1366,406 @@ handle_control(Frame, _State) -> "Unexpected frame ~tp", [amqp10_framing:pprint(Frame)]). -send_pending(#state{remote_incoming_window = Space, - outgoing_pending = Buf0, - cfg = #cfg{writer_pid = WriterPid, - channel_num = Ch}} = State0) -> +send_pending(#state{remote_incoming_window = RemoteIncomingWindow, + outgoing_pending = Buf0 + } = State) -> case queue:out(Buf0) of {empty, _} -> - State0; + State; + {{value, CreditReply}, Buf} + when element(1, CreditReply) =:= credit_reply -> + State1 = State#state{outgoing_pending = Buf}, + State2 = handle_credit_reply(CreditReply, State1), + send_pending(State2); {{value, #'v1_0.flow'{} = Flow0}, Buf} -> - Flow = session_flow_fields(Flow0, State0), + #cfg{writer_pid = WriterPid, + channel_num = Ch} = State#state.cfg, + State1 = State#state{outgoing_pending = Buf}, + Flow = session_flow_fields(Flow0, State1), rabbit_amqp_writer:send_command(WriterPid, Ch, Flow), - send_pending(State0#state{outgoing_pending = Buf}); - {{value, #pending_delivery{ - frames = Frames, - queue_pid = QPid, - outgoing_unsettled = #outgoing_unsettled{queue_name = QName} - } = Pending}, Buf1} - when Space > 0 -> + send_pending(State1); + {{value, Delivery}, Buf} -> + case RemoteIncomingWindow =:= 0 orelse + credit_flow:blocked() of + true -> + State; + false -> + {NewRemoteIncomingWindow, State1} = + send_pending_delivery(Delivery, Buf, State), + NumTransfersSent = RemoteIncomingWindow - NewRemoteIncomingWindow, + State2 = session_flow_control_sent_transfers(NumTransfersSent, State1), + %% Recurse to possibly send FLOW frames. + send_pending(State2) + end + end. + +handle_credit_reply(Action = {credit_reply, Ctag, _DeliveryCount, _Credit, _Available, Drain}, + State = #state{outgoing_links = OutgoingLinks}) -> + Handle = ctag_to_handle(Ctag), + case OutgoingLinks of + #{Handle := Link = #outgoing_link{queue_flow_ctl = QFC, + credit_req_in_flight = CreditReqInFlight}} -> + %% Assert that we expect a credit reply for this consumer. + true = CreditReqInFlight, + %% Assert that "The sender's value is always the last known value indicated by the receiver." + Drain = QFC#queue_flow_ctl.drain, + handle_credit_reply0(Action, Handle, Link, State); + _ -> + %% Ignore credit reply for a detached link. + State + end. + +handle_credit_reply0( + {credit_reply, Ctag, DeliveryCount, Credit, Available, _Drain = false}, + Handle, + #outgoing_link{ + client_flow_ctl = #client_flow_ctl{ + delivery_count = CDeliveryCount, + credit = CCredit, + echo = CEcho + }, + queue_flow_ctl = #queue_flow_ctl{ + delivery_count = QDeliveryCount, + credit = QCredit, + desired_credit = DesiredCredit + } = QFC, + stashed_credit_req = StashedCreditReq + } = Link0, + #state{outgoing_links = OutgoingLinks, + queue_states = QStates0 + } = S0) -> + + %% Assert that flow control state between us and the queue is in sync. + QCredit = Credit, + QDeliveryCount = DeliveryCount, + + case StashedCreditReq of + #credit_req{} -> + %% We prioritise the stashed client request over finishing the current + %% top-up rounds because the latest link state from the client applies. + S = pop_credit_req(Handle, Ctag, Link0, S0), + echo(CEcho, Handle, CDeliveryCount, CCredit, Available, S), + S; + none when QCredit =:= 0 andalso + DesiredCredit > 0 -> + %% Provide queue next batch of credits. + CappedCredit = cap_credit(DesiredCredit), + QName = queue_resource(S0#state.cfg#cfg.vhost, + Link0#outgoing_link.queue_name_bin), + {ok, QStates, Actions} = + rabbit_queue_type:credit( + QName, Ctag, DeliveryCount, CappedCredit, false, QStates0), + Link = Link0#outgoing_link{ + queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit} + }, + S = S0#state{queue_states = QStates, + outgoing_links = OutgoingLinks#{Handle := Link}}, + handle_queue_actions(Actions, S); + none -> + Link = Link0#outgoing_link{credit_req_in_flight = false}, + S = S0#state{outgoing_links = OutgoingLinks#{Handle := Link}}, + echo(CEcho, Handle, CDeliveryCount, DesiredCredit, Available, S), + S + end; +handle_credit_reply0( + {credit_reply, Ctag, DeliveryCount, Credit, Available, _Drain = true}, + Handle, + Link0 = #outgoing_link{ + queue_name_bin = QNameBin, + client_flow_ctl = #client_flow_ctl{ + delivery_count = CDeliveryCount0 } = CFC, + queue_flow_ctl = #queue_flow_ctl{ + delivery_count = QDeliveryCount0, + desired_credit = DesiredCredit + } = QFC, + stashed_credit_req = StashedCreditReq}, + S0 = #state{cfg = #cfg{writer_pid = Writer, + vhost = Vhost, + channel_num = ChanNum}, + outgoing_links = OutgoingLinks, + queue_states = QStates0}) -> + %% If the queue sent us a drain credit_reply, + %% the queue must have consumed all our granted credit. + 0 = Credit, + + case DeliveryCount =:= QDeliveryCount0 andalso + DesiredCredit > 0 of + true -> + %% We're in drain mode. The queue did not advance its delivery-count which means + %% it might still have messages available for us. We also desire more messages. + %% Therefore, we do the next round of credit top-up. We prioritise finishing + %% the current drain credit top-up rounds over a stashed credit request because + %% this is easier to reason about and the queue will reply promptly meaning + %% the stashed request will be processed soon enough. + CappedCredit = cap_credit(DesiredCredit), + Link = Link0#outgoing_link{queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit}}, + + QName = queue_resource(Vhost, QNameBin), + {ok, QStates, Actions} = + rabbit_queue_type:credit( + QName, Ctag, DeliveryCount, CappedCredit, true, QStates0), + S = S0#state{queue_states = QStates, + outgoing_links = OutgoingLinks#{Handle := Link}}, + handle_queue_actions(Actions, S); + false -> + %% We're in drain mode. + %% The queue either advanced its delivery-count which means it has + %% no more messages available for us, or we do not desire more messages. + %% Therefore, we're done with draining and we "the sender will (after sending + %% all available messages) advance the delivery-count as much as possible, + %% consuming all link-credit, and send the flow state to the receiver." + CDeliveryCount = add(CDeliveryCount0, DesiredCredit), + Flow0 = #'v1_0.flow'{handle = ?UINT(Handle), + delivery_count = ?UINT(CDeliveryCount), + link_credit = ?UINT(0), + drain = true, + available = ?UINT(Available)}, + Flow = session_flow_fields(Flow0, S0), + rabbit_amqp_writer:send_command(Writer, ChanNum, Flow), + Link = Link0#outgoing_link{ + client_flow_ctl = CFC#client_flow_ctl{ + delivery_count = CDeliveryCount, + credit = 0}, + queue_flow_ctl = QFC#queue_flow_ctl{ + delivery_count = DeliveryCount, + credit = 0, + desired_credit = 0, + drain = false}, + credit_req_in_flight = false + }, + S = S0#state{outgoing_links = OutgoingLinks#{Handle := Link}}, + case StashedCreditReq of + none -> + S; + #credit_req{} -> + pop_credit_req(Handle, Ctag, Link, S) + end + end. + +pop_credit_req( + Handle, Ctag, + Link0 = #outgoing_link{ + queue_name_bin = QNameBin, + client_flow_ctl = #client_flow_ctl{ + delivery_count = CDeliveryCount + } = CFC, + queue_flow_ctl = #queue_flow_ctl{ + delivery_count = QDeliveryCount + } = QFC, + stashed_credit_req = #credit_req{ + delivery_count = DeliveryCountRcv, + credit = LinkCreditRcv, + drain = Drain, + echo = Echo + }}, + S0 = #state{cfg = #cfg{vhost = Vhost}, + outgoing_links = OutgoingLinks, + queue_states = QStates0}) -> + LinkCreditSnd = amqp10_util:link_credit_snd( + DeliveryCountRcv, LinkCreditRcv, CDeliveryCount), + CappedCredit = cap_credit(LinkCreditSnd), + QName = queue_resource(Vhost, QNameBin), + {ok, QStates, Actions} = + rabbit_queue_type:credit( + QName, Ctag, QDeliveryCount, CappedCredit, Drain, QStates0), + Link = Link0#outgoing_link{ + client_flow_ctl = CFC#client_flow_ctl{ + credit = LinkCreditSnd, + echo = Echo}, + queue_flow_ctl = QFC#queue_flow_ctl{ + credit = CappedCredit, + desired_credit = LinkCreditSnd, + drain = Drain + }, + credit_req_in_flight = true, + stashed_credit_req = none + }, + S = S0#state{queue_states = QStates, + outgoing_links = OutgoingLinks#{Handle := Link}}, + handle_queue_actions(Actions, S). + +echo(Echo, HandleInt, DeliveryCount, LinkCredit, Available, State) -> + case Echo of + true -> + Flow0 = #'v1_0.flow'{handle = ?UINT(HandleInt), + delivery_count = ?UINT(DeliveryCount), + link_credit = ?UINT(LinkCredit), + available = ?UINT(Available)}, + Flow = session_flow_fields(Flow0, State), + #cfg{writer_pid = Writer, + channel_num = Channel} = State#state.cfg, + rabbit_amqp_writer:send_command(Writer, Channel, Flow); + false -> + ok + end. + +send_pending_delivery(#pending_delivery{ + frames = Frames, + queue_pid = QPid, + outgoing_unsettled = #outgoing_unsettled{consumer_tag = Ctag, + queue_name = QName} + } = Pending, + Buf0, + #state{remote_incoming_window = Space, + outgoing_links = OutgoingLinks, + queue_states = QStates, + cfg = #cfg{writer_pid = WriterPid, + channel_num = Ch} + } = State0) -> + Handle = ctag_to_handle(Ctag), + case is_map_key(Handle, OutgoingLinks) of + true -> SendFun = case QPid of credit_api_v2 -> send_fun(WriterPid, Ch); _ -> - case rabbit_queue_type:module(QName, State0#state.queue_states) of + case rabbit_queue_type:module(QName, QStates) of {ok, rabbit_classic_queue} -> %% Classic queue client and classic queue process that %% communicate via credit API v1 use RabbitMQ internal %% credit flow control. fun(Transfer, Sections) -> rabbit_amqp_writer:send_command_and_notify( - WriterPid, Ch, QPid, self(), Transfer, Sections) + WriterPid, QPid, Ch, Transfer, Sections) end; {ok, _QType} -> send_fun(WriterPid, Ch) end end, - {NumTransfersSent, Buf, State1} = case send_frames(SendFun, Frames, Space) of {sent_all, SpaceLeft} -> - {Space - SpaceLeft, - Buf1, - record_outgoing_unsettled(Pending, State0)}; - {sent_some, Rest} -> - {Space, - queue:in_r(Pending#pending_delivery{frames = Rest}, Buf1), - State0} - end, - State2 = session_flow_control_sent_transfers(NumTransfersSent, State1), - State = State2#state{outgoing_pending = Buf}, - send_pending(State); - {{value, Pending = #pending_management_delivery{frames = Frames}}, Buf1} - when Space > 0 -> - SendFun = send_fun(WriterPid, Ch), - {NumTransfersSent, Buf} = - case send_frames(SendFun, Frames, Space) of - {sent_all, SpaceLeft} -> - {Space - SpaceLeft, Buf1}; - {sent_some, Rest} -> - {Space, queue:in_r(Pending#pending_management_delivery{frames = Rest}, Buf1)} - end, - State1 = session_flow_control_sent_transfers(NumTransfersSent, State0), - State = State1#state{outgoing_pending = Buf}, - send_pending(State); - _ when Space =:= 0 -> - State0 + State1 = State0#state{outgoing_pending = Buf0}, + State = sent_pending_delivery(Pending, Handle, State1), + {SpaceLeft, State}; + {sent_some, SpaceLeft, Rest} -> + Buf = queue:in_r(Pending#pending_delivery{frames = Rest}, Buf0), + State = State0#state{outgoing_pending = Buf}, + {SpaceLeft, State} + end; + false -> + %% Link got detached. Either the client closed the link in which case the queue + %% already requeued all checked out messages or the queue doesn't exist anymore + %% in which case there is no point in requeuing this message. + %% Therefore, ignore (drop) this delivery. + State = State0#state{outgoing_pending = Buf0}, + {Space, State} + end; +send_pending_delivery(#pending_management_delivery{frames = Frames} = Pending, + Buf0, + #state{remote_incoming_window = Space, + cfg = #cfg{writer_pid = WriterPid, + channel_num = Ch} + } = State0) -> + SendFun = send_fun(WriterPid, Ch), + case send_frames(SendFun, Frames, Space) of + {sent_all, SpaceLeft} -> + State = State0#state{outgoing_pending = Buf0}, + {SpaceLeft, State}; + {sent_some, SpaceLeft, Rest} -> + Buf = queue:in_r(Pending#pending_management_delivery{frames = Rest}, Buf0), + State = State0#state{outgoing_pending = Buf}, + {SpaceLeft, State} end. send_frames(_, [], SpaceLeft) -> {sent_all, SpaceLeft}; -send_frames(_, Rest, 0) -> - {sent_some, Rest}; -send_frames(SendFun, [[Transfer, Sections] | Rest], SpaceLeft) -> - SendFun(Transfer, Sections), - send_frames(SendFun, Rest, SpaceLeft - 1). +send_frames(_, Rest, SpaceLeft = 0) -> + {sent_some, SpaceLeft, Rest}; +send_frames(SendFun, [[Transfer, Sections] | Rest] = Frames, SpaceLeft) -> + case SendFun(Transfer, Sections) of + ok -> + send_frames(SendFun, Rest, SpaceLeft - 1); + {error, blocked} -> + {sent_some, SpaceLeft, Frames} + end. send_fun(WriterPid, Ch) -> fun(Transfer, Sections) -> rabbit_amqp_writer:send_command(WriterPid, Ch, Transfer, Sections) end. +sent_pending_delivery( + Pending = #pending_delivery{ + outgoing_unsettled = #outgoing_unsettled{ + consumer_tag = Ctag, + queue_name = QName}}, + Handle, + S0 = #state{outgoing_links = OutgoingLinks0, + queue_states = QStates0}) -> + + #outgoing_link{ + credit_api_version = CreditApiVsn, + client_flow_ctl = CFC0, + queue_flow_ctl = QFC0, + credit_req_in_flight = CreditReqInFlight0 + } = Link0 = maps:get(Handle, OutgoingLinks0), + + S = case CreditApiVsn of + 2 -> + #client_flow_ctl{ + delivery_count = CDeliveryCount0, + credit = CCredit0 + } = CFC0, + #queue_flow_ctl{ + delivery_count = QDeliveryCount0, + credit = QCredit0, + desired_credit = DesiredCredit0 + } = QFC0, + + CDeliveryCount = add(CDeliveryCount0, 1), + %% Even though the spec mandates + %% "If the link-credit is less than or equal to zero, i.e., + %% the delivery-count is the same as or greater than the + %% delivery-limit, a sender MUST NOT send more messages." + %% we forced the message through to be sent to the client. + %% Due to our dual link approach, we don't want to buffer any + %% messages in the session if the receiving client dynamically + %% decreased link credit. The alternative is to requeue messages. + %% "the receiver MAY either handle the excess messages normally + %% or detach the link with a transfer-limit-exceeded error code." + CCredit = max(0, CCredit0 - 1), + + QDeliveryCount = add(QDeliveryCount0, 1), + QCredit1 = max(0, QCredit0 - 1), + DesiredCredit = max(0, DesiredCredit0 - 1), + + {QCredit, CreditReqInFlight, QStates, Actions} = + case QCredit1 =:= 0 andalso + DesiredCredit > 0 andalso + not CreditReqInFlight0 of + true -> + %% assertion + none = Link0#outgoing_link.stashed_credit_req, + %% Provide queue next batch of credits. + CappedCredit = cap_credit(DesiredCredit), + {ok, QStates1, Actions0} = + rabbit_queue_type:credit( + QName, Ctag, QDeliveryCount, CappedCredit, + QFC0#queue_flow_ctl.drain, QStates0), + {CappedCredit, true, QStates1, Actions0}; + false -> + {QCredit1, CreditReqInFlight0, QStates0, []} + end, + + CFC = CFC0#client_flow_ctl{ + delivery_count = CDeliveryCount, + credit = CCredit}, + QFC = QFC0#queue_flow_ctl{ + delivery_count = QDeliveryCount, + credit = QCredit, + desired_credit = DesiredCredit}, + Link = Link0#outgoing_link{client_flow_ctl = CFC, + queue_flow_ctl = QFC, + credit_req_in_flight = CreditReqInFlight}, + OutgoingLinks = OutgoingLinks0#{Handle := Link}, + S1 = S0#state{outgoing_links = OutgoingLinks, + queue_states = QStates}, + handle_queue_actions(Actions, S1); + 1 -> + S0 + end, + record_outgoing_unsettled(Pending, S). + record_outgoing_unsettled(#pending_delivery{queue_ack_required = true, delivery_id = DeliveryId, outgoing_unsettled = Unsettled}, @@ -1554,30 +1956,23 @@ handle_queue_actions(Actions, State) -> lists:foldl(fun(Msg, S) -> handle_deliver(CTag, AckRequired, Msg, S) end, S0, Msgs); - ({credit_reply, Ctag, DeliveryCount, Credit, Available, Drain}, + ({credit_reply, _Ctag, _DeliveryCount, _Credit, _Available, _Drain} = Action, S = #state{outgoing_pending = Pending}) -> %% credit API v2 - Handle = ctag_to_handle(Ctag), - Flow = #'v1_0.flow'{ - handle = ?UINT(Handle), - delivery_count = ?UINT(DeliveryCount), - link_credit = ?UINT(Credit), - available = ?UINT(Available), - drain = Drain}, - S#state{outgoing_pending = queue:in(Flow, Pending)}; + S#state{outgoing_pending = queue:in(Action, Pending)}; ({credit_reply_v1, Ctag, Credit0, Available, Drain}, S0 = #state{outgoing_links = OutgoingLinks0, outgoing_pending = Pending}) -> %% credit API v1 %% Delete this branch when feature flag credit_api_v2 becomes required. Handle = ctag_to_handle(Ctag), - Link = #outgoing_link{delivery_count = {credit_api_v1, Count0}} = maps:get(Handle, OutgoingLinks0), + Link = #outgoing_link{delivery_count = Count0} = maps:get(Handle, OutgoingLinks0), {Count, Credit, S} = case Drain of true -> Count1 = add(Count0, Credit0), OutgoingLinks = maps:update( Handle, - Link#outgoing_link{delivery_count = {credit_api_v1, Count1}}, + Link#outgoing_link{delivery_count = Count1}, OutgoingLinks0), S1 = S0#state{outgoing_links = OutgoingLinks}, {Count1, 0, S1}; @@ -1617,6 +2012,7 @@ handle_deliver(ConsumerTag, AckRequired, #{Handle := #outgoing_link{queue_type = QType, send_settled = SendSettled, max_message_size = MaxMessageSize, + credit_api_version = CreditApiVsn, delivery_count = DelCount} = Link0} -> Dtag = delivery_tag(MsgId, SendSettled), Transfer = #'v1_0.transfer'{ @@ -1633,11 +2029,12 @@ handle_deliver(ConsumerTag, AckRequired, messages_delivered(Redelivered, QType), rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, Trace), {OutgoingLinks, QPid - } = case DelCount of - credit_api_v2 -> + } = case CreditApiVsn of + 2 -> {OutgoingLinks0, credit_api_v2}; - {credit_api_v1, C} -> - Link = Link0#outgoing_link{delivery_count = {credit_api_v1, add(C, 1)}}, + 1 -> + DelCount = Link0#outgoing_link.delivery_count, + Link = Link0#outgoing_link{delivery_count = add(DelCount, 1)}, OutgoingLinks1 = maps:update(Handle, Link, OutgoingLinks0), {OutgoingLinks1, QPid0} end, @@ -1943,7 +2340,7 @@ lookup_target(to, to, Mc, Vhost, User, PermCache0) -> {utf8, String} -> case parse_target_v2_string(String) of {ok, XNameBin, RKey, _} -> - XName = rabbit_misc:r(Vhost, exchange, XNameBin), + XName = exchange_resource(Vhost, XNameBin), PermCache = check_resource_access(XName, write, User, PermCache0), case rabbit_exchange:lookup(XName) of {ok, X} -> @@ -2072,13 +2469,13 @@ ensure_source_v1(Address, case rabbit_routing_parser:parse_routing(Src) of {"", QNameList} -> true = string:equal(QNameList, QNameBin), - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), {ok, QName, PermCache1, TopicPermCache0}; {XNameList, RoutingKeyList} -> RoutingKey = unicode:characters_to_binary(RoutingKeyList), XNameBin = unicode:characters_to_binary(XNameList), - XName = rabbit_misc:r(Vhost, exchange, XNameBin), - QName = rabbit_misc:r(Vhost, queue, QNameBin), + XName = exchange_resource(Vhost, XNameBin), + QName = queue_resource(Vhost, QNameBin), Binding = #binding{source = XName, destination = QName, key = RoutingKey}, @@ -2101,7 +2498,7 @@ ensure_source_v1(Address, %% The only possible v2 source address format is: %% /queue/:queue ensure_source_v2(<<"/queue/", QNameBin/binary>>, Vhost, PermCache, TopicPermCache) -> - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), ok = exit_if_absent(QName), {ok, QName, PermCache, TopicPermCache}; ensure_source_v2(Address, _, _, _) -> @@ -2148,7 +2545,7 @@ try_target_v2(Address, Vhost, User, PermCache) -> end. check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) -> - XName = rabbit_misc:r(Vhost, exchange, XNameBin), + XName = exchange_resource(Vhost, XNameBin), PermCache = check_resource_access(XName, write, User, PermCache0), case rabbit_exchange:lookup(XName) of {ok, X} -> @@ -2277,31 +2674,73 @@ handle_outgoing_mgmt_link_flow_control( handle_outgoing_link_flow_control( #outgoing_link{queue_name_bin = QNameBin, - delivery_count = MaybeDeliveryCountSnd}, + credit_api_version = CreditApiVsn, + client_flow_ctl = CFC, + queue_flow_ctl = QFC, + credit_req_in_flight = CreditReqInFlight + } = Link0, #'v1_0.flow'{handle = ?UINT(HandleInt), delivery_count = MaybeDeliveryCountRcv, link_credit = ?UINT(LinkCreditRcv), drain = Drain0, echo = Echo0}, - State0 = #state{queue_states = QStates0, - cfg = #cfg{vhost = Vhost}}) -> - QName = rabbit_misc:r(Vhost, queue, QNameBin), + #state{outgoing_links = OutgoingLinks, + queue_states = QStates0, + cfg = #cfg{vhost = Vhost} + } = State0) -> + QName = queue_resource(Vhost, QNameBin), Ctag = handle_to_ctag(HandleInt), DeliveryCountRcv = delivery_count_rcv(MaybeDeliveryCountRcv), Drain = default(Drain0, false), Echo = default(Echo0, false), - case MaybeDeliveryCountSnd of - credit_api_v2 -> - {ok, QStates, Actions} = rabbit_queue_type:credit( - QName, Ctag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo, QStates0), - State1 = State0#state{queue_states = QStates}, - State = handle_queue_actions(Actions, State1), - %% We'll handle the credit_reply queue event async later - %% thanks to the queue event containing the consumer tag. - State; - {credit_api_v1, DeliveryCountSnd} -> - LinkCreditSnd = amqp10_util:link_credit_snd(DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd), - {ok, QStates, Actions} = rabbit_queue_type:credit_v1(QName, Ctag, LinkCreditSnd, Drain, QStates0), + case CreditApiVsn of + 2 -> + case CreditReqInFlight of + false -> + DesiredCredit = amqp10_util:link_credit_snd( + DeliveryCountRcv, + LinkCreditRcv, + CFC#client_flow_ctl.delivery_count), + CappedCredit = cap_credit(DesiredCredit), + Link = Link0#outgoing_link{ + credit_req_in_flight = true, + client_flow_ctl = CFC#client_flow_ctl{ + credit = DesiredCredit, + echo = Echo}, + queue_flow_ctl = QFC#queue_flow_ctl{ + credit = CappedCredit, + desired_credit = DesiredCredit, + drain = Drain}}, + {ok, QStates, Actions} = rabbit_queue_type:credit( + QName, Ctag, + QFC#queue_flow_ctl.delivery_count, + CappedCredit, Drain, QStates0), + State = State0#state{queue_states = QStates, + outgoing_links = OutgoingLinks#{HandleInt := Link}}, + handle_queue_actions(Actions, State); + true -> + %% A credit request is currently in-flight. Let's first process its reply + %% before sending the next request. This ensures our outgoing_pending + %% queue won't contain lots of credit replies for the same consumer + %% when the client floods us with credit requests, but closed its incoming-window. + %% Processing one credit top up at a time between us and the queue is also easier + %% to reason about. Therefore, we stash the new request. If there is already a + %% stashed request, we replace it because the latest flow control state from the + %% client applies. + Link = Link0#outgoing_link{ + stashed_credit_req = #credit_req{ + delivery_count = DeliveryCountRcv, + credit = LinkCreditRcv, + drain = Drain, + echo = Echo}}, + State0#state{outgoing_links = OutgoingLinks#{HandleInt := Link}} + end; + 1 -> + DeliveryCountSnd = Link0#outgoing_link.delivery_count, + LinkCreditSnd = amqp10_util:link_credit_snd( + DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd), + {ok, QStates, Actions} = rabbit_queue_type:credit_v1( + QName, Ctag, LinkCreditSnd, Drain, QStates0), State1 = State0#state{queue_states = QStates}, State = handle_queue_actions(Actions, State1), process_credit_reply_sync(Ctag, QName, LinkCreditSnd, State) @@ -2610,7 +3049,7 @@ declare_queue(QNameBin, User = #user{username = Username}, TerminusDurability, PermCache0) -> - QName = rabbit_misc:r(Vhost, queue, QNameBin), + QName = queue_resource(Vhost, QNameBin), PermCache = check_resource_access(QName, configure, User, PermCache0), rabbit_core_metrics:queue_declared(QName), Q0 = amqqueue:new(QName, @@ -2931,6 +3370,11 @@ not_found(Resource) -> address_v1_permitted() -> rabbit_deprecated_features:is_permitted(amqp_address_v1). +-spec cap_credit(rabbit_queue_type:credit()) -> + 0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX. +cap_credit(DesiredCredit) -> + min(DesiredCredit, ?LINK_CREDIT_RCV_FROM_QUEUE_MAX). + format_status( #{state := #state{cfg = Cfg, outgoing_pending = OutgoingPending, diff --git a/deps/rabbit/src/rabbit_amqp_writer.erl b/deps/rabbit/src/rabbit_amqp_writer.erl index 2395b8c772ed..bd42fc1c2de2 100644 --- a/deps/rabbit/src/rabbit_amqp_writer.erl +++ b/deps/rabbit/src/rabbit_amqp_writer.erl @@ -15,7 +15,7 @@ send_command/3, send_command/4, send_command_sync/3, - send_command_and_notify/6, + send_command_and_notify/5, internal_send_command/3]). %% gen_server callbacks @@ -31,7 +31,8 @@ reader :: rabbit_types:connection(), pending :: iolist(), %% This field is just an optimisation to minimize the cost of erlang:iolist_size/1 - pending_size :: non_neg_integer() + pending_size :: non_neg_integer(), + monitored_sessions :: #{pid() => true} }). -define(HIBERNATE_AFTER, 6_000). @@ -62,10 +63,10 @@ send_command(Writer, ChannelNum, Performative) -> -spec send_command(pid(), rabbit_types:channel_number(), performative(), - payload()) -> ok. + payload()) -> ok | {error, blocked}. send_command(Writer, ChannelNum, Performative, Payload) -> - Request = {send_command, ChannelNum, Performative, Payload}, - gen_server:cast(Writer, Request). + Request = {send_command, self(), ChannelNum, Performative, Payload}, + maybe_send(Writer, Request). -spec send_command_sync(pid(), rabbit_types:channel_number(), @@ -76,14 +77,13 @@ send_command_sync(Writer, ChannelNum, Performative) -> %% Delete this function when feature flag credit_api_v2 becomes required. -spec send_command_and_notify(pid(), - rabbit_types:channel_number(), - pid(), pid(), + rabbit_types:channel_number(), performative(), - payload()) -> ok. -send_command_and_notify(Writer, ChannelNum, QueuePid, SessionPid, Performative, Payload) -> - Request = {send_command_and_notify, ChannelNum, QueuePid, SessionPid, Performative, Payload}, - gen_server:cast(Writer, Request). + payload()) -> ok | {error, blocked}. +send_command_and_notify(Writer, QueuePid, ChannelNum, Performative, Payload) -> + Request = {send_command_and_notify, QueuePid, self(), ChannelNum, Performative, Payload}, + maybe_send(Writer, Request). -spec internal_send_command(rabbit_net:socket(), performative(), @@ -101,19 +101,22 @@ init({Sock, MaxFrame, ReaderPid}) -> max_frame_size = MaxFrame, reader = ReaderPid, pending = [], - pending_size = 0}, + pending_size = 0, + monitored_sessions = #{}}, process_flag(message_queue_data, off_heap), {ok, State}. handle_cast({send_command, ChannelNum, Performative}, State0) -> State = internal_send_command_async(ChannelNum, Performative, State0), no_reply(State); -handle_cast({send_command, ChannelNum, Performative, Payload}, State0) -> - State = internal_send_command_async(ChannelNum, Performative, Payload, State0), +handle_cast({send_command, SessionPid, ChannelNum, Performative, Payload}, State0) -> + State1 = internal_send_command_async(ChannelNum, Performative, Payload, State0), + State = credit_flow_ack(SessionPid, State1), no_reply(State); %% Delete below function clause when feature flag credit_api_v2 becomes required. -handle_cast({send_command_and_notify, ChannelNum, QueuePid, SessionPid, Performative, Payload}, State0) -> - State = internal_send_command_async(ChannelNum, Performative, Payload, State0), +handle_cast({send_command_and_notify, QueuePid, SessionPid, ChannelNum, Performative, Payload}, State0) -> + State1 = internal_send_command_async(ChannelNum, Performative, Payload, State0), + State = credit_flow_ack(SessionPid, State1), rabbit_amqqueue:notify_sent(QueuePid, SessionPid), no_reply(State). @@ -125,6 +128,11 @@ handle_call({send_command, ChannelNum, Performative}, _From, State0) -> handle_info(timeout, State0) -> State = flush(State0), {noreply, State}; +handle_info({{'DOWN', session}, _MRef, process, SessionPid, _Reason}, + State0 = #state{monitored_sessions = Sessions}) -> + credit_flow:peer_down(SessionPid), + State = State0#state{monitored_sessions = maps:remove(SessionPid, Sessions)}, + no_reply(State); %% Delete below function clause when feature flag credit_api_v2 becomes required. handle_info({'DOWN', _MRef, process, QueuePid, _Reason}, State) -> rabbit_amqqueue:notify_sent_queue_down(QueuePid), @@ -154,6 +162,25 @@ format_status(Status) -> no_reply(State) -> {noreply, State, 0}. +maybe_send(Writer, Request) -> + case credit_flow:blocked() of + false -> + credit_flow:send(Writer), + gen_server:cast(Writer, Request); + true -> + {error, blocked} + end. + +credit_flow_ack(SessionPid, State = #state{monitored_sessions = Sessions}) -> + credit_flow:ack(SessionPid), + case is_map_key(SessionPid, Sessions) of + true -> + State; + false -> + _MonitorRef = monitor(process, SessionPid, [{tag, {'DOWN', session}}]), + State#state{monitored_sessions = maps:put(SessionPid, true, Sessions)} + end. + internal_send_command_async(Channel, Performative, State = #state{pending = Pending, pending_size = PendingSize}) -> diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index a26d71ca70b9..5380f5b5f751 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -1633,8 +1633,8 @@ handle_cast({credit, SessionPid, CTag, Credit, Drain}, %% Behave like non-native AMQP 1.0: Send send_credit_reply before deliveries. rabbit_classic_queue:send_credit_reply_credit_api_v1( SessionPid, amqqueue:get_name(Q), BQ:len(BQS0)), - handle_cast({credit, SessionPid, CTag, credit_api_v1, Credit, Drain, false}, State); -handle_cast({credit, SessionPid, CTag, DeliveryCountRcv, Credit, Drain, Echo}, + handle_cast({credit, SessionPid, CTag, credit_api_v1, Credit, Drain}, State); +handle_cast({credit, SessionPid, CTag, DeliveryCountRcv, Credit, Drain}, #q{consumers = Consumers0, q = Q} = State0) -> QName = amqqueue:get_name(Q), @@ -1666,8 +1666,7 @@ handle_cast({credit, SessionPid, CTag, DeliveryCountRcv, Credit, Drain, Echo}, rabbit_classic_queue:send_credit_reply( SessionPid, QName, CTag, AdvancedDeliveryCount, 0, Avail, Drain); {PostDeliveryCountSnd, PostCred} - when is_integer(PostDeliveryCountSnd) andalso - Echo -> + when is_integer(PostDeliveryCountSnd) -> %% credit API v2 Avail = BQ:len(PostBQS), rabbit_classic_queue:send_credit_reply( diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 42dbcbb096f0..4024de5ee476 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -42,7 +42,7 @@ deliver/3, settle/5, credit_v1/5, - credit/7, + credit/6, dequeue/5, info/2, state_info/1, @@ -306,8 +306,8 @@ credit_v1(_QName, Ctag, LinkCreditSnd, Drain, #?STATE{pid = QPid} = State) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [Request]}), {State, []}. -credit(_QName, Ctag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo, #?STATE{pid = QPid} = State) -> - Request = {credit, self(), Ctag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo}, +credit(_QName, Ctag, DeliveryCountRcv, LinkCreditRcv, Drain, #?STATE{pid = QPid} = State) -> + Request = {credit, self(), Ctag, DeliveryCountRcv, LinkCreditRcv, Drain}, delegate:invoke_no_result(QPid, {gen_server2, cast, [Request]}), {State, []}. diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index 65f2db8a601f..076c3c80e01e 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -119,7 +119,7 @@ checked_out = #{} :: #{msg_id() => msg()}, %% max number of messages that can be sent %% decremented for each delivery - credit = 0 : non_neg_integer(), + credit = 0 :: non_neg_integer(), %% AMQP 1.0 §2.6.7 delivery_count :: rabbit_queue_type:delivery_count() }). diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index f1bea8db3498..70ced853751e 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -23,7 +23,7 @@ return/3, discard/3, credit_v1/4, - credit/6, + credit/5, handle_ra_event/4, untracked_enqueue/2, purge/1, @@ -43,10 +43,6 @@ -record(consumer, {last_msg_id :: seq() | -1 | undefined, ack = false :: boolean(), - %% 'echo' field from latest FLOW, see AMQP 1.0 §2.7.4 - %% Quorum queue server will always echo back to us, - %% but we only emit a credit_reply if Echo=true - echo :: boolean(), %% Remove this field when feature flag credit_api_v2 becomes required. delivery_count :: {credit_api_v1, rabbit_queue_type:delivery_count()} | credit_api_v2 }). @@ -369,7 +365,6 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta, fun (C) -> C#consumer{ack = Ack} end, #consumer{last_msg_id = LastMsgId, ack = Ack, - echo = false, delivery_count = DeliveryCount}, CDels0), {ok, State0#state{leader = Leader, @@ -397,12 +392,9 @@ query_single_active_consumer(#state{leader = Leader}) -> state()) -> {state(), rabbit_queue_type:actions()}. credit_v1(ConsumerTag, Credit, Drain, - #state{consumer_deliveries = CDels} = State0) -> - ConsumerId = consumer_id(ConsumerTag), + State = #state{consumer_deliveries = CDels}) -> #consumer{delivery_count = {credit_api_v1, Count}} = maps:get(ConsumerTag, CDels), - ServerId = pick_server(State0), - Cmd = rabbit_fifo:make_credit(ConsumerId, Credit, Count, Drain), - {send_command(ServerId, undefined, Cmd, normal, State0), []}. + credit(ConsumerTag, Count, Credit, Drain, State). %% @doc Provide credit to the queue %% @@ -417,18 +409,12 @@ credit_v1(ConsumerTag, Credit, Drain, rabbit_queue_type:delivery_count(), rabbit_queue_type:credit(), Drain :: boolean(), - Echo :: boolean(), state()) -> {state(), rabbit_queue_type:actions()}. -credit(ConsumerTag, DeliveryCount, Credit, Drain, Echo, - #state{consumer_deliveries = CDels0} = State0) -> +credit(ConsumerTag, DeliveryCount, Credit, Drain, State) -> ConsumerId = consumer_id(ConsumerTag), - ServerId = pick_server(State0), + ServerId = pick_server(State), Cmd = rabbit_fifo:make_credit(ConsumerId, Credit, DeliveryCount, Drain), - CDels = maps:update_with(ConsumerTag, - fun(C) -> C#consumer{echo = Echo} end, - CDels0), - State = State0#state{consumer_deliveries = CDels}, {send_command(ServerId, undefined, Cmd, normal, State), []}. %% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag @@ -594,21 +580,10 @@ handle_ra_event(QName, From, {applied, Seqs}, end; handle_ra_event(QName, From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> handle_delivery(QName, From, Del, State0); -handle_ra_event(_QName, _From, - {machine, {credit_reply_v1, _CTag, _Credit, _Available, _Drain = false} = Action}, - State) -> +handle_ra_event(_QName, _From, {machine, Action}, State) + when element(1, Action) =:= credit_reply orelse + element(1, Action) =:= credit_reply_v1 -> {ok, State, [Action]}; -handle_ra_event(_QName, _From, - {machine, {credit_reply, CTag, _DeliveryCount, _Credit, _Available, Drain} = Action}, - #state{consumer_deliveries = CDels} = State) -> - Actions = case CDels of - #{CTag := #consumer{echo = Echo}} - when Echo orelse Drain -> - [Action]; - _ -> - [] - end, - {ok, State, Actions}; handle_ra_event(_QName, _, {machine, {queue_status, Status}}, #state{} = State) -> %% just set the queue status diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index de7e908fd66f..3ea45e454d8c 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -47,7 +47,7 @@ deliver/4, settle/5, credit_v1/5, - credit/7, + credit/6, dequeue/5, fold_state/3, is_policy_applicable/2, @@ -85,6 +85,9 @@ -define(QUEUE_MODULES, [rabbit_classic_queue, rabbit_quorum_queue, rabbit_stream_queue]). -define(KNOWN_QUEUE_TYPES, [<<"classic">>, <<"quorum">>, <<"stream">>]). +-type credit_reply_action() :: {credit_reply, rabbit_types:ctag(), delivery_count(), credit(), + Available :: non_neg_integer(), Drain :: boolean()}. + %% anything that the host process needs to do on behalf of the queue type session -type action() :: %% indicate to the queue type module that a message has been delivered @@ -92,9 +95,7 @@ {settled, queue_name(), [correlation()]} | {deliver, rabbit_types:ctag(), boolean(), [rabbit_amqqueue:qmsg()]} | {block | unblock, QueueName :: term()} | - %% credit API v2 - {credit_reply, rabbit_types:ctag(), delivery_count(), credit(), - Available :: non_neg_integer(), Drain :: boolean()} | + credit_reply_action() | %% credit API v1 {credit_reply_v1, rabbit_types:ctag(), credit(), Available :: non_neg_integer(), Drain :: boolean()}. @@ -135,6 +136,7 @@ consume_mode/0, consume_spec/0, delivery_options/0, + credit_reply_action/0, action/0, actions/0, settle_op/0, @@ -222,9 +224,8 @@ -callback credit_v1(queue_name(), rabbit_types:ctag(), credit(), Drain :: boolean(), queue_state()) -> {queue_state(), actions()}. -%% credit API v2 -callback credit(queue_name(), rabbit_types:ctag(), delivery_count(), credit(), - Drain :: boolean(), Echo :: boolean(), queue_state()) -> + Drain :: boolean(), queue_state()) -> {queue_state(), actions()}. -callback dequeue(queue_name(), NoAck :: boolean(), LimiterPid :: pid(), @@ -675,12 +676,12 @@ credit_v1(QName, CTag, LinkCreditSnd, Drain, Ctxs) -> {ok, set_ctx(QName, Ctx#ctx{state = State}, Ctxs), Actions}. %% credit API v2 --spec credit(queue_name(), rabbit_types:ctag(), delivery_count(), credit(), boolean(), boolean(), state()) -> +-spec credit(queue_name(), rabbit_types:ctag(), delivery_count(), credit(), boolean(), state()) -> {ok, state(), actions()}. -credit(QName, CTag, DeliveryCount, Credit, Drain, Echo, Ctxs) -> +credit(QName, CTag, DeliveryCount, Credit, Drain, Ctxs) -> #ctx{state = State0, module = Mod} = Ctx = get_ctx(QName, Ctxs), - {State, Actions} = Mod:credit(QName, CTag, DeliveryCount, Credit, Drain, Echo, State0), + {State, Actions} = Mod:credit(QName, CTag, DeliveryCount, Credit, Drain, State0), {ok, set_ctx(QName, Ctx#ctx{state = State}, Ctxs), Actions}. -spec dequeue(amqqueue:amqqueue(), boolean(), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index e91a4bff19aa..58dae5c6562f 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -25,7 +25,7 @@ delete_immediately/1]). -export([state_info/1, info/2, stat/1, infos/1, infos/2]). -export([settle/5, dequeue/5, consume/3, cancel/5]). --export([credit_v1/5, credit/7]). +-export([credit_v1/5, credit/6]). -export([purge/1]). -export([stateless_deliver/2, deliver/3]). -export([dead_letter_publish/5]). @@ -810,8 +810,8 @@ settle(_QName, discard, CTag, MsgIds, QState) -> credit_v1(_QName, CTag, Credit, Drain, QState) -> rabbit_fifo_client:credit_v1(quorum_ctag(CTag), Credit, Drain, QState). -credit(_QName, CTag, DeliveryCount, Credit, Drain, Echo, QState) -> - rabbit_fifo_client:credit(quorum_ctag(CTag), DeliveryCount, Credit, Drain, Echo, QState). +credit(_QName, CTag, DeliveryCount, Credit, Drain, QState) -> + rabbit_fifo_client:credit(quorum_ctag(CTag), DeliveryCount, Credit, Drain, QState). -spec dequeue(rabbit_amqqueue:name(), NoAck :: boolean(), pid(), rabbit_types:ctag(), rabbit_fifo_client:state()) -> diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 7444553f3271..eaadae002579 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -24,7 +24,7 @@ deliver/3, settle/5, credit_v1/5, - credit/7, + credit/6, dequeue/5, info/2, queue_length/1, @@ -101,6 +101,7 @@ }). -import(rabbit_queue_type_util, [args_policy_lookup/3]). +-import(rabbit_misc, [queue_resource/2]). -type client() :: #stream_client{}. @@ -467,7 +468,7 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0, credit_v1(_, _, _, _, _) -> erlang:error(credit_v1_unsupported). -credit(QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo, +credit(QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain, #stream_client{readers = Readers, name = Name, local_pid = LocalPid} = State0) -> @@ -479,27 +480,20 @@ credit(QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo, {Str2 = #stream{delivery_count = DeliveryCount, credit = Credit, ack = Ack}, Msgs} = stream_entries(QName, Name, LocalPid, Str1), - DrainedInsufficientMsgs = Drain andalso Credit > 0, - Str = case DrainedInsufficientMsgs of + Str = case Drain andalso Credit > 0 of true -> Str2#stream{delivery_count = serial_number:add(DeliveryCount, Credit), credit = 0}; false -> Str2 end, - DeliverActions = deliver_actions(CTag, Ack, Msgs), State = State0#stream_client{readers = maps:update(CTag, Str, Readers)}, - Actions = case Echo orelse DrainedInsufficientMsgs of - true -> - DeliverActions ++ [{credit_reply, - CTag, - Str#stream.delivery_count, - Str#stream.credit, - available_messages(Str), - Drain}]; - false -> - DeliverActions - end, + Actions = deliver_actions(CTag, Ack, Msgs) ++ [{credit_reply, + CTag, + Str#stream.delivery_count, + Str#stream.credit, + available_messages(Str), + Drain}], {State, Actions}; _ -> {State0, []} @@ -973,7 +967,7 @@ set_retention_policy(Name, VHost, Policy) -> {error, _} = E -> E; MaxAge -> - QName = rabbit_misc:r(VHost, queue, Name), + QName = queue_resource(VHost, Name), Fun = fun(Q) -> Conf = amqqueue:get_type_state(Q), amqqueue:set_type_state(Q, Conf#{max_age => MaxAge}) @@ -1004,7 +998,7 @@ restart_stream(VHost, Queue, Options) add_replica(VHost, Name, Node) -> - QName = rabbit_misc:r(VHost, queue, Name), + QName = queue_resource(VHost, Name), case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; @@ -1022,7 +1016,7 @@ add_replica(VHost, Name, Node) -> end. delete_replica(VHost, Name, Node) -> - QName = rabbit_misc:r(VHost, queue, Name), + QName = queue_resource(VHost, Name), case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 3503c6be8f97..1ef79d0287d6 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -51,6 +51,9 @@ groups() -> roundtrip_with_drain_classic_queue, roundtrip_with_drain_quorum_queue, roundtrip_with_drain_stream, + drain_many_classic_queue, + drain_many_quorum_queue, + drain_many_stream, amqp_stream_amqpl, amqp_quorum_queue_amqpl, message_headers_conversion, @@ -85,7 +88,6 @@ groups() -> resource_alarm_after_session_begin, max_message_size_client_to_server, max_message_size_server_to_client, - receive_transfer_flow_order, global_counters, stream_filtering, available_messages_classic_queue, @@ -106,7 +108,20 @@ groups() -> classic_priority_queue, dead_letter_headers_exchange, dead_letter_reject, - immutable_bare_message + immutable_bare_message, + receive_many_made_available_over_time_classic_queue, + receive_many_made_available_over_time_quorum_queue, + receive_many_made_available_over_time_stream, + receive_many_auto_flow_classic_queue, + receive_many_auto_flow_quorum_queue, + receive_many_auto_flow_stream, + incoming_window_closed_transfer_flow_order, + incoming_window_closed_stop_link, + incoming_window_closed_close_link, + incoming_window_closed_rabbitmq_internal_flow_classic_queue, + incoming_window_closed_rabbitmq_internal_flow_quorum_queue, + tcp_back_pressure_rabbitmq_internal_flow_classic_queue, + tcp_back_pressure_rabbitmq_internal_flow_quorum_queue ]}, {cluster_size_3, [shuffle], @@ -170,6 +185,7 @@ end_per_group(_, Config) -> init_per_testcase(T, Config) when T =:= message_headers_conversion orelse T =:= roundtrip_with_drain_quorum_queue orelse + T =:= drain_many_quorum_queue orelse T =:= timed_get_quorum_queue orelse T =:= available_messages_quorum_queue -> case rpc(Config, rabbit_feature_flags, is_enabled, [credit_api_v2]) of @@ -179,6 +195,21 @@ init_per_testcase(T, Config) {skip, "Receiving with drain from quorum queues in credit API v1 have a known " "bug that they reply with send_drained before delivering the message."} end; +init_per_testcase(T, Config) + when T =:= incoming_window_closed_close_link orelse + T =:= incoming_window_closed_rabbitmq_internal_flow_classic_queue orelse + T =:= incoming_window_closed_rabbitmq_internal_flow_quorum_queue orelse + T =:= tcp_back_pressure_rabbitmq_internal_flow_classic_queue orelse + T =:= tcp_back_pressure_rabbitmq_internal_flow_quorum_queue -> + %% The new RabbitMQ internal flow control + %% writer proc <- session proc <- queue proc + %% is only available with credit API v2. + case rpc(Config, rabbit_feature_flags, is_enabled, [credit_api_v2]) of + true -> + rabbit_ct_helpers:testcase_started(Config, T); + false -> + {skip, "Feature flag credit_api_v2 is disabled"} + end; init_per_testcase(T = immutable_bare_message, Config) -> case rpc(Config, rabbit_feature_flags, is_enabled, [message_containers_store_amqp_v1]) of true -> @@ -274,7 +305,7 @@ reliable_send_receive(QType, Outcome, Config) -> True = {boolean, true}, Msg2 = amqp10_msg:set_headers(#{durable => True}, Msg1), ok = amqp10_client:send_msg(Sender, Msg2), - ok = wait_for_settlement(DTag1), + ok = wait_for_accepted(DTag1), ok = amqp10_client:detach_link(Sender), ok = end_session_sync(Session), @@ -625,7 +656,7 @@ roundtrip_with_drain(Config, QueueType, QName) end, OutMsg2 = amqp10_msg:new(<<"tag-2">>, <<"my-body2">>, false), ok = amqp10_client:send_msg(Sender, OutMsg2), - ok = wait_for_settlement(<<"tag-2">>), + ok = wait_for_accepted(<<"tag-2">>), %% no delivery should be made at this point receive {amqp10_msg, _, _} -> ct:fail(unexpected_delivery) @@ -638,6 +669,85 @@ roundtrip_with_drain(Config, QueueType, QName) ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = amqp10_client:close_connection(Connection). +drain_many_classic_queue(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + drain_many(Config, <<"classic">>, QName). + +drain_many_quorum_queue(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + drain_many(Config, <<"quorum">>, QName). + +drain_many_stream(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + drain_many(Config, <<"stream">>, QName). + +drain_many(Config, QueueType, QName) + when is_binary(QueueType) -> + Address = <<"/queue/", QName/binary>>, + {Connection, Session, LinkPair} = init(Config), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QueueType}}}, + {ok, #{type := QueueType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"test-sender">>, Address), + wait_for_credit(Sender), + + Num = 5000, + ok = send_messages(Sender, Num, false), + ok = wait_for_accepts(Num), + + TerminusDurability = none, + Filter = consume_from_first(QueueType), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"test-receiver">>, Address, settled, + TerminusDurability, Filter), + + ok = amqp10_client:flow_link_credit(Receiver, Num - 1, never, true), + ?assertEqual(Num - 1, count_received_messages(Receiver)), + flush("drained 1"), + + ok = amqp10_client:flow_link_credit(Receiver, Num, never, true), + receive_messages(Receiver, 1), + flush("drained 2"), + + ok = send_messages(Sender, Num, false), + ok = wait_for_accepts(Num), + receive {amqp10_msg, _, _} = Unexpected1 -> ct:fail({unexpected, Unexpected1}) + after 10 -> ok + end, + + %% Let's send a couple of FLOW frames in sequence. + ok = amqp10_client:flow_link_credit(Receiver, 0, never, false), + ok = amqp10_client:flow_link_credit(Receiver, 1, never, false), + ok = amqp10_client:flow_link_credit(Receiver, Num div 2, never, false), + ok = amqp10_client:flow_link_credit(Receiver, Num, never, false), + ok = amqp10_client:flow_link_credit(Receiver, Num, never, true), + %% Eventually, we should receive all messages. + receive_messages(Receiver, Num), + flush("drained 3"), + + ok = send_messages(Sender, 1, false), + ok = wait_for_accepts(1), + %% Our receiver shouldn't have any credit left to consume this message. + receive {amqp10_msg, _, _} = Unexpected2 -> ct:fail({unexpected, Unexpected2}) + after 30 -> ok + end, + + %% Grant a huge amount of credits. + ok = amqp10_client:flow_link_credit(Receiver, 2_000_000_000, never, true), + %% We expect the server to send us the last message and + %% to advance the delivery-count promptly. + receive {amqp10_msg, _, _} -> ok + after 2000 -> ct:fail({missing_delivery, ?LINE}) + end, + receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok + after 300 -> ct:fail("expected credit_exhausted") + end, + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:close_connection(Connection). + amqp_stream_amqpl(Config) -> amqp_amqpl(<<"stream">>, Config). @@ -993,7 +1103,7 @@ server_closes_link(QType, Config) -> DTag = <<0>>, Body = <<"body">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, Body, false)), - ok = wait_for_settlement(DTag), + ok = wait_for_accepted(DTag), receive {amqp10_msg, Receiver, Msg} -> ?assertEqual([Body], amqp10_msg:body(Msg)) @@ -1005,7 +1115,7 @@ server_closes_link(QType, Config) -> eventually(?_assertEqual( 1, begin - #{outgoing_unsettled_map := UnsettledMap} = gen_server_state(SessionPid), + #{outgoing_unsettled_map := UnsettledMap} = formatted_state(SessionPid), maps:size(UnsettledMap) end)), @@ -1030,7 +1140,7 @@ server_closes_link(QType, Config) -> eventually(?_assertEqual( 0, begin - #{outgoing_unsettled_map := UnsettledMap} = gen_server_state(SessionPid), + #{outgoing_unsettled_map := UnsettledMap} = formatted_state(SessionPid), maps:size(UnsettledMap) end)), @@ -1058,7 +1168,7 @@ server_closes_link_exchange(Config) -> DTag1 = <<1>>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), - ok = wait_for_settlement(DTag1), + ok = wait_for_accepted(DTag1), %% Server closes the link endpoint due to some AMQP 1.0 external condition: %% In this test, the external condition is that an AMQP 0.9.1 client deletes the exchange. @@ -1109,7 +1219,7 @@ link_target_queue_deleted(QType, Config) -> flush(credited), DTag1 = <<1>>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), - ok = wait_for_settlement(DTag1), + ok = wait_for_accepted(DTag1), %% Mock delivery to the target queue to do nothing. rabbit_ct_broker_helpers:setup_meck(Config, [?MODULE]), @@ -1169,7 +1279,7 @@ target_queues_deleted_accepted(Config) -> DTag1 = <<1>>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), - ok = wait_for_settlement(DTag1), + ok = wait_for_accepted(DTag1), %% Mock to deliver only to q1. rabbit_ct_broker_helpers:setup_meck(Config, [?MODULE]), @@ -1868,7 +1978,7 @@ detach_requeues(Config) -> eventually(?_assertEqual( 0, begin - #{outgoing_unsettled_map := UnsettledMap} = gen_server_state(SessionPid), + #{outgoing_unsettled_map := UnsettledMap} = formatted_state(SessionPid), maps:size(UnsettledMap) end)), @@ -1925,7 +2035,7 @@ resource_alarm_before_session_begin(Config) -> %% Hence, RabbitMQ should open its incoming window allowing our client to send TRANSFERs. ?assertEqual(ok, amqp10_client:send_msg(Sender, Msg1)), - ok = wait_for_settlement(Tag1), + ok = wait_for_accepted(Tag1), ok = amqp10_client:detach_link(Sender), ok = end_session_sync(Session1), @@ -1987,7 +2097,7 @@ resource_alarm_after_session_begin(Config) -> %% Now, we should be able to send again. ?assertEqual(ok, amqp10_client:send_msg(Sender, Msg4)), - ok = wait_for_settlement(<<"t4">>), + ok = wait_for_accepted(<<"t4">>), ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver1), @@ -2043,7 +2153,7 @@ max_message_size_client_to_server(Config) -> PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 10), ?assertEqual(ok, amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, PayloadSmallEnough, false))), - ok = wait_for_settlement(<<"t1">>), + ok = wait_for_accepted(<<"t1">>), PayloadTooLarge = binary:copy(<<0>>, MaxMessageSize + 1), ?assertEqual({error, message_size_exceeded}, @@ -2074,7 +2184,7 @@ max_message_size_server_to_client(Config) -> %% e.g. message annotations with routing key and exchange name. PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 200), ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, PayloadSmallEnough, false)), - ok = wait_for_settlement(<<"t1">>), + ok = wait_for_accepted(<<"t1">>), AttachArgs = #{max_message_size => MaxMessageSize, name => <<"test-receiver">>, @@ -2091,7 +2201,7 @@ max_message_size_server_to_client(Config) -> %% The sending link has no maximum message size set. %% Hence, sending this large message from client to server should work. ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, PayloadTooLarge, false)), - ok = wait_for_settlement(<<"t2">>), + ok = wait_for_accepted(<<"t2">>), %% However, the receiving link has a maximum message size set. %% Hence, when the server attempts to deliver this large message, @@ -2111,59 +2221,6 @@ max_message_size_server_to_client(Config) -> #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). -%% This test ensures that the server sends us TRANSFER and FLOW frames in the correct order -%% even if the server is temporarily not allowed to send us any TRANSFERs due to our incoming -%% window being closed. -receive_transfer_flow_order(Config) -> - QName = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), - ok = rabbit_ct_client_helpers:close_channel(Ch), - Address = <<"/amq/queue/", QName/binary>>, - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), - ok = wait_for_credit(Sender), - DTag = <<"my tag">>, - Body = <<"my body">>, - ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, Body, false)), - ok = wait_for_settlement(DTag), - ok = amqp10_client:detach_link(Sender), - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), - receive {amqp10_event, {link, Receiver, attached}} -> ok - after 5000 -> ct:fail(missing_attached) - end, - flush(receiver_attached), - - %% Close our incoming window. - gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 0}}}), - - ok = amqp10_client:flow_link_credit(Receiver, 2, never, true), - %% Given our incoming window is closed, we shouldn't receive the TRANSFER yet, and therefore - %% must not yet receive the FLOW that comes thereafter with drain=true, credit=0, and advanced delivery-count. - receive Unexpected -> ct:fail({unexpected, Unexpected}) - after 300 -> ok - end, - - %% Open our incoming window - gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}), - %% Important: We should first receive the TRANSFER, - %% and only thereafter the FLOW (and hence the credit_exhausted notification). - receive First -> - {amqp10_msg, Receiver, Msg} = First, - ?assertEqual([Body], amqp10_msg:body(Msg)) - after 5000 -> ct:fail("timeout receiving message") - end, - receive Second -> - ?assertEqual({amqp10_event, {link, Receiver, credit_exhausted}}, Second) - after 5000 -> ct:fail("timeout receiving credit_exhausted") - end, - - ok = delete_queue(Session, QName), - ok = end_session_sync(Session), - ok = amqp10_client:close_connection(Connection). - last_queue_confirms(Config) -> ClassicQ = <<"my classic queue">>, QuorumQ = <<"my quorum queue">>, @@ -2329,7 +2386,7 @@ target_classic_queue_down(Config) -> DTag1 = <<"t1">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), - ok = wait_for_settlement(DTag1), + ok = wait_for_accepted(DTag1), {ok, Msg1} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)), @@ -2361,7 +2418,7 @@ target_classic_queue_down(Config) -> end, DTag3 = <<"t3">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag3, <<"m3">>, false)), - ok = wait_for_settlement(DTag3), + ok = wait_for_accepted(DTag3), {ok, Msg3} = amqp10_client:get_msg(Receiver2), ?assertEqual([<<"m3">>], amqp10_msg:body(Msg3)), @@ -2422,7 +2479,7 @@ async_notify(SenderSettleMode, QType, Config) -> ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Bin, Bin, false)) end || N <- lists:seq(1, NumMsgs)], %% Wait for last message to be confirmed. - ok = wait_for_settlement(integer_to_binary(NumMsgs)), + ok = wait_for_accepted(integer_to_binary(NumMsgs)), flush(settled), ok = detach_link_sync(Sender), @@ -2619,7 +2676,7 @@ queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) Tag = <<"tag">>, Body = <<"body">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Tag, Body, false)), - ok = wait_for_settlement(Tag), + ok = wait_for_accepted(Tag), ok = amqp10_client:flow_link_credit(Receiver, 999, never, true), [Msg] = receive_messages(Receiver, 1), ?assertEqual([Body], amqp10_msg:body(Msg)), @@ -2739,7 +2796,7 @@ global_counters(Config) -> {ok, QQReceiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver-qq">>, QQAddress, unsettled), ok = amqp10_client:send_msg(CQSender, amqp10_msg:new(<<0>>, <<"m0">>, true)), ok = amqp10_client:send_msg(QQSender, amqp10_msg:new(<<1>>, <<"m1">>, false)), - ok = wait_for_settlement(<<1>>), + ok = wait_for_accepted(<<1>>), {ok, Msg0} = amqp10_client:get_msg(CQReceiver), ?assertEqual([<<"m0">>], amqp10_msg:body(Msg0)), @@ -2779,7 +2836,7 @@ global_counters(Config) -> %% Test re-delivery. ok = amqp10_client:send_msg(QQSender, amqp10_msg:new(<<2>>, <<"m2">>, false)), - ok = wait_for_settlement(<<2>>), + ok = wait_for_accepted(<<2>>), {ok, Msg2a} = amqp10_client:get_msg(QQReceiver), ?assertEqual([<<"m2">>], amqp10_msg:body(Msg2a)), %% Releasing causes the message to be requeued. @@ -2883,7 +2940,7 @@ stream_filtering(Config) -> Msg1 = amqp10_msg:set_application_properties(AppProps, Msg0), Msg2 = amqp10_msg:set_message_annotations(Anns, Msg1), ok = amqp10_client:send_msg(Sender, Msg2), - ok = wait_for_settlement(DTag) + ok = wait_for_accepted(DTag) end, lists:seq(1, WaveCount)) end, @@ -3010,12 +3067,12 @@ available_messages(QType, Config) -> ok = wait_for_accepts(4), OutputHandle = element(4, Receiver), - Flow = #'v1_0.flow'{ - %% Grant 1 credit to the sending queue. - link_credit = {uint, 1}, - %% Request sending queue to send us a FLOW including available messages. - echo = true}, - ok = amqp10_client_session:flow(Session, OutputHandle, Flow, never), + Flow0 = #'v1_0.flow'{ + %% Grant 1 credit to the sending queue. + link_credit = {uint, 1}, + %% Request sending queue to send us a FLOW including available messages. + echo = true}, + ok = amqp10_client_session:flow(Session, OutputHandle, Flow0, never), receive_messages(Receiver, 1), receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok after 5000 -> ct:fail({missing_event, ?LINE}) @@ -3041,6 +3098,29 @@ available_messages(QType, Config) -> end, ?assertEqual(0, get_available_messages(Receiver)), + ok = send_messages(Sender, 5000, false), + %% We know that Streams only return an approximation for available messages. + %% The committed Stream offset is queried by chunk ID. + %% So, we wait here a bit such that the 4th message goes into its own new chunk. + timer:sleep(50), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"last dtag">>, <<"last msg">>, false)), + ok = wait_for_accepts(5001), + + Flow1 = #'v1_0.flow'{ + link_credit = {uint, 0}, + echo = false}, + Flow2 = #'v1_0.flow'{ + link_credit = {uint, 1}, + echo = true}, + %% Send both FLOW frames in sequence. + ok = amqp10_client_session:flow(Session, OutputHandle, Flow1, never), + ok = amqp10_client_session:flow(Session, OutputHandle, Flow2, never), + receive_messages(Receiver, 1), + receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + ?assertEqual(5000, get_available_messages(Receiver)), + ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Receiver), ok = end_session_sync(Session), @@ -3073,7 +3153,7 @@ incoming_message_interceptors(Config) -> NowMillis = os:system_time(millisecond), Tag = <<"tag">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Tag, <<"body">>)), - ok = wait_for_settlement(Tag), + ok = wait_for_accepted(Tag), {ok, Msg1} = amqp10_client:get_msg(StreamReceiver), {ok, Msg2} = amqp10_client:get_msg(QQReceiver), @@ -3608,7 +3688,7 @@ dead_letter_reject(Config) -> Body = <<"my body">>, M = amqp10_msg:new(Tag, Body), ok = amqp10_client:send_msg(Sender, M), - ok = wait_for_settlement(Tag), + ok = wait_for_accepted(Tag), {ok, Msg1} = amqp10_client:get_msg(Receiver), ok = amqp10_client:settle_msg(Receiver, Msg1, rejected), @@ -3688,7 +3768,9 @@ dead_letter_into_stream(Config) -> <<"x-initial-cluster-size">> => {ulong, 1} }}), {ok, Receiver} = amqp10_client:attach_receiver_link( - Session1, <<"receiver">>, <<"/queue/", QName1/binary>>, settled), + Session1, <<"receiver">>, <<"/queue/", QName1/binary>>, + settled, configuration, + #{<<"rabbitmq:stream-offset-spec">> => <<"first">>}), {ok, Sender} = amqp10_client:attach_sender_link( Session0, <<"sender">>, <<"/queue/", QName0/binary>>), wait_for_credit(Sender), @@ -3808,7 +3890,7 @@ footer_checksum(FooterOpt, Config) -> "false" => false}, amqp10_msg:new(<<"t1">>, <<"m1">>)))))), ok = amqp10_client:send_msg(Sender, M1), - ok = wait_for_settlement(<<"t1">>), + ok = wait_for_accepted(<<"t1">>), {ok, Msg1} = amqp10_client:get_msg(Receiver), ?assertEqual(<<"m1">>, amqp10_msg:body_bin(Msg1)), @@ -3821,7 +3903,7 @@ footer_checksum(FooterOpt, Config) -> "false" => false}, amqp10_msg:new(<<"t2">>, <<"m2">>)), ok = amqp10_client:send_msg(Sender, M2), - ok = wait_for_settlement(<<"t2">>), + ok = wait_for_accepted(<<"t2">>), {ok, Msg2} = amqp10_client:get_msg(Receiver), ?assertEqual(<<"m2">>, amqp10_msg:body_bin(Msg2)), @@ -3829,7 +3911,7 @@ footer_checksum(FooterOpt, Config) -> %% bare message consists of single data section M3 = amqp10_msg:new(<<"t3">>, <<"m3">>), ok = amqp10_client:send_msg(Sender, M3), - ok = wait_for_settlement(<<"t3">>), + ok = wait_for_accepted(<<"t3">>), {ok, Msg3} = amqp10_client:get_msg(Receiver), ?assertEqual(<<"m3">>, amqp10_msg:body_bin(Msg3)), @@ -3838,7 +3920,7 @@ footer_checksum(FooterOpt, Config) -> M4 = amqp10_msg:new(<<"t4">>, [#'v1_0.data'{content = <<"m4 a">>}, #'v1_0.data'{content = <<"m4 b">>}]), ok = amqp10_client:send_msg(Sender, M4), - ok = wait_for_settlement(<<"t4">>), + ok = wait_for_accepted(<<"t4">>), {ok, Msg4} = amqp10_client:get_msg(Receiver), ?assertEqual([<<"m4 a">>, <<"m4 b">>], amqp10_msg:body(Msg4)), @@ -3851,7 +3933,7 @@ footer_checksum(FooterOpt, Config) -> %% Our serialiser uses 1 byte boolean encoding #'v1_0.amqp_sequence'{content = [true, false, undefined]}]), ok = amqp10_client:send_msg(Sender, M5), - ok = wait_for_settlement(<<"t5">>), + ok = wait_for_accepted(<<"t5">>), {ok, Msg5} = amqp10_client:get_msg(Receiver), ?assertEqual([#'v1_0.amqp_sequence'{content = [{ubyte, 255}]}, @@ -3876,7 +3958,7 @@ footer_checksum(FooterOpt, Config) -> {{symbol, <<"x-opt-carrot">>}, {char, $🥕}} ]}]), ok = amqp10_client:send_msg(Sender, M6), - ok = wait_for_settlement(<<"t6">>), + ok = wait_for_accepted(<<"t6">>), {ok, Msg6} = amqp10_client:get_msg(Receiver), ?assertEqual([<<"m6 a">>, <<"m6 b">>], amqp10_msg:body(Msg6)), @@ -3901,6 +3983,382 @@ footer_checksum(FooterOpt, Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). +receive_many_made_available_over_time_classic_queue(Config) -> + receive_many_made_available_over_time(<<"classic">>, Config). + +receive_many_made_available_over_time_quorum_queue(Config) -> + receive_many_made_available_over_time(<<"quorum">>, Config). + +receive_many_made_available_over_time_stream(Config) -> + receive_many_made_available_over_time(<<"stream">>, Config). + +%% This test grants many credits to the queue once while +%% messages are being made available at the source over time. +receive_many_made_available_over_time(QType, Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = <<"/queue/", QName/binary>>, + {Connection, Session, LinkPair} = init(Config), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, + {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"test-sender">>, Address), + wait_for_credit(Sender), + + %% Send first batch of messages. + ok = send_messages(Sender, 10, false), + ok = wait_for_accepts(10), + Filter = consume_from_first(QType), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, + settled, configuration, Filter), + flush(attached), + %% Grant many credits to the queue once. + ok = amqp10_client:flow_link_credit(Receiver, 5000, never), + %% We expect to receive the first batch of messages. + receive_messages(Receiver, 10), + + %% Make next batch of messages available. + ok = send_messages(Sender, 2990, false), + ok = wait_for_accepts(2990), + %% We expect to receive this batch of messages. + receive_messages(Receiver, 2990), + + %% Make next batch of messages available. + ok = send_messages(Sender, 1999, false), + ok = wait_for_accepts(1999), + %% We expect to receive this batch of messages. + receive_messages(Receiver, 1999), + + %% Make next batch of messages available. + ok = send_messages(Sender, 2, false), + ok = wait_for_accepts(2), + %% At this point, we only have 2 messages in the queue, but only 1 credit left. + ?assertEqual(1, count_received_messages(Receiver)), + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:close_connection(Connection). + +receive_many_auto_flow_classic_queue(Config) -> + receive_many_auto_flow(<<"classic">>, Config). + +receive_many_auto_flow_quorum_queue(Config) -> + receive_many_auto_flow(<<"quorum">>, Config). + +receive_many_auto_flow_stream(Config) -> + receive_many_auto_flow(<<"stream">>, Config). + +receive_many_auto_flow(QType, Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = <<"/queue/", QName/binary>>, + {Connection, Session, LinkPair} = init(Config), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, + {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"test-sender">>, Address), + wait_for_credit(Sender), + + %% Send many messages. + Num = 10_000, + ok = send_messages(Sender, Num, false), + ok = wait_for_accepts(Num), + + Filter = consume_from_first(QType), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, + settled, configuration, Filter), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail(missing_attached) + end, + flush(receiver_attached), + + %% Let's auto top up relatively often, but in large batches. + ok = amqp10_client:flow_link_credit(Receiver, 1300, 1200), + receive_messages(Receiver, Num), + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:close_connection(Connection). + +%% This test ensures that the server sends us TRANSFER and FLOW frames in the correct order +%% even if the server is temporarily not allowed to send us any TRANSFERs due to our session +%% incoming-window being closed. +incoming_window_closed_transfer_flow_order(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Ch = rabbit_ct_client_helpers:open_channel(Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), + ok = rabbit_ct_client_helpers:close_channel(Ch), + Address = <<"/amq/queue/", QName/binary>>, + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + DTag = <<"my tag">>, + Body = <<"my body">>, + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, Body, false)), + ok = wait_for_accepted(DTag), + ok = amqp10_client:detach_link(Sender), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail(missing_attached) + end, + flush(receiver_attached), + + ok = close_incoming_window(Session), + ok = amqp10_client:flow_link_credit(Receiver, 2, never, true), + %% Given our incoming window is closed, we shouldn't receive the TRANSFER yet, and therefore + %% must not yet receive the FLOW that comes thereafter with drain=true, credit=0, and advanced delivery-count. + receive Unexpected -> ct:fail({unexpected, Unexpected}) + after 300 -> ok + end, + + %% Open our incoming window + gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}), + %% Important: We should first receive the TRANSFER, + %% and only thereafter the FLOW (and hence the credit_exhausted notification). + receive First -> + {amqp10_msg, Receiver, Msg} = First, + ?assertEqual([Body], amqp10_msg:body(Msg)) + after 5000 -> ct:fail("timeout receiving message") + end, + receive Second -> + ?assertEqual({amqp10_event, {link, Receiver, credit_exhausted}}, Second) + after 5000 -> ct:fail("timeout receiving credit_exhausted") + end, + + ok = delete_queue(Session, QName), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +incoming_window_closed_stop_link(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = <<"/queue/", QName/binary>>, + + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{transfer_limit_margin => -1}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>, false)), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, <<"m2">>, false)), + ok = wait_for_accepted(<<"t1">>), + ok = wait_for_accepted(<<"t2">>), + ok = amqp10_client:detach_link(Sender), + + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail(missing_attached) + end, + flush(receiver_attached), + + ok = close_incoming_window(Session), + %% We first grant a credit in drain mode. + ok = amqp10_client:flow_link_credit(Receiver, 1, never, true), + %% Then, we change our mind and stop the link. + ok = amqp10_client:stop_receiver_link(Receiver), + %% Given our incoming window is closed, we shouldn't receive any TRANSFER. + receive {amqp10_msg, _, _} = Unexp1 -> ct:fail({?LINE, unexpected_msg, Unexp1}) + after 10 -> ok + end, + + %% Open our incoming window + gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}), + + %% Since we decreased link credit dynamically, we may or may not receive the 1st message. + receive {amqp10_msg, Receiver, Msg1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)) + after 500 -> ok + end, + %% We must not receive the 2nd message. + receive {amqp10_msg, _, _} = Unexp2 -> ct:fail({?LINE, unexpected_msg, Unexp2}) + after 5 -> ok + end, + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +%% Test that we can close a link while our session incoming-window is closed. +incoming_window_closed_close_link(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = <<"/queue/", QName/binary>>, + + {Connection, Session, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + DTag = <<"my tag">>, + Body = <<"my body">>, + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, Body, false)), + ok = wait_for_accepted(DTag), + ok = amqp10_client:detach_link(Sender), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail(missing_attached) + end, + flush(receiver_attached), + + ok = close_incoming_window(Session), + ok = amqp10_client:flow_link_credit(Receiver, 2, never, true), + %% Given our incoming window is closed, we shouldn't receive the TRANSFER yet, and therefore + %% must not yet receive the FLOW that comes thereafter with drain=true, credit=0, and advanced delivery-count. + receive Unexpected1 -> ct:fail({unexpected, Unexpected1}) + after 300 -> ok + end, + %% Close the link while our session incoming-window is closed. + ok = detach_link_sync(Receiver), + %% Open our incoming window. + gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}), + %% Given that both endpoints have now destroyed the link, we do not + %% expect to receive any TRANSFER or FLOW frame referencing the destroyed link. + receive Unexpected2 -> ct:fail({unexpected, Unexpected2}) + after 300 -> ok + end, + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +incoming_window_closed_rabbitmq_internal_flow_classic_queue(Config) -> + incoming_window_closed_rabbitmq_internal_flow(<<"classic">>, Config). + +incoming_window_closed_rabbitmq_internal_flow_quorum_queue(Config) -> + incoming_window_closed_rabbitmq_internal_flow(<<"quorum">>, Config). + +incoming_window_closed_rabbitmq_internal_flow(QType, Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = <<"/queue/", QName/binary>>, + + {Connection, Session, LinkPair} = init(Config), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, + {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + %% Send many messages. + Num = 5_000, + ok = send_messages(Sender, Num, false), + ok = wait_for_accepts(Num), + ok = detach_link_sync(Sender), + + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail(missing_attached) + end, + flush(receiver_attached), + + ok = close_incoming_window(Session), + %% Grant all link credit at once. + ok = amqp10_client:flow_link_credit(Receiver, Num, never), + %% Given our incoming window is closed, we shouldn't receive any TRANSFER yet. + receive Unexpected -> ct:fail({unexpected, Unexpected}) + after 200 -> ok + end, + + %% Here, we do a bit of white box testing: We assert that RabbitMQ has some form of internal + %% flow control by checking that the queue did not send all its messages to the server session + %% process. In other words, there should be ready messages in the queue. + MsgsReady = ready_messages(QName, Config), + ?assert(MsgsReady > 0), + + %% Open our incoming window. + gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, Num}}}), + receive_messages(Receiver, Num), + + ok = detach_link_sync(Receiver), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +tcp_back_pressure_rabbitmq_internal_flow_classic_queue(Config) -> + tcp_back_pressure_rabbitmq_internal_flow(<<"classic">>, Config). + +tcp_back_pressure_rabbitmq_internal_flow_quorum_queue(Config) -> + tcp_back_pressure_rabbitmq_internal_flow(<<"quorum">>, Config). + +%% Test that RabbitMQ can handle clients that do not receive fast enough +%% causing TCP back-pressure to the server. RabbitMQ's internal flow control +%% writer proc <- session proc <- queue proc +%% should be able to protect the server by having the queue not send out all messages at once. +tcp_back_pressure_rabbitmq_internal_flow(QType, Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = <<"/queue/", QName/binary>>, + + OpnConf0 = connection_config(Config), + %% We also want to test the code path where large messages are split into multiple transfer frames. + OpnConf = OpnConf0#{max_frame_size => 600}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, + {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + %% Send many messages. + %% The messages should be somewhat large to fill up buffers causing TCP back-pressure. + BodySuffix = binary:copy(<<"x">>, 1000), + Num = 5000, + ok = send_messages(Sender, Num, false, BodySuffix), + ok = wait_for_accepts(Num), + ok = detach_link_sync(Sender), + + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail(missing_attached) + end, + flush(receiver_attached), + + {_GenStatemState, + #{reader := ReaderPid, + socket := {tcp, Socket}}} = formatted_state(Session), + + %% Provoke TCP back-pressure from client to server by using very small buffers. + ok = inet:setopts(Socket, [{recbuf, 256}, + {buffer, 256}]), + %% Suspend the receiving client such that it stops reading from its socket + %% causing TCP back-pressure to the server being applied. + true = erlang:suspend_process(ReaderPid), + + ok = amqp10_client:flow_link_credit(Receiver, Num, never), + %% We give the queue time to send messages to the session proc and writer proc. + timer:sleep(1000), + + %% Here, we do a bit of white box testing: We assert that RabbitMQ has some form of internal + %% flow control by checking that the queue sent some but, more importantly, not all its + %% messages to the server session and writer processes. In other words, there should be + %% ready messages in the queue. + MsgsReady = ready_messages(QName, Config), + ?assert(MsgsReady > 0), + ?assert(MsgsReady < Num), + + %% Use large buffers. This will considerably speed up receiving all messages (on Linux). + ok = inet:setopts(Socket, [{recbuf, 65536}, + {buffer, 65536}]), + %% When we resume the receiving client, we expect to receive all messages. + true = erlang:resume_process(ReaderPid), + receive_messages(Receiver, Num), + + ok = detach_link_sync(Receiver), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + %% internal %% @@ -4008,7 +4466,7 @@ wait_for_connection_close(Connection) -> ct:fail({connection_close_timeout, Connection}) end. -wait_for_settlement(Tag) -> +wait_for_accepted(Tag) -> wait_for_settlement(Tag, accepted). wait_for_settlement(Tag, State) -> @@ -4076,18 +4534,22 @@ count_received_messages0(Receiver, Count) -> receive {amqp10_msg, Receiver, _Msg} -> count_received_messages0(Receiver, Count + 1) - after 200 -> + after 1000 -> Count end. -send_messages(_Sender, 0, _Settled) -> - ok; send_messages(Sender, Left, Settled) -> + send_messages(Sender, Left, Settled, <<>>). + +send_messages(_, 0, _, _) -> + ok; +send_messages(Sender, Left, Settled, BodySuffix) -> Bin = integer_to_binary(Left), - Msg = amqp10_msg:new(Bin, Bin, Settled), + Body = <>, + Msg = amqp10_msg:new(Bin, Body, Settled), case amqp10_client:send_msg(Sender, Msg) of ok -> - send_messages(Sender, Left - 1, Settled); + send_messages(Sender, Left - 1, Settled, BodySuffix); {error, insufficient_credit} -> ok = wait_for_credit(Sender), %% The credited event we just processed could have been received some time ago, @@ -4099,7 +4561,7 @@ send_messages(Sender, Left, Settled) -> %% but do not process the credited event in our mailbox. %% So, we must be defensive here and assume that the next amqp10_client:send/2 call might return {error, insufficient_credit} %% again causing us then to really wait to receive a credited event (instead of just processing an old credited event). - send_messages(Sender, Left, Settled) + send_messages(Sender, Left, Settled, BodySuffix) end. assert_link_credit_runs_out(_Sender, 0) -> @@ -4152,9 +4614,9 @@ consume_from_first(<<"stream">>) -> consume_from_first(_) -> #{}. -%% Return the formatted state of a gen_server via sys:get_status/1. +%% Return the formatted state of a gen_server or gen_statem via sys:get_status/1. %% (sys:get_state/1 is unformatted) -gen_server_state(Pid) -> +formatted_state(Pid) -> {status, _, _, L0} = sys:get_status(Pid, 20_000), L1 = lists:last(L0), {data, L2} = lists:last(L1), @@ -4180,6 +4642,14 @@ get_available_messages({link_ref, receiver, Session, OutputHandle}) -> {ok, Available} = maps:find(available, Link), Available. +ready_messages(QName, Config) + when is_binary(QName) -> + {ok, Q} = rpc(Config, rabbit_amqqueue, lookup, [QName, <<"/">>]), + {ok, MsgsReady, _ConsumerCount} = rpc(Config, rabbit_queue_type, stat, [Q]), + ?assert(is_integer(MsgsReady)), + ct:pal("Queue ~s has ~b ready messages.", [QName, MsgsReady]), + MsgsReady. + ra_name(Q) -> binary_to_atom(<<"%2F_", Q/binary>>). @@ -4212,3 +4682,6 @@ find_event(Type, Props, Events) when is_list(Props), is_list(Events) -> lists:keymember(Key, 1, EventProps) end, Props) end, Events). + +close_incoming_window(Session) -> + gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 0}}}). diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl index e2df8e370e0c..cc5844076982 100644 --- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl @@ -508,32 +508,29 @@ credit_api_v2(Config) -> (D, _) -> error({unexpected_delivery, D}) end), %% Grant 1 credit. - {F6, []} = rabbit_fifo_client:credit(CTag, DC0, 1, false, _Echo0 = true, F5), + {F6, []} = rabbit_fifo_client:credit(CTag, DC0, 1, false, F5), %% We expect exactly 1 message due to 1 credit being granted. {[{_, _, _, _, m1}], - %% We expect a credit_reply action due to echo=true + %% We always expect a credit_reply action. [{credit_reply, CTag, DC1, _Credit0 = 0, _Available0 = 1, _Drain0 = false}], F7} = process_ra_events(receive_ra_events(), ClusterName, F6), %% Again, grant 1 credit. %% However, because we still use the initial delivery count DC0, rabbit_fifo - %% wont' send us a new message since it already sent us m1 for that old delivery-count. + %% won't send us a new message since it already sent us m1 for that old delivery-count. %% In other words, this credit top up simulates in-flight deliveries. - {F8, []} = rabbit_fifo_client:credit(CTag, DC0, 1, false, _Echo1 = true, F7), + {F8, []} = rabbit_fifo_client:credit(CTag, DC0, 1, false, F7), {_NoMessages = [], - %% We still expect a credit_reply action due to echo=true [{credit_reply, CTag, DC1, _Credit1 = 0, _Available1 = 1, _Drain1 = false}], F9} = process_ra_events(receive_ra_events(), ClusterName, F8), %% Grant 4 credits and drain. - {F10, []} = rabbit_fifo_client:credit(CTag, DC1, 4, true, _Echo2 = false, F9), + {F10, []} = rabbit_fifo_client:credit(CTag, DC1, 4, true, F9), %% rabbit_fifo should advance the delivery-count as much as possible %% consuming all credits due to drain=true and insufficient messages in the queue. DC2 = DC1 + 4, %% We expect to receive m2 which is the only message in the queue. {[{_, _, _, _, m2}], - %% Even though echo=false, we still expect a credit_reply action due - %% drain=true and insufficient messages in the queue. [{credit_reply, CTag, DC2, _Credit2 = 0, _Available2 = 0, _Drain2 = true}], F11} = process_ra_events(receive_ra_events(), ClusterName, F10), flush(), @@ -548,11 +545,11 @@ credit_api_v2(Config) -> end), %% Grant 10 credits and receive the last message. - {F14, []} = rabbit_fifo_client:credit(CTag, DC2, 10, false, _Echo = false, F13), + {F14, []} = rabbit_fifo_client:credit(CTag, DC2, 10, false, F13), + DC3 = DC2 + 1, ?assertMatch( {[{_, _, _, _, m3}], - %% Due to echo=false, we don't expect a credit_reply action. - _NoCreditReplyAction = [], + [{credit_reply, CTag, DC3, _Credit3 = 9, _Available3 = 0, _Drain3 = false}], _F15}, process_ra_events(receive_ra_events(), ClusterName, F14)). untracked_enqueue(Config) -> diff --git a/deps/rabbit_common/src/credit_flow.erl b/deps/rabbit_common/src/credit_flow.erl index 5cef875835ec..5681f0ed1fa9 100644 --- a/deps/rabbit_common/src/credit_flow.erl +++ b/deps/rabbit_common/src/credit_flow.erl @@ -156,7 +156,7 @@ state_delayed(BlockedAt) -> B -> Now = erlang:monotonic_time(), Diff = erlang:convert_time_unit(Now - B, native, - micro_seconds), + microsecond), case Diff < ?STATE_CHANGE_INTERVAL of true -> flow; false -> running diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index b5d817cbf549..6324165976e4 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -24,7 +24,8 @@ precondition_failed/1, precondition_failed/2]). -export([type_class/1, assert_args_equivalence/4, assert_field_equivalence/4]). -export([table_lookup/2, set_table_value/4, amqp_table/1, to_amqp_table/1]). --export([r/3, r/2, r_arg/4, rs/1]). +-export([r/3, r/2, r_arg/4, rs/1, + queue_resource/2, exchange_resource/2]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). -export([start_cover/1]). @@ -437,6 +438,16 @@ rs(#resource{virtual_host = VHostPath, kind = topic, name = Name}) -> rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> format("~ts '~ts' in vhost '~ts'", [Kind, Name, VHostPath]). +-spec queue_resource(rabbit_types:vhost(), resource_name()) -> + rabbit_types:r(queue). +queue_resource(VHostPath, Name) -> + r(VHostPath, queue, Name). + +-spec exchange_resource(rabbit_types:vhost(), resource_name()) -> + rabbit_types:r(exchange). +exchange_resource(VHostPath, Name) -> + r(VHostPath, exchange, Name). + enable_cover() -> enable_cover(["."]). enable_cover(Dirs) -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl index 73885ddba2e2..9e80e4043064 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl @@ -52,7 +52,7 @@ handle_event/3, settle/5, credit_v1/5, - credit/7, + credit/6, dequeue/5, state_info/1 ]). @@ -281,8 +281,8 @@ settle(A1,A2,A3,A4,A5) -> credit_v1(A1,A2,A3,A4,A5) -> ?UNSUPPORTED([A1,A2,A3,A4,A5]). -credit(A1,A2,A3,A4,A5,A6,A7) -> - ?UNSUPPORTED([A1,A2,A3,A4,A5,A6,A7]). +credit(A1,A2,A3,A4,A5,A6) -> + ?UNSUPPORTED([A1,A2,A3,A4,A5,A6]). dequeue(A1,A2,A3,A4,A5) -> ?UNSUPPORTED([A1,A2,A3,A4,A5]). diff --git a/deps/rabbitmq_shovel/test/shovel_test_utils.erl b/deps/rabbitmq_shovel/test/shovel_test_utils.erl index a5589290fa90..b487a49a1a08 100644 --- a/deps/rabbitmq_shovel/test/shovel_test_utils.erl +++ b/deps/rabbitmq_shovel/test/shovel_test_utils.erl @@ -49,7 +49,9 @@ await_shovel(Config, Node, Name) -> ?MODULE, await_shovel1, [Config, Name]). await_shovel1(_Config, Name) -> - await(fun () -> lists:member(Name, shovels_from_status()) end). + await(fun() -> + lists:member(Name, shovels_from_status()) + end, 30_000). shovels_from_status() -> S = rabbit_shovel_status:status(),