Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions lib/task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ open Effect.Deep
type 'a task = unit -> 'a

type message =
Work of (unit -> unit)
| Work of (unit -> unit)
(* Invariant: the Work function does not need to run under the 'step' handler,
it installs its own handler or re-invokes a deep-handler continuation. *)
| Quit

type task_chan = message Multi_channel.t
Expand Down Expand Up @@ -48,12 +50,6 @@ let do_task (type a) (f : unit -> a) (p : a promise) : unit =
| Pending l -> List.iter action l
| _ -> failwith "Task.do_task: impossible, can only set result of task once"

let async pool f =
let pd = get_pool_data pool in
let p = Atomic.make (Pending []) in
Multi_channel.send pd.task_chan (Work (fun _ -> do_task f p));
p

let await pool promise =
let pd = get_pool_data pool in
match Atomic.get promise with
Expand All @@ -78,10 +74,16 @@ let step (type a) (f : a -> unit) (v : a) : unit =
loop ())
| _ -> None }

let async pool f =
let pd = get_pool_data pool in
let p = Atomic.make (Pending []) in
Multi_channel.send pd.task_chan (Work (fun _ -> step (do_task f) p));
p

let rec worker task_chan =
match Multi_channel.recv task_chan with
| Quit -> Multi_channel.clear_local_state task_chan
| Work f -> step f (); worker task_chan
| Work f -> f (); worker task_chan

let run (type a) pool (f : unit -> a) : a =
let pd = get_pool_data pool in
Expand All @@ -93,7 +95,7 @@ let run (type a) pool (f : unit -> a) : a =
begin
try
match Multi_channel.recv_poll pd.task_chan with
| Work f -> step f ()
| Work f -> f ()
| Quit -> failwith "Task.run: tasks are active on pool"
with Exit -> Domain.cpu_relax ()
end;
Expand Down