Skip to content

Commit 9f3917e

Browse files
WIP
1 parent 48f4348 commit 9f3917e

File tree

8 files changed

+58
-51
lines changed

8 files changed

+58
-51
lines changed

src/core/dune

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
(public_name lwt)
33
(synopsis "Monadic promises and concurrent I/O")
44
(wrapped false)
5-
(libraries domainslib)
65
(instrumentation
76
(backend bisect_ppx)))
87

src/core/lwt.ml

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,14 @@ let send_callback d cb =
376376
callback_exchange :=
377377
Domain_map.update
378378
d
379-
(function | None -> Some [cb] | Some cbs -> Some (cb::cbs))
379+
(function
380+
| None ->
381+
let cbs = Lwt_sequence.create () in
382+
let _ : (unit -> unit) Lwt_sequence.node = Lwt_sequence.add_l cb cbs in
383+
Some cbs
384+
| Some cbs ->
385+
let _ : (unit -> unit) Lwt_sequence.node = Lwt_sequence.add_l cb cbs in
386+
Some cbs)
380387
!callback_exchange;
381388
begin match Domain_map.find_opt d !notification_map with
382389
| None ->
@@ -391,7 +398,7 @@ let send_callback d cb =
391398
let get_sent_callbacks () =
392399
Mutex.protect callback_mutex (fun () ->
393400
match Domain_map.find_opt (Domain.self ()) !callback_exchange with
394-
| None -> []
401+
| None -> Lwt_sequence.create ()
395402
| Some cbs ->
396403
callback_exchange := Domain_map.remove (Domain.self ()) !callback_exchange;
397404
cbs
@@ -1242,7 +1249,7 @@ struct
12421249
iter_list rest
12431250
end else
12441251
send_callback domain (fun () -> f result)
1245-
| Regular_callback_list_explicitly_removable_callback (domain, {contents = None}) ->
1252+
| Regular_callback_list_explicitly_removable_callback (_, {contents = None}) ->
12461253
iter_list rest
12471254
| Regular_callback_list_explicitly_removable_callback (domain, {contents = Some f}) ->
12481255
begin if domain = Domain.self () then

src/core/lwt.mli

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2062,6 +2062,7 @@ val abandon_wakeups : unit -> unit
20622062

20632063
val debug_state_is : 'a state -> 'a t -> bool t
20642064

2065+
[@@@ocaml.warning "-3"]
20652066
(* this is only for cross-domain scheduler synchronisation *)
2066-
val get_sent_callbacks : unit -> (unit -> unit) list
2067+
val get_sent_callbacks : unit -> (unit -> unit) Lwt_sequence.t
20672068
val register_notification : Domain.id -> (unit -> unit) -> unit

src/unix/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,6 @@
191191
(flags
192192
(:include unix_c_flags.sexp)))
193193
(c_library_flags
194-
(:include unix_c_library_flags.sexp))
194+
(:include unix_c_library_flags.sexp) -fPIC -pthread)
195195
(instrumentation
196196
(backend bisect_ppx)))

src/unix/lwt_main.ml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ let abandon_yielded_and_paused () =
2222

2323
let run p =
2424
let n = Lwt_unix.make_notification (fun () ->
25-
List.iter (fun cb -> cb ()) (Lwt.get_sent_callbacks ()) (* ??? *)
25+
let cbs = Lwt.get_sent_callbacks () in
26+
Printf.printf "domain %d go brrrr(%d)\n" (Domain.self () :> int) (Lwt_sequence.length cbs);
27+
flush_all ();
28+
Lwt_sequence.iter_l (fun f -> f ()) cbs
2629
) in
2730
let () = Lwt.register_notification (Domain.self ()) (fun () -> Lwt_unix.send_notification n) in
2831
let rec run_loop () =

src/unix/lwt_unix.cppo.ml

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,10 @@ let notifiers = Domain.DLS.new_key (fun () -> Notifiers.create 1024)
8282

