Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Fixed and improved timeout implementation for transports (heartbeat_interval and connection_close) #49

Open
wants to merge 4 commits into from

4 participants

@jannschu
  • Reduced number of timer resets
  • Client heartbeats are now considered
  • Correct connection_close time is used

Currently only for xhr-multipart. I will add the others (wait before merge) and maybe also tests.
Please discuss.

@ferd

Is there any specific reason for suddenly adding this constraint there? Just making the invariants more obvious?

Yes there is a reason. After closing a tab with an open connection for some reason the last message was re-sent. This constraint avoids starting the heartbeat timeout (which resets the close_connection timeout).

@ferd

Any specific reason for removing that one?

Yes, of course.

In {initialize, ...}-cast an timer for a heartbeat is created. I for myself think it isn't that useful to send a heartbeat immediately after a connection is opened. So the first heartbeat is sent after a heartbeat_interval.

@ferd

you only need the catch because the timer reference is not reference. If you keep the default value I had of make_ref(), then you can just transparently call for a cancel_timer without needing any error checking, which makes it somewhat nicer to read IMO.

Thats a matter of taste I think. With the proposed solution you have to create a meaningless reference.

@ferd

I'm not sure I get why we have this clause here and why it leads to a shutdown. I might be thick because it's 7 in the morning, but care to explain?

If the client did not respond to the last heartbeat the connection should be considered to be closed. Or did I understand this one wrong?

Collaborator

I'm checking in the node.js implementation and unless I don't get the code, there seems to be no check regarding whether the heartbeats are in right order or not. They seem to be used only to reset timers, and not as a consistency check.

Agreed. There seems to be no numbering at all, think a client id is used what here is index.

But: The current implementation is then buggy as well. First the index should stay the same. Secondly if the client does not response to a heartbeat the server just continues to send 'heartbeat requests' (see timer reset in the clause). So to fix it, the heartbeat clause must check when the last msg/heartbeat from the client received.

@omarkj
Collaborator

Just went through this and am okay with it - haven't tried it but it looks okay. @yrashk, what do you think? I think we can pull this in.

@jannschu

Maybe wait until I changed htmlfile as well. But can't test it here.

@jannschu

So, htmlfile changed. But it is untested.

@yrashk
Owner

@omarkj have you tried to test this pull request yet?

@omarkj
Collaborator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jun 18, 2011
  1. fix/improve xhr-multipart timeout implementation

    Jannik Schürg authored
  2. fix/improve polling timeout implementation

    Jannik Schürg authored
  3. fix/improve htmlfile timeout implementation

    Jannik Schürg authored
Commits on Jun 19, 2011
  1. remove debug output & fix compiler warning

    Jannik Schürg authored
