Permalink
Browse files

reworked orpc-js protocol to support comet.

sync client calls no longer supported.
  • Loading branch information...
1 parent ffec233 commit 694f51d2037ad94cecb62cb5b0e649af5bce96ae Jake Donham committed Jul 24, 2010
@@ -32,11 +32,13 @@ let gen_mli name (typedefs, excs, funcs, kinds) =
let modules =
List.map
- (fun kind ->
- let mt = G.string_of_kind kind in
- <:sig_item<
- module $uid:mt$(C : sig val with_client : (Orpc_js_client.t -> 'a) -> 'a end) : $uid:name$.$uid:mt$
- >>)
+ (function
+ | Ik_abstract -> assert false
+ | Sync -> <:sig_item< >>
+ | Lwt ->
+ <:sig_item<
+ module Lwt(C : sig val with_client : (Orpc_js_client.t -> 'a) -> 'a end) : $uid:name$.Lwt
+ >>)
kinds in
<:sig_item< $list:modules$ >>
@@ -51,38 +53,32 @@ let gen_ml name (typedefs, excs, funcs, kinds) =
List.map
(fun kind ->
let func (_, id, args, res) =
- let body =
- match kind with
- | Ik_abstract -> assert false
-
- | Sync ->
- let body2 = <:expr< Orpc_js_client.sync_call c $`str:id$ (Obj.repr x0) >> in
- if has_excs
- then <:expr< C.with_client (fun c -> unpack_orpc_result (fun () -> $body2$)) >>
- else <:expr< C.with_client (fun c -> Obj.obj $body2$) >>
-
- | Lwt ->
+ match kind with
+ | Ik_abstract -> assert false
+ | Sync -> <:str_item< >>
+ | Lwt ->
+ let body =
<:expr<
let t, u = Lwt.wait () in
C.with_client (fun c ->
- Orpc_js_client.add_call c $`str:id$ (Obj.repr x0)
+ Orpc_js_client.call c $`str:id$ (Obj.repr x0)
(fun g ->
$if has_excs
then <:expr< try Lwt.wakeup u (unpack_orpc_result g) with e -> Lwt.wakeup_exn u e >>
else <:expr< Lwt.wakeup u (Obj.obj (g ())) >>$));
t
>> in
- <:str_item<
- let $lid:id$ =
- $G.args_funs args
- (match args with
- | [] -> assert false
- | [a] -> body
- | _ ->
- let (_, es) = G.vars args in
- <:expr< let x0 = ($exCom_of_list es$) in $body$ >>)$
- >> in
+ <:str_item<
+ let $lid:id$ =
+ $G.args_funs args
+ (match args with
+ | [] -> assert false
+ | [a] -> body
+ | _ ->
+ let (_, es) = G.vars args in
+ <:expr< let x0 = ($exCom_of_list es$) in $body$ >>)$
+ >> in
<:str_item<
module $uid:G.string_of_kind kind$(C : sig val with_client : (Orpc_js_client.t -> 'a) -> 'a end) =
@@ -91,7 +87,6 @@ let gen_ml name (typedefs, excs, funcs, kinds) =
$list:List.map func funcs$
end
>>)
-
kinds in
(* exceptions are pointer-compared, so we need to map back to the right ones *)
@@ -59,30 +59,102 @@ end
external new_XMLHttpRequest : unit -> xMLHttpRequest = "$new" "XMLHttpRequest"
-type t = string
+type t = {
+ url : string;
+ mutable txn_id : int;
+ mutable session_id : string;
+ pending_calls : (int, (unit -> Obj.t) -> unit) Hashtbl.t;
+ mutable reqs_in_flight : int;
+ mutable procs : (string, Obj.t -> ((unit -> Obj.t) -> unit) -> unit) Hashtbl.t option;
+}
-let create url = url
+let create url = {
+ url = url;
+ txn_id = 0;
+ session_id = "";
+ pending_calls = Hashtbl.create 17;
+ reqs_in_flight = 0;
+ procs = None;
+}
-let sync_call url proc arg =
- let xhr = new_XMLHttpRequest () in
- xhr#open__ "POST" url false;
- xhr#setRequestHeader "Content-Type" "text/plain";
- xhr#send (serialize (Obj.repr (proc, arg)));
- if xhr#_get_status = 200
- then unserialize xhr#_get_responseText
- else raise (Failure xhr#_get_statusText)
+type msg_t =
+ | Noop
+ | Call of int * string * Obj.t
+ | Res of int * Obj.t
+ | Fail of int * string
+
+type msg = {
+ m_session_id : string option;
+ msg : msg_t;
+}
-let add_call url proc arg pass_reply =
+let rec send t msg =
+ let msg = { m_session_id = Some t.session_id; msg = msg } in
let xhr = new_XMLHttpRequest () in
- xhr#_set_onreadystatechange (fun () ->
+ xhr#_set_onreadystatechange begin fun () ->
match xhr#_get_readyState with
| 4 ->
- let r =
- if xhr#_get_status = 200
- then let o = unserialize xhr#_get_responseText in (fun () -> o)
- else let s = xhr#_get_statusText in (fun () -> raise (Failure s)) in
- pass_reply r
- | _ -> ());
- xhr#open__ "POST" url true;
+ t.reqs_in_flight <- t.reqs_in_flight - 1;
+ if xhr#_get_status = 200
+ then recv t (Obj.obj (unserialize xhr#_get_responseText))
+ else begin
+ (* if we can't read the msg we don't know the txn_id, so fail all *)
+ let r = let s = xhr#_get_statusText in (fun () -> raise (Failure s)) in
+ Hashtbl.iter (fun _ f -> f r) t.pending_calls;
+ Hashtbl.clear t.pending_calls
+ end;
+ if t.procs <> None && t.reqs_in_flight = 0 then poll t
+ | _ -> ()
+ end;
+ xhr#open__ "POST" t.url true;
xhr#setRequestHeader "Content-Type" "text/plain; charset=utf-8";
- xhr#send (serialize (Obj.repr (proc, arg)))
+ xhr#send (serialize (Obj.repr msg));
+ t.reqs_in_flight <- t.reqs_in_flight + 1
+
+and recv t msg =
+ begin match msg.m_session_id with
+ | None -> ()
+ | Some s -> t.session_id <- s
+ end;
+ match msg.msg with
+ | Noop -> ()
+ | Call (txn_id, proc, arg) ->
+ begin
+ match t.procs with
+ | None -> ()
+ | Some procs ->
+ try
+ Hashtbl.find procs proc arg begin fun r ->
+ try send t (Res (txn_id, r ()))
+ with e -> send t (Fail (txn_id, Printexc.to_string e))
+ end
+ with Not_found -> ()
+ end
+ | Res (txn_id, o) ->
+ begin
+ try
+ Hashtbl.find t.pending_calls txn_id (fun () -> o);
+ Hashtbl.remove t.pending_calls txn_id
+ with Not_found -> ()
+ end
+ | Fail (txn_id, s) ->
+ begin
+ try
+ Hashtbl.find t.pending_calls txn_id (fun () -> raise (Failure s));
+ Hashtbl.remove t.pending_calls txn_id
+ with Not_found -> ()
+ end
+
+and poll t = send t Noop
+
+let call t proc arg pass_reply =
+ let txn_id = t.txn_id in
+ t.txn_id <- t.txn_id + 1;
+ Hashtbl.replace t.pending_calls txn_id pass_reply;
+ send t (Call (txn_id, proc, arg))
+
+let bind t procs =
+ let h = Hashtbl.create (List.length procs * 2) in
+ List.iter (fun (k, v) -> Hashtbl.replace h k v) procs;
+ t.procs <- Some h;
+ poll t
@@ -23,5 +23,6 @@ type t
val create : string -> t
-val sync_call : t -> string -> Obj.t -> Obj.t
-val add_call : t -> string -> Obj.t -> ((unit -> Obj.t) -> unit) -> unit
+val call : t -> string -> Obj.t -> ((unit -> Obj.t) -> unit) -> unit
+
+val bind : t -> (string * (Obj.t -> ((unit -> Obj.t) -> unit) -> unit)) list -> unit
@@ -198,6 +198,41 @@ let unserialize s =
| Teoi -> o
| _ -> invalid "serialized heap object"
+type msg_t =
+ | Noop
+ | Call of int * string * obj
+ | Res of int * obj
+ | Fail of int * string
+
+type msg = {
+ m_session_id : string option;
+ msg : msg_t;
+}
+
+let msg_of_string s =
+ match unserialize s with
+ | Oblock (0, [| m_session_id; msg |]) ->
+ let m_session_id = to_option to_string m_session_id in
+ let msg =
+ match msg with
+ | Onumber 0. -> Noop
+ | Oblock (0, [| txn_id; proc; arg |]) -> Call (to_int txn_id, to_string proc, arg)
+ | Oblock (1, [| txn_id; res |]) -> Res (to_int txn_id, res)
+ | Oblock (2, [| txn_id; msg |]) -> Fail (to_int txn_id, to_string msg)
+ | _ -> invalid "msg_t" in
+ { m_session_id = m_session_id; msg = msg }
+ | _ -> invalid "msg"
+
+let string_of_msg { m_session_id = m_session_id; msg = msg } =
+ let m_session_id = of_option of_string m_session_id in
+ let msg =
+ match msg with
+ | Noop -> Onumber 0.
+ | Call (txn_id, proc, arg) -> Oblock (0, [| of_int txn_id; of_string proc; arg |])
+ | Res (txn_id, res) -> Oblock (1, [| of_int txn_id; res |])
+ | Fail (txn_id, msg) -> Oblock (2, [| of_int txn_id; of_string msg |]) in
+ serialize (Oblock (0, [| m_session_id; msg |]))
+
module type Monad =
sig
type 'a t
@@ -218,13 +253,15 @@ module Handler (M : Monad) =
struct
let handler procs body =
try
- let (proc_name, arg) =
- match unserialize body with
- | Oblock (0, [| Ostring proc_name; arg |]) -> proc_name, arg
- | _ -> raise (Invalid_argument "bad request") in
- let proc =
- try List.assoc proc_name procs
- with Not_found -> raise (Invalid_argument ("bad request " ^ proc_name)) in
- M.bind (proc arg) (fun s -> M.return (serialize s))
+ let msg = msg_of_string body in
+ let reply =
+ match msg.msg with
+ | Call (txn_id, proc, arg) ->
+ let proc =
+ try List.assoc proc procs
+ with Not_found -> raise (Invalid_argument ("bad proc " ^ proc)) in
+ M.bind (proc arg) (fun res -> M.return (Res (txn_id, res)))
+ | Noop | Res _ | Fail _ -> raise (Invalid_argument "unsupported message") in
+ M.bind reply (fun reply -> M.return (string_of_msg { m_session_id = msg.m_session_id; msg = reply }))
with e -> M.fail e
end
@@ -55,6 +55,20 @@ val of_ref : ('a -> obj) -> 'a ref -> obj
val set_debug : (string -> unit) -> unit
+type msg_t =
+ | Noop
+ | Call of int * string * obj
+ | Res of int * obj
+ | Fail of int * string
+
+type msg = {
+ m_session_id : string option;
+ msg : msg_t;
+}
+
+val msg_of_string : string -> msg
+val string_of_msg : msg -> string
+
module type Monad =
sig
type 'a t

0 comments on commit 694f51d

Please sign in to comment.