Skip to content

Commit abc01b2

Browse files
committed
Add Fiber.try_suspend and Fiber.unsuspend helpers
I noticed I had failed to account for one case in my previous `await` handler implementations. This fixes that by introducing a pair of helpers for suspending and unsuspending a fiber.
1 parent 42cd31d commit abc01b2

File tree

4 files changed

+60
-146
lines changed

4 files changed

+60
-146
lines changed

lib/picos/bootstrap/picos_bootstrap.ml

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,22 +108,23 @@ module Computation = struct
108108

109109
let try_attach t trigger = try_attach t trigger Backoff.default
110110

111-
let rec detach t backoff =
111+
let rec unsafe_unsuspend t backoff =
112112
match Atomic.get t with
113-
| Returned _ | Canceled _ -> ()
113+
| Returned _ -> true
114+
| Canceled _ -> false
114115
| Continue r as before ->
115116
let after =
116117
if fifo_bit <= r.balance_and_mode then
117118
Continue
118119
{ r with balance_and_mode = r.balance_and_mode - (2 * one) }
119120
else gc (r.balance_and_mode land fifo_bit) [] r.triggers
120121
in
121-
if not (Atomic.compare_and_set t before after) then
122-
detach t (Backoff.once backoff)
122+
Atomic.compare_and_set t before after
123+
|| unsafe_unsuspend t (Backoff.once backoff)
123124

124125
let detach t trigger =
125126
Trigger.signal trigger;
126-
detach t Backoff.default
127+
unsafe_unsuspend t Backoff.default |> ignore
127128

128129
type packed = Packed : 'a t -> packed
129130

@@ -231,6 +232,26 @@ module Fiber = struct
231232
let forbid t body = explicitly t body ~forbid:true
232233
let permit t body = explicitly t body ~forbid:false
233234

235+
let try_suspend (Fiber r) trigger x y resume =
236+
if not r.forbid then begin
237+
if Computation.try_attach r.computation trigger then
238+
Trigger.on_signal trigger x y resume
239+
|| begin
240+
Computation.detach r.computation trigger;
241+
false
242+
end
243+
else if Computation.is_canceled r.computation then begin
244+
Trigger.dispose trigger;
245+
false
246+
end
247+
else Trigger.on_signal trigger x y resume
248+
end
249+
else Trigger.on_signal trigger x y resume
250+
251+
let[@inline] unsuspend (Fiber r : t) trigger =
252+
assert (Trigger.is_signaled trigger);
253+
r.forbid || Computation.unsafe_unsuspend r.computation Backoff.default
254+
234255
module FLS = struct
235256
type 'a key = { index : int; default : non_float; compute : unit -> 'a }
236257

lib/picos/picos.mli

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,6 +1042,25 @@ module Fiber : sig
10421042
val create : forbid:bool -> 'a Computation.t -> t
10431043
(** [create ~forbid computation] creates a new fiber. *)
10441044

1045+
val try_suspend :
1046+
t -> Trigger.t -> 'x -> 'y -> (Trigger.t -> 'x -> 'y -> unit) -> bool
1047+
(** [try_suspend fiber trigger x y resume] tries to suspend the [fiber] to
1048+
await for the [trigger] to be {{!Trigger.signal} signaled}. If the result
1049+
is [false], then the [trigger] is guaranteed to be in the signaled state
1050+
and the fiber should be eventually resumed. If the result is [true], then
1051+
the fiber was suspended, meaning that the [trigger] will have had the
1052+
[resume] action {{!Trigger.on_signal} attached} to it and the trigger has
1053+
potentially been {{!Computation.try_attach} attached} to the
1054+
{!computation} of the fiber. *)
1055+
1056+
val unsuspend : t -> Trigger.t -> bool
1057+
(** [unsuspend fiber trigger] makes sure that the [trigger] will not be
1058+
attached to the computation of the [fiber]. Returns [false] in case the
1059+
fiber has been canceled and propagation of cancelation is not forbidden.
1060+
Otherwise returns [true].
1061+
1062+
⚠️ The trigger must be in the signaled state! *)
1063+
10451064
include
10461065
Intf.Fiber
10471066
with type t := t

lib/picos_fifos/picos_fifos.ml

Lines changed: 10 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ type ready =
77
| Spawn of Fiber.t * (unit -> unit)
88
| Continue of Fiber.t * (unit, unit) Effect.Deep.continuation
99
| Resume of Fiber.t * (Exn_bt.t option, unit) Effect.Deep.continuation
10-
| Resume_forbidden of (Exn_bt.t option, unit) Effect.Deep.continuation
1110

