Skip to content

Commit 205581f

Browse files
fix preemptive test: but needs new primitive in preemptive
1 parent e6efdd7 commit 205581f

File tree

3 files changed

+37
-35
lines changed

3 files changed

+37
-35
lines changed

src/unix/lwt_preemptive.ml

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,15 @@ sig
4343
type 'a t
4444

4545
val make : unit -> 'a t
46-
val get : 'a t -> 'a
46+
val get : 'a t -> ('a, unit) result
4747
val set : 'a t -> 'a -> unit
48+
val kill : 'a t -> unit
4849
end =
4950
struct
5051
type 'a t = {
5152
m : Mutex.t;
5253
cv : Condition.t;
53-
mutable cell : 'a option;
54+
mutable cell : ('a, unit) result option;
5455
}
5556

5657
let make () = { m = Mutex.create (); cv = Condition.create (); cell = None }
@@ -71,7 +72,13 @@ struct
7172

7273
let set t v =
7374
Mutex.lock t.m;
74-
t.cell <- Some v;
75+
t.cell <- Some (Ok v);
76+
Mutex.unlock t.m;
77+
Condition.signal t.cv
78+
79+
let kill t =
80+
Mutex.lock t.m;
81+
t.cell <- Some (Error ());
7582
Mutex.unlock t.m;
7683
Condition.signal t.cv
7784
end
@@ -97,14 +104,16 @@ let waiters : thread Lwt.u Lwt_sequence.t Domain.DLS.key = Domain.DLS.new_key Lw
97104

98105
(* Code executed by a worker: *)
99106
let rec worker_loop worker =
100-
let id, task = CELL.get worker.task_cell in
101-
task ();
102-
(* If there is too much threads, exit. This can happen if the user
103-
decreased the maximum: *)
104-
if Domain.DLS.get threads_count > Domain.DLS.get max_threads then worker.reuse <- false;
105-
(* Tell the main thread that work is done: *)
106-
Lwt_unix.send_notification id;
107-
if worker.reuse then worker_loop worker
107+
match CELL.get worker.task_cell with
108+
| Error () -> ()
109+
| Ok (id, task) ->
110+
task ();
111+
(* If there is too much threads, exit. This can happen if the user
112+
decreased the maximum: *)
113+
if Domain.DLS.get threads_count > Domain.DLS.get max_threads then worker.reuse <- false;
114+
(* Tell the main thread that work is done: *)
115+
Lwt_unix.send_notification id;
116+
if worker.reuse then worker_loop worker
108117

109118
(* create a new worker: *)
110119
let make_worker () =
@@ -258,10 +267,14 @@ let run_in_domain d f =
258267
run_in_domain_dont_wait d job;
259268
(* Wait for the result. *)
260269
match CELL.get cell with
261-
| Result.Ok ret -> ret
262-
| Result.Error exn -> raise exn
270+
| Ok (Ok ret) -> ret
271+
| Ok (Error exn) -> raise exn
272+
| Error () -> assert false
263273

264274
(* This version shadows the one above, adding an exception handler *)
265275
let run_in_domain_dont_wait d f handler =
266276
let f () = Lwt.catch f (fun exc -> handler exc; Lwt.return_unit) in
267277
run_in_domain_dont_wait d f
278+
279+
let kill_all () =
280+
Queue.iter (fun thread -> CELL.kill thread.task_cell) (Domain.DLS.get workers)

src/unix/lwt_preemptive.mli

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,7 @@ val get_max_number_of_threads_queued : unit -> int
8787
val nbthreads : unit -> int
8888
val nbthreadsbusy : unit -> int
8989
val nbthreadsqueued : unit -> int
90+
91+
(* kill_all is to be called before joining the domain, not satisfying UI for
92+
now, searching for a better way *)
93+
val kill_all : unit -> unit

test/multidomain/preempting.ml

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@ let simulate_work data =
1212

1313
let () = Lwt_unix.init_domain ()
1414

15-
(* atomic just for debugging: to record when domains are finished *)
16-
let x = Atomic.make 0
17-
18-
let domain_go_brrrrrrr n input = Domain.spawn (fun () ->
15+
let domain_go_brrrrrrr input = Domain.spawn (fun () ->
1916
flush_all ();
2017
Lwt_unix.init_domain ();
2118
let v = Lwt_main.run (
@@ -24,35 +21,23 @@ let domain_go_brrrrrrr n input = Domain.spawn (fun () ->
2421
Lwt_list.map_p (Lwt_preemptive.detach simulate_work) input
2522
)
2623
in
27-
(* printing just for debug: to see when different domains are finished *)
28-
Printf.printf "domain #%d %d scheduler returned\n" n (Domain.self () :> int);
29-
flush_all ();
30-
Atomic.incr x;
24+
Lwt_preemptive.kill_all ();
3125
v
3226
)
3327

3428
let () =
35-
let rec go n acc = function
29+
let rec go acc = function
3630
| [_] | [] ->
37-
Printf.printf "all domain started\n"; flush_all ();
3831
acc
3932
| (_ :: more) as wrk ->
4033
let expected = List.map String.length wrk in
41-
let acc = (n, expected, domain_go_brrrrrrr n wrk) :: acc in
42-
go (n + 1) acc more
43-
in
44-
let results = go 1 [] input in
45-
Unix.sleepf 5.; (* sleeping for debug: to observe that atomic is at max value *)
46-
Printf.printf "done debug-sleeping, about to join all domains (atomic=%d)\n" (Atomic.get x); flush_all ();
47-
let results = List.map (fun (n, e, d) ->
48-
Printf.printf "joining domain #%d (atomic=%d)\n" n (Atomic.get x); flush_all ();
49-
let d = Domain.join d in
50-
Printf.printf "joined domain #%d (atomic=%d)\n" n (Atomic.get x); flush_all ();
51-
(e, d)) results
34+
let acc = (expected, domain_go_brrrrrrr wrk) :: acc in
35+
go acc more
5236
in
37+
let results = go [] input in
5338
let success =
5439
List.for_all
55-
(fun (expected, d) -> List.for_all2 Int.equal expected d)
40+
(fun (expected, d) -> List.for_all2 Int.equal expected (Domain.join d))
5641
results
5742
in
5843
let code =

0 commit comments

Comments
 (0)