Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

working comet server example

  • Loading branch information...
commit df404a509fb82a96f575cb5ce0da31e8e3b78efc 1 parent 858e288
Jake Donham authored
Showing with 121 additions and 121 deletions.
  1. +2 −8 examples/clicks-comet/clicks.ml
  2. +119 −113 examples/clicks-comet/server.ml
View
10 examples/clicks-comet/clicks.ml
@@ -1,12 +1,5 @@
let (>>=) = Lwt.(>>=)
-class type console =
-object
- method log : string -> unit
-end
-
-let console : console = Ocamljs.var "console"
-
;;
Dom.window#_set_onload begin fun () ->
@@ -20,7 +13,8 @@ Dom.window#_set_onload begin fun () ->
let module Server = Proto_js_clnt.Lwt(struct let with_client f = f client end) in
- ignore(Server.clicks () >>= fun n -> set_clicks n; Lwt.return ());
+ let on_connect _ = ignore(Server.clicks () >>= fun n -> set_clicks n; Lwt.return ()) in
+ Orpc_js_client.connect client on_connect;
let click = (Dom.document#getElementById "click" : Dom.button) in
click#_set_onclick (fun _ ->
View
232 examples/clicks-comet/server.ml
@@ -1,33 +1,42 @@
open Cohttp
open Cohttpserver
-type conn = {
+type req = {
conn_id : Http_daemon.conn_id;
- reqs : (unit Lwt.u * Lwt_io.output_channel Lwt.t) Queue.t;
+ finished : unit Lwt.u;
+ out : Lwt_io.output_channel Lwt.t;
}
type session = {
session_id : string;
mutable txn_id : int;
pending_calls : (int, Orpc_js_server.obj Lwt.u) Hashtbl.t;
- mutable conn : conn option;
+ mutable queued_msgs : Orpc_js_server.msg list;
+ mutable req : req option;
}
-let send sess msg =
+(* XXX expire sessions *)
+let sessions_by_session_id = Hashtbl.create 17
+let sessions_by_conn_id = Hashtbl.create 17
+
+let reply sess out =
(* XXX don't send session_id if req already had the right one *)
- let msg = { Orpc_js_server.m_session_id = Some sess.session_id; msg = msg; } in
- let body = Orpc_js_server.string_of_msg msg in
- match sess.conn with
- | None ->
- Http_common.debug_print ("sending to " ^ sess.session_id ^ ", no conn");
- () (* XXX queue send until reconnect or timeout *)
- | Some conn ->
- Http_common.debug_print ("sending to " ^ sess.session_id ^ ", conn_id " ^ Http_daemon.string_of_conn_id conn.conn_id);
- try
- let (u, out) = Queue.take conn.reqs in
- ignore (lwt () = Http_daemon.respond ~body out in Lwt.wakeup u (); Lwt.return ())
- with Queue.Empty ->
- Http_common.debug_print ("no more reqs sending to " ^ sess.session_id ^ ", conn_id " ^ Http_daemon.string_of_conn_id conn.conn_id)
+ let msgs = { Orpc_js_server.m_session_id = Some sess.session_id; msgs = Array.of_list (List.rev sess.queued_msgs); } in
+ sess.queued_msgs <- [];
+ let body = Orpc_js_server.string_of_msgs msgs in
+ Http_daemon.respond ~body out
+
+let send sess msg =
+ sess.queued_msgs <- msg :: sess.queued_msgs;
+ match sess.req with
+ | None -> ()
+ | Some req ->
+ sess.req <- None;
+ Hashtbl.remove sessions_by_conn_id req.conn_id;
+ ignore
+ (lwt () = reply sess req.out in
+ Lwt.wakeup req.finished ();
+ Lwt.return ())
module Comet_js_comet_srv (S : sig val session : session end) =
struct
@@ -42,10 +51,6 @@ struct
with e -> Lwt.fail e
end
-(* XXX expire sessions *)
-let sessions_by_session_id = Hashtbl.create 17
-let sessions_by_conn_id = Hashtbl.create 17
-
module Server =
struct
let n = ref 0
@@ -61,124 +66,125 @@ struct
sessions_by_session_id
end
-let new_session conn_id =
+let new_session () =
let sess = {
- txn_id = 0;
session_id = string_of_int (Random.int max_int); (* XXX *)
+ txn_id = 0;
pending_calls = Hashtbl.create 17;
- conn = Some { conn_id = conn_id; reqs = Queue.create (); }
+ queued_msgs = [];
+ req = None;
} in
Hashtbl.replace sessions_by_session_id sess.session_id sess;
- Hashtbl.replace sessions_by_conn_id conn_id sess;
sess
-let clicks conn_id req out =
+let clicks_get conn_id req out =
+ let session_id =
+ try Some (Http_request.param req "session_id")
+ with Http_types.Param_not_found _ -> None in
+
+ let sess =
+ match session_id with
+ | None -> None
+ | Some session_id ->
+ try Some (Hashtbl.find sessions_by_session_id session_id)
+ with Not_found -> None in
+
+ match sess with
+ | None ->
+ let sess = new_session () in
+ reply sess out (* return session_id *)
+ | Some sess ->
+ begin match sess.req with
+ | None -> ()
+ | Some req ->
+ ignore
+ (lwt () = Http_daemon.respond_error ~status:`Bad_request ~body:"additional connection" out in
+ Lwt.wakeup req.finished ();
+ Lwt.return ());
+ sess.req <- None;
+ Hashtbl.remove sessions_by_conn_id req.conn_id
+ end;
+ if sess.queued_msgs <> []
+ then reply sess out
+ else
+ let t, u = Lwt.wait () in
+ sess.req <- Some { finished = u; out = out; conn_id = conn_id };
+ Hashtbl.replace sessions_by_conn_id conn_id sess;
+ t
+
+let clicks_post _ req out =
let body = Http_request.body req in
lwt body_string = Http_message.string_of_body body in
- let msg = Orpc_js_server.msg_of_string body_string in
+ let msgs = Orpc_js_server.msgs_of_string body_string in
let sess =
- match msg.Orpc_js_server.m_session_id with
- | None ->
- begin
- try
- (* additional request before reply with session_id is received? *)
- let sess = Hashtbl.find sessions_by_conn_id conn_id in
- Http_common.debug_print ("no session_id but found conn_id " ^ Http_daemon.string_of_conn_id conn_id);
- sess
- with Not_found ->
- (* new connection, new session *)
- Http_common.debug_print ("no session_id, new session for conn_id " ^ Http_daemon.string_of_conn_id conn_id);
- new_session conn_id
- end
+ match msgs.Orpc_js_server.m_session_id with
+ | None -> None
| Some session_id ->
- begin
- try
- let sess = Hashtbl.find sessions_by_session_id session_id in
- match sess.conn with
- | None ->
- Http_common.debug_print ("session_id " ^ session_id ^ ", reconnect for conn_id " ^ Http_daemon.string_of_conn_id conn_id);
- (* conn was closed, this is reconnect *)
- sess.conn <- Some { conn_id = conn_id; reqs = Queue.create (); };
- Hashtbl.replace sessions_by_conn_id conn_id sess;
- sess
- | Some conn when conn.conn_id <> conn_id ->
- Http_common.debug_print ("session_id " ^ session_id ^ ", extra connect with conn_id " ^ Http_daemon.string_of_conn_id conn_id);
- (* new connection for session which is already connected. drop the old one *)
- sess.conn <- Some { conn_id = conn_id; reqs = Queue.create (); };
- Hashtbl.remove sessions_by_conn_id conn.conn_id;
- Hashtbl.replace sessions_by_conn_id conn_id sess;
- sess
- | _ ->
- Http_common.debug_print ("session_id " ^ session_id ^ ", conn_id " ^ Http_daemon.string_of_conn_id conn_id);
- (* request on existing connection *)
- sess
- with Not_found ->
- Http_common.debug_print ("dead session_id " ^ session_id ^ ", new session for conn_id " ^ Http_daemon.string_of_conn_id conn_id);
- (* connection for dead session. *)
- new_session conn_id
- end in
-
- let t, u = Lwt.wait () in
- begin match sess.conn with
- | None -> assert false
- | Some conn -> Queue.add (u, out) conn.reqs
- end;
-
- begin match msg.Orpc_js_server.msg with
- | Orpc_js_server.Noop ->
- Http_common.debug_print "got Noop";
- ()
- | Orpc_js_server.Call (txn_id, proc, arg) ->
- Http_common.debug_print "got Call";
- begin
- try
- let proc =
- match proc with
- | "click" -> (fun x0 -> Proto_js_aux.of_click'res (Server.click (Proto_js_aux.to_click'arg x0)))
- | "clicks" -> (fun x0 -> Proto_js_aux.of_clicks'res (Server.clicks (Proto_js_aux.to_clicks'arg x0)))
- | _ -> raise (Invalid_argument "bad proc") in
- try send sess (Orpc_js_server.Res (txn_id, proc arg))
- with e -> send sess (Orpc_js_server.Fail (txn_id, Printexc.to_string e))
- with e -> send sess (Orpc_js_server.Fail (txn_id, Printexc.to_string e))
- end
- | Orpc_js_server.Res (txn_id, res) ->
- Http_common.debug_print "got Res";
- begin
- try
- Lwt.wakeup (Hashtbl.find sess.pending_calls txn_id) res;
- Hashtbl.remove sess.pending_calls txn_id
- with Not_found -> ()
- end
- | Orpc_js_server.Fail (txn_id, s) ->
- Http_common.debug_print "got Fail";
- begin
- try
- Lwt.wakeup_exn (Hashtbl.find sess.pending_calls txn_id) (Failure s);
- Hashtbl.remove sess.pending_calls txn_id
- with Not_found -> ()
- end
- end;
- t
+ try Some (Hashtbl.find sessions_by_session_id session_id)
+ with Not_found -> None in
+
+ match sess with
+ | None -> Http_daemon.respond_error ~status:`Bad_request ~body:"session_id required" out
+ | Some sess ->
+ Array.iter
+ (function
+ | Orpc_js_server.Call (txn_id, proc, arg) ->
+ let reply =
+ try
+ let proc =
+ match proc with
+ | "click" -> (fun x0 -> Proto_js_aux.of_click'res (Server.click (Proto_js_aux.to_click'arg x0)))
+ | "clicks" -> (fun x0 -> Proto_js_aux.of_clicks'res (Server.clicks (Proto_js_aux.to_clicks'arg x0)))
+ | _ -> raise (Invalid_argument "bad proc") in
+ Orpc_js_server.Res (txn_id, proc arg)
+ with e -> Orpc_js_server.Fail (txn_id, Printexc.to_string e) in
+ send sess reply
+ | Orpc_js_server.Res (txn_id, res) ->
+ begin
+ try
+ Lwt.wakeup (Hashtbl.find sess.pending_calls txn_id) res;
+ Hashtbl.remove sess.pending_calls txn_id
+ with Not_found -> ()
+ end
+ | Orpc_js_server.Fail (txn_id, s) ->
+ begin
+ try
+ Lwt.wakeup_exn (Hashtbl.find sess.pending_calls txn_id) (Failure s);
+ Hashtbl.remove sess.pending_calls txn_id
+ with Not_found -> ()
+ end)
+ msgs.Orpc_js_server.msgs;
+
+ (* XXX to potentially save a poll request, defer replies until here *)
+ reply sess out
let callback conn_id req out =
match Http_request.path req with
| "/" -> Http_daemon.respond_file ~fname:"index.html" ~mime_type:"text/html" out
| "/_build/clicks.js" -> Http_daemon.respond_file ~fname:"_build/clicks.js" ~mime_type:"application/javascript" out
- | "/clicks" -> clicks conn_id req out
- | url -> Http_daemon.respond ~body:("not found: " ^ url) out
+ | "/clicks" ->
+ begin match Http_request.meth req with
+ | `GET -> clicks_get conn_id req out
+ | `POST -> clicks_post conn_id req out
+ | _ -> Http_daemon.respond_error ~status:`Method_not_allowed ~body:"method not allowed" out
+ end
+ | url -> Http_daemon.respond_error ~status:`Not_found ~body:("not found: " ^ url) out
let conn_closed conn_id =
try
let sess = Hashtbl.find sessions_by_conn_id conn_id in
- sess.conn <- None;
- Hashtbl.remove sessions_by_conn_id conn_id
+ match sess.req with
+ | Some req when req.conn_id = conn_id ->
+ sess.req <- None;
+ Hashtbl.remove sessions_by_conn_id conn_id
+ | _ -> ()
with Not_found -> ()
let exn_handler exn out = Lwt.return ()
let main () =
- Http_common.debug := true;
+ (* Http_common.debug := true; *)
let spec = {
Http_daemon.address = "192.168.206.129";
auth = `None;
Please sign in to comment.
Something went wrong with that request. Please try again.