Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed and improved timeout implementation for transports (heartbeat_interval and connection_close) #49

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
134 changes: 72 additions & 62 deletions src/socketio_transport_htmlfile.erl
Expand Up @@ -18,8 +18,10 @@
server_module,
connection_reference,
heartbeats = 0,
client_heartbeat = undefined,
heartbeat_interval,
close_timeout,
timer_ref = undefined,
event_manager,
sup
}).
Expand Down Expand Up @@ -57,35 +59,38 @@ init([Sup, SessionId, ServerModule, {'htmlfile', {Req, Caller}}]) ->
apply(ServerModule, ensure_longpolling_request, [Req]),
process_flag(trap_exit, true),
HeartbeatInterval =
case application:get_env(heartbeat_interval) of
{ok, Time} ->
Time;
_ ->
error_logger:warning_report(
"Could not load default heartbeat_interval value from "
"the application file. Setting the default value to 10000."
),
10000
end,
case application:get_env(heartbeat_interval) of
{ok, Time} ->
Time;
_ ->
error_logger:warning_report(
"Could not load default heartbeat_interval value from "
"the application file. Setting the default value to 10000."
),
10000
end,
CloseTimeout =
case application:get_env(close_timeout) of
{ok, Time0} ->
Time0;
_ ->
8000
end,
case application:get_env(close_timeout) of
{ok, Time0} ->
Time0;
_ ->
error_logger:warning_report(
"Could not load default close_timeout value from "
"the application file. Setting the default value to 8000 ms."
),
8000
end,
{ok, EventMgr} = gen_event:start_link(),
gen_server:cast(self(), {initialize, Req}),
socketio_client:send(self(), #msg{ content = SessionId }),
gen_server:cast(self(), heartbeat),
{ok, #state{
session_id = SessionId,
server_module = ServerModule,
connection_reference = {'htmlfile', none},
req = Req,
caller = Caller,
close_timeout = CloseTimeout,
heartbeat_interval = {make_ref(), HeartbeatInterval},
heartbeat_interval = HeartbeatInterval,
event_manager = EventMgr,
sup = Sup
}}.
Expand All @@ -106,24 +111,29 @@ init([Sup, SessionId, ServerModule, {'htmlfile', {Req, Caller}}]) ->
%%--------------------------------------------------------------------
%% Incoming data
handle_call({'htmlfile', data, Req}, _From, #state{ heartbeat_interval = Interval,
timer_ref = OldTimerRef,
connection_reference = {'htmlfile', connected},
server_module = ServerModule,
event_manager = EventManager } = State) ->
Msgs = [socketio_data:decode(#msg{content=Data}) || {"data", Data} <- ServerModule:parse_post(Req)],
F = fun(#heartbeat{}, _Acc) ->
{timer, reset_heartbeat(Interval)};
F = fun(#heartbeat{index = HeartbeatNumber}, _Acc) ->
{timer, reset_timer(OldTimerRef, Interval, heartbeat), HeartbeatNumber};
(M, Acc) ->
gen_event:notify(EventManager, {message, self(), M}),
Acc
end,
NewState = case lists:foldl(F, undefined, lists:flatten(Msgs)) of
{timer, NewInterval} ->
State#state{ heartbeat_interval = NewInterval};
{timer, NewTimerRef, HeartbeatNumber} ->
State#state{ timer_ref = NewTimerRef, client_heartbeat = HeartbeatNumber };
undefined ->
State
end,
ServerModule:respond(Req, 200, [{"Content-Type", "text/plain"}], "ok"),
{reply, ok, NewState};

handle_call({'htmlfile', data, _Req}, _From, #state{ connection_reference = {'htmlfile', none} } = State) ->
{reply, ok, State};

%% Event management
handle_call(event_manager, _From, #state{ event_manager = EventMgr } = State) ->
{reply, EventMgr, State};
Expand Down Expand Up @@ -151,30 +161,26 @@ handle_call(stop, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast({initialize, Req}, #state{ heartbeat_interval = Interval, server_module = ServerModule } = State) ->
handle_cast({initialize, Req}, #state{ heartbeat_interval = Interval,
timer_ref = OldTimerRef,
server_module = ServerModule } = State) ->
ServerModule:headers(Req, [{"Content-Type", "text/html"},
{"Connection", "Keep-Alive"},
{"Transfer-Encoding", "chunked"}]),
H = "<html><body>" ++ lists:duplicate(254, $\s),
link(ServerModule:socket(Req)),
ServerModule:chunk(Req, H),
{noreply, State#state{ connection_reference = {htmlfile, connected},
heartbeat_interval = reset_heartbeat(Interval) }};

handle_cast(heartbeat, #state{ heartbeats = Beats,
heartbeat_interval = Interval } = State) ->
Beats1 = Beats + 1,
socketio_client:send(self(), #heartbeat{ index = Beats1 }),
{noreply, State#state{ heartbeats = Beats1,
heartbeat_interval = reset_heartbeat(Interval) }};
timer_ref = reset_timer(OldTimerRef, Interval, heartbeat) }};

%% Send
handle_cast({send, Message}, #state{ req = Req,
server_module = ServerModule,
connection_reference = {'htmlfile', connected },
connection_reference = {'htmlfile', connected},
timer_ref = OldTimerRef,
heartbeat_interval = Interval } = State) ->
send_message(Message, ServerModule, Req),
{noreply, State#state{ heartbeat_interval = reset_heartbeat(Interval) }};
{noreply, State#state{ timer_ref = reset_timer(OldTimerRef, Interval, heartbeat) }};

handle_cast(_, #state{} = State) ->
{noreply, State}.
Expand All @@ -191,35 +197,34 @@ handle_cast(_, #state{} = State) ->
%% @end
%%--------------------------------------------------------------------
%% CLient disconnected. We fire a timer (ServerTimeout)!
handle_info({'EXIT',_Port,_Reason}, #state{ close_timeout = ServerTimeout} = State) when is_port(_Port) ->
{noreply, State#state { connection_reference = {'htmlfile', none}}, ServerTimeout};
handle_info({'EXIT',_Port,_Reason}, #state{ close_timeout = ServerTimeout,
timer_ref = OldTimerRef } = State) when is_port(_Port) ->
NewTimer = reset_timer(OldTimerRef, ServerTimeout, connection_timeout),
{noreply, State#state { connection_reference = {'htmlfile', none}, timer_ref = NewTimer }};

%% This branch handles two purposes: 1. handling the close_timeout,
%% 2. handling the heartbeat timeout that might comes first. The thing is,
%% when the client connection dies, we need to wait for the close_timeout
%% to be fired. That one can be cancelled at any time by God knows what for now,
%% and that might be desirable. However, it can also be cancelled because we
%% happen to receive the heartbeat timeout. Given the right setting, this will
%% happen every time a disconnection happens at the point where the delay left to
%% the current heartbeat is longer than the delay left to the total value of
%% close_timout. Funny, eh?
%% For this reason, the heartbeat timeout when we have no htmlfile
%% connection reference has to be seen as the same as the close_timeout
%% timer firing off. This is the safest way to guarantee everything will run
%% okay.
handle_info(timeout, #state{ server_module = ServerModule,
connection_reference = {'htmlfile', none}, caller = Caller, req = Req } = State) ->
gen_server:reply(Caller, ServerModule:respond(Req, 200)),
handle_info({timeout, _Ref, connection_reference}, #state{ timer_ref = _Ref,
connection_reference = {'htmlfile', none} } = State) ->
shutdown(State),
{stop, shutdown, State};

%% See previous clauses' comments
handle_info({timeout, _Ref, heartbeat}, #state{ connection_reference = {'htmlfile', none} } = State) ->
handle_info(timeout, State);

%% Regular heartbeat
handle_info({timeout, _Ref, heartbeat}, State) ->
gen_server:cast(self(), heartbeat),
{noreply, State};
handle_info({timeout, _Ref, heartbeat}, #state{ heartbeats = Beats,
connection_reference = {'htmlfile', connected},
timer_ref = _Ref,
client_heartbeat = ClientHeartbeat } = State) ->
Client = case {ClientHeartbeat, Beats} of
{undefined, 0} -> 0;
_Any -> ClientHeartbeat
end,
case Client of
Beats ->
Beats1 = Beats + 1,
socketio_client:send(self(), #heartbeat{ index = Beats1 }),
{noreply, State#state { heartbeats = Beats1 }};
_Other ->
shutdown(State),
{stop, shutdown, State}
end;

handle_info(_Info, State) ->
{noreply, State}.
Expand Down Expand Up @@ -263,7 +268,12 @@ send_message(Message, ServerModule, Req) ->
M = "<script>parent.s._(" ++ Message0 ++ ", document);</script>",
apply(ServerModule, chunk, [Req, M]).

reset_heartbeat({TimerRef, Time}) ->
erlang:cancel_timer(TimerRef),
NewRef = erlang:start_timer(Time, self(), heartbeat),
{NewRef, Time}.
reset_timer(TimerRef, Time, Message) ->
catch(erlang:cancel_timer(TimerRef)),
erlang:start_timer(Time, self(), Message).

shutdown(#state{ server_module = ServerModule,
req = Req,
caller = Caller }) ->
gen_server:reply(Caller, ServerModule:respond(Req, 200, "")),
ok.