Permalink
Browse files

Fixing timers in websockets

The current way of doing timers in websockets is fickle and bound to
error when it is reset on all messages we get. Just by sending a process
random messages, the timers get reset all the time.

This fix adds a different way of handling timers by using the erlang
module's start_timer/3 and cancel_timer/1 to get a safer
timeout/heartbeat behaviour.
  • Loading branch information...
1 parent 441eb19 commit cf5531942b15c495f7593358470cebf6f0907f71 @ferd ferd committed Jun 15, 2011
Showing with 24 additions and 15 deletions.
  1. +24 −15 src/socketio_transport_websocket.erl
@@ -70,7 +70,7 @@ init([Sup, SessionId, ServerModule, ConnectionReference]) ->
session_id = SessionId,
server_module = ServerModule,
connection_reference = ConnectionReference,
- heartbeat_interval = HeartbeatInterval,
+ heartbeat_interval = {make_ref(), HeartbeatInterval},
event_manager = EventMgr,
sup = Sup
}}.
@@ -92,14 +92,18 @@ init([Sup, SessionId, ServerModule, ConnectionReference]) ->
%% Websockets
handle_call({websocket, Data, _Ws}, _From, #state{ heartbeat_interval = Interval, event_manager = EventManager } = State) ->
- Self = self(),
- spawn_link(fun() ->
- F = fun (#heartbeat{}) -> ignore; %% FIXME: we should actually reply
- (M) -> gen_event:notify(EventManager, {message, Self, M})
- end,
- [F(Msg) || Msg <- socketio_data:decode(#msg{content=Data})]
- end),
- {reply, ok, State, Interval};
+ F = fun(#heartbeat{}, _Acc) ->
+ {timer, reset_interval(Interval)};
+ (M, Acc) ->
+ gen_event:notify(EventManager, {message, self(), M}),
+ Acc
+ end,
+ case lists:foldl(F, undefined, socketio_data:decode(#msg{content=Data})) of
+ {timer, NewInterval} ->
+ {reply, ok, State#state{ heartbeat_interval = NewInterval }};
+ undefined ->
+ {reply, ok, State}
+ end;
handle_call({websocket, _}, _From, State) ->
{reply, ok, State};
@@ -133,17 +137,18 @@ handle_call(stop, _From, State) ->
%%--------------------------------------------------------------------
%% Send
handle_cast({send, Message}, #state{ server_module = ServerModule,
- connection_reference = ConnectionReference, heartbeat_interval = Interval } = State) ->
+ connection_reference = ConnectionReference,
+ heartbeat_interval = Interval } = State) ->
handle_send(ConnectionReference, Message, ServerModule),
- {noreply, State, Interval};
+ {noreply, State#state{ heartbeat_interval = reset_interval(Interval) }};
handle_cast(heartbeat, #state{
server_module = ServerModule,
connection_reference = ConnectionReference, heartbeats = Beats,
heartbeat_interval = Interval } = State) ->
Beats1 = Beats + 1,
handle_send(ConnectionReference, #heartbeat{ index = Beats1 }, ServerModule),
- {noreply, State#state { heartbeats = Beats1 }, Interval}.
+ {noreply, State#state { heartbeats = Beats1, heartbeat_interval = reset_interval(Interval) }}.
%%--------------------------------------------------------------------
%% @private
@@ -155,9 +160,8 @@ handle_cast(heartbeat, #state{
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_info(timeout, State) ->
- gen_server:cast(self(), heartbeat),
- {noreply, State};
+handle_info({timeout, _Ref, heartbeat}, State) ->
+ handle_cast(heartbeat, State);
handle_info(_Info, State) ->
{noreply, State}.
@@ -192,3 +196,8 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
handle_send({websocket, Ws}, Message, ServerModule) ->
apply(ServerModule, websocket_send, [Ws, socketio_data:encode(Message)]).
+
+reset_interval({TimerRef, Time}) ->
+ erlang:cancel_timer(TimerRef),
+ NewRef = erlang:start_timer(Time, self(), heartbeat),
+ {NewRef, Time}.

0 comments on commit cf55319

Please sign in to comment.