From 95dd67003b70c2b7ab3bc7a0862d6a484aae84dc Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 27 Feb 2025 10:16:34 +0100 Subject: [PATCH] Handle mc_amqp 3.13 `msg` record in 4.x The `msg` record was used in 3.13. This commit makes 4.x understand this record for backward compatibility, specifically for the rare case where: 1. a 3.13 node internally parsed a message from a stream via ``` Message = mc:init(mc_amqp, amqp10_framing:decode_bin(Bin), #{}) ``` 2. published this Message to a queue 3. RabbitMQ got upgraded to 4.x (This commit can be reverted in some future RabbitMQ version once it's safe to assume that these upgraded messages have been consumed.) The changes were manually tested as described in Jira RMQ-1525. (cherry picked from commit 91f5ce2544ab549f70b5a86781e19fb49ee59cc3) (cherry picked from commit 75cffc9ff521761d9b34c595ceb4e65ff3f1afcb) --- deps/rabbit/src/mc_amqp.erl | 82 ++++++++++++++++++++++++++++++++++--- 1 file changed, 76 insertions(+), 6 deletions(-) diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index be63597c3f96..9e8ef28406a5 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -47,6 +47,29 @@ Val :: term()}]. -type opt(T) :: T | undefined. +%% This representation was used in v3.13.7. 4.x understands this record for +%% backward compatibility, specifically for the rare case where: +%% 1. a 3.13 node internally parsed a message from a stream via +%% ``` +%% Message = mc:init(mc_amqp, amqp10_framing:decode_bin(Bin), #{}) +%% ``` +%% 2. published this Message to a queue +%% 3. RabbitMQ got upgraded to 4.x +%% +%% This record along with all its conversions in this module can therefore +%% be deleted in some future RabbitMQ version once it's safe to assume that +%% these upgraded messages have all been consumed. +-record(msg, + { + header :: opt(#'v1_0.header'{}), + delivery_annotations = []:: list(), + message_annotations = [] :: list(), + properties :: opt(#'v1_0.properties'{}), + application_properties = [] :: list(), + data = [] :: amqp10_data(), + footer = [] :: list() + }). + %% This representation is used when the message was originally sent with %% a protocol other than AMQP and the message was not read from a stream. -record(msg_body_decoded, @@ -94,7 +117,7 @@ body_code :: body_descriptor_code() }). --opaque state() :: #msg_body_decoded{} | #msg_body_encoded{} | #v1{}. +-opaque state() :: #msg{} | #msg_body_decoded{} | #msg_body_encoded{} | #v1{}. -export_type([state/0]). @@ -109,6 +132,8 @@ convert_from(?MODULE, Sections, _Env) when is_list(Sections) -> convert_from(_SourceProto, _, _Env) -> not_implemented. +convert_to(?MODULE, Msg = #msg{}, _Env) -> + convert_from_3_13_msg(Msg); convert_to(?MODULE, Msg, _Env) -> Msg; convert_to(TargetProto, Msg, Env) -> @@ -120,7 +145,22 @@ size(#v1{message_annotations = MA, [] -> 0; _ -> ?MESSAGE_ANNOTATIONS_GUESS_SIZE end, - {MetaSize, byte_size(Body)}. + {MetaSize, byte_size(Body)}; +%% Copied from v3.13.7. +%% This might be called in rabbit_fifo_v3 and must therefore not be modified +%% to ensure determinism of quorum queues version 3. +size(#msg{data = Body}) -> + BodySize = if is_list(Body) -> + lists:foldl( + fun(#'v1_0.data'{content = Data}, Acc) -> + iolist_size(Data) + Acc; + (#'v1_0.amqp_sequence'{content = _}, Acc) -> + Acc + end, 0, Body); + is_record(Body, 'v1_0.amqp_value') -> + 0 + end, + {_MetaSize = 0, BodySize}. x_header(Key, Msg) -> message_annotation(Key, Msg, undefined). @@ -129,6 +169,10 @@ property(_Prop, #msg_body_encoded{properties = undefined}) -> undefined; property(Prop, #msg_body_encoded{properties = Props}) -> property0(Prop, Props); +property(_Prop, #msg{properties = undefined}) -> + undefined; +property(Prop, #msg{properties = Props}) -> + property0(Prop, Props); property(_Prop, #v1{bare_and_footer_properties_pos = ?OMITTED_SECTION}) -> undefined; property(Prop, #v1{bare_and_footer = Bin, @@ -260,7 +304,9 @@ protocol_state(#v1{message_annotations = MA0, ttl = Ttl}, Anns), MA = protocol_state_message_annotations(MA0, Anns), Sections = to_sections(Header, MA, []), - [encode(Sections), BareAndFooter]. + [encode(Sections), BareAndFooter]; +protocol_state(#msg{} = Msg, Anns) -> + protocol_state(convert_from_3_13_msg(Msg), Anns). prepare(read, Msg) -> Msg; @@ -284,7 +330,9 @@ prepare(store, #msg_body_encoded{ bare_and_footer_application_properties_pos = AppPropsPos, bare_and_footer_body_pos = BodyPos, body_code = BodyCode - }. + }; +prepare(store, Msg = #msg{}) -> + Msg. %% internal @@ -341,7 +389,9 @@ msg_to_sections(#v1{message_annotations = MAC, Sections = amqp10_framing:decode_bin(Bin), Sections ++ [{amqp_encoded_body_and_footer, BodyAndFooterBin}] end, - to_sections(undefined, MAC, Tail). + to_sections(undefined, MAC, Tail); +msg_to_sections(#msg{} = Msg) -> + msg_to_sections(convert_from_3_13_msg(Msg)). to_sections(H, MAC, P, APC, Tail) -> S0 = case APC of @@ -372,6 +422,20 @@ to_sections(H, MAC, Tail) -> [H | S] end. +convert_from_3_13_msg(#msg{header = H, + delivery_annotations = _, + message_annotations = MAC, + properties = P, + application_properties = APC, + data = Data, + footer = FC}) -> + #msg_body_decoded{header = H, + message_annotations = MAC, + properties = P, + application_properties = APC, + data = Data, + footer = FC}. + -spec protocol_state_message_annotations(amqp_annotations(), mc:annotations()) -> amqp_annotations(). protocol_state_message_annotations(MA, Anns) -> @@ -444,11 +508,14 @@ message_annotation(Key, State, Default) message_annotations(#msg_body_decoded{message_annotations = L}) -> L; message_annotations(#msg_body_encoded{message_annotations = L}) -> L; -message_annotations(#v1{message_annotations = L}) -> L. +message_annotations(#v1{message_annotations = L}) -> L; +message_annotations(#msg{message_annotations = L}) -> L. message_annotations_as_simple_map(#msg_body_encoded{message_annotations = Content}) -> message_annotations_as_simple_map0(Content); message_annotations_as_simple_map(#v1{message_annotations = Content}) -> + message_annotations_as_simple_map0(Content); +message_annotations_as_simple_map(#msg{message_annotations = Content}) -> message_annotations_as_simple_map0(Content). message_annotations_as_simple_map0(Content) -> @@ -463,6 +530,9 @@ message_annotations_as_simple_map0(Content) -> application_properties_as_simple_map( #msg_body_encoded{application_properties = Content}, L) -> application_properties_as_simple_map0(Content, L); +application_properties_as_simple_map( + #msg{application_properties = Content}, L) -> + application_properties_as_simple_map0(Content, L); application_properties_as_simple_map( #v1{bare_and_footer_application_properties_pos = ?OMITTED_SECTION}, L) -> L;