Skip to content

Commit

Permalink
Merge pull request #7141 from rabbitmq/send-drained-bug-fix
Browse files Browse the repository at this point in the history
Fix channel crash when draining AMQP 1.0 credits from classic queue
  • Loading branch information
michaelklishin committed Feb 2, 2023
2 parents 535a0be + 3bb3273 commit 70094e6
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 60 deletions.
11 changes: 10 additions & 1 deletion deps/amqp10_client/src/amqp10_client.erl
Expand Up @@ -269,7 +269,16 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
snd_settle_mode(), terminus_durability(), filter(),
properties()) ->
{ok, link_ref()}.
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) ->
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
when is_pid(Session) andalso
is_binary(Name) andalso
is_binary(Source) andalso
(SettleMode == unsettled orelse
SettleMode == settled orelse
SettleMode == mixed) andalso
is_atom(Durability) andalso
is_map(Filter) andalso
is_map(Properties) ->
AttachArgs = #{name => Name,
role => {receiver, #{address => Source,
durable => Durability}, self()},
Expand Down
80 changes: 46 additions & 34 deletions deps/amqp10_client/src/amqp10_client_session.erl
Expand Up @@ -648,41 +648,51 @@ translate_terminus_durability(none) -> 0;
translate_terminus_durability(configuration) -> 1;
translate_terminus_durability(unsettled_state) -> 2.

