Skip to content

Commit

Permalink
Merge pull request #4787 from rabbitmq/extra-bcc
Browse files Browse the repository at this point in the history
Avoid ETS lookup if no extra_bcc queue set
  • Loading branch information
michaelklishin committed May 11, 2022
2 parents 2640b5c + 70a639c commit 5c6ecdc
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 61 deletions.
23 changes: 23 additions & 0 deletions deps/rabbit/src/rabbit_amqqueue.erl
Expand Up @@ -67,6 +67,8 @@

-export([deactivate_limit_all/2]).

-export([prepend_extra_bcc/1]).

%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2,
Expand Down Expand Up @@ -2087,3 +2089,24 @@ get_quorum_nodes(Q) ->
_ ->
[]
end.

-spec prepend_extra_bcc([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()].
prepend_extra_bcc([]) ->
[];
prepend_extra_bcc([Q] = Qs) ->
prepend_extra_bcc(Q, Qs);
prepend_extra_bcc(Qs) ->
lists:foldl(fun(Q, Acc) ->
prepend_extra_bcc(Q, Acc)
end, Qs, Qs).

prepend_extra_bcc(Q, Qs) ->
case amqqueue:get_options(Q) of
#{extra_bcc := BCC} ->
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
BCCQueueName = rabbit_misc:r(VHost, queue, BCC),
BCCQueue = rabbit_amqqueue:lookup([BCCQueueName]),
BCCQueue ++ Qs;
_ ->
Qs
end.
13 changes: 6 additions & 7 deletions deps/rabbit/src/rabbit_channel.erl
Expand Up @@ -2173,12 +2173,10 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
mandatory = Mandatory,
confirm = Confirm,
msg_seq_no = MsgSeqNo},
_RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue
{QueueNames, Qs} = case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
{[QName], [Q]};
_ -> {[], []}
end,
RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue
Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
{ok, QueueStates, Actions} ->
rabbit_global_counters:messages_routed(amqp091, erlang:min(1, length(Qs))),
Expand Down Expand Up @@ -2213,7 +2211,8 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
confirm = Confirm,
msg_seq_no = MsgSeqNo},
RoutedToQueueNames}, State0 = #ch{queue_states = QueueStates0}) ->
Qs = rabbit_amqqueue:lookup(RoutedToQueueNames),
Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
{ok, QueueStates, Actions} ->
Expand Down
11 changes: 6 additions & 5 deletions deps/rabbit/src/rabbit_dead_letter.erl
Expand Up @@ -23,14 +23,15 @@

