Skip to content

Commit 9a538c5

Browse files
notifications now work with an abstract id
abstract id includes domain id for location of the domain this breaks backwards compat with 5.* less
1 parent a5c0881 commit 9a538c5

File tree

7 files changed

+69
-44
lines changed

7 files changed

+69
-44
lines changed

src/unix/lwt_gc.ml

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,16 @@ let ensure_termination t =
2020
(fun () -> Lwt_main.Exit_hooks.remove hook; Lwt.return_unit))
2121
end
2222

23-
let finaliser f =
24-
(* In order for the domain id to be consistent, wherever the real finaliser is
25-
called, we pass it in the continuation. *)
26-
let domain_id = Domain.self () in
23+
let finaliser ?domain f =
24+
let domain = match domain with None -> Domain.self () | Some domain -> domain in
2725
(* In order not to create a reference to the value in the
2826
notification callback, we use an initially unset option cell
2927
which will be filled when the finaliser is called. *)
3028
let opt = ref None in
3129
let id =
3230
Lwt_unix.make_notification
3331
~once:true
34-
domain_id
32+
domain
3533
(fun () ->
3634
match !opt with
3735
| None ->
@@ -43,10 +41,10 @@ let finaliser f =
4341
(* The real finaliser: fill the cell and send a notification. *)
4442
(fun x ->
4543
opt := Some x;
46-
Lwt_unix.send_notification domain_id id)
44+
Lwt_unix.send_notification id)
4745

48-
let finalise f x =
49-
Gc.finalise (finaliser f) x
46+
let finalise ?domain f x =
47+
Gc.finalise (finaliser ?domain f) x
5048

5149
(* Exit hook for a finalise_or_exit *)
5250
let foe_exit f called weak () =

src/unix/lwt_gc.mli

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,25 @@
99
thread to a value, without having to use [Lwt_unix.run] in the
1010
finaliser. *)
1111

12-
val finalise : ('a -> unit Lwt.t) -> 'a -> unit
12+
val finalise : ?domain:Domain.id -> ('a -> unit Lwt.t) -> 'a -> unit
1313
(** [finalise f x] ensures [f x] is evaluated after [x] has been
1414
garbage collected. If [f x] yields, then Lwt will wait for its
1515
termination at the end of the program.
1616
1717
Note that [f x] is not called at garbage collection time, but
18-
later in the main loop. *)
18+
later in the main loop.
19+
20+
If [domain] is provided, then [f x] is evaluated in the corresponding
21+
domain. Otherwise it is evaluated in the domain calling [finalise]. If
22+
Lwt is not running in the domain set to run the finaliser, an
23+
unspecified error occurs at an unspecified time or the finaliser doesn't
24+
run or some other bad thing happens. *)
1925

2026
val finalise_or_exit : ('a -> unit Lwt.t) -> 'a -> unit
2127
(** [finalise_or_exit f x] call [f x] when [x] is garbage collected
22-
or (exclusively) when the program exits. *)
28+
or (exclusively) when the program exits.
29+
30+
The finaliser [f] is called in the same domain that called
31+
[finalise_or_exit]. If there is no Lwt scheduler running in this domain an
32+
unspecified error occurs. You can use [Lwt_preemptive.run_in_domain] to
33+
bypass the same-domain limitation. *)

src/unix/lwt_main.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ let run p =
2929
let cbs = (Lwt.Private.Multidomain_sync.get_sent_callbacks[@alert "-trespassing"]) domain_id in
3030
Lwt_sequence.iter_l (fun f -> f ()) cbs
3131
) in
32-
(Lwt.Private.Multidomain_sync.register_notification[@alert "-trespassing"]) domain_id (fun () -> Lwt_unix.send_notification domain_id n)
32+
(Lwt.Private.Multidomain_sync.register_notification[@alert "-trespassing"]) domain_id (fun () -> Lwt_unix.send_notification n)
3333
end
3434
in
3535
let rec run_loop () =

src/unix/lwt_main.mli

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,14 @@ val abandon_yielded_and_paused : unit -> unit [@@deprecated "Use Lwt.abandon_pau
7777

7878

7979
(** Hook sequences. Each module of this type is a set of hooks, to be run by Lwt
80-
at certain points during execution. See modules {!Enter_iter_hooks},
81-
{!Leave_iter_hooks}, and {!Exit_hooks}. *)
80+
at certain points during execution.
81+
82+
Hooks are added for the current domain. If you are calling the Hook
83+
functions from a domain where Lwt is not running a scheduler then some
84+
unspecified error may occur. If you need to set some Hooks to/from a
85+
different domain, you can use [Lwt_preemptive.run_in_domain].
86+
87+
See modules {!Enter_iter_hooks}, {!Leave_iter_hooks}, and {!Exit_hooks}. *)
8288
module type Hooks =
8389
sig
8490
type 'return_value kind

src/unix/lwt_preemptive.ml

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ struct
7878
end
7979

