Skip to content

Commit

Permalink
Refine routing header stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Jun 15, 2023
1 parent 8446105 commit 79a737c
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 42 deletions.
64 changes: 50 additions & 14 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
timestamp/1,
priority/1,
set_ttl/2,
proto_header/2,
x_header/2,
routing_headers/2,
%%
convert/2,
protocol_state/1,
Expand Down Expand Up @@ -75,9 +76,12 @@
{MetadataSize :: non_neg_integer(),
PayloadSize :: non_neg_integer()}.

-callback header(binary(), proto_state()) ->
-callback x_header(binary(), proto_state()) ->
{property_value(), proto_state()}.

-callback routing_headers(proto_state(), [x_headers | complex_types]) ->
#{binary() => term()}.

%% all protocol must be able to convert to amqp (1.0)
-callback convert(protocol(), proto_state()) ->
proto_state() | not_supported.
Expand Down Expand Up @@ -134,14 +138,42 @@ set_annotation(Key, Value, #?MODULE{annotations = Anns} = State) ->
set_annotation(Key, Value, BasicMessage) ->
mc_compat:set_annotation(Key, Value, BasicMessage).

-spec proto_header(Key :: binary(), state()) ->
-spec x_header(Key :: binary(), state()) ->
property_value() | undefined.
proto_header(Key, #?MODULE{protocol = Proto,
data = Data}) ->
{Result, _} = Proto:header(Key, Data),
Result;
proto_header(Key, BasicMsg) ->
mc_compat:proto_header(Key, BasicMsg).
x_header(Key, #?MODULE{protocol = Proto,
annotations = Anns,
data = Data}) ->
%% x-headers may be have been added to the annotations map so
%% we need to check that first
case Anns of
#{Key := Value} ->
Value;
_ ->
%% if not we have to call into the protocol specific handler
{Result, _} = Proto:x_header(Key, Data),
Result
end;
x_header(Key, BasicMsg) ->
mc_compat:x_header(Key, BasicMsg).

-spec routing_headers(state(), [x_header | complex_types]) ->
#{binary() => property_value()}.
routing_headers(#?MODULE{protocol = Proto,
annotations = Anns,
data = Data}, Options) ->
%% TODO: fake death headers also as this is what most users
%% use for x- filtering
New = case lists:member(x_headers, Options) of
true ->
maps:filter(fun (<<"x-", _/binary>>, _) -> true;
(_, _) -> false
end, Anns);
false ->
#{}
end,
maps:merge(Proto:routing_headers(Data, Options), New);
routing_headers(Key, BasicMsg) ->
mc_compat:routing_headers(Key, BasicMsg).

-spec is_persistent(state()) -> boolean().
is_persistent(#?MODULE{annotations = Anns}) ->
Expand Down Expand Up @@ -224,14 +256,14 @@ prepare(State) ->
record_death(Reason, SourceQueue,
#?MODULE{protocol = _Mod,
data = _Data,
annotations = Anns,
annotations = Anns0,
deaths = Ds0} = State)
when is_atom(Reason) andalso is_binary(SourceQueue) ->
Key = {SourceQueue, Reason},
Exchange = maps:get(exchange, Anns),
RoutingKeys = maps:get(routing_keys, Anns),
Exchange = maps:get(exchange, Anns0),
RoutingKeys = maps:get(routing_keys, Anns0),
Timestamp = os:system_time(millisecond),
Ttl = maps:get(ttl, Anns, undefined),
Ttl = maps:get(ttl, Anns0, undefined),
case Ds0 of
undefined ->
Ds = #deaths{last = Key,
Expand All @@ -241,8 +273,12 @@ record_death(Reason, SourceQueue,
exchange = Exchange,
routing_keys = RoutingKeys,
timestamp = Timestamp}}},
Anns = Anns0#{<<"x-first-death-reason">> => atom_to_binary(Reason),
<<"x-first-death-queue">> => SourceQueue,
<<"x-first-death-exchange">> => Exchange},

