Skip to content

Commit

Permalink
Merge pull request #396 from talex5/simplify-cancel
Browse files Browse the repository at this point in the history
Simplify cancellation logic
  • Loading branch information
talex5 committed Dec 21, 2022
2 parents 8288651 + 036d83d commit 589320e
Show file tree
Hide file tree
Showing 17 changed files with 274 additions and 243 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ all:
dune build @runtest @all

bench:
dune exec -- ./bench/bench_condition.exe
dune exec -- ./bench/bench_buf_read.exe
dune exec -- ./bench/bench_mutex.exe
dune exec -- ./bench/bench_yield.exe
Expand Down
65 changes: 65 additions & 0 deletions bench/bench_condition.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
open Eio.Std

(* A publisher keeps updating a counter and signalling a condition.
Two consumers read the counter whenever they get a signal.
The publisher stops after signalling [target], and the consumers stop after seeing it. *)

(* Number of timed rounds performed by [run_bench]; the reported figures are
   averaged over this many rounds. *)
let n_iters = 100

(* Last value written to the shared counter in each round. The publisher stops
   after storing it and a consumer stops as soon as it reads it. *)
let target = 100_000

(* [run_publisher cond v] stores each value from [1] up to [target] in the
   atomic [v], in increasing order, and broadcasts on [cond] after every
   store. It returns once [target] has been stored and signalled. *)
let run_publisher cond v =
  let rec publish i =
    if i <= target then begin
      Atomic.set v i;
      (* traceln "set %d" i; *)
      Eio.Condition.broadcast cond;
      publish (i + 1)
    end
  in
  publish 1


(* [run_consumer cond v] loops until the atomic [v] is observed to hold
   [target].

   Each pass hands two functions to [Fiber.both]: one awaits a signal on
   [cond], the other reads [v] and raises [Exit] if it equals [target].
   [Exit] escaping from [Fiber.both] ends the loop; otherwise another pass
   starts once [Fiber.both] returns.

   NOTE(review): the await is listed before the check, presumably so that the
   waiter is registered before [v] is read and a broadcast in between is not
   missed -- confirm against the Eio [Condition] / [Fiber.both] docs. *)
let run_consumer cond v =
  let wait_for_signal () = Eio.Condition.await_no_mutex cond in
  let stop_if_done () =
    let seen = Atomic.get v in
    (* traceln "saw %d" seen; *)
    if seen = target then raise Exit
  in
  let rec loop () =
    match Fiber.both wait_for_signal stop_if_done with
    | () -> loop ()
    | exception Exit -> ()
  in
  loop ()

(* [run_bench ?domain_mgr ~clock ()] runs [n_iters] rounds of the benchmark
   and prints one result row to stdout (flushed immediately).

   Every round runs two consumers and one publisher together with
   [Fiber.all], all sharing one condition and one atomic counter. When
   [domain_mgr] is given, each consumer is started through
   [Eio.Domain_manager.run]; otherwise it runs directly in the calling fiber
   set.

   The printed columns are: whether a domain manager was supplied, the mean
   wall-clock time per round in milliseconds (measured with [clock]), and the
   mean change per round in the second component of [Gc.counters] (the
   promoted-words counter). *)
let run_bench ?domain_mgr ~clock () =
  let cond = Eio.Condition.create () in
  let v = Atomic.make 0 in
  let consumer () =
    match domain_mgr with
    | None -> run_consumer cond v
    | Some dm -> Eio.Domain_manager.run dm (fun () -> run_consumer cond v)
  in
  (* The fiber list is rebuilt on every round, exactly as a plain loop body
     would do, so the allocation pattern being measured stays the same. *)
  let rec rounds remaining =
    if remaining > 0 then begin
      Fiber.all [
        consumer;
        consumer;
        (fun () -> run_publisher cond v);
      ];
      rounds (remaining - 1)
    end
  in
  (* Start from a freshly collected heap before reading the baselines. *)
  Gc.full_major ();
  let _, promoted_before, _ = Gc.counters () in
  let started = Eio.Time.now clock in
  rounds n_iters;
  let elapsed = Eio.Time.now clock -. started in
  let _, promoted_after, _ = Gc.counters () in
  let iters = float_of_int n_iters in
  let ms_per_iter = 1e3 *. (elapsed /. iters) in
  let promoted_per_iter = (promoted_after -. promoted_before) /. iters in
  Printf.printf "%11b, %7.2f, %13.4f\n%!"
    (Option.is_some domain_mgr) ms_per_iter promoted_per_iter

