Permalink
Browse files

First go at multiplexing, the erlang fashion

  • Loading branch information...
1 parent 977e1a4 commit 379ceb955db1c76462163969ddbf37c996ad8b90 @majek majek committed Mar 1, 2012
@@ -0,0 +1,86 @@
+#!/usr/bin/env escript
+%%! -smp disable +A1 +K true -pa ebin deps/cowboy/ebin -input
+-module(cowboy_multiplex).
+-mode(compile).
+
+-export([main/1]).
+
+%% Cowboy callbacks
+-export([init/3, handle/2, terminate/2]).
+
+main(_) ->
+ Port = 8081,
+ application:start(sockjs),
+ application:start(cowboy),
+
+ MultiplexState = sockjs_multiplex:init_state(
+ [{"ann", fun service_ann/3, []},
+ {"bob", fun service_bob/3, []},
+ {"carl", fun service_carl/3, []}]),
+
+ SockjsState = sockjs_handler:init_state(
+ <<"/multiplex">>, sockjs_multiplex, MultiplexState, []),
+
+ VhostRoutes = [{[<<"multiplex">>, '...'], sockjs_cowboy_handler, SockjsState},
+ {'_', ?MODULE, []}],
+ Routes = [{'_', VhostRoutes}], % any vhost
+
+ io:format(" [*] Running at http://localhost:~p~n", [Port]),
+ cowboy:start_listener(http, 100,
+ cowboy_tcp_transport, [{port, Port}],
+ cowboy_http_protocol, [{dispatch, Routes}]),
+ receive
+ _ -> ok
+ end.
+
+%% --------------------------------------------------------------------------
+
+init({_Any, http}, Req, []) ->
+ {ok, Req, []}.
+
+handle(Req, State) ->
+ {Path, Req1} = cowboy_http_req:path(Req),
+ {ok, Req2} = case Path of
+ [<<"multiplex.js">>] ->
+ {ok, Data} = file:read_file("./examples/multiplex/multiplex.js"),
+ cowboy_http_req:reply(200, [{<<"Content-Type">>, "application/javascript"}],
+ Data, Req1);
+ [] ->
+ {ok, Data} = file:read_file("./examples/multiplex/index.html"),
+ cowboy_http_req:reply(200, [{<<"Content-Type">>, "text/html"}],
+ Data, Req1);
+ _ ->
+ cowboy_http_req:reply(404, [],
+ <<"404 - Nothing here\n">>, Req1)
+ end,
+ {ok, Req2, State}.
+
+terminate(_Req, _State) ->
+ ok.
+
+%% --------------------------------------------------------------------------
+
+service_ann(Conn, init, State) ->
+ Conn:send("Ann says hi!"),
+ {ok, State};
+service_ann(Conn, {recv, Data}, State) ->
+ Conn:send(["Ann nods: ", Data]),
+ {ok, State};
+service_ann(_Conn, closed, State) ->
+ {ok, State}.
+
+service_bob(Conn, init, State) ->
+ Conn:send("Bob doesn't agree."),
+ {ok, State};
+service_bob(Conn, {recv, Data}, State) ->
+ Conn:send(["Bob says no to: ", Data]),
+ {ok, State};
+service_bob(_Conn, closed, State) ->
+ {ok, State}.
+
+service_carl(Conn, init, State) ->
+ Conn:send("Carl says goodbye!"),
+ Conn:close(),
+ {ok, State};
+service_carl(_Conn, _, State) ->
+ {ok, State}.
@@ -0,0 +1,96 @@
+<!doctype html>
+<html><head>
+ <script src="http://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
+ <script src="http://cdn.sockjs.org/sockjs-0.2.min.js"></script>
+ <script src="multiplex.js"></script>
+ <style>
+ .box {
+ width: 300px;
+ float: left;
+ margin: 0 20px 0 20px;
+ }
+ .box div, .box input {
+ border: 1px solid;
+ -moz-border-radius: 4px;
+ border-radius: 4px;
+ width: 100%;
+ padding: 0px;
+ margin: 5px;
+ }
+ .box div {
+ border-color: grey;
+ height: 300px;
+ overflow: auto;
+ }
+ .box input {
+ height: 30px;
+ }
+ h1 {
+ margin-left: 75px;
+ }
+ body {
+ background-color: #F0F0F0;
+ font-family: "Arial";
+ }
+ </style>
+<head><body lang="en">
+ <h1>SockJS Multiplex example</h1>
+
+ <div id="first" class="box">
+ <div></div>
+ <form><input autocomplete="off" value="Type here..."></input></form>
+ </div>
+
+ <div id="second" class="box">
+ <div></div>
+ <form><input autocomplete="off"></input></form>
+ </div>
+
+ <div id="third" class="box">
+ <div></div>
+ <form><input autocomplete="off"></input></form>
+ </div>
+
+ <script>
+ // Pipe - convenience wrapper to present data received from an
+ // object supporting WebSocket API in an html element. And the other
+ // direction: data typed into an input box shall be sent back.
+ var pipe = function(ws, el_name) {
+ var div = $(el_name + ' div');
+ var inp = $(el_name + ' input');
+ var form = $(el_name + ' form');
+
+ var print = function(m, p) {
+ p = (p === undefined) ? '' : JSON.stringify(p);
+ div.append($("<code>").text(m + ' ' + p));
+ div.append($("<br>"));
+ div.scrollTop(div.scrollTop() + 10000);
+ };
+
+ ws.onopen = function() {print('[*] open', ws.protocol);};
+ ws.onmessage = function(e) {print('[.] message', e.data);};
+ ws.onclose = function() {print('[*] close');};
+
+ form.submit(function() {
+ print('[ ] sending', inp.val());
+ ws.send(inp.val());
+ inp.val('');
+ return false;
+ });
+ };
+
+ var sockjs_url = '/multiplex';
+ var sockjs = new SockJS(sockjs_url);
+
+ var multiplexer = new MultiplexedWebSocket(sockjs);
+ var ann = multiplexer.channel('ann');
+ var bob = multiplexer.channel('bob');
+ var carl = multiplexer.channel('carl');
+
+ pipe(ann, '#first');
+ pipe(bob, '#second');
+ pipe(carl, '#third');
+
+ $('#first input').focus();
+ </script>
+</body></html>
@@ -0,0 +1,80 @@
+// ****
+
+var DumbEventTarget = function() {
+ this._listeners = {};
+};
+DumbEventTarget.prototype._ensure = function(type) {
+ if(!(type in this._listeners)) this._listeners[type] = [];
+};
+DumbEventTarget.prototype.addEventListener = function(type, listener) {
+ this._ensure(type);
+ this._listeners[type].push(listener);
+};
+DumbEventTarget.prototype.emit = function(type) {
+ this._ensure(type);
+ var args = Array.prototype.slice.call(arguments, 1);
+ if(this['on' + type]) this['on' + type].apply(this, args);
+ for(var i=0; i < this._listeners[type].length; i++) {
+ this._listeners[type][i].apply(this, args);
+ }
+};
+
+
+// ****
+
+var MultiplexedWebSocket = function(ws) {
+ var that = this;
+ this.ws = ws;
+ this.channels = {};
+ this.ws.addEventListener('message', function(e) {
+ var t = e.data.split(',', 3);
+ var type = t[0], name = t[1], payload = t[2];
+ if(!(name in that.channels)) {
+ return;
+ }
+ var sub = that.channels[name];
+
+ switch(type) {
+ case 'uns':
+ delete that.channels[name];
+ sub.emit('close', {});
+ break;
+ case 'msg':
+ sub.emit('message', {data: payload});
+ break
+ }
+ });
+};
+MultiplexedWebSocket.prototype.channel = function(raw_name) {
+ return this.channels[escape(raw_name)] =
+ new Channel(this.ws, escape(raw_name), this.channels);
+};
+
+
+var Channel = function(ws, name, channels) {
+ DumbEventTarget.call(this);
+ var that = this;
+ this.ws = ws;
+ this.name = name;
+ this.channels = channels;
+ var onopen = function() {
+ that.ws.send('sub,' + that.name);
+ that.emit('open');
+ };
+ if(ws.readyState > 0) {
+ setTimeout(onopen, 0);
+ } else {
+ this.ws.addEventListener('open', onopen);
+ }
+};
+Channel.prototype = new DumbEventTarget()
+
+Channel.prototype.send = function(data) {
+ this.ws.send('msg,' + this.name + ',' + data);
+};
+Channel.prototype.close = function() {
+ var that = this;
+ this.ws.send('uns,' + this.name);
+ delete this.channels[this.name];
+ setTimeout(function(){that.emit('close', {})},0);
+};
@@ -0,0 +1,79 @@
+-module(sockjs_multiplex).
+
+-behaviour(sockjs_service).
+
+-export([init_state/1]).
+-export([sockjs_init/2, sockjs_handle/3, sockjs_terminate/2]).
+
+-record(service, {callback, state, vconn}).
+
+%% --------------------------------------------------------------------------
+
+init_state(Services) ->
+ L = [{Topic, #service{callback = Callback, state = State}} ||
+ {Topic, Callback, State} <- Services],
+ {orddict:from_list(L), orddict:new()}.
+
+
+
+sockjs_init(_Conn, {_Services, _Channels} = S) ->
+ {ok, S}.
+
+sockjs_handle(Conn, Data, {Services, Channels}) ->
+ [Type, Topic, Payload] = split($,, binary_to_list(Data), 3),
+ case orddict:find(Topic, Services) of
+ {ok, Service} ->
+ Channels1 = action(Conn, {Type, Topic, Payload}, Service, Channels),
+ {ok, {Services, Channels1}};
+ _Else ->
+ {ok, {Services, Channels}}
+ end.
+
+sockjs_terminate(_Conn, {Services, Channels}) ->
+ _ = [ {emit(closed, Channel)} ||
+ {_Topic, Channel} <- orddict:to_list(Channels) ],
+ {ok, {Services, orddict:new()}}.
+
+
+action(Conn, {Type, Topic, Payload}, Service, Channels) ->
+ case {Type, orddict:is_key(Topic, Channels)} of
+ {"sub", false} ->
+ Channel = Service#service{
+ vconn = sockjs_multiplex_channel:new(
+ Conn, Topic)
+ },
+ orddict:store(Topic, emit(init, Channel), Channels);
+ {"uns", true} ->
+ Channel = orddict:fetch(Topic, Channels),
+ emit(closed, Channel),
+ orddict:erase(Topic, Channels);
+ {"msg", true} ->
+ Channel = orddict:fetch(Topic, Channels),
+ orddict:store(Topic, emit({recv, Payload}, Channel), Channels);
+ _Else ->
+ %% Ignore
+ Channels
+ end.
+
+
+emit(What, Channel = #service{callback = Callback,
+ state = State,
+ vconn = VConn}) ->
+ case Callback(VConn, What, State) of
+ {ok, State1} -> Channel#service{state = State1};
+ ok -> Channel
+ end.
+
+
+%% --------------------------------------------------------------------------
+
+split(Char, Str, Limit) ->
+ Acc = split(Char, Str, Limit, []),
+ lists:reverse(Acc).
+split(_Char, _Str, 0, Acc) -> Acc;
+split(Char, Str, Limit, Acc) ->
+ {L, R} = case string:chr(Str, Char) of
+ 0 -> {Str, ""};
+ I -> {string:substr(Str, 1, I-1), string:substr(Str, I+1)}
+ end,
+ split(Char, R, Limit-1, [L | Acc]).
@@ -0,0 +1,16 @@
+-module(sockjs_multiplex_channel, [Conn, Topic]).
+
+-export([send/1, close/0, close/2, info/0]).
+
+send(Data) ->
+ Conn:send(iolist_to_binary(["msg", ",", Topic, ",", Data])).
+
+close() ->
+ close(1000, "Normal closure").
+
+close(_Code, _Reason) ->
+ Conn:send(iolist_to_binary(["uns", ",", Topic])).
+
+info() ->
+ Conn:info() ++ [{topic, Topic}].
+

0 comments on commit 379ceb9

Please sign in to comment.