Skip to content

Commit

Permalink
fixes for Lwt_stream.t change in cohttpserver
Browse files Browse the repository at this point in the history
  • Loading branch information
duckpilot committed Sep 1, 2010
1 parent b234970 commit 7916716
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 34 deletions.
16 changes: 8 additions & 8 deletions examples/clicks-cohttp/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ end

module M = Proto_js_srv.Lwt(Server)

let clicks req out =
let clicks req =
let body = Http_request.body req in
lwt body_string = Http_message.string_of_body body in
lwt res = M.handler body_string in
Http_daemon.respond ~body:res out
Http_daemon.respond ~body:res ()

let callback _ req out =
let callback _ req =
match Http_request.path req with
| "/" -> Http_daemon.respond_file ~fname:"index.html" ~mime_type:"text/html" out
| "/_build/clicks.js" -> Http_daemon.respond_file ~fname:"_build/clicks.js" ~mime_type:"application/javascript" out
| "/clicks" -> clicks req out
| url -> Http_daemon.respond_error ~status:`Not_found ~body:("not found: " ^ url) out
| "/" -> Http_daemon.respond_file ~fname:"index.html" ~mime_type:"text/html" ()
| "/_build/clicks.js" -> Http_daemon.respond_file ~fname:"_build/clicks.js" ~mime_type:"application/javascript" ()
| "/clicks" -> clicks req
| url -> Http_daemon.respond_error ~status:`Not_found ~body:("not found: " ^ url) ()

let exn_handler exn out = Lwt.return ()
let exn_handler exn = Lwt.return ()

let spec = {
Http_daemon.address = "0.0.0.0";
Expand Down
12 changes: 6 additions & 6 deletions examples/clicks-comet/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ end

let _ = let module M = Proto_js_srv.Lwt(Server) in Orpc_js_comet_server.bind server M.funcs

let callback conn_id req out =
let callback conn_id req =
match Http_request.path req with
| "/" -> Http_daemon.respond_file ~fname:"index.html" ~mime_type:"text/html" out
| "/_build/clicks.js" -> Http_daemon.respond_file ~fname:"_build/clicks.js" ~mime_type:"application/javascript" out
| "/clicks" -> Orpc_js_comet_server.callback server conn_id req out
| url -> Http_daemon.respond_error ~status:`Not_found ~body:("not found: " ^ url) out
| "/" -> Http_daemon.respond_file ~fname:"index.html" ~mime_type:"text/html" ()
| "/_build/clicks.js" -> Http_daemon.respond_file ~fname:"_build/clicks.js" ~mime_type:"application/javascript" ()
| "/clicks" -> Orpc_js_comet_server.callback server conn_id req
| url -> Http_daemon.respond_error ~status:`Not_found ~body:("not found: " ^ url) ()

let conn_closed conn_id = Orpc_js_comet_server.conn_closed server conn_id

let exn_handler exn out = Lwt.return ()
let exn_handler exn = Lwt.return ()

let spec = {
Http_daemon.address = "0.0.0.0";
Expand Down
37 changes: 18 additions & 19 deletions src/orpc-js-comet-server/orpc_js_comet_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ open Cohttpserver

type req = {
conn_id : Http_daemon.conn_id;
finished : unit Lwt.u;
out : Lwt_io.output_channel Lwt.t;
reply : string Lwt_stream.t Lwt.u;
}

type session = {
Expand All @@ -52,12 +51,12 @@ let create () = {

let bind s procs = s.procs <- Some procs

let reply sess out =
let reply sess =
(* XXX don't send session_id if req already had the right one *)
let msgs = { Orpc_js_server.m_session_id = Some sess.session_id; msgs = Array.of_list (List.rev sess.queued_msgs); sync = false; } in
sess.queued_msgs <- [];
let body = Orpc_js_server.string_of_msgs msgs in
Http_daemon.respond ~body out
Http_daemon.respond ~body ()

let send s sess msg =
sess.queued_msgs <- msg :: sess.queued_msgs;
Expand All @@ -67,8 +66,8 @@ let send s sess msg =
sess.req <- None;
Hashtbl.remove s.sessions_by_conn_id req.conn_id;
ignore
(lwt () = reply sess req.out in
Lwt.wakeup req.finished ();
(lwt reply = reply sess in
Lwt.wakeup req.reply reply;
Lwt.return ())

let call s sess proc arg =
Expand All @@ -79,7 +78,7 @@ let call s sess proc arg =
send s sess (Orpc_js_server.Call (txn_id, proc, arg));
t

let get s conn_id req out =
let get s conn_id req =
let session_id =
try Some (Http_request.param req "session_id")
with Http_types.Param_not_found _ -> None in
Expand All @@ -101,27 +100,27 @@ let get s conn_id req out =
req = None;
} in
Hashtbl.replace s.sessions_by_session_id sess.session_id sess;
reply sess out (* return session_id *)
reply sess (* return session_id *)
| Some sess ->
begin match sess.req with
| None -> ()
| Some req ->
ignore
(lwt () = Http_daemon.respond_error ~status:`Bad_request ~body:"additional connection" out in
Lwt.wakeup req.finished ();
(lwt reply = Http_daemon.respond_error ~status:`Bad_request ~body:"additional connection" () in
Lwt.wakeup req.reply reply;
Lwt.return ());
sess.req <- None;
Hashtbl.remove s.sessions_by_conn_id req.conn_id
end;
if sess.queued_msgs <> []
then reply sess out
then reply sess
else
let t, u = Lwt.wait () in
sess.req <- Some { finished = u; out = out; conn_id = conn_id };
sess.req <- Some { reply = u; conn_id = conn_id };
Hashtbl.replace s.sessions_by_conn_id conn_id sess;
t

let post s req out =
let post s req =
let body = Http_request.body req in
lwt body_string = Http_message.string_of_body body in
let msgs = Orpc_js_server.msgs_of_string body_string in
Expand All @@ -134,7 +133,7 @@ let post s req out =
with Not_found -> None in

match sess with
| None -> Http_daemon.respond_error ~status:`Bad_request ~body:"session_id required" out
| None -> Http_daemon.respond_error ~status:`Bad_request ~body:"session_id required" ()
| Some sess ->
Array.iter
(function
Expand Down Expand Up @@ -173,13 +172,13 @@ let post s req out =

(* XXX to potentially save a poll request, defer replies until here *)
(* XXX and/or param to wait a bit before reply? *)
reply sess out
reply sess

let callback s conn_id req out =
let callback s conn_id req =
match Http_request.meth req with
| `GET -> get s conn_id req out
| `POST -> post s req out
| _ -> Http_daemon.respond_error ~status:`Method_not_allowed ~body:"method not allowed" out
| `GET -> get s conn_id req
| `POST -> post s req
| _ -> Http_daemon.respond_error ~status:`Method_not_allowed ~body:"method not allowed" ()

let conn_closed s conn_id =
try
Expand Down
2 changes: 1 addition & 1 deletion src/orpc-js-comet-server/orpc_js_comet_server.mli
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ val find_session_by_id : t -> string -> session
val session_id : session -> string
val iter_sessions : t -> (string -> session -> unit) -> unit

val callback : t -> (Cohttpserver.Http_daemon.conn_id -> Cohttp.Http_request.request -> Lwt_io.output_channel Lwt.t -> unit Lwt.t)
val callback : t -> (Cohttpserver.Http_daemon.conn_id -> Cohttp.Http_request.request -> string Lwt_stream.t Lwt.t)
val conn_closed : t -> (Cohttpserver.Http_daemon.conn_id -> unit)

0 comments on commit 7916716

Please sign in to comment.