Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QQ: introduce new machine version (2) #3121

Merged
merged 97 commits into from Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
83e6b9c
QQ: introduce new machine version (2)
ansd Oct 7, 2021
9bab3c3
rabbit_fifo: code readability
kjnilsson Jun 18, 2021
7c92a53
Replace nested tuples with improper lists
kjnilsson Jun 21, 2021
1c4f563
QQ memory use improvement
kjnilsson Jun 21, 2021
aec42ec
Delete oqueue
ansd Oct 7, 2021
e3ccefb
Maintain order when dead-lettering rejected messages in quorum queues
ansd Oct 28, 2021
1c17773
Add at-least once dead-lettering for quorum queues
ansd Oct 8, 2021
8dc7332
Add integration tests for at-least-once dead-lettering
ansd Dec 7, 2021
795b5b5
QQ: better handle repeated requeues.
kjnilsson Nov 12, 2021
5d35cad
QQ: resend protocol refacor
kjnilsson Nov 29, 2021
25741db
QQ: handle all dlx commands in one clause
kjnilsson Dec 9, 2021
d268f3f
Add dlx integration tests for stats and many_target_queues
ansd Dec 9, 2021
51d5bbe
Move timers to aux eval event
kjnilsson Dec 10, 2021
bbef82a
QQ Fix invalid_policy test
kjnilsson Dec 10, 2021
42f5e52
Add more rabbit_fifo_dlx_integration_SUITE tests
ansd Dec 13, 2021
e142556
Bazel suite config
kjnilsson Dec 13, 2021
87b67c2
update rabbit_fifo_SUITE expectations
kjnilsson Dec 13, 2021
e538205
Set delivery_limit on quorum queue dead_lettering tests
kjnilsson Dec 13, 2021
9873b24
rabbit_fifo_prop fixes
kjnilsson Dec 13, 2021
cd785fc
Make dlx_worker terminate itself if leader is down
ansd Dec 13, 2021
29714e7
Send acks asynchronously
ansd Dec 14, 2021
f1ec322
add lg:stop/0 to xref ignore list
kjnilsson Dec 14, 2021
fc1538b
rabbit_fifo_prop: turn down iterations of snapshots_dlx a bit
kjnilsson Dec 14, 2021
711be2d
Try an even smaller size for dlx property tests.
kjnilsson Dec 14, 2021
4eb8eb0
Increase timeout to 15 minutes
ansd Dec 16, 2021
65dbb3b
Do not register rabbit_fifo_dlx_worker
ansd Dec 17, 2021
517fcd9
QQ: optimise expire messages
kjnilsson Dec 20, 2021
d733942
QQ: add back expire before checkout
kjnilsson Dec 20, 2021
2f2710f
Simplify rabbit_fifo
ansd Dec 27, 2021
a47468b
Add rabbit_fifo_dlx unit tests
ansd Dec 28, 2021
7134ca1
Simplify receiving dlx deliveries
ansd Jan 3, 2022
897947b
Clean up when terminating dlx worker
ansd Jan 3, 2022
a87d337
Handle queue eol event
ansd Jan 4, 2022
61fcf33
Make rabbit_fifo_dlx not depend on rabbit_fifo
ansd Jan 6, 2022
5bbf471
Do not redeliver if target is quorum or stream queue
ansd Jan 10, 2022
db4ad10
Log warning once in dlx worker
ansd Jan 11, 2022
679bcc8
Allow for higher dlx throughput by default
ansd Jan 12, 2022
60422c5
Improve performance of dlx worker
ansd Jan 13, 2022
297ae96
Fix dialyzer warnings
ansd Jan 14, 2022
c23fccf
Add more specs
ansd Jan 14, 2022
1f295c4
Run dead letter TTL tests for quorum queues
ansd Jan 18, 2022
8c286cc
Add Prometheus metrics for dead-lettered messages
ansd Jan 19, 2022
bc0cc29
Emit release cursor when expiring messages
ansd Jan 21, 2022
b8ab77e
Do not keep rabbit_fifo messages in memory
kjnilsson Jan 7, 2022
953fe90
Fix pipeline
ansd Jan 24, 2022
9d784fa
rabbit_fifo: optimise enqueue path
kjnilsson Jan 24, 2022
5223786
rabbit_fifo: remove some commented code
kjnilsson Jan 24, 2022
d44666d
Reduce dlx consumer prefetch
ansd Jan 24, 2022
62f3d40
rabbit_fifo: cache incoming message if it can be immedately delivered
kjnilsson Jan 24, 2022
1425155
rabbit_fifo: slight optimisation around completion
kjnilsson Jan 24, 2022
dcae0e5
Comment out unused function
ansd Jan 25, 2022
63a93b9
rabbit_fifo: tidy up, fix tests
kjnilsson Jan 25, 2022
cdca2e5
rabbit_fifo alloc opts
kjnilsson Jan 25, 2022
8dd5c13
rabbit_fifo_client: allocation optimisations of common code paths.
kjnilsson Jan 25, 2022
c7cd530
Add memory details of dlx workers
ansd Jan 25, 2022
672b983
rabbit_fifo: conversion bug fixes and remove prefix_msgs field
kjnilsson Jan 26, 2022
96e6fab
rabbit_fifo_prop: fix build error
kjnilsson Jan 27, 2022
31b3916
Output some logs only on Ra leader
ansd Jan 27, 2022
4618fac
Allocate less memory
ansd Jan 27, 2022
13a3089
Delete queue returns number of ready messages
ansd Jan 27, 2022
fffb9c4
Add dead-letter-strategy policy validator test
ansd Jan 31, 2022
15b70bf
rabbit_fifo: refactor consumer record
kjnilsson Feb 2, 2022
5179419
rabbit_fifo: enqueue any pending messages during v2 conversion
kjnilsson Feb 3, 2022
f85731e
Take into account dead-lettered messages for max-length
ansd Feb 9, 2022
1f0fd49
Redeliver rejected messages in dlx worker
ansd Feb 10, 2022
94e0be3
rabbit_fifo: refactor decorator notifications
kjnilsson Feb 9, 2022
c55fd21
rabbit_fifo: fix SAC bug when a cancelled consumer is revived
kjnilsson Feb 10, 2022
e7280e4
Fix rebase onto origin/master
ansd Feb 10, 2022
2361e29
Replace lqueue:peek/1 with new lqueue:get/2
ansd Feb 10, 2022
a3eaaa1
rabbit_fifo: test fixes
kjnilsson Feb 10, 2022
d50dd90
Fix dialyzer warnings
ansd Feb 10, 2022
1e812f9
Enable required feature flags for mixed version cluster tests
ansd Feb 10, 2022
b934a42
Fall back to at-most-once if feature flag stream_queue is disabled
ansd Feb 14, 2022
9230f62
Delete from unsettled target queues when target queue is deleted
ansd Feb 16, 2022
64c078a
Reject message if it cannot be delivered
ansd Feb 16, 2022
0bf0768
Test at-least-once routes to different target queues based on reason
ansd Feb 17, 2022
863d02a
Fix next_seq in rabbit_fifo_client
ansd Feb 18, 2022
9433100
Resend pending commands if same leader re-elected
ansd Feb 18, 2022
0729a60
Avoid ETS lookup
ansd Feb 18, 2022
fc486fc
Fix failing GitHub action
ansd Feb 20, 2022
fc2d37e
Route also to extra BCC queues when dead-lettering
ansd Feb 21, 2022
a3905da
Add note about missed Prometheus counter updates
ansd Feb 21, 2022
781328b
Fix dialyzer warning
ansd Feb 22, 2022
6eed54f
Move eventually and consistently to rabbit_ct_helpers
ansd Feb 22, 2022
6554bca
Save memory for dead-lettered messages
ansd Feb 22, 2022
980ef3f
Use one ?MSG macro
ansd Feb 22, 2022
fd470b8
Reduce per-message memory usage if message TTL is set
ansd Feb 22, 2022
57f9211
Remove prefix message code
ansd Feb 23, 2022
5860301
Remove purge from rabbit_fifo_dlx
ansd Feb 23, 2022
4e415f5
Speed up purging quorum queues
ansd Feb 23, 2022
9f10f79
Fix upgrades from rabbit_fifo_v1 to v2
ansd Feb 25, 2022
17954d7
Fix bug in v1 to v2 conversion
ansd Feb 25, 2022
c4b6d32
Add test for all-with-x binding argument
ansd Feb 27, 2022
fd2023a
Exclude single_dlx_worker from mixed version tests
ansd Feb 28, 2022
2db128f
Delete commented quorum queue in-memory tests
ansd Feb 28, 2022
08f2061
Fix badmatch when basic.get on quorum queue with expired messages
ansd Mar 2, 2022
4a2b00a
rabbit_fifo: tidy up and formatting
kjnilsson Mar 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 36 additions & 3 deletions deps/rabbit/BUILD.bazel
Expand Up @@ -144,7 +144,9 @@ _APP_ENV = """[
%% interval at which connection/channel tracking executes post operations
{tracking_execution_timeout, 15000},
{stream_messages_soft_limit, 256},
{track_auth_attempt_source, false}
{track_auth_attempt_source, false},
{dead_letter_worker_consumer_prefetch, 32},
{dead_letter_worker_publisher_confirm_timeout, 180000}
]
"""

