Make Eio.Semaphore lock-free #398

Merged (2 commits) on Jan 4, 2023
1 change: 1 addition & 0 deletions Makefile
@@ -19,6 +19,7 @@ test_luv:
EIO_BACKEND=luv dune runtest

dscheck:
dune exec -- ./lib_eio/tests/dscheck/test_semaphore.exe
dune exec -- ./lib_eio/tests/dscheck/test_cells.exe

docker:
83 changes: 46 additions & 37 deletions bench/bench_semaphore.ml
@@ -1,51 +1,60 @@
open Eio.Std

let run_sender ~n_iters ~batch_size ~ack sem =
for i = 1 to n_iters do
Eio.Semaphore.release sem;
if i mod batch_size = 0 then
Eio.Semaphore.acquire ack
done
(* Simulate other work in the domain, and also prevent it from going to sleep.
Otherwise, we're just measuring how long it takes the OS to wake a sleeping thread. *)
let rec spin () =
Fiber.yield ();
spin ()

let run_bench ~domain_mgr ~clock ~use_domains ~n_iters ~batch_size =
let sem = Eio.Semaphore.make 0 in
let ack = Eio.Semaphore.make 0 in
let run_bench ~domain_mgr ~clock ~use_domains ~n_iters ~n_resources =
let n_workers = 4 in
let sem = Eio.Semaphore.make n_resources in
let n_pending = Atomic.make n_workers in
let all_started, set_all_started = Promise.create () in
let t0 = ref 0.0 in
let run_worker ~n_iters sem =
Switch.run @@ fun sw ->
Fiber.fork_daemon ~sw spin;
if Atomic.fetch_and_add n_pending (-1) = 1 then (
Promise.resolve set_all_started ();
t0 := Eio.Time.now clock;
) else (
Promise.await all_started
);
for _ = 1 to n_iters do
Eio.Semaphore.acquire sem;
Fiber.yield ();
Eio.Semaphore.release sem
done
in
let run () =
if use_domains then (
Eio.Domain_manager.run domain_mgr @@ fun () ->
run_worker ~n_iters sem
) else (
run_worker ~n_iters sem
)
in
Gc.full_major ();
let _minor0, prom0, _major0 = Gc.counters () in
let t0 = Eio.Time.now clock in
Fiber.both
(fun () ->
if use_domains then (
Eio.Domain_manager.run domain_mgr @@ fun () ->
run_sender ~n_iters ~batch_size ~ack sem
) else (
run_sender ~n_iters ~batch_size ~ack sem
)
)
(fun () ->
for i = 1 to n_iters do
Eio.Semaphore.acquire sem;
if i mod batch_size = 0 then
Eio.Semaphore.release ack
done
);
Fiber.all (List.init n_workers (Fun.const run));
let t1 = Eio.Time.now clock in
let time_total = t1 -. t0 in
let time_total = t1 -. !t0 in
let time_per_iter = time_total /. float n_iters in
let _minor1, prom1, _major1 = Gc.counters () in
let prom = prom1 -. prom0 in
Printf.printf "%11b, %8d, %3d, %8.2f, %13.4f\n%!" use_domains n_iters batch_size (1e9 *. time_per_iter) (prom /. float n_iters)
Printf.printf "%11b, %8d, %11d, %8.2f, %13.4f\n%!" use_domains n_iters n_resources (1e9 *. time_per_iter) (prom /. float n_iters)

let main ~domain_mgr ~clock =
Printf.printf "use_domains, n_iters, batch, ns/iter, promoted/iter\n%!";
[false, 1_000_000, 1;
false, 1_000_000, 10;
false, 1_000_000, 100;
true, 100_000, 1;
true, 100_000, 10;
true, 100_000, 100]
|> List.iter (fun (use_domains, n_iters, batch_size) ->
run_bench ~domain_mgr ~clock ~use_domains ~n_iters ~batch_size
Printf.printf "use_domains, n_iters, resources, ns/iter, promoted/iter\n%!";
[false, 100_000, 2;
false, 100_000, 3;
false, 100_000, 4;
true, 10_000, 2;
true, 10_000, 3;
true, 10_000, 4]
|> List.iter (fun (use_domains, n_iters, n_resources) ->
run_bench ~domain_mgr ~clock ~use_domains ~n_iters ~n_resources
)

let () =
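A note on the timing in the new benchmark: each worker decrements n_pending as it starts, and the last one to start (the one whose Atomic.fetch_and_add returns 1) records t0 and resolves all_started, while the others wait on that promise, so the measured interval only begins once every worker is running. Each worker also forks its spin fiber with Fiber.fork_daemon, so the spinner is tied to the worker's switch and stops automatically when the worker's loop finishes.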
176 changes: 176 additions & 0 deletions lib_eio/sem_state.ml
@@ -0,0 +1,176 @@
(* A lock-free semaphore, using Cells.

We have a number of resources (1 in the case of a mutex). Each time a user
wants a resource, the user decrements the counter. When finished with the
resource, the user increments the counter again.

If there are more users than resources then the counter will be negative. If
a user decrements the counter to a non-negative value then it gets ownership
of one of the free resources. If it decrements the counter to a negative
value then it must wait (by allocating a cell). When a user with a resource
increments the counter *from* a negative value, that user is responsible for
resuming one waiting cell (transferring ownership of the resource). This
ensures that every waiter will get woken exactly once.

Cancellation

We could consider cancelling a request to be simply replacing the callback
with a dummy one that immediately releases the resource. However, if callers
keep cancelling then the list of cancelled requests would keep growing.

Instead, we'd like cancellation simply to undo the effects of suspending, by
incrementing the counter and marking the cell as Finished (so that the
resumer will ignore it and move on to the next waiter, and the Finished
cell can be freed).

If the cancelling user increments from a negative value then it is responsible
for waking one user, which is fine as it is waking itself. However, it may find
itself incrementing from a non-negative one if it is racing with a resumer
(if the count is non-negative then once all current operations finish there
would be no suspended users, so the process of waking this user must have
already begun).

To handle this, a cancelling user first transitions the cell to In_transition,
then increments the counter, then transitions to the final Finished state,
in the usual case where it incremented from a negative value.

If a resumer runs at the same time then it may also increment the counter
from a non-negative value and try to wake this cell. It transitions the cell
from In_transition to Finished. The cancelling user will notice this when it
fails to CAS to Finished and can handle it.

If the cancelling user sees the Finished state after In_transition then it
knows that the resuming user has transferred to it the responsibility of
waking one user. If the cancelling user is also responsible for waking one
user then it performs an extra resume on behalf of the resuming user.

Finally, if the cancelling user is not responsible for waking anyone (even
itself) then it leaves the cell in In_transition (the CQS paper uses a
separate Refused state, but we don't actually need that). This can only
happen when a resume is happening at the same time. The resumer will
transition to Finished, creating an obligation to resume, but we've just
done that anyway. We know this In_transition state can't last long because
at the moment when the canceller incremented the counter all current
waiters, including itself, were in the process of being resumed. *)

module Cell = struct
type _ t =
| In_transition (* The suspender will try to CAS this soon. *)
| Request of (unit -> unit) (* Waiting for a resource. *)
| Finished (* Ownership of the resource has been transferred,
or the suspender cancelled. *)

let init = In_transition
(* We only resume when we know another thread is suspended or in the process of suspending. *)

let segment_order = 2

let dump f = function
| Request _ -> Fmt.string f "Request"
| Finished -> Fmt.string f "Finished"
| In_transition -> Fmt.string f "In_transition"
end

module Cells = Cells.Make(Cell)

type cell = unit Cell.t

type t = {
state : int Atomic.t; (* Free resources. Negative if there are waiters waiting. *)
cells : unit Cells.t;
}

type request = t * unit Cells.segment * unit Cell.t Atomic.t

(* Wake one waiter (and give it the resource being released). *)
let rec resume t =
let cell = Cells.next_resume t.cells in
match (Atomic.exchange cell Finished : cell) with
| Request r ->
(* The common case: there was a waiter for the value.
We pass ownership of the resource to it. *)
r ()
| Finished ->
(* The waiter has finished cancelling. Ignore it and resume the next one. *)
resume t
| In_transition ->
(* The consumer is in the middle of doing something and will soon try to
CAS to a new state. It will see that we got there first and handle the
resume when it's done. *)
()

(* [true] on success, or [false] if we need to suspend.
You MUST call [suspend] iff this returns [false].
The reason for splitting this is because e.g. [Semaphore] needs to get
the continuation for the fiber between [acquire] and [suspend]. *)
let acquire t =
let s = Atomic.fetch_and_add t.state (-1) in
(* We got a resource if we decremented *to* a non-negative number,
which happens if we decremented *from* a positive one. *)
s > 0

let suspend t k : request option =
let (segment, cell) = Cells.next_suspend t.cells in
if Atomic.compare_and_set cell In_transition (Request k) then Some (t, segment, cell)
else match Atomic.get cell with
| Finished ->
(* We got resumed before we could add the waiter. *)
k ();
None
| Request _ | In_transition ->
(* These are unreachable from the previously-observed non-In_transition state
without us taking some action first *)
assert false

let release t =
let s = Atomic.fetch_and_add t.state (+1) in
if s < 0 then (
(* We incremented from a negative value.
We are therefore responsible for waking one waiter. *)
resume t
)

let cancel (t, segment, cell) =
match (Atomic.get cell : cell) with
| Request _ as old ->
if Atomic.compare_and_set cell old In_transition then (
(* Undo the effect of [acquire] by incrementing the counter.
As always, if we increment from a negative value then we need to resume one waiter. *)
let need_resume = Atomic.fetch_and_add t.state (+1) < 0 in
if need_resume then (
if Atomic.compare_and_set cell In_transition Finished then (
(* The normal case. We resumed ourself by cancelling.
This is the only case we need to tell the segment because in all
other cases the resumer has already reached this segment so
freeing it is pointless. *)
Cells.cancel_cell segment
) else (
(* [release] got called at the same time and it also needed to resume one waiter.
So we call [resume] to handle the extra one, in addition to resuming ourself. *)
resume t
)
) else (
(* This can only happen if [release] ran at the same time and incremented the counter
before we did. Since we were suspended, and later we saw the counter
show that no one was, it must have decided to wake us. Either it has placed Finished
in the cell, or it's about to do so. Either way, we discharge the obligation to
wake someone by resuming ourself with a cancellation.
The resource returns to the free pool. We know the resumer has already finished with it
even if it hasn't updated the cell state yet. *)
);
true
) else false (* We got resumed first *)
| Finished -> false (* We got resumed first *)
| In_transition -> invalid_arg "Already cancelling!"

let dump f t =
Fmt.pf f "Semaphore (state=%d)@,%a"
(Atomic.get t.state)
Cells.dump t.cells

let create n =
if n < 0 then raise (Invalid_argument "n < 0");
{
cells = Cells.make ();
state = Atomic.make n;
}
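To make the counter-and-cells protocol described at the top of this file concrete, here is a minimal sketch that drives Sem_state directly from a single domain (demo and woken are illustrative names, not part of the library; inside Eio the callback enqueues a suspended fiber instead, as semaphore.ml below shows):

(* Illustrative sketch only, not code from this PR:
   one resource, two users, run sequentially on one domain. *)
let demo () =
  let t = Sem_state.create 1 in
  (* First user decrements 1 -> 0, so it owns the resource. *)
  assert (Sem_state.acquire t);
  (* Second user decrements 0 -> -1, so it must wait. *)
  assert (not (Sem_state.acquire t));
  let woken = ref false in
  let request = Sem_state.suspend t (fun () -> woken := true) in
  (* First user finishes: incrementing -1 -> 0 makes it responsible
     for resuming one waiter, which runs the callback above. *)
  Sem_state.release t;
  assert !woken;
  (* The second user now owns the resource and returns it when done. *)
  Sem_state.release t;
  ignore request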
65 changes: 26 additions & 39 deletions lib_eio/semaphore.ml
@@ -1,55 +1,42 @@
type state =
| Free of int
| Waiting of unit Waiters.t

type t = {
id : Ctf.id;
mutex : Mutex.t;
mutable state : state;
state : Sem_state.t;
}

let make n =
if n < 0 then raise (Invalid_argument "n < 0");
let id = Ctf.mint_id () in
Ctf.note_created id Ctf.Semaphore;
{
id;
mutex = Mutex.create ();
state = Free n;
state = Sem_state.create n;
}

let release t =
Mutex.lock t.mutex;
Ctf.note_signal t.id;
match t.state with
| Free x when x = max_int -> Mutex.unlock t.mutex; raise (Sys_error "semaphore would overflow max_int!")
| Free x -> t.state <- Free (succ x); Mutex.unlock t.mutex
| Waiting q ->
begin match Waiters.wake_one q () with
| `Ok -> ()
| `Queue_empty -> t.state <- Free 1
end;
Mutex.unlock t.mutex
Sem_state.release t.state

let rec acquire t =
Mutex.lock t.mutex;
match t.state with
| Waiting q ->
Ctf.note_try_read t.id;
Waiters.await ~mutex:(Some t.mutex) q t.id
| Free 0 ->
t.state <- Waiting (Waiters.create ());
Mutex.unlock t.mutex;
acquire t
| Free n ->
Ctf.note_read t.id;
t.state <- Free (pred n);
Mutex.unlock t.mutex
let acquire t =
if not (Sem_state.acquire t.state) then (
(* No free resources.
We must wait until one of the existing users increments the counter and resumes us.
It's OK if they resume before we suspend; we'll just pick up the token they left. *)
Suspend.enter_unchecked (fun ctx enqueue ->
match Sem_state.suspend t.state (fun () -> enqueue (Ok ())) with

@polytypic (Contributor), Jan 4, 2023:

Is there an opportunity here to avoid the Suspend.enter_unchecked call (which, I assume, captures the continuation) by exposing the Sem_state protocol a bit more? Basically, require the acquirer to separately create the cell. After creating the cell, the acquirer would then read the cell state (as is done in Sem_state.suspend at the moment in case the CAS fails) before capturing the continuation. If the state has already been set to Finished, then there is no need to capture a continuation. Otherwise capture the continuation and proceed as before.

In short, this results in a kind of double-checked pattern. This could improve performance in highly contested cases where it is possible that a resumer actually manages to see the In_transition (or Empty) state. The downside, of course, is making the Sem_state API/protocol more complex (adding an extra step).

Author (Collaborator):

I don't think so. This case wouldn't work:

  1. The suspender decrements the count to say they're planning to suspend.
  2. The resumer resumes the cell.
  3. The suspender creates the cell.

We have to be able to initialise cells before the suspender does anything beyond modifying the counter. We don't want it creating cells before changing the counter because that includes the fast path (where we don't need a cell at all).

@polytypic (Contributor), Jan 4, 2023:

Just to be clear, this is what I had in mind:

diff --git a/lib_eio/sem_state.ml b/lib_eio/sem_state.ml
index 24a301c..7ffebf6 100644
--- a/lib_eio/sem_state.ml
+++ b/lib_eio/sem_state.ml
@@ -109,6 +109,26 @@ let acquire t =
      which happens if we decremented *from* a positive one. *)
   s > 0
 
+let prepare_suspend t =
+  Cells.next_suspend t.cells
+
+let is_in_transition (_, (cell: cell Atomic.t)) =
+  match Atomic.get cell with
+  | In_transition -> true
+  | Finished | Request _ -> false
+
+let perform_suspend t (segment, (cell: cell Atomic.t)) k : request option =
+  if Atomic.compare_and_set cell In_transition (Request k) then Some (t, segment, cell)
+  else (
+    (* We got resumed before we could add the waiter. *)
+    k ();
+    None
+  )
+
 let suspend t k : request option =
   let (segment, cell) = Cells.next_suspend t.cells in
   if Atomic.compare_and_set cell In_transition (Request k) then Some (t, segment, cell)
diff --git a/lib_eio/semaphore.ml b/lib_eio/semaphore.ml
index 2733be5..a90fa64 100644
--- a/lib_eio/semaphore.ml
+++ b/lib_eio/semaphore.ml
@@ -20,21 +20,24 @@ let acquire t =
     (* No free resources.
        We must wait until one of the existing users increments the counter and resumes us.
        It's OK if they resume before we suspend; we'll just pick up the token they left. *)
-    Suspend.enter_unchecked (fun ctx enqueue ->
-        match Sem_state.suspend t.state (fun () -> enqueue (Ok ())) with
-        | None -> ()   (* Already resumed *)
-        | Some request ->
-          Ctf.note_try_read t.id;
-          match Fiber_context.get_error ctx with
-          | Some ex ->
-            if Sem_state.cancel request then enqueue (Error ex);
-            (* else already resumed *)
-          | None ->
-            Fiber_context.set_cancel_fn ctx (fun ex ->
-                if Sem_state.cancel request then enqueue (Error ex)
-                (* else already resumed *)
-              )
-      )
+    let segment_cell = Sem_state.prepare_suspend t.state in
+    (* We may have already been resumed at this point. So, check before capturing continuation. *)
+    if Sem_state.is_in_transition segment_cell then (
+      Suspend.enter_unchecked (fun ctx enqueue ->
+          match Sem_state.perform_suspend t.state segment_cell (fun () -> enqueue (Ok ())) with
+          | None -> ()   (* Already resumed *)
+          | Some request ->
+            Ctf.note_try_read t.id;
+            match Fiber_context.get_error ctx with
+            | Some ex ->
+              if Sem_state.cancel request then enqueue (Error ex);
+              (* else already resumed *)
+            | None ->
+              Fiber_context.set_cancel_fn ctx (fun ex ->
+                  if Sem_state.cancel request then enqueue (Error ex)
+                  (* else already resumed *)
+                )
+        )
+    )
   );
   Ctf.note_read t.id

Author (Collaborator):

Ah, I see. Pushing some of the work to outside of the suspend in case we can skip the suspend by the time it's done, at the cost of an extra Atomic.get.

I tried measuring it by incrementing slow and fast counters in is_in_transition; running the benchmark gives:

slow=850475, fast=75, frac=0.01%

So probably not worth it, I think.

Author (Collaborator):

OK, I guess I should have disabled the single-process runs before measuring that... it's slow=51221, fast=98, frac=0.19% now (0.19% of the slow path cases). But I don't think that changes anything.

| None -> () (* Already resumed *)
| Some request ->
Ctf.note_try_read t.id;
match Fiber_context.get_error ctx with
| Some ex ->
if Sem_state.cancel request then enqueue (Error ex);
(* else already resumed *)
| None ->
Fiber_context.set_cancel_fn ctx (fun ex ->
if Sem_state.cancel request then enqueue (Error ex)
(* else already resumed *)
)
)
);
Ctf.note_read t.id

let get_value t =
Mutex.lock t.mutex;
let s = t.state in
Mutex.unlock t.mutex;
match s with
| Free n -> n
| Waiting _ -> 0
max 0 (Atomic.get t.state.state)
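The module's public surface is unchanged (make, acquire, release, get_value); only the implementation underneath is new. A small usage sketch, assuming the usual eio_main entry point:

open Eio.Std

(* Allow at most 2 of the 3 jobs to hold the semaphore at once. *)
let () =
  Eio_main.run @@ fun _env ->
  let sem = Eio.Semaphore.make 2 in
  let job i =
    Eio.Semaphore.acquire sem;
    Fun.protect ~finally:(fun () -> Eio.Semaphore.release sem) (fun () ->
        traceln "job %d has a slot (free now: %d)" i (Eio.Semaphore.get_value sem);
        Fiber.yield ())
  in
  Fiber.all (List.init 3 (fun i () -> job i))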
10 changes: 8 additions & 2 deletions lib_eio/tests/dscheck/dune
@@ -1,11 +1,17 @@
; We copy cells.ml here so we can build it using TracedAtomic instead of the default one.
(copy_files# (files ../../core/cells.ml))
(copy_files# (files ../../sem_state.ml))

(executable
(name test_cells)
(executables
(names test_cells test_semaphore)
(libraries dscheck optint fmt))

(rule
(alias dscheck)
(package eio)
(action (run %{exe:test_cells.exe})))

(rule
(alias dscheck)
(package eio)
(action (run %{exe:test_semaphore.exe})))
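
The new rule adds test_semaphore.exe to the same dscheck alias as test_cells.exe, so dune build @dscheck (or the make dscheck target above) runs both model-checked tests. The real test_semaphore.ml is not shown in this diff; purely to illustrate the kind of check these tests perform, a dscheck-style sketch over Sem_state might look like the following (the Dscheck.TracedAtomic names and the scenario are assumptions, not the actual test):

(* Illustrative sketch only, not the test_semaphore.ml from this PR. *)
module Atomic = Dscheck.TracedAtomic

let test () =
  let t = Sem_state.create 1 in    (* one resource, two competing domains *)
  let got = Atomic.make 0 in
  let worker () =
    if Sem_state.acquire t then (
      (* Fast path: we decremented from a positive count and own the resource. *)
      Atomic.incr got;
      Sem_state.release t
    ) else
      (* Slow path: register a callback to run once a resource is released
         (it may run immediately, inside [suspend], if a resume got there first). *)
      ignore (Sem_state.suspend t (fun () -> Atomic.incr got; Sem_state.release t))
  in
  Atomic.spawn worker;
  Atomic.spawn worker;
  (* In every interleaving explored by dscheck, both workers get the resource. *)
  Atomic.final (fun () -> Atomic.check (fun () -> Atomic.get got = 2))

let () = Atomic.trace test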