(* [main ~domain_mgr ~clock] prints the column header and then runs the
   benchmark twice: first without a domain manager, then with [domain_mgr].
   Each run prints its own result row. *)
let main ~domain_mgr ~clock =
  Printf.printf "use_domains, ms/iter, promoted/iter\n%!";
  List.iter
    (fun domain_mgr -> run_bench ?domain_mgr ~clock ())
    [ None; Some domain_mgr ]

(* Program entry point: start the Eio main loop, take the domain manager and
   clock from the environment it provides, and run the benchmark with them. *)
let () =
  Eio_main.run (fun env ->
    let domain_mgr = Eio.Stdenv.domain_mgr env in
    let clock = Eio.Stdenv.clock env in
    main ~domain_mgr ~clock)
2 changes: 1 addition & 1 deletion bench/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(executables
(names bench_stream bench_promise bench_semaphore bench_yield bench_cancel bench_mutex
bench_buf_read)
bench_buf_read bench_condition)
(libraries eio_main))
7 changes: 6 additions & 1 deletion doc/prelude.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ module Eio_main = struct

(* To avoid non-deterministic output, we run the examples in a single domain. *)
let fake_domain_mgr = object (_ : #Eio.Domain_manager.t)
method run fn = fn ()
method run fn =
(* Since we're in the same domain, cancelling the calling fiber will
cancel the fake spawned one automatically. *)
let cancelled, _ = Promise.create () in
fn ~cancelled

method run_raw fn = fn ()
end

Expand Down
2 changes: 1 addition & 1 deletion lib_eio/buf_write.ml
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ let rec await_batch t =
);
t.wake_writer <- (fun () ->
(* Our caller has already set [wake_writer <- ignore]. *)
ignore (Fiber_context.clear_cancel_fn ctx : bool);
Fiber_context.clear_cancel_fn ctx;
enqueue (Ok ())
);
);
Expand Down
51 changes: 25 additions & 26 deletions lib_eio/core/cancel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ type state =
(* There is a tree of cancellation contexts for each domain.
A fiber is always in exactly one context, but can move to a new child and back (see [sub]).
While a fiber is performing a cancellable operation, it sets a cancel function.
When a context is cancelled, we attempt to call and remove each fiber's cancellation function, if any.
Cancelling always happens from the fiber's own domain, but the cancellation function may be removed
from another domain as soon as an operation is known to have succeeded.
An operation may either finish normally or be cancelled;
whoever manages to clear the cancellation function is responsible for resuming the continuation.
If cancelled, this is done by calling the cancellation function. *)
When a context is cancelled, we call each fiber's cancellation function (first replacing it with [ignore]).
Cancelling always happens from the fiber's own domain.
An operation may either finish normally or be cancelled (not both).
If a function can succeed in a separate domain,
the user's cancel function is responsible for ensuring that this is done atomically. *)
type t = {
mutable state : state;
children : t Lwt_dllist.t;
Expand All @@ -26,7 +25,7 @@ and fiber_context = {
tid : Ctf.id;
mutable cancel_context : t;
mutable cancel_node : fiber_context Lwt_dllist.node option; (* Our entry in [cancel_context.fibers] *)
cancel_fn : (exn -> unit) option Atomic.t;
mutable cancel_fn : exn -> unit; (* Encourage the current operation to finish *)
mutable vars : Hmap.t;
}

Expand Down Expand Up @@ -126,20 +125,19 @@ let protect fn =
We also do not check the parent context, to make sure the caller has a chance to handle the result. *)
fn ()

