Skip to content

Commit c258612

Browse files
committed
Change to use the Lwt_unix notification mechanism
Previously I had rolled my own version of a notification mechanism as I didn't know one already existed in `Lwt_unix`. This PR changes `Picos_lwt_unix` to use the existing mechanism provided by `Lwt_unix`.
1 parent 39e41f4 commit c258612

File tree

1 file changed

+45
-63
lines changed

1 file changed

+45
-63
lines changed

lib/picos_lwt_unix/picos_lwt_unix.ml

Lines changed: 45 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,41 @@
1-
open Lwt.Infix
2-
31
let[@inline never] not_main_thread () =
42
invalid_arg "not called from the main thread"
53

64
let ready = Picos_mpscq.create ()
75

8-
type pipes = {
9-
mutable count : int;
10-
mutable inn : Lwt_unix.file_descr;
11-
mutable out : Unix.file_descr;
12-
mutable close_promise : int Lwt.t;
13-
mutable close_resolver : int Lwt.u;
14-
}
15-
16-
let pipes =
17-
let close_promise, close_resolver = Lwt.wait () in
18-
{
19-
count = 0;
20-
inn = Lwt_unix.stdin;
21-
out = Unix.stdout;
22-
close_promise;
23-
close_resolver;
24-
}
25-
26-
let byte = Bytes.create 1
27-
28-
let rec forever () =
29-
match Picos_mpscq.pop_exn ready with
30-
| resolver ->
31-
Lwt.wakeup resolver ();
32-
forever ()
33-
| exception Picos_mpscq.Empty ->
34-
let inn = pipes.inn in
35-
if inn == Lwt_unix.stdin then Lwt.return_unit
36-
else
37-
Lwt.pick [ pipes.close_promise; Lwt_unix.read inn byte 0 1 ]
38-
>>= forever_check
39-
40-
and forever_check n = if n < 0 then Lwt.return_unit else forever ()
6+
type notification = { mutable ref_count : int; mutable id : int }
7+
8+
let notification = { ref_count = 0; id = 0 }
9+
let state = Atomic.make `Not_running
10+
11+
let notify_callback () =
12+
Atomic.set state `Running;
13+
let rec loop () =
14+
match Picos_mpscq.pop_exn ready with
15+
| resolver ->
16+
Lwt.wakeup resolver ();
17+
loop ()
18+
| exception Picos_mpscq.Empty -> begin
19+
match Atomic.get state with
20+
| `Not_running | `Notified ->
21+
Atomic.set state `Running;
22+
loop ()
23+
| `Running ->
24+
if not (Atomic.compare_and_set state `Running `Not_running) then
25+
loop ()
26+
end
27+
in
28+
loop ()
29+
30+
let rec notify () =
31+
match Atomic.get state with
32+
| `Notified -> ()
33+
| (`Running | `Not_running) as before ->
34+
if Atomic.compare_and_set state before `Notified then begin
35+
if before == `Not_running then
36+
Lwt_unix.send_notification notification.id
37+
end
38+
else notify ()
4139

4240
module System = struct
4341
let sleep = Lwt_unix.sleep
@@ -50,45 +48,29 @@ module System = struct
5048
if Picos_thread.is_main_thread () then Lwt.wakeup resolver ()
5149
else begin
5250
Picos_mpscq.push ready resolver;
53-
assert (1 = Unix.write pipes.out byte 0 1)
51+
notify ()
5452
end
5553

5654
let await (promise, _) = promise
5755
end
5856

5957
let system = (module System : Picos_lwt.System)
6058

61-
let pipes_incr () =
62-
let count = pipes.count + 1 in
63-
if count = 1 then begin
64-
let promise, resolver = Lwt.wait () in
65-
pipes.close_promise <- promise;
66-
pipes.close_resolver <- resolver;
67-
let inn, out = Lwt_unix.pipe_in ~cloexec:true () in
68-
pipes.inn <- inn;
69-
pipes.out <- out;
70-
pipes.count <- count;
71-
Lwt.async forever
72-
end
73-
else pipes.count <- count
74-
75-
let pipes_decr _ =
76-
let count = pipes.count - 1 in
77-
if count = 0 then begin
78-
Lwt.wakeup pipes.close_resolver (-1);
79-
Unix.close pipes.out;
80-
pipes.out <- Unix.stdout;
81-
Lwt.async (fun () -> Lwt_unix.close pipes.inn);
82-
pipes.inn <- Lwt_unix.stdin;
83-
pipes.count <- count
84-
end
85-
else pipes.count <- count
59+
let notification_decr _ =
60+
let ref_count = notification.ref_count - 1 in
61+
notification.ref_count <- ref_count;
62+
if ref_count = 0 then Lwt_unix.stop_notification notification.id
8663

8764
let run ?forbid main =
8865
if not (Picos_thread.is_main_thread ()) then not_main_thread ();
89-
pipes_incr ();
66+
begin
67+
let ref_count = notification.ref_count + 1 in
68+
notification.ref_count <- ref_count;
69+
if ref_count = 1 then
70+
notification.id <- Lwt_unix.make_notification notify_callback
71+
end;
9072
let promise = Picos_lwt.run ?forbid system main in
91-
Lwt.on_any promise pipes_decr pipes_decr;
73+
Lwt.on_any promise notification_decr notification_decr;
9274
promise
9375

9476
let () = Lwt_main.run (Lwt_unix.sleep 0.0)

0 commit comments

Comments
 (0)