Skip to content

Commit

Permalink
Adding missing function specs
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunyiLyu committed Jan 25, 2023
1 parent 1f106fc commit 722c3f3
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 10 deletions.
3 changes: 3 additions & 0 deletions deps/rabbitmq_mqtt/src/mqtt_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ apply(_Meta, Unknown, State) ->
logger:error("MQTT Raft state machine v1 received unknown command ~tp", [Unknown]),
{State, {error, {unknown_command, Unknown}}, []}.

-spec state_enter(ra_server:ra_state(), state()) ->
ra_machine:effects().
state_enter(leader, State) ->
%% re-request monitors for all known pids, this would clean up
%% records for all connections are no longer around, e.g. right after node restart
Expand All @@ -188,6 +190,7 @@ overview(#machine_state{client_ids = ClientIds,
%% ==========================

%% Avoids blocking the Raft leader.
-spec notify_connection(pid(), 'duplicate_id' | 'decommission_node') -> pid().
notify_connection(Pid, Reason) ->
spawn(fun() -> gen_server2:cast(Pid, Reason) end).

Expand Down
3 changes: 3 additions & 0 deletions deps/rabbitmq_mqtt/src/mqtt_machine_v0.erl
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ apply(_Meta, Unknown, State) ->
logger:error("MQTT Raft state machine received an unknown command ~tp", [Unknown]),
{State, {error, {unknown_command, Unknown}}, []}.

-spec state_enter(ra_server:ra_state(), state()) ->
ra_machine:effects().
state_enter(leader, State) ->
%% re-request monitors for all known pids, this would clean up
%% records for all connections are no longer around, e.g. right after node restart
Expand All @@ -123,6 +125,7 @@ state_enter(_, _) ->
%% ==========================

%% Avoids blocking the Raft leader.
-spec notify_connection(pid(), duplicate_id | decommission_node) -> pid().
notify_connection(Pid, Reason) ->
spawn(fun() -> gen_server2:cast(Pid, Reason) end).

Expand Down
2 changes: 2 additions & 0 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ start(normal, []) ->
stop(_) ->
rabbit_mqtt_sup:stop_listeners().

-spec emit_connection_info_all([node()], [atom()], reference(), pid()) -> term().
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
case rabbit_mqtt_ff:track_client_id_in_ra() of
true ->
Expand All @@ -57,6 +58,7 @@ emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
rabbit_control_misc:await_emitters_termination(Pids)
end.

-spec emit_connection_info_local([atom()], reference(), pid()) -> ok.
emit_connection_info_local(Items, Ref, AggregatorPid) ->
LocalPids = local_connection_pids(),
emit_connection_info(Items, Ref, AggregatorPid, LocalPids).
Expand Down
3 changes: 3 additions & 0 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ register(ServerId, ClientId, Pid) ->
erlang:send_after(5000, self(), {ra_event, undefined, register_timeout}),
{ok, Corr}.

-spec unregister(binary(), pid()) -> ok.
unregister(ClientId, Pid) ->
{ClusterName, _} = mqtt_node:server_id(),
case ra_leaderboard:lookup_leader(ClusterName) of
Expand All @@ -49,6 +50,7 @@ unregister(ClientId, Pid) ->
list_pids() ->
list(fun(#machine_state{pids = Pids}) -> maps:keys(Pids) end).

-spec list() -> term().
list() ->
list(fun(#machine_state{client_ids = Ids}) -> maps:to_list(Ids) end).

Expand Down Expand Up @@ -76,6 +78,7 @@ list(QF) ->
end
end.

-spec leave(binary()) -> ok | timeout | nodedown.
leave(NodeBin) ->
Node = binary_to_atom(NodeBin, utf8),
ServerId = mqtt_node:server_id(),
Expand Down
1 change: 1 addition & 0 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@
callbacks => #{enable => {mqtt_node, delete}}
}}).

-spec track_client_id_in_ra() -> boolean().
track_client_id_in_ra() ->
not rabbit_feature_flags:is_enabled(delete_ra_cluster_mqtt_node).
17 changes: 17 additions & 0 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@

-opaque state() :: #state{}.

-type info_commands() :: host | port | peer_host | peer_port
| connected_at | ssl_login_name | vhost | user_who_performed_action | user
| clean_sess | will_msg | retainer_pid | exchange | prefetch
| messages_unconfirmed | messages_unacknowledged
| node
| client_id | client_properties
| channel_max | frame_max | auth_mechanism.

-spec initial_state(Socket :: any(), ConnectionName :: binary()) ->
state().
initial_state(Socket, ConnectionName) ->
Expand Down Expand Up @@ -1329,9 +1337,13 @@ serialise_and_send_to_client(Packet, #state{cfg = #cfg{proto_ver = ProtoVer,
[Sock, Error, Packet#mqtt_packet.fixed, Packet#mqtt_packet.variable])
end.

-spec serialise(#mqtt_packet{}, state()) ->
iodata().
serialise(Packet, #state{cfg = #cfg{proto_ver = ProtoVer}}) ->
rabbit_mqtt_packet:serialise(Packet, ProtoVer).

-spec terminate(any(), binary(), atom(),rabbit_mqtt_processor:state()) ->
ok.
terminate(SendWill, ConnName, ProtoFamily, State) ->
maybe_send_will(SendWill, ConnName, State),
Infos = [{name, ConnName},
Expand Down Expand Up @@ -1427,11 +1439,15 @@ delete_queue(QName, Username) ->
ok
end).

-spec handle_pre_hibernate() -> ok.
handle_pre_hibernate() ->
erase(permission_cache),
erase(topic_permission_cache),
ok.

-spec handle_ra_event(register_timeout
| {applied, [{reference(), ok}]}
| {not_leader, term(), reference()}, state()) -> state().
handle_ra_event({applied, [{Corr, ok}]},
State = #state{register_state = {pending, Corr}}) ->
%% success case - command was applied transition into registered state
Expand Down Expand Up @@ -1796,6 +1812,7 @@ throttle(Conserve, Connected, #state{queues_soft_limit_exceeded = QSLE,
not sets:is_empty(QSLE) orelse
credit_flow:blocked().

-spec info(info_commands(), state()) -> any.
info(host, #state{cfg = #cfg{host = Val}}) -> Val;
info(port, #state{cfg = #cfg{port = Val}}) -> Val;
info(peer_host, #state{cfg = #cfg{peer_host = Val}}) -> Val;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,37 @@
table
}).

-type store_state() :: #store_state{}.

-spec new(file:name_all(), rabbit_types:vhost()) -> store_state().
new(Dir, VHost) ->
Tid = open_table(Dir, VHost),
#store_state{table = Tid}.

-spec recover(file:name_all(), rabbit_types:vhost()) ->
{error, uninitialized} | {ok, store_state()}.
recover(Dir, VHost) ->
case open_table(Dir, VHost) of
{error, _} -> {error, uninitialized};
{ok, Tid} -> {ok, #store_state{table = Tid}}
end.

-spec insert(binary(), mqtt_msg(), store_state()) -> ok.
insert(Topic, Msg, #store_state{table = T}) ->
ok = dets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}).

-spec lookup(binary(), store_state()) -> retained_message() | not_found.
lookup(Topic, #store_state{table = T}) ->
case dets:lookup(T, Topic) of
[] -> not_found;
[Entry] -> Entry
end.

-spec delete(binary(), store_state()) -> ok.
delete(Topic, #store_state{table = T}) ->
ok = dets:delete(T, Topic).

-spec terminate(store_state()) -> ok.
terminate(#store_state{table = T}) ->
ok = dets:close(T).

Expand Down
8 changes: 8 additions & 0 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@
filename
}).

-type store_state() :: #store_state{}.

-spec new(file:name_all(), rabbit_types:vhost()) -> store_state().
new(Dir, VHost) ->
Path = rabbit_mqtt_util:path_for(Dir, VHost),
TableName = rabbit_mqtt_util:vhost_name_to_table_name(VHost),
_ = file:delete(Path),
Tid = ets:new(TableName, [set, public, {keypos, #retained_message.topic}]),
#store_state{table = Tid, filename = Path}.

-spec recover(file:name_all(), rabbit_types:vhost()) ->
{error, uninitialized} | {ok, store_state()}.
recover(Dir, VHost) ->
Path = rabbit_mqtt_util:path_for(Dir, VHost),
case ets:file2tab(Path) of
Expand All @@ -35,19 +39,23 @@ recover(Dir, VHost) ->
{error, _} -> {error, uninitialized}
end.

-spec insert(binary(), mqtt_msg(), store_state()) -> ok.
insert(Topic, Msg, #store_state{table = T}) ->
true = ets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}),
ok.

-spec lookup(binary(), store_state()) -> retained_message() | not_found.
lookup(Topic, #store_state{table = T}) ->
case ets:lookup(T, Topic) of
[] -> not_found;
[Entry] -> Entry
end.

-spec delete(binary(), store_state()) -> ok.
delete(Topic, #store_state{table = T}) ->
true = ets:delete(T, Topic),
ok.

-spec terminate(store_state()) -> ok.
terminate(#store_state{table = T, filename = Path}) ->
ok = ets:tab2file(T, Path, [{extended_info, [object_count]}]).
1 change: 1 addition & 0 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
start_link(SupName) ->
supervisor:start_link(SupName, ?MODULE, []).

-spec child_for_vhost(binary()) -> pid().
child_for_vhost(VHost) when is_binary(VHost) ->
case rabbit_mqtt_retainer_sup:start_child(VHost) of
{ok, Pid} -> Pid;
Expand Down
1 change: 1 addition & 0 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ init([{Listeners, SslListeners0}]) ->
)
]}}.

-spec stop_listeners() -> ok.
stop_listeners() ->
_ = rabbit_networking:stop_ranch_listener_of_protocol(?TCP_PROTOCOL),
_ = rabbit_networking:stop_ranch_listener_of_protocol(?TLS_PROTOCOL),
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ to_mqtt(T0) ->
T2 = string:replace(T1, ".", "/", all),
erlang:iolist_to_binary(T2).

-spec env(atom()) -> any().
env(Key) ->
case application:get_env(?APP_NAME, Key) of
{ok, Val} -> coerce_env_value(Key, Val);
Expand All @@ -145,6 +146,8 @@ coerce_env_value(exchange, Val) -> rabbit_data_coercion:to_binary(Val);
coerce_env_value(vhost, Val) -> rabbit_data_coercion:to_binary(Val);
coerce_env_value(_, Val) -> Val.

-spec table_lookup([tuple()], binary()) ->
tuple() | undefined.
table_lookup(undefined, _Key) ->
undefined;
table_lookup(Table, Key) ->
Expand All @@ -156,13 +159,14 @@ vhost_name_to_dir_name(VHost, Suffix) ->
<<Num:128>> = erlang:md5(VHost),
"mqtt_retained_" ++ rabbit_misc:format("~36.16.0b", [Num]) ++ Suffix.

-spec path_for(file:name_all(), rabbit_types:vhost()) -> file:filename_all().
path_for(Dir, VHost) ->
filename:join(Dir, vhost_name_to_dir_name(VHost)).

-spec path_for(file:name_all(), rabbit_types:vhost(), string()) -> file:filename_all().
path_for(Dir, VHost, Suffix) ->
filename:join(Dir, vhost_name_to_dir_name(VHost, Suffix)).


-spec vhost_name_to_table_name(rabbit_types:vhost()) ->
atom().
vhost_name_to_table_name(VHost) ->
Expand Down
15 changes: 6 additions & 9 deletions deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@
upgrade/5,
takeover/7]).

-type option(T) :: undefined | T.

-record(state, {
socket,
socket :: rabbit_net:socket(),
parse_state = rabbit_mqtt_packet:initial_state() :: rabbit_mqtt_packet:state(),
proc_state :: undefined | rabbit_mqtt_processor:state(),
connection_state = running :: running | blocked,
conserve = false :: boolean(),
stats_timer :: undefined | rabbit_event:state(),
stats_timer :: option(rabbit_event:state()),
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
conn_name :: undefined | binary(),
conn_name :: option(binary()),
received_connect_packet = false :: boolean()
}).
}).

-type state() :: #state{}.

Expand Down Expand Up @@ -66,10 +68,6 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, {HandlerState,
{Handler, {HandlerState#state{socket = Sock}, PeerAddr}}).

%% cowboy_websocket
-spec init(Req, any()) ->
{ok | module(), Req, any()} |
{module(), Req, any(), any()}
when Req::cowboy_req:req().
init(Req, Opts) ->
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
undefined ->
Expand Down Expand Up @@ -225,7 +223,6 @@ websocket_info(Msg, State) ->
?LOG_WARNING("Web MQTT: unexpected message ~tp", [Msg]),
{[], State, hibernate}.

-spec terminate(any(), cowboy_req:req(), any()) -> ok.
terminate(_Reason, _Req, #state{proc_state = undefined}) ->
ok;
terminate(Reason, Request, #state{} = State) ->
Expand Down

0 comments on commit 722c3f3

Please sign in to comment.