Skip to content

Commit

Permalink
Simplify essential proprty access
Browse files Browse the repository at this point in the history
Such as durable, ttl and priority by extracting them into annotations
at message container init time.

Move type

to remove dependenc on amqp10 stuff in mc.erl

mostly because I don't know how to make bazel do the right thing

add more stuff
  • Loading branch information
kjnilsson committed Jun 15, 2023
1 parent 49ff09c commit 8446105
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 142 deletions.
99 changes: 36 additions & 63 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
%% properties
is_persistent/1,
ttl/1,
correlation_id/1,
message_id/1,
timestamp/1,
priority/1,
set_ttl/2,
Expand All @@ -25,9 +27,7 @@
death_queue_names/1
]).

% -define(NIL, []).
-include("mc.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").

-type str() :: atom() | string() | binary().

Expand All @@ -36,22 +36,10 @@
-type protocol() :: module().
-type annotations() :: #{ann_key() => ann_value()}.

-type amqp_message_section() ::
#'v1_0.header'{} |
#'v1_0.delivery_annotations'{} |
#'v1_0.message_annotations'{} |
#'v1_0.properties'{} |
#'v1_0.application_properties'{} |
#'v1_0.data'{} |
#'v1_0.amqp_sequence'{} |
#'v1_0.amqp_value'{} |
#'v1_0.footer'{}.


