Skip to content

Commit b083335

Browse files
committed
Add Picos_stdio benchmark
1 parent 5199a36 commit b083335

File tree

11 files changed

+157
-68
lines changed

11 files changed

+157
-68
lines changed

bench/bench_cancel_after.ml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ let run_round_trip ~budgetf ~n_domains () =
3232
let config =
3333
Printf.sprintf "%d worker%s" n_domains (if n_domains = 1 then "" else "s")
3434
in
35-
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
35+
Times.record ~budgetf ~n_domains ~init ~n_warmups:1 ~n_runs_min:1 ~wrap ~work
36+
()
3637
|> Times.to_thruput_metrics ~n:n_ops ~singular:"round-trip" ~config
3738

3839
let run_async ~budgetf ~n_domains () =
@@ -81,7 +82,8 @@ let run_async ~budgetf ~n_domains () =
8182
let config =
8283
Printf.sprintf "%d worker%s" n_domains (if n_domains = 1 then "" else "s")
8384
in
84-
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
85+
Times.record ~budgetf ~n_domains ~n_warmups:1 ~n_runs_min:1 ~init ~wrap ~work
86+
()
8587
|> Times.to_thruput_metrics ~n:n_ops ~singular:"async round-trip" ~config
8688

8789
let run_suite ~budgetf =

bench/bench_mutex.ml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ let run_one ~budgetf ~n_fibers ~use_domains () =
6565
(if use_domains then "domain" else "fiber")
6666
(if n_fibers = 1 then "" else "s")
6767
in
68-
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
68+
Times.record ~budgetf ~n_domains ~n_warmups:1 ~n_runs_min:1 ~init ~wrap ~work
69+
()
6970
|> Times.to_thruput_metrics ~n:n_ops ~singular:"locked yield" ~config
7071

7172
let run_suite ~budgetf =

bench/bench_spawn.ml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ let run_one ~budgetf ~at_a_time () =
2626
in
2727

2828
let config = Printf.sprintf "%d at a time" at_a_time in
29-
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
29+
Times.record ~budgetf ~n_domains:1 ~n_warmups:1 ~n_runs_min:1 ~init ~wrap
30+
~work ()
3031
|> Times.to_thruput_metrics ~n:n_spawns ~singular:"spawn" ~config
3132

3233
let run_suite ~budgetf =

bench/bench_stdio.ml

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
open Multicore_bench
2+
open Picos_stdio
3+
4+
(** [run_one ~budgetf ~block_or_nonblock ~n_domains ()] benchmarks one-byte
    reads from a pipe and reports the result as throughput metrics.

    [init] creates a pipe and a one-byte buffer.  [work] optionally switches
    the read end to non-blocking mode, writes the whole payload with a single
    [Unix.write] (4096 bytes for [`Block], 65536 bytes for [`Nonblock]), reads
    it back one byte at a time and finally closes both ends.  The work is run
    under [Scheduler.run].  Throughput is computed over
    [n_bytes * n_domains] reads.

    NOTE(review): [Unix] is presumably the module provided by [Picos_stdio],
    which this file opens - confirm. *)
let run_one ~budgetf ~block_or_nonblock ~n_domains () =
  (* Payload size and metric label both depend only on the mode. *)
  let n_bytes, singular =
    match block_or_nonblock with
    | `Block -> (4096, "blocking read")
    | `Nonblock -> (65536, "non-blocking read")
  in
  let config =
    Printf.sprintf "%d worker%s" n_domains (if n_domains = 1 then "" else "s")
  in

  let init _ =
    let inn, out = Unix.pipe ~cloexec:true () in
    (inn, out, Bytes.create 1)
  in
  let wrap _ _ = Scheduler.run in
  let work _ (inn, out, byte) =
    (match block_or_nonblock with
    | `Block -> ()
    | `Nonblock -> Unix.set_nonblock inn);
    (* Fill the pipe in one go; the loop below drains it byte by byte. *)
    let written = Unix.write out (Bytes.create n_bytes) 0 n_bytes in
    assert (written = n_bytes);
    for _ = 1 to n_bytes do
      let got : int = Unix.read inn byte 0 1 in
      assert (got = 1)
    done;
    Unix.close inn;
    Unix.close out
  in

  Times.record ~budgetf ~n_domains ~n_warmups:1 ~n_runs_min:1 ~init ~wrap ~work
    ()
  |> Times.to_thruput_metrics ~n:(n_bytes * n_domains) ~singular ~config
