Skip to content

Commit

Permalink
Fix conflicts and failing tests
Browse files Browse the repository at this point in the history
Extend message_containers_deaths_v2_SUITE to send 3 messages whose death
histories will be stored in 3 different ways:
1. with feature flag message_containers disabled
2. with feature flag message_containers enabled, but message_containers_deaths_v2 disabled
3. with feature flag message_containers_deaths_v2 enabled
  • Loading branch information
ansd committed May 14, 2024
1 parent 441fe9f commit 3981b2a
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 4,383 deletions.
9 changes: 9 additions & 0 deletions deps/amqp10_common/src/amqp10_binary_generator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ constructor(timestamp) -> <<16#83>>;
constructor(uuid) -> <<16#98>>;
constructor(null) -> <<16#40>>;
constructor(boolean) -> <<16#56>>;
constructor(map) -> 16#d1; % use large map type for all array elements
constructor(array) -> <<16#f0>>; % use large array type for all nested arrays
constructor(utf8) -> <<16#b1>>;
constructor({described, Descriptor, Primitive}) ->
Expand All @@ -203,6 +204,14 @@ generate(ulong, {ulong, V}) -> <<V:64/unsigned>>;
generate(long, {long, V}) -> <<V:64/signed>>;
generate({described, D, P}, {described, D, V}) ->
generate(P, V);
generate(map, {map, KvList}) ->
Count = length(KvList) * 2,
Compound = lists:map(fun({Key, Val}) ->
[(generate(Key)),
(generate(Val))]
end, KvList),
S = iolist_size(Compound),
[<<(S + 4):32, Count:32>>, Compound];
generate(array, {array, Type, List}) ->
Count = length(List),
Body = iolist_to_binary([constructor(Type),
Expand Down
1 change: 1 addition & 0 deletions deps/amqp10_common/src/amqp10_binary_parser.erl
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ parse_constructor(16#80) -> ulong;
parse_constructor(16#81) -> long;
parse_constructor(16#40) -> null;
parse_constructor(16#56) -> boolean;
parse_constructor(16#d1) -> map;
parse_constructor(16#f0) -> array;
parse_constructor(0) -> described;
parse_constructor(X) ->
Expand Down
15 changes: 13 additions & 2 deletions deps/amqp10_common/test/binary_generator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,22 @@ array(_Config) ->
?assertEqual({array, boolean, [true, false]},
roundtrip_return({array, boolean, [{boolean, true}, {boolean, false}]})),

% array of arrays
% TODO: does the inner type need to be consistent across the array?
%% array of arrays
roundtrip({array, array, []}),
roundtrip({array, array, [{array, symbol, [{symbol, <<"ANONYMOUS">>}]}]}),

%% array of maps
roundtrip({array, map, []}),
roundtrip({array, map, [{map, [{{symbol, <<"k1">>}, {utf8, <<"v1">>}}]},
{map, []},
{map, [{{described,
{utf8, <<"URL">>},
{utf8, <<"http://example.org/hello-world">>}},
{byte, -1}},
{{int, 0}, {ulong, 0}}
]}
]}),

Desc = {utf8, <<"URL">>},
roundtrip({array, {described, Desc, utf8},
[{described, Desc, {utf8, <<"http://example.org/hello">>}}]}),
Expand Down
66 changes: 0 additions & 66 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2153,71 +2153,6 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
<<<<<<< HEAD
=======

erlang_bytecode(
name = "test_event_recorder_beam",
testonly = True,
srcs = ["test/event_recorder.erl"],
outs = ["test/event_recorder.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app"],
)
erlang_bytecode(
name = "amqp_auth_SUITE_beam_files",
testonly = True,
srcs = ["test/amqp_auth_SUITE.erl"],
outs = ["test/amqp_auth_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp10_common:erlang_app", "//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "amqp_client_SUITE_beam_files",
testonly = True,
srcs = ["test/amqp_client_SUITE.erl"],
outs = ["test/amqp_client_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp10_common:erlang_app", "//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "amqp_credit_api_v2_SUITE_beam_files",
testonly = True,
srcs = ["test/amqp_credit_api_v2_SUITE.erl"],
outs = ["test/amqp_credit_api_v2_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "amqp_proxy_protocol_SUITE_beam_files",
testonly = True,
srcs = ["test/amqp_proxy_protocol_SUITE.erl"],
outs = ["test/amqp_proxy_protocol_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "amqp_system_SUITE_beam_files",
testonly = True,
srcs = ["test/amqp_system_SUITE.erl"],
outs = ["test/amqp_system_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app"],
)
erlang_bytecode(
name = "amqp_address_SUITE_beam_files",
testonly = True,
srcs = ["test/amqp_address_SUITE.erl"],
outs = ["test/amqp_address_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbitmq_amqp_client:erlang_app"],
)
erlang_bytecode(
name = "message_containers_deaths_v2_SUITE_beam_files",
testonly = True,
Expand All @@ -2227,4 +2162,3 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
>>>>>>> 6b300a2f34 (Fix dead lettering)
2 changes: 1 addition & 1 deletion deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ is_cycle_v2(TargetQueue, Deaths) ->
%% queue name and there are no newer deaths with the 'rejected' reason then
%% consider this a cycle."
%% However, the correct death order cannot be reliably determined in v1.
%% deaths_v2 fixes this bug.
%% v2 fixes this bug.
is_cycle_v1(_Queue, []) ->
false;
is_cycle_v1(_Queue, [{_Q, rejected} | _]) ->
Expand Down
99 changes: 6 additions & 93 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ protocol_state(Msg0 = #msg{message_annotations = MA0}, Anns) ->
maps_upsert(K, mc_util:infer_type(V), L);
(<<"timestamp_in_ms">>, V, L) ->
maps_upsert(<<"x-opt-rabbitmq-received-time">>, {timestamp, V}, L);
(deaths, Deaths, L)
when is_list(Deaths) ->
Maps = encode_deaths(Deaths),
maps_upsert(<<"x-opt-deaths">>, {array, map, Maps}, L);
(_, _, Acc) ->
Acc
end, MA0, Anns),
Expand Down Expand Up @@ -241,32 +245,6 @@ msg_to_sections(#msg{header = H,
[H | S4]
end.

<<<<<<< HEAD
=======
-spec protocol_state_message_annotations(amqp_annotations(), mc:annotations()) ->
amqp_annotations().
protocol_state_message_annotations(MA, Anns) ->
maps:fold(
fun(?ANN_EXCHANGE, Exchange, L) ->
maps_upsert(<<"x-exchange">>, {utf8, Exchange}, L);
(?ANN_ROUTING_KEYS, RKeys, L) ->
RKey = hd(RKeys),
maps_upsert(<<"x-routing-key">>, {utf8, RKey}, L);
(<<"x-", _/binary>> = K, V, L)
when V =/= undefined ->
%% any x-* annotations get added as message annotations
maps_upsert(K, mc_util:infer_type(V), L);
(<<"timestamp_in_ms">>, V, L) ->
maps_upsert(<<"x-opt-rabbitmq-received-time">>, {timestamp, V}, L);
(deaths, Deaths, L)
when is_list(Deaths) ->
Maps = encode_deaths(Deaths),
maps_upsert(<<"x-opt-deaths">>, {array, map, Maps}, L);
(_, _, Acc) ->
Acc
end, MA, Anns).

>>>>>>> 6b300a2f34 (Fix dead lettering)
maps_upsert(Key, TaggedVal, KVList) ->
TaggedKey = {symbol, Key},
Elem = {TaggedKey, TaggedVal},
Expand Down Expand Up @@ -378,43 +356,6 @@ decode([#'v1_0.amqp_value'{} = B | Rem], #msg{} = Msg) ->
%% an amqp value can only be a singleton
decode(Rem, Msg#msg{data = B}).

<<<<<<< HEAD
key_find(K, [{{_, K}, {_, V}} | _]) ->
V;
key_find(K, [_ | Rem]) ->
key_find(K, Rem);
key_find(_K, []) ->
undefined.

recover_deaths([], Acc) ->
Acc;
recover_deaths([{map, Kvs} | Rem], Acc) ->
Queue = key_find(<<"queue">>, Kvs),
Reason = binary_to_existing_atom(key_find(<<"reason">>, Kvs)),
DA0 = case key_find(<<"original-expiration">>, Kvs) of
undefined ->
#{};
Exp ->
#{ttl => binary_to_integer(Exp)}
end,
RKeys = [RK || {_, RK} <- key_find(<<"routing-keys">>, Kvs)],
Ts = key_find(<<"time">>, Kvs),
DA = DA0#{first_time => Ts,
last_time => Ts},
recover_deaths(Rem,
Acc#{{Queue, Reason} =>
#death{anns = DA,
exchange = key_find(<<"exchange">>, Kvs),
count = key_find(<<"count">>, Kvs),
routing_keys = RKeys}}).
=======
-spec first_acquirer(mc:annotations()) -> boolean().
first_acquirer(Anns) ->
Redelivered = case Anns of
#{redelivered := R} -> R;
_ -> false
end,
not Redelivered.

encode_deaths(Deaths) ->
lists:map(
Expand Down Expand Up @@ -442,50 +383,22 @@ encode_deaths(Deaths) ->
end,
{map, Map}
end, Deaths).
>>>>>>> 6b300a2f34 (Fix dead lettering)

essential_properties(#msg{message_annotations = MA} = Msg) ->
Durable = get_property(durable, Msg),
Priority = get_property(priority, Msg),
Timestamp = get_property(timestamp, Msg),
Ttl = get_property(ttl, Msg),
<<<<<<< HEAD

Deaths = case message_annotation(<<"x-death">>, Msg, undefined) of
{list, DeathMaps} ->
%% TODO: make more correct?
Def = {utf8, <<>>},
{utf8, FstQ} = message_annotation(<<"x-first-death-queue">>, Msg, Def),
{utf8, FstR} = message_annotation(<<"x-first-death-reason">>, Msg, Def),
{utf8, LastQ} = message_annotation(<<"x-last-death-queue">>, Msg, Def),
{utf8, LastR} = message_annotation(<<"x-last-death-reason">>, Msg, Def),
#deaths{first = {FstQ, binary_to_existing_atom(FstR)},
last = {LastQ, binary_to_existing_atom(LastR)},
records = recover_deaths(DeathMaps, #{})};
_ ->
undefined
end,

Anns = maps_put_falsy(
?ANN_DURABLE, Durable,
=======
Anns0 = #{?ANN_DURABLE => Durable},
Anns = maps_put_truthy(
?ANN_PRIORITY, Priority,
>>>>>>> 6b300a2f34 (Fix dead lettering)
maps_put_truthy(
?ANN_PRIORITY, Priority,
maps_put_truthy(
<<<<<<< HEAD
?ANN_TIMESTAMP, Timestamp,
maps_put_truthy(
ttl, Ttl,
maps_put_truthy(
deaths, Deaths,
#{}))))),
=======
ttl, Ttl,
Anns0))),
>>>>>>> 6b300a2f34 (Fix dead lettering)
#{})))),
case MA of
[] ->
Anns;
Expand Down
16 changes: 0 additions & 16 deletions deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,6 @@
stability => stable,
depends_on => [quorum_queue]
}}).
<<<<<<< HEAD
=======

-rabbit_feature_flag(
{credit_api_v2,
#{desc => "Credit API v2 between queue clients and queue processes",
stability => stable
}}).

-rabbit_feature_flag(
{message_containers_store_amqp_v1,
#{desc => "Support storing messages in message containers AMQP 1.0 disk format v1",
stability => stable,
depends_on => [message_containers]
}}).

-rabbit_feature_flag(
{message_containers_deaths_v2,
Expand All @@ -187,4 +172,3 @@
stability => stable,
depends_on => [message_containers]
}}).
>>>>>>> 6b300a2f34 (Fix dead lettering)
Loading

0 comments on commit 3981b2a

Please sign in to comment.