Skip to content

Commit 5199a36

Browse files
committed
Add interrupt support and implement async IO for blocking FDs
1 parent 0080913 commit 5199a36

File tree

9 files changed

+338
-67
lines changed

9 files changed

+338
-67
lines changed

lib/picos_select/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
(re_export picos.fd)
99
(re_export unix)
1010
picos.domain
11+
picos.tls
1112
picos_thread_atomic
1213
threads.posix
1314
psq

lib/picos_select/picos_select.ml

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
open Picos
22

3+
let intr_sig = Sys.sigusr2
4+
let is_intr_sig s = s == intr_sig
5+
let intr_sigs = [ intr_sig ]
6+
let intr_pending = Atomic.make 0
7+
38
type cancel_at =
49
| Cancel_at : {
510
time : Mtime.span;
@@ -174,9 +179,19 @@ and select_thread_continue s rd wr ex (rd_fds, wr_fds, ex_fds) =
174179
let wr = process_fds wr_fds wr (Picos_thread_atomic.exchange s.new_wr []) in
175180
let ex = process_fds ex_fds ex (Picos_thread_atomic.exchange s.new_ex []) in
176181
let tos = process_timeouts s in
182+
let tos =
183+
let n = Atomic.get intr_pending in
184+
if n = 0 then tos
185+
else begin
186+
Unix.kill (Unix.getpid ()) intr_sig;
187+
let idle = 0.000_001 (* 1μs *) in
188+
if tos < 0.0 || idle <= tos then idle else tos
189+
end
190+
in
177191
select_thread s tos rd wr ex
178192

179193
let select_thread s =
194+
if not Sys.win32 then Thread.sigmask SIG_BLOCK intr_sigs |> ignore;
180195
begin
181196
try
182197
let pipe_inn, pipe_out = Unix.pipe ~cloexec:true () in
@@ -283,3 +298,92 @@ let await_on file_descr op =
283298
with exn ->
284299
Computation.cancel computation exit_exn_bt;
285300
raise exn
301+
302+
(* *)
303+
304+
module Intr = struct
305+
type intr_status = Cleared | Signaled
306+
307+
type _ tdt =
308+
| Nothing : [> `Nothing ] tdt
309+
| Req : {
310+
state : state;
311+
mutable computation : intr_status Computation.t;
312+
}
313+
-> [> `Req ] tdt
314+
315+
type t = T : [< `Nothing | `Req ] tdt -> t [@@unboxed]
316+
317+
let cleared =
318+
let computation = Computation.create () in
319+
Computation.return computation Cleared;
320+
computation
321+
322+
let intr_key =
323+
Picos_tls.new_key @@ fun () : [ `Req ] tdt ->
324+
Req { state = get (); computation = cleared }
325+
326+
let handle _ =
327+
let (Req r) = Picos_tls.get intr_key in
328+
Computation.return r.computation Signaled
329+
330+
let intr_action trigger (Req r : [ `Req ] tdt) id =
331+
match Computation.await r.computation with
332+
| Cleared ->
333+
(* No signal needs to be delivered. *)
334+
remove_action trigger r.state id
335+
| Signaled ->
336+
(* Signal was delivered before timeout. *)
337+
remove_action trigger r.state id;
338+
if Atomic.fetch_and_add intr_pending 1 = 0 then begin
339+
(* We need to make sure at least one select thread will keep on
340+
triggering interrupts. *)
341+
wakeup r.state `Alive
342+
end
343+
| exception Exit ->
344+
(* The timeout was triggered. This must have been called from the
345+
select thread, which will soon trigger an interrupt. *)
346+
Atomic.incr intr_pending
347+
348+
let () =
349+
if not Sys.win32 then begin
350+
let previously_blocked = Thread.sigmask SIG_BLOCK intr_sigs in
351+
assert (not (List.exists is_intr_sig previously_blocked));
352+
let old_behavior = Sys.signal intr_sig (Sys.Signal_handle handle) in
353+
assert (old_behavior == Signal_default)
354+
end
355+
356+
let nothing = T Nothing
357+
358+
let[@alert "-handler"] req ~seconds =
359+
if Sys.win32 then
360+
invalid_arg "Picos_select.Intr is not supported on Windows"
361+
else begin
362+
let time = to_deadline ~seconds in
363+
let (Req r as req) = Picos_tls.get intr_key in
364+
assert (not (Computation.is_running r.computation));
365+
let id = next_id r.state in
366+
let computation = Computation.with_action req id intr_action in
367+
r.computation <- computation;
368+
let entry = Cancel_at { time; exn_bt = exit_exn_bt; computation } in
369+
add_timeout r.state id entry;
370+
let was_blocked : int list = Thread.sigmask SIG_UNBLOCK intr_sigs in
371+
assert (List.exists is_intr_sig was_blocked);
372+
T req
373+
end
374+
375+
let clr = function
376+
| T Nothing -> ()
377+
| T (Req r) ->
378+
let was_blocked : int list = Thread.sigmask SIG_BLOCK intr_sigs in
379+
assert (not (List.exists is_intr_sig was_blocked));
380+
if not (Computation.try_return r.computation Cleared) then begin
381+
while
382+
let count = Atomic.get intr_pending in
383+
count <= 0
384+
|| not (Atomic.compare_and_set intr_pending count (count - 1))
385+
do
386+
Thread.yield ()
387+
done
388+
end
389+
end

lib/picos_select/picos_select.mli

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,32 @@ val return_on : 'a Computation.t -> Picos_fd.t -> [ `R | `W | `E ] -> 'a -> unit
3131

3232
val await_on : Picos_fd.t -> [ `R | `W | `E ] -> Picos_fd.t
3333
(** [await_on fd op] awaits until [fd] becomes available for [op]. *)
34+
35+
module Intr : sig
36+
(** A mechanism to interrupt blocking {!Unix} IO operations.
37+
38+
⚠️ The mechanism uses the {!Sys.sigusr2} signal which should not be used
39+
for other purposes at the same time. *)
40+
41+
type t
42+
(** Represents an optional interrupt request. *)
43+
44+
val nothing : t
45+
(** A constant for a no request. {{!clr} [clr nothing]} does nothing. *)
46+
47+
val req : seconds:float -> t
48+
(** [req ~seconds] requests an interrupt in the form of a signal delivered to
49+
the thread that made the request within the specified number of [seconds].
50+
Blocking {!Unix} IO calls typically raise an error with the {{!Unix.EINTR}
51+
[Unix.EINTR]} error code when they are interrupted by a signal.
52+
53+
Regardless of whether the signal gets triggered or a system call gets
54+
interrupted, the request must be {{!clr} cleared}.
55+
56+
⚠️ Due to limitations of the OCaml system modules and unlike with typical
57+
timeout mechanisms, the interrupt may also be triggered sooner. *)
58+
59+
val clr : t -> unit
60+
(** [clr req] either cancels or acknowledges the interrupt request. Every
61+
{!req} must be cleared! *)
62+
end

lib/picos_stdio/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@
55
(libraries
66
(re_export unix)
77
(re_export picos.fd)
8+
picos.htbl
89
picos.select))

0 commit comments

Comments
 (0)