Skip to content

Commit 1ff21c7

Browse files
committed
Allow to specify the order for signaling triggers on completion
The order has an effect on scheduling so it makes to allow it to be specified.
1 parent 8eb0a85 commit 1ff21c7

File tree

6 files changed

+167
-129
lines changed

6 files changed

+167
-129
lines changed

lib/picos/bootstrap.ml

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,16 @@ module Computation = struct
3434
type 'a state =
3535
| Canceled of Exn_bt.t
3636
| Returned of 'a
37-
| Continue of { balance : int; triggers : Trigger.t list }
37+
| Continue of { balance_and_mode : int; triggers : Trigger.t list }
3838

3939
type 'a t = 'a state Atomic.t
4040

41-
let create () = Atomic.make (Continue { balance = 0; triggers = [] })
41+
let fifo_bit = 1
42+
let one = 2
43+
44+
let create ?(mode : [ `FIFO | `LIFO ] = `FIFO) () =
45+
let balance_and_mode = Bool.to_int (mode == `FIFO) in
46+
Atomic.make (Continue { balance_and_mode; triggers = [] })
4247

4348
let canceled t =
4449
match Atomic.get t with
@@ -50,13 +55,16 @@ module Computation = struct
5055
[detach]. This ensures that the [O(n)] lazy removal done by [gc] cannot
5156
cause starvation, because the only reason that CAS fails after [gc] is
5257
that someone else completed the [gc]. *)
53-
let rec gc balance triggers = function
58+
let rec gc balance_and_mode triggers = function
5459
| [] ->
55-
let triggers = if balance <= 1 then triggers else List.rev triggers in
56-
Continue { balance; triggers }
60+
let triggers =
61+
if balance_and_mode <= one + fifo_bit then triggers
62+
else List.rev triggers
63+
in
64+
Continue { balance_and_mode; triggers }
5765
| r :: rs ->
58-
if Trigger.is_signaled r then gc balance triggers rs
59-
else gc (balance + 1) (r :: triggers) rs
66+
if Trigger.is_signaled r then gc balance_and_mode triggers rs
67+
else gc (balance_and_mode + one) (r :: triggers) rs
6068
end
6169

6270
let rec try_attach t trigger backoff =
@@ -67,10 +75,14 @@ module Computation = struct
6775
(not (Trigger.is_signaled trigger))
6876
&&
6977
let after =
70-
if 0 <= r.balance then
78+
if fifo_bit <= r.balance_and_mode then
7179
Continue
72-
{ balance = r.balance + 1; triggers = trigger :: r.triggers }
73-
else gc 1 [ trigger ] r.triggers
80+
{
81+
balance_and_mode = r.balance_and_mode + one;
82+
triggers = trigger :: r.triggers;
83+
}
84+
else
85+
gc (one + (r.balance_and_mode land fifo_bit)) [ trigger ] r.triggers
7486
in
7587
Atomic.compare_and_set t before after
7688
|| try_attach t trigger (Backoff.once backoff)
@@ -82,8 +94,10 @@ module Computation = struct
8294
| Returned _ | Canceled _ -> ()
8395
| Continue r as before ->
8496
let after =
85-
if 0 <= r.balance then Continue { r with balance = r.balance - 2 }
86-
else gc 0 [] r.triggers
97+
if fifo_bit <= r.balance_and_mode then
98+
Continue
99+
{ r with balance_and_mode = r.balance_and_mode - (2 * one) }
100+
else gc (r.balance_and_mode land fifo_bit) [] r.triggers
87101
in
88102
if not (Atomic.compare_and_set t before after) then
89103
detach t (Backoff.once backoff)
@@ -105,7 +119,10 @@ module Computation = struct
105119
| Returned _ | Canceled _ -> false
106120
| Continue r as before ->
107121
if Atomic.compare_and_set t before after then begin
108-
List.iter Trigger.signal r.triggers;
122+
List.iter Trigger.signal
123+
(if r.balance_and_mode land fifo_bit = fifo_bit then
124+
List.rev r.triggers
125+
else r.triggers);
109126
true
110127
end
111128
else try_terminate t after (Backoff.once backoff)

