Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 194 additions & 45 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ all_tests() ->
dead_letter_to_quorum_queue,
dead_letter_from_classic_to_quorum_queue,
dead_letter_policy,
at_most_once_dead_letter_order_maxlen,
at_most_once_dead_letter_order_rejected,
at_most_once_dead_letter_order_delivery_limit,
at_most_once_dead_letter_order_expired,
cleanup_queue_state_on_channel_after_publish,
cleanup_queue_state_on_channel_after_subscribe,
sync_queue,
Expand All @@ -167,7 +171,6 @@ all_tests() ->
subscribe_redelivery_count,
message_bytes_metrics,
queue_length_limit_drop_head,
queue_length_bytes_limit_drop_head,
queue_length_limit_reject_publish,
queue_length_limit_policy_cleared,
subscribe_redelivery_limit,
Expand Down Expand Up @@ -2092,6 +2095,196 @@ dead_letter_policy(Config) ->
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"dlx">>),
test_dead_lettering(false, Config, Ch, Servers, RaName, QQ, CQ).

%% Test that messages are at most once dead letter in the correct order
%% for reason 'maxlen'.
at_most_once_dead_letter_order_maxlen(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
DLQ = <<"dead letter queue">>,

?assertEqual({'queue.declare_ok', DLQ, 0, 0},
declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-overflow">>, longstr, <<"drop-head">>},
{<<"x-max-length-bytes">>, long, 1000},
{<<"x-dead-letter-exchange">>, longstr, <<>>},
{<<"x-dead-letter-routing-key">>, longstr, DLQ}])),

LargePayload = binary:copy(<<"x">>, 1500),
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{payload = <<"m1">>}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{payload = <<"m2">>}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{payload = LargePayload}),
wait_for_consensus(QQ, Config),
wait_for_consensus(DLQ, Config),
RaName = ra_name(DLQ),
wait_for_messages_ready(Servers, RaName, 3),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = LargePayload}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),

[?assertEqual(#'queue.delete_ok'{message_count = 0},
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|| Q <- [QQ, DLQ]].

%% Test that messages are at most once dead letter in the correct order
%% for reason 'rejected'.
at_most_once_dead_letter_order_rejected(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
DLQ = <<"dead letter queue">>,

?assertEqual({'queue.declare_ok', DLQ, 0, 0},
declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-dead-letter-exchange">>, longstr, <<>>},
{<<"x-dead-letter-routing-key">>, longstr, DLQ}])),

ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{payload = <<"m1">>}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{payload = <<"m2">>}),

ok = subscribe(Ch, QQ, false),
receive {_, #amqp_msg{payload = P1}} ->
?assertEqual(<<"m1">>, P1)
end,
receive {_, #amqp_msg{payload = P2}} ->
?assertEqual(<<"m2">>, P2)
end,
ok = amqp_channel:call(Ch, #'basic.nack'{delivery_tag = 0,
multiple = true,
requeue = false}),

wait_for_consensus(DLQ, Config),
wait_for_messages_ready(Servers, ra_name(DLQ), 2),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),

[?assertEqual(#'queue.delete_ok'{message_count = 0},
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|| Q <- [QQ, DLQ]].

%% Test that messages are at most once dead letter in the correct order
%% for reason 'delivery_limit'.
at_most_once_dead_letter_order_delivery_limit(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
DLQ = <<"dead letter queue">>,

?assertEqual({'queue.declare_ok', DLQ, 0, 0},
declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-delivery-limit">>, long, 0},
{<<"x-dead-letter-exchange">>, longstr, <<>>},
{<<"x-dead-letter-routing-key">>, longstr, DLQ}])),

ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{payload = <<"m1">>}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{payload = <<"m2">>}),

ok = subscribe(Ch, QQ, false),
receive {_, #amqp_msg{payload = P1}} ->
?assertEqual(<<"m1">>, P1)
end,
receive {_, #amqp_msg{payload = P2}} ->
?assertEqual(<<"m2">>, P2)
end,
ok = amqp_channel:call(Ch, #'basic.nack'{delivery_tag = 0,
multiple = true,
requeue = true}),

wait_for_consensus(DLQ, Config),
wait_for_messages_ready(Servers, ra_name(DLQ), 2),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),

[?assertEqual(#'queue.delete_ok'{message_count = 0},
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|| Q <- [QQ, DLQ]].

%% Test that messages are at most once dead letter in the correct order
%% for reason 'expired'.
at_most_once_dead_letter_order_expired(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
DLQ = <<"dead letter queue">>,

?assertEqual({'queue.declare_ok', DLQ, 0, 0},
declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-dead-letter-exchange">>, longstr, <<>>},
{<<"x-dead-letter-routing-key">>, longstr, DLQ}])),

ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{payload = <<"m1">>}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{props = #'P_basic'{expiration = <<"1">>},
payload = <<"m2">>}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{props = #'P_basic'{expiration = <<"1">>},
payload = <<"m3">>}),
wait_for_consensus(QQ, Config),
wait_for_messages_ready(Servers, ra_name(QQ), 3),

%% Let m2 and m3 expire before consuming m1.
timer:sleep(10),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = true})),

wait_for_consensus(DLQ, Config),
wait_for_messages_ready(Servers, ra_name(DLQ), 2),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m3">>}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),

[?assertEqual(#'queue.delete_ok'{message_count = 0},
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|| Q <- [QQ, DLQ]].

invalid_policy(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Expand Down Expand Up @@ -3670,50 +3863,6 @@ queue_length_limit_drop_head(Config) ->
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = true})).

queue_length_bytes_limit_drop_head(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
DLQ = <<"dead letter queue">>,

?assertEqual({'queue.declare_ok', DLQ, 0, 0},
declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-overflow">>, longstr, <<"drop-head">>},
{<<"x-max-length-bytes">>, long, 1000},
{<<"x-dead-letter-exchange">>, longstr, <<>>},
{<<"x-dead-letter-routing-key">>, longstr, DLQ}])),

LargePayload = binary:copy(<<"x">>, 1500),
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{payload = <<"m1">>}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{payload = <<"m2">>}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{payload = LargePayload}),
wait_for_consensus(QQ, Config),
wait_for_consensus(DLQ, Config),
RaName = ra_name(DLQ),
wait_for_messages_ready(Servers, RaName, 3),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = LargePayload}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),

[?assertEqual(#'queue.delete_ok'{message_count = 0},
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|| Q <- [QQ, DLQ]].

queue_length_limit_reject_publish(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Expand Down
Loading