Skip to content

Commit

Permalink
doc: add units to Scheduler functions (#10637)
Browse files Browse the repository at this point in the history
Signed-off-by: Stephen Sherratt <stephen@sherra.tt>
  • Loading branch information
gridbugs committed Jun 25, 2024
1 parent 01fe1d7 commit 6b651ca
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 48 deletions.
2 changes: 1 addition & 1 deletion bin/exec.ml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ module Watch = struct
cause pid reuse *)
Unix.kill pid_int signal;
let do_wait () =
Scheduler.wait_for_process ~timeout:1. pid
Scheduler.wait_for_process ~timeout_seconds:1. pid
|> Fiber.map ~f:(fun (_ : Proc.Process_info.t) -> ())
in
let on_error (e : Exn_with_backtrace.t) =
Expand Down
2 changes: 1 addition & 1 deletion bin/monitor.ml
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ let monitor ~quit_on_disconnect () =
Console.Status_line.set
(Console.Status_line.Live
(fun () -> Pp.verbatim ("Waiting for RPC server" ^ String.make (i mod 4) '.')));
let+ () = Scheduler.sleep 0.3 in
let+ () = Scheduler.sleep ~seconds:0.3 in
Some (i + 1))
;;

Expand Down
2 changes: 1 addition & 1 deletion bin/rpc/build.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ let retry_loop once =
match res with
| Some result -> Fiber.return result
| None ->
let* () = Scheduler.sleep 0.2 in
let* () = Scheduler.sleep ~seconds:0.2 in
loop ()
in
loop ()
Expand Down
55 changes: 31 additions & 24 deletions src/dune_engine/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -696,21 +696,21 @@ end
module Alarm_clock : sig
type t

val create : Event.Queue.t -> frequency:float -> t
val create : Event.Queue.t -> period_seconds:float -> t

type alarm

val await : alarm -> [ `Finished | `Cancelled ] Fiber.t
val cancel : t -> alarm -> unit
val sleep : t -> float -> alarm
val sleep : t -> seconds:float -> alarm
val close : t -> unit
end = struct
type alarm = [ `Finished | `Cancelled ] Fiber.Ivar.t

type t =
{ events : Event.Queue.t
; mutex : Mutex.t
; frequency : float
; period_seconds : float
; mutable alarms : (float * [ `Finished | `Cancelled ] Fiber.Ivar.t) list
; mutable active : bool
}
Expand Down Expand Up @@ -747,7 +747,7 @@ end = struct
(match Nonempty_list.of_list expired with
| None -> ()
| Some expired -> Event.Queue.send_timers_completed t.events expired);
Thread.delay t.frequency;
Thread.delay t.period_seconds;
Mutex.lock t.mutex;
loop ()
in
Expand All @@ -757,20 +757,22 @@ end = struct
Mutex.unlock t.mutex
;;

let create events ~frequency =
let t = { events; active = true; alarms = []; frequency; mutex = Mutex.create () } in
let create events ~period_seconds =
let t =
{ events; active = true; alarms = []; period_seconds; mutex = Mutex.create () }
in
Thread.spawn (polling_loop t);
t
;;

let sleep t duration =
let sleep t ~seconds =
Mutex.lock t.mutex;
let ivar = Fiber.Ivar.create () in
if not t.active
then (
Mutex.unlock t.mutex;
Code_error.raise "cannot schedule timers after close" []);
t.alarms <- (duration +. Unix.gettimeofday (), ivar) :: t.alarms;
t.alarms <- (seconds +. Unix.gettimeofday (), ivar) :: t.alarms;
Mutex.unlock t.mutex;
ivar
;;
Expand Down Expand Up @@ -949,7 +951,7 @@ let prepare (config : Config.t) ~(handler : Handler.t) ~events ~file_watcher =
; file_watcher
; fs_syncs = Dune_file_watcher.Sync_id.Table.create 64
; build_inputs_changed = Trigger.create ()
; alarm_clock = lazy (Alarm_clock.create events ~frequency:0.1)
; alarm_clock = lazy (Alarm_clock.create events ~period_seconds:0.1)
; cancel
; thread_pool = Thread_pool.create ~spawn_thread ~min_workers:4 ~max_workers:50
}
Expand Down Expand Up @@ -1226,7 +1228,7 @@ module Run = struct

