Skip to content

Commit

Permalink
Merge 530cfe3 into 837d290
Browse files Browse the repository at this point in the history
  • Loading branch information
weiss committed Jul 6, 2022
2 parents 837d290 + 530cfe3 commit b0746a6
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 47 deletions.
7 changes: 7 additions & 0 deletions src/xmpp_socket.erl
Expand Up @@ -28,6 +28,7 @@
compress/1,
compress/2,
reset_stream/1,
send_elements/2,
send_element/2,
send_header/2,
send_trailer/1,
Expand Down Expand Up @@ -196,6 +197,12 @@ reset_stream(#socket_state{xml_stream = XMLStream,
SocketData#socket_state{socket = Socket1}
end.

-spec send_elements(socket_state(), [fxml:xmlel()]) -> ok | {error, inet:posix()}.
send_elements(#socket_state{xml_stream = undefined}, _Els) ->
erlang:error(not_implemented);
send_elements(SocketData, Els) ->
send(SocketData, list_to_binary([fxml:element_to_binary(El) || El <- Els])).

-spec send_element(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}.
send_element(#socket_state{xml_stream = undefined} = SocketData, El) ->
send_xml(SocketData, {xmlstreamelement, El});
Expand Down
152 changes: 127 additions & 25 deletions src/xmpp_stream_in.erl
Expand Up @@ -25,8 +25,8 @@
%% API
-export([start/3, start_link/3, call/3, cast/2, reply/2, stop/1, stop_async/1,
accept/1, send/2, close/1, close/2, send_error/3, establish/1,
get_transport/1, change_shaper/2, set_timeout/2, format_error/1,
send_ws_ping/1]).
get_transport/1, change_shaper/2, configure_queue/3, set_timeout/2,
format_error/1, send_ws_ping/1]).

%% gen_server callbacks
-export([init/1, handle_cast/2, handle_call/3, handle_info/2,
Expand Down Expand Up @@ -58,6 +58,9 @@
stream_encrypted => boolean(),
stream_version => {non_neg_integer(), non_neg_integer()},
stream_authenticated => boolean(),
stream_queue := [xmpp_element() | xmlel()],
stream_queue_max := non_neg_integer(),
stream_queue_timeout => {non_neg_integer(), integer()},
ip => {inet:ip_address(), inet:port_number()},
codec_options => [xmpp:decode_option()],
xmlns => binary(),
Expand Down Expand Up @@ -226,7 +229,21 @@ close(Pid, Reason) ->
establish(State) ->
process_stream_established(State).

-spec set_timeout(state(), non_neg_integer() | infinity) -> state().
-spec configure_queue(state(), non_neg_integer(), non_neg_integer()) -> state().
configure_queue(#{owner := Owner} = State, MaxSize, MaxDelay)
when Owner == self() ->
flush_queue(State), % Support reconfiguration.
if MaxSize == 0; MaxDelay == 0 ->
State#{stream_queue_max => 0};
true ->
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
State#{stream_queue_max => MaxSize,
stream_queue_timeout => {MaxDelay, CurrentTime}}
end;
configure_queue(_, _, _) ->
erlang:error(badarg).

-spec set_timeout(state(), timeout()) -> state().
set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() ->
case Timeout of
infinity -> State#{stream_timeout => infinity};
Expand Down Expand Up @@ -280,7 +297,9 @@ init([Mod, {SockMod, Socket}, Opts]) ->
socket_mod => SockMod,
socket_opts => Opts,
stream_timeout => {Timeout, Time},
stream_state => accepting},
stream_state => accepting,
stream_queue => [],
stream_queue_max => 0},
{ok, State, Timeout}.

-spec handle_cast(term(), state()) -> next_state().
Expand Down Expand Up @@ -424,6 +443,8 @@ handle_info({'$gen_all_state_event', {xmlstreamcdata, Data}},
noreply(try callback(handle_cdata, Data, State)
catch _:{?MODULE, undef} -> State
end);
handle_info(timeout, #{stream_queue := [_|_]} = State) ->
noreply(flush_queue(State));
handle_info(timeout, #{lang := Lang} = State) ->
Disconnected = is_disconnected(State),
noreply(try callback(handle_timeout, State)
Expand Down Expand Up @@ -522,15 +543,32 @@ init_state(#{socket := Socket, mod := Mod} = State, Opts) ->
end.

-spec noreply(state()) -> noreply();
({stop, state()}) -> {stop, normal, state()}.
({stop, state()}) -> {stop, normal, state()};
({stop, normal, state()}) -> {stop, normal, state()}.
noreply({stop, State}) ->
{stop, normal, State};
noreply(#{stream_timeout := infinity} = State) ->
{noreply, State, infinity};
noreply(#{stream_timeout := {MSecs, StartTime}} = State) ->
noreply({stop, normal, State}) ->
{stop, normal, State};
noreply(State) ->
{noreply, State, get_timeout(State)}.

-spec get_timeout(state()) -> timeout().
get_timeout(State) ->
min(get_stream_timeout(State), get_queue_timeout(State)).

-spec get_stream_timeout(state()) -> timeout().
get_stream_timeout(#{stream_timeout := infinity}) ->
infinity;
get_stream_timeout(#{stream_timeout := {MSecs, StartTime}}) ->
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
Timeout = max(0, MSecs - CurrentTime + StartTime),
{noreply, State, Timeout}.
max(0, MSecs - CurrentTime + StartTime).

-spec get_queue_timeout(state()) -> timeout().
get_queue_timeout(#{stream_queue := []}) ->
infinity;
get_queue_timeout(#{stream_queue_timeout := {MSecs, StartTime}}) ->
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
max(0, MSecs - CurrentTime + StartTime).

-spec is_disconnected(state()) -> boolean().
is_disconnected(#{stream_state := StreamState}) ->
Expand Down Expand Up @@ -1193,21 +1231,29 @@ send_header(State, _) ->

-spec send_pkt(state(), xmpp_element() | xmlel()) -> state().
send_pkt(State, Pkt) ->
Result = socket_send(State, Pkt),
State1 = try callback(handle_send, Pkt, Result, State)
catch _:{?MODULE, undef} -> State
end,
case Result of
_ when is_record(Pkt, stream_error) ->
process_stream_end({stream, {out, Pkt}}, State1);
ok ->
State1;
{error, _Why} ->
% Queue process_stream_end instead of calling it directly,
% so we have opportunity to process incoming queued messages before
% terminating session.
self() ! {'$gen_event', closed},
State1
case check_queue(State, Pkt) of
flush ->
flush_queue(State, Pkt);
queue ->
queue_pkt(State, Pkt);
noqueue ->
State1 = flush_queue(State),
Result = socket_send(State1, Pkt),
State2 = try callback(handle_send, Pkt, Result, State1)
catch _:{?MODULE, undef} -> State1
end,
case Result of
_ when is_record(Pkt, stream_error) ->
process_stream_end({stream, {out, Pkt}}, State2);
ok ->
State2;
{error, _Why} ->
% Queue process_stream_end instead of calling it directly,
% so we have the opportunity to process incoming queued
% messages before terminating the session.
self() ! {'$gen_event', closed},
State2
end
end.

-spec send_error(state(), xmpp_element() | xmlel(), stanza_error()) -> state().
Expand Down Expand Up @@ -1258,6 +1304,62 @@ close_socket(#{socket := Socket} = State) ->
close_socket(State) ->
State.

-spec check_queue(state(), xmpp_element() | xmlel()) -> flush | queue | noqueue.
check_queue(#{stream_queue_max := 0}, _Pkt) ->
noqueue;
check_queue(#{stream_state := StreamState}, _Pkt)
when StreamState /= established->
noqueue;
check_queue(_State, Pkt)
when not ?is_stanza(Pkt),
not is_record(Pkt, sm_a),
not is_record(Pkt, sm_r) ->
noqueue;
check_queue(#{stream_queue := Q, stream_queue_max := MaxQueue}, _Pkt)
when length(Q) >= MaxQueue ->
flush;
check_queue(_State, _Pkt) ->
queue.

-spec queue_pkt(state(), xmpp_element() | xmlel()) -> state().
queue_pkt(#{stream_queue := [],
stream_queue_timeout := {MSecs, _PrevTime}} = State, Pkt) ->
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
State#{stream_queue := [Pkt],
stream_queue_timeout := {MSecs, CurrentTime}};
queue_pkt(#{stream_queue := Q} = State, Pkt) ->
State#{stream_queue := [Pkt|Q]}.

-spec flush_queue(state(), xmpp_element() | xmlel()) -> state().
flush_queue(State, Pkt) ->
flush_queue(queue_pkt(State, Pkt)).

-spec flush_queue(state()) -> state().
flush_queue(#{stream_queue := []} = State) ->
State;
flush_queue(#{stream_queue := Q0,
socket := Sock,
xmlns := NS} = State0) ->
Q = lists:reverse(Q0),
Els = [xmpp:encode(Pkt, NS) || Pkt <- Q],
Result = xmpp_socket:send_elements(Sock, Els),
State1 = State0#{stream_queue := []},
State2 = try lists:foldl(
fun(Pkt, State) ->
callback(handle_send, Pkt, Result, State)
end, State1, Q)
catch _:{?MODULE, undef} -> State1
end,
case Result of
ok ->
State2;
{error, _Why} ->
self() ! {'$gen_event', closed},
State2
end;
flush_queue(#{stream_queue := _Q} = State) -> % Socket has been released.
State#{stream_queue := []}.

-spec select_lang(binary(), binary()) -> binary().
select_lang(Lang, <<"">>) -> Lang;
select_lang(_, Lang) -> Lang.
Expand Down

0 comments on commit b0746a6

Please sign in to comment.