Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message Containers #5077

Merged
merged 70 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
f5c3e02
Refactor away from using the delivery{} record
kjnilsson Jun 20, 2022
137b56a
Dedup death queue names
ansd Jun 28, 2023
30d131a
Fix function clause crashes
ansd Jun 29, 2023
b480858
Fix is_utf8_no_null crash
ansd Jun 29, 2023
172464c
Implement mqtt mc behaviour
ansd Jun 29, 2023
c3636e1
Shorten mc file names
ansd Jul 4, 2023
0c212c4
mc: make deaths an annotation + fixes
kjnilsson Jul 4, 2023
eccd146
Fix mc_mqtt protocol_state callback
ansd Jul 4, 2023
b8c2b4f
Fix test will_delay_node_restart
ansd Jul 4, 2023
e4f92c2
Bazel run gazelle
ansd Jul 5, 2023
f900484
mix format rabbitmqctl.ex
ansd Jul 5, 2023
7a00fb2
Ensure ttl annotation is refelected in amqp legacy protocol state
kjnilsson Jul 5, 2023
706f0ff
Fix id access in message store
kjnilsson Jul 5, 2023
0857046
Fix rabbit_message_interceptor_SUITE
ansd Jul 5, 2023
b9d0c23
dializer fixes
kjnilsson Jul 5, 2023
99e83bc
Fix rabbit:rabbit_message_interceptor_SUITE-mixed
ansd Jul 6, 2023
d8720d6
Fix MQTT shared_SUITE-mixed
ansd Jul 6, 2023
093be04
Field content of 'v1_0.data' can be binary
ansd Jul 6, 2023
51a7828
Remove route/2 and implement route/3 for all exchange types.
kjnilsson Jul 10, 2023
ec71074
Translate directly from MQTT to AMQP 0.9.1
ansd Jul 10, 2023
d4b212c
handle undecoded properties in mc_compat
kjnilsson Jul 11, 2023
e886c83
Replace callback init_amqp with convert_from
ansd Jul 11, 2023
c670dd4
Fix return value of lists:keyfind/3
ansd Jul 11, 2023
d1870c6
Translate directly from AMQP 0.9.1 to MQTT
ansd Jul 11, 2023
a186cca
Fix MQTT payload size
ansd Jul 11, 2023
3e24d05
more decoding
kjnilsson Jul 14, 2023
262ec75
tracking mixed versions compat
kjnilsson Jul 14, 2023
01c8f3a
mc: flip default of `durable` annotation to save some data.
kjnilsson Jul 17, 2023
8c9f6b7
mc conversion tests and tidy up
kjnilsson Jul 18, 2023
4e82505
mc make x_header unstrict again
kjnilsson Jul 18, 2023
8cd52da
amqpl: death record fixes
kjnilsson Jul 19, 2023
1adee6b
bazel
kjnilsson Jul 19, 2023
1ea3ecd
amqp -> amqpl conversion test
kjnilsson Jul 20, 2023
1804952
Fix crash in mc_amqp:size/1
ansd Aug 1, 2023
e2d22f2
Fix crash in lists:flatten/1
ansd Aug 1, 2023
565b154
Fix crash in rabbit_writer
ansd Aug 2, 2023
295eec4
Add accidentally deleted line back
kjnilsson Aug 14, 2023
85e53e3
mc: optimise mc_amqp internal format
kjnilsson Aug 15, 2023
43ae01d
mc: optimis mc_amqp map_add by using upsert
kjnilsson Aug 15, 2023
961406a
mc: refactoring and bug fixes
kjnilsson Aug 16, 2023
550668b
mc_SUITE routingheader assertions
kjnilsson Aug 16, 2023
60081fe
mc remove serialize/1 callback as only used by amqp
kjnilsson Aug 16, 2023
51c61d6
mc_amqp: avoid returning a nested list from protocol_state
kjnilsson Aug 16, 2023
54c6934
test and bug fix
kjnilsson Aug 16, 2023
6a6685b
move infer_type to mc_util
kjnilsson Aug 16, 2023
ce6ec12
mc fixes and additiona assertions
kjnilsson Aug 16, 2023
18ff358
Support headers exchange routing for MQTT messages
ansd Aug 17, 2023
3610c4e
Fix crash when sending from stream to amqpl
ansd Aug 17, 2023
19814d5
Support consistent hash exchange routing for MQTT 5.0
ansd Aug 17, 2023
0a3740b
Convert MQTT 5.0 User Property
ansd Aug 18, 2023
536cc78
Make use of Annotations in mc_mqtt:protocol_state/2
ansd Aug 21, 2023
657e6d4
Enforce AMQP 0.9.1 field name length limit
ansd Aug 21, 2023
1094565
Fix type specs
ansd Aug 22, 2023
42f12d3
Add mc_mqtt unit test suite
ansd Aug 21, 2023
e9deb38
Translate indicator that payload is UTF-8 encoded
ansd Aug 22, 2023
50e42f9
Translate single amqp-value section from AMQP 1.0 to MQTT
ansd Aug 22, 2023
6eb035d
Fix payload conversion
ansd Aug 23, 2023
1cc35cf
Translate Response Topic between MQTT and AMQP
ansd Aug 23, 2023
95ca0bb
Apply PR feedback
ansd Aug 24, 2023
21869d4
tidy up
kjnilsson Aug 24, 2023
238f3d7
Add MQTT message_containers test
ansd Aug 24, 2023
312ea3d
consistent hash exchange: avoid amqp legacy conversion
kjnilsson Aug 24, 2023
d1735c7
Avoid converting to amqp legacy when using exchange federation
kjnilsson Aug 24, 2023
ddb82d8
Fix test flake
ansd Aug 24, 2023
7254f8a
test and dialyzer fixes
kjnilsson Aug 24, 2023
82adbe9
dialyzer fix
kjnilsson Aug 25, 2023
6cd97e8
Add MQTT protocol interoperability tests
ansd Aug 25, 2023
6f97260
Regenerate portions of deps/rabbit/app.bzl with gazelle
pjk25 Aug 31, 2023
8d0009f
mc: refactoring
kjnilsson Aug 31, 2023
29cfc42
mc_amqpl: handle delivery annotations
kjnilsson Aug 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/amqp10_common/test/binary_generator_SUITE.erl
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
-module(binary_generator_SUITE).

