Skip to content

Commit

Permalink
Merge pull request #250 from rabbitmq/message-containers-3-13
Browse files Browse the repository at this point in the history
Make plugin compatible with message containers (3.13)
  • Loading branch information
acogoluegnes committed Sep 19, 2023
2 parents 1b7524a + 86e864b commit 37f56e4
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 75 deletions.
30 changes: 18 additions & 12 deletions src/rabbit_delayed_message.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@


-spec delay_message(rabbit_types:exchange(),
rabbit_types:delivery(),
mc:state(),
delay()) ->
nodelay | {ok, t_reference()}.

-spec internal_delay_message(t_reference(),
rabbit_types:exchange(),
rabbit_types:delivery(),
mc:state(),
delay()) ->
nodelay | {ok, t_reference()}.

Expand Down Expand Up @@ -80,8 +80,8 @@ start_link() ->
go() ->
gen_server:cast(?MODULE, go).

delay_message(Exchange, Delivery, Delay) ->
gen_server:call(?MODULE, {delay_message, Exchange, Delivery, Delay},
delay_message(Exchange, Message, Delay) ->
gen_server:call(?MODULE, {delay_message, Exchange, Message, Delay},
infinity).

setup_mnesia() ->
Expand Down Expand Up @@ -118,9 +118,9 @@ init([]) ->
_ = recover(),
{ok, #state{timer = not_set}}.

handle_call({delay_message, Exchange, Delivery, Delay},
handle_call({delay_message, Exchange, Message, Delay},
_From, State = #state{timer = CurrTimer}) ->
Reply = {ok, NewTimer} = internal_delay_message(CurrTimer, Exchange, Delivery, Delay),
Reply = {ok, NewTimer} = internal_delay_message(CurrTimer, Exchange, Message, Delay),
State2 = State#state{timer = NewTimer},
{reply, Reply, State2};
handle_call(refresh_config, _From, State) ->
Expand Down Expand Up @@ -168,22 +168,28 @@ maybe_delay_first() ->

route(#delay_key{exchange = Ex}, Deliveries, State) ->
ExName = Ex#exchange.name,
lists:map(fun (#delay_entry{delivery = D}) ->
D2 = swap_delay_header(D),
Dests = rabbit_exchange:route(Ex, D2),
lists:map(fun (#delay_entry{delivery = Msg0}) ->
Msg1 = case Msg0 of
#delivery{message = BasicMessage} ->
BasicMessage;
_MC ->
Msg0
end,
Msg2 = swap_delay_header(Msg1),
Dests = rabbit_exchange:route(Ex, Msg2),
Qs = rabbit_amqqueue:lookup_many(Dests),
rabbit_amqqueue:deliver(Qs, D2),
_ = rabbit_queue_type:deliver(Qs, Msg2, #{}, stateless),
bump_routed_stats(ExName, Qs, State)
end, Deliveries).

internal_delay_message(CurrTimer, Exchange, Delivery, Delay) ->
internal_delay_message(CurrTimer, Exchange, Message, Delay) ->
Now = erlang:system_time(milli_seconds),
%% keys are timestamps in milliseconds,in the future
DelayTS = Now + Delay,
mnesia:dirty_write(?INDEX_TABLE_NAME,
make_index(DelayTS, Exchange)),
mnesia:dirty_write(?TABLE_NAME,
make_delay(DelayTS, Exchange, Delivery)),
make_delay(DelayTS, Exchange, Message)),
case CurrTimer of
not_set ->
%% No timer in progress, so we start our own.
Expand Down
67 changes: 12 additions & 55 deletions src/rabbit_delayed_message_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,87 +12,44 @@

-export([get_delay/1, swap_delay_header/1]).

-define(INTEGER_ARG_TYPES, [byte, short, signedint, long, unsignedbyte, unsignedshort, unsignedint]).
-define(INTEGER_ARG_TYPES, [long, ubyte, short, ushort, int, uint]).

-define(STRING_ARG_TYPES, [longstr, shortstr]).
-define(STRING_ARG_TYPES, [utf8, binary]).

-define(FLOAT_ARG_TYPES, [decimal, double, float]).
-define(FLOAT_ARG_TYPES, [double, float]).

-import(rabbit_misc, [table_lookup/2, set_table_value/4]).

get_delay(Delivery) ->
case msg_headers(Delivery) of
case mc:x_header(<<"x-delay">>, Delivery) of
undefined ->
{error, nodelay};
H ->
get_delay_header(H)
end.

get_delay_header(H) ->
case table_lookup(H, <<"x-delay">>) of
{Type, Delay} ->
case check_int_arg(Type) of
ok -> {ok, Delay};
_ ->
_ ->
case try_convert_to_int(Type, Delay) of
{ok, Converted} -> {ok, Converted};
_ -> {error, nodelay}
end
end;
_ ->
{error, nodelay}
end
end.

%% set the x-delay header to -Delay, so it won't be re-delayed and the
%% header can still be passed down via e2e to other queues that might
%% lay after the next exchange so these queues/consumers can tell the
%% message comes via the delay plugin.
swap_delay_header(Delivery) ->
case msg_headers(Delivery) of
undefined ->
Delivery;
H ->
case get_delay_header(H) of
{ok, Delay} ->
H2 = set_table_value(H, <<"x-delay">>, signedint, -Delay),
set_delivery_headers(Delivery, H2);
_ ->
Delivery
end
case get_delay(Delivery) of
{ok, Delay} ->
mc:set_annotation(<<"x-delay">>, -Delay, Delivery);
_ ->
Delivery
end.

set_delivery_headers(Delivery, H) ->
Msg = get_msg(Delivery),
Content = get_content(Msg),
Props = get_props(Content),

Props2 = Props#'P_basic'{headers = H},
Content2 = Content#content{properties = Props2},
Msg2 = Msg#basic_message{content = Content2},

Delivery#delivery{message = Msg2}.

msg_headers(Delivery) ->
lists:foldl(fun (F, Acc) -> F(Acc) end,
Delivery,
[fun get_msg/1, fun get_content/1,
fun get_props/1, fun get_headers/1]).

get_msg(#delivery{message = Msg}) ->
Msg.

get_content(#basic_message{content = Content}) ->
Content.

get_props(#content{properties = Props}) ->
Props.

get_headers(#'P_basic'{headers = H}) ->
H.

try_convert_to_int(Type, Delay) ->
case lists:member(Type, ?STRING_ARG_TYPES) of
true -> {ok, binary_to_integer(Delay)};
true -> {ok, rabbit_data_coercion:to_integer(Delay)};
false ->
case lists:member(Type, ?FLOAT_ARG_TYPES) of
true -> {ok, trunc(Delay)};
Expand Down
18 changes: 10 additions & 8 deletions src/rabbit_exchange_type_delayed_message.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
-import(rabbit_misc, [table_lookup/2]).
-import(rabbit_delayed_message_utils, [get_delay/1]).

-export([description/0, serialise_events/0, route/2]).
-export([description/0, serialise_events/0, route/3]).
-export([validate/1, validate_binding/2,
create/2, delete/2, policy_changed/2,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
Expand All @@ -41,19 +41,21 @@ description() ->
{description, <<"Delayed Message Exchange.">>}].

route(X = #exchange{name = Name},
Delivery = #delivery{message = #basic_message{routing_keys = RKeys}}) ->
case delay_message(X, Delivery) of
Message,
Opts) ->
case delay_message(X, Message) of
nodelay ->
%% route the message using proxy module
case ?EXCHANGE(X) of
rabbit_exchange_type_direct ->
RKs = mc:get_annotation(routing_keys, Message),
%% Exchange type x-delayed-message routes via "direct exchange routing v1"
%% even when feature flag direct_exchange_routing_v2 is enabled because
%% table rabbit_index_route only stores bindings whose source exchange
%% is of type direct exchange.
rabbit_router:match_routing_key(Name, RKeys);
rabbit_router:match_routing_key(Name, RKs);
Mod ->
Mod:route(X, Delivery)
Mod:route(X, Message, Opts)
end;
_ ->
[]
Expand Down Expand Up @@ -108,10 +110,10 @@ info(Exchange, Items) ->

%%----------------------------------------------------------------------------

delay_message(Exchange, Delivery) ->
case get_delay(Delivery) of
delay_message(Exchange, Message) ->
case get_delay(Message) of
{ok, Delay} when Delay > 0, Delay =< ?ERL_MAX_T ->
rabbit_delayed_message:delay_message(Exchange, Delivery, Delay);
rabbit_delayed_message:delay_message(Exchange, Message, Delay);
_ ->
nodelay
end.
Expand Down

0 comments on commit 37f56e4

Please sign in to comment.