let rec cancel_internal t ex acc_fns =
let collect_cancel_fn fiber acc =
match Atomic.exchange fiber.cancel_fn None with
| None -> acc (* The operation succeeded and so can't be cancelled now *)
| Some cancel_fn -> cancel_fn :: acc
in
(* Mark the cancellation tree rooted at [t] as Cancelling (stopping at protected sub-contexts),
and return a list of all fibers in the newly-cancelling contexts. Since modifying the cancellation
tree can only be done from our domain, this is effectively an atomic operation. Once it returns,
new (non-protected) fibers cannot be added to any of the cancelling contexts. *)
let rec cancel_internal t ex acc_fibers =
match t.state with
| Finished -> invalid_arg "Cancellation context finished!"
| Cancelling _ -> acc_fns
| Cancelling _ -> acc_fibers
| On ->
let bt = Printexc.get_raw_backtrace () in
t.state <- Cancelling (ex, bt);
let acc_fns = Lwt_dllist.fold_r collect_cancel_fn t.fibers acc_fns in
Lwt_dllist.fold_r (cancel_child ex) t.children acc_fns
let acc_fibers = Lwt_dllist.fold_r List.cons t.fibers acc_fibers in
Lwt_dllist.fold_r (cancel_child ex) t.children acc_fibers
and cancel_child ex t acc =
if t.protected then acc
else cancel_internal t ex acc
Expand All @@ -149,17 +147,19 @@ let check_our_domain t =

let cancel t ex =
check_our_domain t;
let fns = cancel_internal t ex [] in
let fibers = cancel_internal t ex [] in
let cex = Cancelled ex in
let rec aux = function
| [] -> []
| fn :: fns ->
| x :: xs ->
let fn = x.cancel_fn in
x.cancel_fn <- ignore;
match fn cex with
| () -> aux fns
| exception ex2 -> ex2 :: aux fns
| () -> aux xs
| exception ex2 -> ex2 :: aux xs
in
if fns <> [] then (
match protect (fun () -> aux fns) with
if fibers <> [] then (
match aux fibers with
| [] -> ()
| exns -> raise (Cancel_hook_failed exns)
)
Expand Down Expand Up @@ -188,16 +188,15 @@ module Fiber_context = struct
let get_error t = get_error t.cancel_context

let set_cancel_fn t fn =
(* if Atomic.exchange t.cancel_fn (Some fn) <> None then failwith "Fiber already has a cancel function!" *)
Atomic.set t.cancel_fn (Some fn)
t.cancel_fn <- fn

let clear_cancel_fn t =
Atomic.exchange t.cancel_fn None <> None
t.cancel_fn <- ignore

let make ~cc ~vars =
let tid = Ctf.mint_id () in
Ctf.note_created tid Ctf.Task;
let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = Atomic.make None; vars } in
let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = ignore; vars } in
t.cancel_node <- Some (Lwt_dllist.add_r t cc.fibers);
t

Expand Down
65 changes: 41 additions & 24 deletions lib_eio/core/eio__core.mli
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,8 @@ module Cancel : sig
Ideally this should be done any time you have caught an exception and are planning to ignore it,
although if you forget then the next IO operation will typically abort anyway.
Quick clean-up actions (such as releasing a mutex or deleting a temporary file) are OK,
When handling a [Cancelled] exception, quick clean-up actions
(such as releasing a mutex or deleting a temporary file) are OK,
but operations that may block should be avoided.
For example, a network connection should simply be closed,
without attempting to send a goodbye message.
Expand Down Expand Up @@ -585,37 +586,32 @@ module Private : sig
or it may be cancelled.
If it is cancelled then the registered cancellation function is called.
This function will always be called from the fiber's own domain, but care must be taken
if the operation is being completed by another domain at the same time.
if the operation could be completed by another domain at the same time.
Consider the case of {!Stream.take}, which can be fulfilled by a {!Stream.add} from another domain.
We want to ensure that either the item is removed from the stream and returned to the waiting fiber,
or that the operation is cancelled and the item is not removed from the stream.
Therefore, cancelling and completing both attempt to clear the cancel function atomically,
Therefore, cancelling and completing both need to update an atomic value (with {!Atomic.compare_and_set})
so that only one can succeed. The case where [Stream.take] succeeds before cancellation:
+ A fiber calls [Suspend] and is suspended.
The callback sets a cancel function and registers a waiter on the stream.
+ When another domain has an item, it removes the cancel function (making the [take] uncancellable)
+ When another domain has an item, it marks the atomic as finished (making the [take] uncancellable)
and begins resuming the fiber with the new item.
+ If the taking fiber is cancelled after this, the cancellation will be ignored and the operation
+ If the taking fiber is cancelled after this, the cancellation must be ignored and the operation
will complete successfully. Future operations will fail immediately, however.
The case of cancellation winning the race:
+ A fiber calls [Suspend] and is suspended.
The callback sets a cancel function and registers a waiter on the stream.
+ The taking fiber is cancelled. Its cancellation function is called, which starts removing the waiter.
+ The taking fiber is cancelled. Its cancellation function is called,
which updates the atomic and starts removing the waiter.
+ If another domain tries to provide an item to the waiter as this is happening,
it will try to clear the cancel function and fail.
it will try to update the atomic too and fail.
The item will be given to the next waiter instead.
Note that there is a mutex around the list of waiters, so the taking domain
can't finish removing the waiter and start another operation while the adding
domain is trying to resume it.
In future, we may want to make this lock-free by using a fresh atomic
to hold the cancel function for each operation.
Note: A fiber will only have a cancel function set while it is suspended. *)

