diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index b0f0a43967fb..997a2bb26bc2 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -853,6 +853,8 @@ overview(#?STATE{consumers = Cons, Conf = #{name => Cfg#cfg.name, resource => Cfg#cfg.resource, dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler, + dead_letter_handler => Cfg#cfg.dead_letter_handler, + overflow_strategy => Cfg#cfg.overflow_strategy, max_length => Cfg#cfg.max_length, max_bytes => Cfg#cfg.max_bytes, consumer_strategy => Cfg#cfg.consumer_strategy, diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 6c967b396d7a..f63edc9a2449 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -316,9 +316,8 @@ declare_queue_error(Error, Queue, Leader, ActingUser) -> ra_machine(Q) -> {module, rabbit_fifo, ra_machine_config(Q)}. -ra_machine_config(Q) when ?is_amqqueue(Q) -> +gather_policy_config(Q, IsQueueDeclaration) -> QName = amqqueue:get_name(Q), - {Name, _} = amqqueue:get_pid(Q), %% take the minimum value of the policy and the queue arg if present MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q), OverflowBin = args_policy_lookup(<<"overflow">>, fun policy_has_precedence/2, Q), @@ -327,18 +326,22 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> DeliveryLimit = case args_policy_lookup(<<"delivery-limit">>, fun resolve_delivery_limit/2, Q) of undefined -> - rabbit_log:info("~ts: delivery_limit not set, defaulting to ~b", - [rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]), + case IsQueueDeclaration of + true -> + rabbit_log:info( + "~ts: delivery_limit not set, defaulting to ~b", + [rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]); + false -> + ok + end, ?DEFAULT_DELIVERY_LIMIT; DL -> DL end, Expires = args_policy_lookup(<<"expires">>, fun min/2, Q), MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q), - #{name => Name, - queue_resource => QName, - dead_letter_handler => dead_letter_handler(Q, Overflow), - become_leader_handler => {?MODULE, become_leader, [QName]}, + DeadLetterHandler = dead_letter_handler(Q, Overflow), + #{dead_letter_handler => DeadLetterHandler, max_length => MaxLength, max_bytes => MaxBytes, single_active_consumer_on => single_active_consumer_on(Q), @@ -349,6 +352,18 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> msg_ttl => MsgTTL }. +ra_machine_config(Q) when ?is_amqqueue(Q) -> + PolicyConfig = gather_policy_config(Q, true), + QName = amqqueue:get_name(Q), + {Name, _} = amqqueue:get_pid(Q), + PolicyConfig#{ + name => Name, + queue_resource => QName, + become_leader_handler => {?MODULE, become_leader, [QName]}, + single_active_consumer_on => single_active_consumer_on(Q), + created => erlang:system_time(millisecond) + }. + resolve_delivery_limit(PolVal, ArgVal) when PolVal < 0 orelse ArgVal < 0 -> max(PolVal, ArgVal); @@ -624,7 +639,9 @@ handle_tick(QName, ok; _ -> ok - end + end, + maybe_apply_policies(Q, Overview), + ok catch _:Err -> rabbit_log:debug("~ts: handle tick failed with ~p", @@ -708,6 +725,21 @@ system_recover(quorum_queues) -> ok end. +maybe_apply_policies(Q, #{config := CurrentConfig}) -> + NewPolicyConfig = gather_policy_config(Q, false), + + RelevantKeys = maps:keys(NewPolicyConfig), + CurrentPolicyConfig = maps:with(RelevantKeys, CurrentConfig), + + ShouldUpdate = NewPolicyConfig =/= CurrentPolicyConfig, + case ShouldUpdate of + true -> + rabbit_log:debug("Re-applying policies to ~ts", [rabbit_misc:rs(amqqueue:get_name(Q))]), + policy_changed(Q), + ok; + false -> ok + end. + -spec recover(binary(), [amqqueue:amqqueue()]) -> {[amqqueue:amqqueue()], [amqqueue:amqqueue()]}. recover(_Vhost, Queues) -> @@ -2064,3 +2096,4 @@ file_handle_other_reservation() -> file_handle_release_reservation() -> ok. + diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index deaf095409d9..718754cd4eb8 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -95,7 +95,8 @@ groups() -> single_active_consumer_priority, force_shrink_member_to_current_member, force_all_queues_shrink_member_to_current_member, - force_vhost_queues_shrink_member_to_current_member + force_vhost_queues_shrink_member_to_current_member, + policy_repair ] ++ all_tests()}, {cluster_size_5, [], [start_queue, @@ -1300,6 +1301,159 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> ?assertEqual(3, length(Nodes0)) end || Q <- QQs, VHost <- VHosts]. +% Tests that, if the process of a QQ is dead in the moment of declaring a policy +% that affects such queue, when the process is made available again, the policy +% will eventually get applied. (https://github.com/rabbitmq/rabbitmq-server/issues/7863) +policy_repair(Config) -> + [Server0, _Server1, _Server2] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + ExpectedMaxLength1 = 10, + Priority1 = 1, + ok = rabbit_ct_broker_helpers:rpc( + Config, + 0, + rabbit_policy, + set, + [ + <<"/">>, + <>, + QQ, + [{<<"max-length">>, ExpectedMaxLength1}, {<<"overflow">>, <<"reject-publish">>}], + Priority1, + <<"quorum_queues">>, + <<"acting-user">> + ]), + + % Wait for the policy to apply + QueryFun = fun rabbit_fifo:overview/1, + ?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength1}}}, _}, + rpc:call(Server0, ra, local_query, [RaName, QueryFun]), + ?DEFAULT_AWAIT), + + % Check the policy has been applied + % Insert MaxLength1 + some messages but after consuming all messages only + % MaxLength1 are retrieved. + % Checking twice to ensure consistency + publish_confirm_many(Ch, QQ, ExpectedMaxLength1 + 1), + % +1 because QQs let one pass + wait_for_messages_ready(Servers, RaName, ExpectedMaxLength1 + 1), + fail = publish_confirm(Ch, QQ), + fail = publish_confirm(Ch, QQ), + consume_all(Ch, QQ), + + % Set higher priority policy, allowing more messages + ExpectedMaxLength2 = 20, + Priority2 = 2, + ok = rabbit_ct_broker_helpers:rpc( + Config, + 0, + rabbit_policy, + set, + [ + <<"/">>, + <>, + QQ, + [{<<"max-length">>, ExpectedMaxLength2}, {<<"overflow">>, <<"reject-publish">>}], + Priority2, + <<"quorum_queues">>, + <<"acting-user">> + ]), + + % Wait for the policy to apply + ?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength2}}}, _}, + rpc:call(Server0, ra, local_query, [RaName, QueryFun]), + ?DEFAULT_AWAIT), + + % Check the policy has been applied + % Insert MaxLength2 + some messages but after consuming all messages only + % MaxLength2 are retrieved. + % Checking twice to ensure consistency. + % + 1 because QQs let one pass + publish_confirm_many(Ch, QQ, ExpectedMaxLength2 + 1), + wait_for_messages_ready(Servers, RaName, ExpectedMaxLength2 + 1), + fail = publish_confirm(Ch, QQ), + fail = publish_confirm(Ch, QQ), + consume_all(Ch, QQ), + + % Ensure the queue process is unavailable + lists:foreach(fun(Srv) -> ensure_qq_proc_dead(Config, Srv, RaName) end, Servers), + + % Add policy with higher priority, allowing even more messages. + ExpectedMaxLength3 = 30, + Priority3 = 3, + ok = rabbit_ct_broker_helpers:rpc( + Config, + 0, + rabbit_policy, + set, + [ + <<"/">>, + <>, + QQ, + [{<<"max-length">>, ExpectedMaxLength3}, {<<"overflow">>, <<"reject-publish">>}], + Priority3, + <<"quorum_queues">>, + <<"acting-user">> + ]), + + % Restart the queue process. + {ok, Queue} = + rabbit_ct_broker_helpers:rpc( + Config, + 0, + rabbit_amqqueue, + lookup, + [{resource, <<"/">>, queue, QQ}]), + lists:foreach( + fun(Srv) -> + rabbit_ct_broker_helpers:rpc( + Config, + Srv, + rabbit_quorum_queue, + recover, + [foo, [Queue]] + ) + end, + Servers), + + % Wait for the queue to be available again. + lists:foreach(fun(Srv) -> + rabbit_ct_helpers:await_condition( + fun () -> + is_pid( + rabbit_ct_broker_helpers:rpc( + Config, + Srv, + erlang, + whereis, + [RaName])) + end) + end, + Servers), + + % Wait for the policy to apply + ?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength3}}}, _}, + rpc:call(Server0, ra, local_query, [RaName, QueryFun]), + ?DEFAULT_AWAIT), + + % Check the policy has been applied + % Insert MaxLength3 + some messages but after consuming all messages only + % MaxLength3 are retrieved. + % Checking twice to ensure consistency. + % + 1 because QQs let one pass + publish_confirm_many(Ch, QQ, ExpectedMaxLength3 + 1), + wait_for_messages_ready(Servers, RaName, ExpectedMaxLength3 + 1), + fail = publish_confirm(Ch, QQ), + fail = publish_confirm(Ch, QQ), + consume_all(Ch, QQ). + priority_queue_fifo(Config) -> %% testing: if hi priority messages are published before lo priority %% messages they are always consumed first (fifo) @@ -4333,3 +4487,28 @@ lists_interleave([Item | Items], List) {Left, Right} = lists:split(2, List), Left ++ [Item | lists_interleave(Items, Right)]. +publish_confirm_many(Ch, Queue, Count) -> + lists:foreach(fun(_) -> publish_confirm(Ch, Queue) end, lists:seq(1, Count)). + +consume_all(Ch, QQ) -> + Consume = fun C(Acc) -> + case amqp_channel:call(Ch, #'basic.get'{queue = QQ}) of + {#'basic.get_ok'{}, Msg} -> + C([Msg | Acc]); + _ -> + Acc + end + end, + Consume([]). + +ensure_qq_proc_dead(Config, Server, RaName) -> + case rabbit_ct_broker_helpers:rpc(Config, Server, erlang, whereis, [RaName]) of + undefined -> + ok; + Pid -> + rabbit_ct_broker_helpers:rpc(Config, Server, erlang, exit, [Pid, kill]), + %% Give some time for the supervisor to restart the process + timer:sleep(500), + ensure_qq_proc_dead(Config, Server, RaName) + end. +