43+
44+
(** [run_suite ~budgetf] runs {!run_one} for every combination of mode
    ([`Nonblock], [`Block]) and worker count (1, 2, 4) and concatenates the
    resulting metrics.

    A combination contributes nothing when running on Windows, when it asks
    for more domains than [Picos_domain.recommended_domain_count ()], or when
    it is the [`Block] mode on an OCaml 5.0.x runtime. *)
let run_suite ~budgetf =
  let run_config (block_or_nonblock, n_domains) =
    let skipped =
      Sys.win32
      || Picos_domain.recommended_domain_count () < n_domains
      || (String.starts_with ~prefix:"5.0." Sys.ocaml_version
         && block_or_nonblock == `Block)
    in
    if skipped then [] else run_one ~budgetf ~block_or_nonblock ~n_domains ()
  in
  List.concat_map run_config (Util.cross [ `Nonblock; `Block ] [ 1; 2; 4 ])

bench/bench_yield.ml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ let run_one ~budgetf ~n_fibers () =
2727
let config =
2828
Printf.sprintf "%d fiber%s" n_fibers (if n_fibers = 1 then "" else "s")
2929
in
30-
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
30+
Times.record ~budgetf ~n_domains:1 ~n_warmups:1 ~n_runs_min:1 ~init ~wrap
31+
~work ()
3132
|> Times.to_thruput_metrics ~n:n_yields ~singular:"yield" ~config
3233

3334
let run_suite ~budgetf =

bench/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
picos.sync
1111
picos.domain
1212
picos.tls
13+
picos.stdio
1314
(select
1415
scheduler.ml
1516
from

bench/main.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ let benchmarks =
1212
("Ref with Picos_sync.Mutex", Bench_ref_mutex.run_suite);
1313
("Foundation Mpsc_queue", Bench_mpsc_queue.run_suite);
1414
("Picos_htbl", Bench_htbl.run_suite);
15+
("Picos_stdio", Bench_stdio.run_suite);
1516
]
1617

1718
let () = Multicore_bench.Cmd.run ~benchmarks ()

lib/picos_select/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
picos.domain
1111
picos.tls
1212
picos_thread_atomic
13+
backoff
1314
threads.posix
1415
psq
1516
mtime

lib/picos_select/picos_select.ml

Lines changed: 68 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ open Picos
33
(* Signal used for interrupts ([Sys.sigusr2]).  While the pending-interrupt
   count is non-zero the select thread sends it to the current process with
   [Unix.kill]. *)
let intr_sig = Sys.sigusr2
44
(* Physical-equality test against [intr_sig]. *)
let is_intr_sig s = s == intr_sig
55
(* Singleton list form of [intr_sig], as passed to [Thread.sigmask]. *)
let intr_sigs = [ intr_sig ]
6-
let intr_pending = Atomic.make 0
76

87
type cancel_at =
98
| Cancel_at : {
@@ -49,6 +48,21 @@ type state = {
4948
new_ex : return_on list ref;
5049
}
5150

51+
(* Outcome carried by a request computation: [Intr.handle] returns
   [Signaled] and [Intr.clr] tries to return [Cleared]. *)
type intr_status = Cleared | Signaled
52+
53+
(* Interrupt request representation, indexed by a polymorphic variant so that
   signatures can demand the [Req] case (as [Intr.finish] does). *)
type _ tdt =
54+
| Nothing : [> `Nothing ] tdt
55+
| Req : {
56+
state : state;
57+
(* [true] at every construction site in view; [Intr.use] sets it to
   [false] and [Intr.finish] does nothing once it is [false]. *)
mutable unused : bool;
58+
computation : intr_status Computation.t;
59+
}
60+
-> [> `Req ] tdt
61+
62+
(* Existential wrapper over either [tdt] case.  [Intr.finish] compares values
   of this type with physical inequality. *)
type req = R : [< `Nothing | `Req ] tdt -> req [@@unboxed]
63+
(* Contents of [intr_pending].  [value] is the pending-interrupt count that
   the select thread tests against 0.  [req] is the request recorded by the
   most recent successful [Intr.finish], or [R Nothing] after [Intr.decr]. *)
type counter_state = { value : int; req : req }
64+
65+
(* Shared counter, replaced as a whole record with [Atomic.compare_and_set].
   Starts with a count of 0 and no recorded request. *)
let intr_pending = Atomic.make { value = 0; req = R Nothing }
5266
let exit_exn_bt = Exn_bt.get_callstack 0 Exit
5367

5468
let key =
@@ -180,9 +194,10 @@ and select_thread_continue s rd wr ex (rd_fds, wr_fds, ex_fds) =
180194
let ex = process_fds ex_fds ex (Picos_thread_atomic.exchange s.new_ex []) in
181195
let tos = process_timeouts s in
182196
let tos =
183-
let n = Atomic.get intr_pending in
184-
if n = 0 then tos
197+
let state = Atomic.get intr_pending in
198+
if state.value = 0 then tos
185199
else begin
200+
assert (0 < state.value);
186201
Unix.kill (Unix.getpid ()) intr_sig;
187202
let idle = 0.000_001 (* 1μs *) in
188203
if tos < 0.0 || idle <= tos then idle else tos
@@ -302,17 +317,7 @@ let await_on file_descr op =
302317
(* *)
303318

304319
module Intr = struct
305-
type intr_status = Cleared | Signaled
306-
307-
type _ tdt =
308-
| Nothing : [> `Nothing ] tdt
309-
| Req : {
310-
state : state;
311-
mutable computation : intr_status Computation.t;
312-
}
313-
-> [> `Req ] tdt
314-
315-
type t = T : [< `Nothing | `Req ] tdt -> t [@@unboxed]
320+
type t = req
316321

317322
let cleared =
318323
let computation = Computation.create () in
@@ -321,29 +326,42 @@ module Intr = struct
321326

322327
(* [Picos_tls] key holding a [Req].  The initializer builds one from [get ()]
   with the [cleared] computation; [req] later overwrites the slot with
   [Picos_tls.set]. *)
let intr_key =
323328
Picos_tls.new_key @@ fun () : [ `Req ] tdt ->
324-
Req { state = get (); computation = cleared }
329+
Req { state = get (); unused = true; computation = cleared }
330+
331+
(* [use req] clears the [unused] flag of a [Req]; it is a no-op on
   [R Nothing]. *)
let[@inline] use = function R Nothing -> () | R (Req r) -> r.unused <- false
325332

326333
(* [handle _] looks up the [Req] stored under [intr_key] and returns
   [Signaled] to its computation.  The argument is ignored.
   NOTE(review): presumably installed as the [intr_sig] handler by the
   [let () = ...] block further down - confirm there. *)
let handle _ =
327334
let (Req r) = Picos_tls.get intr_key in
328335
Computation.return r.computation Signaled
329336

330-
let intr_action trigger (Req r : [ `Req ] tdt) id =
337+
(* [finish req backoff] tries to count [req] once in [intr_pending].

   Returns [false] without touching the counter when [req] has already been
   marked as used or is the request currently recorded in the counter.
   Otherwise it marks the previously recorded request as used and attempts to
   install a new state with the count incremented and [req] recorded.  On a
   failed compare-and-set it retries after [Backoff.once].

   Returns [true] only when this call raised the count from 0 to 1. *)
let rec finish (Req r as req : [ `Req ] tdt) backoff =
338+
let before = Atomic.get intr_pending in
339+
r.unused && before.req != R req
340+
&& begin
341+
use before.req;
342+
let after = { value = before.value + 1; req = R req } in
343+
if Atomic.compare_and_set intr_pending before after then
344+
after.value = 1
345+
else finish req (Backoff.once backoff)
346+
end
347+
348+
(* [intr_action trigger req id] is the action that [Intr.req] attaches to the
   request computation via [Trigger.from_action].  It inspects how the
   computation ended:

   - [Cleared]: only removes the action.
   - [Signaled]: removes the action and counts the request with [finish]; when
     [finish] reports the first pending interrupt, the select thread is woken
     up.
   - [Exit] raised: the timeout fired; the request is counted with [finish]
     and the result is ignored. *)
let intr_action trigger (Req r as req : [ `Req ] tdt) id =
331349
match Computation.await r.computation with
332350
| Cleared ->
333351
(* No signal needs to be delivered. *)
334352
remove_action trigger r.state id
335353
| Signaled ->
336354
(* Signal was delivered before timeout. *)
337355
remove_action trigger r.state id;
338-
if Atomic.fetch_and_add intr_pending 1 = 0 then begin
356+
if finish req Backoff.default then
339357
(* We need to make sure at least one select thread will keep on
340358
triggering interrupts. *)
341359
wakeup r.state `Alive
342-
end
343360
| exception Exit ->
344361
(* The timeout was triggered. This must have been called from the
345362
select thread, which will soon trigger an interrupt. *)
346-
Atomic.incr intr_pending
363+
let _ : bool = finish req Backoff.default in
364+
()
347365

348366
let () =
349367
if not Sys.win32 then begin
@@ -353,37 +371,47 @@ module Intr = struct
353371
assert (old_behavior == Signal_default)
354372
end
355373

356-
let nothing = T Nothing
374+
(* The empty request; [clr nothing] does nothing. *)
let nothing = R Nothing
357375

358376
(* [req ~seconds] registers an interrupt request whose timeout deadline is
   derived from [seconds] and returns it wrapped in [R].

   Raises [Invalid_argument] on Windows.  Otherwise it creates a fresh
   computation, attaches [intr_action] to it, stores the new [Req] under
   [intr_key], schedules a [Cancel_at] timeout entry carrying [exit_exn_bt],
   and unblocks [intr_sigs] with [Thread.sigmask].

   NOTE(review): both sanity assertions are commented out and the results of
   [Computation.try_attach] and [Thread.sigmask] are discarded - confirm that
   this is intentional. *)
let[@alert "-handler"] req ~seconds =
359377
if Sys.win32 then
360378
invalid_arg "Picos_select.Intr is not supported on Windows"
361379
else begin
362380
let time = to_deadline ~seconds in
363-
let (Req r as req) = Picos_tls.get intr_key in
364-
assert (not (Computation.is_running r.computation));
365-
let id = next_id r.state in
366-
let computation = Computation.with_action req id intr_action in
367-
r.computation <- computation;
381+
(* assert (not (Computation.is_running r.computation)); *)
382+
let state = get () in
383+
let id = next_id state in
384+
let computation = Computation.create () in
385+
let (Req _ as req : [ `Req ] tdt) =
386+
Req { state; unused = true; computation }
387+
in
388+
let _ : bool =
389+
Computation.try_attach computation
390+
(Trigger.from_action req id intr_action)
391+
in
392+
Picos_tls.set intr_key req;
393+
let entry = Cancel_at { time; exn_bt = exit_exn_bt; computation } in
369-
add_timeout r.state id entry;
370-
let was_blocked : int list = Thread.sigmask SIG_UNBLOCK intr_sigs in
371-
assert (List.exists is_intr_sig was_blocked);
372-
T req
394+
add_timeout state id entry;
395+
let _was_blocked : int list = Thread.sigmask SIG_UNBLOCK intr_sigs in
396+
(* assert (List.exists is_intr_sig was_blocked); *)
397+
R req
373398
end
374399