This page is out of date. Refresh to see the latest.
View
134 src/socketio_transport_htmlfile.erl
@@ -18,8 +18,10 @@
server_module,
connection_reference,
heartbeats = 0,
+ client_heartbeat = undefined,
heartbeat_interval,
close_timeout,
+ timer_ref = undefined,
event_manager,
sup
}).
@@ -57,27 +59,30 @@ init([Sup, SessionId, ServerModule, {'htmlfile', {Req, Caller}}]) ->
apply(ServerModule, ensure_longpolling_request, [Req]),
process_flag(trap_exit, true),
HeartbeatInterval =
- case application:get_env(heartbeat_interval) of
- {ok, Time} ->
- Time;
- _ ->
- error_logger:warning_report(
- "Could not load default heartbeat_interval value from "
- "the application file. Setting the default value to 10000."
- ),
- 10000
- end,
+ case application:get_env(heartbeat_interval) of
+ {ok, Time} ->
+ Time;
+ _ ->
+ error_logger:warning_report(
+ "Could not load default heartbeat_interval value from "
+ "the application file. Setting the default value to 10000."
+ ),
+ 10000
+ end,
CloseTimeout =
- case application:get_env(close_timeout) of
- {ok, Time0} ->
- Time0;
- _ ->
- 8000
- end,
+ case application:get_env(close_timeout) of
+ {ok, Time0} ->
+ Time0;
+ _ ->
+ error_logger:warning_report(
+ "Could not load default close_timeout value from "
+ "the application file. Setting the default value to 8000 ms."
+ ),
+ 8000
+ end,
{ok, EventMgr} = gen_event:start_link(),
gen_server:cast(self(), {initialize, Req}),
socketio_client:send(self(), #msg{ content = SessionId }),
- gen_server:cast(self(), heartbeat),
{ok, #state{
session_id = SessionId,
server_module = ServerModule,
@@ -85,7 +90,7 @@ init([Sup, SessionId, ServerModule, {'htmlfile', {Req, Caller}}]) ->
req = Req,
caller = Caller,
close_timeout = CloseTimeout,
- heartbeat_interval = {make_ref(), HeartbeatInterval},
+ heartbeat_interval = HeartbeatInterval,
event_manager = EventMgr,
sup = Sup
}}.
@@ -106,24 +111,29 @@ init([Sup, SessionId, ServerModule, {'htmlfile', {Req, Caller}}]) ->
%%--------------------------------------------------------------------
%% Incoming data
handle_call({'htmlfile', data, Req}, _From, #state{ heartbeat_interval = Interval,
+ timer_ref = OldTimerRef,
+ connection_reference = {'htmlfile', connected},
server_module = ServerModule,
event_manager = EventManager } = State) ->
Msgs = [socketio_data:decode(#msg{content=Data}) || {"data", Data} <- ServerModule:parse_post(Req)],
- F = fun(#heartbeat{}, _Acc) ->
- {timer, reset_heartbeat(Interval)};
+ F = fun(#heartbeat{index = HeartbeatNumber}, _Acc) ->
+ {timer, reset_timer(OldTimerRef, Interval, heartbeat), HeartbeatNumber};
(M, Acc) ->
gen_event:notify(EventManager, {message, self(), M}),
Acc
end,
NewState = case lists:foldl(F, undefined, lists:flatten(Msgs)) of
- {timer, NewInterval} ->
- State#state{ heartbeat_interval = NewInterval};
+ {timer, NewTimerRef, HeartbeatNumber} ->
+ State#state{ timer_ref = NewTimerRef, client_heartbeat = HeartbeatNumber };
undefined ->
State
end,
ServerModule:respond(Req, 200, [{"Content-Type", "text/plain"}], "ok"),
{reply, ok, NewState};
+handle_call({'htmlfile', data, _Req}, _From, #state{ connection_reference = {'htmlfile', none} } = State) ->
+ {reply, ok, State};
+
%% Event management
handle_call(event_manager, _From, #state{ event_manager = EventMgr } = State) ->
{reply, EventMgr, State};
@@ -151,7 +161,9 @@ handle_call(stop, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_cast({initialize, Req}, #state{ heartbeat_interval = Interval, server_module = ServerModule } = State) ->
+handle_cast({initialize, Req}, #state{ heartbeat_interval = Interval,
+ timer_ref = OldTimerRef,
+ server_module = ServerModule } = State) ->
ServerModule:headers(Req, [{"Content-Type", "text/html"},
{"Connection", "Keep-Alive"},
{"Transfer-Encoding", "chunked"}]),
@@ -159,22 +171,16 @@ handle_cast({initialize, Req}, #state{ heartbeat_interval = Interval, server_mod
link(ServerModule:socket(Req)),
ServerModule:chunk(Req, H),
{noreply, State#state{ connection_reference = {htmlfile, connected},
- heartbeat_interval = reset_heartbeat(Interval) }};
-
-handle_cast(heartbeat, #state{ heartbeats = Beats,
- heartbeat_interval = Interval } = State) ->
- Beats1 = Beats + 1,
- socketio_client:send(self(), #heartbeat{ index = Beats1 }),
- {noreply, State#state{ heartbeats = Beats1,
- heartbeat_interval = reset_heartbeat(Interval) }};
+ timer_ref = reset_timer(OldTimerRef, Interval, heartbeat) }};
%% Send
handle_cast({send, Message}, #state{ req = Req,
server_module = ServerModule,
- connection_reference = {'htmlfile', connected },
+ connection_reference = {'htmlfile', connected},
+ timer_ref = OldTimerRef,
heartbeat_interval = Interval } = State) ->
send_message(Message, ServerModule, Req),
- {noreply, State#state{ heartbeat_interval = reset_heartbeat(Interval) }};
+ {noreply, State#state{ timer_ref = reset_timer(OldTimerRef, Interval, heartbeat) }};
handle_cast(_, #state{} = State) ->
{noreply, State}.
@@ -191,35 +197,34 @@ handle_cast(_, #state{} = State) ->
%% @end
%%--------------------------------------------------------------------
%% CLient disconnected. We fire a timer (ServerTimeout)!
-handle_info({'EXIT',_Port,_Reason}, #state{ close_timeout = ServerTimeout} = State) when is_port(_Port) ->
- {noreply, State#state { connection_reference = {'htmlfile', none}}, ServerTimeout};
+handle_info({'EXIT',_Port,_Reason}, #state{ close_timeout = ServerTimeout,
+ timer_ref = OldTimerRef } = State) when is_port(_Port) ->
+ NewTimer = reset_timer(OldTimerRef, ServerTimeout, connection_timeout),
+ {noreply, State#state { connection_reference = {'htmlfile', none}, timer_ref = NewTimer }};
-%% This branch handles two purposes: 1. handling the close_timeout,
-%% 2. handling the heartbeat timeout that might comes first. The thing is,
-%% when the client connection dies, we need to wait for the close_timeout
-%% to be fired. That one can be cancelled at any time by God knows what for now,
-%% and that might be desirable. However, it can also be cancelled because we
-%% happen to receive the heartbeat timeout. Given the right setting, this will
-%% happen every time a disconnection happens at the point where the delay left to
-%% the current heartbeat is longer than the delay left to the total value of
-%% close_timout. Funny, eh?
-%% For this reason, the heartbeat timeout when we have no htmlfile
-%% connection reference has to be seen as the same as the close_timeout
-%% timer firing off. This is the safest way to guarantee everything will run
-%% okay.
-handle_info(timeout, #state{ server_module = ServerModule,
- connection_reference = {'htmlfile', none}, caller = Caller, req = Req } = State) ->
- gen_server:reply(Caller, ServerModule:respond(Req, 200)),
+handle_info({timeout, _Ref, connection_reference}, #state{ timer_ref = _Ref,
+ connection_reference = {'htmlfile', none} } = State) ->
+ shutdown(State),
{stop, shutdown, State};
-%% See previous clauses' comments
-handle_info({timeout, _Ref, heartbeat}, #state{ connection_reference = {'htmlfile', none} } = State) ->
- handle_info(timeout, State);
-
%% Regular heartbeat
-handle_info({timeout, _Ref, heartbeat}, State) ->
- gen_server:cast(self(), heartbeat),
- {noreply, State};
+handle_info({timeout, _Ref, heartbeat}, #state{ heartbeats = Beats,
+ connection_reference = {'htmlfile', connected},
+ timer_ref = _Ref,
+ client_heartbeat = ClientHeartbeat } = State) ->
+ Client = case {ClientHeartbeat, Beats} of
+ {undefined, 0} -> 0;
+ _Any -> ClientHeartbeat
+ end,
+ case Client of
+ Beats ->
+ Beats1 = Beats + 1,
+ socketio_client:send(self(), #heartbeat{ index = Beats1 }),
+ {noreply, State#state { heartbeats = Beats1 }};
+ _Other ->
+ shutdown(State),
+ {stop, shutdown, State}
+ end;
handle_info(_Info, State) ->
{noreply, State}.
@@ -263,7 +268,12 @@ send_message(Message, ServerModule, Req) ->
M = "<script>parent.s._(" ++ Message0 ++ ", document);</script>",
apply(ServerModule, chunk, [Req, M]).
-reset_heartbeat({TimerRef, Time}) ->
- erlang:cancel_timer(TimerRef),
- NewRef = erlang:start_timer(Time, self(), heartbeat),
- {NewRef, Time}.
+reset_timer(TimerRef, Time, Message) ->
+ catch(erlang:cancel_timer(TimerRef)),
+ erlang:start_timer(Time, self(), Message).
+
+shutdown(#state{ server_module = ServerModule,
+ req = Req,
+ caller = Caller }) ->
+ gen_server:reply(Caller, ServerModule:respond(Req, 200, "")),
+ ok.
View
114 src/socketio_transport_polling.erl
@@ -21,6 +21,7 @@
index,
polling_duration,
close_timeout,
+ timer_ref = undefined,
event_manager,
sup
}).
@@ -57,23 +58,27 @@ start_link(Sup, SessionId, ServerModule, ConnectionReference) ->
init([Sup, SessionId, ServerModule, {TransportType, {Req, Index}}]) ->
process_flag(trap_exit, true),
PollingDuration =
- case application:get_env(polling_duration) of
- {ok, Time} ->
- Time;
- _ ->
- error_logger:warning_report(
- "Could not load default polling_duration value from "
- "the application file. Setting the default value to 20000 ms."
- ),
- 20000
- end,
+ case application:get_env(polling_duration) of
+ {ok, Time} ->
+ Time;
+ _ ->
+ error_logger:warning_report(
+ "Could not load default polling_duration value from "
+ "the application file. Setting the default value to 20000 ms."
+ ),
+ 20000
+ end,
CloseTimeout =
- case application:get_env(close_timeout) of
- {ok, Time0} ->
- Time0;
- _ ->
- 8000
- end,
+ case application:get_env(close_timeout) of
+ {ok, Time0} ->
+ Time0;
+ _ ->
+ error_logger:warning_report(
+ "Could not load default close_timeout value from "
+ "the application file. Setting the default value to 8000 ms."
+ ),
+ 8000
+ end,
{ok, EventMgr} = gen_event:start_link(),
send_message(#msg{content = SessionId}, Req, Index, ServerModule, Sup),
{ok, #state {
@@ -82,7 +87,7 @@ init([Sup, SessionId, ServerModule, {TransportType, {Req, Index}}]) ->
connection_reference = {TransportType, none},
req = Req,
index = Index,
- polling_duration = {make_ref(), PollingDuration},
+ polling_duration = PollingDuration,
close_timeout = CloseTimeout,
event_manager = EventMgr,
sup = Sup
@@ -109,30 +114,31 @@ init([Sup, SessionId, ServerModule, {TransportType, Req}]) ->
%% Incoming data
handle_call({_TransportType, data, Req}, From, #state{ server_module = ServerModule,
event_manager = EventManager,
+ timer_ref = OldTimerRef,
polling_duration = Interval,
sup = Sup } = State) ->
Msgs = [socketio_data:decode(#msg{content=Data}) || {"data", Data} <- ServerModule:parse_post(Req)],
{Response, NewState} =
- case cors_headers(ServerModule:get_headers(Req), Sup) of
- {false, _Headers} ->
- Reply = gen_server:reply(From, ServerModule:respond(Req, 405, "unauthorized")),
- {Reply, State};
- {_, Headers} ->
- F = fun(#heartbeat{}, _Acc) ->
- {timer, reset_duration(Interval)};
- (M, Acc) ->
- gen_event:notify(EventManager, {message, self(), M}),
- Acc
- end,
- TmpState = case lists:foldl(F, undefined, lists:flatten(Msgs)) of
- {timer, NewInterval} ->
- State#state{ polling_duration = NewInterval};
- undefined ->
- State
- end,
- Reply = gen_server:reply(From, ServerModule:respond(Req, 200, [Headers | [{"Content-Type", "text/plain"}]], "ok")),
- {Reply, TmpState}
- end,
+ case cors_headers(ServerModule:get_headers(Req), Sup) of
+ {false, _Headers} ->
+ Reply = gen_server:reply(From, ServerModule:respond(Req, 405, "unauthorized")),
+ {Reply, State};
+ {_, Headers} ->
+ F = fun(#heartbeat{}, _Acc) ->
+ {timer, reset_timer(OldTimerRef, Interval, polling)};
+ (M, Acc) ->
+ gen_event:notify(EventManager, {message, self(), M}),
+ Acc
+ end,
+ TmpState = case lists:foldl(F, undefined, lists:flatten(Msgs)) of
+ {timer, NewTimerRef} ->
+ State#state{ timer_ref = NewTimerRef };
+ undefined ->
+ State
+ end,
+ Reply = gen_server:reply(From, ServerModule:respond(Req, 200, [Headers | [{"Content-Type", "text/plain"}]], "ok")),
+ {Reply, TmpState}
+ end,
{reply, Response, NewState};
%% Event management
@@ -164,15 +170,16 @@ handle_call(stop, _From, State) ->
%%--------------------------------------------------------------------
%% Polling
handle_cast({TransportType, polling_request, {Req, Index}, Server}, State) ->
- handle_cast({TransportType, polling_request, Req, Server}, State#state{ index = Index});
+ handle_cast({TransportType, polling_request, Req, Server}, State#state{ index = Index });
handle_cast({TransportType, polling_request, Req, Server}, #state { server_module = ServerModule,
polling_duration = Interval,
+ timer_ref = OldTimerRef,
message_buffer = [] } = State) ->
ServerModule:ensure_longpolling_request(Req),
link(ServerModule:socket(Req)),
{noreply, State#state{ connection_reference = {TransportType, connected}, req = Req,
caller = Server,
- polling_duration = reset_duration(Interval) }};
+ timer_ref = reset_timer(OldTimerRef, Interval, polling) }};
handle_cast({TransportType, polling_request, Req, Server}, #state { server_module = ServerModule,
message_buffer = Buffer } = State) ->
@@ -185,11 +192,12 @@ handle_cast({send, Message}, #state{ connection_reference = {_TransportType, non
{noreply, State#state{ message_buffer = lists:append(Buffer, [Message])}};
handle_cast({send, Message}, #state{ server_module = ServerModule,
+ timer_ref = OldTimerRef,
connection_reference = {TransportType, connected }, req = Req, caller = Caller,
index = Index, sup = Sup, polling_duration = Interval} = State) ->
gen_server:reply(Caller, send_message(Message, Req, Index, ServerModule, Sup)),
{noreply, State#state{ connection_reference = {TransportType, none},
- polling_duration = reset_duration(Interval) }};
+ timer_ref = reset_timer(OldTimerRef, Interval, polling) }};
handle_cast(_, State) ->
{noreply, State}.
@@ -206,28 +214,29 @@ handle_cast(_, State) ->
%% @end
%%--------------------------------------------------------------------
%% A client has disconnected. We fire a timer (CloseTimeout)!
-handle_info({'EXIT',Port,_Reason}, #state{ connection_reference = {TransportType, _ }, close_timeout = CloseTimeout} = State) when is_port(Port) ->
- {noreply, State#state { connection_reference = {TransportType, none}}, CloseTimeout};
+handle_info({'EXIT',Port,_Reason}, #state{ connection_reference = {TransportType, _ },
+ close_timeout = CloseTimeout,
+ timer_ref = OldTimerRef } = State) when is_port(Port) ->
+ NewTimerRef = reset_timer(OldTimerRef, CloseTimeout, connection_timeout),
+ {noreply, State#state { connection_reference = {TransportType, none}, timer_ref = NewTimerRef }};
%% Connections has timed out, but is technically still active. This is like a
%% heartbeat, but for polling connections.
handle_info({timeout, _Ref, polling}, #state{ server_module = ServerModule,
connection_reference = {_TransportType, connected},
+ timer_ref = _Ref,
caller = Caller, req = Req, index = Index, sup = Sup } = State) ->
gen_server:reply(Caller, send_message("", Req, Index, ServerModule, Sup)),
{noreply, State};
%% Client has timed out, no active connection found. (connection_reference = none)
-handle_info(timeout, #state{ server_module = ServerModule, caller = Caller, req = Req } = State) ->
+handle_info({timeout, _Ref, connection_timeout}, #state{ server_module = ServerModule,
+ timer_ref = _Ref,
+ caller = Caller,
+ req = Req } = State) ->
gen_server:reply(Caller, apply(ServerModule, respond, [Req, 200,""])),
{stop, shutdown, State};
-%% client has timed out, no active connection found, but the normal close_timeout
-%% is being interrupted by the polling timeout timer interacting in here.
-%% We defer to the preceding clause.
-handle_info({timeout, _Ref, polling}, #state{ } = State) ->
- handle_info(timeout, State);
-
handle_info(_Info, State) ->
{noreply, State}.
@@ -320,7 +329,6 @@ cors_headers(Headers, Sup) ->
end
end.
-reset_duration({TimerRef, Time}) ->
- erlang:cancel_timer(TimerRef),
- NewRef = erlang:start_timer(Time, self(), polling),
- {NewRef, Time}.
+reset_timer(TimerRef, Time, Message) ->
+ catch(erlang:cancel_timer(TimerRef)),
+ erlang:start_timer(Time, self(), Message).
View
157 src/socketio_transport_xhr_multipart.erl
@@ -18,8 +18,10 @@
server_module,
connection_reference,
heartbeats = 0,
+ client_heartbeat = undefined,
heartbeat_interval,
close_timeout,
+ timer_ref = undefined,
event_manager,
sup
}).
@@ -57,31 +59,30 @@ init([Sup, SessionId, ServerModule, {'xhr-multipart', {Req, Caller}}]) ->
apply(ServerModule, ensure_longpolling_request, [Req]),
process_flag(trap_exit, true),
HeartbeatInterval =
- case application:get_env(heartbeat_interval) of
- {ok, Time} ->
- Time;
- _ ->
- error_logger:warning_report(
- "Could not load default heartbeat_interval value from "
- "the application file. Setting the default value to 10000."
- ),
- 10000
- end,
+ case application:get_env(heartbeat_interval) of
+ {ok, Time} ->
+ Time;
+ _ ->
+ error_logger:warning_report(
+ "Could not load default heartbeat_interval value from "
+ "the application file. Setting the default value to 10000."
+ ),
+ 10000
+ end,
CloseTimeout =
- case application:get_env(close_timeout) of
- {ok, Time0} ->
- Time0;
- _ ->
- error_logger:warning_report(
- "Could not load default close_timeout value from "
- "the application file. Setting the default value to 8000 ms."
- ),
- 8000
- end,
+ case application:get_env(close_timeout) of
+ {ok, Time0} ->
+ Time0;
+ _ ->
+ error_logger:warning_report(
+ "Could not load default close_timeout value from "
+ "the application file. Setting the default value to 8000 ms."
+ ),
+ 8000
+ end,
{ok, EventMgr} = gen_event:start_link(),
gen_server:cast(self(), {initialize, Req}),
socketio_client:send(self(), #msg{ content = SessionId }),
- gen_server:cast(self(), heartbeat),
{ok, #state{
session_id = SessionId,
server_module = ServerModule,
@@ -89,7 +90,7 @@ init([Sup, SessionId, ServerModule, {'xhr-multipart', {Req, Caller}}]) ->
req = Req,
caller = Caller,
close_timeout = CloseTimeout,
- heartbeat_interval = {make_ref(), HeartbeatInterval},
+ heartbeat_interval = HeartbeatInterval,
event_manager = EventMgr,
sup = Sup
}}.
@@ -111,23 +112,28 @@ init([Sup, SessionId, ServerModule, {'xhr-multipart', {Req, Caller}}]) ->
%% Incoming data
handle_call({'xhr-multipart', data, Req}, _From, #state{ server_module = ServerModule,
heartbeat_interval = Interval,
+ connection_reference = {'xhr-multipart', connected},
+ timer_ref = OldTimerRef,
event_manager = EventManager } = State) ->
Msgs = [socketio_data:decode(#msg{content=Data}) || {"data", Data} <- ServerModule:parse_post(Req)],
- F = fun(#heartbeat{}, _Acc) ->
- {timer, reset_heartbeat(Interval)};
+ F = fun(#heartbeat{index = HeartbeatNumber}, _Acc) ->
+ {timer, reset_timer(OldTimerRef, Interval, heartbeat), HeartbeatNumber};
(M, Acc) ->
gen_event:notify(EventManager, {message, self(), M}),
Acc
end,
NewState = case lists:foldl(F, undefined, lists:flatten(Msgs)) of
- {timer, NewInterval} ->
- State#state{ heartbeat_interval = NewInterval};
+ {timer, NewTimerRef, HeartbeatNumber} ->
+ State#state{ timer_ref = NewTimerRef, client_heartbeat = HeartbeatNumber };
undefined ->
State
end,
ServerModule:respond(Req, 200, [{"Content-Type", "text/plain"}], "ok"),
{reply, ok, NewState};
+handle_call({'xhr-multipart', data, _Req}, _From, #state{ connection_reference = {'xhr-multipart', none} } = State) ->
+ {reply, ok, State};
+
%% Event management
handle_call(event_manager, _From, #state{ event_manager = EventMgr } = State) ->
{reply, EventMgr, State};
@@ -137,7 +143,7 @@ handle_call(session_id, _From, #state{ session_id = SessionId } = State) ->
{reply, SessionId, State};
%% Initial request
-handle_call(req, _From, #state{ req = Req} = State) ->
+handle_call(req, _From, #state{ req = Req } = State) ->
{reply, Req, State};
%% Flow control
@@ -155,7 +161,9 @@ handle_call(stop, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_cast({initialize, Req}, #state{ server_module = ServerModule, heartbeat_interval = Interval } = State) ->
+handle_cast({initialize, Req}, #state{ server_module = ServerModule,
+ timer_ref = OldTimerRef,
+ heartbeat_interval = Interval } = State) ->
Headers = ServerModule:get_headers(Req),
Headers1 =
case proplists:get_value('Origin', Headers) of
@@ -175,22 +183,16 @@ handle_cast({initialize, Req}, #state{ server_module = ServerModule, heartbeat_i
{"Connection", "Keep-Alive"} | Headers1]),
ServerModule:stream(Req, "--socketio\n"),
{noreply, State#state{ connection_reference = {'xhr-multipart', connected},
- heartbeat_interval = reset_heartbeat(Interval) }};
-
-handle_cast(heartbeat, #state{ heartbeats = Beats,
- heartbeat_interval = Interval } = State) ->
- Beats1 = Beats + 1,
- socketio_client:send(self(), #heartbeat{ index = Beats1 }),
- {noreply, State#state { heartbeats = Beats1,
- heartbeat_interval = reset_heartbeat(Interval) }};
+ timer_ref = reset_timer(OldTimerRef, Interval, heartbeat) }};
%% Send
handle_cast({send, Message}, #state{ req = Req,
server_module = ServerModule,
- connection_reference = {'xhr-multipart', connected },
+ connection_reference = {'xhr-multipart', connected},
+ timer_ref = OldTimerRef,
heartbeat_interval = Interval } = State) ->
send_message(Message, Req, ServerModule),
- {noreply, State#state{ heartbeat_interval = reset_heartbeat(Interval) }};
+ {noreply, State#state{ timer_ref = reset_timer(OldTimerRef, Interval, heartbeat) }};
handle_cast(_, #state{} = State) ->
{noreply, State}.
@@ -206,36 +208,46 @@ handle_cast(_, #state{} = State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-%% A client has disconnected. We fire a timer (ServerTimeout)!
-handle_info({'EXIT',_Port,_Reason}, #state{ close_timeout = ServerTimeout} = State) when is_port(_Port) ->
- {noreply, State#state { connection_reference = {'xhr-multipart', none}}, ServerTimeout};
-
-%% This branch handles two purposes: 1. handling the close_timeout,
-%% 2. handling the heartbeat timeout that might comes first. The thing is,
-%% when the client connection dies, we need to wait for the close_timeout
-%% to be fired. That one can be cancelled at any time by God knows what for now,
-%% and that might be desirable. However, it can also be cancelled because we
-%% happen to receive the heartbeat timeout. Given the right setting, this will
-%% happen every time a disconnection happens at the point where the delay left to
-%% the current heartbeat is longer than the delay left to the total value of
-%% close_timout. Funny, eh?
-%% For this reason, the heartbeat timeout when we have no xhr-multipart
-%% connection reference has to be seen as the same as the close_timeout
-%% timer firing off. This is the safest way to guarantee everything will run
-%% okay.
-handle_info(timeout, #state{ server_module = ServerModule,
- connection_reference = {'xhr-multipart', none}, req = Req, caller = Caller } = State) ->
- gen_server:reply(Caller, ServerModule:respond(Req, 200, "")),
- {stop, shutdown, State};
+%% A client has disconnected. We fire a timer (ConnectionTimeout)!
+handle_info({'EXIT', _Port, _Reason}, #state{ close_timeout = ConnectionTimeout,
+ timer_ref = OldTimerRef } = State) when is_port(_Port) ->
+ NewTimer = reset_timer(OldTimerRef, ConnectionTimeout, connection_timeout),
+ {noreply, State#state { connection_reference = {'xhr-multipart', none}, timer_ref = NewTimer}};
-%% See the previous clauses' comments, please.
-handle_info({timeout, _Ref, heartbeat}, #state{ connection_reference = {'xhr-multipart', none} } = State) ->
- handle_info(timeout, State);
-%% Good old regular heartbeat. I admire your simplicity.
-handle_info({timeout, _Ref, heartbeat}, State) ->
- gen_server:cast(self(), heartbeat),
- {noreply, State};
+%% General information about timer handling:
+%% There are two timer types in this transport.
+%% The heartbeat and connection_timeout. We can't use gen_server timeouts,
+%% because an unrelated incoming message would stop the timeout and a mixed
+%% solution (timer for heartbeat, gen_server timeout for disconnect) would
+%% result in the heartbeat timer killing the gen_server timeout.
+%%
+%% The connection_reference must be checked for all, because it could happen
+%% that the timer fires at exactly the time we tried to reset it (during
+%% handle_info({'EXIT', ...) for example).
+
+handle_info({timeout, _OldTimerRef, connection_timeout}, #state{ timer_ref = _OldTimerRef,
+ connection_reference = {'xhr-multipart', none} } = State) ->
+ shutdown(State),
+ {stop, shutdown, State};
+
+handle_info({timeout, _OldTimerRef, heartbeat}, #state{ heartbeats = Beats,
+ connection_reference = {'xhr-multipart', connected},
+ client_heartbeat = ClientHeartbeat,
+ timer_ref = _OldTimerRef } = State) ->
+ Client = case {ClientHeartbeat, Beats} of
+ {undefined, 0} -> 0;
+ _Any -> ClientHeartbeat
+ end,
+ case Client of
+ Beats ->
+ Beats1 = Beats + 1,
+ socketio_client:send(self(), #heartbeat{ index = Beats1 }),
+ {noreply, State#state { heartbeats = Beats1 }};
+ _Other ->
+ shutdown(State),
+ {stop, shutdown, State}
+ end;
handle_info(_Info, State) ->
{noreply, State}.
@@ -282,7 +294,12 @@ send_message(Message, Req, ServerModule) ->
listener(#state{ sup = Sup }) ->
socketio_listener:server(Sup).
-reset_heartbeat({TimerRef, Time}) ->
- erlang:cancel_timer(TimerRef),
- NewRef = erlang:start_timer(Time, self(), heartbeat),
- {NewRef, Time}.
+reset_timer(TimerRef, Time, Message) ->
+ catch(erlang:cancel_timer(TimerRef)),
+ erlang:start_timer(Time, self(), Message).
+
+shutdown(#state{ server_module = ServerModule,
+ req = Req,
+ caller = Caller }) ->
+ gen_server:reply(Caller, ServerModule:respond(Req, 200, "")),
+ ok.
Something went wrong with that request. Please try again.