Expand Down Expand Up @@ -401,7 +403,7 @@ suites = [
":quorum_queue_utils",
],
flaky = True,
shard_count = 3,
shard_count = 7,
),
rabbitmq_integration_suite(
PACKAGE,
Expand Down Expand Up @@ -698,7 +700,7 @@ suites = [
),
rabbitmq_suite(
name = "rabbit_fifo_prop_SUITE",
size = "medium",
size = "large",
additional_beam = [
":test_util",
],
Expand All @@ -716,6 +718,37 @@ suites = [
"@proper//:erlang_app",
],
),
rabbitmq_suite(
name = "rabbit_fifo_dlx_SUITE",
size = "small",
additional_hdrs = [
"src/rabbit_fifo.hrl",
"src/rabbit_fifo_dlx.hrl",
],
deps = [
"//deps/rabbit_common:erlang_app",
],
),
rabbitmq_integration_suite(
PACKAGE,
name = "rabbit_fifo_dlx_integration_SUITE",
size = "medium",
additional_beam = [
":test_util",
":quorum_queue_utils",
":quorum_queue_SUITE_beam_files",
],
additional_hdrs = [
"src/rabbit_fifo.hrl",
"src/rabbit_fifo_dlx.hrl",
],
runtime_deps = [
"@ra//:erlang_app",
],
deps = [
"@proper//:erlang_app",
],
),
rabbitmq_suite(
name = "rabbit_fifo_SUITE",
size = "medium",
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbit/Makefile
Expand Up @@ -122,7 +122,9 @@ define PROJECT_ENV
%% interval at which connection/channel tracking executes post operations
{tracking_execution_timeout, 15000},
{stream_messages_soft_limit, 256},
{track_auth_attempt_source, false}
{track_auth_attempt_source, false},
{dead_letter_worker_consumer_prefetch, 32},
{dead_letter_worker_publisher_confirm_timeout, 180000}
]
endef

Expand Down
28 changes: 23 additions & 5 deletions deps/rabbit/src/rabbit_amqqueue.erl
Expand Up @@ -778,6 +778,7 @@ declare_args() ->
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
{<<"x-dead-letter-strategy">>, fun check_dlxstrategy_arg/2},
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
{<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2},
Expand Down Expand Up @@ -946,6 +947,22 @@ check_dlxrk_arg(Val, Args) when is_binary(Val) ->
check_dlxrk_arg(_Val, _Args) ->
{error, {unacceptable_type, "expected a string"}}.

-define(KNOWN_DLX_STRATEGIES, [<<"at-most-once">>, <<"at-least-once">>]).
check_dlxstrategy_arg({longstr, Val}, _Args) ->
case lists:member(Val, ?KNOWN_DLX_STRATEGIES) of
true -> ok;
false -> {error, invalid_dlx_strategy}
end;
check_dlxstrategy_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}};
check_dlxstrategy_arg(Val, _Args) when is_binary(Val) ->
case lists:member(Val, ?KNOWN_DLX_STRATEGIES) of
true -> ok;
false -> {error, invalid_dlx_strategy}
end;
check_dlxstrategy_arg(_Val, _Args) ->
{error, invalid_dlx_strategy}.

