Skip to content

Commit

Permalink
Reworking the timers in transport_polling
Browse files Browse the repository at this point in the history
The current way of handling timeouts is risky with regards to user calls
able to mess with the timeouts and get things out of sync with the
client side.

This fix replaces the timeouts coming from the gen_server behaviour with
the erlang timer BIFs, likely safer and independent of the messages
received.

The timeouts having to do with a connection being closed are still
handled the old way.
  • Loading branch information
ferd committed Jun 16, 2011
1 parent cf55319 commit c4bb8ea
Showing 1 changed file with 42 additions and 27 deletions.
69 changes: 42 additions & 27 deletions src/socketio_transport_polling.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ init([Sup, SessionId, ServerModule, {TransportType, {Req, Index}}]) ->
connection_reference = {TransportType, none},
req = Req,
index = Index,
polling_duration = PollingDuration,
polling_duration = {make_ref(), PollingDuration},
close_timeout = CloseTimeout,
event_manager = EventMgr,
sup = Sup
Expand All @@ -108,26 +108,32 @@ init([Sup, SessionId, ServerModule, {TransportType, Req}]) ->
%%--------------------------------------------------------------------
%% Incoming data
handle_call({_TransportType, data, Req}, From, #state{ server_module = ServerModule,
event_manager = EventManager, sup = Sup } = State) ->
Data = apply(ServerModule, parse_post, [Req]),
Self = self(),
Response =
case cors_headers(apply(ServerModule, get_headers, [Req]), Sup) of
{false, _Headers} ->
gen_server:reply(From, apply(ServerModule, respond, [Req, 405, "unauthorized"]));
{_, Headers0} ->
Self = self(),
lists:foreach(fun({"data", M}) ->
spawn(fun () ->
F = fun(#heartbeat{}) -> ignore;
(M0) -> gen_event:notify(EventManager, {message, Self, M0})
end,
[F(Msg) || Msg <- socketio_data:decode(#msg{content=M})]
end)
end, Data),
gen_server:reply(From, apply(ServerModule, respond,[Req, 200, [Headers0|[{"Content-Type", "text/plain"}]], "ok"]))
end,
{reply, Response, State};
event_manager = EventManager,
polling_duration = Interval,
sup = Sup } = State) ->
Msgs = [socketio_data:decode(#msg{content=Data}) || {"data", Data} <- ServerModule:parse_post(Req)],
{Response, NewState} =
case cors_headers(ServerModule:get_headers(Req), Sup) of
{false, _Headers} ->
Reply = gen_server:reply(From, ServerModule:respond(Req, 405, "unauthorized")),
{Reply, State};
{_, Headers} ->
F = fun(#heartbeat{}, _Acc) ->
{timer, reset_duration(Interval)};
(M, Acc) ->
gen_event:notify(EventManager, {message, self(), M}),
Acc
end,
TmpState = case lists:foldl(F, undefined, lists:flatten(Msgs)) of
{timer, NewInterval} ->
State#state{ polling_duration = NewInterval};
undefined ->
State
end,
Reply = gen_server:reply(From, ServerModule:respond(Req, 200, [Headers | [{"Content-Type", "text/plain"}]], "ok")),
{Reply, TmpState}
end,
{reply, Response, NewState};

%% Event management
handle_call(event_manager, _From, #state{ event_manager = EventMgr } = State) ->
Expand Down Expand Up @@ -160,10 +166,13 @@ handle_call(stop, _From, State) ->
handle_cast({TransportType, polling_request, {Req, Index}, Server}, State) ->
handle_cast({TransportType, polling_request, Req, Server}, State#state{ index = Index});
handle_cast({TransportType, polling_request, Req, Server}, #state { server_module = ServerModule,
polling_duration = Interval, message_buffer = [] } = State) ->
apply(ServerModule, ensure_longpolling_request, [Req]),
link(apply(ServerModule, socket, [Req])),
{noreply, State#state{ connection_reference = {TransportType, connected}, req = Req, caller = Server }, Interval};
polling_duration = Interval,
message_buffer = [] } = State) ->
ServerModule:ensure_longpolling_request(Req),
link(ServerModule:socket(Req)),
{noreply, State#state{ connection_reference = {TransportType, connected}, req = Req,
caller = Server,
polling_duration = reset_duration(Interval) }};

handle_cast({TransportType, polling_request, Req, Server}, #state { server_module = ServerModule,
message_buffer = Buffer } = State) ->
Expand All @@ -179,7 +188,8 @@ handle_cast({send, Message}, #state{ server_module = ServerModule,
connection_reference = {TransportType, connected }, req = Req, caller = Caller,
index = Index, sup = Sup, polling_duration = Interval} = State) ->
gen_server:reply(Caller, send_message(Message, Req, Index, ServerModule, Sup)),
{noreply, State#state{ connection_reference = {TransportType, none}}, Interval};
{noreply, State#state{ connection_reference = {TransportType, none},
polling_duration = reset_duration(Interval) }};

handle_cast(_, State) ->
{noreply, State}.
Expand All @@ -199,7 +209,7 @@ handle_info({'EXIT',Port,_Reason}, #state{ connection_reference = {TransportType
{noreply, State#state { connection_reference = {TransportType, none}}, CloseTimeout};

%% Connection has timed out
handle_info(timeout, #state{ server_module = ServerModule,
handle_info({timeout, _Ref, polling}, #state{ server_module = ServerModule,
connection_reference = {_TransportType, connected},
caller = Caller, req = Req, index = Index, sup = Sup } = State) ->
gen_server:reply(Caller, send_message("", Req, Index, ServerModule, Sup)),
Expand Down Expand Up @@ -301,3 +311,8 @@ cors_headers(Headers, Sup) ->
{false, Headers}
end
end.

reset_duration({TimerRef, Time}) ->
erlang:cancel_timer(TimerRef),
NewRef = erlang:start_timer(Time, self(), polling),
{NewRef, Time}.

0 comments on commit c4bb8ea

Please sign in to comment.