Make Eio.Condition lock-free #397

Merged (1 commit, Jan 3, 2023)
3 changes: 3 additions & 0 deletions Makefile
@@ -18,5 +18,8 @@ test_luv:
rm -rf _build
EIO_BACKEND=luv dune runtest

dscheck:
dune exec -- ./lib_eio/tests/dscheck/test_cells.exe

docker:
docker build -t eio .
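
The new dscheck target model-checks the lock-free cells. For reference only (this is not the contents of test_cells.exe), a minimal dscheck test looks roughly like the sketch below, assuming dscheck's Dscheck.TracedAtomic interface:

(* Minimal dscheck sketch, assuming [Dscheck.TracedAtomic]; illustrative only. *)
module Atomic = Dscheck.TracedAtomic

let test () =
  let counter = Atomic.make 0 in
  (* Two simulated domains race to increment the same atomic. *)
  Atomic.spawn (fun () -> Atomic.incr counter);
  Atomic.spawn (fun () -> Atomic.incr counter);
  (* Checked at the end of every interleaving dscheck explores. *)
  Atomic.final (fun () -> Atomic.check (fun () -> Atomic.get counter = 2))

(* Explore all interleavings of the spawned functions. *)
let () = Atomic.trace test
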
3 changes: 2 additions & 1 deletion dune-project
@@ -27,7 +27,8 @@
(astring (and (>= 0.8.5) :with-test))
(crowbar (and (>= 0.2) :with-test))
(mtime (>= 2.0.0))
(alcotest (and (>= 1.4.0) :with-test))))
(alcotest (and (>= 1.4.0) :with-test))
(dscheck (and (>= 0.1.0) :with-test))))
(package
(name eio_linux)
(synopsis "Eio implementation for Linux using io-uring")
1 change: 1 addition & 0 deletions eio.opam
@@ -22,6 +22,7 @@ depends: [
"crowbar" {>= "0.2" & with-test}
"mtime" {>= "2.0.0"}
"alcotest" {>= "1.4.0" & with-test}
"dscheck" {>= "0.1.0" & with-test}
"odoc" {with-doc}
]
conflicts: [
52 changes: 29 additions & 23 deletions lib_eio/condition.ml
@@ -1,27 +1,33 @@
type t = {
waiters: unit Waiters.t;
mutex: Mutex.t;
id: Ctf.id
}
type t = Broadcast.t

let create () = {
waiters = Waiters.create ();
id = Ctf.mint_id ();
mutex = Mutex.create ();
}
let create () = Broadcast.create ()

let await t mutex =
Mutex.lock t.mutex;
Eio_mutex.unlock mutex;
match Waiters.await ~mutex:(Some t.mutex) t.waiters t.id with
| () -> Eio_mutex.lock mutex
| exception ex -> Eio_mutex.lock mutex; raise ex
let await_generic ?mutex t =
match
Suspend.enter_unchecked (fun ctx enqueue ->
match Fiber_context.get_error ctx with
| Some ex ->
Option.iter Eio_mutex.unlock mutex;
enqueue (Error ex)
| None ->
match Broadcast.suspend t (fun () -> enqueue (Ok ())) with
| None ->
Option.iter Eio_mutex.unlock mutex
| Some request ->
Option.iter Eio_mutex.unlock mutex;
Fiber_context.set_cancel_fn ctx (fun ex ->
if Broadcast.cancel request then enqueue (Error ex)
(* else already succeeded *)
)
)
with
| () -> Option.iter Eio_mutex.lock mutex
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
Option.iter Eio_mutex.lock mutex;
Printexc.raise_with_backtrace ex bt

let await_no_mutex t =
Mutex.lock t.mutex;
Waiters.await ~mutex:(Some t.mutex) t.waiters t.id
let await t mutex = await_generic ~mutex t
let await_no_mutex t = await_generic t

let broadcast t =
Mutex.lock t.mutex;
Waiters.wake_all t.waiters ();
Mutex.unlock t.mutex
let broadcast = Broadcast.resume_all
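
For context, the user-facing behaviour of Eio.Condition is unchanged: await atomically releases the given mutex while suspending and re-takes it before returning, and broadcast wakes every waiter. A rough usage sketch (the cond/mutex/ready names below are illustrative, not part of this PR):

(* Hedged usage sketch of Eio.Condition; names are illustrative only. *)
let cond = Eio.Condition.create ()
let mutex = Eio.Mutex.create ()
let ready = ref false

(* Waiter: re-check the predicate each time we are woken; the mutex is
   released while suspended and re-acquired before [await] returns. *)
let wait_until_ready () =
  Eio.Mutex.lock mutex;
  while not !ready do
    Eio.Condition.await cond mutex
  done;
  Eio.Mutex.unlock mutex

(* Signaller: update the shared state, then wake all waiting fibers. *)
let set_ready () =
  Eio.Mutex.lock mutex;
  ready := true;
  Eio.Mutex.unlock mutex;
  Eio.Condition.broadcast cond
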
108 changes: 108 additions & 0 deletions lib_eio/core/broadcast.ml
@@ -0,0 +1,108 @@
(* See the Cells module for an overview of this system.

Each new waiter atomically increments the "suspend" pointer and writes
a callback there. The waking fiber removes all the callbacks and calls them.
In this version, "resume" never gets ahead of "suspend" (broadcasting just
brings it up-to-date with the "suspend" pointer).

When the resume fiber runs, some of the cells reserved for callbacks might
not yet have been filled. In this case, the resuming fiber just marks them
as needing to be resumed. When the suspending fiber continues, it will
notice this and continue immediately. *)

module Cell = struct
(* For any given cell, there are two actors running in parallel: the
suspender (the consumer) and the resumer (the provider).

The resumer only performs a single operation (resume).

The suspender waits to be resumed and then, optionally, cancels.

This means we only have three cases to think about:

1. Consumer adds request (Empty -> Request).
1a. Provider fulfills it (Request -> Resumed).
1b. Consumer cancels it (Request -> Cancelled).
2. Provider gets to cell first (Empty -> Resumed).
When the consumer tries to wait, it resumes immediately.

The Resumed state should never be seen. It exists only to allow the
request to be GC'd promptly. We could replace it with Empty, but having
separate states is clearer for debugging. *)

type _ t =
| Request of (unit -> unit)
| Cancelled
| Resumed
| Empty

let init = Empty

let segment_order = 2

let dump f = function
| Request _ -> Fmt.string f "Request"
| Empty -> Fmt.string f "Empty"
| Resumed -> Fmt.string f "Resumed"
| Cancelled -> Fmt.string f "Cancelled"
end

module Cells = Cells.Make(Cell)

type cell = unit Cell.t
type t = unit Cells.t

type request = unit Cells.segment * cell Atomic.t

let rec resume cell =
match (Atomic.get cell : cell) with
| Request r as cur ->
(* The common case: we have a waiter for the value *)
if Atomic.compare_and_set cell cur Resumed then r ();
(* else it was cancelled at the same time; ignore *)
| Empty ->
(* The consumer has reserved this cell but not yet stored the request.
We place Resumed there and it will handle it soon. *)
if Atomic.compare_and_set cell Empty Resumed then
() (* The consumer will deal with it *)
else
resume cell (* The Request was added concurrently; use it *)
| Cancelled -> ()
| Resumed ->
(* This state is unreachable because we (the provider) haven't set this yet *)
assert false

let cancel (segment, cell) =
match (Atomic.get cell : cell) with
| Request _ as old ->
if Atomic.compare_and_set cell old Cancelled then (
Cells.cancel_cell segment;
true
) else false (* We got resumed first *)
| Resumed -> false (* We got resumed first *)
| Cancelled -> invalid_arg "Already cancelled!"
| Empty ->
(* To call [cancel] the user needs a [request] value,
which they only get once we've reached the [Request] state.
[Empty] is unreachable from [Request]. *)
assert false

let suspend t k =
let (_, cell) as request = Cells.next_suspend t in
if Atomic.compare_and_set cell Empty (Request k) then Some request
else match Atomic.get cell with
| Resumed ->
(* Resumed before we could add the waiter *)
k ();
None
| Cancelled | Request _ | Empty ->
(* These are unreachable from the previously-observed non-Empty state
without us taking some action first *)
assert false

let resume_all t =
Cells.resume_all t resume

let create = Cells.make

let dump f t = Cells.dump f t
37 changes: 37 additions & 0 deletions lib_eio/core/broadcast.mli
@@ -0,0 +1,37 @@
(** A lock-free queue of waiters that should all be resumed at once.

This uses {!Cells} internally. *)

type t

type request
(** A handle to a pending request that can be used to cancel it. *)

val create : unit -> t
(** [create ()] is a fresh broadcast queue. *)

val suspend : t -> (unit -> unit) -> request option
(** [suspend t fn] arranges for [fn ()] to be called on {!resume_all}.

[fn ()] may be called either from the caller's context or from [resume_all],
so it must be safe to run in any context in which [resume_all] can run.
For example, if [resume_all] may be called from a signal handler then
[fn] must be safe to call from one too. [fn] must not raise.

The returned request can be used to cancel. It can be [None] in the
(unlikely) event that [t] got resumed before the function returned. *)

val resume_all : t -> unit
(** [resume_all t] calls all non-cancelled callbacks attached to [t],
in the order in which they were suspended.

This function is lock-free and can be used safely even from a signal handler or GC finalizer. *)

val cancel : request -> bool
(** [cancel request] attempts to remove a pending request.

It returns [true] if the request was cancelled, or [false] if it got
resumed before that could happen. *)

val dump : Format.formatter -> t -> unit
(** Display the internal state of a queue, for debugging. *)
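
Taken together, a small sketch of driving this interface directly (on_wake is an illustrative callback; inside Eio it would normally just enqueue the suspended fiber):

(* Hedged sketch of using Broadcast directly from inside lib_eio/core. *)
let demo () =
  let t = Broadcast.create () in
  let on_wake () = print_endline "woken" in
  match Broadcast.suspend t on_wake with
  | None ->
    (* Unlikely: [t] was resumed before registration finished, so
       [on_wake] has already run in our context. *)
    ()
  | Some request ->
    (* Wake every registered callback, in suspension order... *)
    Broadcast.resume_all t;
    (* ...after which it is too late to cancel. *)
    assert (not (Broadcast.cancel request))

let () = demo ()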