1211
type t = {
1312
ready : ready Queue.t;
@@ -77,70 +76,10 @@ let rec next t =
7776
Effect.Deep.continue k ()
7877
| Some exn_bt -> Exn_bt.discontinue k exn_bt)
7978
| Trigger.Await trigger ->
80-
(* We handle [Await] last as it is probably the least latency
81-
sensitive effect. It could also be that another fiber running in
82-
parallel is just about to signal the trigger, so checking the
83-
trigger last gives a tiny bit of time for that to happen and
84-
potentially allows us to make better/different decisions here. *)
8579
Some
8680
(fun k ->
87-
(* The non-blocking logic below for suspending a fiber with
88-
support for parallelism safe cancelation is somewhat
89-
intricate. Hopefully the comments help to understand it. *)
90-
if Fiber.has_forbidden fiber then begin
91-
(* Fiber has forbidden propagation of cancelation. This is
92-
the easy case to handle. *)
93-
if Trigger.on_signal trigger fiber k t.resume then begin
94-
(* Fiber is now suspended and can be resumed through the
95-
trigger. We just continue the next ready fiber. *)
96-
next t
97-
end
98-
else begin
99-
(* The trigger was already signaled. We could now freely
100-
choose which fiber to continue here, but in this
101-
scheduler we choose to continue the current fiber. *)
102-
Effect.Deep.continue k None
103-
end
104-
end
105-
else begin
106-
(* Fiber permits propagation of cancelation. We support
107-
cancelation and so first try to attach the trigger to the
108-
computation of the fiber. *)
109-
if Fiber.try_attach fiber trigger then begin
110-
(* The trigger was successfully attached, which means the
111-
computation has not been canceled. *)
112-
if Trigger.on_signal trigger fiber k t.resume then begin
113-
(* Fiber is now suspended and can be resumed through the
114-
trigger. That can now happen by signaling the trigger
115-
directly or by canceling the computation of the fiber,
116-
which will also signal the trigger. We just continue
117-
the next ready fiber. *)
118-
next t
119-
end
120-
else begin
121-
(* The trigger was already signaled. We first need to
122-
ensure that the trigger is detached from the
123-
computation of the fiber. *)
124-
Fiber.detach fiber trigger;
125-
(* We could now freely decide which fiber to continue, but
126-
in this scheduler we choose to continue the current
127-
fiber. *)
128-
Fiber.resume fiber k
129-
end
130-
end
131-
else begin
132-
(* We could not attach the trigger to the computation of the
133-
fiber, which means that either the computation has been
134-
canceled or the trigger has been signaled. We still need
135-
to ensure that the trigger really is put into the
136-
signaled state before the fiber is continued. *)
137-
Trigger.dispose trigger;
138-
(* We could now freely decide which fiber to continue, but
139-
in this scheduler we choose to continue the current
140-
fiber. *)
141-
Fiber.resume fiber k
142-
end
143-
end)
81+
if Fiber.try_suspend fiber trigger fiber k t.resume then next t
82+
else Fiber.resume fiber k)
14483
| _ -> None
14584
and retc () =
14685
Atomic.decr t.num_alive_fibers;
@@ -149,7 +88,6 @@ let rec next t =
14988
Effect.Deep.match_with main () { retc; exnc = raise; effc }
15089
| Continue (fiber, k) -> Fiber.continue fiber k ()
15190
| Resume (fiber, k) -> Fiber.resume fiber k
152-
| Resume_forbidden k -> Effect.Deep.continue k None
15391
| exception Queue.Empty ->
15492
if Atomic.get t.num_alive_fibers <> 0 then begin
15593
if Atomic.get t.needs_wakeup then begin
@@ -171,33 +109,14 @@ let run ~forbid main =
171109
and mc = Picos_ptmc.get () in
172110
let rec t = { ready; needs_wakeup; num_alive_fibers; mc; resume }
173111
and resume trigger fiber k =
174-
begin
175-
if Fiber.has_forbidden fiber then
176-
(* Fiber has forbidden propagation of cancelation. This is the easy
177-
case. *)
178-
Queue.push t.ready (Resume_forbidden k)
179-
else
180-
let resume = Resume (fiber, k) in
181-
if Fiber.is_canceled fiber then begin
182-
(* The fiber has been canceled so we give priority to it in this
183-
scheduler.
184-
185-
Assuming fibers are written to cooperate and perform cleanup
186-
promptly, this can be advantageous as it allows resources to be
187-
released more quickly. However, malicious or buggy fibers could
188-
use this to prevent other fibers from running. *)
189-
Queue.push_head t.ready resume
190-
end
191-
else begin
192-
(* The fiber hasn't yet been canceled.
193-
194-
As propagation of cancelation was not forbidden, and we have
195-
attached a trigger, we need to ensure that the trigger will not be
196-
leaked. *)
197-
Fiber.detach fiber trigger;
198-
Queue.push t.ready resume
199-
end
200-
end;
112+
let resume = Resume (fiber, k) in
113+
if Fiber.unsuspend fiber trigger then
114+
(* The fiber has not been canceled, so we queue the fiber normally. *)
115+
Queue.push t.ready resume
116+
else
117+
(* The fiber has been canceled, so we give priority to it in this
118+
scheduler. *)
119+
Queue.push_head t.ready resume;
201120
(* As the trigger might have been signaled from another domain or systhread
202121
outside of the scheduler, we check whether the scheduler needs to be
203122
woken up and take care of it if necessary. *)

