Skip to content

Commit

Permalink
Add stream tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Jun 4, 2024
1 parent 798bbfb commit e10f6f2
Showing 1 changed file with 33 additions and 9 deletions.
42 changes: 33 additions & 9 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,12 @@ groups() ->
dead_letter_headers_exchange,
dead_letter_reject,
immutable_bare_message,
receive_many_made_available_over_time,
receive_many_auto_flow,
receive_many_made_available_over_time_classic_queue,
receive_many_made_available_over_time_quorum_queue,
receive_many_made_available_over_time_stream,
receive_many_auto_flow_classic_queue,
receive_many_auto_flow_quorum_queue,
receive_many_auto_flow_stream,
incoming_window_closed_transfer_flow_order,
incoming_window_closed_stop_link,
incoming_window_closed_close_link,
Expand Down Expand Up @@ -3979,11 +3983,19 @@ footer_checksum(FooterOpt, Config) ->
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

receive_many_made_available_over_time_classic_queue(Config) ->
receive_many_made_available_over_time(<<"classic">>, Config).

receive_many_made_available_over_time_quorum_queue(Config) ->
receive_many_made_available_over_time(<<"quorum">>, Config).

receive_many_made_available_over_time_stream(Config) ->
receive_many_made_available_over_time(<<"stream">>, Config).

%% This test grants many credits to the queue once while
%% messages are being made available at the source over time.
receive_many_made_available_over_time(Config) ->
receive_many_made_available_over_time(QType, Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
QType = <<"quorum">>,
Address = <<"/queue/", QName/binary>>,
{Connection, Session, LinkPair} = init(Config),
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}},
Expand All @@ -3994,8 +4006,10 @@ receive_many_made_available_over_time(Config) ->
%% Send first batch of messages.
ok = send_messages(Sender, 10, false),
ok = wait_for_accepts(10),
Filter = consume_from_first(QType),
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"receiver">>, Address, settled),
Session, <<"receiver">>, Address,
settled, configuration, Filter),
flush(attached),
%% Grant many credits to the queue once.
ok = amqp10_client:flow_link_credit(Receiver, 5000, never),
Expand All @@ -4022,13 +4036,21 @@ receive_many_made_available_over_time(Config) ->

ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:detach_link(Receiver),
{ok, #{message_count := 1}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = amqp10_client:close_connection(Connection).

receive_many_auto_flow(Config) ->
receive_many_auto_flow_classic_queue(Config) ->
receive_many_auto_flow(<<"classic">>, Config).

receive_many_auto_flow_quorum_queue(Config) ->
receive_many_auto_flow(<<"quorum">>, Config).

receive_many_auto_flow_stream(Config) ->
receive_many_auto_flow(<<"stream">>, Config).

receive_many_auto_flow(QType, Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
QType = <<"quorum">>,
Address = <<"/queue/", QName/binary>>,
{Connection, Session, LinkPair} = init(Config),
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}},
Expand All @@ -4041,8 +4063,10 @@ receive_many_auto_flow(Config) ->
ok = send_messages(Sender, Num, false),
ok = wait_for_accepts(Num),

Filter = consume_from_first(QType),
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"receiver">>, Address, settled),
Session, <<"receiver">>, Address,
settled, configuration, Filter),
receive {amqp10_event, {link, Receiver, attached}} -> ok
after 5000 -> ct:fail(missing_attached)
end,
Expand Down

0 comments on commit e10f6f2

Please sign in to comment.