Skip to content

Commit cfbf5cf

Browse files
committed
Refine the Awaitable abstraction
The use of `Trigger` is exposed to allow awaiting for multiple things. The internal cleanup is made more robust to make sure awaitables will not be leaked.
1 parent 1ba3357 commit cfbf5cf

File tree

3 files changed

+66
-54
lines changed

3 files changed

+66
-54
lines changed

lib/picos_std.awaitable/dune

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
(library
22
(name picos_std_awaitable)
33
(public_name picos_std.awaitable)
4-
(libraries picos picos_aux.htbl backoff multicore-magic))
4+
(libraries
5+
(re_export picos)
6+
picos_aux.htbl
7+
backoff
8+
multicore-magic))
59

610
(mdx
711
(package picos_meta)

lib/picos_std.awaitable/picos_std_awaitable.ml

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -355,13 +355,14 @@ module Awaitable = struct
355355
let update t ~signal ~count =
356356
try
357357
let signal = ref signal in
358+
let count = ref count in
358359
let backoff = ref Backoff.default in
359360
while
360361
not
361362
(let before = Htbl.find_exn awaiters t in
362363
match
363-
if !signal then Awaiters.signal before ~count
364-
else Awaiters.cleanup before ~count
364+
if !signal then Awaiters.signal before ~count:!count
365+
else Awaiters.cleanup before ~count:!count
365366
with
366367
| Zero -> Htbl.try_compare_and_remove awaiters t before
367368
| One r ->
@@ -373,58 +374,70 @@ module Awaitable = struct
373374
before == after
374375
|| Htbl.try_compare_and_set awaiters t before after)
375376
do
377+
(* Even if the hash table update after signal fails, the trigger(s) have
378+
been signaled. *)
376379
signal := false;
380+
(* If a single awaiter and multi awaiter cleanup are attempted in
381+
parallel it might be that a multi awaiter cleanup "succeeds" and yet
382+
some awaiters are left in the queue. For this reason we perform a
383+
multi awaiter cleanup after failure. It might be possible to improve
384+
upon this with some more clever approach. *)
385+
count := Int.max_int;
377386
backoff := Backoff.once !backoff
378387
done
379388
with Not_found -> ()
380389

381-
let add_as (type a) (t : a awaitable) value =
382-
let trigger = Trigger.create () in
383-
let one : Awaiters.is1 =
384-
One { awaitable = t; value; trigger; counter = 0; next = Min0 Zero }
385-
in
386-
let backoff = ref Backoff.default in
387-
while
388-
not
389-
(match Htbl.find_exn awaiters (Packed t) with
390-
| before ->
391-
let many = Awaiters.snoc before one in
392-
Htbl.try_compare_and_set awaiters (Packed t) before (Min1 many)
393-
| exception Not_found -> Htbl.try_add awaiters (Packed t) (Min1 one))
394-
do
395-
backoff := Backoff.once !backoff
396-
done;
397-
one
398-
399390
module Awaiter = struct
400391
type t = Awaiters.is1
401392

402-
let add (type a) (t : a awaitable) =
403-
add_as t (Sys.opaque_identity (Obj.magic awaiters : a))
393+
let add_as (type a) (t : a awaitable) trigger value =
394+
let one : Awaiters.is1 =
395+
One { awaitable = t; value; trigger; counter = 0; next = Min0 Zero }
396+
in
397+
let backoff = ref Backoff.default in
398+
while
399+
not
400+
(match Htbl.find_exn awaiters (Packed t) with
401+
| before ->
402+
let many = Awaiters.snoc before one in
403+
Htbl.try_compare_and_set awaiters (Packed t) before (Min1 many)
404+
| exception Not_found -> Htbl.try_add awaiters (Packed t) (Min1 one))
405+
do
406+
backoff := Backoff.once !backoff
407+
done;
408+
one
409+
410+
let add (type a) (t : a awaitable) trigger =
411+
let unique_value = Sys.opaque_identity (Obj.magic awaiters : a) in
412+
add_as t trigger unique_value
404413

405414
let remove one =
406415
Awaiters.signal_and_clear one;
407416
update (Awaiters.awaitable_of one) ~signal:false ~count:1
417+
end
408418

409-
let await one =
419+
let await t value =
420+
let trigger = Trigger.create () in
421+
let one = Awaiter.add_as t trigger value in
422+
if Awaiters.is_signalable one then Awaiter.remove one
423+
else
410424
match Awaiters.await one with
411425
| None -> ()
412426
| Some exn_bt ->
413427
Awaiters.clear one;
414428
update (Awaiters.awaitable_of one) ~signal:true ~count:1;
415429
Printexc.raise_with_backtrace (fst exn_bt) (snd exn_bt)
416-
end
417-
418-
let await t value =
419-
let one = add_as t value in
420-
if Awaiters.is_signalable one then Awaiter.remove one else Awaiter.await one
421430

422431
let[@inline] broadcast t = update (Packed t) ~signal:true ~count:Int.max_int
423432
let[@inline] signal t = update (Packed t) ~signal:true ~count:1
424433

425434
let () =
426435
Stdlib.at_exit @@ fun () ->
427436
match Htbl.find_random_exn awaiters with
428-
| _ -> failwith "leaked awaitable"
437+
| _ ->
438+
(* This should not normally happen, but might happen due to the program
439+
being forced to exit without proper cleanup. Otherwise this may
440+
indicate a bug in the cleanup of awaiters. *)
441+
Printf.eprintf "Awaitable leaked\n%!"
429442
| exception Not_found -> ()
430443
end

lib/picos_std.awaitable/picos_std_awaitable.mli

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
(** Basic {{:https://en.wikipedia.org/wiki/Futex} futex}-like awaitable atomic
22
location for {!Picos}. *)
33

4+
open Picos
5+
46
(** {1 Modules} *)
57

68
module Awaitable : sig
@@ -18,7 +20,7 @@ module Awaitable : sig
1820

1921
(** {1 Atomic API} *)
2022

21-
type 'a t
23+
type !'a t
2224
(** Represents an awaitable atomic location. *)
2325

2426
val make : ?padded:bool -> 'a -> 'a t
@@ -90,34 +92,26 @@ module Awaitable : sig
9092
implicitly wake up awaiters. *)
9193

9294
module Awaiter : sig
93-
(** Ability to await for a signal from the past.
94-
95-
{!Awaitable.await} only receives a signal at or after the point of
96-
calling it. This API allows the awaiting process to be broken into two
97-
steps, {!add} and {!await}, such that a signal after {!add} can be
98-
received by {!await}. *)
95+
(** Low level interface for more flexible waiting. *)
9996

10097
type 'a awaitable := 'a t
10198
(** An erased type alias for {!Awaitable.t}. *)
10299

103100
type t
104101
(** Represents a single use awaiter of a signal to an {!awaitable}. *)
105102

106-
val add : 'a awaitable -> t
107-
(** [add awaitable] create a single use awaiter, adds it to the FIFO
108-
associated with the awaitable, and returns the awaiter. *)
109-
110-
val await : t -> unit
111-
(** [await awaiter] awaits for the association awaitable to be signaled. *)
103+
val add : 'a awaitable -> Trigger.t -> t
104+
(** [add awaitable trigger] creates a single use awaiter, adds it to the
105+
FIFO associated with the awaitable, and returns the awaiter. *)
112106

113107
val remove : t -> unit
114108
(** [remove awaiter] marks the awaiter as having been signaled and removes it
115109
from the FIFO associated with the awaitable.
116110
117-
⚠️ An explicit call of [remove] is needed when an {!add}ed awaiter is not
118-
{!await}ed for. In such a case, from the point of view of lost signals,
119-
the caller of [remove] should be considered to have received or consumed
120-
a signal before the call of [remove]. *)
111+
ℹ️ If the associated trigger is used with only one awaiter and the
112+
{!Trigger.await await} on the trigger returns [None], there is no need
113+
to explicitly remove the awaiter, because it has already been
114+
removed. *)
121115
end
122116
end
123117

@@ -164,7 +158,7 @@ end
164158
{2 [Condition]}
165159
166160
Let's also implement a condition variable. For that we'll also make use of
167-
low level operations in the {!Picos} core library:
161+
low level abstractions and operations from the {!Picos} core library:
168162
169163
{[
170164
# open Picos
@@ -180,20 +174,21 @@ end
180174
let create () = Awaitable.make ()
181175
182176
let wait t mutex =
183-
let awaiter = Awaitable.Awaiter.add t in
177+
let trigger = Trigger.create () in
178+
let awaiter = Awaitable.Awaiter.add t trigger in
184179
Mutex.unlock mutex;
185180
let lock_forbidden mutex =
186181
let fiber = Fiber.current () in
187182
let forbid = Fiber.exchange fiber ~forbid:true in
188183
Mutex.lock mutex;
189184
Fiber.set fiber ~forbid
190185
in
191-
match Awaitable.Awaiter.await awaiter with
192-
| () -> lock_forbidden mutex
193-
| exception exn ->
194-
let bt = Printexc.get_raw_backtrace () in
186+
match Trigger.await trigger with
187+
| None -> lock_forbidden mutex
188+
| Some exn_bt ->
189+
Awaitable.Awaiter.remove awaiter;
195190
lock_forbidden mutex;
196-
Printexc.raise_with_backtrace exn bt
191+
Printexc.raise_with_backtrace (fst exn_bt) (snd exn_bt)
197192
198193
let signal = Awaitable.signal
199194
let broadcast = Awaitable.broadcast

0 commit comments

Comments
 (0)