Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Eio.Condition #277

Merged
merged 2 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions lib_eio/condition.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
type t = {
waiters: unit Waiters.t;
mutex: Mutex.t;
id: Ctf.id
}

let create () = {
waiters = Waiters.create ();
id = Ctf.mint_id ();
mutex = Mutex.create ();
}

let await t mutex =
Mutex.lock t.mutex;
Eio_mutex.unlock mutex;
match Waiters.await ~mutex:(Some t.mutex) t.waiters t.id with
| () -> Eio_mutex.lock mutex
| exception ex -> Eio_mutex.lock mutex; raise ex

let await_no_mutex t =
Mutex.lock t.mutex;
Waiters.await ~mutex:(Some t.mutex) t.waiters t.id

let broadcast t =
Waiters.wake_all t.waiters ()
66 changes: 66 additions & 0 deletions lib_eio/condition.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
(** Waiters call {!await} in a loop as long as some condition is false.
Fibers that modify inputs to the condition must call [broadcast] soon
afterwards so that waiters can re-check the condition.

Example:

{[
let x = ref 0
let cond = Eio.Condition.create ()
let mutex = Eio.Mutex.create ()

let set_x value =
Eio.Mutex.use_rw ~protect:false mutex (fun () -> x := value);
Eio.Condition.broadcast cond

let await_x p =
Eio.Mutex.use_ro mutex (fun () ->
while not (p !x) do (* [x] cannot change, as mutex is locked. *)
Eio.Condition.await ~mutex cond (* Mutex is unlocked while suspended. *)
done
)
]}

It is used like this:

{[
Fiber.both
(fun () ->
traceln "x = %d" !x;
await_x ((=) 42);
traceln "x = %d" !x
)
(fun () ->
set_x 5;
Fiber.yield ();
set_x 7;
set_x 42;
)
]}
*)

type t

val create : unit -> t
(** [create ()] creates a new condition variable. *)

val await : t -> Eio_mutex.t -> unit
(** [await t mutex] suspends the current fiber until it is notified by [t].

You should lock [mutex] before testing whether the condition is true,
and leave it locked while calling this function.
It will be unlocked while the fiber is waiting and locked again before
returning (it is also locked again if the wait is cancelled). *)

val await_no_mutex : t -> unit
(** [await_no_mutex t] suspends the current fiber until it is notified by [t].

This is only safe to use in the case where [t] is only used within a single domain,
and the test for the condition was done without switching fibers.
i.e. you know the condition is still false, and no notification of a change can be sent
until [await_no_mutex] has finished suspending the fiber. *)

val broadcast : t -> unit
(** [broadcast t] wakes up any waiting fibers (by appending them to the run-queue to resume later).

If no fibers are waiting, nothing happens. *)
1 change: 1 addition & 0 deletions lib_eio/eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ end

module Semaphore = Semaphore
module Mutex = Eio_mutex
module Condition = Condition
module Stream = Stream
module Exn = Exn
module Generic = Generic
Expand Down
3 changes: 3 additions & 0 deletions lib_eio/eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ module Semaphore = Semaphore
(** Mutual exclusion. *)
module Mutex = Eio_mutex

(** Waiting for a condition to become true. *)
module Condition = Condition

(** A stream/queue. *)
module Stream = Stream

Expand Down
4 changes: 3 additions & 1 deletion lib_eio/eio_mutex.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

Note that mutexes are often unnecessary for code running in a single domain, as
the scheduler will only switch to another fiber if you perform an operation that
can block. *)
can block.

@canonical Eio.Mutex *)

type t
(** The type for a concurrency-friendly mutex. *)
Expand Down
178 changes: 178 additions & 0 deletions tests/condition.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# Setting up the environment

```ocaml
# #require "eio.mock";;
```

```ocaml
open Eio.Std

module C = Eio.Condition
```

# Test cases

Simple case:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Switch.run @@ fun sw ->
let condition = C.create () in
Fiber.both
(fun () ->
traceln "1: wait for condition";
C.await_no_mutex condition;
traceln "1: finished")
(fun () ->
traceln "2: broadcast condition";
C.broadcast condition;
traceln "2: finished");;
+1: wait for condition
+2: broadcast condition
+2: finished
+1: finished
- : unit = ()
```

Broadcast when no one is waiting doesn't block:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Switch.run @@ fun sw ->
let condition = C.create () in
traceln "broadcast condition";
C.broadcast condition;
traceln "finished";;
+broadcast condition
+finished
- : unit = ()
```

Broadcast wakes all waiters at once:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Switch.run @@ fun sw ->
let condition = C.create () in
Fiber.all [
(fun () ->
traceln "1: wait for condition";
C.await_no_mutex condition;
traceln "1: finished");
(fun () ->
traceln "2: wait for condition";
C.await_no_mutex condition;
traceln "2: finished");
(fun () ->
traceln "3: broadcast condition";
C.broadcast condition;
traceln "3: finished")
];;
+1: wait for condition
+2: wait for condition
+3: broadcast condition
+3: finished
+1: finished
+2: finished
- : unit = ()
```

## Typical single-domain use

```ocaml
let x = ref 0
let cond = Eio.Condition.create ()

let set value =
x := value;
Eio.Condition.broadcast cond

let await p =
(* Warning: only safe within a single-domain, and if [p] doesn't switch fibers! *)
while not (p !x) do
Eio.Condition.await_no_mutex cond
done
```

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Fiber.both
(fun () ->
traceln "x = %d" !x;
await ((=) 42);
traceln "x = %d" !x
)
(fun () ->
set 5;
Fiber.yield ();
set 7;
set 42;
);;
+x = 0
+x = 42
- : unit = ()
```

## Use with mutex

```ocaml
let x = ref 0
let cond = Eio.Condition.create ()
let mutex = Eio.Mutex.create ()

let set value =
Eio.Mutex.use_rw ~protect:false mutex (fun () -> x := value);
Eio.Condition.broadcast cond

let await p =
Eio.Mutex.use_ro mutex (fun () ->
while not (p !x) do
Eio.Condition.await cond mutex
done
)
```

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Fiber.both
(fun () ->
traceln "x = %d" !x;
await ((=) 42);
traceln "x = %d" !x
)
(fun () ->
set 5;
Fiber.yield ();
set 7;
set 42;
);;
+x = 0
+x = 42
- : unit = ()
```

Cancellation while waiting:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Fiber.first
(fun () ->
await ((=) 0);
assert false;
)
(fun () -> ());
Fiber.both
(fun () ->
traceln "x = %d" !x;
await ((=) 0);
traceln "x = %d" !x
)
(fun () ->
set 5;
Fiber.yield ();
set 0;
);;
+x = 42
+x = 0
- : unit = ()
```