Permalink
Browse files

Merge remote-tracking branch 'origin/async-108.07.00'

  • Loading branch information...
2 parents 6a17160 + 30960dd commit 78d000be4bde9ea3346cb6612a73530275a57c87 @avsm avsm committed Oct 27, 2012
Showing with 12 additions and 14 deletions.
  1. +11 −12 async/cohttp_async.ml
  2. +1 −2 async/cohttp_async.mli
View
@@ -30,19 +30,19 @@ let pipe_of_body read_fn ic =
|Transfer.Done ->
return (Pipe.close wr);
|Transfer.Final_chunk c -> begin
- Pipe.with_write wr ~f:(fun wrfn -> wrfn c)
+ Pipe.write_when_ready wr ~f:(fun wrfn -> wrfn c)
>>| function
|`Closed -> ()
|`Ok _ -> Pipe.close wr
end
|Transfer.Chunk c -> begin
- Pipe.with_write wr ~f:(fun wrfn -> wrfn c)
+ Pipe.write_when_ready wr ~f:(fun wrfn -> wrfn c)
>>= function
|`Closed -> return ()
|`Ok _ -> write ()
end
in
- whenever (write ());
+ don't_wait_for (write ());
rd
let close_all ic oc =
@@ -67,7 +67,7 @@ module Client = struct
|false -> (Some (res, None))
|true ->
let body_rd = pipe_of_body (Response.read_body res) ic in
- if close then whenever (
+ if close then don't_wait_for (
Pipe.closed body_rd >>= fun () ->
close_all ic oc
);
@@ -85,7 +85,7 @@ module Client = struct
match Uri_services.tcp_port_of_uri ~default:"http" uri with
|None -> return None
|Some port ->
- Tcp.connect ~host ~port ()
+ Tcp.(connect (to_host_and_port host port) )
>>= fun (ic,oc) -> write_request ?body req oc
>>= fun () -> read_response ~close:true ic oc
end
@@ -103,8 +103,8 @@ module Server = struct
let res = Response.make ~status
~encoding:(Transfer.Fixed (String.length body)) ?headers () in
let body_rd, body_wr = Pipe.create () in
- whenever (
- Pipe.with_write body_wr ~f:(fun wrfn -> wrfn body)
+ don't_wait_for (
+ Pipe.write_when_ready body_wr ~f:(fun wrfn -> wrfn body)
>>| function |`Closed -> () |`Ok _ -> Pipe.close body_wr);
return (res, (Some body_rd))
@@ -129,15 +129,15 @@ module Server = struct
|false -> return None
)
>>= fun req_body ->
- Pipe.with_write wr_req (fun wrfn -> wrfn (req, req_body))
+ Pipe.write_when_ready wr_req (fun wrfn -> wrfn (req, req_body))
>>= function
|`Closed -> close_all ic oc (* TODO test *)
|`Ok _ ->
if Header.get_connection_close (Request.headers req) then
close ()
else
read_t ()
- in whenever (read_t ());
+ in don't_wait_for (read_t ());
(* Map the requsts onto a response stream to serialise out *)
let rec write_resps () =
Pipe.read rd_req >>= function
@@ -155,7 +155,6 @@ module Server = struct
in write_resps ()
let main spec =
- let on_handler_error = `Raise in (* TODO just for debug *)
- Tcp.serve ~port:spec.port ~on_handler_error (callback spec)
-
+ let listen_on = Tcp.on_port spec.port in
+ Tcp.Server.create listen_on (callback spec)
end
View
@@ -64,6 +64,5 @@ module Server : sig
val respond_string : ?headers:Header.t -> status:Code.status_code -> body:string -> unit -> (Response.t * string Pipe.Reader.t option) Deferred.t
- val main : config -> unit Deferred.t
+ val main : config -> (Socket.Address.Inet.t, int) Tcp.Server.t Deferred.t
end
-

0 comments on commit 78d000b

Please sign in to comment.