diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index 0975f65c57be..63f6e37e5eb9 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -50,6 +50,29 @@ Val :: term()}]. -type opt(T) :: T | undefined. +%% This representation was used in v3.13.7. 4.x understands this record for +%% backward compatibility, specifically for the rare case where: +%% 1. a 3.13 node internally parsed a message from a stream via +%% ``` +%% Message = mc:init(mc_amqp, amqp10_framing:decode_bin(Bin), #{}) +%% ``` +%% 2. published this Message to a queue +%% 3. RabbitMQ got upgraded to 4.x +%% +%% This record along with all its conversions in this module can therefore +%% be deleted in some future RabbitMQ version once it's safe to assume that +%% these upgraded messages have all been consumed. +-record(msg, + { + header :: opt(#'v1_0.header'{}), + delivery_annotations = []:: list(), + message_annotations = [] :: list(), + properties :: opt(#'v1_0.properties'{}), + application_properties = [] :: list(), + data = [] :: amqp10_data(), + footer = [] :: list() + }). + %% This representation is used when the message was originally sent with %% a protocol other than AMQP and the message was not read from a stream. -record(msg_body_decoded, @@ -97,7 +120,7 @@ body_code :: body_descriptor_code() }). --opaque state() :: #msg_body_decoded{} | #msg_body_encoded{} | #v1{}. +-opaque state() :: #msg{} | #msg_body_decoded{} | #msg_body_encoded{} | #v1{}. -export_type([state/0]). @@ -128,6 +151,8 @@ convert_from(?MODULE, Sections, _Env) when is_list(Sections) -> convert_from(_SourceProto, _, _Env) -> not_implemented. +convert_to(?MODULE, Msg = #msg{}, _Env) -> + convert_from_3_13_msg(Msg); convert_to(?MODULE, Msg, _Env) -> Msg; convert_to(TargetProto, Msg, Env) -> @@ -139,7 +164,22 @@ size(#v1{message_annotations = MA, [] -> 0; _ -> ?MESSAGE_ANNOTATIONS_GUESS_SIZE end, - {MetaSize, byte_size(Body)}. + {MetaSize, byte_size(Body)}; +%% Copied from v3.13.7. +%% This might be called in rabbit_fifo_v3 and must therefore not be modified +%% to ensure determinism of quorum queues version 3. +size(#msg{data = Body}) -> + BodySize = if is_list(Body) -> + lists:foldl( + fun(#'v1_0.data'{content = Data}, Acc) -> + iolist_size(Data) + Acc; + (#'v1_0.amqp_sequence'{content = _}, Acc) -> + Acc + end, 0, Body); + is_record(Body, 'v1_0.amqp_value') -> + 0 + end, + {_MetaSize = 0, BodySize}. x_header(Key, Msg) -> message_annotation(Key, Msg, undefined). @@ -151,6 +191,10 @@ property(_Prop, #msg_body_encoded{properties = undefined}) -> undefined; property(Prop, #msg_body_encoded{properties = Props}) -> property0(Prop, Props); +property(_Prop, #msg{properties = undefined}) -> + undefined; +property(Prop, #msg{properties = Props}) -> + property0(Prop, Props); property(_Prop, #v1{bare_and_footer_properties_pos = ?OMITTED_SECTION}) -> undefined; property(Prop, #v1{bare_and_footer = Bin, @@ -298,7 +342,9 @@ protocol_state(#v1{message_annotations = MA0, ttl = Ttl}, Anns), MA = protocol_state_message_annotations(MA0, Anns), Sections = to_sections(Header, MA, []), - [encode(Sections), BareAndFooter]. + [encode(Sections), BareAndFooter]; +protocol_state(#msg{} = Msg, Anns) -> + protocol_state(convert_from_3_13_msg(Msg), Anns). prepare(read, Msg) -> Msg; @@ -322,7 +368,9 @@ prepare(store, #msg_body_encoded{ bare_and_footer_application_properties_pos = AppPropsPos, bare_and_footer_body_pos = BodyPos, body_code = BodyCode - }. + }; +prepare(store, Msg = #msg{}) -> + Msg. %% internal @@ -379,7 +427,9 @@ msg_to_sections(#v1{message_annotations = MAC, Sections = amqp10_framing:decode_bin(Bin), Sections ++ [{amqp_encoded_body_and_footer, BodyAndFooterBin}] end, - to_sections(undefined, MAC, Tail). + to_sections(undefined, MAC, Tail); +msg_to_sections(#msg{} = Msg) -> + msg_to_sections(convert_from_3_13_msg(Msg)). to_sections(H, MAC, P, APC, Tail) -> S0 = case APC of @@ -410,6 +460,20 @@ to_sections(H, MAC, Tail) -> [H | S] end. +convert_from_3_13_msg(#msg{header = H, + delivery_annotations = _, + message_annotations = MAC, + properties = P, + application_properties = APC, + data = Data, + footer = FC}) -> + #msg_body_decoded{header = H, + message_annotations = MAC, + properties = P, + application_properties = APC, + data = Data, + footer = FC}. + -spec protocol_state_message_annotations(amqp_annotations(), mc:annotations()) -> amqp_annotations(). protocol_state_message_annotations(MA, Anns) -> @@ -482,11 +546,14 @@ message_annotation(Key, State, Default) message_annotations(#msg_body_decoded{message_annotations = L}) -> L; message_annotations(#msg_body_encoded{message_annotations = L}) -> L; -message_annotations(#v1{message_annotations = L}) -> L. +message_annotations(#v1{message_annotations = L}) -> L; +message_annotations(#msg{message_annotations = L}) -> L. message_annotations_as_simple_map(#msg_body_encoded{message_annotations = Content}) -> message_annotations_as_simple_map0(Content); message_annotations_as_simple_map(#v1{message_annotations = Content}) -> + message_annotations_as_simple_map0(Content); +message_annotations_as_simple_map(#msg{message_annotations = Content}) -> message_annotations_as_simple_map0(Content). message_annotations_as_simple_map0(Content) -> @@ -501,6 +568,9 @@ message_annotations_as_simple_map0(Content) -> application_properties_as_simple_map( #msg_body_encoded{application_properties = Content}, L) -> application_properties_as_simple_map0(Content, L); +application_properties_as_simple_map( + #msg{application_properties = Content}, L) -> + application_properties_as_simple_map0(Content, L); application_properties_as_simple_map( #v1{bare_and_footer_application_properties_pos = ?OMITTED_SECTION}, L) -> L;