State#?MODULE{deaths = Ds};
State#?MODULE{deaths = Ds,
annotations = Anns};
#deaths{records = Rs} ->
Death = #death{count = C} = maps:get(Key, Rs,
#death{ttl = Ttl,
Expand Down
21 changes: 12 additions & 9 deletions deps/rabbit/src/mc_compat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
timestamp/1,
priority/1,
set_ttl/2,
proto_header/2,
x_header/2,
routing_headers/2,
%%%
convert/2,
protocol_state/1,
Expand Down Expand Up @@ -84,30 +85,32 @@ set_annotation(<<"x-", _/binary>> = Key, Value,


is_persistent(#basic_message{content = Content}) ->
element(1, get_property(durable, Content)).
get_property(durable, Content).

ttl(#basic_message{content = Content}) ->
element(1, get_property(ttl, Content)).
get_property(?FUNCTION_NAME, Content).

timestamp(#basic_message{content = Content}) ->
element(1, get_property(timestamp, Content)).
get_property(?FUNCTION_NAME, Content).

priority(#basic_message{content = Content}) ->
element(1, get_property(priority, Content)).
get_property(?FUNCTION_NAME, Content).

correlation_id(#basic_message{content = Content}) ->
element(1, get_property(correlation_id, Content)).
get_property(?FUNCTION_NAME, Content).

message_id(#basic_message{content = Content}) ->
element(1, get_property(message_id, Content)).
get_property(message_id, Content).

set_ttl(Value, #basic_message{content = Content0} = Msg) ->
Content = rabbit_mc_amqp_legacy:set_property(ttl, Value, Content0),
Msg#basic_message{content = Content}.

proto_header(Key,#basic_message{content = Content}) ->
element(1, rabbit_mc_amqp_legacy:header(Key, Content)).
x_header(Key,#basic_message{content = Content}) ->
element(1, rabbit_mc_amqp_legacy:x_header(Key, Content)).

routing_headers(#basic_message{content = Content}, Opts) ->
rabbit_mc_amqp_legacy:routing_headers(Content, Opts).

convert(rabbit_mc_amqp_legacy, #basic_message{} = BasicMsg) ->
BasicMsg;
Expand Down
61 changes: 51 additions & 10 deletions deps/rabbit/src/rabbit_exchange_type_headers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,58 @@ description() ->

serialise_events() -> false.

route(#exchange{name = Name}, Msg0) ->
%% TODO converting to amqp legacy means this will be slow for all protocols
%% except amqp legacy, ok for now to get it working but will need addressing
Msg = mc:convert(rabbit_mc_amqp_legacy, Msg0),
#content{} = Content = mc:protocol_state(Msg),
Headers = case (Content#content.properties)#'P_basic'.headers of
undefined -> [];
H -> rabbit_misc:sort_field_table(H)
end,
route(#exchange{name = Name}, Msg) ->
%% TODO: find a way not to extract x headers unless necessary
Headers = mc:routing_headers(Msg, [x_headers]),

rabbit_router:match_bindings(
Name, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end).
Name, fun(#binding{args = Args}) ->
case rabbit_misc:table_lookup(Args, <<"x-match">>) of
{longstr, <<"any">>} ->
match_any(Args, Headers, fun match/2);
{longstr, <<"any-with-x">>} ->
match_any(Args, Headers, fun match_x/2);
{longstr, <<"all-with-x">>} ->
match_all(Args, Headers, fun match_x/2);
_ ->
match_all(Args, Headers, fun match/2)
end
end).

match_x({<<"x-match">>, _, _}, _M) ->
skip;
match_x({K, void, _}, M) ->
maps:is_key(K, M);
match_x({K, _, V}, M) ->
maps:get(K, M, undefined) =:= V.

match({<<"x-", _/binary>>, _, _}, _M) ->
skip;
match({K, void, _}, M) ->
maps:is_key(K, M);
match({K, _, V}, M) ->
maps:get(K, M, undefined) =:= V.


match_all([], _, _MatchFun) ->
true;
match_all([Arg | Rem], M, Fun) ->
case Fun(Arg, M) of
false ->
false;
_ ->
match_all(Rem, M, Fun)
end.

match_any([], _, _Fun) ->
false;
match_any([Arg | Rem], M, Fun) ->
case Fun(Arg, M) of
true ->
true;
_ ->
match_any(Rem, M, Fun)
end.

validate_binding(_X, #binding{args = Args}) ->
case rabbit_misc:table_lookup(Args, <<"x-match">>) of
Expand Down
47 changes: 44 additions & 3 deletions deps/rabbit/src/rabbit_mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
init/1,
init_amqp/1,
size/1,
header/2,
x_header/2,
routing_headers/2,
get_property/2,
convert/2,
protocol_state/3,
Expand All @@ -28,6 +29,10 @@
#'v1_0.amqp_value'{} |
#'v1_0.footer'{}.

-define(SIMPLE_VALUE(V), is_binary(V) orelse
is_number(V) orelse
is_boolean(V)).

-type maybe(T) :: T | undefined.
-type amqp10_data() :: #'v1_0.data'{} |
[#'v1_0.amqp_sequence'{} | #'v1_0.data'{}] |
Expand Down Expand Up @@ -67,9 +72,20 @@ size(#msg{data = #'v1_0.data'{content = Data}}) ->
MetaSize = 0,
{MetaSize, iolist_size(Data)}.

header(Key, Msg) ->
x_header(Key, Msg) ->
{_Type, Value} = message_annotation(Key, Msg, undefined),
{Value, Msg}.
Value.

routing_headers(Msg, Opts) ->
IncludeX = lists:member(x_header, Opts),
X = case IncludeX of
true ->
message_annotations_as_simple_map(Msg);
false ->
#{}
end,
application_properties_as_simple_map(Msg, X).


get_property(durable, Msg) ->
case Msg of
Expand Down Expand Up @@ -187,6 +203,31 @@ message_annotation(Key, #msg{message_annotations =
Default
end.

message_annotations_as_simple_map(#msg{message_annotations = undefined}) ->
#{};
message_annotations_as_simple_map(
#msg{message_annotations = #'v1_0.message_annotations'{content = Content}}) ->
%% the section record format really is terrible
lists:foldl(fun ({{symbol, K},{_T, V}}, Acc)
when ?SIMPLE_VALUE(V) ->
Acc#{K => V};
(_, Acc)->
Acc
end, #{}, Content).

application_properties_as_simple_map(#msg{application_properties = undefined}, M) ->
M;
application_properties_as_simple_map(
#msg{application_properties = #'v1_0.application_properties'{content = Content}},
M) ->
%% the section record format really is terrible
lists:foldl(fun ({{symbol, K}, {_T, V}}, Acc)
when ?SIMPLE_VALUE(V) ->
Acc#{K => V};
(_, Acc)->
Acc
end, M, Content).

decode([], Acc) ->
Acc;
decode([#'v1_0.header'{} = H | Rem], Msg) ->
Expand Down
32 changes: 28 additions & 4 deletions deps/rabbit/src/rabbit_mc_amqp_legacy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
init/1,
init_amqp/1,
size/1,
header/2,
x_header/2,
routing_headers/2,
% get_property/2,
% set_property/3,
convert/2,
Expand All @@ -19,7 +20,8 @@
message/4,
message/5,
from_basic_message/1,
set_property/3
set_property/3,
serialize/2
]).

-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
Expand Down Expand Up @@ -135,16 +137,35 @@ size(#content{properties_bin = PropsBin,
end,
{MetaSize, iolist_size(Payload)}.

header(_Key, #content{properties = #'P_basic'{headers = undefined}} = C) ->
x_header(_Key, #content{properties = #'P_basic'{headers = undefined}} = C) ->
{undefined, C};
header(Key, #content{properties = #'P_basic'{headers = Headers}} = C) ->
x_header(Key, #content{properties = #'P_basic'{headers = Headers}} = C) ->
case rabbit_misc:table_lookup(Headers, Key) of
undefined ->
{undefined, C};
{_Type, Value} ->
{Value, C}
end.

routing_headers(#content{properties = #'P_basic'{headers = undefined}}, _Opts) ->
#{};
routing_headers(#content{properties = #'P_basic'{headers = Headers}}, Opts) ->
IncludeX = lists:member(x_headers, Opts),
%% TODO: filter out complex AMQP legacy values such as array and table?
lists:foldl(
fun({<<"x-", _/binary>> = Key, _T, Value}, Acc) ->
case IncludeX of
true ->
Acc#{Key => Value};
false ->
Acc
end;
({Key, _T, Value}, Acc) ->
Acc#{Key => Value}
end, #{}, Headers).



% get_property(durable,
% #content{properties = #'P_basic'{delivery_mode = Mode}} = C) ->
% {Mode == 2, C};
Expand Down Expand Up @@ -172,6 +193,9 @@ set_property(_P, _V, Msg) ->
%% TODO: impl at least ttl set (needed for dead lettering)
Msg.

serialize(_, _) ->
<<>>.

convert(?MODULE, C) ->
C;
convert(rabbit_mc_amqp, #content{properties = Props,
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,7 @@ deliver_to_client(Msgs, Ack, State) ->
deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Msg} = Delivery,
AckRequired, State0) ->
%% internal annotation, should be atom annotation key?
PublisherQoS = case mc:proto_header(<<"x-mqtt-publish-qos">>, Msg) of
PublisherQoS = case mc:x_header(<<"x-mqtt-publish-qos">>, Msg) of
undefined ->
%% non-MQTT publishes are assumed to be QoS 1 regardless of delivery_mode
?QOS_1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ disable_plugin() ->
%%----------------------------------------------------------------------------
%%private
maybe_cache_msg(XName, Message, Length) ->
case mc:proto_header(<<"x-recent-history-no-store">>, Message) of
case mc:x_header(<<"x-recent-history-no-store">>, Message) of
true ->
ok;
_ ->
Expand Down

0 comments on commit 79a737c

Please sign in to comment.