400+
(* [decr backoff] decrements the count in [intr_pending] by one and resets the
   recorded request to [R Nothing], retrying after [Backoff.once] until the
   compare-and-set succeeds.  The request recorded before the update is marked
   as used on every attempt.  Asserts that the count does not go negative. *)
let rec decr backoff =
401+
let before = Atomic.get intr_pending in
402+
use before.req;
403+
let after = { value = before.value - 1; req = R Nothing } in
404+
assert (0 <= after.value);
405+
if not (Atomic.compare_and_set intr_pending before after) then
406+
decr (Backoff.once backoff)
407+
375408
(* [clr req] ends an interrupt request.  [R Nothing] is a no-op.  For a [Req]
   it blocks [intr_sigs] again with [Thread.sigmask] and tries to complete the
   computation with [Cleared].  When that fails (presumably because the
   signal or the timeout already completed the computation), it makes sure the
   request has been counted via [finish] and then removes one count with
   [decr]. *)
let clr = function
376-
| T Nothing -> ()
377-
| T (Req r) ->
378-
let was_blocked : int list = Thread.sigmask SIG_BLOCK intr_sigs in
379-
assert (not (List.exists is_intr_sig was_blocked));
409+
| R Nothing -> ()
410+
| R (Req r as req) ->
411+
let _was_blocked : int list = Thread.sigmask SIG_BLOCK intr_sigs in
412+
(* assert (not (List.exists is_intr_sig was_blocked)); *)
380413
if not (Computation.try_return r.computation Cleared) then begin
381-
while
382-
let count = Atomic.get intr_pending in
383-
count <= 0
384-
|| not (Atomic.compare_and_set intr_pending count (count - 1))
385-
do
386-
Thread.yield ()
387-
done
414+
let _ : bool = finish req Backoff.default in
415+
decr Backoff.default
388416
end
389417
end

0 commit comments

Comments
 (0)