val cancellation_context : t -> Cancel.t
Expand All @@ -624,19 +620,40 @@ module Private : sig
val set_cancel_fn : t -> (exn -> unit) -> unit
(** [set_cancel_fn t fn] sets [fn] as the fiber's cancel function.
If the cancellation context is cancelled, the function is removed and called.
When the operation completes, you must call {!clear_cancel_fn} to remove it. *)
If [t]'s cancellation context is cancelled, the function is called.
It should attempt to make the current operation finish quickly, either with
a successful result or by raising the given exception.
Just before being called, the fiber's cancel function is replaced with [ignore]
so that [fn] cannot be called twice.
On success, the cancel function is cleared automatically when {!Suspend.enter} returns,
but for single-domain operations you may like to call {!clear_cancel_fn}
manually to remove it earlier.
[fn] will be called from [t]'s domain (from the fiber that called [cancel]).
[fn] must not switch fibers. If it did, this could happen:
+ Another suspended fiber in the same cancellation context resumes before
its cancel function is called.
+ It enters a protected block and starts a new operation.
+ [fn] returns.
+ We cancel the protected operation. *)

val clear_cancel_fn : t -> unit
(** [clear_cancel_fn t] is [set_cancel_fn t ignore].
This must only be called from the fiber's own domain.
val clear_cancel_fn : t -> bool
(** [clear_cancel_fn t] removes the function previously set with {!set_cancel_fn}, if any.
For single-domain operations, it can be useful to call this manually as soon as
the operation succeeds (i.e. when the fiber is added to the run-queue)
to prevent the cancel function from being called.
Returns [true] if this call removed the function, or [false] if there wasn't one.
This operation is atomic and thread-safe.
An operation that completes in another domain must use this to indicate that the operation is
finished (can no longer be cancelled) before enqueuing the result. If it returns [false],
the operation was cancelled first and the canceller has called (or is calling) the function.
If it returns [true], the caller is responsible for any resources owned by the function,
such as the continuation. *)
For operations where another domain may resume the fiber, your cancel function
will need to cope with being called after the operation has succeeded. In that
case you should not call [clear_cancel_fn]. The backend will do it automatically
just before resuming your fiber. *)

val get_error : t -> exn option
(** [get_error t] is [Cancel.get_error (cancellation_context t)] *)
Expand Down
6 changes: 1 addition & 5 deletions lib_eio/core/single_waiter.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ let await t id =
enqueue (Error ex)
);
t.wake <- (fun x ->
let cleared = Cancel.Fiber_context.clear_cancel_fn ctx in
(* We're not attempting to be thread-safe, so the cancel function can
only be cleared from the same domain. In that case, [wake] will have
been reset before switching to another fiber. *)
assert cleared;
Cancel.Fiber_context.clear_cancel_fn ctx;
t.wake <- ignore;
Ctf.note_read ~reader:id ctx.tid;
enqueue x
Expand Down
15 changes: 9 additions & 6 deletions lib_eio/core/waiters.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
type 'a waiter = {
finished : bool Atomic.t;
enqueue : ('a, exn) result -> unit;
ctx : Cancel.Fiber_context.t;
}

type 'a t = 'a waiter Lwt_dllist.t
Expand All @@ -17,8 +17,8 @@ let add_waiter t cb =

(* Wake a waiter with the result.
Returns [false] if the waiter got cancelled while we were trying to wake it. *)
let wake { enqueue; ctx } r =
if Cancel.Fiber_context.clear_cancel_fn ctx then (enqueue (Ok r); true)
let wake { enqueue; finished } r =
if Atomic.compare_and_set finished false true then (enqueue (Ok r); true)
else false (* [cancel] gets called and we enqueue an error *)

let wake_all (t:_ t) v =
Expand All @@ -45,16 +45,19 @@ let await_internal ~mutex (t:'a t) id (ctx:Cancel.fiber_context) enqueue =
enqueue (Error ex)
| None ->
let resolved_waiter = ref Hook.null in
let finished = Atomic.make false in
let enqueue x =
Ctf.note_read ~reader:id ctx.tid;
enqueue x
in
let cancel ex =
Hook.remove !resolved_waiter;
enqueue (Error ex)
if Atomic.compare_and_set finished false true then (
Hook.remove !resolved_waiter;
enqueue (Error ex)
)
in
Cancel.Fiber_context.set_cancel_fn ctx cancel;
let waiter = { enqueue; ctx } in
let waiter = { enqueue; finished } in
match mutex with
| None ->
resolved_waiter := add_waiter t waiter
Expand Down

0 comments on commit 589320e

Please sign in to comment.