Skip to content

Commit 31675f0

Browse files
committed
Use heartbeats to wake up runners in multififo
1 parent 397e44a commit 31675f0

File tree

3 files changed

+113
-61
lines changed

3 files changed

+113
-61
lines changed

lib/picos_mux.multififo/dune

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@
1212
(-> select.none.ml))
1313
multicore-magic
1414
picos.thread
15-
picos_aux.mpmcq))
15+
picos_aux.mpmcq
16+
threads.posix))

lib/picos_mux.multififo/picos_mux_multififo.ml

Lines changed: 109 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,19 @@ type ready =
1414
* ((exn * Printexc.raw_backtrace) option, unit) Effect.Deep.continuation
1515
| Return of Fiber.t * (unit, unit) Effect.Deep.continuation
1616

17+
let state_running = 1 lsl 0
18+
let state_idlers = 1 lsl 1
19+
let state_arrhytmia = 1 lsl 2
20+
let state_killed = 1 lsl 3
21+
1722
type t = {
18-
mutable num_waiters_non_zero : bool;
19-
num_waiters : int ref;
23+
mutable state : int;
2024
num_started : int Atomic.t;
2125
mutex : Mutex.t;
22-
condition : Condition.t;
26+
worker_condition : Condition.t;
27+
heartbeat_condition : Condition.t;
2328
handler : (unit, unit) Effect.Deep.handler;
2429
quota : int;
25-
mutable run : bool;
2630
mutable threads : [ `Nothing | `Per_thread ] tdt array;
2731
mutable threads_num : int;
2832
}
@@ -100,11 +104,29 @@ let next_index t i =
100104
let i = i + 1 in
101105
if i < t.threads_num then i else 0
102106

103-
let[@inline] relaxed_wakeup t ~known_not_empty ready =
104-
if t.num_waiters_non_zero && (known_not_empty || Mpmcq.length ready != 0) then begin
107+
let[@inline never] wakeup_heartbeat t =
108+
Mutex.lock t.mutex;
109+
let state = t.state in
110+
if state_arrhytmia <= state then begin
111+
t.state <- state land lnot state_arrhytmia;
112+
Mutex.unlock t.mutex;
113+
Condition.broadcast t.heartbeat_condition
114+
end
115+
else begin
116+
Mutex.unlock t.mutex
117+
end
118+
119+
let[@inline] wakeup_heartbeat t =
120+
if state_arrhytmia <= t.state then wakeup_heartbeat t
121+
122+
let kill t =
123+
if t.state < state_killed then begin
105124
Mutex.lock t.mutex;
125+
let state = t.state in
126+
if state != state lor state_killed then t.state <- state lor state_killed;
106127
Mutex.unlock t.mutex;
107-
Condition.signal t.condition
128+
Condition.broadcast t.heartbeat_condition;
129+
Condition.broadcast t.worker_condition
108130
end
109131

