Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce per message disk overhead #10339

Merged
merged 3 commits into from Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions deps/rabbit/include/mc.hrl
Expand Up @@ -20,3 +20,11 @@
%% "Short strings can carry up to 255 octets of UTF-8 data, but
%% may not contain binary zero octets." [AMQP 0.9.1 $4.2.5.3]
-define(IS_SHORTSTR_LEN(B), byte_size(B) < 256).

%% We keep the following atom annotation keys short as they are stored per message on disk.
-define(ANN_EXCHANGE, x).
-define(ANN_ROUTING_KEYS, rk).
-define(ANN_TIMESTAMP, ts).
-define(ANN_RECEIVED_AT_TIMESTAMP, rts).
-define(ANN_DURABLE, d).
-define(ANN_PRIORITY, p).
27 changes: 20 additions & 7 deletions deps/rabbit/src/mc.erl
Expand Up @@ -18,6 +18,8 @@
set_ttl/2,
x_header/2,
routing_headers/2,
exchange/1,
routing_keys/1,
%%
convert/2,
convert/3,
Expand Down Expand Up @@ -223,9 +225,21 @@ routing_headers(#?MODULE{protocol = Proto,
routing_headers(BasicMsg, Opts) ->
mc_compat:routing_headers(BasicMsg, Opts).

-spec exchange(state()) -> undefined | rabbit_misc:resource_name().
exchange(#?MODULE{annotations = Anns}) ->
maps:get(?ANN_EXCHANGE, Anns, undefined);
exchange(BasicMessage) ->
mc_compat:get_annotation(?ANN_EXCHANGE, BasicMessage).

-spec routing_keys(state()) -> [rabbit_types:routing_key()].
routing_keys(#?MODULE{annotations = Anns}) ->
maps:get(?ANN_ROUTING_KEYS, Anns, []);
routing_keys(BasicMessage) ->
mc_compat:get_annotation(?ANN_ROUTING_KEYS, BasicMessage).

-spec is_persistent(state()) -> boolean().
is_persistent(#?MODULE{annotations = Anns}) ->
maps:get(durable, Anns, true);
maps:get(?ANN_DURABLE, Anns, true);
is_persistent(BasicMsg) ->
mc_compat:is_persistent(BasicMsg).

Expand All @@ -235,16 +249,15 @@ ttl(#?MODULE{annotations = Anns}) ->
ttl(BasicMsg) ->
mc_compat:ttl(BasicMsg).


-spec timestamp(state()) -> undefined | non_neg_integer().
timestamp(#?MODULE{annotations = Anns}) ->
maps:get(timestamp, Anns, undefined);
maps:get(?ANN_TIMESTAMP, Anns, undefined);
timestamp(BasicMsg) ->
mc_compat:timestamp(BasicMsg).

-spec priority(state()) -> undefined | non_neg_integer().
priority(#?MODULE{annotations = Anns}) ->
maps:get(priority, Anns, undefined);
maps:get(?ANN_PRIORITY, Anns, undefined);
priority(BasicMsg) ->
mc_compat:priority(BasicMsg).

Expand Down Expand Up @@ -327,8 +340,8 @@ record_death(Reason, SourceQueue,
annotations = Anns0} = State)
when is_atom(Reason) andalso is_binary(SourceQueue) ->
Key = {SourceQueue, Reason},
Exchange = maps:get(exchange, Anns0),
RoutingKeys = maps:get(routing_keys, Anns0),
#{?ANN_EXCHANGE := Exchange,
?ANN_ROUTING_KEYS := RoutingKeys} = Anns0,
Timestamp = os:system_time(millisecond),
Ttl = maps:get(ttl, Anns0, undefined),

Expand Down Expand Up @@ -427,7 +440,7 @@ is_cycle(Queue, [_ | Rem]) ->

set_received_at_timestamp(Anns) ->
Millis = os:system_time(millisecond),
maps:put(rts, Millis, Anns).
Anns#{?ANN_RECEIVED_AT_TIMESTAMP => Millis}.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
Expand Down
37 changes: 10 additions & 27 deletions deps/rabbit/src/mc_amqp.erl
Expand Up @@ -10,7 +10,6 @@
x_header/2,
property/2,
routing_headers/2,
get_property/2,
convert_to/3,
convert_from/3,
protocol_state/2,
Expand Down Expand Up @@ -113,7 +112,7 @@ routing_headers(Msg, Opts) ->
get_property(durable, Msg) ->
case Msg of
#msg{header = #'v1_0.header'{durable = Durable}}
when is_atom(Durable) ->
when is_boolean(Durable) ->
Durable;
#msg{header = #'v1_0.header'{durable = {boolean, Durable}}} ->
Durable;
Expand All @@ -133,20 +132,6 @@ get_property(timestamp, Msg) ->
_ ->
undefined
end;
get_property(correlation_id, Msg) ->
ansd marked this conversation as resolved.
Show resolved Hide resolved
case Msg of
#msg{properties = #'v1_0.properties'{correlation_id = {_Type, CorrId}}} ->
CorrId;
_ ->
undefined
end;
get_property(message_id, Msg) ->
case Msg of
#msg{properties = #'v1_0.properties'{message_id = {_Type, CorrId}}} ->
CorrId;
_ ->
undefined
end;
get_property(ttl, Msg) ->
case Msg of
#msg{header = #'v1_0.header'{ttl = {_, Ttl}}} ->
Expand All @@ -173,9 +158,7 @@ get_property(priority, Msg) ->
_ ->
undefined
end
end;
get_property(_P, _Msg) ->
undefined.
end.

convert_to(?MODULE, Msg, _Env) ->
Msg;
Expand All @@ -188,8 +171,8 @@ serialize(Sections) ->
encode_bin(Sections).

protocol_state(Msg, Anns) ->
Exchange = maps:get(exchange, Anns),
[RKey | _] = maps:get(routing_keys, Anns),
#{?ANN_EXCHANGE := Exchange,
?ANN_ROUTING_KEYS := [RKey | _]} = Anns,

%% any x-* annotations get added as message annotations
AnnsToAdd = maps:filter(fun (Key, _) -> mc_util:is_x_header(Key) end, Anns),
Expand Down Expand Up @@ -426,11 +409,11 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
undefined
end,
Anns = maps_put_falsy(
durable, Durable,
?ANN_DURABLE, Durable,
maps_put_truthy(
priority, Priority,
?ANN_PRIORITY, Priority,
maps_put_truthy(
timestamp, Timestamp,
?ANN_TIMESTAMP, Timestamp,
maps_put_truthy(
ttl, Ttl,
maps_put_truthy(
Expand All @@ -443,20 +426,20 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
lists:foldl(
fun ({{symbol, <<"x-routing-key">>},
{utf8, Key}}, Acc) ->
maps:update_with(routing_keys,
maps:update_with(?ANN_ROUTING_KEYS,
fun(L) -> [Key | L] end,
[Key],
Acc);
({{symbol, <<"x-cc">>},
{list, CCs0}}, Acc) ->
CCs = [CC || {_T, CC} <- CCs0],
maps:update_with(routing_keys,
maps:update_with(?ANN_ROUTING_KEYS,
fun(L) -> L ++ CCs end,
CCs,
Acc);
({{symbol, <<"x-exchange">>},
{utf8, Exchange}}, Acc) ->
Acc#{exchange => Exchange};
Acc#{?ANN_EXCHANGE => Exchange};
(_, Acc) ->
Acc
end, Anns, MA)
Expand Down
12 changes: 6 additions & 6 deletions deps/rabbit/src/mc_amqpl.erl
Expand Up @@ -420,7 +420,7 @@ protocol_state(#content{properties = #'P_basic'{headers = H00} = B0} = C,
end, Headers1)
end,
Timestamp = case Anns of
#{timestamp := Ts} ->
#{?ANN_TIMESTAMP := Ts} ->
Ts div 1000;
_ ->
undefined
Expand Down Expand Up @@ -473,8 +473,8 @@ message(#resource{name = ExchangeNameBin}, RoutingKey,
HeaderRoutes ->
{ok, mc:init(?MODULE,
rabbit_basic:strip_bcc_header(Content),
Anns#{routing_keys => [RoutingKey | HeaderRoutes],
exchange => ExchangeNameBin})}
Anns#{?ANN_ROUTING_KEYS => [RoutingKey | HeaderRoutes],
?ANN_EXCHANGE => ExchangeNameBin})}
end;
message(#resource{} = XName, RoutingKey,
#content{} = Content, Anns, false) ->
Expand Down Expand Up @@ -707,13 +707,13 @@ essential_properties(#content{} = C) ->
end,
Durable = Mode == 2,
maps_put_truthy(
priority, Priority,
?ANN_PRIORITY, Priority,
maps_put_truthy(
ttl, MsgTTL,
maps_put_truthy(
timestamp, Timestamp,
?ANN_TIMESTAMP, Timestamp,
maps_put_falsy(
durable, Durable,
?ANN_DURABLE, Durable,
#{})))).

%% headers that are added as annotations during conversions
Expand Down
10 changes: 5 additions & 5 deletions deps/rabbit/src/mc_compat.erl
Expand Up @@ -49,18 +49,18 @@ is(_) ->
false.

-spec get_annotation(mc:ann_key(), state()) -> mc:ann_value() | undefined.
get_annotation(routing_keys, #basic_message{routing_keys = RKeys}) ->
get_annotation(?ANN_ROUTING_KEYS, #basic_message{routing_keys = RKeys}) ->
RKeys;
get_annotation(exchange, #basic_message{exchange_name = Ex}) ->
get_annotation(?ANN_EXCHANGE, #basic_message{exchange_name = Ex}) ->
Ex#resource.name;
get_annotation(id, #basic_message{id = Id}) ->
Id.

set_annotation(id, Value, #basic_message{} = Msg) ->
Msg#basic_message{id = Value};
set_annotation(routing_keys, Value, #basic_message{} = Msg) ->
set_annotation(?ANN_ROUTING_KEYS, Value, #basic_message{} = Msg) ->
Msg#basic_message{routing_keys = Value};
set_annotation(exchange, Value, #basic_message{exchange_name = Ex} = Msg) ->
set_annotation(?ANN_EXCHANGE, Value, #basic_message{exchange_name = Ex} = Msg) ->
Msg#basic_message{exchange_name = Ex#resource{name = Value}};
set_annotation(<<"x-", _/binary>> = Key, Value,
#basic_message{content = Content0} = Msg) ->
Expand Down Expand Up @@ -88,7 +88,7 @@ set_annotation(<<"x-", _/binary>> = Key, Value,
Msg#basic_message{content = C};
set_annotation(<<"timestamp_in_ms">> = Name, Value, #basic_message{} = Msg) ->
rabbit_basic:add_header(Name, long, Value, Msg);
set_annotation(timestamp, Millis,
set_annotation(?ANN_TIMESTAMP, Millis,
#basic_message{content = #content{properties = B} = C0} = Msg) ->
C = C0#content{properties = B#'P_basic'{timestamp = Millis div 1000},
properties_bin = none},
Expand Down
21 changes: 10 additions & 11 deletions deps/rabbit/src/rabbit_channel.erl
Expand Up @@ -688,8 +688,8 @@ handle_cast({deliver_reply, Key, Msg},
next_tag = DeliveryTag,
reply_consumer = {ConsumerTag, _Suffix, Key}}) ->
Content = mc:protocol_state(mc:convert(mc_amqpl, Msg)),
ExchName = mc:get_annotation(exchange, Msg),
[RoutingKey | _] = mc:get_annotation(routing_keys, Msg),
ExchName = mc:exchange(Msg),
[RoutingKey | _] = mc:routing_keys(Msg),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.deliver'{consumer_tag = ConsumerTag,
Expand Down Expand Up @@ -2125,8 +2125,7 @@ notify_limiter(Limiter, Acked) ->

deliver_to_queues({Message, _Options, _RoutedToQueues} = Delivery,
#ch{cfg = #conf{virtual_host = VHost}} = State) ->
XNameBin = mc:get_annotation(exchange, Message),
XName = rabbit_misc:r(VHost, exchange, XNameBin),
XName = rabbit_misc:r(VHost, exchange, mc:exchange(Message)),
deliver_to_queues(XName, Delivery, State).

deliver_to_queues(XName,
Expand Down Expand Up @@ -2192,7 +2191,7 @@ process_routing_mandatory(_Mandatory = true,
false ->
Content0
end,
[RoutingKey | _] = mc:get_annotation(routing_keys, Msg),
[RoutingKey | _] = mc:routing_keys(Msg),
ok = basic_return(Content, RoutingKey, XName#resource.name, State, no_route);
process_routing_mandatory(_Mandatory = false,
_RoutedToQs = [],
Expand Down Expand Up @@ -2673,14 +2672,14 @@ handle_deliver0(ConsumerTag, AckRequired,
writer_gc_threshold = GCThreshold},
next_tag = DeliveryTag,
queue_states = Qs}) ->
[RoutingKey | _] = mc:get_annotation(routing_keys, MsgCont0),
ExchangeNameBin = mc:get_annotation(exchange, MsgCont0),
Exchange = mc:exchange(MsgCont0),
[RoutingKey | _] = mc:routing_keys(MsgCont0),
MsgCont = mc:convert(mc_amqpl, MsgCont0),
Content = mc:protocol_state(MsgCont),
Deliver = #'basic.deliver'{consumer_tag = ConsumerTag,
delivery_tag = DeliveryTag,
redelivered = Redelivered,
exchange = ExchangeNameBin,
exchange = Exchange,
routing_key = RoutingKey},
{ok, QueueType} = rabbit_queue_type:module(QName, Qs),
case QueueType of
Expand All @@ -2699,15 +2698,15 @@ handle_deliver0(ConsumerTag, AckRequired,
handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
Msg0 = {_QName, _QPid, _MsgId, Redelivered, MsgCont0},
QueueType, State) ->
[RoutingKey | _] = mc:get_annotation(routing_keys, MsgCont0),
ExchangeName = mc:get_annotation(exchange, MsgCont0),
Exchange = mc:exchange(MsgCont0),
[RoutingKey | _] = mc:routing_keys(MsgCont0),
MsgCont = mc:convert(mc_amqpl, MsgCont0),
Content = mc:protocol_state(MsgCont),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
redelivered = Redelivered,
exchange = ExchangeName,
exchange = Exchange,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
Expand Down
7 changes: 4 additions & 3 deletions deps/rabbit/src/rabbit_dead_letter.erl
Expand Up @@ -6,6 +6,7 @@
%%

-module(rabbit_dead_letter).
-include("mc.hrl").

-export([publish/5,
detect_cycles/3]).
Expand All @@ -26,15 +27,15 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK,
#resource{name = SourceQName}) ->
DLRKeys = case RK of
undefined ->
mc:get_annotation(routing_keys, Msg0);
mc:routing_keys(Msg0);
_ ->
[RK]
end,
Msg1 = mc:record_death(Reason, SourceQName, Msg0),
{Ttl, Msg2} = mc:take_annotation(dead_letter_ttl, Msg1),
Msg3 = mc:set_ttl(Ttl, Msg2),
Msg4 = mc:set_annotation(routing_keys, DLRKeys, Msg3),
DLMsg = mc:set_annotation(exchange, XName#resource.name, Msg4),
Msg4 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg3),
DLMsg = mc:set_annotation(?ANN_EXCHANGE, XName#resource.name, Msg4),
Routed = rabbit_exchange:route(DLX, DLMsg, #{return_binding_keys => true}),
{QNames, Cycles} = detect_cycles(Reason, DLMsg, Routed),
lists:foreach(fun log_cycle_once/1, Cycles),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_exchange.erl
Expand Up @@ -351,7 +351,7 @@ route(Exchange, Message) ->
route(#exchange{name = #resource{name = ?DEFAULT_EXCHANGE_NAME,
virtual_host = VHost}},
Message, _Opts) ->
RKs0 = mc:get_annotation(routing_keys, Message),
RKs0 = mc:routing_keys(Message),
RKs = lists:usort(RKs0),
[begin
case virtual_reply_queue(RK) of
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_exchange_type_direct.erl
Expand Up @@ -35,7 +35,7 @@ route(#exchange{name = Name, type = Type}, Msg) ->
route(#exchange{name = Name, type = Type}, Msg, #{}).

route(#exchange{name = Name, type = Type}, Msg, _Opts) ->
Routes = mc:get_annotation(routing_keys, Msg),
Routes = mc:routing_keys(Msg),
rabbit_db_binding:match_routing_key(Name, Routes, Type =:= direct).

validate(_X) -> ok.
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_exchange_type_topic.erl
Expand Up @@ -40,7 +40,7 @@ route(Exchange, Msg) ->
route(Exchange, Msg, #{}).

route(#exchange{name = XName}, Msg, Opts) ->
RKeys = mc:get_annotation(routing_keys, Msg),
RKeys = mc:routing_keys(Msg),
lists:append([rabbit_db_topic_exchange:match(XName, RKey, Opts) || RKey <- RKeys]).

validate(_X) -> ok.
Expand Down