-define(KNOWN_OVERFLOW_MODES, [<<"drop-head">>, <<"reject-publish">>, <<"reject-publish-dlx">>]).
check_overflow({longstr, Val}, _Args) ->
case lists:member(Val, ?KNOWN_OVERFLOW_MODES) of
Expand Down Expand Up @@ -1503,7 +1520,8 @@ get_queue_consumer_info(Q, ConsumerInfoKeys) ->
[lists:zip(ConsumerInfoKeys,
[amqqueue:get_name(Q), ChPid, CTag,
AckRequired, Prefetch, Active, ActivityStatus, Args]) ||
{ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)].
{ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _}
<- consumers(Q)].

-spec stat(amqqueue:amqqueue()) ->
{'ok', non_neg_integer(), non_neg_integer()}.
Expand Down Expand Up @@ -1657,8 +1675,8 @@ credit(Q, CTag, Credit, Drain, QStates) ->
{'ok', non_neg_integer(), qmsg(), rabbit_queue_type:state()} |
{'empty', rabbit_queue_type:state()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
basic_get(Q, NoAck, LimiterPid, CTag, QStates0) ->
rabbit_queue_type:dequeue(Q, NoAck, LimiterPid, CTag, QStates0).
basic_get(Q, NoAck, LimiterPid, CTag, QStates) ->
rabbit_queue_type:dequeue(Q, NoAck, LimiterPid, CTag, QStates).


-spec basic_consume(amqqueue:amqqueue(), boolean(), pid(), pid(), boolean(),
Expand All @@ -1670,7 +1688,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates0) ->
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
basic_consume(Q, NoAck, ChPid, LimiterPid,
LimiterActive, ConsumerPrefetchCount, ConsumerTag,
ExclusiveConsume, Args, OkMsg, ActingUser, Contexts) ->
ExclusiveConsume, Args, OkMsg, ActingUser, QStates) ->