8080
type thread = {
81-
task_cell: (int * (unit -> unit)) CELL.t;
81+
task_cell: (Lwt_unix.notification_id * (unit -> unit)) CELL.t;
8282
(* Channel used to communicate notification id and tasks to the
8383
worker thread. *)
8484

@@ -104,7 +104,7 @@ let rec worker_loop worker =
104104
decreased the maximum: *)
105105
if Atomic.get threads_count > Atomic.get max_threads then worker.reuse <- false;
106106
(* Tell the main thread that work is done: *)
107-
Lwt_unix.send_notification (Domain.self ()) id;
107+
Lwt_unix.send_notification id;
108108
if worker.reuse then worker_loop worker
109109

110110
(* create a new worker: *)
@@ -186,6 +186,7 @@ let detach f args =
186186
get_worker () >>= fun worker ->
187187
let waiter, wakener = Lwt.wait () in
188188
let id =
189+
(* call back the domain that called the [detach] function: self *)
189190
Lwt_unix.make_notification ~once:true (Domain.self ())
190191
(fun () -> Lwt.wakeup_result wakener !result)
191192
in
@@ -216,23 +217,27 @@ let jobs = Queue.create ()
216217
(* Mutex to protect access to [jobs]. *)
217218
let jobs_mutex = Mutex.create ()
218219

219-
let job_notification =
220-
Lwt_unix.make_notification (Domain.self ())
220+
let job_notification = Domain_map.create_protected_map ()
221+
let get_job_notification d =
222+
Domain_map.init job_notification d
221223
(fun () ->
222-
(* Take the first job. The queue is never empty at this
223-
point. *)
224-
Mutex.lock jobs_mutex;
225-
let thunk = Queue.take jobs in
226-
Mutex.unlock jobs_mutex;
227-
ignore (thunk ()))
224+
Lwt_unix.make_notification (Domain.self ())
225+
(fun () ->
226+
(* Take the first job. The queue is never empty at this
227+
point. *)
228+
Mutex.lock jobs_mutex;
229+
let thunk = Queue.take jobs in
230+
Mutex.unlock jobs_mutex;
231+
ignore (thunk ()))
232+
)
228233

229234
let run_in_domain_dont_wait d f =
230235
(* Add the job to the queue. *)
231236
Mutex.lock jobs_mutex;
232237
Queue.add f jobs;
233238
Mutex.unlock jobs_mutex;
234239
(* Notify the main thread. *)
235-
Lwt_unix.send_notification d job_notification
240+
Lwt_unix.send_notification (get_job_notification d)
236241

237242
(* There is a potential performance issue from creating a cell every time this
238243
function is called. See:

src/unix/lwt_unix.cppo.ml

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ let notifiers = Domain_map.create_protected_map ()
8484
https://github.com/ocsigen/lwt/pull/278. *)
8585
let current_notification_id = Atomic.make (0x7FFFFFFF - 1000)
8686

87+
type notification_id = Domain.id * int
88+
8789
let make_notification ?(once=false) domain_id f =
8890
let id = Atomic.fetch_and_add current_notification_id 1 in
8991
Domain_map.update notifiers domain_id
@@ -95,17 +97,17 @@ let make_notification ?(once=false) domain_id f =
9597
| Some notifiers ->
9698
Notifiers.add notifiers id { notify_once = once; notify_handler = f };
9799
Some notifiers);
98-
id
100+
(domain_id, id)
99101

100-
let stop_notification domain_id id =
102+
let stop_notification (domain_id, id) =
101103
Domain_map.update notifiers domain_id
102104
(function
103105
| None -> None
104106
| Some notifiers ->
105107
Notifiers.remove notifiers id;
106108
Some notifiers)
107109

108-
let set_notification domain_id id f =
110+
let set_notification (domain_id, id) f =
109111
Domain_map.update notifiers domain_id
110112
(function
111113
| None -> raise Not_found
@@ -114,7 +116,7 @@ let set_notification domain_id id f =
114116
Notifiers.replace notifiers id { notifier with notify_handler = f };
115117
Some notifiers)
116118

117-
let call_notification domain_id id =
119+
let call_notification (domain_id, id) =
118120
match Domain_map.find notifiers domain_id with
119121
| None -> ()
120122
| Some notifiers ->
@@ -209,7 +211,7 @@ let run_job_aux async_method job result =
209211
jobs in
210212
ignore begin
211213
(* Create the notification for asynchronous wakeup. *)
212-
let id =
214+
let (_, notifid) as id =
213215
make_notification ~once:true domain_id
214216
(fun () ->
215217
Lwt_sequence.remove node;
@@ -220,7 +222,7 @@ let run_job_aux async_method job result =
220222
notification. *)
221223
Lwt.pause () >>= fun () ->
222224
(* The job has terminated, send the result immediately. *)
223-
if check_job job id then call_notification domain_id id;
225+
if check_job job notifid then call_notification id;
224226
Lwt.return_unit
225227
end;
226228
waiter
@@ -2199,11 +2201,12 @@ let tcflow ch act =
21992201

