Skip to content

Commit

Permalink
Merge pull request #277 from talex5/condition
Browse files Browse the repository at this point in the history
Add Eio.Condition
  • Loading branch information
talex5 committed Aug 11, 2022
2 parents f76139c + 49c2269 commit 5e7ad8e
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 1 deletion.
25 changes: 25 additions & 0 deletions lib_eio/condition.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
type t = {
waiters: unit Waiters.t;
mutex: Mutex.t;
id: Ctf.id
}

let create () = {
waiters = Waiters.create ();
id = Ctf.mint_id ();
mutex = Mutex.create ();
}

let await t mutex =
Mutex.lock t.mutex;
Eio_mutex.unlock mutex;
match Waiters.await ~mutex:(Some t.mutex) t.waiters t.id with
| () -> Eio_mutex.lock mutex
| exception ex -> Eio_mutex.lock mutex; raise ex

let await_no_mutex t =
Mutex.lock t.mutex;
Waiters.await ~mutex:(Some t.mutex) t.waiters t.id

let broadcast t =
Waiters.wake_all t.waiters ()
66 changes: 66 additions & 0 deletions lib_eio/condition.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
(** Waiters call {!await} in a loop as long as some condition is false.
Fibers that modify inputs to the condition must call [broadcast] soon
afterwards so that waiters can re-check the condition.
Example:
{[
let x = ref 0
let cond = Eio.Condition.create ()
let mutex = Eio.Mutex.create ()
let set_x value =
Eio.Mutex.use_rw ~protect:false mutex (fun () -> x := value);
Eio.Condition.broadcast cond
let await_x p =
Eio.Mutex.use_ro mutex (fun () ->
while not (p !x) do (* [x] cannot change, as mutex is locked. *)
Eio.Condition.await ~mutex cond (* Mutex is unlocked while suspended. *)
done
)
]}
It is used like this:
{[
Fiber.both
(fun () ->
traceln "x = %d" !x;
await_x ((=) 42);
traceln "x = %d" !x
)
(fun () ->
set_x 5;
Fiber.yield ();
set_x 7;
set_x 42;
)
]}
*)

type t

val create : unit -> t
(** [create ()] creates a new condition variable. *)

val await : t -> Eio_mutex.t -> unit
(** [await t mutex] suspends the current fiber until it is notified by [t].
You should lock [mutex] before testing whether the condition is true,
and leave it locked while calling this function.
It will be unlocked while the fiber is waiting and locked again before
returning (it is also locked again if the wait is cancelled). *)

val await_no_mutex : t -> unit
(** [await_no_mutex t] suspends the current fiber until it is notified by [t].
This is only safe to use in the case where [t] is only used within a single domain,
and the test for the condition was done without switching fibers.
i.e. you know the condition is still false, and no notification of a change can be sent
until [await_no_mutex] has finished suspending the fiber. *)

val broadcast : t -> unit
(** [broadcast t] wakes up any waiting fibers (by appending them to the run-queue to resume later).
If no fibers are waiting, nothing happens. *)
1 change: 1 addition & 0 deletions lib_eio/eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ end

module Semaphore = Semaphore
module Mutex = Eio_mutex
module Condition = Condition
module Stream = Stream
module Exn = Exn
module Generic = Generic
Expand Down
3 changes: 3 additions & 0 deletions lib_eio/eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ module Semaphore = Semaphore
(** Mutual exclusion. *)
module Mutex = Eio_mutex

(** Waiting for a condition to become true. *)
module Condition = Condition

(** A stream/queue. *)
module Stream = Stream

Expand Down
4 changes: 3 additions & 1 deletion lib_eio/eio_mutex.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
Note that mutexes are often unnecessary for code running in a single domain, as
the scheduler will only switch to another fiber if you perform an operation that
can block. *)
can block.
@canonical Eio.Mutex *)

type t
(** The type for a concurrency-friendly mutex. *)
Expand Down
178 changes: 178 additions & 0 deletions tests/condition.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# Setting up the environment

```ocaml
# #require "eio.mock";;
```

```ocaml
open Eio.Std
module C = Eio.Condition
```

# Test cases

Simple case:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Switch.run @@ fun sw ->
let condition = C.create () in
Fiber.both
(fun () ->
traceln "1: wait for condition";
C.await_no_mutex condition;
traceln "1: finished")
(fun () ->
traceln "2: broadcast condition";
C.broadcast condition;
traceln "2: finished");;
+1: wait for condition
+2: broadcast condition
+2: finished
+1: finished
- : unit = ()
```

Broadcast when no one is waiting doesn't block:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Switch.run @@ fun sw ->
let condition = C.create () in
traceln "broadcast condition";
C.broadcast condition;
traceln "finished";;
+broadcast condition
+finished
- : unit = ()
```

Broadcast wakes all waiters at once:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Switch.run @@ fun sw ->
let condition = C.create () in
Fiber.all [
(fun () ->
traceln "1: wait for condition";
C.await_no_mutex condition;
traceln "1: finished");
(fun () ->
traceln "2: wait for condition";
C.await_no_mutex condition;
traceln "2: finished");
(fun () ->
traceln "3: broadcast condition";
C.broadcast condition;
traceln "3: finished")
];;
+1: wait for condition
+2: wait for condition
+3: broadcast condition
+3: finished
+1: finished
+2: finished
- : unit = ()
```

## Typical single-domain use

```ocaml
let x = ref 0
let cond = Eio.Condition.create ()
let set value =
x := value;
Eio.Condition.broadcast cond
let await p =
(* Warning: only safe within a single-domain, and if [p] doesn't switch fibers! *)
while not (p !x) do
Eio.Condition.await_no_mutex cond
done
```

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Fiber.both
(fun () ->
traceln "x = %d" !x;
await ((=) 42);
traceln "x = %d" !x
)
(fun () ->
set 5;
Fiber.yield ();
set 7;
set 42;
);;
+x = 0
+x = 42
- : unit = ()
```

## Use with mutex

```ocaml
let x = ref 0
let cond = Eio.Condition.create ()
let mutex = Eio.Mutex.create ()
let set value =
Eio.Mutex.use_rw ~protect:false mutex (fun () -> x := value);
Eio.Condition.broadcast cond
let await p =
Eio.Mutex.use_ro mutex (fun () ->
while not (p !x) do
Eio.Condition.await cond mutex
done
)
```

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Fiber.both
(fun () ->
traceln "x = %d" !x;
await ((=) 42);
traceln "x = %d" !x
)
(fun () ->
set 5;
Fiber.yield ();
set 7;
set 42;
);;
+x = 0
+x = 42
- : unit = ()
```

Cancellation while waiting:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
Fiber.first
(fun () ->
await ((=) 0);
assert false;
)
(fun () -> ());
Fiber.both
(fun () ->
traceln "x = %d" !x;
await ((=) 0);
traceln "x = %d" !x
)
(fun () ->
set 5;
Fiber.yield ();
set 0;
);;
+x = 42
+x = 0
- : unit = ()
```

0 comments on commit 5e7ad8e

Please sign in to comment.