QName = amqqueue:get_name(Q),
%% first phase argument validation
Expand All @@ -1686,7 +1704,7 @@ basic_consume(Q, NoAck, ChPid, LimiterPid,
args => Args,
ok_msg => OkMsg,
acting_user => ActingUser},
rabbit_queue_type:consume(Q, Spec, Contexts).
rabbit_queue_type:consume(Q, Spec, QStates).

-spec basic_cancel(amqqueue:amqqueue(), rabbit_types:ctag(), any(),
rabbit_types:username(),
Expand Down
26 changes: 23 additions & 3 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Expand Up @@ -728,10 +728,14 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
with_dlx(
DLX,
fun (X) ->
rabbit_global_counters:messages_dead_lettered(maxlen, rabbit_classic_queue,
at_most_once, 1),
QName = qname(State),
rabbit_dead_letter:publish(Message, maxlen, X, RK, QName)
end,
fun () -> ok end),
fun () -> rabbit_global_counters:messages_dead_lettered(maxlen, rabbit_classic_queue,
disabled, 1)
end),
%% Drop publish and nack to publisher
send_reject_publish(Delivery, Delivered, State);
_ ->
Expand Down Expand Up @@ -763,6 +767,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
{undelivered, State2 = #q{ttl = 0, dlx = undefined,
backing_queue_state = BQS,
msg_id_to_channel = MTC}} ->
rabbit_global_counters:messages_dead_lettered(expired, rabbit_classic_queue,
disabled, 1),
{BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC, amqqueue:get_name(Q)),
State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1};
{undelivered, State2 = #q{backing_queue_state = BQS}} ->
Expand Down Expand Up @@ -804,6 +810,9 @@ maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
State#q.dlx,
fun (X) -> dead_letter_maxlen_msg(X, State) end,
fun () ->
rabbit_global_counters:messages_dead_lettered(maxlen,
rabbit_classic_queue,
disabled, 1),
{_, BQS1} = BQ:drop(false, BQS),
State#q{backing_queue_state = BQS1}
end));
Expand Down Expand Up @@ -1012,11 +1021,18 @@ drop_expired_msgs(State) ->
drop_expired_msgs(Now, State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
ExpirePredIncrement = fun(Properties) ->
ExpirePred(Properties) andalso
rabbit_global_counters:messages_dead_lettered(expired,
rabbit_classic_queue,
disabled,
1) =:= ok
end,
{Props, State1} =
with_dlx(
State#q.dlx,
fun (X) -> dead_letter_expired_msgs(ExpirePred, X, State) end,
fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePredIncrement, BQS),
{Next, State#q{backing_queue_state = BQS1}} end),
ensure_ttl_timer(case Props of
undefined -> undefined;
Expand Down Expand Up @@ -1058,6 +1074,8 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
QName = qname(State),
{Res, Acks1, BQS1} =
Fun(fun (Msg, AckTag, Acks) ->
rabbit_global_counters:messages_dead_lettered(Reason, rabbit_classic_queue,
at_most_once, 1),
rabbit_dead_letter:publish(Msg, Reason, X, RK, QName),
[AckTag | Acks]
end, [], BQS),
Expand Down Expand Up @@ -1575,7 +1593,9 @@ handle_cast({reject, false, AckTags, ChPid}, State) ->
dead_letter_rejected_msgs(
AckTags, X, State1)
end) end,
fun () -> ack(AckTags, ChPid, State) end));
fun () -> rabbit_global_counters:messages_dead_lettered(rejected, rabbit_classic_queue,
disabled, length(AckTags)),
ack(AckTags, ChPid, State) end));

