Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

working comet client

  • Loading branch information...
commit 858e288f246523cb4fe7aa8bf7972e5011ffd337 1 parent 050c2c8
Jake Donham authored
200 src/orpc-js-client/orpc_js_client.ml
@@ -19,13 +19,6 @@
19 19 * MA 02111-1307, USA
20 20 *)
21 21
22   -class type console =
23   -object
24   - method log : string -> unit
25   -end
26   -
27   -let console : console = Ocamljs.var "console"
28   -
29 22 let serialize o =
30 23 let a = Javascript.new_Array () in
31 24 let push o = ignore (a#push o) in
@@ -47,104 +40,141 @@ let serialize o =
47 40
48 41 let unserialize = Javascript.eval
49 42
  43 +type msg =
  44 + | Call of int * string * Obj.t
  45 + | Res of int * Obj.t
  46 + | Fail of int * string
  47 +
  48 +type msgs = {
  49 + m_session_id : string option;
  50 + msgs : msg array;
  51 +}
  52 +
50 53 type t = {
51 54 url : string;
52 55 mutable txn_id : int;
53 56 mutable session_id : string option;
54   - pending_calls : (int, (unit -> Obj.t) -> unit) Hashtbl.t;
55   - mutable reqs_in_flight : int;
56 57 mutable procs : (string * (Obj.t -> ((unit -> Obj.t) -> unit) -> unit)) list option;
  58 + pending_calls : (int, (unit -> Obj.t) -> unit) Hashtbl.t;
  59 + mutable queued_msgs : msg list;
  60 + mutable req_in_flight : bool;
57 61 }
58 62
59 63 let create url = {
60 64 url = url;
61 65 txn_id = 0;
62 66 session_id = None;
63   - pending_calls = Hashtbl.create 17;
64   - reqs_in_flight = 0;
65 67 procs = None;
  68 + pending_calls = Hashtbl.create 17;
  69 + queued_msgs = [];
  70 + req_in_flight = false;
66 71 }
67 72
68   -type msg_t =
69   - | Noop
70   - | Call of int * string * Obj.t
71   - | Res of int * Obj.t
72   - | Fail of int * string
73   -
74   -type msg = {
75   - m_session_id : string option;
76   - msg : msg_t;
77   -}
  73 +let rec req t =
  74 + if not t.req_in_flight && t.queued_msgs <> []
  75 + then
  76 + let msgs = { m_session_id = t.session_id; msgs = Array.of_list (List.rev t.queued_msgs); } in
  77 + t.queued_msgs <- [];
  78 + let xhr = Dom.new_XMLHttpRequest () in
  79 + xhr#_set_onreadystatechange begin fun () ->
  80 + match xhr#_get_readyState with
  81 + | 4 ->
  82 + xhr#_set_onreadystatechange ignore;
  83 + t.req_in_flight <- false;
  84 + recv t xhr;
  85 + req t
  86 + | _ -> ()
  87 + end;
  88 + xhr#open_ "POST" t.url true;
  89 + xhr#setRequestHeader "Content-Type" "text/plain; charset=utf-8";
  90 + xhr#send (serialize (Obj.repr msgs));
  91 + t.req_in_flight <- true
78 92
79   -let rec send t msg =
80   - let msg = { m_session_id = t.session_id; msg = msg } in
  93 +and poll ?on_connect t =
81 94 let xhr = Dom.new_XMLHttpRequest () in
82 95 xhr#_set_onreadystatechange begin fun () ->
83 96 match xhr#_get_readyState with
84 97 | 4 ->
85   - t.reqs_in_flight <- t.reqs_in_flight - 1;
86   - if xhr#_get_status = 200
87   - then recv t (Obj.obj (unserialize xhr#_get_responseText))
88   - else begin
89   - (* if we can't read the msg we don't know the txn_id, so fail all *)
90   - let r = let s = string_of_int xhr#_get_status ^ xhr#_get_statusText in (fun () -> raise (Failure s)) in
91   - Hashtbl.iter (fun _ f -> try f r with e -> console#log (Obj.magic e)) t.pending_calls;
92   - Hashtbl.clear t.pending_calls
93   - end;
94   - if t.procs <> None && t.reqs_in_flight = 0 then poll t
  98 + xhr#_set_onreadystatechange ignore;
  99 + recv ?on_connect t xhr;
  100 + poll t
95 101 | _ -> ()
96 102 end;
97   - xhr#open_ "POST" t.url true;
98   - xhr#setRequestHeader "Content-Type" "text/plain; charset=utf-8";
99   - xhr#send (serialize (Obj.repr msg));
100   - t.reqs_in_flight <- t.reqs_in_flight + 1
  103 + let url = t.url ^ "?nonce=" ^ string_of_float (Javascript.new_Date ())#getTime in
  104 + let url =
  105 + match t.session_id with
  106 + | None -> url
  107 + | Some session_id -> url ^ "&session_id=" ^ session_id in
  108 + xhr#open_ "GET" url true;
  109 + xhr#send (Ocamljs.null ());
101 110
102   -and recv t msg =
103   - begin match msg.m_session_id with
104   - | None -> console#log "got no session id"
105   - | Some id -> console#log ("got session id " ^ id); t.session_id <- msg.m_session_id
106   - end;
107   - match msg.msg with
108   - | Noop -> console#log "got Noop"
109   - | Call (txn_id, proc, arg) ->
110   - console#log (Printf.sprintf "got Call (%d, %s, _)" txn_id proc);
111   - begin
112   - let proc =
113   - match t.procs with
114   - | None -> None
115   - | Some procs -> try Some (List.assoc proc procs) with Not_found -> None in
116   - match proc with
117   - | None -> send t (Fail (txn_id, Printexc.to_string (Invalid_argument "bad proc")))
118   - | Some proc ->
119   - proc arg begin fun r ->
120   - let reply =
121   - try Res (txn_id, r ())
122   - with e -> Fail (txn_id, Printexc.to_string e) in
123   - send t reply
124   - end
125   - end
126   - | Res (txn_id, o) ->
127   - console#log (Printf.sprintf "got Res (%d, _)" txn_id);
128   - begin
129   - let call = try Some (Hashtbl.find t.pending_calls txn_id) with Not_found -> None in
130   - match call with
131   - | None -> ()
132   - | Some call ->
133   - Hashtbl.remove t.pending_calls txn_id;
134   - call (fun () -> o)
135   - end
136   - | Fail (txn_id, s) ->
137   - console#log (Printf.sprintf "got Fail (%d, _)" txn_id);
138   - begin
139   - let call = try Some (Hashtbl.find t.pending_calls txn_id) with Not_found -> None in
140   - match call with
141   - | None -> ()
142   - | Some call ->
143   - Hashtbl.remove t.pending_calls txn_id;
144   - call (fun () -> raise (Failure s))
145   - end
  111 +and send t msg =
  112 + t.queued_msgs <- msg :: t.queued_msgs;
  113 + req t
146 114
147   -and poll t = ignore (Dom.window#setTimeout (fun () -> send t Noop) 0.)
  115 +and recv ?on_connect t xhr =
  116 + if xhr#_get_status <> 200
  117 + then begin
  118 + (* don't know the txn_ids, so fail all *)
  119 + let r = let s = string_of_int xhr#_get_status ^ xhr#_get_statusText in (fun () -> raise (Failure s)) in
  120 + Hashtbl.iter (fun _ f -> try f r with e -> ()) t.pending_calls;
  121 + Hashtbl.clear t.pending_calls;
  122 + match on_connect with
  123 + | None -> ()
  124 + | Some f -> f r
  125 + end
  126 + else
  127 + let msgs = Obj.obj (unserialize xhr#_get_responseText) in
  128 +
  129 + begin match msgs.m_session_id with
  130 + | None -> ()
  131 + | Some _ as id ->
  132 + match t.session_id with
  133 + | Some _ -> t.session_id <- id
  134 + | None ->
  135 + t.session_id <- id;
  136 + match on_connect with
  137 + | None -> ()
  138 + | Some f -> f (fun () -> ())
  139 + end;
  140 +
  141 + Array.iter
  142 + (function
  143 + | Call (txn_id, proc, arg) ->
  144 + begin
  145 + let proc =
  146 + match t.procs with
  147 + | None -> None
  148 + | Some procs -> try Some (List.assoc proc procs) with Not_found -> None in
  149 + match proc with
  150 + | None -> send t (Fail (txn_id, Printexc.to_string (Invalid_argument "bad proc")))
  151 + | Some proc ->
  152 + proc arg begin fun r ->
  153 + let reply =
  154 + try Res (txn_id, r ())
  155 + with e -> Fail (txn_id, Printexc.to_string e) in
  156 + send t reply
  157 + end
  158 + end
  159 + | Res (txn_id, o) ->
  160 + begin
  161 + let call = try Some (Hashtbl.find t.pending_calls txn_id) with Not_found -> None in
  162 + match call with
  163 + | None -> ()
  164 + | Some call ->
  165 + Hashtbl.remove t.pending_calls txn_id;
  166 + call (fun () -> o)
  167 + end
  168 + | Fail (txn_id, s) ->
  169 + begin
  170 + let call = try Some (Hashtbl.find t.pending_calls txn_id) with Not_found -> None in
  171 + match call with
  172 + | None -> ()
  173 + | Some call ->
  174 + Hashtbl.remove t.pending_calls txn_id;
  175 + call (fun () -> raise (Failure s))
  176 + end)
  177 + msgs.msgs
148 178
149 179 let call t proc arg pass_reply =
150 180 let txn_id = t.txn_id in
@@ -153,5 +183,7 @@ let call t proc arg pass_reply =
153 183 send t (Call (txn_id, proc, arg))
154 184
155 185 let bind t procs =
156   - t.procs <- Some procs;
157   - poll t
  186 + t.procs <- Some procs
  187 +
  188 +let connect t on_connect =
  189 + poll ~on_connect t
2  src/orpc-js-client/orpc_js_client.mli
@@ -26,3 +26,5 @@ val create : string -> t
26 26 val call : t -> string -> Obj.t -> ((unit -> Obj.t) -> unit) -> unit
27 27
28 28 val bind : t -> (string * (Obj.t -> ((unit -> Obj.t) -> unit) -> unit)) list -> unit
  29 +
  30 +val connect : t -> ((unit -> unit) -> unit) -> unit
66 src/orpc-js-server/orpc_js_server.ml
@@ -187,7 +187,7 @@ let unserialize s =
187 187 | Tblock_end ->
188 188 begin
189 189 match block with
190   - | Onumber tag :: ((_::_) as fields) -> Oblock (int_of_float tag, Array.of_list (List.rev fields))
  190 + | Onumber tag :: fields -> Oblock (int_of_float tag, Array.of_list (List.rev fields))
191 191 | _ -> invalid "block"
192 192 end
193 193 | _ -> Ulexing.rollback lb; loop2 (loop () :: block) in
@@ -198,40 +198,40 @@ let unserialize s =
198 198 | Teoi -> o
199 199 | _ -> invalid "serialized heap object"
200 200
201   -type msg_t =
202   - | Noop
  201 +type msg =
203 202 | Call of int * string * obj
204 203 | Res of int * obj
205 204 | Fail of int * string
206 205
207   -type msg = {
  206 +type msgs = {
208 207 m_session_id : string option;
209   - msg : msg_t;
  208 + msgs : msg array;
210 209 }
211 210
212   -let msg_of_string s =
213   - match unserialize s with
214   - | Oblock (0, [| m_session_id; msg |]) ->
215   - let m_session_id = to_option to_string m_session_id in
216   - let msg =
217   - match msg with
218   - | Onumber 0. -> Noop
219   - | Oblock (0, [| txn_id; proc; arg |]) -> Call (to_int txn_id, to_string proc, arg)
220   - | Oblock (1, [| txn_id; res |]) -> Res (to_int txn_id, res)
221   - | Oblock (2, [| txn_id; msg |]) -> Fail (to_int txn_id, to_string msg)
222   - | _ -> invalid "msg_t" in
223   - { m_session_id = m_session_id; msg = msg }
224   - | _ -> invalid "msg"
225   -
226   -let string_of_msg { m_session_id = m_session_id; msg = msg } =
227   - let m_session_id = of_option of_string m_session_id in
228   - let msg =
229   - match msg with
230   - | Noop -> Onumber 0.
231   - | Call (txn_id, proc, arg) -> Oblock (0, [| of_int txn_id; of_string proc; arg |])
232   - | Res (txn_id, res) -> Oblock (1, [| of_int txn_id; res |])
233   - | Fail (txn_id, msg) -> Oblock (2, [| of_int txn_id; of_string msg |]) in
234   - serialize (Oblock (0, [| m_session_id; msg |]))
  211 +let to_msg = function
  212 + | Oblock (0, [| txn_id; proc; arg |]) -> Call (to_int txn_id, to_string proc, arg)
  213 + | Oblock (1, [| txn_id; res |]) -> Res (to_int txn_id, res)
  214 + | Oblock (2, [| txn_id; msg |]) -> Fail (to_int txn_id, to_string msg)
  215 + | _ -> invalid "msg_t"
  216 +
  217 +let of_msg = function
  218 + | Call (txn_id, proc, arg) -> Oblock (0, [| of_int txn_id; of_string proc; arg |])
  219 + | Res (txn_id, res) -> Oblock (1, [| of_int txn_id; res |])
  220 + | Fail (txn_id, msg) -> Oblock (2, [| of_int txn_id; of_string msg |])
  221 +
  222 +let to_msgs = function
  223 + | Oblock (0, [| m_session_id; msgs |]) ->
  224 + {
  225 + m_session_id = to_option to_string m_session_id;
  226 + msgs = to_array to_msg msgs
  227 + }
  228 + | _ -> invalid "msg"
  229 +
  230 +let of_msgs { m_session_id = m_session_id; msgs = msgs } =
  231 + Oblock (0, [| of_option of_string m_session_id; of_array of_msg msgs |])
  232 +
  233 +let msgs_of_string s = to_msgs (unserialize s)
  234 +let string_of_msgs msgs = serialize (of_msgs msgs)
235 235
236 236 module type Monad =
237 237 sig
@@ -253,15 +253,15 @@ module Handler (M : Monad) =
253 253 struct
254 254 let handler procs body =
255 255 try
256   - let msg = msg_of_string body in
  256 + let msgs = msgs_of_string body in
257 257 let reply =
258   - match msg.msg with
259   - | Call (txn_id, proc, arg) ->
  258 + match msgs.msgs with
  259 + | [| Call (txn_id, proc, arg) |] ->
260 260 let proc =
261 261 try List.assoc proc procs
262 262 with Not_found -> raise (Invalid_argument ("bad proc " ^ proc)) in
263 263 M.bind (proc arg) (fun res -> M.return (Res (txn_id, res)))
264   - | Noop | Res _ | Fail _ -> raise (Invalid_argument "unsupported message") in
265   - M.bind reply (fun reply -> M.return (string_of_msg { m_session_id = msg.m_session_id; msg = reply }))
  264 + | _ -> raise (Invalid_argument "unsupported msgs") in
  265 + M.bind reply (fun reply -> M.return (string_of_msgs { m_session_id = msgs.m_session_id; msgs = [| reply |] }))
266 266 with e -> M.fail e
267 267 end
11 src/orpc-js-server/orpc_js_server.mli
@@ -55,19 +55,18 @@ val of_ref : ('a -> obj) -> 'a ref -> obj
55 55
56 56 val set_debug : (string -> unit) -> unit
57 57
58   -type msg_t =
59   - | Noop
  58 +type msg =
60 59 | Call of int * string * obj
61 60 | Res of int * obj
62 61 | Fail of int * string
63 62
64   -type msg = {
  63 +type msgs = {
65 64 m_session_id : string option;
66   - msg : msg_t;
  65 + msgs : msg array;
67 66 }
68 67
69   -val msg_of_string : string -> msg
70   -val string_of_msg : msg -> string
  68 +val msgs_of_string : string -> msgs
  69 +val string_of_msgs : msgs -> string
71 70
72 71 module type Monad =
73 72 sig

0 comments on commit 858e288

Please sign in to comment.
Something went wrong with that request. Please try again.