Skip to content

Commit

Permalink
rabbit_fifo: tidy up and formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Mar 8, 2022
1 parent 08f2061 commit 4a2b00a
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 51 deletions.
22 changes: 1 addition & 21 deletions deps/rabbit/src/rabbit_fifo.erl
Expand Up @@ -106,21 +106,6 @@
-record(purge_nodes, {nodes :: [node()]}).
-record(update_config, {config :: config()}).
-record(garbage_collection, {}).
%% v2 alternative commands
%% each consumer is assigned an integer index which can be used
%% instead of the consumer id to identify the consumer
-type consumer_idx() :: non_neg_integer().

-record(?SETTLE_V2, {consumer_idx :: consumer_idx(),
msg_ids :: [msg_id()]}).
-record(?RETURN_V2, {consumer_idx :: consumer_idx(),
msg_ids :: [msg_id()]}).
-record(?DISCARD_V2, {consumer_idx :: consumer_idx(),
msg_ids :: [msg_id()]}).
-record(?CREDIT_V2, {consumer_idx :: consumer_idx(),
credit :: non_neg_integer(),
delivery_count :: non_neg_integer(),
drain :: boolean()}).

-opaque protocol() ::
#enqueue{} |
Expand All @@ -134,12 +119,7 @@
#purge{} |
#purge_nodes{} |
#update_config{} |
#garbage_collection{} |
% v2
#?SETTLE_V2{} |
#?RETURN_V2{} |
#?DISCARD_V2{} |
#?CREDIT_V2{}.
#garbage_collection{}.

-type command() :: protocol() |
rabbit_fifo_dlx:protocol() |
Expand Down
27 changes: 13 additions & 14 deletions deps/rabbit/src/rabbit_fifo_dlx.erl
Expand Up @@ -29,10 +29,8 @@
smallest_raft_index/1
]).

-record(checkout,{
consumer :: pid(),
prefetch :: non_neg_integer()
}).
-record(checkout, {consumer :: pid(),
prefetch :: non_neg_integer()}).
-record(settle, {msg_ids :: [msg_id()]}).
-type protocol() :: {dlx, #checkout{} | #settle{}}.
-opaque state() :: #?MODULE{}.
Expand Down Expand Up @@ -93,16 +91,17 @@ stat(#?MODULE{consumer = Con,
apply(_Meta, {dlx, #settle{msg_ids = MsgIds}}, at_least_once,
#?MODULE{consumer = #dlx_consumer{checked_out = Checked0}} = State0) ->
Acked = maps:with(MsgIds, Checked0),
State = maps:fold(fun(MsgId, ?TUPLE(_Rsn, ?MSG(Idx, _) = Msg),
#?MODULE{consumer = #dlx_consumer{checked_out = Checked} = C,
msg_bytes_checkout = BytesCheckout,
ra_indexes = Indexes0} = S) ->
Indexes = rabbit_fifo_index:delete(Idx, Indexes0),
S#?MODULE{consumer = C#dlx_consumer{checked_out =
maps:remove(MsgId, Checked)},
msg_bytes_checkout = BytesCheckout - size_in_bytes(Msg),
ra_indexes = Indexes}
end, State0, Acked),
State = maps:fold(
fun(MsgId, ?TUPLE(_Rsn, ?MSG(Idx, _) = Msg),
#?MODULE{consumer = #dlx_consumer{checked_out = Checked} = C,
msg_bytes_checkout = BytesCheckout,
ra_indexes = Indexes0} = S) ->
Indexes = rabbit_fifo_index:delete(Idx, Indexes0),
S#?MODULE{consumer = C#dlx_consumer{checked_out =
maps:remove(MsgId, Checked)},
msg_bytes_checkout = BytesCheckout - size_in_bytes(Msg),
ra_indexes = Indexes}
end, State0, Acked),
{State, [{mod_call, rabbit_global_counters, messages_dead_lettered_confirmed,
[rabbit_quorum_queue, at_least_once, maps:size(Acked)]}]};
apply(_, {dlx, #checkout{consumer = Pid,
Expand Down
30 changes: 14 additions & 16 deletions deps/rabbit/src/rabbit_fifo_dlx.hrl
Expand Up @@ -4,20 +4,18 @@
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.

-record(dlx_consumer,{
pid :: pid(),
prefetch :: non_neg_integer(),
checked_out = #{} :: #{msg_id() => tuple(rabbit_dead_letter:reason(), msg())},
next_msg_id = 0 :: msg_id()
}).
-record(dlx_consumer,
{pid :: pid(),
prefetch :: non_neg_integer(),
checked_out = #{} :: #{msg_id() => tuple(rabbit_dead_letter:reason(), msg())},
next_msg_id = 0 :: msg_id()}).

-record(rabbit_fifo_dlx,{
consumer :: option(#dlx_consumer{}),
%% Queue of dead-lettered messages.
discards = lqueue:new() :: lqueue:lqueue(tuple(rabbit_dead_letter:reason(), msg())),
%% Raft indexes of messages in both discards queue and dlx_consumer's checked_out map
%% so that we get the smallest ra index in O(1).
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
msg_bytes = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer()
}).
-record(rabbit_fifo_dlx,
{consumer :: option(#dlx_consumer{}),
%% Queue of dead-lettered messages.
discards = lqueue:new() :: lqueue:lqueue(tuple(rabbit_dead_letter:reason(), msg())),
%% Raft indexes of messages in both discards queue and dlx_consumer's checked_out map
%% so that we get the smallest ra index in O(1).
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
msg_bytes = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer()}).

0 comments on commit 4a2b00a

Please sign in to comment.