8383
(* See https://github.com/ocsigen/lwt/issues/277 and
8484
https://github.com/ocsigen/lwt/pull/278. *)
85-
let current_notification_id = Domain.DLS.new_key (fun () -> ref (0x7FFFFFFF - 1000))
86-
87-
let rec find_free_id id =
88-
if Notifiers.mem (Domain.DLS.get notifiers) id then
89-
find_free_id (id + 1)
90-
else
91-
id
85+
let current_notification_id = Atomic.make (0x7FFFFFFF - 1000)
9286

9387
let make_notification ?(once=false) f =
94-
let id = find_free_id (!(Domain.DLS.get current_notification_id) + 1) in
95-
(Domain.DLS.get current_notification_id) := id;
88+
let id = Atomic.fetch_and_add current_notification_id 1 in
9689
Notifiers.add (Domain.DLS.get notifiers) id { notify_once = once; notify_handler = f };
9790
id
9891

src/unix/lwt_unix_stubs.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -496,13 +496,13 @@ CAMLprim value lwt_unix_socketpair_stub(value cloexec, value domain, value type,
496496
static lwt_unix_mutex notification_mutex;
497497

498498
/* All pending notifications. */
499-
static intnat *notifications = NULL;
499+
_Thread_local static intnat *notifications = NULL;
500500

501501
/* The size of the notification buffer. */
502-
static long notification_count = 0;
502+
_Thread_local static long notification_count = 0;
503503

504504
/* The index to the next available cell in the notification buffer. */
505-
static long notification_index = 0;
505+
_Thread_local static long notification_index = 0;
506506

507507
/* The mode currently used for notifications. */
508508
enum notification_mode {
@@ -523,14 +523,14 @@ enum notification_mode {
523523
};
524524

525525
/* The current notification mode. */
526-
static enum notification_mode notification_mode =
526+
_Thread_local static enum notification_mode notification_mode =
527527
NOTIFICATION_MODE_NOT_INITIALIZED;
528528

529529
/* Send one notification. */
530-
static int (*notification_send)();
530+
_Thread_local static int (*notification_send)();
531531

532532
/* Read one notification. */
533-
static int (*notification_recv)();
533+
_Thread_local static int (*notification_recv)();
534534

535535
static void init_notifications() {
536536
lwt_unix_mutex_init(&notification_mutex);

test/multidomain/basic.ml

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,53 @@
11
open Lwt.Syntax
22

3-
let pmain, wmain = Lwt.wait ()
4-
let palt, walt = Lwt.wait ()
3+
let p_one, w_one = Lwt.wait ()
4+
let p_two, w_two = Lwt.wait ()
55

6-
let dalt = Domain.spawn (fun () ->
6+
let d_one = Domain.spawn (fun () ->
7+
(* domain one: wait for value from domain two then work and then send a value *)
8+
Lwt_main.run (
9+
let* v_two = p_two in
10+
Printf.printf "one received %d from two\n" v_two;
11+
flush_all ();
12+
let* () = Lwt_unix.sleep 0.1 in
13+
Printf.printf "one slept\n";
14+
flush_all ();
15+
let v_one = 3 in
16+
Lwt.wakeup w_one v_one;
17+
Printf.printf "one sent %d\n" v_one;
18+
flush_all ();
19+
let* v_two = p_two and* v_one = p_one in
20+
Lwt.return (v_two * v_one)
21+
)
22+
)
23+
let d_two = Domain.spawn (fun () ->
724
Lwt_main.run (
825
let () =
9-
(* concurrent thread within alt domain *)
26+
(* concurrent thread within domain "two" send a value and then work and
27+
then wait for a value from domain one *)
1028
Lwt.dont_wait (fun () ->
1129
let* () = Lwt_unix.sleep 0.1 in
12-
Printf.printf "alt slept\n";
30+
Printf.printf "two slept\n";
1331
flush_all ();
14-
let valt = 2 in
15-
Lwt.wakeup walt valt;
16-
Printf.printf "alt sent %d\n" valt;
32+
let v_two = 2 in
33+
Lwt.wakeup w_two v_two;
34+
Printf.printf "two sent %d\n" v_two;
1735
flush_all ();
18-
let* from_main = pmain in
19-
Printf.printf "alt received %d from main\n" from_main;
36+
let* from_one = p_one in
37+
Printf.printf "two received %d from one\n" from_one;
2038
flush_all ();
2139
Lwt.return ()
2240
)
2341
(fun _ -> exit 1)
2442
in
25-
let* valt = palt and* vmain = pmain in
26-
Lwt.return (valt + vmain)
43+
let* v_two = p_two and* v_one = p_one in
44+
Lwt.return (v_two + v_one)
2745
)
2846
)
2947

30-
let main =
31-
Lwt_main.run (
32-
let* valt = palt in
33-
Printf.printf "main received %d from alt\n" valt;
34-
flush_all ();
35-
let* () = Lwt_unix.sleep 0.1 in
36-
Printf.printf "main slept\n";
37-
flush_all ();
38-
let vmain = 3 in
39-
Lwt.wakeup wmain vmain;
40-
Printf.printf "main sent %d\n" vmain;
41-
flush_all ();
42-
let* valt = palt and* vmain = pmain in
43-
Lwt.return (valt * vmain)
44-
)
4548

46-
let alt = Domain.join dalt
49+
let one = Domain.join d_one
50+
let two = Domain.join d_two
4751

48-
let () = Printf.printf "alt: %d, main: %d\n" alt main
52+
let () = Printf.printf "one: %d, two: %d\n" one two
4953
let () = flush_all ()

0 commit comments

Comments
 (0)