diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 6fb752535342..6f9478697a9e 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -14,7 +14,8 @@ timestamp/1, priority/1, set_ttl/2, - proto_header/2, + x_header/2, + routing_headers/2, %% convert/2, protocol_state/1, @@ -75,9 +76,12 @@ {MetadataSize :: non_neg_integer(), PayloadSize :: non_neg_integer()}. --callback header(binary(), proto_state()) -> +-callback x_header(binary(), proto_state()) -> {property_value(), proto_state()}. +-callback routing_headers(proto_state(), [x_headers | complex_types]) -> + #{binary() => term()}. + %% all protocol must be able to convert to amqp (1.0) -callback convert(protocol(), proto_state()) -> proto_state() | not_supported. @@ -134,14 +138,42 @@ set_annotation(Key, Value, #?MODULE{annotations = Anns} = State) -> set_annotation(Key, Value, BasicMessage) -> mc_compat:set_annotation(Key, Value, BasicMessage). --spec proto_header(Key :: binary(), state()) -> +-spec x_header(Key :: binary(), state()) -> property_value() | undefined. -proto_header(Key, #?MODULE{protocol = Proto, - data = Data}) -> - {Result, _} = Proto:header(Key, Data), - Result; -proto_header(Key, BasicMsg) -> - mc_compat:proto_header(Key, BasicMsg). +x_header(Key, #?MODULE{protocol = Proto, + annotations = Anns, + data = Data}) -> + %% x-headers may be have been added to the annotations map so + %% we need to check that first + case Anns of + #{Key := Value} -> + Value; + _ -> + %% if not we have to call into the protocol specific handler + {Result, _} = Proto:x_header(Key, Data), + Result + end; +x_header(Key, BasicMsg) -> + mc_compat:x_header(Key, BasicMsg). + +-spec routing_headers(state(), [x_header | complex_types]) -> + #{binary() => property_value()}. +routing_headers(#?MODULE{protocol = Proto, + annotations = Anns, + data = Data}, Options) -> + %% TODO: fake death headers also as this is what most users + %% use for x- filtering + New = case lists:member(x_headers, Options) of + true -> + maps:filter(fun (<<"x-", _/binary>>, _) -> true; + (_, _) -> false + end, Anns); + false -> + #{} + end, + maps:merge(Proto:routing_headers(Data, Options), New); +routing_headers(Key, BasicMsg) -> + mc_compat:routing_headers(Key, BasicMsg). -spec is_persistent(state()) -> boolean(). is_persistent(#?MODULE{annotations = Anns}) -> @@ -224,14 +256,14 @@ prepare(State) -> record_death(Reason, SourceQueue, #?MODULE{protocol = _Mod, data = _Data, - annotations = Anns, + annotations = Anns0, deaths = Ds0} = State) when is_atom(Reason) andalso is_binary(SourceQueue) -> Key = {SourceQueue, Reason}, - Exchange = maps:get(exchange, Anns), - RoutingKeys = maps:get(routing_keys, Anns), + Exchange = maps:get(exchange, Anns0), + RoutingKeys = maps:get(routing_keys, Anns0), Timestamp = os:system_time(millisecond), - Ttl = maps:get(ttl, Anns, undefined), + Ttl = maps:get(ttl, Anns0, undefined), case Ds0 of undefined -> Ds = #deaths{last = Key, @@ -241,8 +273,12 @@ record_death(Reason, SourceQueue, exchange = Exchange, routing_keys = RoutingKeys, timestamp = Timestamp}}}, + Anns = Anns0#{<<"x-first-death-reason">> => atom_to_binary(Reason), + <<"x-first-death-queue">> => SourceQueue, + <<"x-first-death-exchange">> => Exchange}, - State#?MODULE{deaths = Ds}; + State#?MODULE{deaths = Ds, + annotations = Anns}; #deaths{records = Rs} -> Death = #death{count = C} = maps:get(Key, Rs, #death{ttl = Ttl, diff --git a/deps/rabbit/src/mc_compat.erl b/deps/rabbit/src/mc_compat.erl index 7525ab20d88f..7a2f968bb7bb 100644 --- a/deps/rabbit/src/mc_compat.erl +++ b/deps/rabbit/src/mc_compat.erl @@ -18,7 +18,8 @@ timestamp/1, priority/1, set_ttl/2, - proto_header/2, + x_header/2, + routing_headers/2, %%% convert/2, protocol_state/1, @@ -84,30 +85,32 @@ set_annotation(<<"x-", _/binary>> = Key, Value, is_persistent(#basic_message{content = Content}) -> - element(1, get_property(durable, Content)). + get_property(durable, Content). ttl(#basic_message{content = Content}) -> - element(1, get_property(ttl, Content)). + get_property(?FUNCTION_NAME, Content). timestamp(#basic_message{content = Content}) -> - element(1, get_property(timestamp, Content)). + get_property(?FUNCTION_NAME, Content). priority(#basic_message{content = Content}) -> - element(1, get_property(priority, Content)). + get_property(?FUNCTION_NAME, Content). correlation_id(#basic_message{content = Content}) -> - element(1, get_property(correlation_id, Content)). + get_property(?FUNCTION_NAME, Content). message_id(#basic_message{content = Content}) -> - element(1, get_property(message_id, Content)). + get_property(message_id, Content). set_ttl(Value, #basic_message{content = Content0} = Msg) -> Content = rabbit_mc_amqp_legacy:set_property(ttl, Value, Content0), Msg#basic_message{content = Content}. -proto_header(Key,#basic_message{content = Content}) -> - element(1, rabbit_mc_amqp_legacy:header(Key, Content)). +x_header(Key,#basic_message{content = Content}) -> + element(1, rabbit_mc_amqp_legacy:x_header(Key, Content)). +routing_headers(#basic_message{content = Content}, Opts) -> + rabbit_mc_amqp_legacy:routing_headers(Content, Opts). convert(rabbit_mc_amqp_legacy, #basic_message{} = BasicMsg) -> BasicMsg; diff --git a/deps/rabbit/src/rabbit_exchange_type_headers.erl b/deps/rabbit/src/rabbit_exchange_type_headers.erl index e9b55669f98d..0283dcdf3288 100644 --- a/deps/rabbit/src/rabbit_exchange_type_headers.erl +++ b/deps/rabbit/src/rabbit_exchange_type_headers.erl @@ -32,17 +32,58 @@ description() -> serialise_events() -> false. -route(#exchange{name = Name}, Msg0) -> - %% TODO converting to amqp legacy means this will be slow for all protocols - %% except amqp legacy, ok for now to get it working but will need addressing - Msg = mc:convert(rabbit_mc_amqp_legacy, Msg0), - #content{} = Content = mc:protocol_state(Msg), - Headers = case (Content#content.properties)#'P_basic'.headers of - undefined -> []; - H -> rabbit_misc:sort_field_table(H) - end, +route(#exchange{name = Name}, Msg) -> + %% TODO: find a way not to extract x headers unless necessary + Headers = mc:routing_headers(Msg, [x_headers]), + rabbit_router:match_bindings( - Name, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end). + Name, fun(#binding{args = Args}) -> + case rabbit_misc:table_lookup(Args, <<"x-match">>) of + {longstr, <<"any">>} -> + match_any(Args, Headers, fun match/2); + {longstr, <<"any-with-x">>} -> + match_any(Args, Headers, fun match_x/2); + {longstr, <<"all-with-x">>} -> + match_all(Args, Headers, fun match_x/2); + _ -> + match_all(Args, Headers, fun match/2) + end + end). + +match_x({<<"x-match">>, _, _}, _M) -> + skip; +match_x({K, void, _}, M) -> + maps:is_key(K, M); +match_x({K, _, V}, M) -> + maps:get(K, M, undefined) =:= V. + +match({<<"x-", _/binary>>, _, _}, _M) -> + skip; +match({K, void, _}, M) -> + maps:is_key(K, M); +match({K, _, V}, M) -> + maps:get(K, M, undefined) =:= V. + + +match_all([], _, _MatchFun) -> + true; +match_all([Arg | Rem], M, Fun) -> + case Fun(Arg, M) of + false -> + false; + _ -> + match_all(Rem, M, Fun) + end. + +match_any([], _, _Fun) -> + false; +match_any([Arg | Rem], M, Fun) -> + case Fun(Arg, M) of + true -> + true; + _ -> + match_any(Rem, M, Fun) + end. validate_binding(_X, #binding{args = Args}) -> case rabbit_misc:table_lookup(Args, <<"x-match">>) of diff --git a/deps/rabbit/src/rabbit_mc_amqp.erl b/deps/rabbit/src/rabbit_mc_amqp.erl index abf8b8122370..63174fb8609f 100644 --- a/deps/rabbit/src/rabbit_mc_amqp.erl +++ b/deps/rabbit/src/rabbit_mc_amqp.erl @@ -10,7 +10,8 @@ init/1, init_amqp/1, size/1, - header/2, + x_header/2, + routing_headers/2, get_property/2, convert/2, protocol_state/3, @@ -28,6 +29,10 @@ #'v1_0.amqp_value'{} | #'v1_0.footer'{}. +-define(SIMPLE_VALUE(V), is_binary(V) orelse + is_number(V) orelse + is_boolean(V)). + -type maybe(T) :: T | undefined. -type amqp10_data() :: #'v1_0.data'{} | [#'v1_0.amqp_sequence'{} | #'v1_0.data'{}] | @@ -67,9 +72,20 @@ size(#msg{data = #'v1_0.data'{content = Data}}) -> MetaSize = 0, {MetaSize, iolist_size(Data)}. -header(Key, Msg) -> +x_header(Key, Msg) -> {_Type, Value} = message_annotation(Key, Msg, undefined), - {Value, Msg}. + Value. + +routing_headers(Msg, Opts) -> + IncludeX = lists:member(x_header, Opts), + X = case IncludeX of + true -> + message_annotations_as_simple_map(Msg); + false -> + #{} + end, + application_properties_as_simple_map(Msg, X). + get_property(durable, Msg) -> case Msg of @@ -187,6 +203,31 @@ message_annotation(Key, #msg{message_annotations = Default end. +message_annotations_as_simple_map(#msg{message_annotations = undefined}) -> + #{}; +message_annotations_as_simple_map( + #msg{message_annotations = #'v1_0.message_annotations'{content = Content}}) -> + %% the section record format really is terrible + lists:foldl(fun ({{symbol, K},{_T, V}}, Acc) + when ?SIMPLE_VALUE(V) -> + Acc#{K => V}; + (_, Acc)-> + Acc + end, #{}, Content). + +application_properties_as_simple_map(#msg{application_properties = undefined}, M) -> + M; +application_properties_as_simple_map( + #msg{application_properties = #'v1_0.application_properties'{content = Content}}, + M) -> + %% the section record format really is terrible + lists:foldl(fun ({{symbol, K}, {_T, V}}, Acc) + when ?SIMPLE_VALUE(V) -> + Acc#{K => V}; + (_, Acc)-> + Acc + end, M, Content). + decode([], Acc) -> Acc; decode([#'v1_0.header'{} = H | Rem], Msg) -> diff --git a/deps/rabbit/src/rabbit_mc_amqp_legacy.erl b/deps/rabbit/src/rabbit_mc_amqp_legacy.erl index 1efba57c8c05..55132f5e3ac2 100644 --- a/deps/rabbit/src/rabbit_mc_amqp_legacy.erl +++ b/deps/rabbit/src/rabbit_mc_amqp_legacy.erl @@ -10,7 +10,8 @@ init/1, init_amqp/1, size/1, - header/2, + x_header/2, + routing_headers/2, % get_property/2, % set_property/3, convert/2, @@ -19,7 +20,8 @@ message/4, message/5, from_basic_message/1, - set_property/3 + set_property/3, + serialize/2 ]). -define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 @@ -135,9 +137,9 @@ size(#content{properties_bin = PropsBin, end, {MetaSize, iolist_size(Payload)}. -header(_Key, #content{properties = #'P_basic'{headers = undefined}} = C) -> +x_header(_Key, #content{properties = #'P_basic'{headers = undefined}} = C) -> {undefined, C}; -header(Key, #content{properties = #'P_basic'{headers = Headers}} = C) -> +x_header(Key, #content{properties = #'P_basic'{headers = Headers}} = C) -> case rabbit_misc:table_lookup(Headers, Key) of undefined -> {undefined, C}; @@ -145,6 +147,25 @@ header(Key, #content{properties = #'P_basic'{headers = Headers}} = C) -> {Value, C} end. +routing_headers(#content{properties = #'P_basic'{headers = undefined}}, _Opts) -> + #{}; +routing_headers(#content{properties = #'P_basic'{headers = Headers}}, Opts) -> + IncludeX = lists:member(x_headers, Opts), + %% TODO: filter out complex AMQP legacy values such as array and table? + lists:foldl( + fun({<<"x-", _/binary>> = Key, _T, Value}, Acc) -> + case IncludeX of + true -> + Acc#{Key => Value}; + false -> + Acc + end; + ({Key, _T, Value}, Acc) -> + Acc#{Key => Value} + end, #{}, Headers). + + + % get_property(durable, % #content{properties = #'P_basic'{delivery_mode = Mode}} = C) -> % {Mode == 2, C}; @@ -172,6 +193,9 @@ set_property(_P, _V, Msg) -> %% TODO: impl at least ttl set (needed for dead lettering) Msg. +serialize(_, _) -> + <<>>. + convert(?MODULE, C) -> C; convert(rabbit_mc_amqp, #content{properties = Props, diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 6f569eef5fad..c2d884a8bc21 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1405,7 +1405,7 @@ deliver_to_client(Msgs, Ack, State) -> deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Msg} = Delivery, AckRequired, State0) -> %% internal annotation, should be atom annotation key? - PublisherQoS = case mc:proto_header(<<"x-mqtt-publish-qos">>, Msg) of + PublisherQoS = case mc:x_header(<<"x-mqtt-publish-qos">>, Msg) of undefined -> %% non-MQTT publishes are assumed to be QoS 1 regardless of delivery_mode ?QOS_1; diff --git a/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl b/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl index dd335c3af7d4..ab1f506d829d 100644 --- a/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl +++ b/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl @@ -123,7 +123,7 @@ disable_plugin() -> %%---------------------------------------------------------------------------- %%private maybe_cache_msg(XName, Message, Length) -> - case mc:proto_header(<<"x-recent-history-no-store">>, Message) of + case mc:x_header(<<"x-recent-history-no-store">>, Message) of true -> ok; _ ->