Merge pull request #3121 from rabbitmq/quorum-queues-v2

QQ: introduce new machine version (2)

kjnilsson committed Mar 8, 2022
2 parents c842a6c + 4a2b00a commit 4b19360
Showing 49 changed files with 8,569 additions and 2,030 deletions.
39 changes: 36 additions & 3 deletions deps/rabbit/BUILD.bazel
@@ -144,7 +144,9 @@ _APP_ENV = """[
%% interval at which connection/channel tracking executes post operations
{tracking_execution_timeout, 15000},
{stream_messages_soft_limit, 256},
{track_auth_attempt_source, false}
{track_auth_attempt_source, false},
{dead_letter_worker_consumer_prefetch, 32},
{dead_letter_worker_publisher_confirm_timeout, 180000}
]
"""

@@ -401,7 +403,7 @@ suites = [
":quorum_queue_utils",
],
flaky = True,
shard_count = 3,
shard_count = 7,
),
rabbitmq_integration_suite(
PACKAGE,
@@ -698,7 +700,7 @@ suites = [
),
rabbitmq_suite(
name = "rabbit_fifo_prop_SUITE",
size = "medium",
size = "large",
additional_beam = [
":test_util",
],
@@ -716,6 +718,37 @@ suites = [
"@proper//:erlang_app",
],
),
rabbitmq_suite(
name = "rabbit_fifo_dlx_SUITE",
size = "small",
additional_hdrs = [
"src/rabbit_fifo.hrl",
"src/rabbit_fifo_dlx.hrl",
],
deps = [
"//deps/rabbit_common:erlang_app",
],
),
rabbitmq_integration_suite(
PACKAGE,
name = "rabbit_fifo_dlx_integration_SUITE",
size = "medium",
additional_beam = [
":test_util",
":quorum_queue_utils",
":quorum_queue_SUITE_beam_files",
],
additional_hdrs = [
"src/rabbit_fifo.hrl",
"src/rabbit_fifo_dlx.hrl",
],
runtime_deps = [
"@ra//:erlang_app",
],
deps = [
"@proper//:erlang_app",
],
),
rabbitmq_suite(
name = "rabbit_fifo_SUITE",
size = "medium",
4 changes: 3 additions & 1 deletion deps/rabbit/Makefile
@@ -122,7 +122,9 @@ define PROJECT_ENV
%% interval at which connection/channel tracking executes post operations
{tracking_execution_timeout, 15000},
{stream_messages_soft_limit, 256},
{track_auth_attempt_source, false}
{track_auth_attempt_source, false},
{dead_letter_worker_consumer_prefetch, 32},
{dead_letter_worker_publisher_confirm_timeout, 180000}
]
endef
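
The two entries added above are ordinary rabbit application environment keys (a consumer prefetch for the dead-letter worker, default 32, and a publisher-confirm timeout, default 180000 ms), so they should be tunable like any other app env setting. A minimal advanced.config sketch, with made-up override values purely for illustration:

[
 {rabbit, [
   %% prefetch used by the dead-letter worker (default in this commit: 32)
   {dead_letter_worker_consumer_prefetch, 64},
   %% how long the worker waits for publisher confirms, in milliseconds (default: 180000)
   {dead_letter_worker_publisher_confirm_timeout, 60000}
 ]}
].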

28 changes: 23 additions & 5 deletions deps/rabbit/src/rabbit_amqqueue.erl
@@ -778,6 +778,7 @@ declare_args() ->
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
{<<"x-dead-letter-strategy">>, fun check_dlxstrategy_arg/2},
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
{<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2},
@@ -946,6 +947,22 @@ check_dlxrk_arg(Val, Args) when is_binary(Val) ->
check_dlxrk_arg(_Val, _Args) ->
{error, {unacceptable_type, "expected a string"}}.

-define(KNOWN_DLX_STRATEGIES, [<<"at-most-once">>, <<"at-least-once">>]).
check_dlxstrategy_arg({longstr, Val}, _Args) ->
case lists:member(Val, ?KNOWN_DLX_STRATEGIES) of
true -> ok;
false -> {error, invalid_dlx_strategy}
end;
check_dlxstrategy_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}};
check_dlxstrategy_arg(Val, _Args) when is_binary(Val) ->
case lists:member(Val, ?KNOWN_DLX_STRATEGIES) of
true -> ok;
false -> {error, invalid_dlx_strategy}
end;
check_dlxstrategy_arg(_Val, _Args) ->
{error, invalid_dlx_strategy}.

-define(KNOWN_OVERFLOW_MODES, [<<"drop-head">>, <<"reject-publish">>, <<"reject-publish-dlx">>]).
check_overflow({longstr, Val}, _Args) ->
case lists:member(Val, ?KNOWN_OVERFLOW_MODES) of
@@ -1503,7 +1520,8 @@ get_queue_consumer_info(Q, ConsumerInfoKeys) ->
[lists:zip(ConsumerInfoKeys,
[amqqueue:get_name(Q), ChPid, CTag,
AckRequired, Prefetch, Active, ActivityStatus, Args]) ||
{ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)].
{ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _}
<- consumers(Q)].

-spec stat(amqqueue:amqqueue()) ->
{'ok', non_neg_integer(), non_neg_integer()}.
@@ -1657,8 +1675,8 @@ credit(Q, CTag, Credit, Drain, QStates) ->
{'ok', non_neg_integer(), qmsg(), rabbit_queue_type:state()} |
{'empty', rabbit_queue_type:state()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
basic_get(Q, NoAck, LimiterPid, CTag, QStates0) ->
rabbit_queue_type:dequeue(Q, NoAck, LimiterPid, CTag, QStates0).
basic_get(Q, NoAck, LimiterPid, CTag, QStates) ->
rabbit_queue_type:dequeue(Q, NoAck, LimiterPid, CTag, QStates).


-spec basic_consume(amqqueue:amqqueue(), boolean(), pid(), pid(), boolean(),
@@ -1670,7 +1688,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates0) ->
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
basic_consume(Q, NoAck, ChPid, LimiterPid,
LimiterActive, ConsumerPrefetchCount, ConsumerTag,
ExclusiveConsume, Args, OkMsg, ActingUser, Contexts) ->
ExclusiveConsume, Args, OkMsg, ActingUser, QStates) ->

QName = amqqueue:get_name(Q),
%% first phase argument validation
@@ -1686,7 +1704,7 @@ basic_consume(Q, NoAck, ChPid, LimiterPid,
args => Args,
ok_msg => OkMsg,
acting_user => ActingUser},
rabbit_queue_type:consume(Q, Spec, Contexts).
rabbit_queue_type:consume(Q, Spec, QStates).

-spec basic_cancel(amqqueue:amqqueue(), rabbit_types:ctag(), any(),
rabbit_types:username(),
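
The new x-dead-letter-strategy argument validated above accepts at-most-once and at-least-once, so a client can select the strategy when declaring a quorum queue. A minimal sketch using the Erlang AMQP client, where Channel is assumed to be an open amqp_channel and the queue and exchange names are invented for illustration:

%% requires -include_lib("amqp_client/include/amqp_client.hrl") for the records
Args = [{<<"x-queue-type">>,           longstr, <<"quorum">>},
        {<<"x-dead-letter-exchange">>, longstr, <<"my-dlx">>},
        {<<"x-dead-letter-strategy">>, longstr, <<"at-least-once">>}],
#'queue.declare_ok'{} =
    amqp_channel:call(Channel, #'queue.declare'{queue     = <<"my-qq">>,
                                                durable   = true,
                                                arguments = Args}).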
26 changes: 23 additions & 3 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
@@ -728,10 +728,14 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
with_dlx(
DLX,
fun (X) ->
rabbit_global_counters:messages_dead_lettered(maxlen, rabbit_classic_queue,
at_most_once, 1),
QName = qname(State),
rabbit_dead_letter:publish(Message, maxlen, X, RK, QName)
end,
fun () -> ok end),
fun () -> rabbit_global_counters:messages_dead_lettered(maxlen, rabbit_classic_queue,
disabled, 1)
end),
%% Drop publish and nack to publisher
send_reject_publish(Delivery, Delivered, State);
_ ->
@@ -763,6 +767,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
{undelivered, State2 = #q{ttl = 0, dlx = undefined,
backing_queue_state = BQS,
msg_id_to_channel = MTC}} ->
rabbit_global_counters:messages_dead_lettered(expired, rabbit_classic_queue,
disabled, 1),
{BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC, amqqueue:get_name(Q)),
State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1};
{undelivered, State2 = #q{backing_queue_state = BQS}} ->
@@ -804,6 +810,9 @@ maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
State#q.dlx,
fun (X) -> dead_letter_maxlen_msg(X, State) end,
fun () ->
rabbit_global_counters:messages_dead_lettered(maxlen,
rabbit_classic_queue,
disabled, 1),
{_, BQS1} = BQ:drop(false, BQS),
State#q{backing_queue_state = BQS1}
end));
@@ -1012,11 +1021,18 @@ drop_expired_msgs(State) ->
drop_expired_msgs(Now, State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
ExpirePredIncrement = fun(Properties) ->
ExpirePred(Properties) andalso
rabbit_global_counters:messages_dead_lettered(expired,
rabbit_classic_queue,
disabled,
1) =:= ok
end,
{Props, State1} =
with_dlx(
State#q.dlx,
fun (X) -> dead_letter_expired_msgs(ExpirePred, X, State) end,
fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePredIncrement, BQS),
{Next, State#q{backing_queue_state = BQS1}} end),
ensure_ttl_timer(case Props of
undefined -> undefined;
@@ -1058,6 +1074,8 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
QName = qname(State),
{Res, Acks1, BQS1} =
Fun(fun (Msg, AckTag, Acks) ->
rabbit_global_counters:messages_dead_lettered(Reason, rabbit_classic_queue,
at_most_once, 1),
rabbit_dead_letter:publish(Msg, Reason, X, RK, QName),
[AckTag | Acks]
end, [], BQS),
@@ -1575,7 +1593,9 @@ handle_cast({reject, false, AckTags, ChPid}, State) ->
dead_letter_rejected_msgs(
AckTags, X, State1)
end) end,
fun () -> ack(AckTags, ChPid, State) end));
fun () -> rabbit_global_counters:messages_dead_lettered(rejected, rabbit_classic_queue,
disabled, length(AckTags)),
ack(AckTags, ChPid, State) end));

handle_cast({delete_exclusive, ConnPid}, State) ->
log_delete_exclusive(ConnPid, State),
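
One detail from the drop_expired_msgs hunk above: the expiry predicate is wrapped so that every expired message also increments the dead-letter counter, relying on Pred(X) andalso Counter =:= ok to keep the boolean result usable by BQ:dropwhile/2. The same idiom in isolation, as a hypothetical helper module that is not part of this commit:

-module(counting_pred).
-export([wrap/2]).

%% Wrap a predicate so that each positive match also runs a side effect,
%% while preserving the boolean result. This assumes the side effect
%% returns ok (as the counter call above is expected to); any other
%% return value would turn a true result into false.
wrap(Pred, SideEffect) when is_function(Pred, 1), is_function(SideEffect, 0) ->
    fun(X) ->
        Pred(X) andalso SideEffect() =:= ok
    end.

%% Example:
%%   P = counting_pred:wrap(fun(N) -> N > 3 end, fun() -> ok end),
%%   lists:filter(P, [1, 4, 5]).   %% => [4, 5]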
52 changes: 10 additions & 42 deletions deps/rabbit/src/rabbit_channel.erl
@@ -2165,22 +2165,18 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
confirm = Confirm,
msg_seq_no = MsgSeqNo},
_RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue
AllNames = case rabbit_amqqueue:lookup(QName) of
{ok, Q0} ->
case amqqueue:get_options(Q0) of
#{extra_bcc := BCC} -> [QName, rabbit_misc:r(QName#resource.virtual_host, queue, BCC)];
_ -> [QName]
end;
_ -> []
end,
Qs = rabbit_amqqueue:lookup(AllNames),
{QueueNames, Qs} = case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
{[QName], [Q]};
_ -> {[], []}
end,
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
{ok, QueueStates, Actions} ->
rabbit_global_counters:messages_routed(amqp091, erlang:min(1, length(Qs))),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
State1 = process_routing_confirm(Confirm, AllNames, MsgSeqNo, XName, State0),
State1 = process_routing_confirm(Confirm, QueueNames, MsgSeqNo, XName, State0),
%% Actions must be processed after registering confirms as actions may
%% contain rejections of publishes
State = handle_queue_actions(Actions, State1#ch{queue_states = QueueStates}),
@@ -2208,21 +2204,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
confirm = Confirm,
msg_seq_no = MsgSeqNo},
RoutedToQueueNames}, State0 = #ch{queue_states = QueueStates0}) ->
Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames),
AllQueueNames = lists:map(fun amqqueue:get_name/1, Qs0),
AllExtraBCCs = infer_extra_bcc(Qs0),
%% Collect implicit BCC targets these queues may have
Qs = case AllExtraBCCs of
[] -> Qs0;
ExtraNames -> Qs0 ++ rabbit_amqqueue:lookup(ExtraNames)
end,
Qs = rabbit_amqqueue:lookup(RoutedToQueueNames),
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
{ok, QueueStates, Actions} ->
rabbit_global_counters:messages_routed(amqp091, length(Qs)),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
State1 = process_routing_confirm(Confirm, AllQueueNames,
State1 = process_routing_confirm(Confirm, QueueNames,
MsgSeqNo, XName, State0),
%% Actions must be processed after registering confirms as actions may
%% contain rejections of publishes
Expand All @@ -2231,7 +2221,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
fine ->
?INCR_STATS(exchange_stats, XName, 1, publish),
[?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish)
|| QName <- AllQueueNames];
|| QName <- QueueNames];
_ ->
ok
end,
Expand All @@ -2243,28 +2233,6 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
[rabbit_misc:rs(Resource)])
end.

-spec infer_extra_bcc([amqqueue:amqqueue()]) -> [rabbit_amqqueue:name()].
infer_extra_bcc([]) ->
[];
infer_extra_bcc([Q]) ->
case amqqueue:get_options(Q) of
#{extra_bcc := BCC} ->
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
[rabbit_misc:r(VHost, queue, BCC)];
_ ->
[]
end;
infer_extra_bcc(Qs) ->
lists:foldl(fun(Q, Acc) ->
case amqqueue:get_options(Q) of
#{extra_bcc := BCC} ->
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
[rabbit_misc:r(VHost, queue, BCC) | Acc];
_ ->
Acc
end
end, [], Qs).

process_routing_mandatory(_Mandatory = true,
_RoutedToQs = [],
Msg, State) ->
6 changes: 4 additions & 2 deletions deps/rabbit/src/rabbit_classic_queue.erl
@@ -445,8 +445,10 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->

capabilities() ->
#{unsupported_policies => [ %% Stream policies
<<"max-age">>, <<"stream-max-segment-size-bytes">>,
<<"queue-leader-locator">>, <<"initial-cluster-size">>],
<<"max-age">>, <<"stream-max-segment-size-bytes">>,
<<"queue-leader-locator">>, <<"initial-cluster-size">>,
%% Quorum policies
<<"dead-letter-strategy">>],
queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>,
<<"x-dead-letter-routing-key">>, <<"x-max-length">>,
<<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
7 changes: 5 additions & 2 deletions deps/rabbit/src/rabbit_dead_letter.erl
@@ -7,14 +7,17 @@

-module(rabbit_dead_letter).

-export([publish/5]).
-export([publish/5,
make_msg/5,
detect_cycles/3]).

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").

%%----------------------------------------------------------------------------

-type reason() :: 'expired' | 'rejected' | 'maxlen' | delivery_limit.
-export_type([reason/0]).

%%----------------------------------------------------------------------------

@@ -39,7 +42,7 @@ make_msg(Msg = #basic_message{content = Content,
undefined -> {RoutingKeys, fun (H) -> H end};
_ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
ReasonBin = list_to_binary(atom_to_list(Reason)),
ReasonBin = atom_to_binary(Reason),
TimeSec = os:system_time(seconds),
PerMsgTTL = per_msg_ttl_header(Content#content.properties),
HeadersFun2 =
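
A small modernization above: ReasonBin is now built with atom_to_binary/1, available since OTP 23, which is equivalent to the previous list_to_binary(atom_to_list(Reason)) round trip for these reason atoms (both produce the UTF-8 binary form). Quick shell check, illustrative only:

1> atom_to_binary(expired) =:= list_to_binary(atom_to_list(expired)).
true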
7 changes: 2 additions & 5 deletions deps/rabbit/src/rabbit_disk_monitor.erl
@@ -246,11 +246,8 @@ get_disk_free(Dir, {win32, _}) ->
rabbit_log:warning("Expected the mnesia directory absolute "
"path to start with a drive letter like "
"'C:'. The path is: '~p'", [Dir]),
case win32_get_disk_free_dir(Dir) of
{ok, Free} ->
Free;
_ -> exit(could_not_determine_disk_free)
end;
{ok, Free} = win32_get_disk_free_dir(Dir),
Free;
DriveLetter ->
case catch win32_get_disk_free_pwsh(DriveLetter) of
{ok, Free1} -> Free1;
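
The rabbit_disk_monitor change above replaces the case/exit handling with an assertive match, so a failing win32_get_disk_free_dir/1 call now crashes with a badmatch carrying the actual error term rather than the generic could_not_determine_disk_free exit. A free-standing sketch of the before/after shape (win32_get_disk_free_dir/1 is the module's own helper, shown here only for illustration):

%% Before: collapse any error into a generic exit reason.
free_before(Dir) ->
    case win32_get_disk_free_dir(Dir) of
        {ok, Free} -> Free;
        _          -> exit(could_not_determine_disk_free)
    end.

%% After: assert the happy path; an error return raises {badmatch, Error},
%% preserving the real reason in the crash report.
free_after(Dir) ->
    {ok, Free} = win32_get_disk_free_dir(Dir),
    Free.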
