Permalink
Browse files

removed support for Async interfaces (Lwt is preferred)

  • Loading branch information...
1 parent 701c325 commit a6ddca0532ca6662530ba88a72b4f925704b8ec0 Jake Donham committed Jul 20, 2010
@@ -32,35 +32,6 @@ struct
let module M = Go(Protocol_trace.Sync(T)(Client)) in M.go ();
end
-module Async =
-struct
- module Go (C : Protocol.Async) =
- struct
- let go () =
- (* all requests issued first, then all requests returned (when you call through Client) *)
- C.add1 6 ignore;
- C.addN ~n:7 6 ignore;
- C.add1_list [5;6;7] ignore;
- C.add1_lst (lst_of_list [7;8;9]) ignore;
- C.add1_pair (17, 22) ignore;
- C.maybe_raise true ignore;
- end
-
- module T = Orpc_pp.Trace_of_formatter(struct let formatter = Format.err_formatter end)
-
- let main () =
- let esys = Unixqueue.create_unix_event_system() in
- let c = Protocol_clnt.create_client ~esys (Rpc_client.Inet ("localhost", 9007)) Rpc.Tcp in
-
- let module Client = Protocol_clnt.Async(struct let with_client f = f c end) in
-
- let module M = Go(Protocol_trace.Async(T)(Server_impl.Async)) in M.go ();
- let module M = Go(Protocol_trace.Async(T)(Client)) in M.go ();
-
- Unixqueue.run esys;
- Rpc_client.shut_down c
-end
-
module Lwt =
struct
module Go (C : Protocol.Lwt) =
@@ -99,5 +70,4 @@ end
;;
Sync.main ()
-(* Async.main () *)
(* Lwt.main () *)
@@ -15,7 +15,6 @@ sig
end
module type Sync = Abstract with type 'a _r = 'a
-module type Async = Abstract with type 'a _r = ((unit -> 'a) -> unit) -> unit
module type Lwt = Abstract with type 'a _r = 'a Lwt.t
(*
@@ -30,17 +29,6 @@ sig
val maybe_raise : bool -> unit
end
-module type Async =
-sig
- val add1 : int -> ((unit -> int) -> unit) -> unit
- val add1_list : int list -> ((unit -> int list) -> unit) -> unit
- val add1_pair : (int * int) -> ((unit -> (int * int)) -> unit) -> unit
- val add1_r : Types.r -> ((unit -> Types.r) -> unit) -> unit
- val add1_lst : int Types.lst -> ((unit -> int Types.lst) -> unit) -> unit
- val addN : ?n:int -> int -> ((unit -> int) -> unit) -> unit
- val maybe_raise : bool -> ((unit -> unit) -> unit) -> unit
-end
-
module type Lwt =
sig
val add1 : int -> int Lwt.t
@@ -12,7 +12,6 @@ let start() =
~name:"add1"
~setup:(fun srv () ->
let module M = Protocol_srv.Sync(Server_impl.Sync) in
- (* let module M = Protocol_srv.Async(Server_impl.Async) in *)
(* let module M = Protocol_srv.Lwt(Server_impl.Lwt) in *)
M.bind srv)
();
@@ -29,34 +29,6 @@ struct
then raise (Protocol.Bar 17)
end
-module Async =
-struct
- type 'a _r = ((unit -> 'a) -> unit) -> unit
-
- let add1 i r = r (fun () -> (i + 1))
-
- let add1_list l r = r (fun () -> (List.map (fun i -> i + 1) l))
-
- let add1_pair (a, b) r = r (fun () -> (a + 1, b + 1))
-
- let add1_r { Types.fst = f; snd = s; trd = t } r =
- r (fun () -> {
- Types.fst = f + 1;
- snd = (match s with None -> None | Some s -> Some (s + 1));
- trd = Array.map (fun e -> e + 1) t;
- })
-
- let add1_lst l r = r (fun () -> (lst_map (fun i -> i + 1) l))
-
- let addN ?(n=1) i r = r (fun () -> (i + n))
-
- let maybe_raise flag r =
- r (fun () ->
- if flag
- then raise (Protocol.Bar 17)
- else ())
-end
-
module Lwt =
struct
type 'a _r = 'a Lwt.t
@@ -1,2 +1,3 @@
-<*.ml*> : pkg_orpc-onc, pkg_netplex
-<*.byte> : pkg_orpc-onc, pkg_netplex
+<mm_controller.ml> : syntax_camlp4o, pkg_lwt.syntax
+<*.ml*> : pkg_orpc-onc, pkg_netplex, pkg_lwt
+<*.byte> : pkg_orpc-onc, pkg_netplex, pkg_lwt
@@ -30,8 +30,8 @@ let main() =
(`Socket(Rpc.Tcp,
Rpc_client.Inet(!host,!port),
Rpc_client.default_socket_config)) in
- Mm_proto_multiplier_clnt.test_multiply
- multiplier
+ let module M = Mm_proto_multiplier_clnt.Sync (struct let with_client f = f multiplier end) in
+ M.test_multiply
!lrows !rcols !rrows
@@ -91,7 +91,7 @@ end
module Multiplier (W : sig val workers : (string * int) list end) =
struct
- let ping () emit = emit (fun () -> ())
+ let ping () = Lwt.return ()
let fill m rows cols =
for j = 0 to rows-1 do
@@ -101,7 +101,7 @@ struct
done
- let test_multiply lrows rcols rrows emit =
+ let test_multiply lrows rcols rrows =
(* This is an asynchronous RPC implmentation. This means we don't have to
reply the result immediately. Instead we get an [emit] function, and
we can call this function at some time in the future to pass the result
@@ -117,45 +117,35 @@ struct
current_job_queue := Some(0, lcols, 0, rrows);
(* Now start the computations by telling all workers to go: *)
- let n = ref 0 in
let esys = (Netplex_cenv.self_cont()) # event_system in
let worker_clients = ref [] in
- List.iter
- (fun (host,port) ->
- let worker =
- Mm_proto_worker_clnt.create_client2
- ~program_number:(Rtypes.uint4_of_int 3)
- ~esys
- (`Socket(Rpc.Tcp,
- Rpc_client.Inet(host,port),
- Rpc_client.default_socket_config)) in
- worker_clients := worker :: !worker_clients;
- Mm_proto_worker_clnt.run'async
- worker
- ()
- (fun get_result ->
- (* This function is called back when the worker passes a result
- back for "run"
- *)
- decr n;
- ( try let () = get_result() in () (* check for exceptions *)
- with error ->
- Netplex_cenv.logf `Err "Error from worker: %s"
- (Printexc.to_string error)
- );
- if !n=0 then (
- (* All workers done! *)
- assert(!current_job_queue = None);
- (* Delete the result: *)
- current_matrices := None;
- current_result := None;
- emit (fun () -> ());
- List.iter Rpc_client.shut_down !worker_clients
- )
- );
- incr n
- )
- W.workers
+ let worker_calls =
+ List.map
+ (fun (host,port) ->
+ let worker =
+ Mm_proto_worker_clnt.create_client2
+ ~program_number:(Rtypes.uint4_of_int 3)
+ ~esys
+ (`Socket(Rpc.Tcp,
+ Rpc_client.Inet(host,port),
+ Rpc_client.default_socket_config)) in
+ worker_clients := worker :: !worker_clients;
+ let module M = Mm_proto_worker_clnt.Lwt (struct let with_client f = f worker end) in
+ try_lwt M.run ()
+ with error ->
+ Netplex_cenv.logf `Err "Error from worker: %s"
+ (Printexc.to_string error);
+ Lwt.return ())
+ W.workers in
+ lwt () = Lwt.join worker_calls in
+ (* All workers done! *)
+ assert(!current_job_queue = None);
+ (* Delete the result: *)
+ current_matrices := None;
+ current_result := None;
+ (* XXX original emits reply before shutting down clients---how to achieve in Lwt? *)
+ List.iter Rpc_client.shut_down !worker_clients;
+ Lwt.return ()
end
@@ -183,7 +173,7 @@ let configure cf addr =
let setup srv workers =
let module W = struct let workers = workers end in
- let module M = Mm_proto_multiplier_srv.Async(Multiplier(W)) in
+ let module M = Mm_proto_multiplier_srv.Lwt(Multiplier(W)) in
M.bind
~program_number:(Rtypes.uint4_of_int 1)
srv;
@@ -9,11 +9,11 @@ sig
end
-module type Async =
+module type Lwt =
sig
- val ping : unit -> ((unit -> unit) -> unit) -> unit
+ val ping : unit -> unit Lwt.t
- val test_multiply : int -> int -> int -> ((unit -> unit) -> unit) -> unit
+ val test_multiply : int -> int -> int -> unit Lwt.t
(* Creates a test matrix with random values and multiplies them.
Args are: (l_rows, r_cols, l_cols = r_rows)
*)
@@ -8,3 +8,14 @@ sig
finished with everything.
*)
end
+
+module type Lwt =
+sig
+ val ping : unit -> unit Lwt.t
+
+ val run : unit -> unit Lwt.t
+ (* The controller calls this proc to initiate the action in the worker.
+ When it returns, it is assumed that the worker is completely
+ finished with everything.
+ *)
+end
View
@@ -32,35 +32,6 @@ struct
let module M = Go(Protocol_trace.Sync(T)(Client)) in M.go ();
end
-module Async =
-struct
- module Go (C : Protocol.Async) =
- struct
- let go () =
- (* all requests issued first, then all requests returned (when you call through Client) *)
- C.add1 6 ignore;
- C.addN ~n:7 6 ignore;
- C.add1_list [5;6;7] ignore;
- C.add1_lst (lst_of_list [7;8;9]) ignore;
- C.add1_pair (17, 22) ignore;
- C.maybe_raise true ignore;
- end
-
- module T = Orpc_pp.Trace_of_formatter(struct let formatter = Format.err_formatter end)
-
- let main () =
- let esys = Unixqueue.create_unix_event_system() in
- let c = Protocol_clnt.create_client ~esys (Rpc_client.Inet ("localhost", 9007)) Rpc.Tcp in
-
- let module Client = Protocol_clnt.Async(struct let with_client f = f c end) in
-
- let module M = Go(Protocol_trace.Async(T)(Server_impl.Async)) in M.go ();
- let module M = Go(Protocol_trace.Async(T)(Client)) in M.go ();
-
- Unixqueue.run esys;
- Rpc_client.shut_down c
-end
-
module Lwt =
struct
module Go (C : Protocol.Lwt) =
@@ -99,5 +70,4 @@ end
;;
Sync.main ()
-(* Async.main () *)
(* Lwt.main () *)
@@ -18,7 +18,6 @@ sig
end
module type Sync = Abstract with type 'a _r = 'a
-module type Async = Abstract with type 'a _r = ((unit -> 'a) -> unit) -> unit
module type Lwt = Abstract with type 'a _r = 'a Lwt.t
(*
@@ -33,17 +32,6 @@ sig
val maybe_raise : bool -> unit
end
-module type Async =
-sig
- val add1 : int -> ((unit -> int) -> unit) -> unit
- val add1_list : int list -> ((unit -> int list) -> unit) -> unit
- val add1_pair : (int * int) -> ((unit -> (int * int)) -> unit) -> unit
- val add1_r : r -> ((unit -> r) -> unit) -> unit
- val add1_lst : int lst -> ((unit -> int lst) -> unit) -> unit
- val addN : ?n:int -> int -> ((unit -> int) -> unit) -> unit
- val maybe_raise : bool -> ((unit -> unit) -> unit) -> unit
-end
-
module type Lwt =
sig
val add1 : int -> int Lwt.t
@@ -12,7 +12,6 @@ let start() =
~name:"add1"
~setup:(fun srv () ->
let module M = Protocol_srv.Sync(Server_impl.Sync) in
- (* let module M = Protocol_srv.Async(Server_impl.Async) in *)
(* let module M = Protocol_srv.Lwt(Server_impl.Lwt) in *)
M.bind srv)
();
@@ -29,34 +29,6 @@ struct
then raise (Protocol.Bar 17)
end
-module Async =
-struct
- type 'a _r = ((unit -> 'a) -> unit) -> unit
-
- let add1 i r = r (fun () -> (i + 1))
-
- let add1_list l r = r (fun () -> (List.map (fun i -> i + 1) l))
-
- let add1_pair (a, b) r = r (fun () -> (a + 1, b + 1))
-
- let add1_r { Protocol.fst = f; snd = s; trd = t } r =
- r (fun () -> {
- Protocol.fst = f + 1;
- snd = (match s with None -> None | Some s -> Some (s + 1));
- trd = Array.map (fun e -> e + 1) t;
- })
-
- let add1_lst l r = r (fun () -> (lst_map (fun i -> i + 1) l))
-
- let addN ?(n=1) i r = r (fun () -> (i + n))
-
- let maybe_raise flag r =
- r (fun () ->
- if flag
- then raise (Protocol.Bar 17)
- else ())
-end
-
module Lwt =
struct
type 'a _r = 'a Lwt.t
Oops, something went wrong.

0 comments on commit a6ddca0

Please sign in to comment.