lib/picos/picos.mli

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -567,8 +567,15 @@ module Computation : sig
567567
type !'a t
568568
(** Represents a cancelable computation. *)
569569

570-
val create : unit -> 'a t
571-
(** [create ()] creates a new computation in the running state. *)
570+
val create : ?mode:[ `FIFO | `LIFO ] -> unit -> 'a t
571+
(** [create ()] creates a new computation in the running state.
572+
573+
The optional [mode] specifies the order in which {{!Trigger} triggers}
574+
{{!try_attach} attached} to the computation will be {{!Trigger.signal}
575+
signaled} after the computation has been completed. [`FIFO] ordering may
576+
reduce latency of IO bound computations and is the default. [`LIFO] may
577+
improve thruput of CPU bound computations and be preferable on a
578+
work-stealing scheduler, for example. *)
572579

573580
val finished : unit t
574581
(** [finished] is a constant finished computation. *)

test/lib/elements/promise.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ let any xs =
9898
Fiber.spawn ~forbid:false y [ main ];
9999
y
100100

101-
let create = Computation.create
101+
let create () = Computation.create ()
102102
let try_return_to = Computation.try_return
103103
let try_reify_to t thunk = Computation.try_capture t thunk ()
104104
let publish = of_computation

test/lib/elements/sleep.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ open Picos
33
let exit_bt = Exn_bt.get_callstack 0 Exit
44

55
let sleepf seconds =
6-
let sleep = Computation.create () in
6+
let sleep = Computation.create ~mode:`LIFO () in
77
Computation.cancel_after ~seconds sleep exit_bt;
88
let trigger = Trigger.create () in
99
if Computation.try_attach sleep trigger then

test/test_picos.ml

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -110,48 +110,55 @@ let test_cancel_after () =
110110
(Exn_bt.get_callstack 0 Not_found);
111111
Computation.await computation
112112

113-
let test_computation_completion_signals_triggers_in_lifo_order () =
114-
let state = Random.State.make_self_init () in
115-
let num_non_trivial = ref 0 in
116-
for _ = 1 to 10 do
117-
let computation = Computation.create () in
118-
let signals = ref [] in
119-
let triggers = ref [] in
120-
let counter = ref 0 in
121-
let attach_one () =
122-
let trigger = Trigger.create () in
123-
triggers := trigger :: !triggers;
124-
let i = !counter in
125-
counter := i + 1;
126-
assert (Computation.try_attach computation trigger);
127-
assert (
128-
Trigger.on_signal trigger () () (fun _ _ _ -> signals := i :: !signals))
129-
in
130-
let detach_one () =
131-
let n = List.length !triggers in
132-
if 0 < n then begin
133-
let bits = Random.State.bits state in
134-
let i = bits mod n in
135-
let trigger = List.nth !triggers i in
136-
triggers := List.filter (( != ) trigger) !triggers;
137-
Computation.detach computation trigger
138-
end
139-
in
140-
for _ = 1 to 10 do
141-
for _ = 1 to 10 do
142-
let bits = Random.State.bits state in
143-
if bits land 3 <= 2 then attach_one () else detach_one ()
144-
done;
145-
for _ = 1 to List.length !triggers / 3 do
146-
detach_one ()
147-
done
148-
done;
149-
if List.length !triggers >= 2 then incr num_non_trivial;
150-
signals := [];
151-
Computation.finish computation;
152-
assert (!signals = List.sort Int.compare !signals)
153-
done;
154-
assert (0 < !num_non_trivial)
113+
let test_computation_completion_signals_triggers_in_order () =
114+
[ `FIFO; `LIFO ]
115+
|> List.iter @@ fun mode ->
116+
let state = Random.State.make_self_init () in
117+
let num_non_trivial = ref 0 in
118+
for _ = 1 to 10 do
119+
let computation = Computation.create ~mode () in
120+
let signals = ref [] in
121+
let triggers = ref [] in
122+
let counter = ref 0 in
123+
let attach_one () =
124+
let trigger = Trigger.create () in
125+
triggers := trigger :: !triggers;
126+
let i = !counter in
127+
counter := i + 1;
128+
assert (Computation.try_attach computation trigger);
129+
assert (
130+
Trigger.on_signal trigger () () (fun _ _ _ ->
131+
signals := i :: !signals))
132+
in
133+
let detach_one () =
134+
let n = List.length !triggers in
135+
if 0 < n then begin
136+
let bits = Random.State.bits state in
137+
let i = bits mod n in
138+
let trigger = List.nth !triggers i in
139+
triggers := List.filter (( != ) trigger) !triggers;
140+
Computation.detach computation trigger
141+
end
142+
in
143+
for _ = 1 to 10 do
144+
for _ = 1 to 10 do
145+
let bits = Random.State.bits state in
146+
if bits land 3 <= 2 then attach_one () else detach_one ()
147+
done;
148+
for _ = 1 to List.length !triggers / 3 do
149+
detach_one ()
150+
done
151+
done;
152+
if List.length !triggers >= 2 then incr num_non_trivial;
153+
signals := [];
154+
Computation.finish computation;
155+
let expected =
156+
List.sort Int.compare !signals
157+
|> if mode = `FIFO then List.rev else Fun.id
158+
in
159+
assert (!signals = expected)
160+
done;
161+
assert (0 < !num_non_trivial)
155162