handle_cast({delete_exclusive, ConnPid}, State) ->
log_delete_exclusive(ConnPid, State),
Expand Down
52 changes: 10 additions & 42 deletions deps/rabbit/src/rabbit_channel.erl
Expand Up @@ -2165,22 +2165,18 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
confirm = Confirm,
msg_seq_no = MsgSeqNo},
_RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue
AllNames = case rabbit_amqqueue:lookup(QName) of
{ok, Q0} ->
case amqqueue:get_options(Q0) of
#{extra_bcc := BCC} -> [QName, rabbit_misc:r(QName#resource.virtual_host, queue, BCC)];
_ -> [QName]
end;
_ -> []
end,
Qs = rabbit_amqqueue:lookup(AllNames),
{QueueNames, Qs} = case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
{[QName], [Q]};
_ -> {[], []}
end,
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
{ok, QueueStates, Actions} ->
rabbit_global_counters:messages_routed(amqp091, erlang:min(1, length(Qs))),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
State1 = process_routing_confirm(Confirm, AllNames, MsgSeqNo, XName, State0),
State1 = process_routing_confirm(Confirm, QueueNames, MsgSeqNo, XName, State0),
%% Actions must be processed after registering confirms as actions may
%% contain rejections of publishes
State = handle_queue_actions(Actions, State1#ch{queue_states = QueueStates}),
Expand Down Expand Up @@ -2208,21 +2204,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
confirm = Confirm,
msg_seq_no = MsgSeqNo},
RoutedToQueueNames}, State0 = #ch{queue_states = QueueStates0}) ->
Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames),
AllQueueNames = lists:map(fun amqqueue:get_name/1, Qs0),
AllExtraBCCs = infer_extra_bcc(Qs0),
%% Collect implicit BCC targets these queues may have
Qs = case AllExtraBCCs of
[] -> Qs0;
ExtraNames -> Qs0 ++ rabbit_amqqueue:lookup(ExtraNames)
end,
Qs = rabbit_amqqueue:lookup(RoutedToQueueNames),
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
{ok, QueueStates, Actions} ->
rabbit_global_counters:messages_routed(amqp091, length(Qs)),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
State1 = process_routing_confirm(Confirm, AllQueueNames,
State1 = process_routing_confirm(Confirm, QueueNames,
MsgSeqNo, XName, State0),
%% Actions must be processed after registering confirms as actions may
%% contain rejections of publishes
Expand All @@ -2231,7 +2221,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
fine ->
?INCR_STATS(exchange_stats, XName, 1, publish),
[?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish)
|| QName <- AllQueueNames];
|| QName <- QueueNames];
_ ->
ok
end,
Expand All @@ -2243,28 +2233,6 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
[rabbit_misc:rs(Resource)])
end.