lib/picos_threaded/picos_threaded.ml

Lines changed: 5 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -15,61 +15,16 @@ let block trigger ptmc =
1515
Picos_ptmc.unlock ptmc;
1616
raise exn
1717

18-
let release _ _ ptmc =
18+
let resume trigger fiber ptmc =
19+
let _is_canceled : bool = Fiber.unsuspend fiber trigger in
1920
(* This will be called when the trigger is signaled. We simply broadcast on
2021
the per thread condition variable. *)
2122
Picos_ptmc.broadcast ptmc
2223

2324
let[@alert "-handler"] rec await fiber trigger =
24-
(* The non-blocking logic below for suspending a fiber with support for
25-
parallelism safe cancelation is somewhat intricate. Hopefully the comments
26-
help to understand it. *)
27-
if Fiber.has_forbidden fiber then begin
28-
(* Fiber has forbidden propagation of cancelation. This is the easy case to
29-
handle. *)
30-
let ptmc = Picos_ptmc.get () in
31-
(* We could also have stored the per thread mutex and condition in the
32-
context and avoid getting it here, but this is likely cheap enough at
33-
this point anyway and makes the context trivial. *)
34-
if Trigger.on_signal trigger () ptmc release then begin
35-
(* Fiber is now suspended and can be resumed through the trigger. We
36-
block the thread on the per thread mutex and condition waiting for the
37-
trigger. *)
38-
block trigger ptmc
39-
end;
40-
(* We return to continue the fiber. *)
41-
None
42-
end
43-
else begin
44-
(* Fiber permits propagation of cancelation. We support cancelation and so
45-
first try to attach the trigger to the computation of the fiber. *)
46-
if Fiber.try_attach fiber trigger then
47-
(* The trigger was successfully attached, which means the computation has
48-
not been canceled. *)
49-
let ptmc = Picos_ptmc.get () in
50-
if Trigger.on_signal trigger () ptmc release then begin
51-
(* Fiber is now suspended and can be resumed through the trigger. That
52-
can now happen by signaling the trigger directly or by canceling the
53-
computation of the fiber, which will also signal the trigger. We
54-
block the thread on the per thread mutex and condition waiting for
55-
the trigger. *)
56-
block trigger ptmc
57-
end
58-
else begin
59-
(* The trigger was already signaled. We first need to ensure that the
60-
trigger is detached from the computation of the fiber. *)
61-
Fiber.detach fiber trigger
62-
end
63-
else begin
64-
(* We could not attach the trigger to the computation of the fiber, which
65-
means that either the computation has been canceled or the trigger has
66-
been signaled. We still need to ensure that the trigger really is put
67-
into the signaled state before the fiber is continued. *)
68-
Trigger.dispose trigger
69-
end;
70-
(* We return to continue or discontinue the fiber. *)
71-
Fiber.canceled fiber
72-
end
25+
let ptmc = Picos_ptmc.get () in
26+
if Fiber.try_suspend fiber trigger fiber ptmc resume then block trigger ptmc;
27+
Fiber.canceled fiber
7328

7429
and current fiber =
7530
(* The current handler must never propagate cancelation, but it would be

0 commit comments

Comments
 (0)