Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed May 30, 2024
1 parent 530d92d commit 83f8318
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 2 deletions.
2 changes: 1 addition & 1 deletion deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ handle_link_flow(#'v1_0.flow'{delivery_count = TheirDC,
available = Available,
drain = Drain},
Link0 = #link{role = receiver}) ->
Link = case Drain andalso TheirCredit =< 0 of
Link = case Drain andalso TheirCredit =:= 0 of
true ->
notify_credit_exhausted(Link0),
Link0#link{delivery_count = unpack(TheirDC),
Expand Down
150 changes: 149 additions & 1 deletion deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ groups() ->
roundtrip_with_drain_classic_queue,
roundtrip_with_drain_quorum_queue,
roundtrip_with_drain_stream,
drain_many_classic_queue,
drain_many_quorum_queue,
drain_many_stream,
amqp_stream_amqpl,
amqp_quorum_queue_amqpl,
message_headers_conversion,
Expand Down Expand Up @@ -106,6 +109,8 @@ groups() ->
dead_letter_headers_exchange,
dead_letter_reject,
immutable_bare_message,
receive_many_made_available_over_time,
receive_many_auto_flow,
incoming_window_closed_transfer_flow_order,
incoming_window_closed_close_link
]},
Expand Down Expand Up @@ -171,6 +176,7 @@ end_per_group(_, Config) ->
init_per_testcase(T, Config)
when T =:= message_headers_conversion orelse
T =:= roundtrip_with_drain_quorum_queue orelse
T =:= drain_many_quorum_queue orelse
T =:= timed_get_quorum_queue orelse
T =:= available_messages_quorum_queue ->
case rpc(Config, rabbit_feature_flags, is_enabled, [credit_api_v2]) of
Expand Down Expand Up @@ -646,6 +652,69 @@ roundtrip_with_drain(Config, QueueType, QName)
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = amqp10_client:close_connection(Connection).

drain_many_classic_queue(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
drain_many(Config, <<"classic">>, QName).

drain_many_quorum_queue(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
drain_many(Config, <<"quorum">>, QName).

drain_many_stream(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
drain_many(Config, <<"stream">>, QName).

drain_many(Config, QueueType, QName)
when is_binary(QueueType) ->
Address = <<"/queue/", QName/binary>>,
{Connection, Session, LinkPair} = init(Config),
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QueueType}}},
{ok, #{type := QueueType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"test-sender">>, Address),
wait_for_credit(Sender),

Num = 5000,
ok = send_messages(Sender, Num, false),
ok = wait_for_accepts(Num),

TerminusDurability = none,
Filter = consume_from_first(QueueType),
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"test-receiver">>, Address, settled,
TerminusDurability, Filter),

ok = amqp10_client:flow_link_credit(Receiver, Num - 1, never, true),
?assertEqual(Num - 1, count_received_messages(Receiver)),
flush("drained 1"),

ok = amqp10_client:flow_link_credit(Receiver, Num, never, true),
?assertEqual(1, count_received_messages(Receiver)),
flush("drained 2"),

ok = send_messages(Sender, Num, false),
ok = wait_for_accepts(Num),
receive Unexpected -> ct:fail({unexpected, Unexpected})
after 10 -> ok
end,
%% Let's send 2 FLOW frames in sequence.
ok = amqp10_client:flow_link_credit(Receiver, Num, never, true),
ok = amqp10_client:flow_link_credit(Receiver, Num, never, true),
?assertEqual(Num, count_received_messages(Receiver)),
flush("drained 3"),

ok = send_messages(Sender, 1, false),
ok = wait_for_accepts(1),
%% Our receiver shouldn't have any credit left to consume the last message.
receive {amqp10_msg, _, _} -> ct:fail(unexpected_delivery)
after 30 -> ok
end,

ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:detach_link(Receiver),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = amqp10_client:close_connection(Connection).

amqp_stream_amqpl(Config) ->
amqp_amqpl(<<"stream">>, Config).

Expand Down Expand Up @@ -3856,6 +3925,85 @@ footer_checksum(FooterOpt, Config) ->
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

%% This test grants many credits to the queue once while
%% messages are being made available at the source over time.
receive_many_made_available_over_time(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
QType = <<"quorum">>,
Address = <<"/queue/", QName/binary>>,
{Connection, Session, LinkPair} = init(Config),
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}},
{ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"test-sender">>, Address),
wait_for_credit(Sender),

%% Send first batch of messages.
ok = send_messages(Sender, 10, false),
ok = wait_for_accepts(10),
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"receiver">>, Address, settled),
flush(attached),
%% Grant many credits to the queue once.
ok = amqp10_client:flow_link_credit(Receiver, 5000, never),
%% We expect to receive the first batch of messages.
?assertEqual(10, count_received_messages(Receiver)),

%% Make next batch of messages available.
ok = send_messages(Sender, 2990, false),
ok = wait_for_accepts(2990),
%% We expect to receive this batch of messages.
?assertEqual(2990, count_received_messages(Receiver)),

%% Make next batch of messages available.
ok = send_messages(Sender, 1999, false),
ok = wait_for_accepts(1999),
%% We expect to receive this batch of messages.
?assertEqual(1999, count_received_messages(Receiver)),

%% Make next batch of messages available.
ok = send_messages(Sender, 2, false),
ok = wait_for_accepts(2),
%% At this point, we only have 2 messages in the queue, but only 1 credit left.
?assertEqual(1, count_received_messages(Receiver)),

ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:detach_link(Receiver),
{ok, #{message_count := 1}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = amqp10_client:close_connection(Connection).

receive_many_auto_flow(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
QType = <<"quorum">>,
Address = <<"/queue/", QName/binary>>,
{Connection, Session, LinkPair} = init(Config),
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}},
{ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"test-sender">>, Address),
wait_for_credit(Sender),

%% Send many messages.
Num = 10_000,
ok = send_messages(Sender, Num, false),
ok = wait_for_accepts(Num),

{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"receiver">>, Address, settled),
receive {amqp10_event, {link, Receiver, attached}} -> ok
after 5000 -> ct:fail(missing_attached)
end,
flush(receiver_attached),

%% Let's auto top up relatively often, but in large batches.
ok = amqp10_client:flow_link_credit(Receiver, 1300, 1200),
?assertEqual(Num, count_received_messages(Receiver)),

ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:detach_link(Receiver),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = amqp10_client:close_connection(Connection).

%% This test ensures that the server sends us TRANSFER and FLOW frames in the correct order
%% even if the server is temporarily not allowed to send us any TRANSFERs due to our session
%% incoming-window being closed.
Expand Down Expand Up @@ -4126,7 +4274,7 @@ count_received_messages0(Receiver, Count) ->
receive
{amqp10_msg, Receiver, _Msg} ->
count_received_messages0(Receiver, Count + 1)
after 200 ->
after 1000 ->
Count
end.

Expand Down

0 comments on commit 83f8318

Please sign in to comment.