Skip to content

Commit

Permalink
Merge pull request #11307 from rabbitmq/amqp-flow-control-poc-2
Browse files Browse the repository at this point in the history
Introduce outbound RabbitMQ internal AMQP flow control
  • Loading branch information
kjnilsson committed Jun 5, 2024
2 parents 5f659b5 + cf3c8ba commit ebbff46
Show file tree
Hide file tree
Showing 18 changed files with 1,269 additions and 345 deletions.
12 changes: 8 additions & 4 deletions deps/amqp10_client/src/amqp10_client_frame_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,18 @@ handle_input(StateName, Data, State) ->
defer_heartbeat_timer(State =
#state{heartbeat_timer_ref = TRef,
connection_config = #{idle_time_out := T}})
when is_number(T) andalso T > 0 ->
when is_integer(T) andalso T > 0 ->
_ = case TRef of
undefined -> ok;
_ -> _ = erlang:cancel_timer(TRef)
undefined ->
ok;
_ ->
erlang:cancel_timer(TRef, [{async, true},
{info, false}])
end,
NewTRef = erlang:send_after(T * 2, self(), heartbeat),
State#state{heartbeat_timer_ref = NewTRef};
defer_heartbeat_timer(State) -> State.
defer_heartbeat_timer(State) ->
State.

route_frame(Channel, FrameType, {Performative, Payload} = Frame, State0) ->
{DestinationPid, State} = find_destination(Channel, FrameType, Performative,
Expand Down
8 changes: 6 additions & 2 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -902,9 +902,10 @@ handle_link_flow(#'v1_0.flow'{delivery_count = MaybeTheirDC,
handle_link_flow(#'v1_0.flow'{delivery_count = TheirDC,
link_credit = {uint, TheirCredit},
available = Available,
drain = Drain},
drain = Drain0},
Link0 = #link{role = receiver}) ->
Link = case Drain andalso TheirCredit =< 0 of
Drain = default(Drain0, false),
Link = case Drain andalso TheirCredit =:= 0 of
true ->
notify_credit_exhausted(Link0),
Link0#link{delivery_count = unpack(TheirDC),
Expand Down Expand Up @@ -1212,6 +1213,9 @@ boolean_to_role(?AMQP_ROLE_SENDER) ->
boolean_to_role(?AMQP_ROLE_RECEIVER) ->
receiver.

default(undefined, Default) -> Default;
default(Thing, _Default) -> Thing.

format_status(Status = #{data := Data0}) ->
#state{channel = Channel,
remote_channel = RemoteChannel,
Expand Down
21 changes: 12 additions & 9 deletions deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
-import(rabbit_amqp_session,
[check_resource_access/4,
check_read_permitted_on_topic/4]).
-import(rabbit_misc,
[queue_resource/2,
exchange_resource/2]).

-type permission_caches() :: {rabbit_amqp_session:permission_cache(),
rabbit_amqp_session:topic_permission_cache()}.
Expand Down Expand Up @@ -78,7 +81,7 @@ handle_http_req(<<"GET">>,
_ConnPid,
PermCaches) ->
QNameBin = uri_string:unquote(QNameBinQuoted),
QName = rabbit_misc:r(Vhost, queue, QNameBin),
QName = queue_resource(Vhost, QNameBin),
case rabbit_amqqueue:with(
QName,
fun(Q) ->
Expand Down Expand Up @@ -119,7 +122,7 @@ handle_http_req(HttpMethod = <<"PUT">>,
_ -> ok
end,
ok = prohibit_cr_lf(QNameBin),
QName = rabbit_misc:r(Vhost, queue, QNameBin),
QName = queue_resource(Vhost, QNameBin),
ok = prohibit_reserved_amq(QName),
PermCache1 = check_resource_access(QName, configure, User, PermCache0),
rabbit_core_metrics:queue_declared(QName),
Expand Down Expand Up @@ -193,7 +196,7 @@ handle_http_req(<<"PUT">>,
catch exit:#amqp_error{explanation = Explanation} ->
throw(<<"400">>, Explanation, [])
end,
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
XName = exchange_resource(Vhost, XNameBin),
ok = prohibit_default_exchange(XName),
PermCache = check_resource_access(XName, configure, User, PermCache0),
X = case rabbit_exchange:lookup(XName) of
Expand Down Expand Up @@ -224,7 +227,7 @@ handle_http_req(<<"DELETE">>,
ConnPid,
{PermCache0, TopicPermCache}) ->
QNameBin = uri_string:unquote(QNameBinQuoted),
QName = rabbit_misc:r(Vhost, queue, QNameBin),
QName = queue_resource(Vhost, QNameBin),
PermCache = check_resource_access(QName, read, User, PermCache0),
try rabbit_amqqueue:with_exclusive_access_or_die(
QName, ConnPid,
Expand Down Expand Up @@ -252,7 +255,7 @@ handle_http_req(<<"DELETE">>,
ConnPid,
{PermCache0, TopicPermCache}) ->
QNameBin = uri_string:unquote(QNameBinQuoted),
QName = rabbit_misc:r(Vhost, queue, QNameBin),
QName = queue_resource(Vhost, QNameBin),
ok = prohibit_cr_lf(QNameBin),
PermCache = check_resource_access(QName, configure, User, PermCache0),
try rabbit_amqqueue:delete_with(QName, ConnPid, false, false, Username, true) of
Expand All @@ -272,7 +275,7 @@ handle_http_req(<<"DELETE">>,
_ConnPid,
{PermCache0, TopicPermCache}) ->
XNameBin = uri_string:unquote(XNameBinQuoted),
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
XName = exchange_resource(Vhost, XNameBin),
ok = prohibit_cr_lf(XNameBin),
ok = prohibit_default_exchange(XName),
ok = prohibit_reserved_amq(XName),
Expand All @@ -297,7 +300,7 @@ handle_http_req(<<"POST">>,
#{destination_exchange := Bin} ->
{exchange, Bin}
end,
SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin),
SrcXName = exchange_resource(Vhost, SrcXNameBin),
DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin),
PermCaches = binding_checks(SrcXName, DstName, BindingKey, User, PermCaches0),
Binding = #binding{source = SrcXName,
Expand All @@ -320,7 +323,7 @@ handle_http_req(<<"DELETE">>,
DstNameBin,
BindingKey,
ArgsHash} = decode_binding_path_segment(BindingSegment),
SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin),
SrcXName = exchange_resource(Vhost, SrcXNameBin),
DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin),
PermCaches = binding_checks(SrcXName, DstName, BindingKey, User, PermCaches0),
Bindings = rabbit_binding:list_for_source_and_destination(SrcXName, DstName),
Expand Down Expand Up @@ -352,7 +355,7 @@ handle_http_req(<<"GET">>,
"missing 'dste' or 'dstq' in query: ~tp",
QueryMap)
end,
SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin),
SrcXName = exchange_resource(Vhost, SrcXNameBin),
DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin),
Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstName),
Bindings = [B || B = #binding{key = K} <- Bindings0, K =:= Key],
Expand Down
Loading

0 comments on commit ebbff46

Please sign in to comment.