22002202
external init_notification : Domain.id -> Unix.file_descr = "lwt_unix_init_notification_stub"
22012203
external send_notification : Domain.id -> int -> unit = "lwt_unix_send_notification_stub"
2204+
let send_notification (d, n) = send_notification d n
22022205
external recv_notifications : Domain.id -> int array = "lwt_unix_recv_notifications_stub"
22032206

22042207
let handle_notifications (_ : Lwt_engine.event) =
22052208
let domain_id = Domain.self () in
2206-
Array.iter (call_notification domain_id) (recv_notifications domain_id)
2209+
Array.iter (fun n -> call_notification (domain_id, n)) (recv_notifications domain_id)
22072210

22082211
let event_notifications =
22092212
Domain.DLS.new_key (fun () ->
@@ -2247,8 +2250,8 @@ type signal_handler = {
22472250

22482251
and signal_handler_id = signal_handler option ref
22492252

2250-
(* TODO: what to do about signals? *)
2251-
let signals = ref Signal_map.empty
2253+
(* TODO: make parallel safe *)
2254+
let signals : ((Domain.id * int) * ((signal_handler_id -> file_perm -> unit) Lwt_sequence.t) ) Signal_map.t ref = ref Signal_map.empty
22522255
let signal_count () =
22532256
Signal_map.fold
22542257
(fun _signum (_id, actions) len -> len + Lwt_sequence.length actions)
@@ -2262,17 +2265,17 @@ let on_signal_full signum handler =
22622265
Signal_map.find signum !signals
22632266
with Not_found ->
22642267
let actions = Lwt_sequence.create () in
2265-
let notification =
2268+
let (_, notifid) as notification =
22662269
make_notification (Domain.self ())
22672270
(fun () ->
22682271
Lwt_sequence.iter_l
22692272
(fun f -> f id signum)
22702273
actions)
22712274
in
22722275
(try
2273-
set_signal signum notification
2276+
set_signal signum notifid
22742277
with exn when Lwt.Exception_filter.run exn ->
2275-
stop_notification (Domain.self ()) notification;
2278+
stop_notification notification;
22762279
raise exn);
22772280
signals := Signal_map.add signum (notification, actions) !signals;
22782281
(notification, actions)
@@ -2294,13 +2297,13 @@ let disable_signal_handler id =
22942297
if Lwt_sequence.is_empty actions then begin
22952298
remove_signal sh.sh_num;
22962299
signals := Signal_map.remove sh.sh_num !signals;
2297-
stop_notification (Domain.self ()) notification
2300+
stop_notification notification
22982301
end
22992302

23002303
let reinstall_signal_handler signum =
23012304
match Signal_map.find signum !signals with
23022305
| exception Not_found -> ()
2303-
| notification, _ ->
2306+
| (_, notification), _ ->
23042307
set_signal signum notification
23052308

23062309
(* +-----------------------------------------------------------------+

src/unix/lwt_unix.cppo.mli

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1462,29 +1462,31 @@ val wait_for_jobs : unit -> unit Lwt.t
14621462
(** Lwt internally use a pipe to send notification to the main
14631463
thread. The following functions allow to use this pipe. *)
14641464

1465-
val make_notification : ?once : bool -> Domain.id -> (unit -> unit) -> int
1465+
type notification_id
1466+
1467+
val make_notification : ?once : bool -> Domain.id -> (unit -> unit) -> notification_id
14661468
(** [make_notification ?once f] registers a new notifier. It returns the
14671469
id of the notifier. Each time a notification with this id is
14681470
received, [f] is called.
14691471
14701472
if [once] is specified, then the notification is stopped after
14711473
the first time it is received. It defaults to [false]. *)
14721474

1473-
val send_notification : Domain.id -> int -> unit
1475+
val send_notification : notification_id -> unit
14741476
(** [send_notification id] sends a notification.
14751477
14761478
This function is thread-safe. *)
14771479

1478-
val stop_notification : Domain.id -> int -> unit
1480+
val stop_notification : notification_id -> unit
14791481
(** Stop the given notification. Note that you should not reuse the
14801482
id after the notification has been stopped, the result is
14811483
unspecified if you do so. *)
14821484

1483-
val call_notification : Domain.id -> int -> unit
1485+
val call_notification : notification_id -> unit
14841486
(** Call the handler associated to the given notification. Note that
14851487
if the notification was defined with [once = true] it is removed. *)
14861488

1487-
val set_notification : Domain.id -> int -> (unit -> unit) -> unit
1489+
val set_notification : notification_id -> (unit -> unit) -> unit
14881490
(** [set_notification id f] replace the function associated to the
14891491
notification by [f]. It raises [Not_found] if the given
14901492
notification is not found. *)

0 commit comments

Comments
 (0)