let go
(config : Config.t)
?timeout
?timeout_seconds
?(file_watcher = No_watcher)
~(on_event : Config.t -> Handler.Event.t -> unit)
run
Expand All @@ -1251,11 +1253,11 @@ module Run = struct
Memo.reset initial_invalidation;
let result =
let run =
match timeout with
match timeout_seconds with
| None -> run
| Some timeout ->
fun () ->
let sleep = Alarm_clock.sleep (Lazy.force t.alarm_clock) timeout in
let sleep = Alarm_clock.sleep (Lazy.force t.alarm_clock) ~seconds:timeout in
Fiber.fork_and_join_unit
(fun () ->
let+ res = Alarm_clock.await sleep in
Expand Down Expand Up @@ -1311,9 +1313,9 @@ let inject_memo_invalidation invalidation =
Fiber.return ()
;;

let wait_for_process_with_timeout t pid waiter ~timeout ~is_process_group_leader =
let wait_for_process_with_timeout t pid waiter ~timeout_seconds ~is_process_group_leader =
Fiber.of_thunk (fun () ->
let sleep = Alarm_clock.sleep (Lazy.force t.alarm_clock) timeout in
let sleep = Alarm_clock.sleep (Lazy.force t.alarm_clock) ~seconds:timeout_seconds in
Fiber.fork_and_join_unit
(fun () ->
let+ res = Alarm_clock.await sleep in
Expand All @@ -1328,31 +1330,36 @@ let wait_for_process_with_timeout t pid waiter ~timeout ~is_process_group_leader
res))
;;

let wait_for_build_process ?timeout ?(is_process_group_leader = false) pid =
let wait_for_build_process ?timeout_seconds ?(is_process_group_leader = false) pid =
let* t = t () in
match timeout with
match timeout_seconds with
| None -> wait_for_build_process t pid
| Some timeout ->
| Some timeout_seconds ->
wait_for_process_with_timeout
t
pid
wait_for_build_process
~timeout
~timeout_seconds
~is_process_group_leader
;;

let wait_for_process ?timeout ?(is_process_group_leader = false) pid =
let wait_for_process ?timeout_seconds ?(is_process_group_leader = false) pid =
let* t = t () in
match timeout with
match timeout_seconds with
| None -> wait_for_process t pid
| Some timeout ->
wait_for_process_with_timeout t pid wait_for_process ~timeout ~is_process_group_leader
| Some timeout_seconds ->
wait_for_process_with_timeout
t
pid
wait_for_process
~timeout_seconds
~is_process_group_leader
;;

let sleep duration =
let sleep ~seconds =
let* t = t () in
let alarm_clock = Lazy.force t.alarm_clock in
let+ res = Alarm_clock.await (Alarm_clock.sleep alarm_clock duration) in
let+ res = Alarm_clock.await (Alarm_clock.sleep alarm_clock ~seconds) in
match res with
| `Finished -> ()
| `Cancelled ->
Expand Down
14 changes: 7 additions & 7 deletions src/dune_engine/scheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ module Run : sig

val go
: Config.t
-> ?timeout:float
-> ?timeout_seconds:float
-> ?file_watcher:file_watcher
-> on_event:(Config.t -> Event.t -> unit)
-> (unit -> 'a Fiber.t)
Expand All @@ -94,7 +94,7 @@ val with_job_slot : (Fiber.Cancel.t -> Config.t -> 'a Fiber.t) -> 'a Fiber.t
true, kill the entire process group instead of just the process in case of
timeout. *)
val wait_for_process
: ?timeout:float
: ?timeout_seconds:float
-> ?is_process_group_leader:bool
-> Pid.t
-> Proc.Process_info.t Fiber.t
Expand All @@ -104,7 +104,7 @@ type termination_reason =
| Cancel

val wait_for_build_process
: ?timeout:float
: ?timeout_seconds:float
-> ?is_process_group_leader:bool
-> Pid.t
-> (Proc.Process_info.t * termination_reason) Fiber.t
Expand Down Expand Up @@ -143,10 +143,10 @@ val cancel_current_build : unit -> unit Fiber.t

val inject_memo_invalidation : Memo.Invalidation.t -> unit Fiber.t

(** [sleep duration] wait for [duration] to elapse. Sleepers are checked for
wake up at a rate of once per 0.1 seconds. So [duration] should be at least
this long. *)
val sleep : float -> unit Fiber.t
(** [sleep duration] wait for [duration] seconds to elapse. Sleepers
are checked for wake up at a rate of once per 0.1 seconds. So
[duration] should be at least this long. *)
val sleep : seconds:float -> unit Fiber.t

val stats : unit -> Dune_stats.t option Fiber.t

Expand Down
2 changes: 1 addition & 1 deletion src/dune_pkg/rev_store.ml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ let rec attempt_to_lock flock lock ~max_retries =
| Ok `Failure ->
if max_retries > 0
then
let* () = Scheduler.sleep sleep_duration in
let* () = Scheduler.sleep ~seconds:sleep_duration in
attempt_to_lock flock lock ~max_retries:(max_retries - 1)
else Fiber.return (Ok `Failure)
;;
Expand Down
6 changes: 5 additions & 1 deletion test/expect-tests/dune_patch/dune_patch_tests.ml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ let test files (patch, patch_contents) =
; watch_exclusions = []
}
in
Scheduler.Run.go config ~timeout:5.0 ~file_watcher:No_watcher ~on_event:(fun _ _ -> ())
Scheduler.Run.go
config
~timeout_seconds:5.0
~file_watcher:No_watcher
~on_event:(fun _ _ -> ())
@@ fun () ->
let open Fiber.O in
let* () = Fiber.return @@ create_files ((patch, patch_contents) :: files) in
Expand Down
6 changes: 3 additions & 3 deletions test/expect-tests/dune_rpc_e2e/dune_rpc_e2e.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ let init_chan ~root_dir =
let* res = once () in
match res with
| Some res -> Fiber.return res
| None -> Scheduler.sleep 0.2 >>= loop
| None -> Scheduler.sleep ~seconds:0.2 >>= loop
in
loop ()
;;
Expand Down Expand Up @@ -109,7 +109,7 @@ let run ?env ~prog ~argv () =
Unix.close stdout_w;
Unix.close stderr_w;
( pid
, (let+ proc = Scheduler.wait_for_process ~timeout:3.0 pid in
, (let+ proc = Scheduler.wait_for_process ~timeout_seconds:3.0 pid in
if proc.status <> Unix.WEXITED 0
then (
let name =
Expand Down Expand Up @@ -195,5 +195,5 @@ let run run =
~finally:(fun () -> Sys.chdir cwd)
~f:(fun () ->
Sys.chdir (Path.to_string dir);
Scheduler.Run.go config run ~timeout:5.0 ~on_event:(fun _ _ -> ()))
Scheduler.Run.go config run ~timeout_seconds:5.0 ~on_event:(fun _ _ -> ()))
;;
8 changes: 4 additions & 4 deletions test/expect-tests/dune_rpc_e2e/dune_rpc_registry_test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ module Scheduler = Dune_engine.Scheduler
module Poll_active = Dune_rpc_impl.Poll_active
open Dune_rpc_e2e

let try_ ~times ~delay ~f =
let try_ ~times ~delay_seconds ~f =
let rec loop = function
| 0 -> Fiber.return None
| n ->
let* res = f () in
(match res with
| Some s -> Fiber.return (Some s)
| None ->
let* () = Scheduler.sleep delay in
let* () = Scheduler.sleep ~seconds:delay_seconds in
loop (n - 1))
in
loop times
Expand Down Expand Up @@ -42,7 +42,7 @@ let run =
~finally:(fun () -> Sys.chdir cwd)
~f:(fun () ->
Sys.chdir (Path.to_string dir);
Scheduler.Run.go config run ~timeout:5.0 ~on_event:(fun _ _ -> ()))
Scheduler.Run.go config run ~timeout_seconds:5.0 ~on_event:(fun _ _ -> ()))
;;

let%expect_test "turn on dune watch and wait until the connection is listed" =
Expand All @@ -65,7 +65,7 @@ let%expect_test "turn on dune watch and wait until the connection is listed" =
in
with_dune_watch ~env (fun pid ->
let+ res =
try_ ~times:5 ~delay:0.2 ~f:(fun () ->
try_ ~times:5 ~delay_seconds:0.2 ~f:(fun () ->
let+ refresh = Poll_active.poll poll in
match refresh with
| Error _ -> None
Expand Down
9 changes: 7 additions & 2 deletions test/expect-tests/scheduler_tests.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ let default =
}
;;

let go ?(timeout = 0.3) ?(config = default) f =
let go ?(timeout_seconds = 0.3) ?(config = default) f =
try
Scheduler.Run.go ~timeout config ~file_watcher:No_watcher ~on_event:(fun _ _ -> ()) f
Scheduler.Run.go
~timeout_seconds
config
~file_watcher:No_watcher
~on_event:(fun _ _ -> ())
f
with
| Scheduler.Run.Shutdown.E Requested -> ()
;;
Expand Down
6 changes: 3 additions & 3 deletions test/expect-tests/timer_tests.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ let%expect_test "create and wait for timer" =
let now () = Unix.gettimeofday () in
let start = now () in
let duration = 0.2 in
let+ () = Scheduler.sleep duration in
let+ () = Scheduler.sleep ~seconds:duration in
assert (now () -. start >= duration);
print_endline "timer finished successfully");
[%expect {| timer finished successfully |}]
Expand All @@ -32,7 +32,7 @@ let%expect_test "multiple timers" =
(fun () ->
[ 0.3; 0.2; 0.1 ]
|> Fiber.parallel_iter ~f:(fun duration ->
let+ () = Scheduler.sleep duration in
let+ () = Scheduler.sleep ~seconds:duration in
printfn "finished %0.2f" duration));
[%expect {|
finished 0.10
Expand All @@ -52,7 +52,7 @@ let%expect_test "run process with timeout" =
in
Spawn.spawn ~prog ~argv:[ prog; "100000" ] () |> Pid.of_int
in
let+ _ = Scheduler.wait_for_process ~timeout:0.1 pid in
let+ _ = Scheduler.wait_for_process ~timeout_seconds:0.1 pid in
print_endline "sleep timed out");
[%expect {|
sleep timed out |}]
Expand Down

0 comments on commit 6b651ca

Please sign in to comment.