diff --git a/deps/rabbit/src/rabbit_binding.erl b/deps/rabbit/src/rabbit_binding.erl index c028edc0ec56..085d51fca62a 100644 --- a/deps/rabbit/src/rabbit_binding.erl +++ b/deps/rabbit/src/rabbit_binding.erl @@ -37,11 +37,12 @@ {'resources_missing', [{'not_found', (rabbit_types:binding_source() | rabbit_types:binding_destination())} | - {'absent', amqqueue:amqqueue()}]}). + {'absent', amqqueue:amqqueue(), Reason :: term()}]}). -type bind_ok_or_error() :: 'ok' | bind_errors() | - rabbit_types:error( - {'binding_invalid', string(), [any()]}). + rabbit_types:error({'binding_invalid', string(), [any()]}) | + %% inner_fun() result + rabbit_types:error(rabbit_types:amqp_error()). -type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()). -type inner_fun() :: fun((rabbit_types:exchange(), diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index bd99b3bbe28f..942c9ad94d5d 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1934,11 +1934,6 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, rabbit_amqqueue:not_found(Name); {error, {resources_missing, [{absent, Q, Reason} | _]}} -> rabbit_amqqueue:absent(Q, Reason); - {error, binding_not_found} -> - rabbit_misc:protocol_error( - not_found, "no binding ~s between ~s and ~s", - [RoutingKey, rabbit_misc:rs(ExchangeName), - rabbit_misc:rs(DestinationName)]); {error, {binding_invalid, Fmt, Args}} -> rabbit_misc:protocol_error(precondition_failed, Fmt, Args); {error, #amqp_error{} = Error} -> diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl index 7a60bed68a66..afc0f748e119 100644 --- a/deps/rabbit/src/rabbit_exchange.erl +++ b/deps/rabbit/src/rabbit_exchange.erl @@ -405,7 +405,11 @@ info_all(VHostPath, Items, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map( AggregatorPid, Ref, fun(X) -> info(X, Items) end, list(VHostPath)). --spec route(rabbit_types:exchange(), rabbit_types:delivery()) +%% rabbit_types:delivery() is more strict than #delivery{}, some +%% fields can't be undefined. But there are places where +%% rabbit_exchange:route/2 is called with the absolutely bare delivery +%% like #delivery{message = #basic_message{routing_keys = [...]}} +-spec route(rabbit_types:exchange(), #delivery{}) -> [rabbit_amqqueue:name()]. route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName, diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index a10a408a1a8e..059d4ab55ace 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -136,7 +136,8 @@ -callback declare(amqqueue:amqqueue(), node()) -> {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | {'absent', amqqueue:amqqueue(), absent_reason()} | - {'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}. + {'protocol_error', Type :: atom(), Reason :: string(), Args :: term()} | + {'error', Err :: term() }. -callback delete(amqqueue:amqqueue(), boolean(), @@ -253,7 +254,8 @@ is_compatible(Type, Durable, Exclusive, AutoDelete) -> -spec declare(amqqueue:amqqueue(), node()) -> {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | {'absent', amqqueue:amqqueue(), absent_reason()} | - {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. + {protocol_error, Type :: atom(), Reason :: string(), Args :: term()} | + {'error', Err :: term() }. declare(Q0, Node) -> Q = rabbit_queue_decorator:set(rabbit_policy:set(Q0)), Mod = amqqueue:get_type(Q), diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/info_keys.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/info_keys.ex index e6154c0dd5fc..4c7480902021 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/info_keys.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/info_keys.ex @@ -66,7 +66,7 @@ defmodule RabbitMQ.CLI.Ctl.InfoKeys do with_valid_info_keys(args, valid_keys, [], fun) end - @spec with_valid_info_keys([charlist], [charlist], aliases, fun([atom])) :: any + @spec with_valid_info_keys([charlist], [charlist], aliases, ([atom] -> any)) :: any def with_valid_info_keys(args, valid_keys, aliases, fun) do case validate_info_keys(args, valid_keys, aliases) do {:ok, info_keys} -> fun.(:proplists.get_keys(info_keys)) diff --git a/deps/rabbitmq_stream/BUILD.bazel b/deps/rabbitmq_stream/BUILD.bazel index 009791232b46..b1b21e8ed6cc 100644 --- a/deps/rabbitmq_stream/BUILD.bazel +++ b/deps/rabbitmq_stream/BUILD.bazel @@ -35,12 +35,13 @@ APP_ENV = """[ BUILD_DEPS = [ "//deps/rabbit_common:erlang_app", "//deps/rabbitmq_cli:rabbitmqctl", - "@ranch//:erlang_app", ] DEPS = [ "//deps/rabbitmq_stream_common:erlang_app", "//deps/rabbit:erlang_app", + "@ranch//:erlang_app", + "@osiris//:erlang_app", ] rabbitmq_app( @@ -64,12 +65,12 @@ plt( "//deps/rabbitmq_cli:rabbitmqctl", BUILD_DEPS + DEPS, ), + apps = ["erts", "kernel", "stdlib", "ssl"], ) dialyze( dialyzer_opts = RABBITMQ_DIALYZER_OPTS + ["-Wno_undefined_callbacks"], plt = ":base_plt", - warnings_as_errors = False, ) broker_for_integration_suites() diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl index 16d353ef7eb8..7af2298cf379 100644 --- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl @@ -215,6 +215,8 @@ run([SuperStream], stream_arguments(Opts) -> stream_arguments(#{}, Opts). +%% Something strange, dialyzer infers that map_size/1 returns positive_integer() +-dialyzer({no_match, stream_arguments/2}). stream_arguments(Acc, Arguments) when map_size(Arguments) =:= 0 -> Acc; stream_arguments(Acc, #{max_length_bytes := Value} = Arguments) -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl index d22faea1e341..9693e015a9c1 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -54,8 +54,7 @@ start(_Type, _Args) -> ++ "Enable stream_queue feature flag then disable " "and re-enable the rabbitmq_stream plugin. ", "See https://www.rabbitmq.com/feature-flags.html " - "to learn more", - []), + "to learn more"), {ok, self()} end. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 29478d3bfe90..6a6b09de37e3 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -243,11 +243,11 @@ handle_call({create_super_stream, ok -> {reply, ok, State}; Error -> - [Fun() || Fun <- RollbackOps], + _ = [Fun() || Fun <- RollbackOps], {reply, Error, State} end; {{error, Reason}, RollbackOps} -> - [Fun() || Fun <- RollbackOps], + _ = [Fun() || Fun <- RollbackOps], {reply, {error, Reason}, State} end; {error, Msg} -> @@ -392,12 +392,18 @@ handle_call({topology, VirtualHost, Stream}, _From, State) -> {error, stream_not_found} end; {error, not_found} -> +<<<<<<< HEAD case rabbit_amqqueue:not_found_or_absent_dirty(Name) of not_found -> {error, stream_not_found}; _ -> {error, stream_not_available} end +======= + {error, stream_not_found}; + {error, not_available} -> + {error, stream_not_available} +>>>>>>> f4e4db1d12 (Fix all dialyzer warnings in rabbitmq_stream) end, {reply, Res, State}; handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, @@ -427,6 +433,63 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, handle_call({partitions, VirtualHost, SuperStream}, _From, State) -> Res = super_stream_partitions(VirtualHost, SuperStream), {reply, Res, State}; +<<<<<<< HEAD +======= +handle_call({partition_index, VirtualHost, SuperStream, Stream}, + _From, State) -> + ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), + rabbit_log:debug("Looking for partition index of stream ~tp in " + "super stream ~tp (virtual host ~tp)", + [Stream, SuperStream, VirtualHost]), + Res = try + _ = rabbit_exchange:lookup_or_die(ExchangeName), + UnorderedBindings = + _ = [Binding + || Binding = #binding{destination = #resource{name = Q} = D} + <- rabbit_binding:list_for_source(ExchangeName), + is_resource_stream_queue(D), Q == Stream], + OrderedBindings = + rabbit_stream_utils:sort_partitions(UnorderedBindings), + rabbit_log:debug("Bindings: ~p", [OrderedBindings]), + case OrderedBindings of + [] -> + {error, stream_not_found}; + Bindings -> + Binding = lists:nth(1, Bindings), + #binding{args = Args} = Binding, + case rabbit_misc:table_lookup(Args, + <<"x-stream-partition-order">>) + of + {_, Order} -> + Index = rabbit_data_coercion:to_integer(Order), + {ok, Index}; + _ -> + Pattern = <<"-">>, + Size = byte_size(Pattern), + case string:find(Stream, Pattern, trailing) of + nomatch -> + {ok, -1}; + <> -> + try + Index = binary_to_integer(Rest), + {ok, Index} + catch + error:_ -> + {ok, -1} + end; + _ -> + {ok, -1} + end + end + end + catch + exit:Error -> + rabbit_log:error("Error while looking up exchange ~p, ~p", + [ExchangeName, Error]), + {error, stream_not_found} + end, + {reply, Res, State}; +>>>>>>> f4e4db1d12 (Fix all dialyzer warnings in rabbitmq_stream) handle_call(which_children, _From, State) -> {reply, [], State}. @@ -551,7 +614,7 @@ delete_stream(VirtualHost, Reference, Username) -> super_stream_partitions(VirtualHost, SuperStream) -> ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), try - rabbit_exchange:lookup_or_die(ExchangeName), + _ = rabbit_exchange:lookup_or_die(ExchangeName), UnorderedBindings = [Binding || Binding = #binding{destination = D} @@ -747,13 +810,7 @@ add_super_stream_binding(VirtualHost, {error, {resources_missing, [{absent, Q, _Reason} | _]}} -> {error, {stream_not_found, - rabbit_misc:format("stream ~s does not exists (absent)", [Q])}}; - {error, binding_not_found} -> - {error, - {not_found, - rabbit_misc:format("no binding ~s between ~s and ~s", - [RoutingKey, rabbit_misc:rs(ExchangeName), - rabbit_misc:rs(QueueName)])}}; + rabbit_misc:format("stream ~ts does not exists (absent)", [Q])}}; {error, {binding_invalid, Fmt, Args}} -> {error, {binding_invalid, rabbit_misc:format(Fmt, Args)}}; {error, #amqp_error{} = Error} -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl b/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl index d74b21e5ebf3..a5cd996c0e96 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl @@ -30,8 +30,8 @@ -define(CTAG_PREFIX, <<"stream.subid-">>). init() -> - rabbit_core_metrics:create_table({?TABLE_CONSUMER, set}), - rabbit_core_metrics:create_table({?TABLE_PUBLISHER, set}), + _ = rabbit_core_metrics:create_table({?TABLE_CONSUMER, set}), + _ = rabbit_core_metrics:create_table({?TABLE_PUBLISHER, set}), ok. consumer_created(Connection, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_metrics_gc.erl b/deps/rabbitmq_stream/src/rabbit_stream_metrics_gc.erl index 147cc1bb821e..e5f688b3e8ab 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_metrics_gc.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_metrics_gc.erl @@ -54,7 +54,7 @@ handle_info(start_gc, State) -> {noreply, start_timer(State)}. terminate(_Reason, #state{timer = TRef}) -> - erlang:cancel_timer(TRef), + _ = erlang:cancel_timer(TRef), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 9d76a833dc4c..8359e26ab73a 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -85,7 +85,14 @@ send_file_oct :: atomics:atomics_ref(), % number of bytes sent with send_file (for metrics) transport :: tcp | ssl, +<<<<<<< HEAD proxy_socket :: undefined | ranch_proxy:proxy_socket()}). +======= + proxy_socket :: undefined | ranch_transport:socket(), + correlation_id_sequence :: integer(), + outstanding_requests :: #{integer() => term()}, + deliver_version :: rabbit_stream_core:command_version()}). +>>>>>>> f4e4db1d12 (Fix all dialyzer warnings in rabbitmq_stream) -record(configuration, {initial_credits :: integer(), credits_required_for_unblocking :: integer(), @@ -191,6 +198,9 @@ start_link(KeepaliveSup, Transport, Ref, Opts) -> proc_lib:spawn_link(?MODULE, init, [[KeepaliveSup, Transport, Ref, Opts]])}. +%% Because of gen_statem:enter_loop/4 usage inside init/1 +-dialyzer({no_behaviours, init/1}). + init([KeepaliveSup, Transport, Ref, @@ -244,7 +254,7 @@ init([KeepaliveSup, data = rabbit_stream_core:init(undefined)}, Transport:setopts(RealSocket, [{active, once}]), - rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}), + _ = rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}), ConnectionNegotiationStepTimeout = application:get_env(rabbitmq_stream, connection_negotiation_step_timeout, @@ -643,8 +653,26 @@ augment_infos_with_user_provided_connection_name(Infos, close(Transport, S, #stream_connection_state{consumers = Consumers}) -> +<<<<<<< HEAD [osiris_log:close(Log) || #consumer{log = Log} <- maps:values(Consumers)], +======= + [begin + _ = maybe_unregister_consumer(VirtualHost, Consumer, + single_active_consumer(Properties)), + case Log of + undefined -> + ok; %% segment may not be defined on subscription (single active consumer) + L -> + osiris_log:close(L) + end + end + || #consumer{log = Log, + configuration = + #consumer_configuration{properties = Properties}} = + Consumer + <- maps:values(Consumers)], +>>>>>>> f4e4db1d12 (Fix all dialyzer warnings in rabbitmq_stream) Transport:shutdown(S, write), Transport:close(S). @@ -759,15 +787,15 @@ open(info, {OK, S, Data}, end; open(info, {Closed, Socket}, #statem_data{connection = Connection}) when Closed =:= tcp_closed; Closed =:= ssl_closed -> - demonitor_all_streams(Connection), + _ = demonitor_all_streams(Connection), rabbit_log_connection:warning("Socket ~w closed [~w]", [Socket, self()]), stop; open(info, {Error, Socket, Reason}, #statem_data{connection = Connection}) when Error =:= tcp_error; Error =:= ssl_error -> - demonitor_all_streams(Connection), - rabbit_log_connection:error("Socket error ~p [~w] [~w]", + _ = demonitor_all_streams(Connection), + rabbit_log_connection:error("Socket error ~tp [~w] [~w]", [Reason, Socket, self()]), stop; open(info, {'DOWN', MonitorRef, process, _OsirisPid, _Reason}, @@ -857,7 +885,7 @@ open({call, From}, {shutdown, Explanation}, % likely closing call from the management plugin rabbit_log_connection:info("Forcing stream connection ~p closing: ~p", [self(), Explanation]), - demonitor_all_streams(Connection), + _ = demonitor_all_streams(Connection), {stop_and_reply, normal, {reply, From, ok}}; open(cast, {queue_event, _, {osiris_written, _, undefined, CorrelationList}}, @@ -2365,6 +2393,7 @@ handle_frame_post_auth(Transport, end, Frame = +<<<<<<< HEAD <>, FrameSize = byte_size(Frame), Transport:send(S, <>), +======= + rabbit_stream_core:frame({response, CorrelationId, + {partitions, ResponseCode, Partitions}}), + + Transport:send(S, Frame), + {Connection, State}; +handle_frame_post_auth(Transport, + #stream_connection{transport = ConnTransport, + outstanding_requests = Requests0, + send_file_oct = SendFileOct, + virtual_host = VirtualHost, + deliver_version = DeliverVersion} = + Connection, + #stream_connection_state{consumers = Consumers} = State, + {response, CorrelationId, + {consumer_update, ResponseCode, ResponseOffsetSpec}}) -> + case ResponseCode of + ?RESPONSE_CODE_OK -> + ok; + RC -> + rabbit_log:info("Unexpected consumer update response code: ~p", + [RC]) + end, + case maps:take(CorrelationId, Requests0) of + {{{subscription_id, SubscriptionId}, {extra, Extra}}, Rs} -> + rabbit_log:debug("Received consumer update response for subscription " + "~tp", + [SubscriptionId]), + Consumers1 = + case Consumers of + #{SubscriptionId := + #consumer{configuration = + #consumer_configuration{active = + true}} = + Consumer} -> + %% active, dispatch messages + #consumer{configuration = + #consumer_configuration{properties = + Properties, + member_pid = + LocalMemberPid, + offset = + SubscriptionOffsetSpec, + stream = + Stream}} = + Consumer, + + OffsetSpec = + case ResponseOffsetSpec of + none -> + SubscriptionOffsetSpec; + ROS -> + ROS + end, + + rabbit_log:debug("Initializing reader for active consumer, offset " + "spec is ~p", + [OffsetSpec]), + QueueResource = + #resource{name = Stream, + kind = queue, + virtual_host = VirtualHost}, + + Segment = + init_reader(ConnTransport, + LocalMemberPid, + QueueResource, + SubscriptionId, + Properties, + OffsetSpec), + Consumer1 = Consumer#consumer{log = Segment}, + Consumer2 = + case send_chunks(DeliverVersion, + Transport, + Consumer1, + SendFileOct) + of + {error, closed} -> + rabbit_log_connection:info("Stream protocol connection has been closed by " + "peer", + []), + throw({stop, normal}); + {error, Reason} -> + rabbit_log_connection:info("Error while sending chunks: ~p", + [Reason]), + %% likely a connection problem + Consumer; + {ok, Csmr} -> + Csmr + end, + #consumer{configuration = + #consumer_configuration{counters = + ConsumerCounters}, + log = Log2} = + Consumer2, + ConsumerOffset = osiris_log:next_offset(Log2), + + rabbit_log:debug("Subscription ~tp is now at offset ~tp with ~tp " + "message(s) distributed after subscription", + [SubscriptionId, ConsumerOffset, + messages_consumed(ConsumerCounters)]), + + Consumers#{SubscriptionId => Consumer2}; + #{SubscriptionId := + #consumer{configuration = + #consumer_configuration{active = false, + stream = Stream, + properties = + Properties}}} -> + rabbit_log:debug("Not an active consumer"), + + case Extra of + [{stepping_down, true}] -> + ConsumerName = consumer_name(Properties), + _ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost, + Stream, + ConsumerName), + ok; + _ -> + ok + end, + + Consumers; + _ -> + rabbit_log:debug("No consumer found for subscription ~p", + [SubscriptionId]), + Consumers + end, + + {Connection#stream_connection{outstanding_requests = Rs}, + State#stream_connection_state{consumers = Consumers1}}; + {V, _Rs} -> + rabbit_log:warning("Unexpected outstanding requests for correlation " + "ID ~p: ~p", + [CorrelationId, V]), + {Connection, State}; + error -> + rabbit_log:warning("Could not find outstanding consumer update request " + "with correlation ID ~p. No actions taken for " + "the subscription.", + [CorrelationId]), + {Connection, State} + end; +handle_frame_post_auth(Transport, + #stream_connection{socket = S} = Connection0, + State, + {request, CorrelationId, + {exchange_command_versions, CommandVersions}}) -> + Frame = + rabbit_stream_core:frame({response, CorrelationId, + {exchange_command_versions, ?RESPONSE_CODE_OK, + rabbit_stream_utils:command_versions()}}), + send(Transport, S, Frame), + + %% adapt connection handlers to client capabilities + Connection1 = + process_client_command_versions(Connection0, CommandVersions), + {Connection1, State}; +handle_frame_post_auth(Transport, + #stream_connection{socket = S, + virtual_host = VirtualHost, + user = User} = + Connection, + State, + {request, CorrelationId, {stream_stats, Stream}}) -> + QueueResource = + #resource{name = Stream, + kind = queue, + virtual_host = VirtualHost}, + Response = + case rabbit_stream_utils:check_read_permitted(QueueResource, User, + #{}) + of + ok -> + case rabbit_stream_manager:lookup_member(VirtualHost, Stream) of + {error, not_available} -> + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_NOT_AVAILABLE, + 1), + {stream_stats, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE, + #{}}; + {error, not_found} -> + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_DOES_NOT_EXIST, + 1), + {stream_stats, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, + #{}}; + {ok, MemberPid} -> + StreamStats = + maps:fold(fun(K, V, Acc) -> + Acc#{atom_to_binary(K) => V} + end, + #{}, osiris:get_stats(MemberPid)), + {stream_stats, ?RESPONSE_CODE_OK, StreamStats} + end; + error -> + rabbit_global_counters:increase_protocol_counter(stream, + ?ACCESS_REFUSED, + 1), + {stream_stats, ?RESPONSE_CODE_ACCESS_REFUSED, #{}} + end, + Frame = rabbit_stream_core:frame({response, CorrelationId, Response}), + send(Transport, S, Frame), +>>>>>>> f4e4db1d12 (Fix all dialyzer warnings in rabbitmq_stream) {Connection, State}; handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection, @@ -2406,6 +2639,213 @@ handle_frame_post_auth(Transport, ?UNKNOWN_FRAME, 1), {Connection#stream_connection{connection_step = close_sent}, State}. +<<<<<<< HEAD +======= +process_client_command_versions(C, []) -> + C; +process_client_command_versions(C, [H | T]) -> + process_client_command_versions(process_client_command_api(C, H), T). + +process_client_command_api(C, {deliver, ?VERSION_1, ?VERSION_2}) -> + C#stream_connection{deliver_version = ?VERSION_2}; +process_client_command_api(C, _) -> + C. + +init_reader(ConnectionTransport, + LocalMemberPid, + QueueResource, + SubscriptionId, + Properties, + OffsetSpec) -> + CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []}, + Options = + #{transport => ConnectionTransport, + chunk_selector => get_chunk_selector(Properties)}, + {ok, Segment} = + osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec, Options), + rabbit_log:debug("Next offset for subscription ~p is ~p", + [SubscriptionId, osiris_log:next_offset(Segment)]), + Segment. + +single_active_consumer(#{<<"single-active-consumer">> := + <<"true">>}) -> + true; +single_active_consumer(_Properties) -> + false. + +consumer_name(#{<<"name">> := Name}) -> + Name; +consumer_name(_Properties) -> + undefined. + +maybe_dispatch_on_subscription(Transport, + State, + ConsumerState, + #stream_connection{deliver_version = + DeliverVersion} = + Connection, + Consumers, + Stream, + SubscriptionId, + SubscriptionProperties, + SendFileOct, + false = _Sac) -> + rabbit_log:debug("Distributing existing messages to subscription " + "~tp", + [SubscriptionId]), + case send_chunks(DeliverVersion, + Transport, + ConsumerState, + SendFileOct) + of + {error, closed} -> + rabbit_log_connection:info("Stream protocol connection has been closed by " + "peer", + []), + throw({stop, normal}); + {ok, #consumer{log = Log1, credit = Credit1} = ConsumerState1} -> + Consumers1 = Consumers#{SubscriptionId => ConsumerState1}, + + #consumer{configuration = + #consumer_configuration{counters = + ConsumerCounters1}} = + ConsumerState1, + + ConsumerOffset = osiris_log:next_offset(Log1), + ConsumerOffsetLag = consumer_i(offset_lag, ConsumerState1), + + rabbit_log:debug("Subscription ~tp is now at offset ~tp with ~tp " + "message(s) distributed after subscription", + [SubscriptionId, ConsumerOffset, + messages_consumed(ConsumerCounters1)]), + + rabbit_stream_metrics:consumer_created(self(), + stream_r(Stream, Connection), + SubscriptionId, + Credit1, + messages_consumed(ConsumerCounters1), + ConsumerOffset, + ConsumerOffsetLag, + true, + SubscriptionProperties), + State#stream_connection_state{consumers = Consumers1} + end; +maybe_dispatch_on_subscription(_Transport, + State, + ConsumerState, + Connection, + Consumers, + Stream, + SubscriptionId, + SubscriptionProperties, + _SendFileOct, + true = _Sac) -> + rabbit_log:debug("No initial dispatch for subscription ~tp for " + "now, waiting for consumer update response from " + "client (single active consumer)", + [SubscriptionId]), + #consumer{credit = Credit, + configuration = + #consumer_configuration{offset = Offset, active = Active}} = + ConsumerState, + + rabbit_stream_metrics:consumer_created(self(), + stream_r(Stream, Connection), + SubscriptionId, + Credit, + 0, %% messages consumed + Offset, + 0, %% offset lag + Active, + SubscriptionProperties), + Consumers1 = Consumers#{SubscriptionId => ConsumerState}, + State#stream_connection_state{consumers = Consumers1}. + +maybe_register_consumer(_, _, _, _, _, _, false = _Sac) -> + true; +maybe_register_consumer(VirtualHost, + Stream, + ConsumerName, + ConnectionName, + SubscriptionId, + Properties, + true) -> + PartitionIndex = partition_index(VirtualHost, Stream, Properties), + {ok, Active} = + rabbit_stream_sac_coordinator:register_consumer(VirtualHost, + Stream, + PartitionIndex, + ConsumerName, + self(), + ConnectionName, + SubscriptionId), + Active. + +%% NOTE: Never called with SAC = false, but adding an explicit type +%% still doesn't convince dialyzer. Keeping this clause commented out +%% instead of disabling some dialyzer checks for this function: +%% +%% maybe_send_consumer_update(_, Connection, _, _, false = _Sac, _) -> +%% Connection; +maybe_send_consumer_update(Transport, + #stream_connection{socket = S, + correlation_id_sequence = + CorrIdSeq, + outstanding_requests = + OutstandingRequests0} = + Connection, + SubscriptionId, + Active, + true = _Sac, + Extra) -> + rabbit_log:debug("SAC subscription ~p, active = ~p", + [SubscriptionId, Active]), + Frame = + rabbit_stream_core:frame({request, CorrIdSeq, + {consumer_update, SubscriptionId, Active}}), + + OutstandingRequests1 = + maps:put(CorrIdSeq, + {{subscription_id, SubscriptionId}, {extra, Extra}}, + OutstandingRequests0), + send(Transport, S, Frame), + Connection#stream_connection{correlation_id_sequence = CorrIdSeq + 1, + outstanding_requests = OutstandingRequests1}. + +maybe_unregister_consumer(_, _, false = _Sac) -> + ok; +maybe_unregister_consumer(VirtualHost, + #consumer{configuration = + #consumer_configuration{stream = Stream, + properties = + Properties, + subscription_id + = + SubscriptionId}}, + true = _Sac) -> + ConsumerName = consumer_name(Properties), + rabbit_stream_sac_coordinator:unregister_consumer(VirtualHost, + Stream, + ConsumerName, + self(), + SubscriptionId). + +partition_index(VirtualHost, Stream, Properties) -> + case Properties of + #{<<"super-stream">> := SuperStream} -> + case rabbit_stream_manager:partition_index(VirtualHost, SuperStream, + Stream) + of + {ok, Index} -> + Index; + _ -> + -1 + end; + _ -> + -1 + end. + +>>>>>>> f4e4db1d12 (Fix all dialyzer warnings in rabbitmq_stream) notify_connection_closed(#statem_data{connection = #stream_connection{name = Name, publishers = @@ -2475,9 +2915,21 @@ clean_state_after_stream_deletion_or_failure(Stream, case stream_has_subscriptions(Stream, C0) of true -> #{Stream := SubscriptionIds} = StreamSubscriptions, +<<<<<<< HEAD [rabbit_stream_metrics:consumer_cancelled(self(), stream_r(Stream, C0), SubId) +======= + _ = [begin + rabbit_stream_metrics:consumer_cancelled(self(), + stream_r(Stream, + C0), + SubId), + #{SubId := Consumer} = Consumers, + maybe_unregister_consumer(VirtualHost, Consumer, + single_active_consumer(Consumer#consumer.configuration#consumer_configuration.properties)) + end +>>>>>>> f4e4db1d12 (Fix all dialyzer warnings in rabbitmq_stream) || SubId <- SubscriptionIds], {true, C0#stream_connection{stream_subscriptions = @@ -2586,6 +3038,11 @@ remove_subscription(SubscriptionId, rabbit_stream_metrics:consumer_cancelled(self(), stream_r(Stream, Connection2), SubscriptionId), +<<<<<<< HEAD +======= + _ = maybe_unregister_consumer(VirtualHost, Consumer, + single_active_consumer(Consumer#consumer.configuration#consumer_configuration.properties)), +>>>>>>> f4e4db1d12 (Fix all dialyzer warnings in rabbitmq_stream) {Connection2, State#stream_connection_state{consumers = Consumers1}}. maybe_clean_connection_from_stream(Stream, @@ -2696,6 +3153,34 @@ send_file_callback(Transport, atomics:add(Counter, 1, Size), increase_messages_consumed(Counters, NumEntries), set_consumer_offset(Counters, FirstOffsetInChunk) +<<<<<<< HEAD +======= + end; +send_file_callback(?VERSION_2, + Transport, + Log, + #consumer{configuration = + #consumer_configuration{socket = S, + subscription_id = + SubscriptionId, + counters = Counters}}, + Counter) -> + fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries}, + Size) -> + FrameSize = 2 + 2 + 1 + 8 + Size, + CommittedChunkId = osiris_log:committed_offset(Log), + FrameBeginning = + <>, + Transport:send(S, FrameBeginning), + atomics:add(Counter, 1, Size), + increase_messages_consumed(Counters, NumEntries), + set_consumer_offset(Counters, FirstOffsetInChunk) +>>>>>>> f4e4db1d12 (Fix all dialyzer warnings in rabbitmq_stream) end. send_chunks(Transport, diff --git a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl index d16b887df194..19df7f02deff 100644 --- a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl +++ b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl @@ -30,7 +30,7 @@ -define(STRING(Str), (byte_size(Str)):16, Str / binary). -define(DATASTR(Str), (byte_size(Str)):32, Str / binary). --export_type([state/0]). +-export_type([state/0, command_version/0]). -type correlation_id() :: non_neg_integer(). %% publishing sequence number diff --git a/deps/rabbitmq_stream_management/BUILD.bazel b/deps/rabbitmq_stream_management/BUILD.bazel index ab602c9b9c0f..0713b7996578 100644 --- a/deps/rabbitmq_stream_management/BUILD.bazel +++ b/deps/rabbitmq_stream_management/BUILD.bazel @@ -46,7 +46,6 @@ plt( dialyze( dialyzer_opts = RABBITMQ_DIALYZER_OPTS, plt = ":base_plt", - warnings_as_errors = False, ) rabbitmq_home( diff --git a/deps/rabbitmq_stream_management/src/rabbit_stream_connections_mgmt.erl b/deps/rabbitmq_stream_management/src/rabbit_stream_connections_mgmt.erl index 90672eeaae9f..477d889ba4cb 100644 --- a/deps/rabbitmq_stream_management/src/rabbit_stream_connections_mgmt.erl +++ b/deps/rabbitmq_stream_management/src/rabbit_stream_connections_mgmt.erl @@ -37,8 +37,7 @@ web_ui() -> "and re-enable the rabbitmq_stream_management " "plugin. ", "See https://www.rabbitmq.com/feature-flags.html " - "to learn more", - []), + "to learn more"), [] end.