-spec publish(rabbit_types:message(), reason(), rabbit_types:exchange(),
'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'.
publish(Msg, Reason, X, RK, QName) ->
DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName),
publish(Msg, Reason, X, RK, SourceQName) ->
DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, SourceQName),
Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined),
{Queues, Cycles} = detect_cycles(Reason, DLMsg,
{QNames, Cycles} = detect_cycles(Reason, DLMsg,
rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
_ = rabbit_queue_type:deliver(rabbit_amqqueue:lookup(Queues),
Delivery, stateless),
Qs0 = rabbit_amqqueue:lookup(QNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
_ = rabbit_queue_type:deliver(Qs, Delivery, stateless),
ok.

make_msg(Msg = #basic_message{content = Content,
Expand Down
47 changes: 11 additions & 36 deletions deps/rabbit/src/rabbit_exchange.erl
Expand Up @@ -412,46 +412,21 @@ info_all(VHostPath, Items, Ref, AggregatorPid) ->
route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName,
decorators = Decorators} = X,
#delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
QNames = case RName of
<<>> ->
RKsSorted = lists:usort(RKs),
[rabbit_channel:deliver_reply(RK, Delivery) ||
RK <- RKsSorted, virtual_reply_queue(RK)],
[rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted,
not virtual_reply_queue(RK)];
_ ->
Decs = rabbit_exchange_decorator:select(route, Decorators),
lists:usort(route1(Delivery, Decs, {[X], XName, []}))
end,
Qs = rabbit_amqqueue:lookup(QNames),
ExtraBccQNames = infer_extra_bcc(Qs),
ExtraBccQNames ++ QNames.
case RName of
<<>> ->
RKsSorted = lists:usort(RKs),
[rabbit_channel:deliver_reply(RK, Delivery) ||
RK <- RKsSorted, virtual_reply_queue(RK)],
[rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted,
not virtual_reply_queue(RK)];
_ ->
Decs = rabbit_exchange_decorator:select(route, Decorators),
lists:usort(route1(Delivery, Decs, {[X], XName, []}))
end.

virtual_reply_queue(<<"amq.rabbitmq.reply-to.", _/binary>>) -> true;
virtual_reply_queue(_) -> false.

-spec infer_extra_bcc([amqqueue:amqqueue()]) -> [rabbit_amqqueue:name()].
infer_extra_bcc([]) ->
[];
infer_extra_bcc([Q]) ->
case amqqueue:get_options(Q) of
#{extra_bcc := BCC} ->
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
[rabbit_misc:r(VHost, queue, BCC)];
_ ->
[]
end;
infer_extra_bcc(Qs) ->
lists:foldl(fun(Q, Acc) ->
case amqqueue:get_options(Q) of
#{extra_bcc := BCC} ->
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
[rabbit_misc:r(VHost, queue, BCC) | Acc];
_ ->
Acc
end
end, [], Qs).

route1(_, _, {[], _, QNames}) ->
QNames;
route1(Delivery, Decorators,
Expand Down
24 changes: 14 additions & 10 deletions deps/rabbit/src/rabbit_fifo_dlx_worker.erl
Expand Up @@ -312,7 +312,8 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
RouteToQs0 = rabbit_exchange:route(DLX, Delivery),
{RouteToQs1, Cycles} = rabbit_dead_letter:detect_cycles(Reason, Msg, RouteToQs0),
State1 = log_cycles(Cycles, RKeys, State0),
RouteToQs = rabbit_amqqueue:lookup(RouteToQs1),
RouteToQs2 = rabbit_amqqueue:lookup(RouteToQs1),
RouteToQs = rabbit_amqqueue:prepend_extra_bcc(RouteToQs2),
State2 = case RouteToQs of
[] ->
log_no_route_once(State1);
Expand Down Expand Up @@ -459,26 +460,29 @@ redeliver0(#pending{delivery = #delivery{message = BasicMsg} = Delivery0,
when is_list(DLRKeys) ->
Delivery = Delivery0#delivery{message = BasicMsg#basic_message{exchange_name = DLXRef,
routing_keys = DLRKeys}},
RouteToQs0 = rabbit_exchange:route(DLX, Delivery),
%% rabbit_exchange:route/2 can route to target queues that do not exist (e.g. in case of default exchange).
%% rabbit_exchange:route/2 can route to target queues that do not exist
%% (feature flag implicit_default_bindings).
%% Therefore, filter out non-existent target queues.
RouteToQs1 = queue_names(rabbit_amqqueue:lookup(RouteToQs0)),
case {RouteToQs1, Settled} of
RouteToQs0 = queue_names(
rabbit_amqqueue:prepend_extra_bcc(
rabbit_amqqueue:lookup(
rabbit_exchange:route(DLX, Delivery)))),
case {RouteToQs0, Settled} of
{[], [_|_]} ->
%% Routes changed dynamically so that we don't await any publisher confirms anymore.
%% Since we also received at least once publisher confirm (mandatory flag semantics),
%% we can ack the messasge to the source quorum queue.
%% Since we also received at least one publisher confirm (mandatory flag semantics),
%% we can ack the message to the source quorum queue.
State0#state{pendings = maps:remove(OutSeq, Pendings),
settled_ids = [ConsumedId | SettledIds]};
_ ->
%% Do not redeliver message to a target queue
%% 1. for which we already received a publisher confirm, or
Unsettled = RouteToQs1 -- Settled,
Unsettled = RouteToQs0 -- Settled,
%% 2. whose queue client redelivers on our behalf.
%% Note that a quorum queue client does not redeliver on our behalf if it previously
%% rejected the message. This is why we always redeliver rejected messages here.
RouteToQs2 = Unsettled -- clients_redeliver(Unsettled0, QTypeState),
{RouteToQs, Cycles} = rabbit_dead_letter:detect_cycles(Reason, BasicMsg, RouteToQs2),
RouteToQs1 = Unsettled -- clients_redeliver(Unsettled0, QTypeState),
{RouteToQs, Cycles} = rabbit_dead_letter:detect_cycles(Reason, BasicMsg, RouteToQs1),
State1 = log_cycles(Cycles, DLRKeys, State0),
case RouteToQs of
[] ->
Expand Down
43 changes: 40 additions & 3 deletions deps/rabbit/test/queue_parallel_SUITE.erl
Expand Up @@ -58,13 +58,15 @@ groups() ->
trigger_message_store_compaction]},
{quorum_queue, [parallel], AllTests ++ [
delete_immediately_by_pid_fails,
extra_bcc_option
extra_bcc_option,
extra_bcc_option_multiple
]},
{quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
{quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
{stream_queue, [parallel], [publish,
subscribe,
extra_bcc_option]}
extra_bcc_option,
extra_bcc_option_multiple]}

]}
].
Expand Down Expand Up @@ -704,6 +706,35 @@ extra_bcc_option(Config) ->
delete_queue(Ch, QName),
delete_queue(Ch, ExtraBCC).

