Skip to content

Commit d4e0210

Browse files
committed
Add more structured Run operations
1 parent bf0bdfa commit d4e0210

File tree

4 files changed

+79
-0
lines changed

4 files changed

+79
-0
lines changed

lib/picos_std.structured/picos_std_structured.mli

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,13 @@ module Run : sig
398398
with Control.Terminate -> ()
399399
]}
400400
but treats the list of actions as a single computation. *)
401+
402+
val for_n : int -> (int -> unit) -> unit
403+
(** [for_n n action], when [0 < n], calls [action i] for each integer [i] from
404+
[0] to [n-1] using up to [n] fibers, including the current fiber. The
405+
actual number of fibers used can be much less than [n] in case the calls
406+
do not block, the calls return quickly, and/or the scheduler doesn't
407+
provide parallelism. *)
401408
end
402409

403410
(** {1 Examples}

lib/picos_std.structured/run.ml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,59 @@ let run actions wrap =
3737

3838
let all actions = run actions wrap_all
3939
let any actions = run actions wrap_any
40+
41+
(* *)
42+
43+
type _ tdt =
44+
| Empty : [> `Empty ] tdt
45+
| Range : {
46+
mutable _lo : int;
47+
hi : int;
48+
parent : [ `Empty | `Range ] tdt;
49+
}
50+
-> [> `Range ] tdt
51+
52+
external lo_as_atomic : [ `Range ] tdt -> int Atomic.t = "%identity"
53+
54+
let rec for_out (Range r as range : [ `Range ] tdt) action backoff =
55+
let lo_before = Atomic.get (lo_as_atomic range) in
56+
let n = r.hi - lo_before in
57+
if 0 < n then begin
58+
if Atomic.compare_and_set (lo_as_atomic range) lo_before (lo_before + 1)
59+
then begin
60+
action lo_before;
61+
for_out range action Backoff.default
62+
end
63+
else for_out range action (Backoff.once backoff)
64+
end
65+
else
66+
match r.parent with
67+
| Empty -> ()
68+
| Range _ as range -> for_out range action Backoff.default
69+
70+
let rec for_in bundle (Range r as range : [ `Range ] tdt) action backoff =
71+
let lo_before = Atomic.get (lo_as_atomic range) in
72+
let n = r.hi - lo_before in
73+
if n <= 1 then begin
74+
if n = 1 && Atomic.compare_and_set (lo_as_atomic range) lo_before r.hi then
75+
action lo_before;
76+
match r.parent with
77+
| Empty -> ()
78+
| Range _ as range -> for_out range action Backoff.default
79+
end
80+
else
81+
let lo_after = lo_before + (n asr 1) in
82+
if Atomic.compare_and_set (lo_as_atomic range) lo_before lo_after then begin
83+
Bundle.fork bundle (fun () -> for_in bundle range action Backoff.default);
84+
let child = Range { _lo = lo_before; hi = lo_after; parent = range } in
85+
for_in bundle child action Backoff.default
86+
end
87+
else for_in bundle range action (Backoff.once backoff)
88+
89+
let for_n n action =
90+
if 0 < n then
91+
if n = 1 then action 0
92+
else
93+
let range = Range { _lo = 0; hi = n; parent = Empty } in
94+
Bundle.join_after @@ fun bundle ->
95+
for_in bundle range action Backoff.default

test/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@
249249
(libraries
250250
alcotest
251251
picos
252+
picos.domain
252253
picos_aux.mpscq
253254
picos_std.finally
254255
picos_std.structured

test/test_structured.ml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,17 @@ let test_race_any () =
258258
(* This is non-deterministic and may need to changed if flaky *)
259259
assert (Atomic.get winner = 1)
260260

261+
let test_for_n_basic () =
262+
Test_scheduler.run ~max_domains:(Picos_domain.recommended_domain_count ())
263+
@@ fun () ->
264+
for n = 0 to 128 do
265+
let bytes = Bytes.create n in
266+
Run.for_n n (fun i -> Bytes.set bytes i (Char.chr i));
267+
for i = 0 to n - 1 do
268+
assert (Bytes.get bytes i = Char.chr i)
269+
done
270+
done
271+
261272
let () =
262273
[
263274
( "Bundle",
@@ -280,9 +291,13 @@ let () =
280291
test_error_in_promise_terminates;
281292
Alcotest.test_case "can wait promises" `Quick test_can_wait_promises;
282293
Alcotest.test_case "can select promises" `Quick test_can_select_promises;
294+
] );
295+
( "Run",
296+
[
283297
Alcotest.test_case "any and all errors" `Quick test_any_and_all_errors;
284298
Alcotest.test_case "any and all returns" `Quick test_any_and_all_returns;
285299
Alcotest.test_case "race any" `Quick test_race_any;
300+
Alcotest.test_case "for_n basic" `Quick test_for_n_basic;
286301
] );
287302
]
288303
|> Alcotest.run "Picos_structured"

0 commit comments

Comments
 (0)