Skip to content

Commit

Permalink
Fix all dialyzer warnings in peer discovery plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
binarin authored and mergify[bot] committed Jan 23, 2023
1 parent 9db9db4 commit 183a260
Show file tree
Hide file tree
Showing 18 changed files with 65 additions and 74 deletions.
4 changes: 2 additions & 2 deletions MODULE.bazel
Expand Up @@ -227,8 +227,8 @@ erlang_package.git_package(

erlang_package.hex_package(
name = "thoas",
sha256 = "442296847aca11db8d25180693d7ca3073d6d7179f66952f07b16415306513b6",
version = "0.4.0",
sha256 = "4918d50026c073c4ab1388437132c77a6f6f7c8ac43c60c13758cc0adce2134e",
version = "0.4.1",
)

erlang_package.git_package(
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit_common/src/rabbit_json.erl
Expand Up @@ -55,12 +55,12 @@ encode(Term, Opts) ->
end,
thoas:encode(fixup_terms(Term, F), Opts).

-spec try_encode(thoas:json_term()) -> {ok, iodata()} |
-spec try_encode(thoas:input_term()) -> {ok, iodata()} |
{error, Reason :: term()}.
try_encode(Term) ->
try_encode(Term, ?DEFAULT_ENCODE_OPTIONS).

-spec try_encode(thoas:json_term(), thoas:decode_options()) ->
-spec try_encode(thoas:input_term(), thoas:encode_options()) ->
{ok, iodata()} | {error, Reason :: term()}.
try_encode(Term, Opts) ->
try
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_peer_discovery_common/BUILD.bazel
Expand Up @@ -44,7 +44,7 @@ plt(
name = "base_plt",
apps = EXTRA_APPS,
plt = "//:base_plt",
deps = DEPS,
deps = DEPS + RUNTIME_DEPS,
)

dialyze(
Expand Down
Expand Up @@ -251,7 +251,7 @@ put(Scheme, Host, Port, Path, Args, Headers, Body) ->
%% @doc Perform a HTTP PUT request
%% @end
%%
-spec put(Scheme, Host, Port, Path, Args, Headers, HttpOpts, Body) -> {ok, string()} | {error, any()} when
-spec put(Scheme, Host, Port, Path, Args, Headers, HttpOpts, Body) -> {ok, term()} | {error, any()} when
Scheme :: atom() | string(),
Host :: string() | binary(),
Port :: integer(),
Expand Down Expand Up @@ -425,7 +425,7 @@ decode_body(?CONTENT_JSON, Body) ->
%% @doc Decode the response body and return a list
%% @end
%%
-spec parse_response({ok, integer(), string()} | {error, any()}) -> {ok, string()} | {error, any()}.
-spec parse_response({ok, integer(), string()} | {error, any()}) -> {ok, term()} | {error, any()}.

parse_response({error, Reason}) ->
?LOG_DEBUG("HTTP error ~tp", [Reason], #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
Expand Down
Expand Up @@ -163,8 +163,8 @@ nic_ipv4(Device) ->
%% IPv4 address if found.
%% @end
%%--------------------------------------------------------------------
-spec nic_ipv4(Device :: string(), Interfaces :: list())
-> {ok, string()} | {error, not_found}.
-spec nic_ipv4(Device :: string(), Interfaces :: [{string(), [ifopt()]}])
-> {'ok', string()} | {'error', 'not_found'}.
nic_ipv4(_, []) -> {error, not_found};
nic_ipv4(Device, [{Interface, Opts}|_]) when Interface =:= Device ->
{ok, nic_ipv4_address(Opts)};
Expand All @@ -178,7 +178,7 @@ nic_ipv4(Device, [_|T]) ->
%% for the interface.
%% @end
%%--------------------------------------------------------------------
-spec nic_ipv4_address([ifopt()]) -> {ok, string()} | {error, not_found}.
-spec nic_ipv4_address([ifopt()]) -> string() | {'error', 'not_found'}.
nic_ipv4_address([]) -> {error, not_found};
nic_ipv4_address([{addr, {A,B,C,D}}|_]) ->
inet_parse:ntoa({A,B,C,D});
Expand Down Expand Up @@ -373,9 +373,9 @@ stringify_error({error, Term}) ->
{error, lists:flatten(io_lib:format("~tp", [Term]))}.

-spec maybe_backend_configured(BackendConfigKey :: atom(),
ClusterFormationUndefinedFun :: fun(() -> {ok, term()} | ok),
BackendUndefinedFun :: fun(() -> {ok, term()} | ok),
ConfiguredFun :: fun((list()) -> {ok, term()})) -> {ok, term()}.
ClusterFormationUndefinedFun :: fun(() -> term()),
BackendUndefinedFun :: fun(() -> term()),
ConfiguredFun :: fun((list()) -> term())) -> term().
maybe_backend_configured(BackendConfigKey,
ClusterFormationUndefinedFun,
BackendUndefinedFun,
Expand Down
1 change: 0 additions & 1 deletion deps/rabbitmq_peer_discovery_consul/BUILD.bazel
Expand Up @@ -43,7 +43,6 @@ plt(
dialyze(
dialyzer_opts = RABBITMQ_DIALYZER_OPTS,
plt = ":base_plt",
warnings_as_errors = False,
)

broker_for_integration_suites()
Expand Down
Expand Up @@ -86,6 +86,7 @@
},
consul_svc_meta => #peer_discovery_config_entry_meta{
type = list,
env_variable = "CONSUL_SVC_META",
default_value = []
},
consul_deregister_after => #peer_discovery_config_entry_meta{
Expand Down
Expand Up @@ -45,7 +45,7 @@ init() ->
ok = application:ensure_started(inets),
%% we cannot start this plugin yet since it depends on the rabbit app,
%% which is in the process of being started by the time this function is called
application:load(rabbitmq_peer_discovery_common),
_ = application:load(rabbitmq_peer_discovery_common),
rabbit_peer_discovery_httpc:maybe_configure_proxy(),
rabbit_peer_discovery_httpc:maybe_configure_inet6().

Expand Down Expand Up @@ -181,7 +181,7 @@ lock(Node) ->
-spec unlock({SessionId :: string(), TRef :: timer:tref()}) -> ok.

unlock({SessionId, TRef}) ->
timer:cancel(TRef),
_ = timer:cancel(TRef),
?LOG_DEBUG(
"Stopped session renewal",
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
Expand Down Expand Up @@ -235,7 +235,7 @@ http_options(HttpOpts0, M) ->
HttpOpts1 = [TLSOpts | HttpOpts0],
HttpOpts1.

-spec filter_nodes(ConsulResult :: list(), AllowWarning :: atom()) -> list().
-spec filter_nodes(ConsulResult :: [#{term() => term()}], AllowWarning :: boolean()) -> [#{term() => term()}].
filter_nodes(Nodes, Warn) ->
case Warn of
true ->
Expand All @@ -251,10 +251,10 @@ filter_nodes(Nodes, Warn) ->
false -> Nodes
end.

-spec extract_nodes(ConsulResult :: list()) -> list().
-spec extract_nodes(ConsulResult :: [#{binary() => term()}]) -> list().
extract_nodes(Data) -> extract_nodes(Data, []).

-spec extract_nodes(ConsulResult :: list(), Nodes :: list())
-spec extract_nodes(ConsulResult :: [#{binary() => term()}], Nodes :: list())
-> list().
extract_nodes([], Nodes) -> Nodes;
extract_nodes([H | T], Nodes) ->
Expand Down Expand Up @@ -570,8 +570,6 @@ maybe_re_register({error, Reason}) ->
#{domain => ?RMQLOG_DOMAIN_PEER_DIS});
maybe_re_register({ok, {Members, _NodeType}}) ->
maybe_re_register(Members);
maybe_re_register({ok, Members}) ->
maybe_re_register(Members);
maybe_re_register(Members) ->
case lists:member(node(), Members) of
true ->
Expand All @@ -589,13 +587,14 @@ maybe_re_register(Members) ->
wait_for_list_nodes() ->
wait_for_list_nodes(60).

-spec wait_for_list_nodes(non_neg_integer()) -> {'ok', term()} | {'error', term()}.
wait_for_list_nodes(0) ->
list_nodes();
wait_for_list_nodes(N) ->
case {list_nodes(), N} of
{Reply, 0} ->
Reply;
{{ok, _} = Reply, _} ->
case list_nodes() of
{ok, _} = Reply ->
Reply;
{{error, _}, _} ->
_ ->
timer:sleep(1000),
wait_for_list_nodes(N - 1)
end.
Expand All @@ -606,7 +605,7 @@ wait_for_list_nodes(N) ->
%% Create a session to be acquired for a common key
%% @end
%%--------------------------------------------------------------------
-spec create_session(string(), pos_integer()) -> {ok, string()} | {error, Reason::string()}.
-spec create_session(atom(), pos_integer()) -> {ok, string()} | {error, Reason::string()}.
create_session(Name, TTL) ->
case consul_session_create([], maybe_add_acl([]),
[{'Name', Name},
Expand All @@ -623,10 +622,10 @@ create_session(Name, TTL) ->
%% Create session
%% @end
%%--------------------------------------------------------------------
-spec consul_session_create(Query, Headers, Body) -> {ok, string()} | {error, any()} when
-spec consul_session_create(Query, Headers, Body) -> {ok, term()} | {error, any()} when
Query :: list(),
Headers :: [{string(), string()}],
Body :: term().
Body :: thoas:input_term().
consul_session_create(Query, Headers, Body) ->
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
case serialize_json_body(Body) of
Expand All @@ -652,7 +651,7 @@ consul_session_create(Query, Headers, Body) ->
%% the JSON serialization library.
%% @end
%%--------------------------------------------------------------------
-spec serialize_json_body(term()) -> {ok, Payload :: binary()} | {error, atom()}.
-spec serialize_json_body(thoas:input_term()) -> {ok, Payload :: binary()} | {error, atom()}.
serialize_json_body([]) -> {ok, []};
serialize_json_body(Payload) ->
case rabbit_json:try_encode(Payload) of
Expand All @@ -666,7 +665,7 @@ serialize_json_body(Payload) ->
%% Extract session ID from Consul response
%% @end
%%--------------------------------------------------------------------
-spec get_session_id(term()) -> string().
-spec get_session_id(#{binary() => term()}) -> string().
get_session_id(#{<<"ID">> := ID}) -> binary:bin_to_list(ID).

%%--------------------------------------------------------------------
Expand All @@ -675,7 +674,7 @@ get_session_id(#{<<"ID">> := ID}) -> binary:bin_to_list(ID).
%% Start periodically renewing an existing session ttl
%% @end
%%--------------------------------------------------------------------
-spec start_session_ttl_updater(string()) -> ok.
-spec start_session_ttl_updater(string()) -> timer:tref().
start_session_ttl_updater(SessionId) ->
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
Interval = get_config_key(consul_svc_ttl, M),
Expand All @@ -693,7 +692,7 @@ start_session_ttl_updater(SessionId) ->
%% @end
-spec lock(timer:tref(), string(), pos_integer(), pos_integer()) -> {ok, string()} | {error, string()}.
lock(TRef, _, Now, EndTime) when EndTime < Now ->
timer:cancel(TRef),
_ = timer:cancel(TRef),
{error, "Acquiring lock taking too long, bailing out"};
lock(TRef, SessionId, _, EndTime) ->
case acquire_lock(SessionId) of
Expand All @@ -707,15 +706,15 @@ lock(TRef, SessionId, _, EndTime) ->
ok ->
lock(TRef, SessionId, erlang:system_time(seconds), EndTime);
{error, Reason} ->
timer:cancel(TRef),
_ = timer:cancel(TRef),
{error, lists:flatten(io_lib:format("Error waiting for lock release, reason: ~ts",[Reason]))}
end;
{error, Reason} ->
timer:cancel(TRef),
_ = timer:cancel(TRef),
{error, lists:flatten(io_lib:format("Error obtaining lock status, reason: ~ts", [Reason]))}
end;
{error, Reason} ->
timer:cancel(TRef),
_ = timer:cancel(TRef),
{error, lists:flatten(io_lib:format("Error while acquiring lock, reason: ~ts", [Reason]))}
end.

Expand Down Expand Up @@ -747,7 +746,7 @@ release_lock(SessionId) ->
%%--------------------------------------------------------------------
-spec consul_kv_write(Path, Query, Headers, Body) -> {ok, any()} | {error, string()} when
Path :: string(),
Query :: [{string(), string()}],
Query :: [{string() | atom(), string()}],
Headers :: [{string(), string()}],
Body :: term().
consul_kv_write(Path, Query, Headers, Body) ->
Expand Down Expand Up @@ -839,7 +838,7 @@ base_path() ->
wait_for_lock_release(false, _, _) -> ok;
wait_for_lock_release(_, Index, Wait) ->
case consul_kv_read(startup_lock_path(),
[{index, Index}, {wait, service_ttl(Wait)}],
[{"index", Index}, {"wait", service_ttl(Wait)}],
maybe_add_acl([])) of
{ok, _} -> ok;
{error, _} = Err -> Err
Expand Down
Expand Up @@ -55,7 +55,7 @@ terminate(_Arg, #state{timer_ref = undefined}) ->
ok;

terminate(_Arg, #state{timer_ref = TRef}) ->
timer:cancel(TRef),
_ = timer:cancel(TRef),
ok.

code_change(_OldVsn, State, _Extra) ->
Expand Down
Expand Up @@ -879,7 +879,7 @@ wait_for_lock_release_with_session_without_token_test(_Config) ->
?assertEqual("localhost", Host),
?assertEqual(8500, Port),
?assertEqual("v1/kv/rabbitmq/default/startup_lock", Path),
?assertEqual([{index, 42}, {wait, "300s"}], Args),
?assertEqual([{"index", 42}, {"wait", "300s"}], Args),
?assertEqual([], Headers),
?assertEqual([], HttpOpts),
{ok, []}
Expand All @@ -894,7 +894,7 @@ wait_for_lock_release_with_session_with_token_test(_Config) ->
?assertEqual("localhost", Host),
?assertEqual(8500, Port),
?assertEqual("v1/kv/rabbitmq/default/startup_lock", Path),
?assertEqual([{index, 42}, {wait, "300s"}], Args),
?assertEqual([{"index", 42}, {"wait", "300s"}], Args),
?assertEqual([{"X-Consul-Token", "token-value"}], Headers),
?assertEqual([], HttpOpts),
{ok, []}
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_peer_discovery_etcd/BUILD.bazel
Expand Up @@ -39,13 +39,13 @@ xref()

plt(
name = "base_plt",
plt = "//:base_plt",
deps = DEPS,
)

dialyze(
dialyzer_opts = RABBITMQ_DIALYZER_OPTS,
plt = ":base_plt",
warnings_as_errors = False,
)

broker_for_integration_suites()
Expand Down
Expand Up @@ -26,15 +26,15 @@
init() ->
%% We cannot start this plugin yet since it depends on the rabbit app,
%% which is in the process of being started by the time this function is called
application:load(rabbitmq_peer_discovery_common),
application:load(rabbitmq_peer_discovery_etcd),
_ = application:load(rabbitmq_peer_discovery_common),
_ = application:load(rabbitmq_peer_discovery_etcd),

%% Here we start the client very early on, before plugins have initialized.
%% We need to do it conditionally, however.
NoOp = fun() -> ok end,
Run = fun(_) ->
rabbit_log:debug("Peer discovery etcd: initialising..."),
application:ensure_all_started(eetcd),
_ = application:ensure_all_started(eetcd),
Formation = application:get_env(rabbit, cluster_formation, []),
Opts = maps:from_list(proplists:get_value(peer_discovery_etcd, Formation, [])),
{ok, Pid} = rabbitmq_peer_discovery_etcd_v3_client:start_link(Opts),
Expand Down
Expand Up @@ -44,7 +44,7 @@ unregister() ->
post_registration() ->
?DELEGATE:post_registration().

-spec lock(Node :: atom()) -> not_supported.
-spec lock(Node :: atom()) -> {'ok', term()} | {'error', string()}.
lock(Node) ->
?DELEGATE:lock(Node).

Expand Down
Expand Up @@ -231,23 +231,17 @@ connected({call, From}, list_keys, Data = #statem_data{connection_name = Conn})
{ok, #{kvs := Result}} = eetcd_kv:get(C2),
rabbit_log:debug("etcd peer discovery returned keys: ~tp", [Result]),
Values = [maps:get(value, M) || M <- Result],
case Values of
Xs when is_list(Xs) ->
rabbit_log:debug("etcd peer discovery: listing node keys returned ~b results", [length(Xs)]),
ParsedNodes = lists:map(fun extract_node/1, Xs),
{Successes, Failures} = lists:partition(fun filter_node/1, ParsedNodes),
JoinedString = lists:join(",", [rabbit_data_coercion:to_list(Node) || Node <- lists:usort(Successes)]),
rabbit_log:error("etcd peer discovery: successfully extracted nodes: ~ts", [JoinedString]),
lists:foreach(fun(Val) ->
rabbit_log:error("etcd peer discovery: failed to extract node name from etcd value ~tp", [Val])
end, Failures),
gen_statem:reply(From, lists:usort(Successes)),
keep_state_and_data;
Other ->
rabbit_log:debug("etcd peer discovery: listing node keys returned ~tp", [Other]),
gen_statem:reply(From, []),
keep_state_and_data
end.
rabbit_log:debug("etcd peer discovery: listing node keys returned ~b results", [length(Values)]),
ParsedNodes = lists:map(fun extract_node/1, Values),
{Successes, Failures} = lists:partition(fun filter_node/1, ParsedNodes),
JoinedString = lists:join(",", [rabbit_data_coercion:to_list(Node) || Node <- lists:usort(Successes)]),
rabbit_log:error("etcd peer discovery: successfully extracted nodes: ~ts", [JoinedString]),
lists:foreach(fun(Val) ->
rabbit_log:error("etcd peer discovery: failed to extract node name from etcd value ~tp", [Val])
end, Failures),
gen_statem:reply(From, lists:usort(Successes)),
keep_state_and_data.


disconnected(enter, _PrevState, _Data) ->
rabbit_log:info("etcd peer discovery: successfully disconnected from etcd"),
Expand Down Expand Up @@ -307,9 +301,9 @@ registration_value(#statem_data{node_key_lease_id = LeaseID, node_key_ttl_in_sec
-spec extract_node(binary()) -> atom() | {error, any()}.

extract_node(Payload) ->
case rabbit_json:decode(Payload) of
case rabbit_json:try_decode(Payload) of
{error, Error} -> {error, Error};
Map ->
{ok, Map} ->
case maps:get(<<"node">>, Map, undefined) of
undefined -> undefined;
Node -> rabbit_data_coercion:to_atom(Node)
Expand Down Expand Up @@ -376,7 +370,7 @@ obfuscate(Password) ->

deobfuscate(undefined) -> undefined;
deobfuscate(Password) ->
credentials_obfuscation:decrypt(to_binary(Password)).
credentials_obfuscation:decrypt({encrypted, to_binary(Password)}).

disconnect(ConnName, #statem_data{connection_monitor = Ref}) ->
maybe_demonitor(Ref),
Expand Down
1 change: 0 additions & 1 deletion deps/rabbitmq_peer_discovery_k8s/BUILD.bazel
Expand Up @@ -43,7 +43,6 @@ plt(
dialyze(
dialyzer_opts = RABBITMQ_DIALYZER_OPTS,
plt = ":base_plt",
warnings_as_errors = False,
)

broker_for_integration_suites()
Expand Down

0 comments on commit 183a260

Please sign in to comment.