Skip to content

Commit

Permalink
Make Eio_linux.wakeup signal-safe
Browse files Browse the repository at this point in the history
This will eventually allow Waiters to be used from signal handlers and
GC finalizers (once that's lock-free too).

Previously, `wakeup` couldn't be used from a signal handler because it
took a lock (to ensure that the eventfd wasn't closed while it wrote to
it). Now, we avoid ever closing eventfds and simply keep free ones in a
pool.
  • Loading branch information
talex5 committed Dec 7, 2022
1 parent aa91a7b commit 1233cea
Showing 1 changed file with 50 additions and 19 deletions.
69 changes: 50 additions & 19 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,8 @@ type t = {
run_q : runnable Lf_queue.t;

(* When adding to [run_q] from another domain, this domain may be sleeping and so won't see the event.
In that case, [need_wakeup = true] and you must signal using [eventfd]. You must hold [eventfd_mutex]
when writing to or closing [eventfd]. *)
In that case, [need_wakeup = true] and you must signal using [eventfd]. *)
eventfd : FD.t;
eventfd_mutex : Mutex.t;

(* If [false], the main thread will check [run_q] before sleeping again
(possibly because an event has been or will be sent to [eventfd]).
Expand All @@ -216,21 +214,22 @@ let wake_buffer =
Bytes.set_int64_ne b 0 1L;
b

(* This can be called from any systhread (including ones not running Eio),
and also from signal handlers or GC finalizers. It must not take any locks. *)
let wakeup t =
Mutex.lock t.eventfd_mutex;
match
Log.debug (fun f -> f "Sending wakeup on eventfd %a" FD.pp t.eventfd);
Atomic.set t.need_wakeup false; (* [t] will check [run_q] after getting the event below *)
let sent = Unix.single_write (FD.get_exn "wakeup" t.eventfd) wake_buffer 0 8 in
Atomic.set t.need_wakeup false; (* [t] will check [run_q] after getting the event below *)
match t.eventfd.fd with
| `Closed -> () (* Domain has shut down (presumably after handling the event) *)
| `Open fd ->
let sent = Unix.single_write fd wake_buffer 0 8 in
assert (sent = 8)
with
| () -> Mutex.unlock t.eventfd_mutex
| exception ex -> Mutex.unlock t.eventfd_mutex; raise ex

(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *)
let enqueue_thread st k x =
Lf_queue.push st.run_q (Thread (k, x));
if Atomic.get st.need_wakeup then wakeup st

(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *)
let enqueue_failed_thread st k ex =
Lf_queue.push st.run_q (Failed_thread (k, ex));
if Atomic.get st.need_wakeup then wakeup st
Expand Down Expand Up @@ -964,7 +963,42 @@ module Low_level = struct
|> List.filter_map to_eio_sockaddr_t
end

external eio_eventfd : int -> Unix.file_descr = "caml_eio_eventfd"
module EventFD_pool : sig
(* We need to write to event FDs from signal handlers and GC finalizers.
This means we can't take a lock, which means we can't easily prevent
the owning domain from closing the FD while we're writing to it
(which could result in us writing to an unreleaded file if the FD
got reused). To avoid that, we never close event FDs but just return them
to a free pool.
The case where this matters is:
1. Some other systhread calls [wakeup].
2 [wakeup] adds an item to the run-queue and sees it needs to send a wake-up event.
3. The domain wakes up for some other reason, handles the event, then shuts down.
4. The original systhread writes to the eventfd.
*)

val get : unit -> Unix.file_descr
(* Take the next free eventfd from the pool, or create a new one if the pool's empty.
You might get a few spurious events from it as other threads are shutting down,
so you must be able to cope with that. *)

val put : Unix.file_descr -> unit
(* [put fd] adds [fd] to the free pool. *)
end = struct
external eio_eventfd : int -> Unix.file_descr = "caml_eio_eventfd"

let free = Lf_queue.create ()

let get () =
match Lf_queue.pop free with
| Some fd -> fd
| None -> eio_eventfd 0

let put fd =
Lf_queue.push free fd
end

type has_fd = < fd : FD.t >
type source = < Eio.Flow.source; Eio.Flow.close; has_fd >
Expand Down Expand Up @@ -1405,12 +1439,11 @@ let rec run : type a.
in
let run_q = Lf_queue.create () in
Lf_queue.push run_q IO;
let eventfd_mutex = Mutex.create () in
let sleep_q = Zzz.create () in
let io_q = Queue.create () in
let mem_q = Queue.create () in
let eventfd = FD.placeholder ~seekable:false ~close_unix:false in
let st = { mem; uring; run_q; eventfd_mutex; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q } in
let st = { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q } in
Log.debug (fun l -> l "starting main thread");
let rec fork ~new_fiber:fiber fn =
let open Effect.Deep in
Expand Down Expand Up @@ -1542,13 +1575,11 @@ let rec run : type a.
let new_fiber = Fiber_context.make_root () in
fork ~new_fiber (fun () ->
Switch.run_protected (fun sw ->
let fd = eio_eventfd 0 in
let fd = EventFD_pool.get () in
st.eventfd.fd <- `Open fd;
Switch.on_release sw (fun () ->
Mutex.lock st.eventfd_mutex;
FD.close st.eventfd;
Mutex.unlock st.eventfd_mutex;
Unix.close fd
let unix = FD.to_unix `Take st.eventfd in
EventFD_pool.put unix
);
Log.debug (fun f -> f "Monitoring eventfd %a" FD.pp st.eventfd);
result := Some (
Expand Down

0 comments on commit 1233cea

Please sign in to comment.