%% the protocol module must implement the mc behaviour
-record(?MODULE, {protocol :: module(),
%% protocol specific data term
data :: term(),
data :: proto_state(),
%% any annotations done by the broker itself
%% such as recording the exchange / routing keys used
annotations = #{} :: annotations(),
Expand All @@ -62,21 +50,12 @@

-export_type([
state/0,
amqp_message_section/0,
ann_key/0,
ann_value/0
]).

-type proto_state() :: term().
-type property() :: user_id |
reply_to |
correlation_id |
message_id |
ttl |
priority |
durable |
timestamp
. %% etc

-type property_value() :: undefined |
string() |
binary() |
Expand All @@ -90,22 +69,15 @@
-callback init(term()) ->
{proto_state(), annotations()}.

-callback init_amqp([amqp_message_section()]) -> proto_state().
-callback init_amqp([rabbit_mc_amqp:message_section()]) -> proto_state().

-callback size(proto_state()) ->
{MetadataSize :: non_neg_integer(),
PayloadSize :: non_neg_integer()}.

-callback header(property(), proto_state()) ->
{property_value(), proto_state()}.

-callback get_property(property(), proto_state()) ->
-callback header(binary(), proto_state()) ->
{property_value(), proto_state()}.

%% strictly speaking properties ought to be immutable
-callback set_property(property(), Value :: term(), proto_state()) ->
proto_state().

%% all protocol must be able to convert to amqp (1.0)
-callback convert(protocol(), proto_state()) ->
proto_state() | not_supported.
Expand Down Expand Up @@ -142,7 +114,7 @@ size(#?MODULE{protocol = Proto,
size(BasicMsg) ->
mc_compat:size(BasicMsg).

-spec is(state()) -> boolean().
-spec is(term()) -> boolean().
is(#?MODULE{}) ->
true;
is(Term) ->
Expand Down Expand Up @@ -172,42 +144,45 @@ proto_header(Key, BasicMsg) ->
mc_compat:proto_header(Key, BasicMsg).

-spec is_persistent(state()) -> boolean().
is_persistent(#?MODULE{protocol = Proto,
data = Data}) ->
{Result, _} = Proto:get_property(durable, Data),
Result;
is_persistent(#?MODULE{annotations = Anns}) ->
maps:get(durable, Anns, false);
is_persistent(BasicMsg) ->
mc_compat:is_persistent(BasicMsg).

-spec ttl(state()) -> undefined | non_neg_integer().
ttl(#?MODULE{protocol = Proto,
data = Data}) ->
{Result, _} = Proto:get_property(ttl, Data),
Result;
ttl(#?MODULE{annotations = Anns}) ->
maps:get(ttl, Anns, undefined);
ttl(BasicMsg) ->
mc_compat:ttl(BasicMsg).


-spec timestamp(state()) -> undefined | non_neg_integer().
timestamp(#?MODULE{protocol = Proto,
data = Data}) ->
{Result, _} = Proto:get_property(timestamp, Data),
Result;
timestamp(#?MODULE{annotations = Anns}) ->
maps:get(timestamp, Anns, undefined);
timestamp(BasicMsg) ->
mc_compat:timestamp(BasicMsg).

-spec priority(state()) -> undefined | non_neg_integer().
priority(#?MODULE{protocol = Proto,
data = Data}) ->
{Result, _} = Proto:get_property(priority, Data),
Result;
priority(#?MODULE{annotations = Anns}) ->
maps:get(priority, Anns, undefined);
priority(BasicMsg) ->
mc_compat:priority(BasicMsg).

-spec correlation_id(state()) -> undefined | binary().
correlation_id(#?MODULE{annotations = Anns}) ->
maps:get(correlation_id, Anns, undefined);
correlation_id(BasicMsg) ->
mc_compat:correlation_id(BasicMsg).

-spec message_id(state()) -> undefined | binary().
message_id(#?MODULE{annotations = Anns}) ->
maps:get(message_id, Anns, undefined);
message_id(BasicMsg) ->
mc_compat:message_id(BasicMsg).

-spec set_ttl(undefined | non_neg_integer(), state()) -> state().
set_ttl(Value, #?MODULE{protocol = Proto,
data = Data} = State) ->
State#?MODULE{data = Proto:set_property(ttl, Value, Data)};
set_ttl(Value, #?MODULE{annotations = Anns} = State) ->
State#?MODULE{annotations = maps:put(ttl, Value, Anns)};
set_ttl(Value, BasicMsg) ->
mc_compat:set_ttl(Value, BasicMsg).

Expand Down Expand Up @@ -247,17 +222,18 @@ prepare(State) ->
SourceQueue :: binary(),
state()) -> state().
record_death(Reason, SourceQueue,
#?MODULE{protocol = Mod,
data = Data,
#?MODULE{protocol = _Mod,
data = _Data,
annotations = Anns,
deaths = Ds0} = State) ->
deaths = Ds0} = State)
when is_atom(Reason) andalso is_binary(SourceQueue) ->
Key = {SourceQueue, Reason},
Exchange = maps:get(exchange, Anns),
RoutingKeys = maps:get(routing_keys, Anns),
Timestamp = os:system_time(millisecond),
Ttl = maps:get(ttl, Anns, undefined),
case Ds0 of
undefined ->
{Ttl, _} = Mod:get_property(ttl, Data),
Ds = #deaths{last = Key,
first = Key,
records = #{Key => #death{count = 1,
Expand All @@ -269,7 +245,8 @@ record_death(Reason, SourceQueue,
State#?MODULE{deaths = Ds};
#deaths{records = Rs} ->
Death = #death{count = C} = maps:get(Key, Rs,
#death{exchange = Exchange,
#death{ttl = Ttl,
exchange = Exchange,
routing_keys = RoutingKeys,
timestamp = Timestamp}),
Ds = Ds0#deaths{last = Key,
Expand Down Expand Up @@ -297,10 +274,6 @@ death_queue_names(#?MODULE{deaths = Deaths}) ->
death_queue_names(BasicMsg) ->
mc_compat:death_queue_names(BasicMsg).

% -spec deaths(state()) -> undefined | #deaths{}.
% deaths(#?MODULE{deaths = Deaths}) ->
% Deaths.

-spec last_death(state()) ->
undefined | {death_key(), #death{}}.
last_death(#?MODULE{deaths = undefined}) ->
Expand Down
3 changes: 0 additions & 3 deletions deps/rabbit/src/mc.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,3 @@

%% good enough for most use cases
-define(IS_MC(Msg), element(1, Msg) == mc andalso tuple_size(Msg) == 5).

%%% TODO: work out dead letter logic incl cycle detection
%%% and re-implement for mc
36 changes: 32 additions & 4 deletions deps/rabbit/src/mc_compat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
%%% properties
is_persistent/1,
ttl/1,
correlation_id/1,
message_id/1,
timestamp/1,
priority/1,
set_ttl/2,
Expand Down Expand Up @@ -82,16 +84,22 @@ set_annotation(<<"x-", _/binary>> = Key, Value,


is_persistent(#basic_message{content = Content}) ->
element(1, rabbit_mc_amqp_legacy:get_property(durable, Content)).
element(1, get_property(durable, Content)).

ttl(#basic_message{content = Content}) ->
element(1, rabbit_mc_amqp_legacy:get_property(ttl, Content)).
element(1, get_property(ttl, Content)).

timestamp(#basic_message{content = Content}) ->
element(1, rabbit_mc_amqp_legacy:get_property(timestamp, Content)).
element(1, get_property(timestamp, Content)).

priority(#basic_message{content = Content}) ->
element(1, rabbit_mc_amqp_legacy:get_property(priority, Content)).
element(1, get_property(priority, Content)).

correlation_id(#basic_message{content = Content}) ->
element(1, get_property(correlation_id, Content)).

message_id(#basic_message{content = Content}) ->
element(1, get_property(message_id, Content)).

set_ttl(Value, #basic_message{content = Content0} = Msg) ->
Content = rabbit_mc_amqp_legacy:set_property(ttl, Value, Content0),
Expand Down Expand Up @@ -336,6 +344,26 @@ last_death(#basic_message{content = Content}) ->
undefined
end.

get_property(durable,
#content{properties = #'P_basic'{delivery_mode = Mode}}) ->
Mode == 2;
get_property(ttl, #content{properties = Props}) ->
{ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
MsgTTL;
get_property(priority, #content{properties = #'P_basic'{priority = P}}) ->
P;
get_property(timestamp, #content{properties = Props}) ->
#'P_basic'{timestamp = Timestamp} = Props,
case Timestamp of
undefined ->
undefined;
_ ->
%% timestamp should be in ms
Timestamp * 1000
end;
get_property(_P, _C) ->
undefined.


% detect_cycles(rejected, _Msg, Queues) ->
% {Queues, []};
Expand Down
3 changes: 1 addition & 2 deletions deps/rabbit/src/rabbit_exchange_type_headers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ description() ->

serialise_events() -> false.

route(#exchange{name = Name},
Msg0) ->
route(#exchange{name = Name}, Msg0) ->
%% TODO converting to amqp legacy means this will be slow for all protocols
%% except amqp legacy, ok for now to get it working but will need addressing
Msg = mc:convert(rabbit_mc_amqp_legacy, Msg0),
Expand Down
Loading

0 comments on commit 8446105

Please sign in to comment.