110132
let exec ready (Per_thread p : per_thread) t =
@@ -149,7 +171,6 @@ let rec next (Per_thread p as pt : per_thread) =
149171
match Mpmcq.pop_exn ready with
150172
| ready ->
151173
let t = p.context in
152-
relaxed_wakeup t ~known_not_empty:false p.ready;
153174
exec ready pt t
154175
| exception Mpmcq.Empty ->
155176
p.fiber <- Fiber.Maybe.nothing;
@@ -162,9 +183,7 @@ and try_steal (Per_thread p as pt : per_thread) t i =
162183
| Nothing -> try_steal pt t (next_index t i)
163184
| Per_thread other_p -> begin
164185
match Mpmcq.pop_exn other_p.ready with
165-
| ready ->
166-
relaxed_wakeup t ~known_not_empty:false other_p.ready;
167-
exec ready pt t
186+
| ready -> exec ready pt t
168187
| exception Mpmcq.Empty -> try_steal pt t (next_index t i)
169188
end
170189
end
@@ -173,36 +192,32 @@ and try_steal (Per_thread p as pt : per_thread) t i =
173192
and wait (pt : per_thread) t =
174193
if any_fibers_alive t then begin
175194
Mutex.lock t.mutex;
176-
let n = !(t.num_waiters) + 1 in
177-
t.num_waiters := n;
178-
if n = 1 then t.num_waiters_non_zero <- true;
179-
if (not (any_fibers_ready t)) && any_fibers_alive t then begin
180-
match Condition.wait t.condition t.mutex with
195+
let state = t.state in
196+
if state != state lor state_idlers land lnot state_arrhytmia then
197+
t.state <- state lor state_idlers land lnot state_arrhytmia;
198+
if state_arrhytmia <= state then Condition.broadcast t.heartbeat_condition;
199+
if state < state_killed && not (any_fibers_ready t) then begin
200+
match Condition.wait t.worker_condition t.mutex with
181201
| () ->
182-
let n = !(t.num_waiters) - 1 in
183-
t.num_waiters := n;
184-
if n = 0 then t.num_waiters_non_zero <- false;
202+
let state = t.state in
203+
if state != state lor state_idlers then
204+
t.state <- state lor state_idlers;
185205
Mutex.unlock t.mutex;
186-
next pt
206+
if state < state_killed then next pt
187207
| exception async_exn ->
188-
let n = !(t.num_waiters) - 1 in
189-
t.num_waiters := n;
190-
if n = 0 then t.num_waiters_non_zero <- false;
208+
let state = t.state in
209+
if state != state lor state_idlers then
210+
t.state <- state lor state_idlers;
191211
Mutex.unlock t.mutex;
192212
raise async_exn
193213
end
194214
else begin
195-
let n = !(t.num_waiters) - 1 in
196-
t.num_waiters := n;
197-
if n = 0 then t.num_waiters_non_zero <- false;
198215
Mutex.unlock t.mutex;
199-
next pt
216+
if state < state_killed then next pt
200217
end
201218
end
202219
else begin
203-
Mutex.lock t.mutex;
204-
Mutex.unlock t.mutex;
205-
Condition.broadcast t.condition
220+
kill t
206221
end
207222

