diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index f9c13fa02731..0ea28a059201 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -67,6 +67,8 @@ -export([deactivate_limit_all/2]). +-export([prepend_extra_bcc/1]). + %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2, @@ -2087,3 +2089,24 @@ get_quorum_nodes(Q) -> _ -> [] end. + +-spec prepend_extra_bcc([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()]. +prepend_extra_bcc([]) -> + []; +prepend_extra_bcc([Q] = Qs) -> + prepend_extra_bcc(Q, Qs); +prepend_extra_bcc(Qs) -> + lists:foldl(fun(Q, Acc) -> + prepend_extra_bcc(Q, Acc) + end, Qs, Qs). + +prepend_extra_bcc(Q, Qs) -> + case amqqueue:get_options(Q) of + #{extra_bcc := BCC} -> + #resource{virtual_host = VHost} = amqqueue:get_name(Q), + BCCQueueName = rabbit_misc:r(VHost, queue, BCC), + BCCQueue = rabbit_amqqueue:lookup([BCCQueueName]), + BCCQueue ++ Qs; + _ -> + Qs + end. diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 7a0386092b11..e6141bf5ef64 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -2173,12 +2173,10 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex mandatory = Mandatory, confirm = Confirm, msg_seq_no = MsgSeqNo}, - _RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue - {QueueNames, Qs} = case rabbit_amqqueue:lookup(QName) of - {ok, Q} -> - {[QName], [Q]}; - _ -> {[], []} - end, + RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue + Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames), + Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), + QueueNames = lists:map(fun amqqueue:get_name/1, Qs), case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of {ok, QueueStates, Actions} -> rabbit_global_counters:messages_routed(amqp091, erlang:min(1, length(Qs))), @@ -2213,7 +2211,8 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex confirm = Confirm, msg_seq_no = MsgSeqNo}, RoutedToQueueNames}, State0 = #ch{queue_states = QueueStates0}) -> - Qs = rabbit_amqqueue:lookup(RoutedToQueueNames), + Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames), + Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), QueueNames = lists:map(fun amqqueue:get_name/1, Qs), case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of {ok, QueueStates, Actions} -> diff --git a/deps/rabbit/src/rabbit_dead_letter.erl b/deps/rabbit/src/rabbit_dead_letter.erl index 9ceb2bcc5fd4..a81d8e345221 100644 --- a/deps/rabbit/src/rabbit_dead_letter.erl +++ b/deps/rabbit/src/rabbit_dead_letter.erl @@ -23,14 +23,15 @@ -spec publish(rabbit_types:message(), reason(), rabbit_types:exchange(), 'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'. -publish(Msg, Reason, X, RK, QName) -> - DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName), +publish(Msg, Reason, X, RK, SourceQName) -> + DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, SourceQName), Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined), - {Queues, Cycles} = detect_cycles(Reason, DLMsg, + {QNames, Cycles} = detect_cycles(Reason, DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), - _ = rabbit_queue_type:deliver(rabbit_amqqueue:lookup(Queues), - Delivery, stateless), + Qs0 = rabbit_amqqueue:lookup(QNames), + Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), + _ = rabbit_queue_type:deliver(Qs, Delivery, stateless), ok. make_msg(Msg = #basic_message{content = Content, diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl index 9b975e589628..9ec967523051 100644 --- a/deps/rabbit/src/rabbit_exchange.erl +++ b/deps/rabbit/src/rabbit_exchange.erl @@ -412,46 +412,21 @@ info_all(VHostPath, Items, Ref, AggregatorPid) -> route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName, decorators = Decorators} = X, #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) -> - QNames = case RName of - <<>> -> - RKsSorted = lists:usort(RKs), - [rabbit_channel:deliver_reply(RK, Delivery) || - RK <- RKsSorted, virtual_reply_queue(RK)], - [rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted, - not virtual_reply_queue(RK)]; - _ -> - Decs = rabbit_exchange_decorator:select(route, Decorators), - lists:usort(route1(Delivery, Decs, {[X], XName, []})) - end, - Qs = rabbit_amqqueue:lookup(QNames), - ExtraBccQNames = infer_extra_bcc(Qs), - ExtraBccQNames ++ QNames. + case RName of + <<>> -> + RKsSorted = lists:usort(RKs), + [rabbit_channel:deliver_reply(RK, Delivery) || + RK <- RKsSorted, virtual_reply_queue(RK)], + [rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted, + not virtual_reply_queue(RK)]; + _ -> + Decs = rabbit_exchange_decorator:select(route, Decorators), + lists:usort(route1(Delivery, Decs, {[X], XName, []})) + end. virtual_reply_queue(<<"amq.rabbitmq.reply-to.", _/binary>>) -> true; virtual_reply_queue(_) -> false. --spec infer_extra_bcc([amqqueue:amqqueue()]) -> [rabbit_amqqueue:name()]. -infer_extra_bcc([]) -> - []; -infer_extra_bcc([Q]) -> - case amqqueue:get_options(Q) of - #{extra_bcc := BCC} -> - #resource{virtual_host = VHost} = amqqueue:get_name(Q), - [rabbit_misc:r(VHost, queue, BCC)]; - _ -> - [] - end; -infer_extra_bcc(Qs) -> - lists:foldl(fun(Q, Acc) -> - case amqqueue:get_options(Q) of - #{extra_bcc := BCC} -> - #resource{virtual_host = VHost} = amqqueue:get_name(Q), - [rabbit_misc:r(VHost, queue, BCC) | Acc]; - _ -> - Acc - end - end, [], Qs). - route1(_, _, {[], _, QNames}) -> QNames; route1(Delivery, Decorators, diff --git a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl index 22590e7678a2..414a0e628e4f 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl @@ -312,7 +312,8 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason, RouteToQs0 = rabbit_exchange:route(DLX, Delivery), {RouteToQs1, Cycles} = rabbit_dead_letter:detect_cycles(Reason, Msg, RouteToQs0), State1 = log_cycles(Cycles, RKeys, State0), - RouteToQs = rabbit_amqqueue:lookup(RouteToQs1), + RouteToQs2 = rabbit_amqqueue:lookup(RouteToQs1), + RouteToQs = rabbit_amqqueue:prepend_extra_bcc(RouteToQs2), State2 = case RouteToQs of [] -> log_no_route_once(State1); @@ -459,26 +460,29 @@ redeliver0(#pending{delivery = #delivery{message = BasicMsg} = Delivery0, when is_list(DLRKeys) -> Delivery = Delivery0#delivery{message = BasicMsg#basic_message{exchange_name = DLXRef, routing_keys = DLRKeys}}, - RouteToQs0 = rabbit_exchange:route(DLX, Delivery), - %% rabbit_exchange:route/2 can route to target queues that do not exist (e.g. in case of default exchange). + %% rabbit_exchange:route/2 can route to target queues that do not exist + %% (feature flag implicit_default_bindings). %% Therefore, filter out non-existent target queues. - RouteToQs1 = queue_names(rabbit_amqqueue:lookup(RouteToQs0)), - case {RouteToQs1, Settled} of + RouteToQs0 = queue_names( + rabbit_amqqueue:prepend_extra_bcc( + rabbit_amqqueue:lookup( + rabbit_exchange:route(DLX, Delivery)))), + case {RouteToQs0, Settled} of {[], [_|_]} -> %% Routes changed dynamically so that we don't await any publisher confirms anymore. - %% Since we also received at least once publisher confirm (mandatory flag semantics), - %% we can ack the messasge to the source quorum queue. + %% Since we also received at least one publisher confirm (mandatory flag semantics), + %% we can ack the message to the source quorum queue. State0#state{pendings = maps:remove(OutSeq, Pendings), settled_ids = [ConsumedId | SettledIds]}; _ -> %% Do not redeliver message to a target queue %% 1. for which we already received a publisher confirm, or - Unsettled = RouteToQs1 -- Settled, + Unsettled = RouteToQs0 -- Settled, %% 2. whose queue client redelivers on our behalf. %% Note that a quorum queue client does not redeliver on our behalf if it previously %% rejected the message. This is why we always redeliver rejected messages here. - RouteToQs2 = Unsettled -- clients_redeliver(Unsettled0, QTypeState), - {RouteToQs, Cycles} = rabbit_dead_letter:detect_cycles(Reason, BasicMsg, RouteToQs2), + RouteToQs1 = Unsettled -- clients_redeliver(Unsettled0, QTypeState), + {RouteToQs, Cycles} = rabbit_dead_letter:detect_cycles(Reason, BasicMsg, RouteToQs1), State1 = log_cycles(Cycles, DLRKeys, State0), case RouteToQs of [] -> diff --git a/deps/rabbit/test/queue_parallel_SUITE.erl b/deps/rabbit/test/queue_parallel_SUITE.erl index c2c526820a77..69b287bafa7a 100644 --- a/deps/rabbit/test/queue_parallel_SUITE.erl +++ b/deps/rabbit/test/queue_parallel_SUITE.erl @@ -58,13 +58,15 @@ groups() -> trigger_message_store_compaction]}, {quorum_queue, [parallel], AllTests ++ [ delete_immediately_by_pid_fails, - extra_bcc_option + extra_bcc_option, + extra_bcc_option_multiple ]}, {quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}, {quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}, {stream_queue, [parallel], [publish, subscribe, - extra_bcc_option]} + extra_bcc_option, + extra_bcc_option_multiple]} ]} ]. @@ -704,6 +706,35 @@ extra_bcc_option(Config) -> delete_queue(Ch, QName), delete_queue(Ch, ExtraBCC). +%% Test single message being routed to 2 target queues where 1 target queue +%% has an extra BCC. +extra_bcc_option_multiple(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Exchange = <<"amq.fanout">>, + + QName1 = <<"queue_with_extra_bcc">>, + declare_queue(Ch, Config, QName1), + #'queue.bind_ok'{} = amqp_channel:call( + Ch, #'queue.bind'{queue = QName1, + exchange = Exchange}), + ExtraBCC = <<"my_extra_bcc">>, + declare_bcc_queue(Ch, ExtraBCC), + set_queue_options(Config, QName1, #{extra_bcc => ExtraBCC}), + QName2 = <<"queue_without_extra_bcc">>, + declare_queue(Ch, Config, QName2), + #'queue.bind_ok'{} = amqp_channel:call( + Ch, #'queue.bind'{queue = QName2, + exchange = Exchange}), + + publish(Ch, <<"ignore">>, [<<"msg">>], Exchange), + wait_for_messages(Config, [[QName1, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, [[QName2, <<"1">>, <<"1">>, <<"0">>]]), + wait_for_messages(Config, [[ExtraBCC, <<"1">>, <<"1">>, <<"0">>]]), + + delete_queue(Ch, QName1), + delete_queue(Ch, QName2), + delete_queue(Ch, ExtraBCC). + %%%%%%%%%%%%%%%%%%%%%%%% %% Test helpers %%%%%%%%%%%%%%%%%%%%%%%% @@ -723,7 +754,13 @@ delete_queue(Ch, QName) -> #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}). publish(Ch, QName, Payloads) -> - [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload}) + publish(Ch, QName, Payloads, <<"">>). + +publish(Ch, QName, Payloads, Exchange) -> + [amqp_channel:call(Ch, + #'basic.publish'{exchange = Exchange, + routing_key = QName}, + #amqp_msg{payload = Payload}) || Payload <- Payloads]. consume(Ch, QName, Payloads) ->