-spec infer_extra_bcc([amqqueue:amqqueue()]) -> [rabbit_amqqueue:name()].
infer_extra_bcc([]) ->
[];
infer_extra_bcc([Q]) ->
case amqqueue:get_options(Q) of
#{extra_bcc := BCC} ->
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
[rabbit_misc:r(VHost, queue, BCC)];
_ ->
[]
end;
infer_extra_bcc(Qs) ->
lists:foldl(fun(Q, Acc) ->
case amqqueue:get_options(Q) of
#{extra_bcc := BCC} ->
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
[rabbit_misc:r(VHost, queue, BCC) | Acc];
_ ->
Acc
end
end, [], Qs).

process_routing_mandatory(_Mandatory = true,
_RoutedToQs = [],
Msg, State) ->
Expand Down
6 changes: 4 additions & 2 deletions deps/rabbit/src/rabbit_classic_queue.erl
Expand Up @@ -445,8 +445,10 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->

capabilities() ->
#{unsupported_policies => [ %% Stream policies
<<"max-age">>, <<"stream-max-segment-size-bytes">>,
<<"queue-leader-locator">>, <<"initial-cluster-size">>],
<<"max-age">>, <<"stream-max-segment-size-bytes">>,
<<"queue-leader-locator">>, <<"initial-cluster-size">>,
%% Quorum policies
<<"dead-letter-strategy">>],
queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>,
<<"x-dead-letter-routing-key">>, <<"x-max-length">>,
<<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
Expand Down
7 changes: 5 additions & 2 deletions deps/rabbit/src/rabbit_dead_letter.erl
Expand Up @@ -7,14 +7,17 @@

-module(rabbit_dead_letter).

-export([publish/5]).
-export([publish/5,
make_msg/5,
detect_cycles/3]).

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").

%%----------------------------------------------------------------------------

-type reason() :: 'expired' | 'rejected' | 'maxlen' | delivery_limit.
-export_type([reason/0]).

%%----------------------------------------------------------------------------

Expand All @@ -39,7 +42,7 @@ make_msg(Msg = #basic_message{content = Content,
undefined -> {RoutingKeys, fun (H) -> H end};
_ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
ReasonBin = list_to_binary(atom_to_list(Reason)),
ReasonBin = atom_to_binary(Reason),
TimeSec = os:system_time(seconds),
PerMsgTTL = per_msg_ttl_header(Content#content.properties),
HeadersFun2 =
Expand Down
7 changes: 2 additions & 5 deletions deps/rabbit/src/rabbit_disk_monitor.erl
Expand Up @@ -246,11 +246,8 @@ get_disk_free(Dir, {win32, _}) ->
rabbit_log:warning("Expected the mnesia directory absolute "
"path to start with a drive letter like "
"'C:'. The path is: '~p'", [Dir]),
case win32_get_disk_free_dir(Dir) of
{ok, Free} ->
Free;
_ -> exit(could_not_determine_disk_free)
end;
{ok, Free} = win32_get_disk_free_dir(Dir),
Free;
DriveLetter ->
case catch win32_get_disk_free_pwsh(DriveLetter) of
{ok, Free1} -> Free1;
Expand Down