Skip to content

Commit

Permalink
WIP Add at-least once dead-lettering for quorum queues
Browse files Browse the repository at this point in the history
and message TTL
  • Loading branch information
ansd committed Dec 3, 2021
1 parent 823cdc4 commit d15bf6d
Show file tree
Hide file tree
Showing 22 changed files with 2,190 additions and 435 deletions.
25 changes: 21 additions & 4 deletions deps/rabbit/src/rabbit_amqqueue.erl
Expand Up @@ -778,6 +778,7 @@ declare_args() ->
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
{<<"x-dead-letter-strategy">>, fun check_dlxstrategy_arg/2},
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
{<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2},
Expand Down Expand Up @@ -945,6 +946,22 @@ check_dlxrk_arg(Val, Args) when is_binary(Val) ->
check_dlxrk_arg(_Val, _Args) ->
{error, {unacceptable_type, "expected a string"}}.

-define(KNOWN_DLX_STRATEGIES, [<<"at-most-once">>, <<"at-least-once">>]).
check_dlxstrategy_arg({longstr, Val}, _Args) ->
case lists:member(Val, ?KNOWN_DLX_STRATEGIES) of
true -> ok;
false -> {error, invalid_dlx_strategy}
end;
check_dlxstrategy_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}};
check_dlxstrategy_arg(Val, _Args) when is_binary(Val) ->
case lists:member(Val, ?KNOWN_DLX_STRATEGIES) of
true -> ok;
false -> {error, invalid_dlx_strategy}
end;
check_dlxstrategy_arg(_Val, _Args) ->
{error, invalid_dlx_strategy}.

-define(KNOWN_OVERFLOW_MODES, [<<"drop-head">>, <<"reject-publish">>, <<"reject-publish-dlx">>]).
check_overflow({longstr, Val}, _Args) ->
case lists:member(Val, ?KNOWN_OVERFLOW_MODES) of
Expand Down Expand Up @@ -1641,8 +1658,8 @@ credit(Q, CTag, Credit, Drain, QStates) ->
{'ok', non_neg_integer(), qmsg(), rabbit_queue_type:state()} |
{'empty', rabbit_queue_type:state()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
basic_get(Q, NoAck, LimiterPid, CTag, QStates0) ->
rabbit_queue_type:dequeue(Q, NoAck, LimiterPid, CTag, QStates0).
basic_get(Q, NoAck, LimiterPid, CTag, QStates) ->
rabbit_queue_type:dequeue(Q, NoAck, LimiterPid, CTag, QStates).


-spec basic_consume(amqqueue:amqqueue(), boolean(), pid(), pid(), boolean(),
Expand All @@ -1654,7 +1671,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates0) ->
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
basic_consume(Q, NoAck, ChPid, LimiterPid,
LimiterActive, ConsumerPrefetchCount, ConsumerTag,
ExclusiveConsume, Args, OkMsg, ActingUser, Contexts) ->
ExclusiveConsume, Args, OkMsg, ActingUser, QStates) ->

QName = amqqueue:get_name(Q),
%% first phase argument validation
Expand All @@ -1670,7 +1687,7 @@ basic_consume(Q, NoAck, ChPid, LimiterPid,
args => Args,
ok_msg => OkMsg,
acting_user => ActingUser},
rabbit_queue_type:consume(Q, Spec, Contexts).
rabbit_queue_type:consume(Q, Spec, QStates).

-spec basic_cancel(amqqueue:amqqueue(), rabbit_types:ctag(), any(),
rabbit_types:username(),
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_basic.erl
Expand Up @@ -12,7 +12,8 @@
-export([publish/4, publish/5, publish/1,
message/3, message/4, properties/1, prepend_table_header/3,
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
header_routes/1, parse_expiration/1, header/2, header/3]).
header_routes/1, parse_expiration/1, header/2, header/3,
is_message_persistent/1]).
-export([build_content/2, from_content/1, msg_size/1,
maybe_gc_large_msg/1, maybe_gc_large_msg/2]).
-export([add_header/4,
Expand Down
6 changes: 4 additions & 2 deletions deps/rabbit/src/rabbit_classic_queue.erl
Expand Up @@ -445,8 +445,10 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->

capabilities() ->
#{unsupported_policies => [ %% Stream policies
<<"max-age">>, <<"stream-max-segment-size-bytes">>,
<<"queue-leader-locator">>, <<"initial-cluster-size">>],
<<"max-age">>, <<"stream-max-segment-size-bytes">>,
<<"queue-leader-locator">>, <<"initial-cluster-size">>,
%% Quorum policies
<<"dead-letter-strategy">>],
queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>,
<<"x-dead-letter-routing-key">>, <<"x-max-length">>,
<<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
Expand Down
6 changes: 4 additions & 2 deletions deps/rabbit/src/rabbit_dead_letter.erl
Expand Up @@ -7,7 +7,9 @@

-module(rabbit_dead_letter).

-export([publish/5]).
-export([publish/5,
make_msg/5,
detect_cycles/3]).

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
Expand Down Expand Up @@ -39,7 +41,7 @@ make_msg(Msg = #basic_message{content = Content,
undefined -> {RoutingKeys, fun (H) -> H end};
_ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
ReasonBin = list_to_binary(atom_to_list(Reason)),
ReasonBin = atom_to_binary(Reason),
TimeSec = os:system_time(seconds),
PerMsgTTL = per_msg_ttl_header(Content#content.properties),
HeadersFun2 =
Expand Down

0 comments on commit d15bf6d

Please sign in to comment.