|
| 1 | +open Lwt.Syntax |
| 2 | + |
| 3 | +let rec worker recv_task f send_result = |
| 4 | + let* task = Lwt_stream.get recv_task in |
| 5 | + match task with |
| 6 | + | None -> |
| 7 | +(* let () = Printf.printf "worker(%d) received interrupt\n" (Domain.self () :> int); flush_all() in *) |
| 8 | + send_result None; |
| 9 | + Lwt.return () |
| 10 | + | Some data -> |
| 11 | +(* let () = Printf.printf "worker(%d) received task (%S)\n" (Domain.self () :> int) data; flush_all() in *) |
| 12 | + let* result = f data in |
| 13 | + send_result (Some result); |
| 14 | +(* let () = Printf.printf "worker(%d) sent result (%d)\n" (Domain.self () :> int) result; flush_all() in *) |
| 15 | + let* () = Lwt.pause () in |
| 16 | + worker recv_task f send_result |
| 17 | + |
| 18 | +let spawn_domain_worker f = |
| 19 | + let recv_task, send_task = Lwt_stream.create () in |
| 20 | + let recv_result, send_result = Lwt_stream.create () in |
| 21 | + let dw = |
| 22 | + Domain.spawn (fun () -> |
| 23 | + Lwt_unix.init_domain (); |
| 24 | + Lwt_main.run ( |
| 25 | + let* () = Lwt.pause () in |
| 26 | + worker recv_task f send_result |
| 27 | + ) |
| 28 | + ) |
| 29 | + in |
| 30 | + send_task, dw, recv_result |
| 31 | + |
| 32 | +let simulate_work data = |
| 33 | + let* () = Lwt.pause () in |
| 34 | + Lwt.return (String.length data) |
| 35 | + |
| 36 | +let input = [""; "adsf"; "lkjh"; "lkjahsdflkjahdlfkjha"; "0"; ""; ""; ""; ""; ""; "adf"; "ASDSKJLHDAS"; "WPOQIEU"; "DSFALKHJ"; ""; ""; ""; ""; "SD"; "SD"; "SAD; SD;SD"; "ad"; "...."] |
| 37 | +let expected_result = List.fold_left (fun acc s -> acc + String.length s) 0 input |
| 38 | + |
| 39 | +let main () = |
| 40 | + let send_task1, dw1, recv_result1 = spawn_domain_worker simulate_work in |
| 41 | + let send_task2, dw2, recv_result2 = spawn_domain_worker simulate_work in |
| 42 | + let l = |
| 43 | + Lwt_unix.init_domain (); |
| 44 | + Lwt_main.run ( |
| 45 | + let* () = Lwt.pause () in |
| 46 | + let () = (* push work *) |
| 47 | + List.iteri |
| 48 | + (fun idx s -> if idx mod 3 = 0 then send_task1 (Some s) else send_task2 (Some s)) |
| 49 | + input |
| 50 | + in |
| 51 | + send_task1 None; |
| 52 | + send_task2 None; |
| 53 | + let* lengths1 = Lwt_stream.fold (+) recv_result1 0 |
| 54 | + and* lengths2 = Lwt_stream.fold (+) recv_result2 0 |
| 55 | + in |
| 56 | + Lwt.return (lengths1 + lengths2) |
| 57 | + ) |
| 58 | + in |
| 59 | + let () = Domain.join dw1 in |
| 60 | + let () = Domain.join dw2 in |
| 61 | + let _code = |
| 62 | + if l = expected_result then begin |
| 63 | + 0 |
| 64 | + end else begin |
| 65 | + Printf.printf "domain-workers: ×\n"; |
| 66 | + flush_all (); |
| 67 | + exit 1 |
| 68 | + end |
| 69 | + in |
| 70 | + () |
| 71 | + |
| 72 | +let () = |
| 73 | + for _ = 1 to 1000 do |
| 74 | + main () |
| 75 | + done; |
| 76 | + Printf.printf "domain-workers: ✓\n"; |
| 77 | + flush_all (); |
| 78 | + exit 0 |
0 commit comments