From 85f1d7a9d203be4e7a658cb2d91dfa0acccb4e4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Thu, 10 Oct 2024 15:12:52 +0200 Subject: [PATCH 1/3] Return error if stream publisher reference is longer than 255 characters Fixes #12499 (cherry picked from commit 4e8fb46bbf4d2cc3569e709ebb1ee8849a8855ce) --- .../src/rabbit_stream_reader.erl | 26 ++++++++++++-- .../test/rabbit_stream_SUITE.erl | 35 ++++++++++++++++++- 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index ffada5519745..7dd701464a74 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -81,6 +81,8 @@ -define(UNKNOWN_FIELD, unknown_field). -define(SILENT_CLOSE_DELAY, 3_000). +-import(rabbit_stream_utils, [check_write_permitted/2]). + %% client API -export([start_link/4, info/2, @@ -1655,6 +1657,26 @@ handle_frame_post_auth(Transport, {C1#stream_connection{connection_step = failure}, S1} end, {Connection1, State1}; +handle_frame_post_auth(Transport, + #stream_connection{user = User, + resource_alarm = false} = C, + State, + {request, CorrelationId, + {declare_publisher, _PublisherId, WriterRef, S}}) + when is_binary(WriterRef), byte_size(WriterRef) > 255 -> + {Code, Counter} = case check_write_permitted(stream_r(S, C), User) of + ok -> + {?RESPONSE_CODE_PRECONDITION_FAILED, ?PRECONDITION_FAILED}; + error -> + {?RESPONSE_CODE_ACCESS_REFUSED, ?ACCESS_REFUSED} + end, + response(Transport, + C, + declare_publisher, + CorrelationId, + Code), + rabbit_global_counters:increase_protocol_counter(stream, Counter, 1), + {C, State}; handle_frame_post_auth(Transport, #stream_connection{user = User, publishers = Publishers0, @@ -1664,7 +1686,7 @@ handle_frame_post_auth(Transport, State, {request, CorrelationId, {declare_publisher, PublisherId, WriterRef, Stream}}) -> - case rabbit_stream_utils:check_write_permitted(stream_r(Stream, + case check_write_permitted(stream_r(Stream, Connection0), User) of @@ -3102,7 +3124,7 @@ evaluate_state_after_secret_update(Transport, {_, Conn1} = ensure_token_expiry_timer(User, Conn0), PublisherStreams = lists:foldl(fun(#publisher{stream = Str}, Acc) -> - case rabbit_stream_utils:check_write_permitted(stream_r(Str, Conn0), User) of + case check_write_permitted(stream_r(Str, Conn0), User) of ok -> Acc; _ -> diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 06792b4e739d..d1fbb8fd88fd 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -64,7 +64,8 @@ groups() -> test_super_stream_duplicate_partitions, authentication_error_should_close_with_delay, unauthorized_vhost_access_should_close_with_delay, - sasl_anonymous + sasl_anonymous, + test_publisher_with_too_long_reference_errors ]}, %% Run `test_global_counters` on its own so the global metrics are %% initialised to 0 for each testcase @@ -945,6 +946,38 @@ unauthorized_vhost_access_should_close_with_delay(Config) -> closed = wait_for_socket_close(T, S, 10), ok. +test_publisher_with_too_long_reference_errors(Config) -> + FunctionName = atom_to_binary(?FUNCTION_NAME, utf8), + T = gen_tcp, + Port = get_port(T, Config), + Opts = get_opts(T), + {ok, S} = T:connect("localhost", Port, Opts), + C = rabbit_stream_core:init(0), + ConnectionName = FunctionName, + test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C), + test_authenticate(T, S, C), + + Stream = FunctionName, + test_create_stream(T, S, Stream, C), + + MaxSize = 255, + ReferenceOK = iolist_to_binary(lists:duplicate(MaxSize, <<"a">>)), + ReferenceKO = iolist_to_binary(lists:duplicate(MaxSize + 1, <<"a">>)), + + Tests = [{1, ReferenceOK, ?RESPONSE_CODE_OK}, + {2, ReferenceKO, ?RESPONSE_CODE_PRECONDITION_FAILED}], + + [begin + F = request({declare_publisher, PubId, Ref, Stream}), + ok = T:send(S, F), + {Cmd, C} = receive_commands(T, S, C), + ?assertMatch({response, 1, {declare_publisher, ExpectedResponseCode}}, Cmd) + end || {PubId, Ref, ExpectedResponseCode} <- Tests], + + test_delete_stream(T, S, Stream, C), + test_close(T, S, C), + ok. + consumer_offset_info(Config, ConnectionName) -> [[{offset, Offset}, {offset_lag, Lag}]] = rpc(Config, 0, ?MODULE, From 551231503c4197d4115ce4c65e9d8ff655e74cc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 11 Oct 2024 11:29:09 +0200 Subject: [PATCH 2/3] Return error if stream consumer reference is longer than 255 characters (cherry picked from commit 0260862a27866e037213ad3372aa4b30e7cf1992) --- .../src/rabbit_stream_reader.erl | 19 ++++++++-- .../test/rabbit_stream_SUITE.erl | 35 ++++++++++++++++++- 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 7dd701464a74..a90815b34106 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -80,8 +80,10 @@ peer_cert_validity]). -define(UNKNOWN_FIELD, unknown_field). -define(SILENT_CLOSE_DELAY, 3_000). +-define(MAX_REFERENCE_SIZE, 255). --import(rabbit_stream_utils, [check_write_permitted/2]). +-import(rabbit_stream_utils, [check_write_permitted/2, + check_read_permitted/3]). %% client API -export([start_link/4, @@ -1663,7 +1665,7 @@ handle_frame_post_auth(Transport, State, {request, CorrelationId, {declare_publisher, _PublisherId, WriterRef, S}}) - when is_binary(WriterRef), byte_size(WriterRef) > 255 -> + when is_binary(WriterRef), byte_size(WriterRef) > ?MAX_REFERENCE_SIZE -> {Code, Counter} = case check_write_permitted(stream_r(S, C), User) of ok -> {?RESPONSE_CODE_PRECONDITION_FAILED, ?PRECONDITION_FAILED}; @@ -1917,6 +1919,19 @@ handle_frame_post_auth(Transport, #stream_connection{} = Connection, State, {subscribe, _, _, _, _, _}} = Request) -> handle_frame_post_auth(Transport, {ok, Connection}, State, Request); +handle_frame_post_auth(Transport, {ok, #stream_connection{user = User} = C}, State, + {request, CorrelationId, + {subscribe, _, S, _, _, #{ <<"name">> := N}}}) + when is_binary(N), byte_size(N) > ?MAX_REFERENCE_SIZE -> + {Code, Counter} = case check_read_permitted(stream_r(S, C), User,#{}) of + ok -> + {?RESPONSE_CODE_PRECONDITION_FAILED, ?PRECONDITION_FAILED}; + error -> + {?RESPONSE_CODE_ACCESS_REFUSED, ?ACCESS_REFUSED} + end, + response(Transport, C, subscribe, CorrelationId, Code), + rabbit_global_counters:increase_protocol_counter(stream, Counter, 1), + {C, State}; handle_frame_post_auth(Transport, {ok, #stream_connection{ name = ConnName, diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index d1fbb8fd88fd..91644f1364f6 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -65,7 +65,8 @@ groups() -> authentication_error_should_close_with_delay, unauthorized_vhost_access_should_close_with_delay, sasl_anonymous, - test_publisher_with_too_long_reference_errors + test_publisher_with_too_long_reference_errors, + test_consumer_with_too_long_reference_errors ]}, %% Run `test_global_counters` on its own so the global metrics are %% initialised to 0 for each testcase @@ -978,6 +979,38 @@ test_publisher_with_too_long_reference_errors(Config) -> test_close(T, S, C), ok. +test_consumer_with_too_long_reference_errors(Config) -> + FunctionName = atom_to_binary(?FUNCTION_NAME, utf8), + T = gen_tcp, + Port = get_port(T, Config), + Opts = get_opts(T), + {ok, S} = T:connect("localhost", Port, Opts), + C = rabbit_stream_core:init(0), + ConnectionName = FunctionName, + test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C), + test_authenticate(T, S, C), + + Stream = FunctionName, + test_create_stream(T, S, Stream, C), + + MaxSize = 255, + ReferenceOK = iolist_to_binary(lists:duplicate(MaxSize, <<"a">>)), + ReferenceKO = iolist_to_binary(lists:duplicate(MaxSize + 1, <<"a">>)), + + Tests = [{1, ReferenceOK, ?RESPONSE_CODE_OK}, + {2, ReferenceKO, ?RESPONSE_CODE_PRECONDITION_FAILED}], + + [begin + F = request({subscribe, SubId, Stream, first, 1, #{<<"name">> => Ref}}), + ok = T:send(S, F), + {Cmd, C} = receive_commands(T, S, C), + ?assertMatch({response, 1, {subscribe, ExpectedResponseCode}}, Cmd) + end || {SubId, Ref, ExpectedResponseCode} <- Tests], + + test_delete_stream(T, S, Stream, C), + test_close(T, S, C), + ok. + consumer_offset_info(Config, ConnectionName) -> [[{offset, Offset}, {offset_lag, Lag}]] = rpc(Config, 0, ?MODULE, From 1ff8235dfd80687da9b6cd6c474b85efc9459af7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 11 Oct 2024 14:55:44 +0200 Subject: [PATCH 3/3] Return error if store offset reference is longer than 255 characters (cherry picked from commit 622dec011de9d3f5143063efe90fea3c29175683) --- deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index a90815b34106..357283cc8066 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -3463,6 +3463,9 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream, {not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2} end. +store_offset(Reference, _, _, C) when is_binary(Reference), byte_size(Reference) > ?MAX_REFERENCE_SIZE -> + rabbit_log:warning("Reference is too long to store offset: ~p", [byte_size(Reference)]), + C; store_offset(Reference, Stream, Offset, Connection0) -> case lookup_leader(Stream, Connection0) of {error, Error} ->