-compile(nowarn_export_all).
-compile(export_all).

-export([
]).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

%%%===================================================================
Expand Down
8 changes: 4 additions & 4 deletions deps/amqp10_common/test/binary_parser_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ end_per_testcase(_TestCase, _Config) ->
roundtrip(_Config) ->
Terms = [null,
{described,
{utf8, <<"URL">>},
{symbol, <<"URL">>},
{utf8, <<"http://example.org/hello-world">>}},
{described,
{utf8, <<"URL">>},
{utf8, <<"https://rabbitmq.com">>}},
{symbol, <<"URL">>},
{binary, <<"https://rabbitmq.com">>}},
{array, ubyte, [{ubyte, 1}, {ubyte, 255}]},
{boolean, false},
{list, [{utf8, <<"hi">>},
{described,
{utf8, <<"URL">>},
{symbol, <<"URL">>},
{utf8, <<"http://example.org/hello-world">>}}
]},
{list, [{int, 123},
Expand Down
14 changes: 14 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,11 @@ rabbitmq_integration_suite(
size = "medium",
)

rabbitmq_integration_suite(
name = "message_containers_SUITE",
size = "medium",
)

rabbitmq_integration_suite(
name = "metrics_SUITE",
size = "medium",
Expand Down Expand Up @@ -778,6 +783,15 @@ rabbitmq_suite(
],
)

rabbitmq_suite(
name = "mc_SUITE",
size = "small",
deps = [
"//deps/amqp10_common:erlang_app",
"//deps/rabbit_common:erlang_app",
],
)

rabbitmq_suite(
name = "rabbit_stream_coordinator_SUITE",
deps = [
Expand Down
35 changes: 35 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def all_beam_files(name = "all_beam_files"):
name = "behaviours",
srcs = [
"src/gm.erl",
"src/mc.erl",
"src/rabbit_backing_queue.erl",
"src/rabbit_credential_validator.erl",
"src/rabbit_exchange_type.erl",
Expand All @@ -34,6 +35,10 @@ def all_beam_files(name = "all_beam_files"):
"src/gatherer.erl",
"src/internal_user.erl",
"src/lqueue.erl",
"src/mc_amqp.erl",
"src/mc_amqpl.erl",
"src/mc_compat.erl",
"src/mc_util.erl",
"src/mirrored_supervisor.erl",
"src/mirrored_supervisor_sups.erl",
"src/pg_local.erl",
Expand Down Expand Up @@ -254,6 +259,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
testonly = True,
srcs = [
"src/gm.erl",
"src/mc.erl",
"src/rabbit_backing_queue.erl",
"src/rabbit_credential_validator.erl",
"src/rabbit_exchange_type.erl",
Expand All @@ -279,6 +285,10 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/gatherer.erl",
"src/internal_user.erl",
"src/lqueue.erl",
"src/mc_amqp.erl",
"src/mc_amqpl.erl",
"src/mc_compat.erl",
"src/mc_util.erl",
"src/mirrored_supervisor.erl",
"src/mirrored_supervisor_sups.erl",
"src/pg_local.erl",
Expand Down Expand Up @@ -503,6 +513,7 @@ def all_srcs(name = "all_srcs"):
"include/amqqueue.hrl",
"include/amqqueue_v2.hrl",
"include/gm_specs.hrl",
"include/mc.hrl",
"include/rabbit_global_counters.hrl",
"include/vhost.hrl",
"include/vhost_v2.hrl",
Expand Down Expand Up @@ -535,6 +546,11 @@ def all_srcs(name = "all_srcs"):
"src/gm.erl",
"src/internal_user.erl",
"src/lqueue.erl",
"src/mc.erl",
"src/mc_amqp.erl",
"src/mc_amqpl.erl",
"src/mc_compat.erl",
"src/mc_util.erl",
"src/mirrored_supervisor.erl",
"src/mirrored_supervisor_sups.erl",
"src/pg_local.erl",
Expand Down Expand Up @@ -1985,3 +2001,22 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "message_containers_SUITE_beam_files",
testonly = True,
srcs = ["test/message_containers_SUITE.erl"],
outs = ["test/message_containers_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "mc_SUITE_beam_files",
testonly = True,
srcs = ["test/mc_SUITE.erl"],
outs = ["test/mc_SUITE.beam"],
hdrs = ["include/mc.hrl"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app"],
)
23 changes: 23 additions & 0 deletions deps/rabbit/include/mc.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-type death_key() :: {Queue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
-type death_anns() :: #{first_time := non_neg_integer(), %% the timestamp of the first
last_time := non_neg_integer(), %% the timestamp of the last
ttl => non_neg_integer()}.
-record(death, {
exchange :: rabbit_misc:resource_name(),
routing_keys = [] :: [rabbit_types:routing_key()],
count = 0 :: non_neg_integer(),
anns :: death_anns()
}).

-record(deaths, {first :: death_key(),
last :: death_key(),
records = #{} :: #{death_key() := #death{}}}).


%% good enough for most use cases
-define(IS_MC(Msg), element(1, Msg) == mc andalso tuple_size(Msg) == 5).

%% "Field names MUST start with a letter, '$' or '#' and may continue with letters, '$' or '#', digits, or
%% underlines, to a maximum length of 128 characters." [AMQP 0.9.1 4.2.5.5 Field Tables]
%% Given that the valid chars are ASCII chars, 1 char is encoded as 1 byte.
-define(AMQP_LEGACY_FIELD_NAME_MAX_LEN, 128).
Loading
Loading