diff --git a/CHANGES.md b/CHANGES.md index ddcabce..5251173 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,8 @@ +## 113.24.00 + +Keep up to date with interface changes in `Async_kernel`, `Async_extra` and +`Async_unix`. + ## 113.00.00 - Added `Async.Std.Printf` module so that one doesn't unintentionally use diff --git a/META.ab b/META.ab new file mode 100644 index 0000000..ed4e6e7 --- /dev/null +++ b/META.ab @@ -0,0 +1,7 @@ +version = "$(pkg_version)" +description = "Monadic concurrency library" +requires = "async_extra async_kernel async_unix bin_prot core fieldslib ppx_assert.runtime-lib ppx_bench.runtime-lib ppx_expect.collector ppx_inline_test.runtime-lib sexplib typerep variantslib" +archive(byte ) = "async.cma" +archive(native ) = "async.cmxa" +archive(native, plugin) = "async.cmxs" +exists_if = "async.cma" diff --git a/Makefile b/Makefile index 04cbd69..ca0d902 100644 --- a/Makefile +++ b/Makefile @@ -1,24 +1,20 @@ # Generic Makefile for oasis project -# Set to setup.exe for the release -SETUP := setup-dev.exe +SETUP := setup.exe +NAME := async +PREFIX = $(shell grep ^prefix= setup.data | cut -d\" -f 2) # Default rule default: build -# Setup for the development version -setup-dev.exe: _oasis setup.ml - grep -v '^#' setup.ml > setup_dev.ml - ocamlfind ocamlopt -o $@ -linkpkg -package ocamlbuild,oasis.dynrun setup_dev.ml || ocamlfind ocamlc -o $@ -linkpkg -package ocamlbuild,oasis.dynrun setup_dev.ml || true - rm -f setup_dev.* - -# Setup for the release -setup.exe: setup.ml - ocamlopt.opt -o $@ $< || ocamlopt -o $@ $< || ocamlc -o $@ $< - rm -f setup.cmx setup.cmi setup.o setup.obj setup.cmo +setup.exe: _oasis setup.ml + ocamlfind ocamlopt -o $@ -linkpkg -package ocamlbuild,oasis.dynrun setup.ml || \ + ocamlfind ocamlc -o $@ -linkpkg -package ocamlbuild,oasis.dynrun setup.ml || true + for f in setup.*; do [ $$f = $@ -o $$f = setup.ml ] || rm -f $$f; done build: $(SETUP) setup.data ./$(SETUP) -build $(BUILDFLAGS) + $(MAKE) $(NAME).install doc: $(SETUP) setup.data build ./$(SETUP) -doc $(DOCFLAGS) @@ -28,15 +24,34 @@ test: $(SETUP) setup.data build all: $(SETUP) ./$(SETUP) -all $(ALLFLAGS) - -install: $(SETUP) setup.data - ./$(SETUP) -install $(INSTALLFLAGS) - -uninstall: $(SETUP) setup.data - ./$(SETUP) -uninstall $(UNINSTALLFLAGS) - -reinstall: $(SETUP) setup.data - ./$(SETUP) -reinstall $(REINSTALLFLAGS) + $(MAKE) $(NAME).install + +$(NAME).install: js-utils/gen_install.ml setup.log setup.data + ocaml -I js-utils js-utils/gen_install.ml + +install: $(NAME).install + opam-installer -i --prefix $(PREFIX) $(NAME).install + +uninstall: $(NAME).install + opam-installer -u --prefix $(PREFIX) $(NAME).install + +reinstall: $(NAME).install + opam-installer -u --prefix $(PREFIX) $(NAME).install &> /dev/null || true + opam-installer -i --prefix $(PREFIX) $(NAME).install + +bin.tar.gz: $(NAME).install + rm -rf _install + mkdir _install + opam-installer -i --prefix _install $(NAME).install + tar czf bin.tar.gz -C _install . + rm -rf _install + +bin.lzo: $(NAME).install + rm -rf _install + mkdir _install + opam-installer -i --prefix _install $(NAME).install + cd _install && lzop -1 -P -o ../bin.lzo `find . -type f` + rm -rf _install clean: $(SETUP) ./$(SETUP) -clean $(CLEANFLAGS) diff --git a/_oasis b/_oasis index f1e80f8..51cf346 100644 --- a/_oasis +++ b/_oasis @@ -1,116 +1,44 @@ -OASISFormat: 0.3 -OCamlVersion: >= 4.00.0 -FindlibVersion: >= 1.3.2 -Name: async -Version: 113.00.00 -Synopsis: Jane Street Capital's asynchronous execution library -Authors: Jane Street Group, LLC -Copyrights: (C) 2008-2013 Jane Street Group LLC -Maintainers: Jane Street Group, LLC -License: Apache-2.0 -LicenseFile: LICENSE.txt -Homepage: https://github.com/janestreet/async -Plugins: StdFiles (0.3), DevFiles (0.3), META (0.3) -XStdFilesAUTHORS: false -XStdFilesREADME: false -BuildTools: ocamlbuild, camlp4o +OASISFormat: 0.4 +OCamlVersion: >= 4.02.3 +FindlibVersion: >= 1.3.2 +Name: async +Version: 113.24.00 +Synopsis: Monadic concurrency library +Authors: Jane Street Group, LLC +Copyrights: (C) 2008-2016 Jane Street Group LLC +Maintainers: Jane Street Group, LLC +License: Apache-2.0 +LicenseFile: LICENSE.txt +Homepage: https://github.com/janestreet/async +Plugins: StdFiles (0.3), DevFiles (0.3), META (0.3) +XStdFilesAUTHORS: false +XStdFilesREADME: false +BuildTools: ocamlbuild +AlphaFeatures: ocamlbuild_more_args +XOCamlbuildPluginTags: package(ppx_driver.ocamlbuild) +FilesAB: META.ab Description: - Part of Jane Street’s Core library. - . + Part of Jane Street’s Core library The Core suite of libraries is an industrial strength alternative to OCaml's standard library that was developed by Jane Street, the largest industrial user of OCaml. -# +-------------------------------------------------------------------+ -# | Library | -# +-------------------------------------------------------------------+ - Library async - Path: src - FindlibName: async - Pack: true - Modules: Clock_unit_tests, - Scheduler_unit_tests, - Std - BuildDepends: async_kernel, - async_unix, - async_extra, - pa_ounit, - pa_ounit.syntax, - pa_test, - pa_test.syntax, - threads - XMETARequires: async_kernel, - async_unix, - async_extra, - pa_ounit, - threads - -# +-------------------------------------------------------------------+ -# | Tests | -# +-------------------------------------------------------------------+ - -Executable test_runner - Path: test - MainIs: test_runner.ml - Build$: flag(tests) - CompiledObject: best - Install: false - BuildDepends: async - -Test test_runner - Run$: flag(tests) - Command: $test_runner - WorkingDirectory: lib_test - -# +-------------------------------------------------------------------+ -# | Benchmarks | -# +-------------------------------------------------------------------+ - -Executable handlers - Path: bench - MainIs: handlers.ml - Build$: flag(tests) - CompiledObject: best - Install: false - BuildDepends: async - -Executable loop - Path: bench - MainIs: loop.ml - Build$: flag(tests) - CompiledObject: best - Install: false - BuildDepends: async - -Executable nanos_per_job - Path: bench - MainIs: nanos_per_job.ml - Build$: flag(tests) - CompiledObject: best - Install: false - BuildDepends: async - -Executable queens - Path: bench - MainIs: queens.ml - Build$: flag(tests) - CompiledObject: best - Install: false - BuildDepends: async - -Executable squash - Path: bench - MainIs: squash.ml - Build$: flag(tests) - CompiledObject: best - Install: false - BuildDepends: async - -Executable tco - Path: bench - MainIs: tco.ml - Build$: flag(tests) - CompiledObject: best - Install: false - BuildDepends: async + Path: src + Pack: true + Modules: Clock_unit_tests, + Scheduler_unit_tests, + Std + BuildDepends: async_extra, + async_kernel, + async_unix, + bin_prot, + core, + fieldslib, + ppx_assert.runtime-lib, + ppx_bench.runtime-lib, + ppx_expect.collector, + ppx_inline_test.runtime-lib, + sexplib, + typerep, + variantslib diff --git a/_tags b/_tags index 320483d..8a3a925 100644 --- a/_tags +++ b/_tags @@ -1,3 +1,8 @@ +<**/*.ml{,i}>: warn(-40), no_alias_deps +<**/*>: thread +# This prevents the implicit addition of -ppx options by ocamlfind +<**/*.ml{,i}>: predicate(ppx_driver) +: for-pack(Async) +: pp(ppx-jane -dump-ast -inline-test-lib async) # OASIS_START # OASIS_STOP -: syntax_camlp4o diff --git a/bench/nanos_per_job.ml b/bench/nanos_per_job.ml index 3fe77f1..8845a42 100644 --- a/bench/nanos_per_job.ml +++ b/bench/nanos_per_job.ml @@ -13,7 +13,7 @@ let () = num_live_jobs (Float.iround_nearest_exn (Time.Span.to_ns elapsed /. Float.of_int !num_jobs)); Ivar.fill finished ()); - for _i = 1 to num_live_jobs do + for _ = 1 to num_live_jobs do let rec loop () = upon Deferred.unit (fun () -> incr num_jobs; diff --git a/bench/squash.ml b/bench/squash.ml index 2cbca10..3b40a11 100644 --- a/bench/squash.ml +++ b/bench/squash.ml @@ -1,5 +1,5 @@ -open Core.Std let _ = _squelch_unused_module_warning_ -open Async.Std let _ = _squelch_unused_module_warning_ +open! Core.Std +open! Async.Std open Core_bench.Std diff --git a/descr b/descr new file mode 100644 index 0000000..c30c53d --- /dev/null +++ b/descr @@ -0,0 +1,5 @@ +Monadic concurrency library +Part of Jane Street’s Core library +The Core suite of libraries is an industrial strength alternative to +OCaml's standard library that was developed by Jane Street, the +largest industrial user of OCaml. diff --git a/example/bin_prot_test.ml b/example/bin_prot_test.ml index f9b822d..977e1ad 100644 --- a/example/bin_prot_test.ml +++ b/example/bin_prot_test.ml @@ -15,7 +15,7 @@ type test = f : [`Foo | `Bar] array; g : float array; } -with bin_io +[@@deriving bin_io] (* This value is very likely larger than e.g. an order, fill, etc. *) let test = diff --git a/example/how-to-implement-rpc-tutorial/README.txt b/example/how-to-implement-rpc-tutorial/README.txt deleted file mode 100644 index fd62a12..0000000 --- a/example/how-to-implement-rpc-tutorial/README.txt +++ /dev/null @@ -1,4 +0,0 @@ -A small example that was used as part of a class on RPC. The -structure of the class was to explain how one might implement RPC, -rather than just describing the API. The file here contains a small -almost-implementation of an RPC like library. diff --git a/example/how-to-implement-rpc-tutorial/rpc.ml b/example/how-to-implement-rpc-tutorial/rpc.ml deleted file mode 100644 index 1c4d5ac..0000000 --- a/example/how-to-implement-rpc-tutorial/rpc.ml +++ /dev/null @@ -1,186 +0,0 @@ -open Core.Std -open Async.Std - -module type Z = sig - - module Path : Identifiable - - module Filesystem : sig - - val listdir : Path.t -> Path.t list Or_error.t - val read_file : Path.t -> string Or_error.t - val move : Path.t -> Path.t -> unit Or_error.t - val put_file : Path.t -> string -> unit Or_error.t - val file_size : Path.t -> int Or_error.t - val file_exists : Path.t -> bool - end - - module Connection : sig - type t - val create : Unix.Inet_addr.t -> port:int -> t Deferred.t - val close : t -> unit - val send : t -> Sexp.t -> unit Deferred.t - val recv : t -> Sexp.t Deferred.t - end - - module Request : sig - type t = - | Listdir of Path.t - | Read_file of Path.t - | Move of Path.t * Path.t - | Put_file of Path.t * string - | File_size of Path.t - | File_exists of Path.t - with sexp - end - - module Response : sig - type t = - | Ok - | Error - | File_size of int - | Contents of string - | Paths of Path.t list - | File_exists of bool - with sexp - end - -end - -module M(Z:Z) = struct - open Z - - let handle_query conn = - Connection.recv conn - >>= fun sexp -> - let module Fs = Filesystem in - let with_err x wrap : Response.t = - match x with - | Error _ -> Error - | Ok x -> wrap x - in - let response : Response.t = - match Request.t_of_sexp sexp with - | Listdir p -> with_err (Fs.listdir p) (fun x -> Paths x) - | Read_file p -> with_err (Fs.read_file p) (fun x -> Contents x) - | Move (p1,p2) -> with_err (Fs.move p1 p2) (fun () -> Ok) - | Put_file (p,c) -> with_err (Fs.put_file p c) (fun () -> Ok) - | File_size p -> with_err (Fs.file_size p) (fun x -> File_size x) - | File_exists p -> File_exists (Fs.file_exists p) - in - Connection.send conn (Response.sexp_of_t response) - ;; - - - let rpc_listdir conn path = - Connection.send conn (Request.sexp_of_t (Listdir path)) - >>= fun () -> - Connection.recv conn - >>= fun sexp -> - match Response.t_of_sexp sexp with - | Paths x -> return (Ok x) - | Error -> return (Or_error.error_string "Failed!") - | _ -> assert false (* we didn't mean that here *) - - - (* Another approach..... *) - - module Embedding = struct - type 'a t = { inj: 'a -> Sexp.t - ; prj: Sexp.t -> 'a - } - end - - module Rpc = struct - type ('a,'b) t = - { name : string - ; request : 'a Embedding.t - ; response : 'b Embedding.t - } - end - - - let listdir_rpc = - { Rpc. - name = "listdir" - ; request = { inj = <:sexp_of> - ; prj = <:of_sexp> - } - ; response = { inj = <:sexp_of> - ; prj = <:of_sexp> - } - } - - let read_file_rpc = - { Rpc. - name = "read_file" - ; request = { inj = <:sexp_of> - ; prj = <:of_sexp> - } - ; response = { inj = <:sexp_of> - ; prj = <:of_sexp> - } - } - - - type full_query = string * Sexp.t with sexp - - module Handler : sig - type t - val implement : ('a,'b) Rpc.t -> ('a -> 'b) -> t - val handle : t list -> Sexp.t -> Sexp.t - end = struct - - type t = { name: string - ; handle : Sexp.t -> Sexp.t } - - let implement (rpc:(_,_) Rpc.t) f = - let handle sexp = - sexp - |> rpc.request.prj - |> f - |> rpc.response.inj - in - { name = rpc.Rpc.name - ; handle - } - - let handle (handlers : t list) sexp = - let (name,query_sexp) = full_query_of_sexp sexp in - let handler = - List.find_exn handlers ~f:(fun t -> t.name = name) - in - handler.handle query_sexp - end - - - let server_handle_query conn = - Connection.recv conn - >>= fun sexp -> - let resp = - Handler.handle [ Handler.implement listdir_rpc Filesystem.listdir - ; Handler.implement read_file_rpc Filesystem.read_file - ] - sexp - in - Connection.send conn resp - - let dispatch (rpc:(_,_) Rpc.t) conn arg = - let query_sexp = rpc.request.inj arg in - Connection.send conn (sexp_of_full_query (rpc.name,query_sexp)) - >>= fun () -> - Connection.recv conn - >>= fun sexp -> - return (rpc.response.prj sexp) - - module Client : sig - val listdir : Connection.t -> Path.t -> Path.t list Or_error.t Deferred.t - val read_file : Connection.t -> Path.t -> string Or_error.t Deferred.t - end = struct - let listdir conn x = dispatch listdir_rpc conn x - let read_file conn x = dispatch read_file_rpc conn x - end - -end - - diff --git a/example/jobs_speed_test.ml b/example/jobs_speed_test.ml index 00f439d..46131a9 100644 --- a/example/jobs_speed_test.ml +++ b/example/jobs_speed_test.ml @@ -31,7 +31,7 @@ let () = let start = Time.now () in upon (run_test ()) (fun () -> let stop = Time.now () in - Printf.printf "elapsed time: %s\n" (Time.Span.to_string (Time.diff stop start)); + printf "elapsed time: %s\n" (Time.Span.to_string (Time.diff stop start)); Shutdown.shutdown 0); never_returns (Scheduler.go ()); ;; diff --git a/example/key_value_store.ml b/example/key_value_store.ml index 8a24500..3d60864 100644 --- a/example/key_value_store.ml +++ b/example/key_value_store.ml @@ -8,7 +8,7 @@ module Protocol = struct | Req_find of string | Req_remove of string | Req_invalid - with sexp + [@@deriving sexp] let from_raw msg = match String.split msg ~on:' ' with @@ -23,7 +23,7 @@ module Protocol = struct type t = Rep_fail of string | Rep_succ of string | Rep_help - with sexp + [@@deriving sexp] let to_raw t = match t with @@ -69,7 +69,7 @@ module Key_value_store = struct |(R.Req_help | R.Req_invalid) -> P.Rep_help | R.Req_add (k, v) -> - Hashtbl.replace t.store ~key:k ~data:v; + Hashtbl.set t.store ~key:k ~data:v; P.Rep_succ ("New pair stored: "^k^" -> "^v) | R.Req_find k -> begin match Hashtbl.find t.store k with diff --git a/example/rpc/rpc_client.ml b/example/rpc/rpc_client.ml deleted file mode 100644 index e15b39f..0000000 --- a/example/rpc/rpc_client.ml +++ /dev/null @@ -1,86 +0,0 @@ -open Core.Std -open Async.Std - -type addr = { host:string; port:int } - -let dispatch rpc {host;port} arg = - Rpc.Connection.with_client ~host ~port - (fun conn -> Rpc.Rpc.dispatch_exn rpc conn arg) - >>| Result.ok_exn - -let pipe_dispatch rpc {host;port} arg f = - Rpc.Connection.with_client ~host ~port - (fun conn -> - Rpc.Pipe_rpc.dispatch_exn rpc conn arg - >>= fun (pipe,_) -> - f pipe - ) - >>| Result.ok_exn - -let set_id_counter addr new_id = - dispatch Rpc_intf.set_id_counter addr new_id - -let set_id_counter_v0 addr new_id_pair = - dispatch Rpc_intf.set_id_counter_v0 addr new_id_pair - -let get_unique_id addr = - dispatch Rpc_intf.get_unique_id addr () - >>| fun id -> - printf "UNIQUE ID: %d\n" id -;; - -let counter_values addr = - pipe_dispatch Rpc_intf.counter_values addr () (fun reader -> - Pipe.iter_without_pushback reader ~f:(fun i -> - printf "COUNTER: %d\n%!" i)) - -(* Setting up the command-line interface *) - -let host_and_port () = - Command.Spec.( - step (fun k host port -> k {host; port}) - +> flag "-host" ~doc:" server IP" (optional_with_default "127.0.0.1" string) - +> flag "-port" ~doc:" server port" (optional_with_default 8080 int) - ) - -let get_unique_id_cmd = - Command.async_basic - ~summary:"get unique id from server" - (host_and_port ()) - (fun addr () -> get_unique_id addr) - -let set_id_counter_cmd = - Command.async_basic - ~summary:"forcibly set the unique id counter. DANGEROUS" - Command.Spec.( - host_and_port () - +> anon ("counter" %: int) - ) - (fun addr i () -> set_id_counter addr i) - -(* This one is actually unsupported by the server, so using it will trigger an error. *) -let set_id_counter_cmd_v0 = - Command.async_basic - ~summary:"forcibly set the unique id counter. DANGEROUS" - Command.Spec.( - host_and_port () - +> anon ("counter1" %: int) - +> anon ("counter2" %: int) - ) - (fun addr id1 id2 () -> set_id_counter_v0 addr (id1,id2)) - -let counter_values_cmd = - Command.async_basic - ~summary:"subscribe to changes to counter id" - (host_and_port ()) - (fun addr () -> counter_values addr) - -let () = - Command.run - (Command.group ~summary:"Client for trivial Async-RPC server" - [ "get-unique-id" , get_unique_id_cmd - ; "set-id-counter" , set_id_counter_cmd - ; "set-id-counter-v0", set_id_counter_cmd_v0 - ; "counter-values" , counter_values_cmd - ] - ) diff --git a/example/rpc/rpc_intf.ml b/example/rpc/rpc_intf.ml deleted file mode 100644 index cccd19f..0000000 --- a/example/rpc/rpc_intf.ml +++ /dev/null @@ -1,38 +0,0 @@ -open Core.Std -open Async.Std - -let get_unique_id = Rpc.Rpc.create - ~name:"get-unique-id" - ~version:0 - ~bin_query:Unit.bin_t - ~bin_response:Int.bin_t - -let set_id_counter = Rpc.Rpc.create - ~name:"set-id-counter" - (* Note that the version number is 1, because there is an older v0 query defined below - around. *) - ~version:1 - ~bin_query:Int.bin_t - ~bin_response:Unit.bin_t - - -(* This type is here only for the purpose of getting the ability to bin_prot an int - pair. *) -module Int_pair = struct - type t = int * int with bin_io -end - -let set_id_counter_v0 = Rpc.Rpc.create - ~name:"set-id-counter" - ~version:0 - ~bin_query:Int_pair.bin_t - ~bin_response:Unit.bin_t - - -let counter_values = Rpc.Pipe_rpc.create - ~name:"counter-values" - ~version:0 - ~bin_query:Unit.bin_t - ~bin_response:Int.bin_t - ~bin_error:Unit.bin_t - () diff --git a/example/rpc/rpc_intf.mli b/example/rpc/rpc_intf.mli deleted file mode 100644 index 5111431..0000000 --- a/example/rpc/rpc_intf.mli +++ /dev/null @@ -1,14 +0,0 @@ -open Core.Std -open Async.Std - -(** Query for grabbing a unique ID *) -val get_unique_id : (unit,int) Rpc.Rpc.t - -(** Query for setting the counter used to generate unique IDs *) -val set_id_counter : (int,unit) Rpc.Rpc.t - -(** This is a deprecated query, no longer supported by the server *) -val set_id_counter_v0 : (int * int,unit) Rpc.Rpc.t - -(** For getting a stream updating the counter values *) -val counter_values : (unit,int,unit) Rpc.Pipe_rpc.t diff --git a/example/rpc/rpc_server.ml b/example/rpc/rpc_server.ml deleted file mode 100644 index 9f3c127..0000000 --- a/example/rpc/rpc_server.ml +++ /dev/null @@ -1,66 +0,0 @@ -open Core.Std -open Async.Std - - -(* The list of implementations supported by the server. The server state is simply a - counter used for allocating unique ids. *) -let implementations = - [ Rpc.Rpc.implement Rpc_intf.get_unique_id - (fun ctr () -> - printf ".%!"; - incr ctr; - return !ctr - ) - ; Rpc.Rpc.implement Rpc_intf.set_id_counter - (fun ctr i -> - printf "!%!"; - if i = 0 then failwith "Can't set counter back to zero"; - return (ctr := i) - ) - - ; Rpc.Pipe_rpc.implement Rpc_intf.counter_values - (fun ctr () ~aborted -> - let (r,w) = Pipe.create () in - let last_value = ref !ctr in - let send () = - last_value := !ctr; - Pipe.write w !ctr - in - don't_wait_for (send ()); - Clock.every' ~stop:aborted (sec 0.1) (fun () -> - if !last_value <> !ctr - then send () else return () - ); - return (Ok r) - ) - ] - -let main ~port = - let counter = ref 0 in - let implementations = - Rpc.Implementations.create ~implementations ~on_unknown_rpc:`Close_connection - in - match implementations with - | Error (`Duplicate_implementations _descrs) -> assert false - | Ok implementations -> - let server = - Tcp.Server.create (Tcp.on_port port) ~on_handler_error:`Ignore - (fun _addr reader writer -> - Rpc.Connection.server_with_close reader writer ~implementations - ~connection_state:(fun _ -> counter) - ~on_handshake_error:`Ignore) - in - ignore (server : (_,_) Tcp.Server.t Deferred.t); - Deferred.never () - -let () = - Command.async_basic - ~summary:"A trivial Async-RPC server" - Command.Spec.( - empty - +> flag "-port" ~doc:" Port to listen on" - (optional_with_default 8080 int) - ) - (fun port () -> main ~port) - |> Command.run - diff --git a/example/sexp_with_text.ml b/example/sexp_with_text.ml index c308108..dbfdf8e 100644 --- a/example/sexp_with_text.ml +++ b/example/sexp_with_text.ml @@ -2,11 +2,11 @@ open Core.Std open Async.Std module Data = struct - type t = (int * string) list with sexp + type t = (int * string) list [@@deriving sexp] end type t = Data.t Sexp.With_text.t String.Map.t -with sexp +[@@deriving sexp] let edit_file (type a) (module A : Sexpable with type t = a) filename = let editor = @@ -49,7 +49,7 @@ let rec edit_loop t = let current = match Map.find t key with | Some x -> x - | None -> Sexp.With_text.of_value <:sexp_of<(int*string) list>> [] + | None -> Sexp.With_text.of_value [%sexp_of: (int*string) list] [] in let filename = Filename.temp_file "test" ".scm" in Writer.save filename ~contents:(Sexp.With_text.text current) @@ -64,7 +64,7 @@ let rec edit_loop t = printf "\njust data:\n"; printf "%s\n" (Map.map ~f:Sexp.With_text.value t - |> <:sexp_of> + |> [%sexp_of: Data.t String.Map.t] |> Sexp.to_string_hum); edit_loop t diff --git a/example/thread_pool_not_stuck.ml b/example/thread_pool_not_stuck.ml index 58a49f7..419b8d2 100644 --- a/example/thread_pool_not_stuck.ml +++ b/example/thread_pool_not_stuck.ml @@ -1,4 +1,4 @@ -open Core.Std let _ = _squelch_unused_module_warning_ +open! Core.Std open Async.Std let () = diff --git a/example/thread_pool_stuck.ml b/example/thread_pool_stuck.ml deleted file mode 100644 index 0f30078..0000000 --- a/example/thread_pool_stuck.ml +++ /dev/null @@ -1,30 +0,0 @@ -open Core.Std let _ = _squelch_unused_module_warning_ -open Async.Std - -let () = - let num_sleeping = ref 0 in - let message () = - if false then Core.Std.eprintf "%d sleeping\n%!" !num_sleeping - in - let consume ~num_jobs ~sleep_for = - Deferred.ignore - (Deferred.List.init ~how:`Parallel num_jobs ~f:(fun _ -> - In_thread.run (fun () -> - incr num_sleeping; - message (); - Core.Std.Unix.sleep sleep_for; - decr num_sleeping; - message ()))) - in - (* This causes a single message after about one second, while the initial 50 jobs are - blocked. Then, no more messages for then next ~20s as the 500 jobs complete and new - ones take their place. *) - consume ~num_jobs:500 ~sleep_for:2 - >>> fun () -> - (* This causes one message per second for 60s, until failure. *) - consume ~num_jobs:100 ~sleep_for:100 - >>> fun () -> - shutdown 0 -;; - -let () = never_returns (Scheduler.go ()) diff --git a/js-utils/gen_install.ml b/js-utils/gen_install.ml new file mode 100644 index 0000000..39db642 --- /dev/null +++ b/js-utils/gen_install.ml @@ -0,0 +1,102 @@ +(* Generate .install from setup.log *) + +#use "install_tags.ml" + +module String_map = Map.Make(String) +let string_map_of_list = + List.fold_left + (fun acc (k, v) -> + assert (not (String_map.mem k acc)); + String_map.add k v acc) + String_map.empty + +let lines_of_file fn = + let ic = open_in fn in + let rec loop acc = + match input_line ic with + | exception End_of_file -> + close_in ic; + List.rev acc + | line -> + loop (line :: acc) + in + loop [] + +let read_setup_log () = + lines_of_file "setup.log" + |> List.map (fun line -> Scanf.sscanf line "%S %S" (fun tag arg -> (tag, arg))) + +let read_setup_data () = + lines_of_file "setup.data" + |> List.map (fun line -> Scanf.sscanf line "%[^=]=%S" (fun k v -> (k, v))) + +let remove_cwd = + let prefix = Sys.getcwd () ^ "/" in + let len_prefix = String.length prefix in + fun fn -> + let len = String.length fn in + if len >= len_prefix && String.sub fn 0 len_prefix = prefix then + String.sub fn len_prefix (len - len_prefix) + else + fn + +let gen_section oc name files = + let pr fmt = Printf.fprintf oc (fmt ^^ "\n") in + pr "%s: [" name; + List.iter + (fun (src, dst) -> + let src = remove_cwd src in + let dst = + match dst with + | None -> Filename.basename src + | Some fn -> fn + in + if src = dst then + pr " %S" src + else + pr " %S {%S}" src dst) + files; + pr "]" + +let rec filter_log tags log acc = + match log with + | [] -> acc + | (tag, fname) :: rest -> + match String_map.find tag tags with + | exception Not_found -> filter_log tags rest acc + | dst -> filter_log tags rest ((fname, dst) :: acc) + +let () = + let log = read_setup_log () in + let setup_data = read_setup_data () in + let ext_dll = + match List.assoc "ext_dll" setup_data with + | ext -> ext + | exception Not_found -> ".so" + in + let merge name files map = + match String_map.find name map with + | files' -> String_map.add name (files @ files') map + | exception Not_found -> String_map.add name files map + in + let sections = + List.fold_left + (fun acc (name, tags, extra_files) -> + let tags = string_map_of_list tags in + let files = filter_log tags log [] @ extra_files in + if name = "lib" then + let stubs, others = + List.partition + (fun (fn, _) -> Filename.check_suffix fn ext_dll) + files + in + merge "lib" others (merge "stublibs" stubs acc) + else + merge name files acc) + String_map.empty sections + |> String_map.bindings + |> List.filter (fun (_, l) -> l <> []) + in + let oc = open_out (package_name ^ ".install") in + List.iter (fun (name, files) -> gen_section oc name files) sections; + close_out oc diff --git a/js-utils/install_tags.ml b/js-utils/install_tags.ml new file mode 100644 index 0000000..04fa736 --- /dev/null +++ b/js-utils/install_tags.ml @@ -0,0 +1,9 @@ +let package_name = "async" + +let sections = + [ ("lib", + [ ("built_lib_async", None) + ], + [ ("META", None) + ]) + ] diff --git a/myocamlbuild.ml b/myocamlbuild.ml index 18b7dee..faa940f 100644 --- a/myocamlbuild.ml +++ b/myocamlbuild.ml @@ -1,31 +1,30 @@ (* OASIS_START *) (* OASIS_STOP *) +# 3 "myocamlbuild.ml" -let dispatch = function +(* Temporary hacks *) +let js_hacks = function | After_rules -> - (* ocamlbuild rule for native packs seems to be broken, it tries to touch - lib/async.mli. The workaroung is to remove "touch" at the beginning of commands - executed by ocamlbuild. *) - let module U = Ocamlbuild_pack.My_unix in - let orig_execute_many = U.implem.U.execute_many in - let execute_many ?max_jobs ?ticker ?period ?display commands = - let commands = - List.map - (List.map (fun task () -> - let cmd = task () in - if String.is_prefix "touch " cmd then begin - Ocamlbuild_pack.Log.eprintf - "removing 'touch' inserted by broken ocamlbuild rule for packed modules."; - let idx = String.index cmd ';' in - String.after cmd (idx + 1) - end else - cmd)) - commands - in - orig_execute_many ?max_jobs ?ticker ?period ?display commands - in - U.implem.U.execute_many <- execute_many - | _ -> - () + rule "Generate a cmxs from a cmxa" + ~dep:"%.cmxa" + ~prod:"%.cmxs" + ~insert:`top + (fun env _ -> + Cmd (S [ !Options.ocamlopt + ; A "-shared" + ; A "-linkall" + ; A "-I"; A (Pathname.dirname (env "%")) + ; A (env "%.cmxa") + ; A "-o" + ; A (env "%.cmxs") + ])); -let () = Ocamlbuild_plugin.dispatch (fun hook -> dispatch hook; dispatch_default hook) + (* Pass -predicates to ocamldep *) + pflag ["ocaml"; "ocamldep"] "predicate" (fun s -> S [A "-predicates"; A s]) + | _ -> () + +let () = + Ocamlbuild_plugin.dispatch (fun hook -> + js_hacks hook; + Ppx_driver_ocamlbuild.dispatch hook; + dispatch_default hook) diff --git a/opam b/opam new file mode 100644 index 0000000..0b353e0 --- /dev/null +++ b/opam @@ -0,0 +1,32 @@ +opam-version: "1.2" +maintainer: "opensource@janestreet.com" +authors: ["Jane Street Group, LLC "] +homepage: "https://github.com/janestreet/async" +bug-reports: "https://github.com/janestreet/async/issues" +dev-repo: "https://github.com/janestreet/async.git" +license: "Apache-2.0" +build: [ + ["./configure" "--prefix" prefix] + [make] +] +depends: [ + "ocamlbuild" {build} + "oasis" {build & >= "0.4"} + "ocamlfind" {build & >= "1.3.2"} + "async_extra" + "async_kernel" + "async_unix" + "bin_prot" + "core" + "fieldslib" + "ppx_assert" + "ppx_bench" + "ppx_driver" + "ppx_expect" + "ppx_inline_test" + "ppx_jane" + "sexplib" + "typerep" + "variantslib" +] +available: [ ocaml-version >= "4.02.3" ] diff --git a/setup.ml b/setup.ml index 1321b02..f6b6bc3 100644 --- a/setup.ml +++ b/setup.ml @@ -1,6 +1,6 @@ (* OASIS_START *) -#use "topfind";; -#require "oasis.dynrun";; open OASISDynRun;; +open OASISTypes;; (* OASIS_STOP *) + let () = setup () diff --git a/src/clock_unit_tests.ml b/src/clock_unit_tests.ml index 2fd24f1..05f2544 100644 --- a/src/clock_unit_tests.ml +++ b/src/clock_unit_tests.ml @@ -1,8 +1,8 @@ open Core.Std open Std -TEST_MODULE "Clock.every" = struct - TEST_UNIT = (* [~stop] *) +let%test_module "Clock.every" = (module struct + let%test_unit _ = (* [~stop] *) Thread_safe.block_on_async_exn (fun () -> let r = ref 1_000 in let stop = Ivar.create () in @@ -14,7 +14,7 @@ TEST_MODULE "Clock.every" = struct Ivar.read stop) ;; - TEST_UNIT "[every f ~stop] doesn't hold onto [f] after [stop] becomes determined" = + let%test_unit "[every f ~stop] doesn't hold onto [f] after [stop] becomes determined" = Thread_safe.block_on_async_exn (fun () -> let event = Clock.Event.run_after (sec 10.) (fun () -> failwith "test timed out") () @@ -41,7 +41,7 @@ TEST_MODULE "Clock.every" = struct exception E - TEST_UNIT = (* [~continue_on_error:true] *) + let%test_unit _ = (* [~continue_on_error:true] *) Thread_safe.block_on_async_exn (fun () -> let count = 100 in let r = ref count in @@ -68,7 +68,7 @@ TEST_MODULE "Clock.every" = struct Ivar.read finished) ;; - TEST_UNIT = (* [~continue_on_error:false] *) + let%test_unit _ = (* [~continue_on_error:false] *) Thread_safe.block_on_async_exn (fun () -> let exns = Monitor.catch_stream (fun () -> @@ -87,7 +87,7 @@ TEST_MODULE "Clock.every" = struct Ivar.read finished) ;; - TEST_UNIT = (* if [f] asynchronously raises and also returns, the exception goes to the + let%test_unit _ = (* if [f] asynchronously raises and also returns, the exception goes to the enclosing monitor, and iteration continues. *) List.iter [ false; true ] @@ -118,12 +118,12 @@ TEST_MODULE "Clock.every" = struct >>= fun () -> Ivar.read got_exn)) ;; -end +end) -TEST_MODULE = struct +let%test_module _ = (module struct module Event = Clock.Event - TEST_UNIT = (* abort *) + let%test_unit _ = (* abort *) Thread_safe.block_on_async_exn (fun () -> let event = Event.after (sec 1_000.) in assert (Event.abort event () = `Ok); @@ -134,7 +134,7 @@ TEST_MODULE = struct | `Aborted () -> return ()) ;; - TEST_UNIT = (* happen *) + let%test_unit _ = (* happen *) Thread_safe.block_on_async_exn (fun () -> Deferred.List.iter [ -1.; 0.; 0.01 ] ~f:(fun span -> let span = sec span in @@ -150,19 +150,19 @@ TEST_MODULE = struct assert (Event.abort event () = `Previously_happened ()))) ;; - TEST_UNIT = (* reschedule *) + let%test_unit _ = (* reschedule *) Thread_safe.block_on_async_exn (fun () -> let count = ref 0 in - let time0 = Timing_wheel_ns.now (Async_kernel.Scheduler0.t ()).events in + let time0 = Timing_wheel_ns.now (Async_kernel.Scheduler1.t ()).events in let after span = Time_ns.add time0 span in let module Event = Clock_ns.Event in let sec = Time_ns.Span.of_sec in let event = Event.run_at (after (sec 1.)) incr count in let ensure_scheduled_after span = - <:test_result< int >> !count ~expect:0; + [%test_result: int] !count ~expect:0; match Event.status event with | `Aborted () | `Happened () -> assert false - | `Scheduled_at time -> <:test_result< Time_ns.t >> time ~expect:(after span) + | `Scheduled_at time -> [%test_result: Time_ns.t] time ~expect:(after span) in ensure_scheduled_after (sec 1.); let ensure_reschedule_after_is_ok span = @@ -192,12 +192,12 @@ TEST_MODULE = struct >>| function | `Aborted () -> assert false | `Happened () -> - <:test_result< int >> !count ~expect:1; + [%test_result: int] !count ~expect:1; assert (Event.abort event () = `Previously_happened ()); assert (Event.reschedule_after event (sec 1.) = `Previously_happened ())) ;; - TEST_UNIT = (* [Event.run_after] where [f] raises *) + let%test_unit _ = (* [Event.run_after] where [f] raises *) Thread_safe.block_on_async_exn (fun () -> let event = ref None in try_with (fun () -> @@ -211,7 +211,7 @@ TEST_MODULE = struct | `Aborted _ | `Happened _ -> assert false) ;; - TEST_UNIT = (* [Event.run_after] where [f] calls [abort] *) + let%test_unit _ = (* [Event.run_after] where [f] calls [abort] *) Thread_safe.block_on_async_exn (fun () -> let event_ref = ref None in let event = @@ -224,4 +224,4 @@ TEST_MODULE = struct | `Aborted () -> () | `Happened () -> assert false) ;; -end +end) diff --git a/src/scheduler_unit_tests.ml b/src/scheduler_unit_tests.ml index cd1e5cf..6345e85 100644 --- a/src/scheduler_unit_tests.ml +++ b/src/scheduler_unit_tests.ml @@ -1,9 +1,9 @@ open Core.Std open Std -TEST_MODULE = struct +let%test_module _ = (module struct (* [Scheduler.run_cycles_until_no_jobs_remain] includes things scheduled in the past *) - TEST_UNIT = + let%test_unit _ = Thread_safe.block_on_async_exn (fun () -> let has_determined = ref false in upon (after (sec 0.)) (fun () -> has_determined := true); @@ -14,7 +14,7 @@ TEST_MODULE = struct (* [Scheduler.run_cycles_until_no_jobs_remain] keeps running cycles as long as there are jobs in the past. *) - TEST_UNIT = + let%test_unit _ = Thread_safe.block_on_async_exn (fun () -> let has_determined = ref false in let rec loop i = @@ -30,7 +30,7 @@ TEST_MODULE = struct (* [Scheduler.run_cycles_until_no_jobs_remain] does not include things outside of the event precision. *) - TEST_UNIT = + let%test_unit _ = Thread_safe.block_on_async_exn (fun () -> let has_determined = ref false in upon (after (Time.Span.of_day 1.)) (fun () -> has_determined := true); @@ -38,4 +38,4 @@ TEST_MODULE = struct assert (not !has_determined); Deferred.unit) ;; -end +end) diff --git a/src/std.ml b/src/std.ml index b17d5d3..e219beb 100644 --- a/src/std.ml +++ b/src/std.ml @@ -2,7 +2,7 @@ include Async_kernel.Std include Async_unix.Std include Async_extra.Std -(* Check that no Async library code accidentally created the scheduler. *) -let () = assert (Scheduler.is_ready_to_initialize ()) +let%test "Async library initialization does not initialize the scheduler" = + Scheduler.is_ready_to_initialize () +;; -let _squelch_unused_module_warning_ = () diff --git a/test/async_test_in_child_process.ml b/test/async_test_in_child_process.ml deleted file mode 100644 index 9d5dac9..0000000 --- a/test/async_test_in_child_process.ml +++ /dev/null @@ -1,166 +0,0 @@ -open Core.Std -open Async.Std - -(* Turn off [ASYNC_CONFIG], so it isn't inherited by child processes. *) -let () = Unix.unsetenv "ASYNC_CONFIG" - -module Expect = struct - type t = (string * (Process.Output.t -> unit Or_error.t Deferred.t)) Blang.t - with sexp_of - - type t1 = (string * unit Or_error.t Deferred.t) Blang.t - with sexp_of - - let custom string f : t = Blang.base (string, fun _ -> f ()) - - let eval (t : t) output = - let t1 = Blang.map t ~f:(fun (expect, check) -> (expect, check output)) in - Deferred.map (Deferred.all (List.map (Blang.values t1) ~f:snd)) ~f:(fun _ -> - if Blang.eval t1 (fun (_, d) -> - match Deferred.peek d with - | None -> assert false - | Some r -> Result.is_ok r) - then Ok () - else Or_error.error "unexpected output" t1 <:sexp_of< t1 >>) - ;; - - let bool expected f : t = - Blang.base (expected, fun output -> - return (if f output then Ok () else Or_error.error_string expected)) - - let status expected f : t = - bool ("exit status " ^ expected) - (fun { Process.Output.exit_status; _ } -> f exit_status) - ;; - - let ok = status "zero" Result.is_ok - - let error = status "nonzero" Result.is_error - - let no_output = - bool "no output" - (fun { Process.Output. stdout; stderr; _ } -> stdout = "" && stderr = "") - ;; - - let ( || ) t1 t2 = Blang.or_ [ t1; t2 ] - let ( && ) t1 t2 = Blang.and_ [ t1; t2 ] - let not t = Blang.not_ t -end - -let test_parent_subcommand = "test-parent" -let test_child_subcommand = "test-child" - -module Test = struct - type t = - { id : int; - source_code_position : Source_code_position.t; - expect : Expect.t; - run : unit -> unit Deferred.t; - } - with sexp_of - - let id_counter = ref 0 - - let all = ref [] - - let add source_code_position expect run = - incr id_counter; - all := { id = !id_counter; - source_code_position; - expect; - run; - } :: !all; - ;; - - let get_id id = - match List.find !all ~f:(fun t -> id = t.id) with - | Some t -> t - | None -> failwiths "no test with id" id <:sexp_of< int >> - ;; - - let run_child id = - let t = get_id id in - t.run () - ;; - - let run_parent id = - let t = get_id id in - Process.create - ~prog:Sys.executable_name - ~args:[ test_child_subcommand; - Int.to_string t.id; - ] - () - >>= function - | Error e -> Error.raise e - | Ok process -> - Process.collect_output_and_wait process - >>= fun output -> - Monitor.try_with (fun () -> Expect.eval t.expect output) - >>| function - | Ok (Ok ()) -> () - | Ok (Error error) -> - failwiths "check_in_parent returned error" error <:sexp_of< Error.t >> - | Error exn -> - failwiths "check_in_parent raised" exn <:sexp_of< exn >> - ;; - - let run_all () = - Deferred.List.filter_map !all ~f:(fun t -> - Process.run - ~prog:Sys.executable_name - ~args:[ test_parent_subcommand; - Int.to_string t.id; - ] - () - >>| function - | Error error -> Some (t, error) - | Ok "" -> None - | Ok s -> - Some (t, - Error.create "\ -parent unexpectedly wrote to stdout while exiting with status zero" - s <:sexp_of< string >>)) - >>| fun errors -> - if not (List.is_empty errors) then - failwiths "errors" - (List.map errors ~f:(fun (t, error) -> (t.source_code_position, error))) - (<:sexp_of< (Source_code_position.t * Error.t) list >>); - ;; -end - -let add_test = Test.add - -let shutdown_after d = upon d (fun () -> shutdown 0) - -let main () = - Command.( - Spec.( - run - (group ~summary:"testing stuff" - [ "list", - basic ~summary:"list all the tests" - empty - (fun () -> - eprintf "%s\n" - (Sexp.to_string_hum (<:sexp_of< Test.t list >> !Test.all)); - shutdown 0); - - "test-all", - basic ~summary:"run all the tests" - empty - (fun () -> shutdown_after (Test.run_all ())); - - test_parent_subcommand, - basic ~summary:"run the parent process of a test" - (empty +> anon ("ID" %: int)) - (fun id () -> shutdown_after (Test.run_parent id)); - - test_child_subcommand, - basic ~summary:"run the child process of a test" - (empty +> anon ("ID" %: int)) - (fun id () -> shutdown_after (Test.run_child id)); - - ]))); - Scheduler.go (); -;; diff --git a/test/async_test_in_child_process.mli b/test/async_test_in_child_process.mli deleted file mode 100644 index 4644709..0000000 --- a/test/async_test_in_child_process.mli +++ /dev/null @@ -1,24 +0,0 @@ -open Core.Std -open Async.Std - -module Expect : sig - type t with sexp_of - - val ok : t - val error : t - val no_output : t - - val custom : string -> (unit -> unit Or_error.t Deferred.t) -> t - - val ( && ) : t -> t -> t - val ( || ) : t -> t -> t - val not : t -> t -end - -val add_test - : Source_code_position.t - -> Expect.t - -> (unit -> unit Deferred.t) - -> unit - -val main : unit -> never_returns diff --git a/test/bind_test.ml b/test/bind_test.ml deleted file mode 100644 index 3b7d35d..0000000 --- a/test/bind_test.ml +++ /dev/null @@ -1,329 +0,0 @@ -open Async.Std - -(* Note: The ordering of trace_connect outputs don't appear - to correctly correspond to tests. Sorry! *) - -(* [protect] generates a single spurious empty_star trace. - [all_unit] generates a single spurious full_star trace. *) - -(* Reminder: - - let bind t f = - create (fun i -> t >>> fun a -> connect i (f a)) - ;; -*) - -let trace = ref false -(* let trace = Deferred.debug_trace_connect *) - -(** Guaranteed to be filled after everything else that - will be scheduled this thread has been executed. *) -let tick () = Deferred.create (fun ivar -> - Deferred.unit >>> fun () -> - Deferred.unit >>> fun () -> - Ivar.fill ivar ()) - -let chatty f () = - let g () = Deferred.create (fun ivar -> - trace := true; - f () >>> fun () -> (trace := false; Ivar.fill ivar ()) - ) in - Monitor.protect g ~finally:(fun () -> trace := false; return ()) -;; - -let t_full_star () = - Deferred.unit >>= fun () -> Deferred.unit -;; - -let t_empty_star () = - Deferred.unit >>= fun () -> tick () -;; - -let t_star_empty () = - Deferred.create (fun ivar -> - ignore (Deferred.unit >>= fun () -> - let x = tick () in - x >>> (fun () -> Ivar.fill ivar ()); - x)) -;; - -let t_inline_inline () = - let ivar1 = Ivar.create () in - let ivar2 = Ivar.create () in - let m = Deferred.unit >>= fun () -> begin - let x = tick () in - x >>> (fun () -> Ivar.fill ivar1 ()); - x - end in - m >>> (fun () -> Ivar.fill ivar2 ()); - Deferred.all_unit [Ivar.read ivar1; Ivar.read ivar2] -;; - -let t_inline_many () = - let ivar1 = Ivar.create () in - let ivar2 = Ivar.create () in - let ivar3 = Ivar.create () in - let m = Deferred.unit >>= fun () -> begin - let x = tick () in - x >>> (fun () -> Ivar.fill ivar1 ()); - x - end in - m >>> (fun () -> Ivar.fill ivar2 ()); - m >>> (fun () -> Ivar.fill ivar3 ()); - Deferred.all_unit [Ivar.read ivar1; Ivar.read ivar2; Ivar.read ivar3] -;; - -let t_many_inline () = - let ivar1 = Ivar.create () in - let ivar2 = Ivar.create () in - let ivar3 = Ivar.create () in - let m = Deferred.unit >>= fun () -> begin - let x = tick () in - x >>> (fun () -> Ivar.fill ivar1 ()); - x >>> (fun () -> Ivar.fill ivar2 ()); - x - end in - m >>> (fun () -> Ivar.fill ivar3 ()); - Deferred.all_unit [Ivar.read ivar1; Ivar.read ivar2; Ivar.read ivar3] -;; - -let t_many_many () = - let ivar1 = Ivar.create () in - let ivar2 = Ivar.create () in - let ivar3 = Ivar.create () in - let ivar4 = Ivar.create () in - let m = Deferred.unit >>= fun () -> begin - let x = tick () in - x >>> (fun () -> Ivar.fill ivar1 ()); - x >>> (fun () -> Ivar.fill ivar2 ()); - x - end in - m >>> (fun () -> Ivar.fill ivar3 ()); - m >>> (fun () -> Ivar.fill ivar4 ()); - Deferred.all_unit [Ivar.read ivar1; Ivar.read ivar2; Ivar.read ivar3; Ivar.read ivar4] -;; - -(* What are the semantics of these tests? We would like to verify - that if we attach a removable callback to a deferred, and - then that deferred gets modified by a [connect] (this means - it had to be a bind, and its callback needs to fire after - we attached the removable callback), we would like to verify that the - callback is still removed. The steps are relatively - simple, but arranging all of the callbacks to fire in the - right order is not. - - Deferreds are numbered in the order they should be filled. - Ivars named retX represent Ivars that are filled in order to - ensure all necessary callbacks are called. -*) - -let never_called () = assert false -let nop () = () - -(* KEY: - i t <-- deferreds - m >>= f <-- syntax - - i <-- empty deferred with name i - [ ]--(C) <-- removable callback - \--( ) <-- normal callback - \--{ } <-- inline callback - \-- . <-- indicates that though there is only one - callback, this is a bag with one element - - [X] <-- full deferred - [ ]<--[I] <-- indirection -*) - -(* - -1 is anything (in this case, Empty_many_handlers -with a removable callback and a normal callback) -and 2 is empty (the result of a bind with nothing -attached to it). - - 1 2 - (C)--[ ] [ ] - ( )--/ - -Callback for bind fires: - - [I]-->[ ]--(C) - \---( ) - -Removable callback is removed: - - [I]-->[ ]--( ) - -1 is filled in, normal callback fires: - - [I]-->[X]***X - -*) -let t_choice_star_empty () = - let ret = Ivar.create () in - let i1 = Ivar.create () in - let t1 = Ivar.read i1 in - let remove = Ivar.create () in - let tremove = Ivar.read remove in - t1 >>> Ivar.fill ret; - ignore (choose [choice t1 never_called; choice tremove nop]); - let t2 = Deferred.unit >>= fun () -> Ivar.fill remove (); t1 in - ignore t2; - tremove >>> Ivar.fill i1; - Ivar.read ret -;; - -(* -1 has one handler (the removable callback), 2 has one -inline handler (a normal callback). (Note that 1 cannot be an inline -handler, due to implementation reasons.) - - 1 2 - (C)--[ ] [ ]--{ } - -Callback for bind fires: - - [I]-->[ ]--( ) - \--(C) -*) - -let t_choice_one_inline () = - let ret = Ivar.create () in - let i1 = Ivar.create () in - let t1 = Ivar.read i1 in - let remove = Ivar.create () in - let tremove = Ivar.read remove in - ignore (choose [choice t1 never_called; choice tremove nop]); - let t2 = Deferred.unit >>= fun () -> Ivar.fill remove (); t1 in - t2 >>> Ivar.fill ret; - tremove >>> Ivar.fill i1; - Ivar.read ret - -(* -1 has one inline handler (normal) and 2 has one handler (removable). - - 1 2 - { }--[ ] [ ]--(C) - -Callback for bind fires: - - [I]-->[ ]--(C) - \--( ) -*) -let t_choice_inline_one () = - let ret = Ivar.create () in - let i1 = Ivar.create () in - let t1 = Ivar.read i1 in - let remove = Ivar.create () in - let tremove = Ivar.read remove in - let t2 = Deferred.unit >>= fun () -> Ivar.fill remove (); t1 in - ignore (choose [choice t2 never_called; choice tremove nop]); - t1 >>> Ivar.fill ret; - tremove >>> Ivar.fill i1; - Ivar.read ret - -(* 1 has one handler (removable) and 2 has one handler (removable). *) -let t_choice_one_one () = - let i1 = Ivar.create () in - let t1 = Ivar.read i1 in - let remove = Ivar.create () in - let tremove = Ivar.read remove in - let t2 = Deferred.unit >>= fun () -> Ivar.fill remove (); t1 in - tremove >>> Ivar.fill i1; - ignore (choose [choice t1 never_called; choice tremove nop]); - ignore (choose [choice t2 never_called; choice tremove nop]); - tremove - -(* 1 has one handler (removable) and 2 has many handlers (one removable). *) -let t_choice_one_many () = - let ret = Ivar.create () in - let remove = Ivar.create () in - let tremove = Ivar.read remove in - let i1 = Ivar.create () in - let t1 = Ivar.read i1 in - let t2 = Deferred.unit >>= fun () -> Ivar.fill remove (); t1 in - t2 >>> Ivar.fill ret; - tremove >>> Ivar.fill i1; - ignore (choose [choice t1 never_called; choice tremove nop]); - ignore (choose [choice t2 never_called; choice tremove nop]); - Ivar.read ret - -(* 2 has one handler (removable) and 1 has many handlers (one removable). *) -let t_choice_many_one () = - let ret = Ivar.create () in - let remove = Ivar.create () in - let tremove = Ivar.read remove in - let i1 = Ivar.create () in - let t1 = Ivar.read i1 in - let t2 = Deferred.unit >>= fun () -> Ivar.fill remove (); t1 in - t1 >>> Ivar.fill ret; - tremove >>> Ivar.fill i1; - ignore (choose [choice t1 never_called; choice tremove nop]); - ignore (choose [choice t2 never_called; choice tremove nop]); - Ivar.read ret - -(* 1 has one inline handler and 2 has many handlers (one removable). *) -let t_choice_inline_many () = - let ret1 = Ivar.create () in - let ret2 = Ivar.create () in - let remove = Ivar.create () in - let tremove = Ivar.read remove in - let i1 = Ivar.create () in - let t1 = Ivar.read i1 in - let t2 = Deferred.unit >>= fun () -> Ivar.fill remove (); t1 in - t1 >>> Ivar.fill ret1; - t2 >>> Ivar.fill ret2; - tremove >>> Ivar.fill i1; - ignore (choose [choice t2 never_called; choice tremove nop]); - Deferred.all_unit [Ivar.read ret1; Ivar.read ret2] - -(* 2 has one inline handler and 1 has many handlers (one removable). *) -let t_choice_many_inline () = - let ret1 = Ivar.create () in - let ret2 = Ivar.create () in - let remove = Ivar.create () in - let tremove = Ivar.read remove in - let i1 = Ivar.create () in - let t1 = Ivar.read i1 in - let t2 = Deferred.unit >>= fun () -> Ivar.fill remove (); t1 in - t1 >>> Ivar.fill ret1; - t2 >>> Ivar.fill ret2; - tremove >>> Ivar.fill i1; - ignore (choose [choice t1 never_called; choice tremove nop]); - Deferred.all_unit [Ivar.read ret1; Ivar.read ret2] - -(* 1 has many handlers (one removable) and 2 has many handlers (one removable). *) -let t_choice_many_many () = - let ret1 = Ivar.create () in - let ret2 = Ivar.create () in - let remove = Ivar.create () in - let tremove = Ivar.read remove in - let i1 = Ivar.create () in - let t1 = Ivar.read i1 in - let t2 = Deferred.unit >>= fun () -> Ivar.fill remove (); t1 in - t1 >>> Ivar.fill ret1; - t2 >>> Ivar.fill ret2; - tremove >>> Ivar.fill i1; - ignore (choose [choice t1 never_called; choice tremove nop]); - ignore (choose [choice t2 never_called; choice tremove nop]); - Deferred.all_unit [Ivar.read ret1; Ivar.read ret2] - -let tests = [ - ("Bind_test.t_full_star", chatty t_full_star); - ("Bind_test.t_empty_star", chatty t_empty_star); - ("Bind_test.t_star_empty", chatty t_star_empty); - ("Bind_test.t_inline_inline", chatty t_inline_inline); - ("Bind_test.t_inline_many", chatty t_inline_many); - ("Bind_test.t_many_inline", chatty t_many_inline); - ("Bind_test.t_many_many", chatty t_many_many); - ("Bind_test.t_choice_star_empty", chatty t_choice_star_empty); - ("Bind_test.t_choice_one_inline", chatty t_choice_one_inline); - ("Bind_test.t_choice_inline_one", chatty t_choice_inline_one); - ("Bind_test.t_choice_one_one", chatty t_choice_one_one); - ("Bind_test.t_choice_one_many", chatty t_choice_one_many); - ("Bind_test.t_choice_many_one", chatty t_choice_many_one); - ("Bind_test.t_choice_inline_many", chatty t_choice_inline_many); - ("Bind_test.t_choice_many_inline", chatty t_choice_many_inline); - ("Bind_test.t_choice_many_many", chatty t_choice_many_many); -] diff --git a/test/busy_poll_test.ml b/test/busy_poll_test.ml deleted file mode 100644 index 79907e3..0000000 --- a/test/busy_poll_test.ml +++ /dev/null @@ -1,86 +0,0 @@ -open Core.Std -open Async.Std - -let counter () = - let i = ref 0 in - Scheduler.add_busy_poller (fun () -> - incr i; - if !i < 1_000_000 then - `Continue_polling - else begin - `Stop_polling () - end) -;; - -let stop_after () = - let continue = ref true in - upon (after (sec 0.001)) (fun () -> continue := false); - Scheduler.add_busy_poller (fun () -> - if !continue then - `Continue_polling - else - `Stop_polling ()) -;; - -let at_intervals () = - let rec try_more num_remaining_tries = - let stop = after (sec 1.) in - let num_fires = ref 0 in - let busy_poller_finished = - Scheduler.add_busy_poller (fun () -> - if is_some (Deferred.peek stop) then - `Stop_polling () - else - `Continue_polling) - in - Stream.iter (Clock.at_intervals (sec 0.001) ~stop) ~f:(fun () -> incr num_fires); - stop - >>= fun () -> - busy_poller_finished - >>= fun () -> - let num_fires = !num_fires in - if 500 <= num_fires && num_fires <= 15_000 then - Deferred.unit - else if num_remaining_tries = 0 then - failwiths "bad num_fires" num_fires <:sexp_of< int >> - else - try_more (num_remaining_tries - 1) - in - try_more 5 -;; - -let error () = - let monitor = Monitor.create () in - let d1 = Stream.next (Monitor.detach_and_get_error_stream monitor) in - let d2 = - within' ~monitor (fun () -> - Scheduler.add_busy_poller (fun () -> failwith "foo")) - in - d1 - >>| fun _ -> - assert (not (Deferred.is_determined d2)); -;; - -let stop_after_error () = - let monitor = Monitor.create () in - let d = Stream.next (Monitor.detach_and_get_error_stream monitor) in - don't_wait_for - (within' ~monitor (fun () -> - let first_time = ref true in - Scheduler.add_busy_poller (fun () -> - if !first_time - then (first_time := false; failwith "foo") - else failwith "error"))); - d - >>| fun _ -> - () -;; - -let tests = - [ "Busy_poll_test.counter", counter; - "Busy_poll_test.stop_after", stop_after; - "Busy_poll_test.at_intervals", at_intervals; - "Busy_poll_test.error", error; - "Busy_poll_test.srop_after_error", stop_after_error; - ] -;; diff --git a/test/clock_priority_queue_bench.ml b/test/clock_priority_queue_bench.ml deleted file mode 100644 index ee206d3..0000000 --- a/test/clock_priority_queue_bench.ml +++ /dev/null @@ -1,130 +0,0 @@ -open Core.Std -open Async.Std - -let stdout = force Writer.stdout - -let log message a sexp_of_a = - eprintf "%s\n" (Sexp.to_string_hum (Info.sexp_of_t (Info.create message a sexp_of_a))); -;; - -module Report = struct - type t = - { alarms_per_second : int; - load : string; - real_time : Time.Span.t; - num_alarms_error : float; - load_error : float; - } - with sexp_of -end - -let user_plus_sys () = - let { Unix.tms_utime; tms_stime; _ } = Unix.times () in - sec (tms_utime +. tms_stime) -;; - -let main () = - let test ~alarms_per_second ?(force_test_to_run_for = 0) () = - Deferred.create (fun result -> - let second = sec 1. in - let start = Time.add (Time.now ()) second in - let stop = ref false in - let num_alarms = ref 0 in - let num_stopped = ref 0 in - let rec alarm_every_second_until_stop at = - if !stop then begin - incr num_stopped; - if !num_stopped = alarms_per_second then Ivar.fill result (); - end else begin - incr num_alarms; - Clock.run_at at - (fun () -> alarm_every_second_until_stop (Time.add at second)) - (); - end - in - for i = 0 to alarms_per_second - 1 do - alarm_every_second_until_stop - (Time.add start (Time.Span.of_sec (float i /. float alarms_per_second))); - done; - at start - >>> fun () -> - let user_plus_sys_at_start = user_plus_sys () in - num_alarms := 0; - let rec loop at i last_load = - let at = Time.add at second in - Clock.at at - >>> fun () -> - let now = Time.now () in - let real_time = Time.diff now start in - let user_plus_sys = Time.Span.(-) (user_plus_sys ()) user_plus_sys_at_start in - let load = Time.Span.(//) user_plus_sys real_time in - let expected_num_alarms = i * alarms_per_second in - let num_alarms = !num_alarms in - let error f1 f2 = Float.abs (f1 -. f2) /. (f1 +. f2) in - let num_alarms_error = - error (Float.of_int num_alarms) (Float.of_int expected_num_alarms) - in - let load_error = error load last_load in - let print_status () = - printf "%s\n" - (Sexp.to_string_hum - (Report.sexp_of_t - { Report. - alarms_per_second; - load = sprintf "%.0f%%" (load *. 100.); - real_time; - num_alarms_error; - load_error; - })); - in - let fail message = - print_status (); - Writer.flushed stdout - >>> fun () -> - failwith message - in - let expected_real_time = Time.Span.scale second (Float.of_int i) in - if 0.1 < Float.abs (Time.Span.to_sec (Time.Span.(-) real_time expected_real_time)) - then - fail "real time differs to much from expected_real_time" - else if load_error > 0.01 - || num_alarms_error > 0.001 - || i < force_test_to_run_for - then begin - if i < 30 then - loop at (i + 1) load - else - fail "unable to get stable readings" - end else begin - stop := true; - print_status (); - end - in - loop start 1 0.00) - in - let rec loop alarms_per_second = - test ~alarms_per_second () - >>> fun () -> - loop (alarms_per_second * 2) - in - if true then - loop 1024 - else begin - test ~alarms_per_second:65_536 ~force_test_to_run_for:20 () - >>> fun () -> - let module Gc = Core.Std.Gc in - log "gc" (Gc.stat ()) <:sexp_of< Gc.Stat.t >>; - shutdown 0 - end; - never () -;; - -let () = - Command.run - (Command.basic - ~summary:"Benchmark for the clock priority queue of async." - Command.Spec.(empty) - (fun () -> - upon (main ()) (fun () -> shutdown 0); - never_returns (Scheduler.go ()))) -;; diff --git a/test/clock_test.ml b/test/clock_test.ml deleted file mode 100644 index ad64957..0000000 --- a/test/clock_test.ml +++ /dev/null @@ -1,45 +0,0 @@ -open Core.Std -open Async.Std - -let log = Async_kernel.Debug.log - -let closeness ~expected ~actual = Float.abs (expected -. actual) /. expected - -let sub_milli () = - Deferred.create (fun success -> - let interval = sec 0.0002 in - let trial_length = sec 0.1 in - let expected_num_intervals = Time.Span.(//) trial_length interval in - let rec test trials_remaining = - if trials_remaining = 0 then failwith "unable to succeed"; - let stop = ref false in - let num_not_late = ref 0 in - let num_late = ref 0 in - upon (after trial_length) (fun () -> stop := true); - Deferred.create (fun finished -> - let rec loop at = - if !stop then Ivar.fill finished () - else begin - let next = Time.add at interval in - if Time.(<=) (Time.now ()) next - then incr num_not_late - else incr num_late; - Clock.run_at next loop next - end - in - loop (Time.now ())) - >>> fun () -> - let c = - closeness ~expected:expected_num_intervals - ~actual:(Float.of_int !num_not_late) - in - if 0.1 < c - then test (trials_remaining - 1) - else Ivar.fill success () - in - test 100) -;; - -let tests = [ - "Clock_test.sub_milli", sub_milli; -] diff --git a/test/dump_core_on_job_delay_test.ml b/test/dump_core_on_job_delay_test.ml deleted file mode 100644 index 95a95a8..0000000 --- a/test/dump_core_on_job_delay_test.ml +++ /dev/null @@ -1,22 +0,0 @@ -open Core.Std -open Async.Std - -let main () = - Async_unix.Dump_core_on_job_delay.start_watching - ~dump_if_delayed_by:(sec 2.) - ~how_to_dump:Call_abort; - don't_wait_for begin - after (sec 5.) - >>= fun () -> - Writer.save "starting-to-block" ~contents:"" - >>= fun () -> - Time.pause (sec 5.); (* block -- should cause core dump *) - shutdown 0; - Deferred.never (); - end; -;; - -let () = - main (); - never_returns (Scheduler.go ()) -;; diff --git a/test/dump_core_on_job_delay_test_runner.sh b/test/dump_core_on_job_delay_test_runner.sh deleted file mode 100755 index bd511b9..0000000 --- a/test/dump_core_on_job_delay_test_runner.sh +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/env bash - -set -e -u -o pipefail - -rm -f core.* starting-to-block -if output=$(./dump_core_on_job_delay_test.exe 2>&1); then - rm -f starting-to-block - echo "$output" - echo >&2 " -Failure: $0 - The program did not appear to be aborted." - exit 1 -else - rm -f core.* - if ! [ -e starting-to-block ]; then - echo "$output" - echo >&2 "\ -Failure: $0 - The program was aborted too soon." - exit 1 - fi - rm -f starting-to-block - echo "Success: $0" -fi - diff --git a/test/dynamic_port_writer_test.ml b/test/dynamic_port_writer_test.ml deleted file mode 100644 index 7ca1005..0000000 --- a/test/dynamic_port_writer_test.ml +++ /dev/null @@ -1,12 +0,0 @@ -open Core.Std -open Async.Std - -let test () = - Process.run ~prog:"../example/dynamic_port_writer.exe" ~args:["parent"] () - >>= fun output -> - let (_ : string) = Or_error.ok_exn output in - Deferred.unit - -let tests = [ - ("Dynamic_port_writer_test", test); -] diff --git a/test/fd_test.ml b/test/fd_test.ml deleted file mode 100644 index f50c09f..0000000 --- a/test/fd_test.ml +++ /dev/null @@ -1,47 +0,0 @@ -open Core.Std let _ = _squelch_unused_module_warning_ -open Async.Std - -let clear_nonblock () = - Unix.pipe (Info.of_string "clear_nonblock") - >>= fun (`Reader rfd, `Writer wfd) -> - let reader = Reader.create rfd in - let writer = Writer.create wfd in - let line = "hello" in - Writer.write_line writer line; - Reader.read_line reader - >>= function - | `Eof -> assert false - | `Ok line' -> - assert (line = line'); - Fd.clear_nonblock rfd; - let all = - In_thread.run (fun () -> - let in_channel = Core.Std.Unix.in_channel_of_descr (Fd.file_descr_exn rfd) in - let s = In_channel.input_all in_channel in - In_channel.close in_channel; - s) - in - after (sec 0.001) (* wait a bit to give [In_channel.input_all] a chance to fail. *) - >>= fun () -> - Writer.write writer line; - Writer.close writer - >>= fun () -> - all - >>= fun line' -> - assert (line = line'); - Fd.close rfd ~should_close_file_descriptor:false; -;; - -let syscall_in_thread_doesn't_raise () = - Fd.syscall_in_thread ~name:"z" (Fd.stdout ()) - (fun _ -> raise (Unix.Unix_error (EINTR, "", ""))) - >>| function - | `Error _ -> () - | `Already_closed | `Ok _ -> assert false -;; - -let tests = - [ "Fd.clear_nonblock", clear_nonblock; - "Fd.syscall_in_thread", syscall_in_thread_doesn't_raise; - ] -;; diff --git a/test/finalizer_test.ml b/test/finalizer_test.ml deleted file mode 100644 index effa7d7..0000000 --- a/test/finalizer_test.ml +++ /dev/null @@ -1,19 +0,0 @@ -open Core.Std -open Async.Std - -let tests = - [ "Finalizer_test.test", - fun () -> begin - let finalizer_ran = ref false in - return 13 - >>= fun x -> - let l = [ x ] in - Gc.add_finalizer_exn l (fun _ -> finalizer_ran := true); - Gc.full_major (); - after (sec 0.1) - >>= fun () -> - assert !finalizer_ran; - Deferred.unit - end - ] -;; diff --git a/test/in_thread_test.ml b/test/in_thread_test.ml deleted file mode 100644 index d1646b0..0000000 --- a/test/in_thread_test.ml +++ /dev/null @@ -1,37 +0,0 @@ -open Core.Std let _ = _squelch_unused_module_warning_ -open Async.Std - -let in_thread_syscall () = - let module M = struct - exception Foo - end in - In_thread.syscall ~name:"test" (fun () -> raise M.Foo) - >>| function - | Error M.Foo -> () - | _ -> assert false -;; - -type when_finished = - [ `Notify_the_scheduler - | `Take_the_async_lock - | `Best - ] -with sexp_of - -let runs = - List.map [ `Notify_the_scheduler - ; `Take_the_async_lock - ; `Best - ] - ~f:(fun when_finished -> - (String.concat [ "In_thread.run__" - ; when_finished |> <:sexp_of< when_finished >> |> Sexp.to_string - ], - (fun () -> In_thread.run ~when_finished ignore))) -;; - -let tests = - [ "In_thread.syscall", in_thread_syscall - ] - @ runs -;; diff --git a/test/log_test.ml b/test/log_test.ml deleted file mode 100644 index 5dc2c1c..0000000 --- a/test/log_test.ml +++ /dev/null @@ -1,240 +0,0 @@ -open Core.Std -open Async.Std - -let clear_file file = - Sys.file_exists file - >>= fun exists -> - begin match exists with - | `No -> Deferred.unit - | `Unknown -> failwithf "unable to determine if %s exists" file () - | `Yes -> Unix.unlink file - end -;; - -let write_and_read (fmt : Log.Output.machine_readable_format) () = - let file = "tmp_async_log_test.txt" in - clear_file file - >>= fun () -> - let log = - Log.create - ~level:`Debug - ~output:[ Log.Output.file (fmt :> Log.Output.format) ~filename:file ] - ~on_error:`Raise - in - let messages = [ - `Debug, "debugging message"; - `Info, "info message"; - `Info, ""; - `Error, "error message"; - `Error, " spaces at both ends "; - ] in - let start_time = Time.now () in - List.iter messages ~f:(fun (level, msg) -> - match level with - | `Debug -> Log.debug log "%s" msg; - | `Info -> Log.info log "%s" msg; - | `Error -> Log.error log "%s" msg); - Log.flushed log - >>= fun () -> - Log.close log; - let stop_time = Time.now () in - Pipe.to_list (Log.Reader.pipe fmt file) - >>= fun read -> - assert (List.length read = List.length messages); - List.iter2_exn messages read - ~f:(fun (e_level, e_msg) msg -> - assert (Some e_level = Log.Message.level msg); - assert (e_msg = Log.Message.message msg); - assert (Time.(start_time <= Log.Message.time msg && Log.Message.time msg <= stop_time))); - Unix.unlink file -;; - -let speed_test fmt max_time () = - let file = "tmp_async_log_speed_test.txt" in - clear_file file - >>= fun () -> - let log = - Log.create - ~level:`Info - ~output:[ Log.Output.file fmt ~filename:file ] - ~on_error:`Raise - in - let msg = - "the quick brown fox jumped over the lazy dog two or three times to make the \ - description longer" - in - let rec loop n = - if n = 0 - then () - else begin - Log.info log "%s" msg; - loop (n - 1) - end - in - let start_times = Unix.times () in - loop 1_000_000; - Log.flushed log - >>= fun () -> - let stop_times = Unix.times () in - let total_time = Unix.( - (stop_times.tms_utime +. stop_times.tms_stime) - -. (start_times.tms_utime +. start_times.tms_stime)) - |! Time.Span.of_sec - in - if Time.Span.(>) total_time max_time then - failwithf "regression: log write and flush took more than %s (%s)" - (Time.Span.to_string max_time) - (Time.Span.to_string total_time) (); - Unix.unlink file -;; - -let rotation_test = - let max_files = 5 in - let expected_msgs_per_file = 1 in - let rotation = - Log.Rotation.create - ~messages:expected_msgs_per_file - ~keep:(`At_least max_files) - ~naming_scheme:`Numbered - () - in - let rec loop n = - if n > 0 then begin - let o = Log.Output.rotating_file `Sexp ~basename:"test" rotation in - (* rotation on start-up is different from rotation during a log life-time. This - case of a log containing multiple copies of the same output is weird but legal, - and it shakes out some race conditions in the rotation code. *) - let log = Log.create ~level:`Info ~output:[o; o; o; o; o] ~on_error:`Raise in - Log.info log "test %d" n; - Clock.after (Time.Span.of_sec 1.) - >>= fun () -> - Log.info log "test %d" n; - let flushed = Log.flushed log in - Log.close log; - flushed - >>= fun () -> - loop (n - 1) - end else Deferred.unit - in - let check_num_msgs_per_file fnames expected = - Deferred.List.map fnames ~f:(fun fname -> - Pipe.to_list (Log.Reader.pipe `Sexp fname) - >>| fun msgs -> - let observed = List.length msgs in - if Int.(=) observed expected_msgs_per_file - then Ok () - else Or_error.error "too many messages in log file" - (observed, "instead of", expected) <:sexp_of< int * string * int >>) - in - let check_num_files logs expected = - let num_logs = List.length logs in - if Int.(=) num_logs expected - then Ok () - else Or_error.error "too many log files after rotation" - (num_logs, "instead of", expected) <:sexp_of< int * string * int >> - in - fun () -> - loop (2 * max_files) - >>= fun () -> - Sys.readdir "." - >>= fun files -> - let logs = Array.fold files ~init:[] ~f:(fun acc fn -> - if String.is_prefix fn ~prefix:"test." && String.is_suffix fn ~suffix:".log" - then fn :: acc - else acc) - in - check_num_msgs_per_file logs expected_msgs_per_file - >>= fun too_many_msgs_errors -> - (Or_error.ok_exn - (Or_error.combine_errors_unit - (check_num_files logs (max_files + 1) :: too_many_msgs_errors))); - (* if the tests failed, leave the output around for debugging *) - Deferred.List.iter logs ~f:(fun fn -> Unix.unlink fn) -;; - -let rotation_types = - let max_files = 4 in - let keep = `At_least max_files in - let prefix = "rotation_type_test" in - let is_log fn = String.is_prefix fn ~prefix && String.is_suffix fn ~suffix:".log" in - let time_stamps () = - Sys.readdir "." - >>= Deferred.Array.filter_map ~f:(fun fn -> - if is_log fn - then - Unix.stat fn - >>| Unix.Stats.mtime - >>| Option.some - else - return None - ) - >>| Time.Set.of_array - in - fun () -> Deferred.List.iter [`Numbered ; `Timestamped] ~f:(fun naming_scheme -> - let rotation = Log.Rotation.create ~keep ~naming_scheme () in - let rec loop n prev_ts = - if n=0 then Deferred.unit - else - let output = [Log.Output.rotating_file `Sexp ~basename:prefix rotation] in - let log = Log.create ~level:`Debug ~output ~on_error:`Raise in - Log.info log "Some output %d" n; - Log.flushed log - >>= fun () -> - time_stamps () - >>= fun cur_ts -> - let removed = Set.diff prev_ts cur_ts in - Set.iter cur_ts ~f:(fun c -> - Set.iter removed ~f:(fun r -> - assert (Time.(<) r c) - ) - ); - Clock.after (Time.Span.of_ms 1.) - >>= fun () -> loop (n-1) cur_ts - in - loop (3 * max_files) Time.Set.empty - >>= fun () -> - Sys.readdir "." - >>= Deferred.Array.iter ~f:(fun fn -> - if is_log fn - then Unix.unlink fn - else Deferred.unit - ) - ) - -let error_logging_test () = - let output_received = Ivar.create () in - let output = - Log.Output.create (fun _ -> - Ivar.fill_if_empty output_received (); - Deferred.unit) - in - let real_output = Log.Global.get_output () in - Log.Global.set_output [ output ]; - let monitor_returned = Ivar.create () in - Monitor.try_with (fun () -> - upon (Ivar.read monitor_returned) (fun () -> assert false); - Deferred.unit) - >>= function - | Error e -> raise e - | Ok () -> - Ivar.fill monitor_returned (); - Clock.with_timeout (sec 5.) (Ivar.read output_received) - >>| fun res -> - Log.Global.set_output real_output; - begin match res with - | `Timeout -> failwith "error_logging_test timeout" - | `Result () -> () - end -;; - -let tests = [ - "Log_test.write_and_read (sexp)", write_and_read `Sexp; - "Log_test.write_and_read (bin-prot)", write_and_read `Bin_prot; - "Log_test.speed_regression (sexp)", speed_test `Sexp (sec 8.); - "Log_test.speed_regression (text)", speed_test `Text (sec 8.); - "Log_test.speed_regression (bin-prot)", speed_test `Bin_prot (sec 5.); - "Log_test.rotation", rotation_test; - "Log_test.rotation (types)", rotation_types; - "Monitor.try_with errors go to the global error log", error_logging_test; -] - diff --git a/test/monitor_try_with_ignore_exn.ml b/test/monitor_try_with_ignore_exn.ml deleted file mode 100644 index 8d42a2c..0000000 --- a/test/monitor_try_with_ignore_exn.ml +++ /dev/null @@ -1,18 +0,0 @@ -open Core.Std -open Async.Std - -let () = - don't_wait_for begin - try_with (fun () -> - upon (Scheduler.yield ()) (fun () -> - upon (Scheduler.yield ()) (fun () -> shutdown 0); - failwith "delayed exception"); - return ()) - >>= function - | Ok () -> return () - | Error _ -> assert false - end -;; - -let () = never_returns (Scheduler.go ()) - diff --git a/test/process_test.ml b/test/process_test.ml deleted file mode 100644 index b5ca514..0000000 --- a/test/process_test.ml +++ /dev/null @@ -1,112 +0,0 @@ -open Core.Std -open Async.Std -open Async_extended.Std - -let test1 () = - Process.backtick_new_exn ~prog:"ls" ~args:[] () - >>| fun _ls_output_string -> - () -;; - -let test2 () = - Process.backtick_status ~prog:"ls" ~args:[] () - >>| fun ({ Process.Output. stdout = _; stderr }, exit_or_signal) -> - assert (Result.is_ok exit_or_signal); - assert (stderr = ""); -;; - -(* -------------------------------------------------------------------------------- - [Async.Std.Process] tests below here. -*) - -module Process = Async.Std.Process - -let test3 () = - Process.create ~prog:"ls" ~args:[] () - >>= function - | Error error -> Error.raise error - | Ok process -> - Process.collect_output_and_wait process - >>| fun { Process.Output. stdout = _; stderr; exit_status } -> - assert (Result.is_ok exit_status); - assert (stderr = ""); -;; - -let test4 () = - Process.run ~prog:"ls" ~args:[] () - >>| function - | Error error -> Error.raise error - | Ok _ -> () -;; - -let test5 () = - let prog = "/bin/zzz" in - Process.run ~prog ~args:[] () - >>| function - | Error _ -> () - | Ok _ -> failwithf "somehow ran %s" prog () -;; - -let test6 () = - let num_rounds = 100 in - let num_processes_in_parallel = 10 in - Deferred.repeat_until_finished 0 (fun i -> - if i = num_rounds - then return (`Finished ()) - else begin - Deferred.all_unit - (List.init num_processes_in_parallel ~f:(fun j -> - let i = Int.to_string i in - let j = Int.to_string j in - Process.run ~prog:"echo" ~args:[ i; j ] () - >>| function - | Error e -> Error.raise e - | Ok s -> - assert (s = String.concat [ i; " "; j; "\n" ]))) - >>| fun () -> - `Repeat (i + 1) - end) -;; - -let test7 () = - Process.create ~prog:"sleep" ~args:["1000"] () - >>= function - | Error e -> Error.raise e - | Ok process -> - let output = Process.collect_output_and_wait process in - let signal = Signal.usr1 in - Signal.send_i signal (`Pid (Process.pid process)); - output - >>| fun { Process.Output. exit_status; _ } -> - match exit_status with - | Error (`Signal signal') -> assert (signal = signal') - | _ -> assert false -;; - -let test8 () = - Process.run ~prog:"false" ~args:[] () - >>| function - | Error _ -> () - | Ok "" -> failwith "false returned zero" - | Ok s -> failwiths "false returned zero and produced output" s <:sexp_of< string >> -;; - -let test9 () = - Process.run ~prog:"false" ~args:[] ~accept_nonzero_exit:[1] () - >>| function - | Error e -> Error.raise e - | Ok stdout -> <:test_eq< string >> stdout "" -;; - -let tests = - [ "Process_test1", test1; - "Process_test2", test2; - "Process_test3", test3; - "Process_test4", test4; - "Process_test5", test5; - "Process_test6", test6; - "Process_test7", test7; - "Process_test8", test8; - "Process_test9", test9; - ] -;; diff --git a/test/qtest.ml b/test/qtest.ml deleted file mode 100644 index 4754583..0000000 --- a/test/qtest.ml +++ /dev/null @@ -1,40 +0,0 @@ -(** Regression test runner. *) - -open Core.Std;; -open Async.Std -open Qtest_lib.Std;; - -let tests = - [] - @ Bind_test.tests - @ Busy_poll_test.tests - @ Clock_test.tests - @ Fd_test.tests - @ Finalizer_test.tests - @ In_thread_test.tests - @ Quickcheck_clock_test.tests - @ Log_test.tests - @ Process_test.tests - @ Reader_test.tests - @ Ready_to_test.tests - @ Rpc_test.tests - @ Socket_test.tests - @ Tcp_file_test.tests - @ Tcp_serve.tests - @ Test_handler.tests - @ Thread_safe_test.tests - @ Wait_test.tests - @ Writer_test.tests - @ Dynamic_port_writer_test.tests -;; - -let () = - (* this test takes roughly 40s alone, so 25min on something that compiles - with -j 12 should be ok *) - after (Time.Span.of_min 25.) - >>> fun () -> - eprintf "Shutting down test after a 25min timeout\n%!"; - Shutdown.shutdown 3 -;; - -let () = Runner.main tests diff --git a/test/quickcheck_clock_test.ml b/test/quickcheck_clock_test.ml deleted file mode 100644 index 12d013f..0000000 --- a/test/quickcheck_clock_test.ml +++ /dev/null @@ -1,29 +0,0 @@ -open Core.Std -open Async.Std -open Quickcheck - -let span_gen = - let open Generator in - float_between - ~nan:Without - ~lower_bound:(Incl 1.) - ~upper_bound:(Incl 10_000.) - >>| Time.Span.of_us - -let after_test () = - async_test (Generator.list span_gen) - ~sexp_of:<:sexp_of< Time.Span.t list >> - ~f:(fun spans -> - Deferred.List.fold spans ~init:(Time.now ()) ~f:(fun start span -> - Clock.after span - >>| fun () -> - let finish = Time.now () in - if Time.(<) finish (Time.add start span) - then failwiths "Clock.after finished too soon" (start, span, finish) - <:sexp_of< Time.t * Time.Span.t * Time.t >>; - finish) - >>| ignore) - -let tests = - [ "Clock.after", after_test - ] diff --git a/test/quickcheck_clock_test.mli b/test/quickcheck_clock_test.mli deleted file mode 100644 index 40ed22c..0000000 --- a/test/quickcheck_clock_test.mli +++ /dev/null @@ -1,4 +0,0 @@ -open Core.Std -open Async.Std - -val tests : (string * (unit -> unit Deferred.t)) list diff --git a/test/reader_test.ml b/test/reader_test.ml deleted file mode 100644 index 83695e7..0000000 --- a/test/reader_test.ml +++ /dev/null @@ -1,401 +0,0 @@ -open Core.Std -open Qtest_lib.Std -open Async.Std - -exception Unexpected_sexps of Sexp.t list with sexp - -let read_fail_and_continue () = - let file = "reader_test.sexp" in - Reader.file_contents file - >>= fun expected_contents -> - Reader.with_file file ~f:(fun reader -> - try_with (fun () -> - Reader.read_until reader (`Pred (fun _ -> assert false)) ~keep_delim:false) - >>= function - | Ok _ -> assert false - | Error _ -> - Reader.contents reader - >>| fun got_contents -> - assert (got_contents = expected_contents); - (* Async_kernel.Debug.log "contents" s <:sexp_of< string >> *) - ) -;; - -let test_sexps reader = - let sexps = Reader.read_sexps reader in - Pipe.to_list sexps - >>| fun sexps -> - let last = List.hd_exn (List.rev sexps) in - let last = Sexp.to_string last in - assert_string_equal "(last sexp)" last -;; - -let read_sexps_file () = - Reader.with_file "reader_test.sexp" ~f:(fun reader -> - test_sexps reader) -;; - -let read_sexps_pipe () = - Reader.with_file "reader_test.sexp" ~f:(fun reader -> - let sexps = Reader.read_sexps reader in - Unix.pipe (Info.of_string "reader_test") - >>= function (`Reader reader_fd, `Writer writer_fd) -> - let writer = Writer.create writer_fd in - Pipe.iter sexps ~f:(fun sexp -> Writer.write_sexp writer sexp; Writer.flushed writer) - >>= fun () -> - Writer.close writer - >>= fun () -> - let reader = Reader.create reader_fd in - test_sexps reader) -;; - -let load_sexps_fail () = - Reader.load_sexps "reader_test.sexp" (fun _ -> assert false) - >>| function - | Error _ -> () - | Ok _ -> assert false -;; - -let lseek () = - Reader.with_file "reader_test.sexp" ~f:(fun reader -> - Reader.read_line reader - >>= function - | `Eof -> assert false - | `Ok line -> - assert_string_equal line "(first sexp)"; - Reader.lseek reader (Int64.of_int 20) ~mode:`Set - >>= fun new_pos -> - assert_int_equal (Int64.to_int_exn new_pos) 20; - Reader.read_line reader - >>= function - | `Eof -> assert false - | `Ok line -> - assert_string_equal line "(a b c)"; - Reader.lseek reader (Int64.of_int 0) ~mode:`Set - >>= fun new_pos -> - assert_int_equal (Int64.to_int_exn new_pos) 0; - Reader.read_line reader - >>= function - | `Eof -> assert false - | `Ok line -> - assert_string_equal line "(first sexp)"; - Reader.lseek reader (Int64.of_int (-12)) ~mode:`End - >>= fun new_pos -> - assert_int_equal (Int64.to_int_exn new_pos) 48; - Reader.read_line reader - >>= function - | `Eof -> assert false - | `Ok line -> - assert_string_equal line "(last sexp)"; - (* Seek past the end of the file *) - Reader.lseek reader (Int64.of_int 2) ~mode:`End - >>= fun new_pos -> - assert_int_equal (Int64.to_int_exn new_pos) 62; - Reader.read_line reader - >>| function - (* We get Eof because we are past the end of the file. *) - | `Eof -> () - | `Ok line -> - failwithf "expected eof, got %s" line ()) - -let reader_of_string ?buf_len str = - Unix.pipe (Info.of_string "reader test") - >>= fun (`Reader reader_fd, `Writer writer_fd) -> - let reader = Reader.create reader_fd ?buf_len in - let writer = Writer.create writer_fd in - Writer.write writer str; - don't_wait_for (Writer.close writer); - return reader -;; - -let read_one_chunk_at_a_time_errors () = - let is_error = Result.is_error in - let is_ok = Result.is_ok in - let check (result_is_correct, handle_chunk) = - reader_of_string (String.create 10) - >>= fun reader -> - try_with (fun () -> - Reader.read_one_chunk_at_a_time reader ~handle_chunk:(fun _ ~pos:_ ~len -> - return (`Consumed (handle_chunk len)))) - >>= fun result -> - Reader.close reader - >>| fun () -> - assert (result_is_correct result) - in - Deferred.List.iter ~f:check - [ - is_error, (fun _ -> (-1, `Need_unknown)); - is_ok, (fun _ -> (0, `Need_unknown)); - is_ok, (fun len -> (len, `Need_unknown)); - is_error, (fun len -> (len + 1, `Need_unknown)); - is_error, (fun _ -> (-1, `Need_unknown)); - - is_error, (fun _ -> (0, `Need (-1)) ); - is_error, (fun _ -> (0, `Need 0) ); - is_error, (fun len -> (0, `Need len) ); - is_ok, (fun len -> (0, `Need (len + 1))); - - is_error, (fun len -> (len, `Need (-1)) ); - is_error, (fun len -> (len, `Need 0) ); - is_ok, (fun len -> (len, `Need 1) ); - ] -;; - -let read_partial_chunks () = - (* Read chunk by chunk without consuming everything available each time. *) - reader_of_string "0123456789" ~buf_len:5 - >>= fun reader -> - let step = ref 0 in - Reader.read_one_chunk_at_a_time reader - ~handle_chunk:(fun buf ~pos ~len -> - incr step; - match !step with - | 1 -> - assert_int_equal 0 pos; - assert_int_equal 5 len; - assert_string_equal "01234" (Bigstring.to_string buf ~pos ~len); - return (`Consumed (4, `Need_unknown)) - | 2 -> - assert_int_equal 0 pos; - assert_int_equal 5 len; - assert_string_equal "45678" (Bigstring.to_string buf ~pos ~len); - return (`Consumed (3, `Need_unknown)) - | 3 -> - assert_int_equal 0 pos; - assert_int_equal 3 len; - assert_string_equal "789" (Bigstring.to_string buf ~pos ~len); - return (`Stop ()) - | n -> - raise (Test.Test_failure (Sexp.List [ - Sexp.Atom "Step greater than 3"; - sexp_of_int n; - ]))) - >>= fun result -> - assert_equal (`Stopped ()) result - ~sexp_of_t:<:sexp_of< unit Reader.read_one_chunk_at_a_time_result >>; - Reader.close reader -;; - -let read_partial_chunks_multiple_times () = - (* Read chunk by chunk without consuming everything available each time. *) - let test consumed = - let input = "0123456789" in - reader_of_string input ~buf_len:5 - >>= fun reader -> - let rec loop acc = - Reader.read_one_chunk_at_a_time reader - ~handle_chunk:(fun buf ~pos ~len -> - let len = min consumed len in - let str = Bigstring.to_string buf ~pos ~len in - return (`Stop_consumed (str, len))) - >>= function - | `Eof -> return acc - | `Eof_with_unconsumed_data data -> return (data :: acc) - | `Stopped str -> loop (str :: acc) - in - loop [] - >>= fun result -> - List.iter result ~f:(fun chunk -> - assert (String.length chunk <= consumed)); - let result = String.concat ~sep:"" (List.rev result) in - assert_string_equal result input; - Reader.close reader - in - Deferred.List.iter [ 1; 2; 3; 5; 100 ] ~f:test -;; - -let read_blocks_ending_with_incomplete_one () = - (* Read blocks of length 4, ending with an incomplete one. *) - reader_of_string "aaaabbbbcc" ~buf_len:3 - >>= fun reader -> - Reader.read_one_chunk_at_a_time reader - ~handle_chunk:(fun buf ~pos ~len -> - ignore (buf, pos); - return (`Consumed (len - len mod 4, `Need 4))) - >>= fun result -> - assert_equal (`Eof_with_unconsumed_data "cc") result - ~sexp_of_t:<:sexp_of< unit Reader.read_one_chunk_at_a_time_result >>; - Reader.close reader -;; - -let read_messages () = - (* Read nessages composed of a size (on 1 byte) followed by a body of this size. *) - let data = ["xyz"; "abcdefgh"] in - reader_of_string - (String.concat - (List.map data - ~f:(fun s -> - String.make 1 (Char.of_int_exn (String.length s)) ^ s))) - ~buf_len:6 - >>= fun reader -> - let state = ref `Size (* [`Size] or [`Body body_size] *) in - let messages = ref [] in - Reader.read_one_chunk_at_a_time reader - ~handle_chunk:(fun buf ~pos ~len -> - let orig_len = len in - let rec loop ~pos ~len = - match !state with - | `Size -> - if len < 1 then - return (`Consumed (orig_len - len, `Need 1)) - else begin - let size = Char.to_int buf.{pos} in - state := `Body size; - loop ~pos:(pos + 1) ~len:(len - 1) - end - | `Body size -> - if len < size then - return (`Consumed (orig_len - len, `Need size)) - else begin - let msg = Bigstring.to_string buf ~pos ~len:size in - messages := msg :: !messages; - state := `Size; - loop ~pos:(pos + size) ~len:(len - size) - end - in - loop ~pos ~len) - >>= fun result -> - assert_equal `Eof result - ~sexp_of_t:<:sexp_of< unit Reader.read_one_chunk_at_a_time_result >>; - assert_equal (List.rev data) !messages - ~sexp_of_t:<:sexp_of< string list >>; - Reader.close reader -;; - -let rec find_zero buf ~pos ~max = - if pos = max then - None - else if buf.{pos} = '\000' then - Some pos - else - find_zero buf ~pos:(pos + 1) ~max - -let read_zero_terminated_strings () = - (* Read a sequence of zero-terminated strings. The size of each string is not known in - advance. *) - let data = ["foo"; "bar"; "a long string"; "ocaml"; "core"; "async"] in - reader_of_string - (String.concat - (List.map data ~f:(fun s -> s ^ "\000"))) - ~buf_len:2 - >>= fun reader -> - let strings = ref [] in - let start = ref 0 in - Reader.read_one_chunk_at_a_time reader - ~handle_chunk:(fun buf ~pos ~len -> - let orig_len = len in - let rec loop ~pos ~len = - match find_zero buf ~pos:(pos + !start) ~max:(pos + len) with - | None -> - start := len; (* restart the search after current data. *) - return (`Consumed (orig_len - len, `Need_unknown)) - | Some pos' -> - let str_len = pos' - pos in - let str = Bigstring.to_string buf ~pos ~len:str_len in - start := 0; - strings := str :: !strings; - loop ~pos:(pos' + 1) ~len:(len - str_len - 1) - in - loop ~pos ~len) - >>= fun result -> - assert_equal `Eof result - ~sexp_of_t:<:sexp_of< unit Reader.read_one_chunk_at_a_time_result >>; - assert_equal (List.rev data) !strings - ~sexp_of_t:<:sexp_of< string list >>; - Reader.close reader -;; - -let read_bin_prot_fail_with_max_length_exceeded_and_continue () = - let file = "reader_test" in - let i = 13 in - Monitor.protect (fun () -> - Writer.with_file file ~f:(fun writer -> - Writer.write_bin_prot writer Int.bin_writer_t i; - Writer.write writer "\n"; - Writer.write_bin_prot writer Int.bin_writer_t i; - Deferred.unit) - >>= fun () -> - Reader.with_file file ~f:(fun reader -> - let read_int ~max_len = - try_with (fun () -> Reader.read_bin_prot reader Int.bin_reader_t ~max_len) - in - read_int ~max_len:0 - >>= function - | Ok _ -> assert false - | Error _ -> - Reader.read_until reader (`Char '\n') ~keep_delim:true - >>= function - | `Eof_without_delim _ | `Eof -> assert false - | `Ok _ -> - read_int ~max_len:8 - >>= function - | Error _ | Ok `Eof -> assert false - | Ok (`Ok i') -> - assert (i = i'); - Deferred.unit)) - ~finally:(fun () -> Unix.unlink file) -;; - -let read_one_chunk_at_a_time_share_buffer () = - reader_of_string (String.create 1) - >>= fun reader -> - let r = ref None in - Reader.read_one_chunk_at_a_time reader - ~handle_chunk:(fun buf ~pos:_ ~len:_ -> - r := Some (Bigstring.sub_shared buf); - return `Continue) - >>= function - | `Stopped _ | `Eof_with_unconsumed_data _ -> assert false - | `Eof -> Reader.close reader -;; - -let test_macros () = - In_thread.run (fun () -> - let tests = Pa_ounit_lib.Runtime.collect (fun () -> - let module Test = Sexplib_ounit_tests.Test_macros.Make (struct - let load_sexp_conv_exn file f = - Thread_safe.block_on_async (fun () -> - Reader.load_sexp_exn ~expand_macros:true file f) - |> function - | Ok x -> x - | Error e -> raise (Monitor.extract_exn e) - let load_sexps_conv file f = - Thread_safe.block_on_async_exn (fun () -> - Async_unix.Reader0.Macro_loader.load_sexps_conv file f) - end) - in - ()) - in - List.iter tests ~f:(fun f -> f ())) -;; - -let drain_test () = - Reader.open_file "reader_test.sexp" - >>= fun reader -> - Reader.drain reader - >>| fun () -> - assert (Reader.is_closed reader); -;; - -let tests = [ - "Reader_test.load_sexps_fail", load_sexps_fail; - "Reader_test.lseek", lseek; - "Reader_test.read_blocks_ending_with_incomplete_one", - read_blocks_ending_with_incomplete_one; - "Reader_test.read_fail_and_continue", read_fail_and_continue; - "Reader_test.read_messages", read_messages; - "Reader_test.read_partial_chunks", read_partial_chunks; - "Reader_test.read_partial_chunks_multiple_times", - read_partial_chunks_multiple_times; - "Reader_test.read_one_chunk_at_a_time_errors", - read_one_chunk_at_a_time_errors; - "Reader_test.read_sexps_file", read_sexps_file; - "Reader_test.read_sexps_pipe", read_sexps_pipe; - "Reader_test.read_zero_terminated_strings", read_zero_terminated_strings; - "Reader_test.read_bin_prot_fail_with_max_length_exceeded_and_continue", - read_bin_prot_fail_with_max_length_exceeded_and_continue; - "Reader_test.read_one_chunk_at_a_time_share_buffer", - read_one_chunk_at_a_time_share_buffer; - "Reader_test.test_macros", test_macros; - "Reader_test.drain_test", drain_test; -] diff --git a/test/reader_test.sexp b/test/reader_test.sexp deleted file mode 100644 index 0ddc870..0000000 --- a/test/reader_test.sexp +++ /dev/null @@ -1,6 +0,0 @@ -(first sexp) -second -(a b c) -(d e f) -penultimate -(last sexp) diff --git a/test/ready_to_test.ml b/test/ready_to_test.ml deleted file mode 100644 index 2ede9e7..0000000 --- a/test/ready_to_test.ml +++ /dev/null @@ -1,28 +0,0 @@ -open Core.Std -open Async.Std - -let already_closed () = - Unix.pipe (Info.of_string "Ready_to_test.already_closed") - >>= fun (`Reader fd, `Writer write_fd) -> - let ready1 = Fd.ready_to fd `Read in - (* At this point, the fd should be Open, Watching for read. *) - let close = Fd.close fd in - (* Now that we've requested a closed, the fd should be Close_requested, - Stop_requested for read. So, another call to [ready_to] should notice the close. *) - let ready2 = Fd.ready_to fd `Read in - close - >>= fun () -> - ready1 - >>= function - | `Bad_fd | `Ready -> assert false - | `Closed -> - ready2 - >>= function - | `Bad_fd | `Ready -> assert false - | `Closed -> Fd.close write_fd -;; - -let tests = - [ "Ready_to_test.already_closed", already_closed; - ] -;; diff --git a/test/reset_in_forked_process_tests.ml b/test/reset_in_forked_process_tests.ml deleted file mode 100644 index fb3117f..0000000 --- a/test/reset_in_forked_process_tests.ml +++ /dev/null @@ -1,19 +0,0 @@ -open Core.Std let _ = _squelch_unused_module_warning_ -open Async.Std -open Async_test_in_child_process - -let () = add_test _here_ Expect.ok (fun () -> - match Core.Std.Unix.fork () with - | `In_the_parent pid -> Unix.waitpid_exn pid - | `In_the_child -> - Scheduler.reset_in_forked_process (); - Deferred.unit) -;; - -let () = add_test _here_ Expect.ok (fun () -> - match Core.Std.Unix.fork () with - | `In_the_parent pid -> Unix.waitpid_exn pid - | `In_the_child -> - Scheduler.reset_in_forked_process (); - after (sec 0.01)) -;; diff --git a/test/rpc_test.ml b/test/rpc_test.ml deleted file mode 100644 index 60e5b5b..0000000 --- a/test/rpc_test.ml +++ /dev/null @@ -1,145 +0,0 @@ -open Core.Std -open Async.Std - -module Debug = Async_kernel.Debug - -let test ~make_transport ~imp1 ~imp2 ~state1 ~state2 ~f () = - Unix.pipe (Info.of_string "rpc_test 1") - >>= fun (`Reader r1, `Writer w2) -> - Unix.pipe (Info.of_string "rpc_test 2") - >>= fun (`Reader r2, `Writer w1) -> - let t1 = make_transport (r1, w1) in - let t2 = make_transport (r2, w2) in - let s imp = - if List.length imp > 0 - then Some ( - Rpc.Implementations.create_exn - ~implementations:imp - ~on_unknown_rpc:`Close_connection) - else None - in - let s1 = s imp1 in - let s2 = s imp2 in - let conn1_ivar = Ivar.create () in - let f2_done = - Async_rpc_kernel.Std.Rpc.Connection.with_close ?implementations:s2 t2 - ~dispatch_queries:(fun conn2 -> - Ivar.read conn1_ivar >>= fun conn1 -> - f conn1 conn2) - ~connection_state:(fun _ -> state2) - ~on_handshake_error:`Raise - in - Async_rpc_kernel.Std.Rpc.Connection.with_close ?implementations:s1 t1 - ~dispatch_queries:(fun conn1 -> - Ivar.fill conn1_ivar conn1; - f2_done) - ~connection_state:(fun _ -> state1) - ~on_handshake_error:`Raise -;; - -let test1 ~make_transport ~imp ~state ~f = - test ~make_transport - ~imp1:imp ~state1:state - ~imp2:[] ~state2:() ~f -;; - -module Pipe_count_error = struct - type t = [`Argument_must_be_positive] with bin_io -end - -let pipe_count_rpc = - Rpc.Pipe_rpc.create - ~name:"pipe_count" - ~version:0 - ~bin_query:Int.bin_t - ~bin_response:Int.bin_t - ~bin_error:Pipe_count_error.bin_t - () -;; - -let pipe_wait_rpc = - Rpc.Pipe_rpc.create - ~name:"pipe_wait" - ~version:0 - ~bin_query:Unit.bin_t - ~bin_response:Unit.bin_t - ~bin_error:Unit.bin_t - () -;; - -let pipe_count_imp = - Rpc.Pipe_rpc.implement pipe_count_rpc (fun () n ~aborted:_ -> - if n < 0 - then return (Error `Argument_must_be_positive) - else - let pipe_r, pipe_w = Pipe.create () in - upon - (Deferred.List.iter (List.init n ~f:Fn.id) ~how:`Sequential ~f:(fun i -> - Pipe.write pipe_w i)) - (fun () -> Pipe.close pipe_w); - return (Ok pipe_r)) -;; - -let pipe_wait_imp ivar = - Rpc.Pipe_rpc.implement pipe_wait_rpc (fun () () ~aborted:_ -> - let pipe_r, pipe_w = Pipe.create () in - (Pipe.write pipe_w () - >>> fun () -> - Ivar.read ivar - >>> fun () -> - Pipe.write pipe_w () - >>> fun () -> - Pipe.close pipe_w); - return (Ok pipe_r)) -;; - -let make_tests ~make_transport ~transport_name = - List.mapi ~f:(fun i f -> sprintf "rpc-%s-%d" transport_name i, f) - [ test1 ~make_transport ~imp:[pipe_count_imp] ~state:() ~f:(fun _ conn -> - let n = 3 in - Rpc.Pipe_rpc.dispatch_exn pipe_count_rpc conn n - >>= fun (pipe_r, _id) -> - Pipe.fold_without_pushback pipe_r ~init:0 ~f:(fun x i -> - assert (x=i); - i+1) - >>= fun x -> - <:test_result< int >> ~expect:n x; - Deferred.unit) - ; test1 ~make_transport ~imp:[pipe_count_imp] ~state:() ~f:(fun _ conn -> - Rpc.Pipe_rpc.dispatch pipe_count_rpc conn (-1) - >>= fun result -> - match result with - | Ok (Ok _) | Error _ -> assert false - | Ok (Error `Argument_must_be_positive) -> Deferred.unit) - ; let ivar = Ivar.create () in - test1 ~make_transport ~imp:[pipe_wait_imp ivar] ~state:() ~f:(fun conn1 conn2 -> - (* Test that the pipe is flushed when the connection is closed. *) - Rpc.Pipe_rpc.dispatch_exn pipe_wait_rpc conn2 () - >>= fun (pipe_r, _id) -> - Pipe.read pipe_r - >>= fun res -> - assert (res = `Ok ()); - don't_wait_for (Rpc.Connection.close conn1); - Ivar.fill ivar (); - Pipe.read pipe_r - >>= fun res -> - assert (res = `Ok ()); - Deferred.unit) - ] -;; - -let tests = - let max_message_size = 1_000_000 in - let make_transport_std (fd_r, fd_w) : Rpc.Transport.t = - { reader = Reader.create fd_r |> Rpc.Transport.Reader.of_reader ~max_message_size - ; writer = Writer.create fd_w |> Rpc.Transport.Writer.of_writer ~max_message_size - } - in - let make_transport_low_latency (fd_r, fd_w) : Rpc.Transport.t = - { reader = Rpc.Low_latency_transport.Reader.create fd_r ~max_message_size - ; writer = Rpc.Low_latency_transport.Writer.create fd_w ~max_message_size - } - in - make_tests ~make_transport:make_transport_std ~transport_name:"std" @ - make_tests ~make_transport:make_transport_low_latency ~transport_name:"low-latency" -;; diff --git a/test/scheduler_go_unhandled_exn.ml b/test/scheduler_go_unhandled_exn.ml deleted file mode 100644 index 8f22a2c..0000000 --- a/test/scheduler_go_unhandled_exn.ml +++ /dev/null @@ -1,17 +0,0 @@ -open Core.Std -open Async.Std - -let () = - (* This raises a toplevel-unhandled exception in Async. But we haven't started the - scheduler yet. *) - Scheduler.within (fun () -> - failwith "this exception should be caught by the try-with below"); - (* We start the scheduler, which should raise the exception. *) - begin - try - never_returns (Scheduler.go ~raise_unhandled_exn:true ()) - with - | _ -> Pervasives.exit 0 - end; - assert false; -;; diff --git a/test/shutdown_on_unhandled_exn.ml b/test/shutdown_on_unhandled_exn.ml deleted file mode 100644 index 49e93e5..0000000 --- a/test/shutdown_on_unhandled_exn.ml +++ /dev/null @@ -1,17 +0,0 @@ -open Core.Std -open Async.Std - -let () = - Shutdown.shutdown_on_unhandled_exn (); - upon (return ()) (fun () -> failwith "this should cause [shutdown] to be called"); - Shutdown.at_shutdown (fun () -> - Debug.amf _here_ "shutdown handler ran"; - return ()); - begin - try - never_returns (Scheduler.go ~raise_unhandled_exn:true ()) - with - | _ -> assert false; - end; - assert false; -;; diff --git a/test/shutdown_tests.ml b/test/shutdown_tests.ml deleted file mode 100644 index 1202589..0000000 --- a/test/shutdown_tests.ml +++ /dev/null @@ -1,62 +0,0 @@ -open Core.Std -open Async.Std -open Async_test_in_child_process - -let () = add_test _here_ Expect.(ok && no_output) (fun () -> return ()) - -let () = - add_test _here_ Expect.(ok && no_output) (fun () -> - let ivar = Ivar.create () in - Shutdown.don't_finish_before (Ivar.read ivar); - shutdown 0; - Ivar.fill ivar (); - Deferred.unit) -;; - -let () = - add_test _here_ Expect.(ok && no_output) (fun () -> - let ivar = Ivar.create () in - Shutdown.don't_finish_before (Ivar.read ivar); - Shutdown.at_shutdown (fun () -> Ivar.fill ivar (); Deferred.unit); - Deferred.unit); -;; - -let () = - add_test _here_ Expect.(ok && no_output) (fun () -> - Shutdown.don't_finish_before (after (sec 0.01)); - Deferred.unit); -;; - -let () = - add_test _here_ Expect.error (fun () -> - Shutdown.don't_finish_before (never ()); - shutdown 0 ~force:(after (sec 0.01)); - never ()); -;; - -let () = - add_test _here_ Expect.(ok && no_output) (fun () -> - for i = 1 to 100 do - Shutdown.don't_finish_before (after (Time.Span.of_ms (Float.of_int i))); - done; - shutdown 0; - never ()); -;; - -let () = - add_test _here_ Expect.error (fun () -> - for i = 1 to 100 do - Shutdown.don't_finish_before (after (Time.Span.of_ms (Float.of_int i))); - done; - shutdown 0 ~force:(after (sec 0.01)); - never ()); -;; - -let () = - add_test _here_ Expect.ok (fun () -> - for _i = 1 to 1_000 do - Shutdown.don't_finish_before Deferred.unit; - done; - shutdown 0; - never ()); -;; diff --git a/test/socket_test.ml b/test/socket_test.ml deleted file mode 100644 index 8ef3d2a..0000000 --- a/test/socket_test.ml +++ /dev/null @@ -1,75 +0,0 @@ -open Core.Std -open Async.Std - -let listen_to_named_pipe file ~add_fd = - let file_gone = - Sys.file_exists file - >>= function - | `Yes -> Unix.unlink file - | `Unknown | `No -> Deferred.unit - in - file_gone - >>= fun () -> - let socket = Socket.create Socket.Type.unix in - add_fd (Socket.fd socket); - Socket.bind socket (Socket.Address.Unix.create file) - >>| fun sock -> - add_fd (Socket.fd sock); - Socket.listen sock ~max_pending_connections:20 - -(* Check to see if the sexp representation of file descriptors contain - unprintable characters. *) -let printable_sexp () = - let file = Filename.dirname (Sys.executable_name) ^/ "socket_test" in - let to_close = ref [] in - let add_fd fd = - to_close := fd::!to_close - in - Monitor.protect (fun () -> - listen_to_named_pipe file ~add_fd - >>= fun local_socket -> - add_fd (Socket.fd local_socket); - let rec loop = function - | 0 -> return () - | remaining -> - don't_wait_for ( - Socket.accept local_socket - >>| function - | `Socket_closed -> assert false - | `Ok (socket, address) -> - add_fd (Socket.fd socket); - let sexp = (<:sexp_of< Socket.Address.t>> (address :> Socket.Address.t)) in - let address_string = Sexp.to_string sexp in - if String.exists address_string ~f:(fun c -> - not (Char.is_print c)) then - Error.raise (Error.create "Sexp contains unprintable characters" sexp Fn.id)); - Tcp.connect_sock (Tcp.to_file file) - >>= fun client -> - add_fd (Socket.fd client); - loop (remaining - 1) - in - loop 2) - ~finally:(fun () -> - after (Time.Span.of_sec 1.) - >>= fun () -> - Deferred.List.iter !to_close ~f:Unix.close) - -let tests = [ - "Socket_test.printable_sexp", printable_sexp; -] - -open Async_test_in_child_process - -let () = - add_test _here_ Expect.ok (fun () -> - Unix.close (Fd.stdin ()) - >>= fun () -> - Tcp.Server.create Tcp.on_port_chosen_by_os - (fun _address _reader _writer -> Deferred.unit) - >>= fun server -> - Tcp.with_connection - (Tcp.to_host_and_port "localhost" (Tcp.Server.listening_on server)) - (fun _socket _reader _writer -> Deferred.unit) - >>= fun () -> - return ()) -;; diff --git a/test/tcp_file_test.ml b/test/tcp_file_test.ml deleted file mode 100644 index 41aaa13..0000000 --- a/test/tcp_file_test.ml +++ /dev/null @@ -1,14 +0,0 @@ -open Core.Std -open Async.Std - -let serve_existing_static_file () = - Tcp_file.Server.serve ~auth:(fun _ -> true) Tcp.on_port_chosen_by_os - >>= fun (server : Tcp.Server.inet) -> - let file = (Filename.dirname Sys.executable_name) ^/ "tcp_file_test.ml" in - Tcp_file.Server.serve_existing_static_file file - >>= fun () -> - Tcp.Server.close server - -let tests = [ - "Tcp_file_test.serve_existing_static_file", serve_existing_static_file; -] diff --git a/test/tcp_serve.ml b/test/tcp_serve.ml deleted file mode 100644 index f794971..0000000 --- a/test/tcp_serve.ml +++ /dev/null @@ -1,100 +0,0 @@ -open Core.Std ;; -open Async.Std ;; - -let server_read_test = Ivar.create () ;; -let server_write_test = Ivar.create () ;; -let server_close_test = Ivar.create () ;; -let client_read_test = Ivar.create () ;; -let client_write_test = Ivar.create () ;; -let client_close_test = Ivar.create () ;; -let client_eof_test = Ivar.create () ;; - -let all_tests_success = Ivar.create () ;; - -let tcp_serve_test () = - begin - Ivar.read server_read_test >>> fun () -> - Ivar.read server_write_test >>> fun () -> - Ivar.read server_close_test >>> fun () -> - Ivar.read client_read_test >>> fun () -> - Ivar.read client_write_test >>> fun () -> - Ivar.read client_eof_test >>> fun () -> - Ivar.read client_close_test >>> fun () -> - - Ivar.fill all_tests_success () - end; - upon (after (sec 35.)) (fun () -> - let check ivar s = if Ivar.is_empty ivar then failwithf "tcp_serve_test: %s" s () in - check client_write_test "client_write_test"; - check server_read_test "server_read_test"; - check server_write_test "server_write_test"; - check client_read_test "client_read_test"; - check client_eof_test "server_eof_test"; - check server_close_test "server_close_test"; - check client_close_test "client_close_test"; - - check all_tests_success "all_tests_success!?!" - ); - Tcp.Server.create Tcp.on_port_chosen_by_os - ~buffer_age_limit:(`At_most (sec 1.)) - ~on_handler_error:(`Call (fun _a e -> - failwithf "Tcp.Server.create: handler error: %s" (Exn.to_string e) () - )) - (fun inet reader writer -> - Writer.close_finished writer >>> (fun () -> Ivar.fill server_close_test ()); - - let echo line = - Writer.write - writer - (sprintf "%s: %s\n" - (Unix.Inet_addr.to_string (Socket.Address.Inet.addr inet)) - line); - in - Reader.read_line reader - >>= function - | `Eof -> failwith "server: read test1: premature EOF" - | `Ok line -> - Ivar.fill server_read_test (); - echo line; - Reader.read_line reader - >>| function - | `Eof -> failwith "server: read test2: premature EOF" - | `Ok line -> - Ivar.fill server_write_test (); - echo line - ) - >>= fun server -> - Tcp.with_connection (Tcp.to_host_and_port "localhost" (Tcp.Server.listening_on server)) - (fun _ reader writer -> - Writer.close_finished writer >>> (fun () -> Ivar.fill client_close_test ()); - - Writer.write writer "foo\n"; - Ivar.fill client_write_test (); - - Reader.read_line reader - >>= function - | `Eof -> failwith "client: read test: premature EOF" - | `Ok s -> - ignore s; - Ivar.fill client_read_test (); - Writer.write writer "bar\n"; - Reader.read_line reader - >>= function - | `Eof -> failwith "client: read test: premature EOF" - | `Ok s -> - ignore s; - Reader.read_line reader - >>| function - | `Ok s -> failwithf "client: expected EOF, instead got: %s" s () - | `Eof -> Ivar.fill client_eof_test () - ) - >>= fun () -> - Ivar.read all_tests_success - >>= fun () -> - Tcp.Server.close server -;; - -let tests = - [ "Tcp_serve_read_write_close", tcp_serve_test; - ] -;; diff --git a/test/test-command.t b/test/test-command.t deleted file mode 100644 index 99cb270..0000000 --- a/test/test-command.t +++ /dev/null @@ -1,4 +0,0 @@ - $ $TESTDIR/test_command.exe ANON -some-flag FLAG - anon: ANON - flag: FLAG - later diff --git a/test/test-monitor-try-with-ignore-exn.t b/test/test-monitor-try-with-ignore-exn.t deleted file mode 100644 index dd1623e..0000000 --- a/test/test-monitor-try-with-ignore-exn.t +++ /dev/null @@ -1,2 +0,0 @@ - $ $TESTDIR/monitor_try_with_ignore_exn.exe 2>&1 \ - > | grep -qF 'Exception raised to Monitor.try_with that already returned' diff --git a/test/test_command.ml b/test/test_command.ml deleted file mode 100644 index 85c7c31..0000000 --- a/test/test_command.ml +++ /dev/null @@ -1,22 +0,0 @@ -open Core.Std -open Async.Std - -let command = - Command.async' ~summary:"test command" - Command.Param.( - anon ("STRING" %: string) - @> flag "some-flag" (required string) ~doc:"" - @> nil) - (fun anon flag () -> - print_string (String.concat [ "\ -anon: ";anon;" -flag: ";flag;" -" - ]); - after (sec 0.01) - >>= fun () -> - printf "later\n"; - return ()) -;; - -let () = Command.run command diff --git a/test/test_handler.ml b/test/test_handler.ml deleted file mode 100644 index f90e84f..0000000 --- a/test/test_handler.ml +++ /dev/null @@ -1,171 +0,0 @@ -open Core.Std -open Async.Std -let return = Deferred.return - -exception Exn - -let t0 () = - Deferred.create (fun ivar -> - Monitor.try_with (fun () -> raise Exn) >>> function - | Ok () -> assert false - | Error error -> - assert (Monitor.extract_exn error = Exn); - Ivar.fill ivar ()) -;; - -let t1 () = - let monitor = Monitor.create ~name:"1st-monitor" () in - let errors = Monitor.detach_and_get_error_stream monitor in - Deferred.create (fun ivar -> - begin - Scheduler.within' ~monitor (fun () -> raise Exn) >>> fun _ -> - assert false - end; - Stream.next errors >>> function - | Stream.Nil -> assert false - | Stream.Cons (error, _) -> - assert (Monitor.extract_exn error = Exn); - Ivar.fill ivar ()) -;; - -(* handlers run in the execution context that was in effect when they - were created rather than the execution context that is in effect when - they are installed. *) -let t2 () = - let m1 = Monitor.create ~name:"1st-monitor" () in - let m2 = Monitor.create ~name:"2nd-monitor" () in - let errors1 = Monitor.detach_and_get_error_stream m1 in - let errors2 = Monitor.detach_and_get_error_stream m2 in - Deferred.create (fun ivar -> - Scheduler.within' ~monitor:m1 (fun () -> - return (Handler.create (fun _ -> raise Exn (* to m1 *))) - ) >>> fun handler -> - Scheduler.within' ~monitor:m2 (fun () -> - let _uninstall = Handler.install handler Deferred.unit in - Deferred.unit - ) >>> fun () -> - (Stream.iter errors2 ~f:(fun _ -> assert false)); - (Stream.iter errors1 ~f:(fun error -> - assert (Monitor.extract_exn error = Exn); - Ivar.fill ivar ()))) -;; - -(* prepending to the handler under [m2] doesn't affect this property *) -let t3 () = - let m1 = Monitor.create ~name:"1st-monitor" () in - let m2 = Monitor.create ~name:"2nd-monitor" () in - let errors1 = Monitor.detach_and_get_error_stream m1 in - let errors2 = Monitor.detach_and_get_error_stream m2 in - Deferred.create (fun ivar -> - Scheduler.within' ~monitor:m1 (fun () -> - return (Handler.create (fun _ -> raise Exn (* to m1 *))) - ) >>> fun handler -> - Scheduler.within' ~monitor:m2 (fun () -> - let handler = Handler.prepend handler ~f:Fn.id in - let _uninstall = Handler.install handler Deferred.unit in - Deferred.unit - ) >>> fun () -> - (Stream.iter errors2 ~f:(fun _ -> assert false)); - (Stream.iter errors1 ~f:(fun error -> - assert (Monitor.extract_exn error = Exn); - Ivar.fill ivar ()))) -;; - -(* A variant of t3 where the failure happens inside the prepended function *) -let t6 () = - let m1 = Monitor.create ~name:"1st-monitor" () in - let m2 = Monitor.create ~name:"2nd-monitor" () in - let errors1 = Monitor.detach_and_get_error_stream m1 in - let errors2 = Monitor.detach_and_get_error_stream m2 in - Deferred.create (fun ivar -> - Scheduler.within' ~monitor:m1 (fun () -> - return (Handler.create (fun x -> x)) - ) >>> fun handler -> - Scheduler.within' ~monitor:m2 (fun () -> - let handler = - Handler.prepend handler ~f:(fun _ -> raise Exn (* to m1 *)) - in - let _uninstall = Handler.install handler Deferred.unit in - Deferred.unit - ) >>> fun () -> - (Stream.iter errors2 ~f:(fun _ -> assert false)); - (Stream.iter errors1 ~f:(fun error -> - assert (Monitor.extract_exn error = Exn); - Ivar.fill ivar ()))) -;; - -(* filtering the handler under [m2] doesn't affect this property *) -let t4 () = - let m1 = Monitor.create ~name:"1st-monitor" () in - let m2 = Monitor.create ~name:"2nd-monitor" () in - let errors1 = Monitor.detach_and_get_error_stream m1 in - let errors2 = Monitor.detach_and_get_error_stream m2 in - Deferred.create (fun ivar -> - Scheduler.within' ~monitor:m1 (fun () -> - return (Handler.create (fun _ -> raise Exn (* to m1 *))) - ) >>> fun handler -> - Scheduler.within' ~monitor:m2 (fun () -> - let handler = Handler.filter handler ~f:(fun _ -> true) in - let _uninstall = Handler.install handler Deferred.unit in - Deferred.unit - ) >>> fun () -> - (Stream.iter errors2 ~f:(fun _ -> assert false)); - (Stream.iter errors1 ~f:(fun error -> - assert (Monitor.extract_exn error = Exn); - Ivar.fill ivar ()))) -;; - -(* A variant of t4 where the failure happens inside the filter predicate *) -let t7 () = - let m1 = Monitor.create ~name:"1st-monitor" () in - let m2 = Monitor.create ~name:"2nd-monitor" () in - let errors1 = Monitor.detach_and_get_error_stream m1 in - let errors2 = Monitor.detach_and_get_error_stream m2 in - Deferred.create (fun ivar -> - Scheduler.within' ~monitor:m1 (fun () -> - return (Handler.create (fun x -> x)) - ) >>> fun handler -> - Scheduler.within' ~monitor:m2 (fun () -> - let handler = - Handler.filter handler ~f:(fun _ -> raise Exn (* to m1 *)) - in - let _uninstall = Handler.install handler Deferred.unit in - Deferred.unit - ) >>> fun () -> - (Stream.iter errors2 ~f:(fun _ -> assert false)); - (Stream.iter errors1 ~f:(fun error -> - assert (Monitor.extract_exn error = Exn); - Ivar.fill ivar ()))) -;; - -(* handlers can be uninstalled *) -let t5 () = - let m1 = Monitor.create ~name:"1st-monitor" () in - let m2 = Monitor.create ~name:"2nd-monitor" () in - let errors1 = Monitor.detach_and_get_error_stream m1 in - let errors2 = Monitor.detach_and_get_error_stream m2 in - Deferred.create (fun ivar -> - Scheduler.within' ~monitor:m1 (fun () -> - return (Handler.create (fun _ -> raise Exn (* to m1 *))) - ) >>> fun handler -> - Scheduler.within' ~monitor:m2 (fun () -> - let uninstall = Handler.install handler Deferred.unit in - uninstall (); - Deferred.unit - ) >>> fun () -> - (Stream.iter errors2 ~f:(fun _ -> assert false)); - (Stream.iter errors1 ~f:(fun _ -> assert false)); - Clock.after Time.Span.millisecond >>> fun () -> - Ivar.fill ivar ()) -;; - -let tests = [ - ("Test_handler.t0", t0); - ("Test_handler.t1", t1); - ("Test_handler.t2", t2); - ("Test_handler.t3", t3); - ("Test_handler.t4", t4); - ("Test_handler.t5", t5); - ("Test_handler.t6", t6); - ("Test_handler.t7", t7); -] diff --git a/test/test_runner.ml b/test/test_runner.ml deleted file mode 100644 index 4c3e2d9..0000000 --- a/test/test_runner.ml +++ /dev/null @@ -1,4 +0,0 @@ -let () = - (* Async_unix.Inline_tests.run (); *) - (* Async_extra.Inline_tests.run (); *) - () diff --git a/test/tests_in_child_processes.ml b/test/tests_in_child_processes.ml deleted file mode 100644 index a4ebfcb..0000000 --- a/test/tests_in_child_processes.ml +++ /dev/null @@ -1,11 +0,0 @@ -open Core.Std -open Async.Std let _ = _squelch_unused_module_warning_ -open Async_test_in_child_process - -(* The following [include]s exist to make those modules depend on this one. *) -include (Reset_in_forked_process_tests : sig end) -include (Socket_test : sig end) -include (Shutdown_tests : sig end) -include (Writer_close_and_shutdown_tests : sig end) - -let () = never_returns (main ()) diff --git a/test/thread_safe_test.ml b/test/thread_safe_test.ml deleted file mode 100644 index b4ac17f..0000000 --- a/test/thread_safe_test.ml +++ /dev/null @@ -1,125 +0,0 @@ -open Core.Std -open Async.Std - -let tests : (string * (unit -> unit Deferred.t)) list ref = ref [] - -let test name f = - tests := (String.concat [ "Thread_safe: "; name ], f) :: !tests -;; - -let () = - test "run_in_async_with_optional_cycle must be in thread" (fun () -> - begin try - ignore (Thread_safe.run_in_async_with_optional_cycle (fun () -> (`Run_a_cycle, ())) - : (unit, exn) Result.t); - assert false; - with _ -> () - end; - Deferred.unit) -;; - -let () = - test "not am_holding_async_lock in a thread" (fun () -> - assert (Thread_safe.am_holding_async_lock ()); - In_thread.run (fun () -> - assert (not (Thread_safe.am_holding_async_lock ())))) -;; - -let () = - test "run_in_async_exn am_holding_async_lock" (fun () -> - assert - (try Thread_safe.run_in_async_exn (fun () -> assert false); false - with _ -> true); - In_thread.run (fun () -> - Thread_safe.run_in_async_exn (fun () -> - assert (Thread_safe.am_holding_async_lock ()))); - ) -;; - -let () = - test "run_in_async_exn" (fun () -> - let message = "foo" in - Monitor.try_with (fun () -> - In_thread.run (fun () -> - Thread_safe.run_in_async_exn (fun () -> failwith message))) - >>| function - | Ok () -> assert false; - | Error exn -> - match Monitor.extract_exn exn with - | Failure message' -> assert (message = message') - | _ -> assert false - ) -;; - -let () = - test "deferred" (fun () -> - In_thread.run (fun () -> - let (d, put) = Thread_safe.deferred () in - put 13; - d) - >>= fun d -> - d - >>| fun i -> - assert (i = 13); - ) -;; - -let () = - test "deferred2" (fun () -> - In_thread.run (fun () -> Thread_safe.deferred ()) - >>= fun (d, put) -> - assert (try put 13; false with _ -> true); - don't_wait_for (In_thread.run (fun () -> put 13)); - d - >>| fun i -> - assert (i = 13); - ) - -let log_string = Async_kernel.Debug.log_string - -let () = - test "Thread_safe_pipe" (fun () -> - let module P = Thread_safe_pipe in - let r, p = P.create () in - assert (try P.write p 13 ~if_closed:Raise; false with _ -> true); - let throttle = Throttle.create ~continue_on_error:false ~max_concurrent_jobs:1 in - let run_in_thread f = - don't_wait_for (Throttle.enqueue throttle (fun () -> In_thread.run f)) - in - let num_elts = 100 in - for i = 0 to num_elts - 1; do - run_in_thread (fun () -> P.write p i ~if_closed:Raise) - done; - run_in_thread (fun () -> P.close p); - Pipe.to_list r - >>| fun list -> - assert (list = List.init num_elts ~f:Fn.id); - ) -;; - -let () = - test "Thread_safe_pipe2" (fun () -> - In_thread.run (fun () -> Thread_safe_pipe.create ()) - >>= fun (pipe_reader, pipe_writer) -> - assert (* [write] raises if we're in Async. *) - (try - Thread_safe_pipe.write pipe_writer 13 ~if_closed:Raise; false - with _ -> - true); - let throttle = Throttle.create ~continue_on_error:false ~max_concurrent_jobs:1 in - let run_in_thread f = - don't_wait_for (Throttle.enqueue throttle (fun () -> In_thread.run f)) - in - let num_elts = 100 in - for i = 0 to num_elts - 1; do - run_in_thread (fun () -> - Thread_safe_pipe.write_without_pushback pipe_writer i ~if_closed:Raise) - done; - run_in_thread (fun () -> Thread_safe_pipe.close pipe_writer); - Pipe.to_list pipe_reader - >>| fun list -> - assert (list = List.init num_elts ~f:Fn.id); - ) -;; - -let tests = !tests diff --git a/test/wait_test.ml b/test/wait_test.ml deleted file mode 100644 index fe20c18..0000000 --- a/test/wait_test.ml +++ /dev/null @@ -1,71 +0,0 @@ -open Core.Std -open Async.Std - -let fast_child () = Unix.fork_exec ~prog:"/bin/true" ~args:[] () -let slow_child () = - let prog = "/bin/sleep" in - Unix.fork_exec ~prog ~args:[ prog; "1" ] () -;; - -let raises_echild f = - try ignore (f ()); false with - | Unix.Unix_error (ECHILD, _, _) -> true -;; - -let test_raises_echild () = - assert (raises_echild (fun () -> Unix.wait_nohang `Any )); - assert (raises_echild (fun () -> Unix.wait_nohang `My_group)); - assert (raises_echild (fun () -> Unix.wait_nohang_untraced `Any )); - assert (raises_echild (fun () -> Unix.wait_nohang_untraced `My_group)); - Deferred.unit - -let wait_on_fast_child wait_on = - fast_child () - >>= fun pid -> - Unix.wait (wait_on pid) - >>| fun (pid', exit_or_signal) -> - assert (Result.is_ok exit_or_signal); - assert (pid = pid') - -let wait_on_slow_child wait_on = - Deferred.List.map (List.init 10 ~f:(fun _i -> slow_child ())) ~f:Fn.id - >>= fun pids -> - Deferred.List.iter ~how:`Parallel pids ~f:(fun pid -> - let wait_on = wait_on pid in - Monitor.try_with (fun () -> - Unix.wait wait_on - >>| fun (_, exit_or_signal) -> - if not (Result.is_ok exit_or_signal) then - failwith (Unix.Exit_or_signal.to_string_hum exit_or_signal)) - >>| function - | Ok () -> () - | Error exn -> - failwiths "wait for slow child" (wait_on, exn) - (<:sexp_of< Unix.wait_on * exn >>)) - -let unsafe_tests = - ("Wait_test.raises_echild", test_raises_echild) - :: (List.concat_map [ - ("any", (fun _ -> `Any)); - ("my_group", (fun _ -> `My_group)); - ("pid", (fun pid -> `Pid pid)); - ] ~f:(fun (name, wait_on) -> [ - "Wait_test.fast_child_" ^ name, (fun () -> wait_on_fast_child wait_on); - "Wait_test.slow_child_" ^ name, (fun () -> wait_on_slow_child wait_on); - ])) -;; - -(* Clear wait-status for any children spawned by previous tests. *) -let rec clear_wait_status () = - let continue = - try - ignore (Unix.wait_nohang `Any); - true - with _ -> false - in - if continue then clear_wait_status () - -let tests = List.map unsafe_tests ~f:(fun (name, test) -> name, (fun () -> - clear_wait_status (); - test ())) -;; diff --git a/test/writer_close_and_shutdown_tests.ml b/test/writer_close_and_shutdown_tests.ml deleted file mode 100644 index 571bff0..0000000 --- a/test/writer_close_and_shutdown_tests.ml +++ /dev/null @@ -1,78 +0,0 @@ -open Core.Std -open Async.Std -open Async_test_in_child_process - -let tmp_file = "/tmp/z.file" - -let () = - for power_of_two = 0 to 20 do - let contents () = - String.init (Float.to_int (Float.ldexp 1. power_of_two)) ~f:(fun i -> - Char.of_int_exn (Int.rem i 256)) - in - let expect = - Expect.(ok && no_output && custom "correct contents" (fun () -> - Reader.file_contents tmp_file - >>= fun contents' -> - Unix.unlink tmp_file - >>| fun () -> - if contents' = contents () then - Ok () - else - Or_error.error_string "contents mismatch")) - in - add_test _here_ expect (fun () -> - Writer.open_file tmp_file - >>= fun writer -> - Writer.write writer (contents ()); - Deferred.unit) - done -;; - -let () = - let string1 = "hello\n" in - let string2 = "goodbye\n" in - let expect = - Expect.(ok && no_output && custom "correct contents" (fun () -> - Reader.file_contents tmp_file - >>= fun contents' -> - Unix.unlink tmp_file - >>| fun () -> - if contents' = string1 ^ string2 then - Ok () - else - Or_error.error_string "contents mismatch")) - in - add_test _here_ expect (fun () -> - Writer.open_file tmp_file - >>= fun writer -> - Shutdown.at_shutdown (fun () -> - Writer.write writer string2; - Writer.flushed writer); - Writer.write writer string1; - Deferred.unit); -;; - -let () = - let string1 = "hello\n" in - let string2 = "goodbye\n" in - let expect = - Expect.(ok && no_output && custom "correct contents" (fun () -> - Reader.file_contents tmp_file - >>= fun contents' -> - Unix.unlink tmp_file - >>| fun () -> - if contents' = string1 ^ string2 then - Ok () - else - Or_error.error_string "contents mismatch")) - in - add_test _here_ expect (fun () -> - Writer.open_file tmp_file - >>= fun writer -> - Shutdown.at_shutdown (fun () -> - Writer.write writer string2; - Writer.close writer); - Writer.write writer string1; - Deferred.unit); -;; diff --git a/test/writer_test.ml b/test/writer_test.ml deleted file mode 100644 index c1304d7..0000000 --- a/test/writer_test.ml +++ /dev/null @@ -1,300 +0,0 @@ -open Core.Std -open Qtest_lib.Std -open Async.Std - -let concat = String.concat - -let tests = ref [] - -let add_test name { Lexing. pos_fname; pos_lnum; _ } f = - let name = - concat [ Filename.basename pos_fname - ; ":"; Int.to_string pos_lnum - ; " "; name - ] - in - tests := (name, f) :: !tests -;; - -let write = add_test "write" _here_ (fun () -> - let file = "tmp_writer_test.txt" in - Writer.open_file file - >>= fun writer -> - Writer.write writer "abc\n"; - Writer.write writer "def\n"; - Writer.write writer "ghi\n"; - Writer.close writer - >>= fun () -> - Reader.with_file file ~f:Reader.contents - >>= fun contents -> - assert_string_equal contents "abc\ndef\nghi\n"; - Unix.unlink file) -;; - -let multiple_writers here n = - add_test (concat [ Int.to_string n; " writers" ]) here (fun () -> - let files = Array.init n ~f:(fun i -> - sprintf "writer_test%d.txt" i) - in - let files = Array.to_list files in - let writers = List.map ~f:Writer.open_file files in - Deferred.all writers - >>= fun writers -> - for i = 1 to 10 - do - List.iter writers ~f:(fun writer -> - Writer.writef writer "line %d\n" i) - done; - Deferred.all_unit (List.map ~f:Writer.close writers) - >>= fun () -> - Deferred.all_unit (List.map ~f:Unix.unlink files)) -;; - -let () = multiple_writers _here_ 10 -let () = multiple_writers _here_ 100 -let () = multiple_writers _here_ 500 - -let append = add_test "append" _here_ (fun () -> - let max = 1000 in - let file = "tmp_writer_test_append.txt" in - Writer.save file ~contents:"" - >>= fun () -> - let append text = - Writer.with_file file ~append:true ~f:(fun writer -> - Writer.write writer text; - Deferred.unit) - in - Deferred.create (fun ivar -> - let rec write i = - append (Int.to_string i ^ "\n") - >>> fun () -> - if i < max - then write (i + 1) - else Ivar.fill ivar () - in - write 1) - >>= fun () -> - Reader.open_file file - >>= fun reader -> - let lines = Reader.lines reader in - let expected = ref 1 in - Pipe.iter' lines ~f:(fun lines -> - Queue.iter lines ~f:(fun line -> - assert_string_equal (Int.to_string !expected) line; - incr expected); - Deferred.unit) - >>= fun () -> - assert_string_equal (Int.to_string !expected) (Int.to_string (max + 1)); - Reader.close reader - >>= fun () -> - Unix.unlink file) -;; - -let write_lots writer = - let ss = String.make 4096 's' in - for _i = 1 to 64 do - Writer.write writer ss - done -;; - -let () = add_test "buffer_age_limit" _here_ (fun () -> - Unix.pipe (Info.of_string "buffer_age_limit") - >>= function (`Reader reader_fd, `Writer writer_fd) -> - let writer_to_close = ref None in - try_with (fun () -> - let writer = - Writer.create writer_fd ~buffer_age_limit:(`At_most (sec 2.)) - in - writer_to_close := Some writer; - write_lots writer; - after (sec 5.) - >>| fun () -> - `Did_not_raise_error) - >>= function - | Ok `Did_not_raise_error -> failwith "buffer-age check did not fire" - | Error _ -> - (* buffer-age check fired correctly *) - Writer.close ~force_close:(return ()) (Option.value_exn !writer_to_close) - >>= fun () -> - Unix.Fd.close reader_fd) -;; - -let () = add_test "reduce_buffer_age_limit" _here_ (fun () -> - Unix.pipe (Info.of_string "buffer_age_limit") - >>= function (`Reader reader_fd, `Writer writer_fd) -> - let writer_to_close = ref None in - try_with (fun () -> - let writer = - Writer.create writer_fd ~buffer_age_limit:(`At_most (sec 180.)) - in - writer_to_close := Some writer; - Writer.set_buffer_age_limit writer (`At_most (sec 1.)); - write_lots writer; - after (sec 2.) - >>| fun () -> - `Did_not_raise_error) - >>= function - | Ok `Did_not_raise_error -> failwith "buffer-age check did not fire" - | Error _ -> - (* buffer-age check fired correctly *) - Writer.close ~force_close:(return ()) (Option.value_exn !writer_to_close) - >>= fun () -> - Unix.Fd.close reader_fd) -;; - -let () = add_test "increase_buffer_age_limit" _here_ (fun () -> - Unix.pipe (Info.of_string "buffer_age_limit") - >>= function (`Reader reader_fd, `Writer writer_fd) -> - let writer = - Writer.create writer_fd ~buffer_age_limit:(`At_most (sec 2.)) - in - Writer.set_buffer_age_limit writer (`At_most (sec 180.)); - write_lots writer; - after (sec 5.) - >>= fun () -> - Writer.close ~force_close:(return ()) writer - >>= fun () -> - Unix.Fd.close reader_fd) -;; - -let () = add_test "flush_on_close" _here_ (fun () -> - let module Debug = Async_kernel.Debug in - let file = "flush_on_close.txt" in - Writer.open_file file - >>= fun writer -> - let pipe_r, pipe_w = Pipe.create () in - let buffer = Buffer.create 1 in - let write s = - Buffer.add_string buffer s; - Pipe.write_without_pushback pipe_w s; - in - don't_wait_for (Writer.transfer writer (Pipe.map pipe_r ~f:Fn.id) - (fun s -> Writer.write writer s)); - write "hello\n"; - Pipe.downstream_flushed pipe_w - >>= function - | `Reader_closed -> assert false - | `Ok -> - write "goodbye\n"; - Writer.close writer - >>= fun () -> - Reader.file_contents file - >>= fun s -> - assert (s = Buffer.contents buffer); - Unix.unlink file) -;; - -let () = add_test "schedule_non_zero_pos" _here_ (fun () -> - let file = "schedule-non-zero-pos.txt" in - Writer.with_file file ~f:(fun writer -> - let buf = Bigstring.create 2 in - Bigstring.set buf 1 '$'; - Writer.schedule_bigstring writer buf ~pos:1 ~len:1; - return ()) - >>= fun () -> - Reader.with_file file ~f:Reader.contents - >>= fun contents -> - assert_string_equal contents "$"; - Unix.unlink file) -;; - -let () = add_test "stdout" _here_ (fun () -> - Core.Std.printf "not from writer 1\n"; - return ()) -;; - -let transfer_test f = - let pipe_reader, pipe_writer = Pipe.create () in - Unix.pipe (Info.create "transfer" () <:sexp_of< unit >>) - >>= fun (`Reader rfd, `Writer wfd) -> - let reader = Reader.create rfd in - let writer = Writer.create wfd ~raise_when_consumer_leaves:false in - let transfer_finished = - Writer.transfer writer pipe_reader (fun s -> Writer.write writer s) - in - f pipe_reader pipe_writer reader writer; - transfer_finished - >>= fun () -> - Reader.close reader - >>= fun () -> - Writer.close writer -;; - -let () = add_test "transfer close consumer" _here_ (fun () -> - transfer_test (fun _pipe_reader pipe_writer reader _writer -> - don't_wait_for - (Reader.close reader - >>| fun () -> - Pipe.write_without_pushback pipe_writer "hello"))) -;; - -let () = add_test "transfer close producer" _here_ (fun () -> - transfer_test (fun _pipe_reader pipe_writer _reader _writer -> - Pipe.close pipe_writer)) -;; - -let dfor from to_ ~f = - let rec loop i = - if i > to_ - then return () - else f i >>= fun () -> loop (i + 1) - in - loop from -;; - -let () = add_test "write_iobuf" _here_ (fun () -> - let file = "write_iobuf_test" in - let iobuf = Iobuf.of_string "hello" in - dfor (-1) (Iobuf.length iobuf + 1) ~f:(fun pos -> - dfor (-1) (Iobuf.length iobuf + 1 - pos) ~f:(fun len -> - let slice_is_ok = pos >= 0 && len >= 0 && pos + len <= Iobuf.length iobuf in - Writer.with_file file ~f:(fun writer -> - begin - try - Writer.write_iobuf writer iobuf ~pos ~len - with _ -> assert (not slice_is_ok); - end; - return ()) - >>= fun () -> - Reader.file_contents file - >>= fun s -> - <:test_result< string >> s - ~expect:(if slice_is_ok - then Iobuf.to_string (Iobuf.sub_shared iobuf ~pos ~len) - else ""); - return ())) - >>= fun () -> - Unix.unlink file) -;; - -let () = add_test "save_sexps" _here_ (fun () -> - let values = ["abc",1 ;"def",2 ;"ghi jkl mno",123124] in - let sexps = List.map values ~f:<:sexp_of> in - let file = "save_sexps" in - Writer.save_sexps file sexps - >>= fun () -> - Reader.load_sexps_exn file <:of_sexp> - >>= fun read_values -> - Unix.unlink file - >>| fun () -> - <:test_eq<(string * int) list>> values read_values) -;; - -let () = add_test "write_sexp" _here_ (fun () -> - (* this tests a special case when writing a single sexp that's not surrounded by - quotes or parens, in those cases write_sexp must add whitespace after the sexp *) - let values = [1;2;-3;4;-1;1;123;-1;-5] in - let sexps = List.map values ~f:Int.sexp_of_t in - let file = "write_sexp" in - Writer.with_file file ~f:(fun w -> - List.iter sexps ~f:(fun s -> Writer.write_sexp w s); - Deferred.unit) - >>= fun () -> - Reader.load_sexps_exn file Int.t_of_sexp - >>= fun read_values -> - Unix.unlink file - >>| fun () -> - <:test_eq> values read_values) -;; - -let tests = List.rev !tests