diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 43480ec9a36..79eebd9c8b5 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -153,6 +153,10 @@ all_tests() -> dead_letter_to_quorum_queue, dead_letter_from_classic_to_quorum_queue, dead_letter_policy, + at_most_once_dead_letter_order_maxlen, + at_most_once_dead_letter_order_rejected, + at_most_once_dead_letter_order_delivery_limit, + at_most_once_dead_letter_order_expired, cleanup_queue_state_on_channel_after_publish, cleanup_queue_state_on_channel_after_subscribe, sync_queue, @@ -167,7 +171,6 @@ all_tests() -> subscribe_redelivery_count, message_bytes_metrics, queue_length_limit_drop_head, - queue_length_bytes_limit_drop_head, queue_length_limit_reject_publish, queue_length_limit_policy_cleared, subscribe_redelivery_limit, @@ -2092,6 +2095,196 @@ dead_letter_policy(Config) -> ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"dlx">>), test_dead_lettering(false, Config, Ch, Servers, RaName, QQ, CQ). +%% Test that messages are at most once dead letter in the correct order +%% for reason 'maxlen'. +at_most_once_dead_letter_order_maxlen(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + DLQ = <<"dead letter queue">>, + + ?assertEqual({'queue.declare_ok', DLQ, 0, 0}, + declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-overflow">>, longstr, <<"drop-head">>}, + {<<"x-max-length-bytes">>, long, 1000}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, DLQ}])), + + LargePayload = binary:copy(<<"x">>, 1500), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{payload = <<"m1">>}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{payload = <<"m2">>}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{payload = LargePayload}), + wait_for_consensus(QQ, Config), + wait_for_consensus(DLQ, Config), + RaName = ra_name(DLQ), + wait_for_messages_ready(Servers, RaName, 3), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = DLQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = DLQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = LargePayload}}, + amqp_channel:call(Ch, #'basic.get'{queue = DLQ, + no_ack = true})), + + [?assertEqual(#'queue.delete_ok'{message_count = 0}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})) + || Q <- [QQ, DLQ]]. + +%% Test that messages are at most once dead letter in the correct order +%% for reason 'rejected'. +at_most_once_dead_letter_order_rejected(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + DLQ = <<"dead letter queue">>, + + ?assertEqual({'queue.declare_ok', DLQ, 0, 0}, + declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, DLQ}])), + + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{payload = <<"m1">>}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{payload = <<"m2">>}), + + ok = subscribe(Ch, QQ, false), + receive {_, #amqp_msg{payload = P1}} -> + ?assertEqual(<<"m1">>, P1) + end, + receive {_, #amqp_msg{payload = P2}} -> + ?assertEqual(<<"m2">>, P2) + end, + ok = amqp_channel:call(Ch, #'basic.nack'{delivery_tag = 0, + multiple = true, + requeue = false}), + + wait_for_consensus(DLQ, Config), + wait_for_messages_ready(Servers, ra_name(DLQ), 2), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = DLQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = DLQ, + no_ack = true})), + + [?assertEqual(#'queue.delete_ok'{message_count = 0}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})) + || Q <- [QQ, DLQ]]. + +%% Test that messages are at most once dead letter in the correct order +%% for reason 'delivery_limit'. +at_most_once_dead_letter_order_delivery_limit(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + DLQ = <<"dead letter queue">>, + + ?assertEqual({'queue.declare_ok', DLQ, 0, 0}, + declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-delivery-limit">>, long, 0}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, DLQ}])), + + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{payload = <<"m1">>}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{payload = <<"m2">>}), + + ok = subscribe(Ch, QQ, false), + receive {_, #amqp_msg{payload = P1}} -> + ?assertEqual(<<"m1">>, P1) + end, + receive {_, #amqp_msg{payload = P2}} -> + ?assertEqual(<<"m2">>, P2) + end, + ok = amqp_channel:call(Ch, #'basic.nack'{delivery_tag = 0, + multiple = true, + requeue = true}), + + wait_for_consensus(DLQ, Config), + wait_for_messages_ready(Servers, ra_name(DLQ), 2), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = DLQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = DLQ, + no_ack = true})), + + [?assertEqual(#'queue.delete_ok'{message_count = 0}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})) + || Q <- [QQ, DLQ]]. + +%% Test that messages are at most once dead letter in the correct order +%% for reason 'expired'. +at_most_once_dead_letter_order_expired(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + DLQ = <<"dead letter queue">>, + + ?assertEqual({'queue.declare_ok', DLQ, 0, 0}, + declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, DLQ}])), + + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{payload = <<"m1">>}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{expiration = <<"1">>}, + payload = <<"m2">>}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{expiration = <<"1">>}, + payload = <<"m3">>}), + wait_for_consensus(QQ, Config), + wait_for_messages_ready(Servers, ra_name(QQ), 3), + + %% Let m2 and m3 expire before consuming m1. + timer:sleep(10), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + + wait_for_consensus(DLQ, Config), + wait_for_messages_ready(Servers, ra_name(DLQ), 2), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = DLQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m3">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = DLQ, + no_ack = true})), + + [?assertEqual(#'queue.delete_ok'{message_count = 0}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})) + || Q <- [QQ, DLQ]]. + invalid_policy(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -3670,50 +3863,6 @@ queue_length_limit_drop_head(Config) -> amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = true})). -queue_length_bytes_limit_drop_head(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - DLQ = <<"dead letter queue">>, - - ?assertEqual({'queue.declare_ok', DLQ, 0, 0}, - declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-overflow">>, longstr, <<"drop-head">>}, - {<<"x-max-length-bytes">>, long, 1000}, - {<<"x-dead-letter-exchange">>, longstr, <<>>}, - {<<"x-dead-letter-routing-key">>, longstr, DLQ}])), - - LargePayload = binary:copy(<<"x">>, 1500), - ok = amqp_channel:cast(Ch, - #'basic.publish'{routing_key = QQ}, - #amqp_msg{payload = <<"m1">>}), - ok = amqp_channel:cast(Ch, - #'basic.publish'{routing_key = QQ}, - #amqp_msg{payload = <<"m2">>}), - ok = amqp_channel:cast(Ch, - #'basic.publish'{routing_key = QQ}, - #amqp_msg{payload = LargePayload}), - wait_for_consensus(QQ, Config), - wait_for_consensus(DLQ, Config), - RaName = ra_name(DLQ), - wait_for_messages_ready(Servers, RaName, 3), - ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}}, - amqp_channel:call(Ch, #'basic.get'{queue = DLQ, - no_ack = true})), - ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, - amqp_channel:call(Ch, #'basic.get'{queue = DLQ, - no_ack = true})), - ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = LargePayload}}, - amqp_channel:call(Ch, #'basic.get'{queue = DLQ, - no_ack = true})), - - [?assertEqual(#'queue.delete_ok'{message_count = 0}, - amqp_channel:call(Ch, #'queue.delete'{queue = Q})) - || Q <- [QQ, DLQ]]. - queue_length_limit_reject_publish(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),