Skip to content

Commit a5c0881

Browse files
better encapsulation of multidomain-sync internals
1 parent 1cc2803 commit a5c0881

File tree

3 files changed

+70
-67
lines changed

3 files changed

+70
-67
lines changed

src/core/lwt.ml

Lines changed: 61 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -364,61 +364,63 @@ module Storage_map =
364364
end)
365365
type storage = (unit -> unit) Storage_map.t
366366

367+
module Multidomain_sync = struct
368+
369+
(* callback_exchange is a domain-indexed map for storing callbacks that
370+
different domains should execute. This is used when a domain d1 resolves a
371+
promise on which a different domain d2 has attached callbacks (implicitely
372+
via bind etc. or explicitly via on_success etc.). When this happens, the
373+
domain resolving the promise calls its local callbacks and sends the other
374+
domains' callbacks into the callback exchange *)
375+
let callback_exchange = Domain_map.create_protected_map ()
376+
377+
(* notification_map is a domain-indexed map for waking sleeping domains. each
378+
(should) domain registers a notification (see Lwt_unix) into the map when it
379+
starts its scheduler. other domains can wake the domain up to indicate that
380+
callbacks are available to be called *)
381+
let notification_map = Domain_map.create_protected_map ()
382+
383+
(* send_callback d cb adds the callback cb into the callback_exchange and pings
384+
the domain d via the notification_map *)
385+
let send_callback d cb =
386+
Domain_map.update
387+
callback_exchange
388+
d
389+
(function
390+
| None ->
391+
let cbs = Lwt_sequence.create () in
392+
let _ : (unit -> unit) Lwt_sequence.node = Lwt_sequence.add_l cb cbs in
393+
Some cbs
394+
| Some cbs ->
395+
let _ : (unit -> unit) Lwt_sequence.node = Lwt_sequence.add_l cb cbs in
396+
Some cbs);
397+
begin match Domain_map.find notification_map d with
398+
| None ->
399+
failwith "ERROR: domain didn't register at startup"
400+
| Some n ->
401+
n ()
402+
end
367403

368-
(* callback_exchange is a domain-indexed map for storing callbacks that
369-
different domains should execute. This is used when a domain d1 resolves a
370-
promise on which a different domain d2 has attached callbacks (implicitely
371-
via bind etc. or explicitly via on_success etc.). When this happens, the
372-
domain resolving the promise calls its local callbacks and sends the other
373-
domains' callbacks into the callback exchange *)
374-
let callback_exchange = Domain_map.create_protected_map ()
375-
376-
(* notification_map is a domain-indexed map for waking sleeping domains. each
377-
(should) domain registers a notification (see Lwt_unix) into the map when it
378-
starts its scheduler. other domains can wake the domain up to indicate that
379-
callbacks are available to be called *)
380-
let notification_map = Domain_map.create_protected_map ()
381-
382-
(* send_callback d cb adds the callback cb into the callback_exchange and pings
383-
the domain d via the notification_map *)
384-
let send_callback d cb =
385-
Domain_map.update
386-
callback_exchange
387-
d
388-
(function
389-
| None ->
390-
let cbs = Lwt_sequence.create () in
391-
let _ : (unit -> unit) Lwt_sequence.node = Lwt_sequence.add_l cb cbs in
392-
Some cbs
393-
| Some cbs ->
394-
let _ : (unit -> unit) Lwt_sequence.node = Lwt_sequence.add_l cb cbs in
395-
Some cbs);
396-
begin match Domain_map.find notification_map d with
397-
| None ->
398-
failwith "ERROR: domain didn't register at startup"
399-
| Some n ->
400-
n ()
401-
end
402-
403-
(* get_sent_callbacks gets a domain's own callback from the callbasck exchange,
404-
this is so that the notification handler installed by main.run can obtain the
405-
callbacks that have been sent its way *)
406-
let get_sent_callbacks domain_id =
407-
match Domain_map.extract callback_exchange domain_id with
408-
| None -> Lwt_sequence.create ()
409-
| Some cbs -> cbs
410-
411-
(* register_notification adds a domain's own notification (see Lwt_unix) into
412-
the notification map *)
413-
let register_notification d n =
414-
Domain_map.update notification_map d (function
415-
| None -> Some n
416-
| Some _ -> failwith "already registered!!")
417-
418-
let is_alredy_registered d =
419-
match Domain_map.find notification_map d with
420-
| Some _ -> true
421-
| None -> false
404+
(* get_sent_callbacks gets a domain's own callback from the callbasck exchange,
405+
this is so that the notification handler installed by main.run can obtain the
406+
callbacks that have been sent its way *)
407+
let get_sent_callbacks domain_id =
408+
match Domain_map.extract callback_exchange domain_id with
409+
| None -> Lwt_sequence.create ()
410+
| Some cbs -> cbs
411+
412+
(* register_notification adds a domain's own notification (see Lwt_unix) into
413+
the notification map *)
414+
let register_notification d n =
415+
Domain_map.update notification_map d (function
416+
| None -> Some n
417+
| Some _ -> failwith "already registered!!")
418+
419+
let is_alredy_registered d =
420+
match Domain_map.find notification_map d with
421+
| Some _ -> true
422+
| None -> false
423+
end
422424

423425
module Main_internal_types =
424426
struct
@@ -1230,7 +1232,7 @@ struct
12301232
Domain.DLS.set current_storage storage;
12311233
handle_with_async_exception_hook f ()
12321234
end else
1233-
send_callback domain (fun () ->
1235+
Multidomain_sync.send_callback domain (fun () ->
12341236
Domain.DLS.set current_storage storage;
12351237
handle_with_async_exception_hook f ()
12361238
)
@@ -1240,7 +1242,7 @@ struct
12401242
begin if domain = Domain.self () then
12411243
Lwt_sequence.remove node
12421244
else
1243-
send_callback domain (fun () -> Lwt_sequence.remove node)
1245+
Multidomain_sync.send_callback domain (fun () -> Lwt_sequence.remove node)
12441246
end;
12451247
iter_list rest
12461248
| Cancel_callback_list_concat (fs, fs') ->
@@ -1265,7 +1267,7 @@ struct
12651267
begin if domain = Domain.self () then
12661268
f result
12671269
else
1268-
send_callback domain (fun () -> f result)
1270+
Multidomain_sync.send_callback domain (fun () -> f result)
12691271
end;
12701272
iter_list rest
12711273
| Regular_callback_list_explicitly_removable_callback (_, {contents = None}) ->
@@ -1274,7 +1276,7 @@ struct
12741276
begin if domain = Domain.self () then
12751277
f result
12761278
else
1277-
send_callback domain (fun () -> f result)
1279+
Multidomain_sync.send_callback domain (fun () -> f result)
12781280
end;
12791281
iter_list rest
12801282
| Regular_callback_list_concat (fs, fs') ->
@@ -3308,4 +3310,5 @@ end
33083310
module Private = struct
33093311
type nonrec storage = storage
33103312
module Sequence_associated_storage = Sequence_associated_storage
3313+
module Multidomain_sync = Multidomain_sync
33113314
end

src/core/lwt.mli

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2071,10 +2071,10 @@ module Private : sig
20712071
val empty_storage : storage
20722072
val current_storage : storage Domain.DLS.key
20732073
end
2074-
end [@@alert trespassing "for internal use only, keep away"]
20752074

2076-
[@@@ocaml.warning "-3"]
2077-
(* this is only for cross-domain scheduler synchronisation *)
2078-
val get_sent_callbacks : Domain.id -> (unit -> unit) Lwt_sequence.t
2079-
val register_notification : Domain.id -> (unit -> unit) -> unit
2080-
val is_alredy_registered : Domain.id -> bool
2075+
module Multidomain_sync : sig
2076+
val get_sent_callbacks : Domain.id -> (unit -> unit) Lwt_sequence.t[@ocaml.warning "-3"]
2077+
val register_notification : Domain.id -> (unit -> unit) -> unit
2078+
val is_alredy_registered : Domain.id -> bool
2079+
end
2080+
end [@@alert trespassing "for internal use only, keep away"]

src/unix/lwt_main.ml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ let abandon_yielded_and_paused () =
2222

2323
let run p =
2424
let domain_id = Domain.self () in
25-
let () = if Lwt.is_alredy_registered domain_id then
25+
let () = if (Lwt.Private.Multidomain_sync.is_alredy_registered[@alert "-trespassing"]) domain_id then
2626
()
2727
else begin
2828
let n = Lwt_unix.make_notification domain_id (fun () ->
29-
let cbs = Lwt.get_sent_callbacks domain_id in
29+
let cbs = (Lwt.Private.Multidomain_sync.get_sent_callbacks[@alert "-trespassing"]) domain_id in
3030
Lwt_sequence.iter_l (fun f -> f ()) cbs
3131
) in
32-
Lwt.register_notification domain_id (fun () -> Lwt_unix.send_notification domain_id n)
32+
(Lwt.Private.Multidomain_sync.register_notification[@alert "-trespassing"]) domain_id (fun () -> Lwt_unix.send_notification domain_id n)
3333
end
3434
in
3535
let rec run_loop () =

0 commit comments

Comments
 (0)