156163
let () =
157164
[
@@ -163,10 +170,10 @@ let () =
163170
( "Thread cancelation",
164171
[ Alcotest.test_case "" `Quick test_thread_cancelation ] );
165172
("Cancel after", [ Alcotest.test_case "" `Quick test_cancel_after ]);
166-
( "Computation signals in LIFO order",
173+
( "Computation signals in order",
167174
[
168175
Alcotest.test_case "" `Quick
169-
test_computation_completion_signals_triggers_in_lifo_order;
176+
test_computation_completion_signals_triggers_in_order;
170177
] );
171178
]
172179
|> Alcotest.run "Picos"

test/test_picos_dscheck.ml

Lines changed: 75 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -43,81 +43,88 @@ let test_trigger_contract () =
4343

4444
(** This tries to cover much of the public contract of [Computation]s. *)
4545
let test_computation_contract () =
46-
let attached_total = ref 0 and unattached_total = ref 0 in
47-
let () =
48-
Atomic.trace @@ fun () ->
49-
let computation = Computation.create () in
50-
let returns = ref 0 and cancels = ref 0 in
51-
let () =
52-
Atomic.spawn @@ fun () ->
53-
if Computation.try_return computation 101 then incr returns
54-
in
55-
let () =
56-
Atomic.spawn @@ fun () ->
57-
if Computation.try_cancel computation (Exn_bt.get_callstack 1 Exit) then
58-
incr cancels
59-
in
60-
let triggers = Array.init 2 @@ fun _ -> Trigger.create () in
61-
let attached = ref 0 and unattached = ref 0 in
62-
let () =
63-
triggers
64-
|> Array.iter @@ fun trigger ->
46+
[ `FIFO; `LIFO ]
47+
|> List.iter @@ fun mode ->
48+
let attached_total = ref 0 and unattached_total = ref 0 in
49+
let () =
50+
Atomic.trace @@ fun () ->
51+
let computation = Computation.create ~mode () in
52+
let returns = ref 0 and cancels = ref 0 in
53+
let () =
6554
Atomic.spawn @@ fun () ->
66-
if Computation.try_attach computation trigger then incr attached
67-
else incr unattached
68-
in
69-
Atomic.final @@ fun () ->
70-
Atomic.check @@ fun () ->
71-
attached_total += !attached;
72-
unattached_total += !unattached;
73-
begin
74-
match Computation.peek computation with
75-
| Some (Ok 101) when !returns = 1 && !cancels = 0 -> true
76-
| Some (Error { exn = Exit; _ }) when !returns = 0 && !cancels = 1 -> true
77-
| _ -> false
78-
end
79-
&& !attached + !unattached = Array.length triggers
80-
&& !attached
81-
= sum_as
82-
(fun trigger -> Bool.to_int (Trigger.is_signaled trigger))
83-
triggers
84-
in
85-
[ attached_total; unattached_total ]
86-
|> List.iter @@ fun total -> if !total = 0 then Alcotest.fail "uncovered case"
55+
if Computation.try_return computation 101 then incr returns
56+
in
57+
let () =
58+
Atomic.spawn @@ fun () ->
59+
if Computation.try_cancel computation (Exn_bt.get_callstack 1 Exit)
60+
then incr cancels
61+
in
62+
let triggers = Array.init 2 @@ fun _ -> Trigger.create () in
63+
let attached = ref 0 and unattached = ref 0 in
64+
let () =
65+
triggers
66+
|> Array.iter @@ fun trigger ->
67+
Atomic.spawn @@ fun () ->
68+
if Computation.try_attach computation trigger then incr attached
69+
else incr unattached
70+
in
71+
Atomic.final @@ fun () ->
72+
Atomic.check @@ fun () ->
73+
attached_total += !attached;
74+
unattached_total += !unattached;
75+
begin
76+
match Computation.peek computation with
77+
| Some (Ok 101) when !returns = 1 && !cancels = 0 -> true
78+
| Some (Error { exn = Exit; _ }) when !returns = 0 && !cancels = 1 ->
79+
true
80+
| _ -> false
81+
end
82+
&& !attached + !unattached = Array.length triggers
83+
&& !attached
84+
= sum_as
85+
(fun trigger -> Bool.to_int (Trigger.is_signaled trigger))
86+
triggers
87+
in
88+
[ attached_total; unattached_total ]
89+
|> List.iter @@ fun total ->
90+
if !total = 0 then Alcotest.fail "uncovered case"
8791

8892
(** This covers the contract of [Computation] to remove detached triggers.
8993
9094
Testing this through the public API would require relying on GC
9195
statistics. *)
9296
let test_computation_removes_triggers () =
93-
Atomic.trace @@ fun () ->
94-
let computation = Computation.create () in
95-
let triggers = Array.init 4 @@ fun _ -> Trigger.create () in
96-
let () =
97-
triggers
98-
|> Array.iter @@ fun trigger ->
99-
Atomic.spawn @@ fun () ->
100-
Atomic.check (fun () -> Computation.try_attach computation trigger);
101-
Computation.detach computation trigger
102-
in
103-
Atomic.final @@ fun () ->
104-
Atomic.check @@ fun () ->
105-
Array.for_all Trigger.is_signaled triggers
106-
&&
107-
match Atomic.get computation with
108-
| Canceled _ | Returned _ -> false
109-
| Continue { balance; triggers } ->
110-
balance <= 0
111-
&& List.length triggers <= 2
112-
&&
113-
let trigger = Trigger.create () in
114-
Computation.try_attach computation trigger
115-
&& begin
116-
match Atomic.get computation with
117-
| Canceled _ | Returned _ -> false
118-
| Continue { balance; triggers } ->
119-
balance = 1 && triggers = [ trigger ]
120-
end
97+
[ `FIFO; `LIFO ]
98+
|> List.iter @@ fun mode ->
99+
Atomic.trace @@ fun () ->
100+
let computation = Computation.create ~mode () in
101+
let triggers = Array.init 4 @@ fun _ -> Trigger.create () in
102+
let () =
103+
triggers
104+
|> Array.iter @@ fun trigger ->
105+
Atomic.spawn @@ fun () ->
106+
Atomic.check (fun () -> Computation.try_attach computation trigger);
107+
Computation.detach computation trigger
108+
in
109+
Atomic.final @@ fun () ->
110+
Atomic.check @@ fun () ->
111+
Array.for_all Trigger.is_signaled triggers
112+
&&
113+
match Atomic.get computation with
114+
| Canceled _ | Returned _ -> false
115+
| Continue { balance_and_mode; triggers } ->
116+
balance_and_mode <= Computation.fifo_bit
117+
&& List.length triggers <= 2
118+
&&
119+
let trigger = Trigger.create () in
120+
Computation.try_attach computation trigger
121+
&& begin
122+
match Atomic.get computation with
123+
| Canceled _ | Returned _ -> false
124+
| Continue { balance_and_mode; triggers } ->
125+
balance_and_mode <= Computation.one + Computation.fifo_bit
126+
&& triggers = [ trigger ]
127+
end
121128

122129
let () =
123130
Alcotest.run "Picos DSCheck"

0 commit comments

Comments
 (0)