translate_filters(Filters) when is_map(Filters) andalso map_size(Filters) =< 0 -> undefined;
translate_filters(Filters) when is_map(Filters) -> {
map,
maps:fold(
fun(<<"apache.org:legacy-amqp-direct-binding:string">> = K, V, Acc) when is_binary(V) ->
[{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc];
(<<"apache.org:legacy-amqp-topic-binding:string">> = K, V, Acc) when is_binary(V) ->
[{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc];
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
[{{symbol, K}, {described, {symbol, K}, translate_legacy_amqp_headers_binding(V)}} | Acc];
(<<"apache.org:no-local-filter:list">> = K, V, Acc) when is_list(V) ->
[{{symbol, K}, {described, {symbol, K}, lists:map(fun(Id) -> {utf8, Id} end, V)}} | Acc];
(<<"apache.org:selector-filter:string">> = K, V, Acc) when is_binary(V) ->
[{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc]
end,
[],
Filters)
}.
translate_filters(Filters)
when is_map(Filters) andalso
map_size(Filters) == 0 ->
undefined;
translate_filters(Filters)
when is_map(Filters) ->
{map,
maps:fold(
fun
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
%% special case conversion
Key = sym(K),
[{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc];
(K, V, Acc) when is_binary(K) ->
%% try treat any filter value generically
Key = sym(K),
Value = filter_value_type(V),
[{Key, {described, Key, Value}} | Acc]
end, [], Filters)}.

filter_value_type(V) when is_binary(V) ->
%% this is clearly not always correct
{utf8, V};
filter_value_type(V)
when is_integer(V) andalso V >= 0 ->
{uint, V};
filter_value_type(VList) when is_list(VList) ->
[filter_value_type(V) || V <- VList];
filter_value_type({T, _} = V) when is_atom(T) ->
%% looks like an already tagged type, just pass it through
V.

% https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html
translate_legacy_amqp_headers_binding(LegacyHeaders) -> {
map,
maps:fold(
fun(<<"x-match">> = K, <<"any">> = V, Acc) ->
[{{utf8, K}, {utf8, V}} | Acc];
(<<"x-match">> = K, <<"all">> = V, Acc) ->
[{{utf8, K}, {utf8, V}} | Acc];
(<<"x-",_/binary>>, _, Acc) ->
Acc;
(K, V, Acc) ->
[{{utf8, K}, {utf8, V}} | Acc]
end,
[],
LegacyHeaders)
}.
translate_legacy_amqp_headers_binding(LegacyHeaders) ->
{map,
maps:fold(
fun(<<"x-match">> = K, <<"any">> = V, Acc) ->
[{{utf8, K}, {utf8, V}} | Acc];
(<<"x-match">> = K, <<"all">> = V, Acc) ->
[{{utf8, K}, {utf8, V}} | Acc];
(<<"x-", _/binary>>, _, Acc) ->
Acc;
(K, V, Acc) ->
[{{utf8, K}, filter_value_type(V)} | Acc]
end, [], LegacyHeaders)}.

send_detach(Send, {detach, OutHandle}, _From, State = #state{links = Links}) ->
case Links of
Expand Down Expand Up @@ -1011,8 +1021,10 @@ wrap_map_value(V) when is_atom(V) ->
utf8(atom_to_list(V)).

utf8(V) -> amqp10_client_types:utf8(V).

sym(B) when is_binary(B) -> {symbol, B};
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
sym(B) when is_binary(B) -> {symbol, B}.
sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
Expand Down
30 changes: 20 additions & 10 deletions deps/rabbit/src/rabbit_channel.erl
Expand Up @@ -2831,16 +2831,7 @@ evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
handle_queue_actions(Actions, #ch{} = State0) ->
WriterPid = State0#ch.cfg#conf.writer_pid,
lists:foldl(
fun ({send_credit_reply, Avail}, S0) ->
ok = rabbit_writer:send_command(WriterPid,
#'basic.credit_ok'{available = Avail}),
S0;
({send_drained, {CTag, Credit}}, S0) ->
ok = rabbit_writer:send_command(
WriterPid,
#'basic.credit_drained'{consumer_tag = CTag,
credit_drained = Credit}),
S0;
fun
({settled, QRef, MsgSeqNos}, S0) ->
confirm(MsgSeqNos, QRef, S0);
({rejected, _QRef, MsgSeqNos}, S0) ->
Expand All @@ -2865,9 +2856,28 @@ handle_queue_actions(Actions, #ch{} = State0) ->
S0;
({unblock, QName}, S0) ->
credit_flow:unblock(QName),
S0;
({send_credit_reply, Avail}, S0) ->
ok = rabbit_writer:send_command(WriterPid,
#'basic.credit_ok'{available = Avail}),
S0;
({send_drained, {CTag, Credit}}, S0) ->
ok = send_drained_to_writer(WriterPid, CTag, Credit),
S0;
({send_drained, CTagCredits}, S0) when is_list(CTagCredits) ->
%% this is the backwards compatible option that classic queues
%% used to send, this can be removed in 4.0
[ok = send_drained_to_writer(WriterPid, CTag, Credit)
|| {CTag, Credit} <- CTagCredits],
S0
end, State0, Actions).

send_drained_to_writer(WriterPid, CTag, Credit) ->
ok = rabbit_writer:send_command(
WriterPid,
#'basic.credit_drained'{consumer_tag = CTag,
credit_drained = Credit}).

maybe_increase_global_publishers(#ch{publishing_mode = true} = State0) ->
State0;
maybe_increase_global_publishers(State0) ->
Expand Down
9 changes: 7 additions & 2 deletions deps/rabbit/src/rabbit_classic_queue.erl
Expand Up @@ -551,9 +551,14 @@ deliver_to_consumer(Pid, QName, CTag, AckRequired, Message) ->
Evt = {queue_event, QName, Deliver},
gen_server:cast(Pid, Evt).

send_drained(Pid, QName, CTagCredits) ->
send_drained(Pid, QName, CTagCredits) when is_list(CTagCredits) ->
[_ = gen_server:cast(Pid, {queue_event, QName,
{send_drained, CTagCredit}})
|| CTagCredit <- CTagCredits],
ok;
send_drained(Pid, QName, CTagCredit) when is_tuple(CTagCredit) ->
gen_server:cast(Pid, {queue_event, QName,
{send_drained, CTagCredits}}).
{send_drained, CTagCredit}}).

send_credit_reply(Pid, QName, Len) when is_integer(Len) ->
gen_server:cast(Pid, {queue_event, QName,
Expand Down
10 changes: 5 additions & 5 deletions deps/rabbit/src/rabbit_queue_consumers.erl
Expand Up @@ -193,9 +193,9 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
end.

-spec send_drained(rabbit_amqqueue:name()) -> 'ok'.

send_drained(QName) -> [update_ch_record(send_drained(QName, C)) || C <- all_ch_record()],
ok.
send_drained(QName) ->
[update_ch_record(send_drained(QName, C)) || C <- all_ch_record()],
ok.

-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
rabbit_amqqueue:name(), state(), boolean(),
Expand Down Expand Up @@ -528,15 +528,15 @@ send_drained(QName, C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
case rabbit_limiter:drained(Limiter) of
{[], Limiter} -> C;
{CTagCredits, Limiter2} ->
rabbit_classic_queue:send_drained(ChPid, QName, CTagCredits),
ok = rabbit_classic_queue:send_drained(ChPid, QName, CTagCredits),
C#cr{limiter = Limiter2}
end.

credit_and_drain(QName, C = #cr{ch_pid = ChPid, limiter = Limiter},
CTag, Credit, Mode, IsEmpty) ->
case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of
{true, Limiter1} ->
rabbit_classic_queue:send_drained(ChPid, QName, [{CTag, Credit}]),
ok = rabbit_classic_queue:send_drained(ChPid, QName, [{CTag, Credit}]),
C#cr{limiter = Limiter1};
{false, Limiter1} -> C#cr{limiter = Limiter1}
end.
Expand Down
16 changes: 15 additions & 1 deletion deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl
Expand Up @@ -250,7 +250,7 @@ transferred(DeliveryTag, Channel,

source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
Key = {symbol, <<"rabbitmq:stream-offset-spec">>},
case lists:keyfind(Key, 1, KVList) of
case keyfind_unpack_described(Key, KVList) of
{_, {timestamp, Ts}} ->
[{<<"x-stream-offset">>, timestamp, Ts div 1000}]; %% 0.9.1 uses second based timestamps
{_, {utf8, Spec}} ->
Expand All @@ -262,3 +262,17 @@ source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
end;
source_filters_to_consumer_args(_Source) ->
[].

keyfind_unpack_described(Key, KvList) ->
%% filterset values _should_ be described values
%% they aren't always however for historical reasons so we need this bit of
%% code to return a plain value for the given filter key
case lists:keyfind(Key, 1, KvList) of
{Key, {described, Key, Value}} ->
{Key, Value};
{Key, _} = Kv ->
Kv;
false ->
false
end.

41 changes: 34 additions & 7 deletions deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
Expand Up @@ -24,7 +24,9 @@ groups() ->
[
{tests, [], [
reliable_send_receive_with_outcomes,
roundtrip_classic_queue_with_drain,
roundtrip_quorum_queue_with_drain,
roundtrip_stream_queue_with_drain,
message_headers_conversion
]},
{metrics, [], [
Expand Down Expand Up @@ -149,16 +151,28 @@ reliable_send_receive(Config, Outcome) ->

ok.

roundtrip_classic_queue_with_drain(Config) ->
QName = atom_to_binary(?FUNCTION_NAME, utf8),
roundtrip_queue_with_drain(Config, <<"classic">>, QName).

roundtrip_quorum_queue_with_drain(Config) ->
QName = atom_to_binary(?FUNCTION_NAME, utf8),
roundtrip_queue_with_drain(Config, <<"quorum">>, QName).

roundtrip_stream_queue_with_drain(Config) ->
QName = atom_to_binary(?FUNCTION_NAME, utf8),
roundtrip_queue_with_drain(Config, <<"stream">>, QName).

roundtrip_queue_with_drain(Config, QueueType, QName) when is_binary(QueueType) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
QName = atom_to_binary(?FUNCTION_NAME, utf8),
Address = <<"/amq/queue/", QName/binary>>,
%% declare a quorum queue
%% declare a queue
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
Args = [{<<"x-queue-type">>, longstr, QueueType}],
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),
arguments = Args}),
% create a configuration map
OpnConf = #{address => Host,
port => Port,
Expand All @@ -182,16 +196,29 @@ roundtrip_quorum_queue_with_drain(Config) ->

flush("pre-receive"),
% create a receiver link
{ok, Receiver} = amqp10_client:attach_receiver_link(Session,
<<"test-receiver">>,
Address),

TerminusDurability = none,
Filter = case QueueType of
<<"stream">> ->
#{<<"rabbitmq:stream-offset-spec">> => <<"first">>};
_ ->
#{}
end,
Properties = #{},
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>,
Address, unsettled,
TerminusDurability,
Filter, Properties),

% grant credit and drain
ok = amqp10_client:flow_link_credit(Receiver, 1, never, true),

% wait for a delivery
receive
{amqp10_msg, Receiver, _InMsg} -> ok
{amqp10_msg, Receiver, InMsg} ->
ok = amqp10_client:accept_msg(Receiver, InMsg),
wait_for_accepts(1),
ok
after 2000 ->
exit(delivery_timeout)
end,
Expand Down

0 comments on commit 70094e6

Please sign in to comment.