Skip to content
Browse files

First draft of a websocket-based socket.io server

* Includes changes to parser
* Includes encoder
* Heartbeat is not working right yet
  • Loading branch information...
1 parent 3b5f464 commit 9d40a5614a5fa04d67678d7c67cadd1062b0e337 @yrashk committed with Feb 24, 2011
View
2 .gitignore
@@ -1,5 +1,5 @@
ebin/socketio.app
ebin/*.beam
-deps/misultin
+deps
.eunit
.agner.config
View
4 .gitmodules
@@ -1,3 +1,3 @@
-[submodule "deps/Socket.IO"]
- path = deps/Socket.IO
+[submodule "priv/Socket.IO"]
+ path = priv/Socket.IO
url = https://github.com/LearnBoost/Socket.IO.git
View
22 demo/demo.erl
@@ -1,22 +1,22 @@
#! /usr/bin/env escript
-%%! -pa ../ebin ../deps/misultin/ebin
+%%! -pa ../ebin ../deps/misultin/ebin ../deps/ossp_uuid/ebin ../deps/jsx/ebin
-mode(compile).
+-include_lib("../include/socketio.hrl").
+-export([handle_request/2, handle_message/3]).
main(_) ->
- misultin:start_link([{port, 7878}, {loop, fun(Req) -> handle_http(Req) end}]),
+ application:start(socketio),
+ socketio_http:start(7878, ?MODULE, ?MODULE),
receive _ -> ok end.
-handle_http(Req) ->
- handle_http_1(Req:get(uri), Req).
-
-handle_http_1({abs_path, "/socket.io.js"}, Req) ->
- Req:file(filename:join([filename:dirname(code:which(?MODULE)), "..", "deps", "Socket.IO", "socket.io.js"]));
-handle_http_1({abs_path, "/"}, Req) ->
+handle_request({abs_path, "/"}, Req) ->
Req:file(filename:join([filename:dirname(code:which(?MODULE)), "index.html"]));
-handle_http_1({abs_path, Path}, Req) ->
- io:format("~p~n", [{Path, Req}]),
+handle_request({abs_path, _Path}, Req) ->
Req:respond(200).
-
+handle_message(Server, SessionID, #msg{ content = Content } = Msg) ->
+ io:format("Got a message: ~p from ~p~n",[Msg, SessionID]),
+ gen_server:call(Server, {send, SessionID, #msg{ content = "hello!" }}),
+ gen_server:call(Server, {send, SessionID, #msg{ content = [{<<"echo">>, Content}], json = true }}).
View
21 demo/index.html
@@ -5,17 +5,18 @@
</script>
<script type="text/javascript">
- function init() {
- socket = new io.Socket('localhost');
- socket.connect();
- socket.on('connect', function(){
- console.log("Imma connectad!");
- });
- socket.on('message', function(data){
- console.log("Imma haz data: " + data);
- });
+ function init() {
+ socket = new io.Socket('localhost');
+ socket.on('message', function(data){
+ console.log(data);
+ });
+ socket.on('connect', function(){
+ console.log("Imma connectad!");
+ console.log(socket);
+ socket.send({msg: "Erlang rulez!"});
- socket.send('some data');
+ });
+ socket.connect();
}
</script>
</head>
View
11 include/socketio.hrl
@@ -0,0 +1,11 @@
+-record(msg,
+ {
+ content = [],
+ json = false,
+ length = 0
+ }).
+
+-record(heartbeat,
+ {
+ index
+ }).
View
0 deps/Socket.IO → priv/Socket.IO
File renamed without changes.
View
4 src/socketio.app.src
@@ -3,11 +3,11 @@
{description, ""},
{vsn, "1"},
{registered, []},
- {agner, [{requires, ["misultin"]}]},
+ {agner, [{requires, ["misultin","ossp_uuid","jsx"]}]},
{applications, [
kernel,
stdlib
]},
{mod, { socketio_app, []}},
- {env, []}
+ {env, [ {heartbeat_interval, 10000} ]}
]}.
View
50 src/socketio_data.erl
@@ -1,5 +1,6 @@
-module(socketio_data).
--export([parse/2, string_reader/2]).
+-include_lib("socketio.hrl").
+-export([encode/1, parse/2, string_reader/2]).
-record(parser,
{
@@ -14,17 +15,6 @@
buf = []
}).
--record(msg,
- {
- content = [],
- json = false,
- length = 0
- }).
-
--record(hearbeat,
- {
- index
- }).
%% Not sure if session messages are any different. All i know is they're the first message
%% that arrives from the client.
@@ -33,6 +23,20 @@
%% id
%% }).
+encode(#msg{ content = Content, json = false }) when is_list(Content) ->
+ Length = integer_to_list(length(Content)),
+ "~m~" ++ Length ++ "~m~" ++ Content;
+
+encode(#msg{ content = Content, json = true }) ->
+ JSON = binary_to_list(jsx:term_to_json(Content)),
+ Length = integer_to_list(length(JSON) + 3),
+ "~m~" ++ Length ++ "~m~~j~" ++ JSON;
+
+encode(#heartbeat{ index = Index }) ->
+ String = integer_to_list(Index),
+ Length = integer_to_list(length(String) + 3),
+ "~m~" ++ Length ++ "~m~~h~" ++ String.
+
parse(Reader, Fun) when is_function(Reader), is_function(Fun) ->
parse(#parser{ reader_fun = Reader, f = Fun }).
@@ -62,7 +66,7 @@ parse_1(#parser{ expr = {message, _Length, controller}, buf = [$m|T] } = Parser)
parse_1(Parser#parser{ expr = {message, _Length, controller}, buf = T});
% Message type
-parse_1(#parser{ expr = {message, _, controller}, buf = [$h|T], acc = Acc} = Parser) -> % hearbeat
+parse_1(#parser{ expr = {message, _, controller}, buf = [$h|T], acc = Acc} = Parser) -> % heartbeat
parse_1(Parser#parser{ expr = {message, 0, 1, body}, buf = T, acc = Acc});
parse_1(#parser{ expr = {message, Length, controller}, buf = [$j|T], acc = Acc} = Parser) -> % json
parse_1(Parser#parser{ expr = {message, Length, 2, body}, buf = T, acc = Acc});
@@ -80,9 +84,9 @@ message(#parser { expr = {message, Length, 0}, acc = Acc}) ->
#msg{ content = lists:reverse(Acc), length = Length };
message(#parser{ expr = {message, 0, 1}, acc = Acc}) ->
{Index, _} = string:to_integer(lists:reverse(Acc)),
- #hearbeat{ index = Index };
+ #heartbeat{ index = Index };
message(#parser{ expr = {message, Length, 2}, acc = Acc}) ->
- #msg{ content = lists:reverse(Acc), json = true, length = Length }.
+ #msg{ content = jsx:json_to_term(list_to_binary(lists:reverse(Acc))), json = true, length = Length }.
string_reader(#parser{ reader = undefined } = Parser, String) ->
{String, Parser#parser{ reader = String }};
@@ -108,13 +112,23 @@ complex_msg_test() ->
simple_heartbeat_test() ->
parse(fun (Parser) -> string_reader(Parser, "~m~1~m~h~0") end, fun (X) -> self() ! X end),
receive X ->
- ?assertMatch(#hearbeat{ index = 0 }, X)
+ ?assertMatch(#heartbeat{ index = 0 }, X)
end.
simple_json_test() ->
- parse(fun (Parser) -> string_reader(Parser, "~m~17~m~j~{\"hello\":\"world\"}") end, fun (X) -> self() ! X end),
+ parse(fun (Parser) -> string_reader(Parser, "~m~20~m~~j~{\"hello\":\"world\"}") end, fun (X) -> self() ! X end),
+ receive X ->
+ ?assertMatch(#msg{ content = [{<<"hello">>,<<"world">>}], json = true, length = _ }, X)
+ end.
+
+json_encoding_test() ->
+ JSON = [{<<"hello">>,<<"world">>}],
+ Msg = #msg{ content = JSON, json = true, length = 17 },
+ Data = encode(Msg),
+ parse(fun (Parser) -> string_reader(Parser, Data) end, fun (X) -> self() ! X end),
receive X ->
- ?assertMatch(#msg{ content = "{\"hello\":\"world\"}", json = true, length = 17 }, X)
+ ?assertMatch(#msg{ content = JSON, json = true, length = _ }, X)
end.
+
-endif.
View
0 src/socketio_data_v1.erl → src/socketio_data_v1.erl.skip
File renamed without changes.
View
222 src/socketio_http.erl
@@ -0,0 +1,222 @@
+-module(socketio_http).
+-include_lib("socketio.hrl").
+-behaviour(gen_server).
+
+%% API
+-export([start_link/3, start/3]).
+%% Internal API
+-export([heartbeat/2]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {
+ default_http_handler,
+ message_handler,
+ sessions
+ }).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%%
+%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
+%% @end
+%%--------------------------------------------------------------------
+start_link(Port, DefaultHttpHandler, MessageHandler) ->
+ gen_server:start_link(?MODULE, [Port, DefaultHttpHandler, MessageHandler], []).
+
+start(Port, DefaultHttpHandler, MessageHandler) ->
+ supervisor:start_child(socketio_http_sup, [Port, DefaultHttpHandler, MessageHandler]).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%%
+%% @spec init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% @end
+%%--------------------------------------------------------------------
+init([Port, DefaultHttpHandler, MessageHandler]) ->
+ Self = self(),
+ misultin:start_link([{port, Port},
+ {loop, fun (Req) -> handle_http(Self, Req) end},
+ {ws_loop, fun (Ws) -> handle_websocket(Self, Ws) end},
+ {ws_autoexit, false}
+ ]),
+ {ok, #state{
+ default_http_handler = DefaultHttpHandler,
+ message_handler = MessageHandler,
+ sessions = ets:new(socketio_sessions,[public])
+ }}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%%
+%% @spec handle_call(Request, From, State) ->
+%% {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_call({request, {abs_path, "/socket.io.js"}, Req}, _From, State) ->
+ Response = Req:file(filename:join([filename:dirname(code:which(?MODULE)), "..", "priv", "Socket.IO", "socket.io.js"])),
+ {reply, Response, State};
+
+%% If we can't route it, let others deal with it
+handle_call({request, _, _} = Req, From, #state{ default_http_handler = HttpHandler } = State) when is_atom(HttpHandler) ->
+ handle_call(Req, From, State#state{ default_http_handler = fun(P1, P2) -> HttpHandler:handle_request(P1, P2) end });
+
+handle_call({request, Path, Req}, _From, #state{ default_http_handler = HttpHandler } = State) when is_function(HttpHandler) ->
+ Response = HttpHandler(Path, Req),
+ {reply, Response, State};
+
+%% Websockets
+handle_call({websocket, _Data, _SessionID, _Ws} = Req, From, #state{ message_handler = Handler} = State) when is_atom(Handler) ->
+ handle_call(Req, From, State#state{ message_handler = fun(P1, P2, P3) -> Handler:handle_message(P1, P2, P3) end });
+
+handle_call({websocket, Data, SessionID, _Ws}, _From, #state{ message_handler = Handler} = State) when is_function(Handler) ->
+ Self = self(),
+ spawn_link(fun () ->
+ socketio_data:parse(fun (Parser) -> socketio_data:string_reader(Parser, Data) end,
+ fun (#heartbeat{}) ->
+ ignore; %% FIXME: we should actually reply
+ (M) -> Handler(Self, SessionID, M) end)
+ end),
+ {reply, ok, State};
+
+handle_call({websocket, _}, _From, State) ->
+ {reply, ok, State};
+
+%% Sessions
+handle_call({session, generate, ConnectionReference}, _From, #state{ sessions = Sessions } = State) ->
+ UUID = binary_to_list(ossp_uuid:make(v4, text)),
+ ets:insert(Sessions, {UUID, ConnectionReference}),
+ case application:get_env(heartbeat_interval) of
+ {ok, Time} ->
+ timer:apply_after(Time, ?MODULE, heartbeat, [ConnectionReference, 0]);
+ _ ->
+ ignore
+ end,
+ {reply, UUID, State};
+
+handle_call({session, expire, SessionID}, _From, #state{ sessions = Sessions } = State) ->
+ ets:delete(Sessions, SessionID),
+ {reply, ok, State};
+
+%% Send
+handle_call({send, SessionID, #msg{} = Message}, _From, #state{ sessions = Sessions } = State) ->
+ case ets:lookup(Sessions, SessionID) of
+ [{SessionID, ConnectionReference}] ->
+ handle_send(ConnectionReference, Message),
+ {reply, ok, State};
+ [] ->
+ {reply, {error, invalid_session}, State}
+ end.
+
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%%
+%% @spec handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%%
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%%
+%% @spec terminate(Reason, State) -> void()
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @end
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+handle_http(Server, Req) ->
+ gen_server:call(Server, {request, Req:get(uri), Req}).
+
+handle_websocket(Server, Ws) ->
+ SessionID = gen_server:call(Server, {session, generate, {websocket, Ws}}),
+ ok = gen_server:call(Server, {send, SessionID, #msg{ content = SessionID }}),
+ handle_websocket(Server, Ws, SessionID).
+
+handle_websocket(Server, Ws, SessionID) ->
+ receive
+ {browser, Data} ->
+ gen_server:call(Server, {websocket, Data, SessionID, Ws}),
+ handle_websocket(Server, Ws, SessionID);
+ closed ->
+ gen_server:call(Server, {session, expire, SessionID});
+ _Ignore ->
+ handle_websocket(Server, Ws, SessionID)
+ end.
+
+handle_send({websocket, Ws}, Message) ->
+ Ws:send(socketio_data:encode(Message)).
+
+heartbeat(ConnectionReference, Beats) ->
+ Beats1 = Beats + 1,
+ handle_send(ConnectionReference, #heartbeat{ index = Beats1 }),
+ case application:get_env(heartbeat_interval) of
+ {ok, Time} ->
+ timer:apply_after(Time, ?MODULE, heartbeat, [ConnectionReference, Beats1]);
+ _ ->
+ ignore
+ end.
+
View
30 src/socketio_http_sup.erl
@@ -0,0 +1,30 @@
+-module(socketio_http_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+ {ok, { {simple_one_for_one, 5, 10}, [
+ {socketio_http, {socketio_http, start_link, []},
+ permanent, 5000, worker, [socketio_http]}
+ ]} }.
+
View
5 src/socketio_sup.erl
@@ -24,5 +24,8 @@ start_link() ->
%% ===================================================================
init([]) ->
- {ok, { {one_for_one, 5, 10}, []} }.
+ {ok, { {one_for_one, 5, 10}, [
+ {socketio_http_sup, {socketio_http_sup, start_link, []},
+ permanent, infinity, supervisor, [socketio_http_sup]}
+ ]} }.

0 comments on commit 9d40a56

Please sign in to comment.
Something went wrong with that request. Please try again.