Permalink
Browse files

comet server example

  • Loading branch information...
1 parent a921993 commit 759b49d546e6cec460c5c182f4cdc28ab98aa756 Jake Donham committed Jul 25, 2010
@@ -0,0 +1,9 @@
+all: myocamlbuild.ml
+ ocamlbuild clicks.js server.byte
+
+clean:
+ ocamlbuild -clean
+ rm -f myocamlbuild.ml
+
+myocamlbuild.ml:
+ ln -s ../../tools/myocamlbuild.ml
@@ -0,0 +1,7 @@
+You need ocamljs and cohttpserver for this example. Run the server with
+
+ ./server.byte
+
+then point your browser at
+
+ http://localhost:9007/
@@ -0,0 +1,13 @@
+<proto.ml> : pkg_lwt
+<comet.ml> : pkg_lwt
+<proto_js_clnt.ml*> : pkg_lwt,pkg_orpc-js-client
+<comet_js_comet_clnt.ml*> : pkg_lwt,pkg_orpc-js-client
+<clicks.ml> : pkg_lwt,pkg_orpc-js-client,pkg_dom
+<clicks.js> : pkg_lwt,pkg_orpc-js-client,pkg_dom
+
+<proto_js_srv.ml*> : pkg_lwt,pkg_nethttpd,pkg_orpc-js-server
+<proto_js_aux.ml*> : pkg_orpc-js-server
+<comet_js_aux.ml*> : pkg_orpc-js-server
+<server.ml> : syntax_camlp4o,pkg_lwt.syntax,pkg_cohttpserver,pkg_orpc-js-server,pkg_lwt.unix
+<server.byte> : pkg_cohttpserver,pkg_orpc-js-server,pkg_lwt.unix
+
@@ -0,0 +1,29 @@
+let (>>=) = Lwt.(>>=)
+
+class type console =
+object
+ method log : string -> unit
+end
+
+let console : console = Ocamljs.var "console"
+
+;;
+
+Dom.window#_set_onload begin fun () ->
+ let client = Orpc_js_client.create "/clicks" in
+
+ let clicks = (Dom.document#getElementById "clicks" : Dom.span) in
+ let set_clicks n = clicks#_set_innerHTML (string_of_int n) in
+
+ let module M = Comet_js_comet_clnt.Sync(struct let set_clicks = set_clicks end) in
+ M.bind client;
+
+ let module Server = Proto_js_clnt.Lwt(struct let with_client f = f client end) in
+
+ ignore(Server.clicks () >>= fun n -> set_clicks n; Lwt.return ());
+
+ let click = (Dom.document#getElementById "click" : Dom.button) in
+ click#_set_onclick (fun _ ->
+ ignore(Server.click ());
+ false)
+end
@@ -0,0 +1,9 @@
+module type Sync =
+sig
+ val set_clicks : int -> unit
+end
+
+module type Lwt =
+sig
+ val set_clicks : int -> unit Lwt.t
+end
@@ -0,0 +1,10 @@
+<html>
+ <head>
+ <title>Clicks</title>
+ </head>
+ <body>
+ <p>The button has been clicked <span id="clicks"></span> times.</p>
+ <p><button type="button" id="click">Click</button></p>
+ <script src="_build/clicks.js"></script>
+ </body>
+</html>
@@ -0,0 +1,11 @@
+module type Sync =
+sig
+ val clicks : unit -> int
+ val click : unit -> unit
+end
+
+module type Lwt =
+sig
+ val clicks : unit -> int Lwt.t
+ val click : unit -> unit Lwt.t
+end
@@ -0,0 +1,27 @@
+netplex {
+
+ service {
+ name = "clicks";
+ protocol {
+ name = "http/clicks";
+ address { type = "internet"; bind = "192.168.206.129:9007"; };
+ };
+ processor {
+ type = "nethttpd";
+ host {
+ names = "*:0";
+ uri { path = "/clicks"; service { type = "dynamic"; handler = "clicks" }};
+ uri {
+ path = "/";
+ service {
+ type = "file";
+ docroot = "/home/jake/github/orpc/examples/clicks";
+ index_files = "index.html";
+ media_type { suffix = "html"; type = "text/html"; };
+ }
+ };
+ };
+ };
+ workload_manager { type = "constant"; threads = 1; };
+ };
+}
@@ -0,0 +1,197 @@
+open Cohttp
+open Cohttpserver
+
+type conn = {
+ conn_id : Http_daemon.conn_id;
+ reqs : (unit Lwt.u * Lwt_io.output_channel Lwt.t) Queue.t;
+}
+
+type session = {
+ session_id : string;
+ mutable txn_id : int;
+ pending_calls : (int, Orpc_js_server.obj Lwt.u) Hashtbl.t;
+ mutable conn : conn option;
+}
+
+let send sess msg =
+ (* XXX don't send session_id if req already had the right one *)
+ let msg = { Orpc_js_server.m_session_id = Some sess.session_id; msg = msg; } in
+ let body = Orpc_js_server.string_of_msg msg in
+ match sess.conn with
+ | None ->
+ Http_common.debug_print ("sending to " ^ sess.session_id ^ ", no conn");
+ () (* XXX queue send until reconnect or timeout *)
+ | Some conn ->
+ Http_common.debug_print ("sending to " ^ sess.session_id ^ ", conn_id " ^ Http_daemon.string_of_conn_id conn.conn_id);
+ try
+ let (u, out) = Queue.take conn.reqs in
+ ignore (lwt () = Http_daemon.respond ~body out in Lwt.wakeup u (); Lwt.return ())
+ with Queue.Empty ->
+ Http_common.debug_print ("no more reqs sending to " ^ sess.session_id ^ ", conn_id " ^ Http_daemon.string_of_conn_id conn.conn_id)
+
+module Comet_js_comet_srv (S : sig val session : session end) =
+struct
+ let set_clicks n =
+ let txn_id = S.session.txn_id in
+ S.session.txn_id <- S.session.txn_id + 1;
+ let t, u = Lwt.wait () in
+ Hashtbl.add S.session.pending_calls txn_id u;
+ send S.session (Orpc_js_server.Call (txn_id, "set_clicks", Comet_js_aux.of_set_clicks'arg n));
+ lwt o = t in
+ try Lwt.return (Comet_js_aux.to_set_clicks'res o)
+ with e -> Lwt.fail e
+end
+
+(* XXX expire sessions *)
+let sessions_by_session_id = Hashtbl.create 17
+let sessions_by_conn_id = Hashtbl.create 17
+
+module Server =
+struct
+ let n = ref 0
+
+ let clicks () = !n
+
+ let click () =
+ incr n;
+ Hashtbl.iter
+ (fun _ sess ->
+ let module M = Comet_js_comet_srv (struct let session = sess end) in
+ ignore (M.set_clicks !n))
+ sessions_by_session_id
+end
+
+let new_session conn_id =
+ let sess = {
+ txn_id = 0;
+ session_id = string_of_int (Random.int max_int); (* XXX *)
+ pending_calls = Hashtbl.create 17;
+ conn = Some { conn_id = conn_id; reqs = Queue.create (); }
+ } in
+ Hashtbl.replace sessions_by_session_id sess.session_id sess;
+ Hashtbl.replace sessions_by_conn_id conn_id sess;
+ sess
+
+let clicks conn_id req out =
+ let body = Http_request.body req in
+ lwt body_string = Http_message.string_of_body body in
+ let msg = Orpc_js_server.msg_of_string body_string in
+
+ let sess =
+ match msg.Orpc_js_server.m_session_id with
+ | None ->
+ begin
+ try
+ (* additional request before reply with session_id is received? *)
+ let sess = Hashtbl.find sessions_by_conn_id conn_id in
+ Http_common.debug_print ("no session_id but found conn_id " ^ Http_daemon.string_of_conn_id conn_id);
+ sess
+ with Not_found ->
+ (* new connection, new session *)
+ Http_common.debug_print ("no session_id, new session for conn_id " ^ Http_daemon.string_of_conn_id conn_id);
+ new_session conn_id
+ end
+ | Some session_id ->
+ begin
+ try
+ let sess = Hashtbl.find sessions_by_session_id session_id in
+ match sess.conn with
+ | None ->
+ Http_common.debug_print ("session_id " ^ session_id ^ ", reconnect for conn_id " ^ Http_daemon.string_of_conn_id conn_id);
+ (* conn was closed, this is reconnect *)
+ sess.conn <- Some { conn_id = conn_id; reqs = Queue.create (); };
+ Hashtbl.replace sessions_by_conn_id conn_id sess;
+ sess
+ | Some conn when conn.conn_id <> conn_id ->
+ Http_common.debug_print ("session_id " ^ session_id ^ ", extra connect with conn_id " ^ Http_daemon.string_of_conn_id conn_id);
+ (* new connection for session which is already connected. drop the old one *)
+ sess.conn <- Some { conn_id = conn_id; reqs = Queue.create (); };
+ Hashtbl.remove sessions_by_conn_id conn.conn_id;
+ Hashtbl.replace sessions_by_conn_id conn_id sess;
+ sess
+ | _ ->
+ Http_common.debug_print ("session_id " ^ session_id ^ ", conn_id " ^ Http_daemon.string_of_conn_id conn_id);
+ (* request on existing connection *)
+ sess
+ with Not_found ->
+ Http_common.debug_print ("dead session_id " ^ session_id ^ ", new session for conn_id " ^ Http_daemon.string_of_conn_id conn_id);
+ (* connection for dead session. *)
+ new_session conn_id
+ end in
+
+ let t, u = Lwt.wait () in
+ begin match sess.conn with
+ | None -> assert false
+ | Some conn -> Queue.add (u, out) conn.reqs
+ end;
+
+ begin match msg.Orpc_js_server.msg with
+ | Orpc_js_server.Noop ->
+ Http_common.debug_print "got Noop";
+ ()
+ | Orpc_js_server.Call (txn_id, proc, arg) ->
+ Http_common.debug_print "got Call";
+ begin
+ try
+ let proc =
+ match proc with
+ | "click" -> (fun x0 -> Proto_js_aux.of_click'res (Server.click (Proto_js_aux.to_click'arg x0)))
+ | "clicks" -> (fun x0 -> Proto_js_aux.of_clicks'res (Server.clicks (Proto_js_aux.to_clicks'arg x0)))
+ | _ -> raise (Invalid_argument "bad proc") in
+ try send sess (Orpc_js_server.Res (txn_id, proc arg))
+ with e -> send sess (Orpc_js_server.Fail (txn_id, Printexc.to_string e))
+ with e -> send sess (Orpc_js_server.Fail (txn_id, Printexc.to_string e))
+ end
+ | Orpc_js_server.Res (txn_id, res) ->
+ Http_common.debug_print "got Res";
+ begin
+ try
+ Lwt.wakeup (Hashtbl.find sess.pending_calls txn_id) res;
+ Hashtbl.remove sess.pending_calls txn_id
+ with Not_found -> ()
+ end
+ | Orpc_js_server.Fail (txn_id, s) ->
+ Http_common.debug_print "got Fail";
+ begin
+ try
+ Lwt.wakeup_exn (Hashtbl.find sess.pending_calls txn_id) (Failure s);
+ Hashtbl.remove sess.pending_calls txn_id
+ with Not_found -> ()
+ end
+ end;
+ t
+
+let callback conn_id req out =
+ match Http_request.path req with
+ | "/" -> Http_daemon.respond_file ~fname:"index.html" ~mime_type:"text/html" out
+ | "/_build/clicks.js" -> Http_daemon.respond_file ~fname:"_build/clicks.js" ~mime_type:"application/javascript" out
+ | "/clicks" -> clicks conn_id req out
+ | url -> Http_daemon.respond ~body:("not found: " ^ url) out
+
+let conn_closed conn_id =
+ try
+ let sess = Hashtbl.find sessions_by_conn_id conn_id in
+ sess.conn <- None;
+ Hashtbl.remove sessions_by_conn_id conn_id
+ with Not_found -> ()
+
+let exn_handler exn out = Lwt.return ()
+
+let main () =
+ Http_common.debug := true;
+ let spec = {
+ Http_daemon.address = "192.168.206.129";
+ auth = `None;
+ callback = callback;
+ conn_closed = conn_closed;
+ port = 9007;
+ root_dir = None;
+ exn_handler = exn_handler;
+ timeout = None;
+ auto_close = true;
+ } in
+ Lwt_main.run (Http_daemon.main spec)
+
+;;
+
+main ()

0 comments on commit 759b49d

Please sign in to comment.