From f4e4db1d122aba7a63ba63dd0ee8f7953caa5e39 Mon Sep 17 00:00:00 2001 From: Alexey Lebedeff Date: Mon, 16 Jan 2023 16:58:12 +0100 Subject: [PATCH 1/3] Fix all dialyzer warnings in rabbitmq_stream There are some elixir-related messages about undefined functions, but they don't produce warnings (yet). (cherry picked from commit 2c4e4fb691050e280821689c08ac3625610f54fd) # Conflicts: # deps/rabbit/src/rabbit_channel.erl # deps/rabbitmq_stream/src/rabbit_stream_manager.erl # deps/rabbitmq_stream/src/rabbit_stream_reader.erl --- deps/rabbit/src/rabbit_binding.erl | 7 +-- deps/rabbit/src/rabbit_channel.erl | 3 ++ deps/rabbit/src/rabbit_exchange.erl | 6 ++- deps/rabbit/src/rabbit_queue_type.erl | 6 ++- .../lib/rabbitmq/cli/ctl/info_keys.ex | 2 +- deps/rabbitmq_stream/BUILD.bazel | 5 ++- ...CLI.Ctl.Commands.AddSuperStreamCommand.erl | 2 + deps/rabbitmq_stream/src/rabbit_stream.erl | 3 +- .../src/rabbit_stream_manager.erl | 18 ++++---- .../src/rabbit_stream_metrics.erl | 4 +- .../src/rabbit_stream_metrics_gc.erl | 2 +- .../src/rabbit_stream_reader.erl | 45 +++++++++++-------- .../src/rabbit_stream_core.erl | 2 +- deps/rabbitmq_stream_management/BUILD.bazel | 1 - .../src/rabbit_stream_connections_mgmt.erl | 3 +- 15 files changed, 65 insertions(+), 44 deletions(-) diff --git a/deps/rabbit/src/rabbit_binding.erl b/deps/rabbit/src/rabbit_binding.erl index 29af64d212ba..f615b9376ab3 100644 --- a/deps/rabbit/src/rabbit_binding.erl +++ b/deps/rabbit/src/rabbit_binding.erl @@ -37,11 +37,12 @@ {'resources_missing', [{'not_found', (rabbit_types:binding_source() | rabbit_types:binding_destination())} | - {'absent', amqqueue:amqqueue()}]}). + {'absent', amqqueue:amqqueue(), Reason :: term()}]}). -type bind_ok_or_error() :: 'ok' | bind_errors() | - rabbit_types:error( - {'binding_invalid', string(), [any()]}). + rabbit_types:error({'binding_invalid', string(), [any()]}) | + %% inner_fun() result + rabbit_types:error(rabbit_types:amqp_error()). -type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()). -type inner_fun() :: fun((rabbit_types:exchange(), diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index f49c82f9f3e8..93da6cfd0155 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1934,11 +1934,14 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, rabbit_amqqueue:not_found(Name); {error, {resources_missing, [{absent, Q, Reason} | _]}} -> rabbit_amqqueue:absent(Q, Reason); +<<<<<<< HEAD {error, binding_not_found} -> rabbit_misc:protocol_error( not_found, "no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), rabbit_misc:rs(DestinationName)]); +======= +>>>>>>> 2c4e4fb691 (Fix all dialyzer warnings in rabbitmq_stream) {error, {binding_invalid, Fmt, Args}} -> rabbit_misc:protocol_error(precondition_failed, Fmt, Args); {error, #amqp_error{} = Error} -> diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl index e715927dca21..218892d6f352 100644 --- a/deps/rabbit/src/rabbit_exchange.erl +++ b/deps/rabbit/src/rabbit_exchange.erl @@ -393,7 +393,11 @@ info_all(VHostPath, Items, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map( AggregatorPid, Ref, fun(X) -> info(X, Items) end, list(VHostPath)). --spec route(rabbit_types:exchange(), rabbit_types:delivery()) +%% rabbit_types:delivery() is more strict than #delivery{}, some +%% fields can't be undefined. But there are places where +%% rabbit_exchange:route/2 is called with the absolutely bare delivery +%% like #delivery{message = #basic_message{routing_keys = [...]}} +-spec route(rabbit_types:exchange(), #delivery{}) -> [rabbit_amqqueue:name()]. route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName, diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 6c5b210d90d4..bc8718f6973f 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -137,7 +137,8 @@ -callback declare(amqqueue:amqqueue(), node()) -> {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | {'absent', amqqueue:amqqueue(), absent_reason()} | - {'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}. + {'protocol_error', Type :: atom(), Reason :: string(), Args :: term()} | + {'error', Err :: term() }. -callback delete(amqqueue:amqqueue(), boolean(), @@ -263,7 +264,8 @@ is_compatible(Type, Durable, Exclusive, AutoDelete) -> -spec declare(amqqueue:amqqueue(), node()) -> {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | {'absent', amqqueue:amqqueue(), absent_reason()} | - {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. + {protocol_error, Type :: atom(), Reason :: string(), Args :: term()} | + {'error', Err :: term() }. declare(Q0, Node) -> Q = rabbit_queue_decorator:set(rabbit_policy:set(Q0)), Mod = amqqueue:get_type(Q), diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/info_keys.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/info_keys.ex index e6154c0dd5fc..4c7480902021 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/info_keys.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/info_keys.ex @@ -66,7 +66,7 @@ defmodule RabbitMQ.CLI.Ctl.InfoKeys do with_valid_info_keys(args, valid_keys, [], fun) end - @spec with_valid_info_keys([charlist], [charlist], aliases, fun([atom])) :: any + @spec with_valid_info_keys([charlist], [charlist], aliases, ([atom] -> any)) :: any def with_valid_info_keys(args, valid_keys, aliases, fun) do case validate_info_keys(args, valid_keys, aliases) do {:ok, info_keys} -> fun.(:proplists.get_keys(info_keys)) diff --git a/deps/rabbitmq_stream/BUILD.bazel b/deps/rabbitmq_stream/BUILD.bazel index 009791232b46..b1b21e8ed6cc 100644 --- a/deps/rabbitmq_stream/BUILD.bazel +++ b/deps/rabbitmq_stream/BUILD.bazel @@ -35,12 +35,13 @@ APP_ENV = """[ BUILD_DEPS = [ "//deps/rabbit_common:erlang_app", "//deps/rabbitmq_cli:rabbitmqctl", - "@ranch//:erlang_app", ] DEPS = [ "//deps/rabbitmq_stream_common:erlang_app", "//deps/rabbit:erlang_app", + "@ranch//:erlang_app", + "@osiris//:erlang_app", ] rabbitmq_app( @@ -64,12 +65,12 @@ plt( "//deps/rabbitmq_cli:rabbitmqctl", BUILD_DEPS + DEPS, ), + apps = ["erts", "kernel", "stdlib", "ssl"], ) dialyze( dialyzer_opts = RABBITMQ_DIALYZER_OPTS + ["-Wno_undefined_callbacks"], plt = ":base_plt", - warnings_as_errors = False, ) broker_for_integration_suites() diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl index fbe9a156d2c4..f512abd140b8 100644 --- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl @@ -215,6 +215,8 @@ run([SuperStream], stream_arguments(Opts) -> stream_arguments(#{}, Opts). +%% Something strange, dialyzer infers that map_size/1 returns positive_integer() +-dialyzer({no_match, stream_arguments/2}). stream_arguments(Acc, Arguments) when map_size(Arguments) =:= 0 -> Acc; stream_arguments(Acc, #{max_length_bytes := Value} = Arguments) -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl index 10cb76fda6af..f54b558b33e7 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -53,8 +53,7 @@ start(_Type, _Args) -> ++ "Enable stream_queue feature flag then disable " "and re-enable the rabbitmq_stream plugin. ", "See https://www.rabbitmq.com/feature-flags.html " - "to learn more", - []), + "to learn more"), {ok, self()} end. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 7b9c8a0a74c9..d121e96fbc61 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -267,11 +267,11 @@ handle_call({create_super_stream, ok -> {reply, ok, State}; Error -> - [Fun() || Fun <- RollbackOps], + _ = [Fun() || Fun <- RollbackOps], {reply, Error, State} end; {{error, Reason}, RollbackOps} -> - [Fun() || Fun <- RollbackOps], + _ = [Fun() || Fun <- RollbackOps], {reply, {error, Reason}, State} end; {error, Msg} -> @@ -410,9 +410,7 @@ handle_call({topology, VirtualHost, Stream}, _From, State) -> {error, not_found} -> {error, stream_not_found}; {error, not_available} -> - {error, stream_not_available}; - R -> - R + {error, stream_not_available} end, {reply, Res, State}; handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, @@ -449,9 +447,9 @@ handle_call({partition_index, VirtualHost, SuperStream, Stream}, "super stream ~tp (virtual host ~tp)", [Stream, SuperStream, VirtualHost]), Res = try - rabbit_exchange:lookup_or_die(ExchangeName), + _ = rabbit_exchange:lookup_or_die(ExchangeName), UnorderedBindings = - [Binding + _ = [Binding || Binding = #binding{destination = #resource{name = Q} = D} <- rabbit_binding:list_for_source(ExchangeName), is_resource_stream_queue(D), Q == Stream], @@ -620,7 +618,7 @@ delete_stream(VirtualHost, Reference, Username) -> super_stream_partitions(VirtualHost, SuperStream) -> ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), try - rabbit_exchange:lookup_or_die(ExchangeName), + _ = rabbit_exchange:lookup_or_die(ExchangeName), UnorderedBindings = [Binding || Binding = #binding{destination = D} @@ -817,6 +815,7 @@ add_super_stream_binding(VirtualHost, {error, {resources_missing, [{absent, Q, _Reason} | _]}} -> {error, {stream_not_found, +<<<<<<< HEAD rabbit_misc:format("stream ~s does not exists (absent)", [Q])}}; {error, binding_not_found} -> {error, @@ -824,6 +823,9 @@ add_super_stream_binding(VirtualHost, rabbit_misc:format("no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), rabbit_misc:rs(QueueName)])}}; +======= + rabbit_misc:format("stream ~ts does not exists (absent)", [Q])}}; +>>>>>>> 2c4e4fb691 (Fix all dialyzer warnings in rabbitmq_stream) {error, {binding_invalid, Fmt, Args}} -> {error, {binding_invalid, rabbit_misc:format(Fmt, Args)}}; {error, #amqp_error{} = Error} -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl b/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl index d8256132e640..73dd1535d01a 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl @@ -30,8 +30,8 @@ -define(CTAG_PREFIX, <<"stream.subid-">>). init() -> - rabbit_core_metrics:create_table({?TABLE_CONSUMER, set}), - rabbit_core_metrics:create_table({?TABLE_PUBLISHER, set}), + _ = rabbit_core_metrics:create_table({?TABLE_CONSUMER, set}), + _ = rabbit_core_metrics:create_table({?TABLE_PUBLISHER, set}), ok. consumer_created(Connection, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_metrics_gc.erl b/deps/rabbitmq_stream/src/rabbit_stream_metrics_gc.erl index 147cc1bb821e..e5f688b3e8ab 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_metrics_gc.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_metrics_gc.erl @@ -54,7 +54,7 @@ handle_info(start_gc, State) -> {noreply, start_timer(State)}. terminate(_Reason, #state{timer = TRef}) -> - erlang:cancel_timer(TRef), + _ = erlang:cancel_timer(TRef), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 1b5065bd5f70..0b055cd3713a 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -86,7 +86,7 @@ send_file_oct :: atomics:atomics_ref(), % number of bytes sent with send_file (for metrics) transport :: tcp | ssl, - proxy_socket :: undefined | ranch_proxy:proxy_socket(), + proxy_socket :: undefined | ranch_transport:socket(), correlation_id_sequence :: integer(), outstanding_requests :: #{integer() => term()}, deliver_version :: rabbit_stream_core:command_version()}). @@ -196,6 +196,9 @@ start_link(KeepaliveSup, Transport, Ref, Opts) -> proc_lib:spawn_link(?MODULE, init, [[KeepaliveSup, Transport, Ref, Opts]])}. +%% Because of gen_statem:enter_loop/4 usage inside init/1 +-dialyzer({no_behaviours, init/1}). + init([KeepaliveSup, Transport, Ref, @@ -253,7 +256,7 @@ init([KeepaliveSup, data = rabbit_stream_core:init(undefined)}, Transport:setopts(RealSocket, [{active, once}]), - rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}), + _ = rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}), ConnectionNegotiationStepTimeout = application:get_env(rabbitmq_stream, connection_negotiation_step_timeout, @@ -654,7 +657,7 @@ close(Transport, #stream_connection{socket = S, virtual_host = VirtualHost}, #stream_connection_state{consumers = Consumers}) -> [begin - maybe_unregister_consumer(VirtualHost, Consumer, + _ = maybe_unregister_consumer(VirtualHost, Consumer, single_active_consumer(Properties)), case Log of undefined -> @@ -847,15 +850,20 @@ open(info, connection_state = ConnState1}}; open(info, {Closed, Socket}, #statem_data{connection = Connection}) when Closed =:= tcp_closed; Closed =:= ssl_closed -> - demonitor_all_streams(Connection), + _ = demonitor_all_streams(Connection), rabbit_log_connection:warning("Socket ~w closed [~w]", [Socket, self()]), stop; open(info, {Error, Socket, Reason}, #statem_data{connection = Connection}) when Error =:= tcp_error; Error =:= ssl_error -> +<<<<<<< HEAD demonitor_all_streams(Connection), rabbit_log_connection:error("Socket error ~p [~w] [~w]", +======= + _ = demonitor_all_streams(Connection), + rabbit_log_connection:error("Socket error ~tp [~w] [~w]", +>>>>>>> 2c4e4fb691 (Fix all dialyzer warnings in rabbitmq_stream) [Reason, Socket, self()]), stop; open(info, {'DOWN', MonitorRef, process, _OsirisPid, _Reason}, @@ -945,7 +953,7 @@ open({call, From}, {shutdown, Explanation}, % likely closing call from the management plugin rabbit_log_connection:info("Forcing stream connection ~p closing: ~p", [self(), Explanation]), - demonitor_all_streams(Connection), + _ = demonitor_all_streams(Connection), {stop_and_reply, normal, {reply, From, ok}}; open(cast, {queue_event, _, {osiris_written, _, undefined, CorrelationList}}, @@ -2584,9 +2592,10 @@ handle_frame_post_auth(Transport, case Extra of [{stepping_down, true}] -> ConsumerName = consumer_name(Properties), - rabbit_stream_sac_coordinator:activate_consumer(VirtualHost, - Stream, - ConsumerName); + _ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost, + Stream, + ConsumerName), + ok; _ -> ok end, @@ -2846,8 +2855,12 @@ maybe_register_consumer(VirtualHost, SubscriptionId), Active. -maybe_send_consumer_update(_, Connection, _, _, false = _Sac, _) -> - Connection; +%% NOTE: Never called with SAC = false, but adding an explicit type +%% still doesn't convince dialyzer. Keeping this clause commented out +%% instead of disabling some dialyzer checks for this function: +%% +%% maybe_send_consumer_update(_, Connection, _, _, false = _Sac, _) -> +%% Connection; maybe_send_consumer_update(Transport, #stream_connection{socket = S, correlation_id_sequence = @@ -2977,7 +2990,7 @@ clean_state_after_stream_deletion_or_failure(Stream, case stream_has_subscriptions(Stream, C0) of true -> #{Stream := SubscriptionIds} = StreamSubscriptions, - [begin + _ = [begin rabbit_stream_metrics:consumer_cancelled(self(), stream_r(Stream, C0), @@ -3095,8 +3108,8 @@ remove_subscription(SubscriptionId, rabbit_stream_metrics:consumer_cancelled(self(), stream_r(Stream, Connection2), SubscriptionId), - maybe_unregister_consumer(VirtualHost, Consumer, - single_active_consumer(Consumer#consumer.configuration#consumer_configuration.properties)), + _ = maybe_unregister_consumer(VirtualHost, Consumer, + single_active_consumer(Consumer#consumer.configuration#consumer_configuration.properties)), {Connection2, State#stream_connection_state{consumers = Consumers1}}. maybe_clean_connection_from_stream(Stream, @@ -3222,11 +3235,7 @@ send_file_callback(?VERSION_2, fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries}, Size) -> FrameSize = 2 + 2 + 1 + 8 + Size, - CommittedChunkId = - case osiris_log:committed_offset(Log) of - undefined -> 0; - R -> R - end, + CommittedChunkId = osiris_log:committed_offset(Log), FrameBeginning = < "and re-enable the rabbitmq_stream_management " "plugin. ", "See https://www.rabbitmq.com/feature-flags.html " - "to learn more", - []), + "to learn more"), [] end. From 62e40494fcdc73d78b624db4ed216169cc002b35 Mon Sep 17 00:00:00 2001 From: Alexey Lebedeff Date: Mon, 16 Jan 2023 17:13:41 +0000 Subject: [PATCH 2/3] Fix conflict --- deps/rabbit/src/rabbit_channel.erl | 8 -------- deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 10 ---------- 2 files changed, 18 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 93da6cfd0155..98161e6cc15f 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1934,14 +1934,6 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, rabbit_amqqueue:not_found(Name); {error, {resources_missing, [{absent, Q, Reason} | _]}} -> rabbit_amqqueue:absent(Q, Reason); -<<<<<<< HEAD - {error, binding_not_found} -> - rabbit_misc:protocol_error( - not_found, "no binding ~s between ~s and ~s", - [RoutingKey, rabbit_misc:rs(ExchangeName), - rabbit_misc:rs(DestinationName)]); -======= ->>>>>>> 2c4e4fb691 (Fix all dialyzer warnings in rabbitmq_stream) {error, {binding_invalid, Fmt, Args}} -> rabbit_misc:protocol_error(precondition_failed, Fmt, Args); {error, #amqp_error{} = Error} -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index d121e96fbc61..b743454dd741 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -815,17 +815,7 @@ add_super_stream_binding(VirtualHost, {error, {resources_missing, [{absent, Q, _Reason} | _]}} -> {error, {stream_not_found, -<<<<<<< HEAD - rabbit_misc:format("stream ~s does not exists (absent)", [Q])}}; - {error, binding_not_found} -> - {error, - {not_found, - rabbit_misc:format("no binding ~s between ~s and ~s", - [RoutingKey, rabbit_misc:rs(ExchangeName), - rabbit_misc:rs(QueueName)])}}; -======= rabbit_misc:format("stream ~ts does not exists (absent)", [Q])}}; ->>>>>>> 2c4e4fb691 (Fix all dialyzer warnings in rabbitmq_stream) {error, {binding_invalid, Fmt, Args}} -> {error, {binding_invalid, rabbit_misc:format(Fmt, Args)}}; {error, #amqp_error{} = Error} -> From 9d868fb763f6c9197ecd41c2eec537f03f3b6ab0 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 16 Jan 2023 11:15:46 -0600 Subject: [PATCH 3/3] Resolve conflicts --- deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 5 ----- 1 file changed, 5 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 0b055cd3713a..148409e4967e 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -857,13 +857,8 @@ open(info, {Closed, Socket}, #statem_data{connection = Connection}) open(info, {Error, Socket, Reason}, #statem_data{connection = Connection}) when Error =:= tcp_error; Error =:= ssl_error -> -<<<<<<< HEAD - demonitor_all_streams(Connection), - rabbit_log_connection:error("Socket error ~p [~w] [~w]", -======= _ = demonitor_all_streams(Connection), rabbit_log_connection:error("Socket error ~tp [~w] [~w]", ->>>>>>> 2c4e4fb691 (Fix all dialyzer warnings in rabbitmq_stream) [Reason, Socket, self()]), stop; open(info, {'DOWN', MonitorRef, process, _OsirisPid, _Reason},