Skip to content

Commit

Permalink
Deprecate sync xmpp_stream_in/_out:stop()
Browse files Browse the repository at this point in the history
Sync stop manually calls terminate callback, which skip parts of cleanup
that p1_server performs before it calls it, this results in messages
that p1_server queued in collect_messages never being seen by c2s process.

Async stop don't do that so let's try to remove all calls to sync version
with async ones, to help with handling this, i added stop_async, and marked
stop as deprecated.
  • Loading branch information
prefiks committed Apr 1, 2020
1 parent effd5f2 commit c23e66e
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 22 deletions.
41 changes: 29 additions & 12 deletions src/xmpp_stream_in.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
-protocol({xep, 114, '1.6'}).

%% API
-export([start/3, start_link/3, call/3, cast/2, reply/2, stop/1,
-export([start/3, start_link/3, call/3, cast/2, reply/2, stop/1, stop_async/1,
accept/1, send/2, close/1, close/2, send_error/3, establish/1,
get_transport/1, change_shaper/2, set_timeout/2, format_error/1,
send_ws_ping/1]).
Expand All @@ -32,6 +32,8 @@
-export([init/1, handle_cast/2, handle_call/3, handle_info/2,
terminate/2, code_change/3]).

-deprecated([{stop, 1}]).

%%-define(DBGFSM, true).
-ifdef(DBGFSM).
-define(FSMOPTS, [{debug, [trace]}]).
Expand Down Expand Up @@ -163,7 +165,7 @@ reply(Ref, Reply) ->
-spec stop(pid()) -> ok;
(state()) -> no_return().
stop(Pid) when is_pid(Pid) ->
cast(Pid, stop);
stop_async(Pid);
stop(#{owner := Owner} = State) when Owner == self() ->
terminate(normal, State),
try erlang:nif_error(normal)
Expand All @@ -172,6 +174,12 @@ stop(#{owner := Owner} = State) when Owner == self() ->
stop(_) ->
erlang:error(badarg).

-spec stop_async(pid()) -> ok.
stop_async(Pid) when is_pid(Pid) ->
cast(Pid, stop);
stop_async(_) ->
erlang:error(badarg).

-spec accept(pid()) -> ok.
accept(Pid) ->
cast(Pid, accept).
Expand Down Expand Up @@ -288,13 +296,17 @@ handle_cast(accept, #{socket := Socket,
State3 = State2#{socket => XMPPSocket,
socket_monitor => SocketMonitor,
ip => IP},
State4 = init_state(State3, Opts),
case is_disconnected(State4) of
true -> noreply(State4);
false -> handle_info({tcp, Socket, <<>>}, State4)
case init_state(State3, Opts) of
{stop, State4} ->
{stop, normal, State4};
State4 ->
case is_disconnected(State4) of
true -> noreply(State4);
false -> handle_info({tcp, Socket, <<>>}, State4)
end
end;
{error, _} ->
stop(State)
{stop, normal, State}
end;
handle_cast({send, Pkt}, State) ->
noreply(send_pkt(State, Pkt));
Expand Down Expand Up @@ -322,7 +334,7 @@ handle_call(Call, From, State) ->

-spec handle_info(term(), state()) -> next_state().
handle_info(_, #{stream_state := accepting} = State) ->
stop(State);
{stop, normal, State};
handle_info({'$gen_event', {xmlstreamstart, Name, Attrs}},
#{stream_state := wait_for_stream,
xmlns := XMLNS, lang := MyLang} = State) ->
Expand Down Expand Up @@ -412,7 +424,7 @@ handle_info(timeout, #{lang := Lang} = State) ->
Txt = <<"Idle connection">>,
send_pkt(State, xmpp:serr_connection_timeout(Txt, Lang));
_:{?MODULE, undef} ->
stop(State)
{stop, normal, State}
end);
handle_info({'DOWN', MRef, _Type, _Object, _Info},
#{socket_monitor := MRef} = State) ->
Expand Down Expand Up @@ -496,10 +508,13 @@ init_state(#{socket := Socket, mod := Mod} = State, Opts) ->
{error, Reason} ->
process_stream_end(Reason, State1);
ignore ->
stop(State)
{stop, State}
end.

-spec noreply(state()) -> noreply().
-spec noreply(state()) -> noreply();
({stop, state()}) -> {stop, normal, state()}.
noreply({stop, State}) ->
{stop, normal, State};
noreply(#{stream_timeout := infinity} = State) ->
{noreply, State, infinity};
noreply(#{stream_timeout := {MSecs, StartTime}} = State) ->
Expand Down Expand Up @@ -545,7 +560,9 @@ process_stream_end(Reason, State) ->
State1 = State#{stream_timeout => infinity,
stream_state => disconnected},
try callback(handle_stream_end, Reason, State1)
catch _:{?MODULE, undef} -> stop(State1)
catch _:{?MODULE, undef} ->
stop_async(self()),
State1
end.

-spec process_stream(stream_start(), state()) -> state().
Expand Down
31 changes: 21 additions & 10 deletions src/xmpp_stream_out.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@

%% API
-export([start/3, start_link/3, call/3, cast/2, reply/2, connect/1,
stop/1, send/2, close/1, close/2, bind/2, establish/1, format_error/1,
stop/1, stop_async/1, send/2, close/1, close/2, bind/2, establish/1, format_error/1,
set_timeout/2, get_transport/1, change_shaper/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

-deprecated([{stop, 1}]).

%%-define(DBGFSM, true).
-ifdef(DBGFSM).
-define(FSMOPTS, [{debug, [trace]}]).
Expand Down Expand Up @@ -196,7 +198,7 @@ connect(Ref) ->
-spec stop(pid()) -> ok;
(state()) -> no_return().
stop(Pid) when is_pid(Pid) ->
cast(Pid, stop);
stop_async(Pid);
stop(#{owner := Owner} = State) when Owner == self() ->
terminate(normal, State),
try erlang:nif_error(normal)
Expand All @@ -205,6 +207,12 @@ stop(#{owner := Owner} = State) when Owner == self() ->
stop(_) ->
erlang:error(badarg).

-spec stop_async(pid()) -> ok.
stop_async(Pid) when is_pid(Pid) ->
cast(Pid, stop);
stop_async(_) ->
erlang:error(badarg).

-spec send(pid(), xmpp_element()) -> ok;
(state(), xmpp_element()) -> state().
send(Pid, Pkt) when is_pid(Pid) ->
Expand Down Expand Up @@ -445,13 +453,14 @@ handle_info({'$gen_event', closed}, State) ->
noreply(process_stream_end({socket, closed}, State));
handle_info(timeout, #{lang := Lang} = State) ->
Disconnected = is_disconnected(State),
noreply(try callback(handle_timeout, State)
catch _:{?MODULE, undef} when not Disconnected ->
Txt = <<"Idle connection">>,
send_pkt(State, xmpp:serr_connection_timeout(Txt, Lang));
_:{?MODULE, undef} ->
stop(State)
end);
try noreply(callback(handle_timeout, State))
catch
_:{?MODULE, undef} when not Disconnected ->
Txt = <<"Idle connection">>,
noreply(send_pkt(State, xmpp:serr_connection_timeout(Txt, Lang)));
_:{?MODULE, undef} ->
{stop, normal, State}
end;
handle_info({'DOWN', MRef, _Type, _Object, _Info},
#{socket_monitor := MRef} = State) ->
noreply(process_stream_end({socket, closed}, State));
Expand Down Expand Up @@ -522,7 +531,9 @@ process_stream_end(_, #{stream_state := disconnected} = State) ->
process_stream_end(Reason, State) ->
State1 = send_trailer(State),
try callback(handle_stream_end, Reason, State1)
catch _:{?MODULE, undef} -> stop(State1)
catch _:{?MODULE, undef} ->
stop_async(self()),
State1
end.

-spec process_stream(stream_start(), state()) -> state().
Expand Down

0 comments on commit c23e66e

Please sign in to comment.