%% Test single message being routed to 2 target queues where 1 target queue
%% has an extra BCC.
extra_bcc_option_multiple(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Exchange = <<"amq.fanout">>,

QName1 = <<"queue_with_extra_bcc">>,
declare_queue(Ch, Config, QName1),
#'queue.bind_ok'{} = amqp_channel:call(
Ch, #'queue.bind'{queue = QName1,
exchange = Exchange}),
ExtraBCC = <<"my_extra_bcc">>,
declare_bcc_queue(Ch, ExtraBCC),
set_queue_options(Config, QName1, #{extra_bcc => ExtraBCC}),
QName2 = <<"queue_without_extra_bcc">>,
declare_queue(Ch, Config, QName2),
#'queue.bind_ok'{} = amqp_channel:call(
Ch, #'queue.bind'{queue = QName2,
exchange = Exchange}),

publish(Ch, <<"ignore">>, [<<"msg">>], Exchange),
wait_for_messages(Config, [[QName1, <<"1">>, <<"1">>, <<"0">>]]),
wait_for_messages(Config, [[QName2, <<"1">>, <<"1">>, <<"0">>]]),
wait_for_messages(Config, [[ExtraBCC, <<"1">>, <<"1">>, <<"0">>]]),

delete_queue(Ch, QName1),
delete_queue(Ch, QName2),
delete_queue(Ch, ExtraBCC).

%%%%%%%%%%%%%%%%%%%%%%%%
%% Test helpers
%%%%%%%%%%%%%%%%%%%%%%%%
Expand All @@ -723,7 +754,13 @@ delete_queue(Ch, QName) ->
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}).

publish(Ch, QName, Payloads) ->
[amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload})
publish(Ch, QName, Payloads, <<"">>).

publish(Ch, QName, Payloads, Exchange) ->
[amqp_channel:call(Ch,
#'basic.publish'{exchange = Exchange,
routing_key = QName},
#amqp_msg{payload = Payload})
|| Payload <- Payloads].

consume(Ch, QName, Payloads) ->
Expand Down

0 comments on commit 5c6ecdc

Please sign in to comment.