208223
let default_fatal_exn_handler exn =
@@ -238,25 +253,19 @@ let per_thread context =
238253
match Picos_thread.TLS.get_exn per_thread_key with
239254
| Per_thread p_current when p_original.context == p_current.context ->
240255
(* We are running on a thread of this scheduler *)
241-
if Fiber.unsuspend fiber trigger then
242-
Mpmcq.push p_current.ready resume
243-
else Mpmcq.push_head p_current.ready resume;
244-
relaxed_wakeup p_current.context ~known_not_empty:true p_current.ready
256+
let ready = p_current.ready in
257+
if Fiber.unsuspend fiber trigger then Mpmcq.push ready resume
258+
else Mpmcq.push_head ready resume;
259+
let t = p_current.context in
260+
wakeup_heartbeat t
245261
| _ | (exception Picos_thread.TLS.Not_set) ->
246262
(* We are running on a foreign thread *)
247-
if Fiber.unsuspend fiber trigger then
248-
Mpmcq.push p_original.ready resume
249-
else Mpmcq.push_head p_original.ready resume;
263+
let ready = p_original.ready in
264+
if Fiber.unsuspend fiber trigger then Mpmcq.push ready resume
265+
else Mpmcq.push_head ready resume;
250266
let t = p_original.context in
251-
let non_zero =
252-
match Mutex.lock t.mutex with
253-
| () ->
254-
let non_zero = t.num_waiters_non_zero in
255-
Mutex.unlock t.mutex;
256-
non_zero
257-
| exception Sys_error _ -> false
258-
in
259-
if non_zero then Condition.signal t.condition);
267+
wakeup_heartbeat t;
268+
Condition.signal t.worker_condition);
260269
p.return <-
261270
Some
262271
(fun k ->
@@ -293,6 +302,43 @@ let[@inline never] with_per_thread new_pt fn old_p =
293302
| value -> returned value old_p
294303
| exception exn -> raised exn old_p
295304

305+
let rec heartbeat_thread t nth =
306+
if state_idlers lor state_running = t.state && any_fibers_ready t then begin
307+
if Mutex.try_lock t.mutex then begin
308+
t.state <- t.state land lnot state_idlers;
309+
Mutex.unlock t.mutex;
310+
Condition.signal t.worker_condition
311+
end;
312+
Thread.yield ();
313+
heartbeat_thread t 0
314+
end
315+
else begin
316+
if nth < 100 then begin
317+
if t.state <= state_killed then begin
318+
Thread.delay 0.0001;
319+
heartbeat_thread t (nth + 1)
320+
end
321+
end
322+
else begin
323+
if Mutex.try_lock t.mutex then begin
324+
let state = t.state in
325+
if state < state_killed then begin
326+
t.state <- state lor state_arrhytmia;
327+
Condition.wait t.heartbeat_condition t.mutex
328+
end;
329+
Mutex.unlock t.mutex;
330+
heartbeat_thread t 0
331+
end
332+
else heartbeat_thread t nth
333+
end
334+
end
335+
336+
let heartbeat_thread t =
337+
try heartbeat_thread t 0
338+
with exn ->
339+
kill t;
340+
t.handler.exnc exn
341+
296342
let with_per_thread t fn =
297343
let (Per_thread new_p as new_pt) = per_thread t in
298344
begin
@@ -308,7 +354,11 @@ let with_per_thread t fn =
308354
end;
309355
new_p.index <- t.threads_num;
310356
Array.unsafe_set t.threads t.threads_num new_pt;
311-
if t.threads_num = 0 then Atomic.incr t.num_started
357+
if t.threads_num = 0 then begin
358+
Atomic.incr t.num_started;
359+
let _ = Thread.create heartbeat_thread t in
360+
()
361+
end
312362
else Multicore_magic.fence t.num_started;
313363
t.threads_num <- t.threads_num + 1
314364
with
@@ -351,8 +401,7 @@ let effc : type a. a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option =
351401
(* The queue [push] includes a full fence, which means the increment
352402
of [num_started] will happen before increment of [num_stopped]. *)
353403
Mpmcq.push p.ready (Spawn (r.fiber, r.main));
354-
let t = p.context in
355-
relaxed_wakeup t ~known_not_empty:true p.ready;
404+
wakeup_heartbeat p.context;
356405
p.return
357406
end
358407
| Fiber.Yield -> yield
@@ -403,23 +452,24 @@ let context ?quota ?fatal_exn_handler () =
403452
| None -> default_fatal_exn_handler
404453
| Some handler ->
405454
fun exn ->
455+
let (Per_thread p) = get_per_thread () in
456+
kill p.context;
406457
handler exn;
407458
raise exn
408459
in
409460
Select.check_configured ();
410461
let mutex = Mutex.create ()
411-
and condition = Condition.create ()
412-
and num_waiters = ref 0 |> Multicore_magic.copy_as_padded
413-
and num_started = Atomic.make 0 |> Multicore_magic.copy_as_padded in
462+
and worker_condition = Condition.create ()
463+
and heartbeat_condition = Condition.create ()
464+
and num_started = Atomic.make 0 in
414465
{
415-
num_waiters_non_zero = false;
416-
num_waiters;
466+
state = 0;
417467
num_started;
418468
mutex;
419-
condition;
469+
worker_condition;
470+
heartbeat_condition;
420471
handler = { retc; exnc; effc };
421472
quota;
422-
run = false;
423473
threads = Array.make 15 Nothing;
424474
threads_num = 0;
425475
}
@@ -432,12 +482,13 @@ let run_fiber ?context:t_opt fiber main =
432482
let t = match t_opt with None -> context () | Some t -> t in
433483
with_per_thread t @@ fun (Per_thread p) ->
434484
Mutex.lock t.mutex;
435-
if t.run then begin
485+
let state = t.state in
486+
if state = state lor state_running then begin
436487
Mutex.unlock t.mutex;
437488
already_running ()
438489
end
439490
else begin
440-
t.run <- true;
491+
t.state <- state lor state_running;
441492
Mutex.unlock t.mutex;
442493
p.remaining_quota <- t.quota;
443494
p.fiber <- Fiber.Maybe.of_fiber fiber;

lib/picos_mux.multififo/picos_mux_multififo.mli

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ type t
2727
(** Represents a shared context for fifo runners. *)
2828

2929
val context : ?quota:int -> ?fatal_exn_handler:(exn -> unit) -> unit -> t
30-
(** [context ()] creates a new context for randomized runners. The context
31-
should be consumed by a call of {{!run} [run ~context ...]}.
30+
(** [context ()] creates a new context for fifo runners. The context must
31+
always be consumed by a call of {{!run} [run ~context ...]}.
3232
3333
The optional [quota] argument defaults to [Int.max_int] and determines the
3434
number of effects a fiber is allowed to perform before it is forced to

0 commit comments

Comments
 (0)