Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions deps/rabbit/src/rabbit_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@
{'resources_missing',
[{'not_found', (rabbit_types:binding_source() |
rabbit_types:binding_destination())} |
{'absent', amqqueue:amqqueue()}]}).
{'absent', amqqueue:amqqueue(), Reason :: term()}]}).

-type bind_ok_or_error() :: 'ok' | bind_errors() |
rabbit_types:error(
{'binding_invalid', string(), [any()]}).
rabbit_types:error({'binding_invalid', string(), [any()]}) |
%% inner_fun() result
rabbit_types:error(rabbit_types:amqp_error()).
-type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()).
-type inner_fun() ::
fun((rabbit_types:exchange(),
Expand Down
5 changes: 0 additions & 5 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1934,11 +1934,6 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
rabbit_amqqueue:not_found(Name);
{error, {resources_missing, [{absent, Q, Reason} | _]}} ->
rabbit_amqqueue:absent(Q, Reason);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
rabbit_misc:rs(DestinationName)]);
{error, {binding_invalid, Fmt, Args}} ->
rabbit_misc:protocol_error(precondition_failed, Fmt, Args);
{error, #amqp_error{} = Error} ->
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbit/src/rabbit_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,11 @@ info_all(VHostPath, Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref, fun(X) -> info(X, Items) end, list(VHostPath)).

-spec route(rabbit_types:exchange(), rabbit_types:delivery())
%% rabbit_types:delivery() is more strict than #delivery{}, some
%% fields can't be undefined. But there are places where
%% rabbit_exchange:route/2 is called with the absolutely bare delivery
%% like #delivery{message = #basic_message{routing_keys = [...]}}
-spec route(rabbit_types:exchange(), #delivery{})
-> [rabbit_amqqueue:name()].

route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName,
Expand Down
6 changes: 4 additions & 2 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@
-callback declare(amqqueue:amqqueue(), node()) ->
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
{'absent', amqqueue:amqqueue(), absent_reason()} |
{'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}.
{'protocol_error', Type :: atom(), Reason :: string(), Args :: term()} |
{'error', Err :: term() }.

-callback delete(amqqueue:amqqueue(),
boolean(),
Expand Down Expand Up @@ -263,7 +264,8 @@ is_compatible(Type, Durable, Exclusive, AutoDelete) ->
-spec declare(amqqueue:amqqueue(), node()) ->
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
{'absent', amqqueue:amqqueue(), absent_reason()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()} |
{'error', Err :: term() }.
declare(Q0, Node) ->
Q = rabbit_queue_decorator:set(rabbit_policy:set(Q0)),
Mod = amqqueue:get_type(Q),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/info_keys.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ defmodule RabbitMQ.CLI.Ctl.InfoKeys do
with_valid_info_keys(args, valid_keys, [], fun)
end

@spec with_valid_info_keys([charlist], [charlist], aliases, fun([atom])) :: any
@spec with_valid_info_keys([charlist], [charlist], aliases, ([atom] -> any)) :: any
def with_valid_info_keys(args, valid_keys, aliases, fun) do
case validate_info_keys(args, valid_keys, aliases) do
{:ok, info_keys} -> fun.(:proplists.get_keys(info_keys))
Expand Down
5 changes: 3 additions & 2 deletions deps/rabbitmq_stream/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ APP_ENV = """[
BUILD_DEPS = [
"//deps/rabbit_common:erlang_app",
"//deps/rabbitmq_cli:rabbitmqctl",
"@ranch//:erlang_app",
]

DEPS = [
"//deps/rabbitmq_stream_common:erlang_app",
"//deps/rabbit:erlang_app",
"@ranch//:erlang_app",
"@osiris//:erlang_app",
]

rabbitmq_app(
Expand All @@ -64,12 +65,12 @@ plt(
"//deps/rabbitmq_cli:rabbitmqctl",
BUILD_DEPS + DEPS,
),
apps = ["erts", "kernel", "stdlib", "ssl"],
)

dialyze(
dialyzer_opts = RABBITMQ_DIALYZER_OPTS + ["-Wno_undefined_callbacks"],
plt = ":base_plt",
warnings_as_errors = False,
)

broker_for_integration_suites()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ run([SuperStream],
stream_arguments(Opts) ->
stream_arguments(#{}, Opts).

%% Something strange, dialyzer infers that map_size/1 returns positive_integer()
-dialyzer({no_match, stream_arguments/2}).
stream_arguments(Acc, Arguments) when map_size(Arguments) =:= 0 ->
Acc;
stream_arguments(Acc, #{max_length_bytes := Value} = Arguments) ->
Expand Down
3 changes: 1 addition & 2 deletions deps/rabbitmq_stream/src/rabbit_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ start(_Type, _Args) ->
++ "Enable stream_queue feature flag then disable "
"and re-enable the rabbitmq_stream plugin. ",
"See https://www.rabbitmq.com/feature-flags.html "
"to learn more",
[]),
"to learn more"),
{ok, self()}
end.

Expand Down
22 changes: 7 additions & 15 deletions deps/rabbitmq_stream/src/rabbit_stream_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,11 @@ handle_call({create_super_stream,
ok ->
{reply, ok, State};
Error ->
[Fun() || Fun <- RollbackOps],
_ = [Fun() || Fun <- RollbackOps],
{reply, Error, State}
end;
{{error, Reason}, RollbackOps} ->
[Fun() || Fun <- RollbackOps],
_ = [Fun() || Fun <- RollbackOps],
{reply, {error, Reason}, State}
end;
{error, Msg} ->
Expand Down Expand Up @@ -410,9 +410,7 @@ handle_call({topology, VirtualHost, Stream}, _From, State) ->
{error, not_found} ->
{error, stream_not_found};
{error, not_available} ->
{error, stream_not_available};
R ->
R
{error, stream_not_available}
end,
{reply, Res, State};
handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
Expand Down Expand Up @@ -449,9 +447,9 @@ handle_call({partition_index, VirtualHost, SuperStream, Stream},
"super stream ~tp (virtual host ~tp)",
[Stream, SuperStream, VirtualHost]),
Res = try
rabbit_exchange:lookup_or_die(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName),
UnorderedBindings =
[Binding
_ = [Binding
|| Binding = #binding{destination = #resource{name = Q} = D}
<- rabbit_binding:list_for_source(ExchangeName),
is_resource_stream_queue(D), Q == Stream],
Expand Down Expand Up @@ -620,7 +618,7 @@ delete_stream(VirtualHost, Reference, Username) ->
super_stream_partitions(VirtualHost, SuperStream) ->
ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream),
try
rabbit_exchange:lookup_or_die(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName),
UnorderedBindings =
[Binding
|| Binding = #binding{destination = D}
Expand Down Expand Up @@ -817,13 +815,7 @@ add_super_stream_binding(VirtualHost,
{error, {resources_missing, [{absent, Q, _Reason} | _]}} ->
{error,
{stream_not_found,
rabbit_misc:format("stream ~s does not exists (absent)", [Q])}};
{error, binding_not_found} ->
{error,
{not_found,
rabbit_misc:format("no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
rabbit_misc:rs(QueueName)])}};
rabbit_misc:format("stream ~ts does not exists (absent)", [Q])}};
{error, {binding_invalid, Fmt, Args}} ->
{error, {binding_invalid, rabbit_misc:format(Fmt, Args)}};
{error, #amqp_error{} = Error} ->
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbitmq_stream/src/rabbit_stream_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
-define(CTAG_PREFIX, <<"stream.subid-">>).

init() ->
rabbit_core_metrics:create_table({?TABLE_CONSUMER, set}),
rabbit_core_metrics:create_table({?TABLE_PUBLISHER, set}),
_ = rabbit_core_metrics:create_table({?TABLE_CONSUMER, set}),
_ = rabbit_core_metrics:create_table({?TABLE_PUBLISHER, set}),
ok.

consumer_created(Connection,
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_stream/src/rabbit_stream_metrics_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ handle_info(start_gc, State) ->
{noreply, start_timer(State)}.

terminate(_Reason, #state{timer = TRef}) ->
erlang:cancel_timer(TRef),
_ = erlang:cancel_timer(TRef),
ok.

code_change(_OldVsn, State, _Extra) ->
Expand Down
44 changes: 24 additions & 20 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
send_file_oct ::
atomics:atomics_ref(), % number of bytes sent with send_file (for metrics)
transport :: tcp | ssl,
proxy_socket :: undefined | ranch_proxy:proxy_socket(),
proxy_socket :: undefined | ranch_transport:socket(),
correlation_id_sequence :: integer(),
outstanding_requests :: #{integer() => term()},
deliver_version :: rabbit_stream_core:command_version()}).
Expand Down Expand Up @@ -196,6 +196,9 @@ start_link(KeepaliveSup, Transport, Ref, Opts) ->
proc_lib:spawn_link(?MODULE, init,
[[KeepaliveSup, Transport, Ref, Opts]])}.

%% Because of gen_statem:enter_loop/4 usage inside init/1
-dialyzer({no_behaviours, init/1}).

init([KeepaliveSup,
Transport,
Ref,
Expand Down Expand Up @@ -253,7 +256,7 @@ init([KeepaliveSup,
data =
rabbit_stream_core:init(undefined)},
Transport:setopts(RealSocket, [{active, once}]),
rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}),
_ = rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}),
ConnectionNegotiationStepTimeout =
application:get_env(rabbitmq_stream,
connection_negotiation_step_timeout,
Expand Down Expand Up @@ -654,7 +657,7 @@ close(Transport,
#stream_connection{socket = S, virtual_host = VirtualHost},
#stream_connection_state{consumers = Consumers}) ->
[begin
maybe_unregister_consumer(VirtualHost, Consumer,
_ = maybe_unregister_consumer(VirtualHost, Consumer,
single_active_consumer(Properties)),
case Log of
undefined ->
Expand Down Expand Up @@ -847,15 +850,15 @@ open(info,
connection_state = ConnState1}};
open(info, {Closed, Socket}, #statem_data{connection = Connection})
when Closed =:= tcp_closed; Closed =:= ssl_closed ->
demonitor_all_streams(Connection),
_ = demonitor_all_streams(Connection),
rabbit_log_connection:warning("Socket ~w closed [~w]",
[Socket, self()]),
stop;
open(info, {Error, Socket, Reason},
#statem_data{connection = Connection})
when Error =:= tcp_error; Error =:= ssl_error ->
demonitor_all_streams(Connection),
rabbit_log_connection:error("Socket error ~p [~w] [~w]",
_ = demonitor_all_streams(Connection),
rabbit_log_connection:error("Socket error ~tp [~w] [~w]",
[Reason, Socket, self()]),
stop;
open(info, {'DOWN', MonitorRef, process, _OsirisPid, _Reason},
Expand Down Expand Up @@ -945,7 +948,7 @@ open({call, From}, {shutdown, Explanation},
% likely closing call from the management plugin
rabbit_log_connection:info("Forcing stream connection ~p closing: ~p",
[self(), Explanation]),
demonitor_all_streams(Connection),
_ = demonitor_all_streams(Connection),
{stop_and_reply, normal, {reply, From, ok}};
open(cast,
{queue_event, _, {osiris_written, _, undefined, CorrelationList}},
Expand Down Expand Up @@ -2584,9 +2587,10 @@ handle_frame_post_auth(Transport,
case Extra of
[{stepping_down, true}] ->
ConsumerName = consumer_name(Properties),
rabbit_stream_sac_coordinator:activate_consumer(VirtualHost,
Stream,
ConsumerName);
_ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost,
Stream,
ConsumerName),
ok;
_ ->
ok
end,
Expand Down Expand Up @@ -2846,8 +2850,12 @@ maybe_register_consumer(VirtualHost,
SubscriptionId),
Active.

maybe_send_consumer_update(_, Connection, _, _, false = _Sac, _) ->
Connection;
%% NOTE: Never called with SAC = false, but adding an explicit type
%% still doesn't convince dialyzer. Keeping this clause commented out
%% instead of disabling some dialyzer checks for this function:
%%
%% maybe_send_consumer_update(_, Connection, _, _, false = _Sac, _) ->
%% Connection;
maybe_send_consumer_update(Transport,
#stream_connection{socket = S,
correlation_id_sequence =
Expand Down Expand Up @@ -2977,7 +2985,7 @@ clean_state_after_stream_deletion_or_failure(Stream,
case stream_has_subscriptions(Stream, C0) of
true ->
#{Stream := SubscriptionIds} = StreamSubscriptions,
[begin
_ = [begin
rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(Stream,
C0),
Expand Down Expand Up @@ -3095,8 +3103,8 @@ remove_subscription(SubscriptionId,
rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(Stream, Connection2),
SubscriptionId),
maybe_unregister_consumer(VirtualHost, Consumer,
single_active_consumer(Consumer#consumer.configuration#consumer_configuration.properties)),
_ = maybe_unregister_consumer(VirtualHost, Consumer,
single_active_consumer(Consumer#consumer.configuration#consumer_configuration.properties)),
{Connection2, State#stream_connection_state{consumers = Consumers1}}.

maybe_clean_connection_from_stream(Stream,
Expand Down Expand Up @@ -3222,11 +3230,7 @@ send_file_callback(?VERSION_2,
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
Size) ->
FrameSize = 2 + 2 + 1 + 8 + Size,
CommittedChunkId =
case osiris_log:committed_offset(Log) of
undefined -> 0;
R -> R
end,
CommittedChunkId = osiris_log:committed_offset(Log),
FrameBeginning =
<<FrameSize:32,
?REQUEST:1,
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_stream_common/src/rabbit_stream_core.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
-define(STRING(Str), (byte_size(Str)):16, Str / binary).
-define(DATASTR(Str), (byte_size(Str)):32, Str / binary).

-export_type([state/0]).
-export_type([state/0, command_version/0]).

-type command_version() :: 0..65535.
-type correlation_id() :: non_neg_integer().
Expand Down
1 change: 0 additions & 1 deletion deps/rabbitmq_stream_management/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ plt(
dialyze(
dialyzer_opts = RABBITMQ_DIALYZER_OPTS,
plt = ":base_plt",
warnings_as_errors = False,
)

rabbitmq_home(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ web_ui() ->
"and re-enable the rabbitmq_stream_management "
"plugin. ",
"See https://www.rabbitmq.com/feature-flags.html "
"to learn more",
[]),
"to learn more"),
[]
end.

Expand Down