Permalink
Browse files

Fixing more timers

More timers and heartbeats fixed, including htmlfile and xhr multipart.
Nothing in the tests seemed to have an influence on the htmlfile
transport. Further tests ought to be added in time.

Note that as for the other transport, the timeout on a disconnection is
still not a BIF, compared to the heartbeats which now use the Erlang
timer functions.
  • Loading branch information...
1 parent c4bb8ea commit a915b18ab1df097327da80c5ea9c615b2e94279b @ferd ferd committed Jun 16, 2011
Showing with 80 additions and 57 deletions.
  1. +39 −28 src/socketio_transport_htmlfile.erl
  2. +41 −29 src/socketio_transport_xhr_multipart.erl
@@ -63,9 +63,9 @@ init([Sup, SessionId, ServerModule, {'htmlfile', {Req, Caller}}]) ->
_ ->
error_logger:warning_report(
"Could not load default heartbeat_interval value from "
- "the application file. Setting the default value to infinity."
+ "the application file. Setting the default value to 10000."
),
- infinity
+ 10000
end,
CloseTimeout =
case application:get_env(close_timeout) of
@@ -85,7 +85,7 @@ init([Sup, SessionId, ServerModule, {'htmlfile', {Req, Caller}}]) ->
req = Req,
caller = Caller,
close_timeout = CloseTimeout,
- heartbeat_interval = HeartbeatInterval,
+ heartbeat_interval = {make_ref(), HeartbeatInterval},
event_manager = EventMgr,
sup = Sup
}}.
@@ -105,21 +105,24 @@ init([Sup, SessionId, ServerModule, {'htmlfile', {Req, Caller}}]) ->
%% @end
%%--------------------------------------------------------------------
%% Incoming data
-handle_call({'htmlfile', data, Req}, _From, #state{ heartbeat_interval = Interval,
+handle_call({'htmlfile', data, Req}, _From, #state{ heartbeat_interval = Interval,
server_module = ServerModule,
event_manager = EventManager } = State) ->
- Data = apply(ServerModule, parse_post, [Req]),
- Self = self(),
- lists:foreach(fun({"data", M}) ->
- spawn(fun () ->
- F = fun(#heartbeat{}) -> ignore;
- (M0) -> gen_event:notify(EventManager, {message, Self, M0})
- end,
- [F(Msg) || Msg <- socketio_data:decode(#msg{content=M})]
- end)
- end, Data),
- apply(ServerModule, respond, [Req, 200, [{"Content-Type","text/plain"}],"ok"]),
- {reply, ok, State, Interval};
+ Msgs = [socketio_data:decode(#msg{content=Data}) || {"data", Data} <- ServerModule:parse_post(Req)],
+ F = fun(#heartbeat{}, _Acc) ->
+ {timer, reset_heartbeat(Interval)};
+ (M, Acc) ->
+ gen_event:notify(EventManager, {message, self(), M}),
+ Acc
+ end,
+ NewState = case lists:foldl(F, undefined, lists:flatten(Msgs)) of
+ {timer, NewInterval} ->
+ State#state{ heartbeat_interval = NewInterval};
+ undefined ->
+ State
+ end,
+ ServerModule:respond(Req, 200, [{"Content-Type", "text/plain"}], "ok"),
+ {reply, ok, NewState};
%% Event management
handle_call(event_manager, _From, #state{ event_manager = EventMgr } = State) ->
@@ -149,26 +152,29 @@ handle_call(stop, _From, State) ->
%% @end
%%--------------------------------------------------------------------
handle_cast({initialize, Req}, #state{ heartbeat_interval = Interval, server_module = ServerModule } = State) ->
- apply(ServerModule, headers, [Req, [{"Content-Type", "text/html"},
- {"Connection", "Keep-Alive"},
- {"Transfer-Encoding", "chunked"}]]),
- H = "<html><body>" ++ lists:duplicate(254,$\s),
- link(apply(ServerModule, socket, [Req])),
- apply(ServerModule, chunk, [Req, H]),
- {noreply, State#state{ connection_reference = {'htmlfile', connected} }, Interval};
+ ServerModule:headers(Req, [{"Content-Type", "text/html"},
+ {"Connection", "Keep-Alive"},
+ {"Transfer-Encoding", "chunked"}]),
+ H = "<html><body>" ++ lists:duplicate(254, $\s),
+ link(ServerModule:socket(Req)),
+ ServerModule:chunk(Req, H),
+ {noreply, State#state{ connection_reference = {htmlfile, connected},
+ heartbeat_interval = reset_heartbeat(Interval) }};
handle_cast(heartbeat, #state{ heartbeats = Beats,
heartbeat_interval = Interval } = State) ->
Beats1 = Beats + 1,
socketio_client:send(self(), #heartbeat{ index = Beats1 }),
- {noreply, State#state { heartbeats = Beats1 }, Interval};
+ {noreply, State#state{ heartbeats = Beats1,
+ heartbeat_interval = reset_heartbeat(Interval) }};
%% Send
handle_cast({send, Message}, #state{ req = Req,
server_module = ServerModule,
- connection_reference = {'htmlfile', connected }, heartbeat_interval = Interval } = State) ->
+ connection_reference = {'htmlfile', connected },
+ heartbeat_interval = Interval } = State) ->
send_message(Message, ServerModule, Req),
- {noreply, State, Interval};
+ {noreply, State#state{ heartbeat_interval = reset_heartbeat(Interval) }};
handle_cast(_, #state{} = State) ->
{noreply, State}.
@@ -189,10 +195,10 @@ handle_info({'EXIT',_Port,_Reason}, #state{ close_timeout = ServerTimeout} = Sta
handle_info(timeout, #state{ server_module = ServerModule,
connection_reference = {'htmlfile', none}, caller = Caller, req = Req } = State) ->
- gen_server:reply(Caller, apply(ServerModule, respond, [Req, 200])),
+ gen_server:reply(Caller, ServerModule:respond(Req, 200)),
{stop, shutdown, State};
-handle_info(timeout, State) ->
+handle_info({timeout, _Ref, heartbeat}, State) ->
gen_server:cast(self(), heartbeat),
{noreply, State};
@@ -237,3 +243,8 @@ send_message(Message, ServerModule, Req) ->
Message0 = binary_to_list(jsx:term_to_json(list_to_binary(Message), [{strict, false}])),
M = "<script>parent.s._(" ++ Message0 ++ ", document);</script>",
apply(ServerModule, chunk, [Req, M]).
+
+reset_heartbeat({TimerRef, Time}) ->
+ erlang:cancel_timer(TimerRef),
+ NewRef = erlang:start_timer(Time, self(), heartbeat),
+ {NewRef, Time}.
@@ -63,9 +63,9 @@ init([Sup, SessionId, ServerModule, {'xhr-multipart', {Req, Caller}}]) ->
_ ->
error_logger:warning_report(
"Could not load default heartbeat_interval value from "
- "the application file. Setting the default value to infinity."
+ "the application file. Setting the default value to 10000."
),
- infinity
+ 10000
end,
CloseTimeout =
case application:get_env(close_timeout) of
@@ -89,7 +89,7 @@ init([Sup, SessionId, ServerModule, {'xhr-multipart', {Req, Caller}}]) ->
req = Req,
caller = Caller,
close_timeout = CloseTimeout,
- heartbeat_interval = HeartbeatInterval,
+ heartbeat_interval = {make_ref(), HeartbeatInterval},
event_manager = EventMgr,
sup = Sup
}}.
@@ -110,19 +110,23 @@ init([Sup, SessionId, ServerModule, {'xhr-multipart', {Req, Caller}}]) ->
%%--------------------------------------------------------------------
%% Incoming data
handle_call({'xhr-multipart', data, Req}, _From, #state{ server_module = ServerModule,
- heartbeat_interval = Interval, event_manager = EventManager } = State) ->
- Data = apply(ServerModule, parse_post, [Req]),
- Self = self(),
- lists:foreach(fun({"data", M}) ->
- spawn(fun () ->
- F = fun(#heartbeat{}) -> ignore;
- (M0) -> gen_event:notify(EventManager, {message, Self, M0})
- end,
- [F(Msg) || Msg <- socketio_data:decode(#msg{content=M})]
- end)
- end, Data),
- apply(ServerModule, respond, [Req, 200,[{"Content-Type","text/plain"}],"ok"]),
- {reply, ok, State, Interval};
+ heartbeat_interval = Interval,
+ event_manager = EventManager } = State) ->
+ Msgs = [socketio_data:decode(#msg{content=Data}) || {"data", Data} <- ServerModule:parse_post(Req)],
+ F = fun(#heartbeat{}, _Acc) ->
+ {timer, reset_heartbeat(Interval)};
+ (M, Acc) ->
+ gen_event:notify(EventManager, {message, self(), M}),
+ Acc
+ end,
+ NewState = case lists:foldl(F, undefined, lists:flatten(Msgs)) of
+ {timer, NewInterval} ->
+ State#state{ heartbeat_interval = NewInterval};
+ undefined ->
+ State
+ end,
+ ServerModule:respond(Req, 200, [{"Content-Type", "text/plain"}], "ok"),
+ {reply, ok, NewState};
%% Event management
handle_call(event_manager, _From, #state{ event_manager = EventMgr } = State) ->
@@ -152,38 +156,41 @@ handle_call(stop, _From, State) ->
%% @end
%%--------------------------------------------------------------------
handle_cast({initialize, Req}, #state{ server_module = ServerModule, heartbeat_interval = Interval } = State) ->
- Headers = apply(ServerModule, get_headers, [Req]),
- Headers1 =
+ Headers = ServerModule:get_headers(Req),
+ Headers1 =
case proplists:get_value('Origin', Headers) of
undefined ->
Headers;
Origin ->
case socketio_listener:verify_origin(Origin, socketio_listener:origins(listener(State))) of
true ->
[{"Access-Control-Allow-Origin", "*"},
- {"Access-Control-Allow-Credentials", "true"}|Headers];
+ {"Access-Control-Allow-Credentials", "true"} | Headers];
false ->
Headers
end
end,
- link(apply(ServerModule, socket, [Req])),
- apply(ServerModule, headers, [Req, [{"Content-Type", "multipart/x-mixed-replace;boundary=\"socketio\""},
- {"Connection", "Keep-Alive"}|Headers1]]),
- apply(ServerModule, stream, [Req, "--socketio\n"]),
- {noreply, State#state{ connection_reference = {'xhr-multipart', connected} }, Interval};
+ link(ServerModule:socket(Req)),
+ ServerModule:headers(Req, [{"Content-Type", "multipart/x-mixed-replace;boundary=\"socketio\""},
+ {"Connection", "Keep-Alive"} | Headers1]),
+ ServerModule:stream(Req, "--socketio\n"),
+ {noreply, State#state{ connection_reference = {'xhr-multipart', connected},
+ heartbeat_interval = reset_heartbeat(Interval) }};
handle_cast(heartbeat, #state{ heartbeats = Beats,
heartbeat_interval = Interval } = State) ->
Beats1 = Beats + 1,
socketio_client:send(self(), #heartbeat{ index = Beats1 }),
- {noreply, State#state { heartbeats = Beats1 }, Interval};
+ {noreply, State#state { heartbeats = Beats1,
+ heartbeat_interval = reset_heartbeat(Interval) }};
%% Send
handle_cast({send, Message}, #state{ req = Req,
server_module = ServerModule,
- connection_reference = {'xhr-multipart', connected }, heartbeat_interval = Interval } = State) ->
+ connection_reference = {'xhr-multipart', connected },
+ heartbeat_interval = Interval } = State) ->
send_message(Message, Req, ServerModule),
- {noreply, State, Interval};
+ {noreply, State#state{ heartbeat_interval = reset_heartbeat(Interval) }};
handle_cast(_, #state{} = State) ->
{noreply, State}.
@@ -204,10 +211,10 @@ handle_info({'EXIT',_Port,_Reason}, #state{ close_timeout = ServerTimeout} = Sta
handle_info(timeout, #state{ server_module = ServerModule,
connection_reference = {'xhr-multipart', none}, req = Req, caller = Caller } = State) ->
- gen_server:reply(Caller, apply(ServerModule, respond, [Req, 200, ""])),
+ gen_server:reply(Caller, ServerModule:respond(Req, 200, "")),
{stop, shutdown, State};
-handle_info(timeout, State) ->
+handle_info({timeout, _Ref, heartbeat}, State) ->
gen_server:cast(self(), heartbeat),
{noreply, State};
@@ -255,3 +262,8 @@ send_message(Message, Req, ServerModule) ->
listener(#state{ sup = Sup }) ->
socketio_listener:server(Sup).
+
+reset_heartbeat({TimerRef, Time}) ->
+ erlang:cancel_timer(TimerRef),
+ NewRef = erlang:start_timer(Time, self(), heartbeat),
+ {NewRef, Time}.

0 comments on commit a915b18

Please sign in to comment.