Skip to content

Commit 445df5e

Browse files
committed
Add Picos_std_sync.Queue
1 parent 33ffdca commit 445df5e

File tree

8 files changed

+536
-0
lines changed

8 files changed

+536
-0
lines changed

bench/bench_queue.ml

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
open Multicore_bench
2+
open Picos_std_sync
3+
4+
let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
5+
let t = Queue.create ~padded:true () in
6+
7+
let op push =
8+
if push then Queue.push t 101
9+
else match Queue.pop_exn t with _ -> () | exception Queue.Empty -> ()
10+
in
11+
12+
let init _ =
13+
assert (
14+
match Queue.pop_exn t with _ -> false | exception Queue.Empty -> true);
15+
Util.generate_push_and_pop_sequence n_msgs
16+
in
17+
let work _ bits = Util.Bits.iter op bits in
18+
19+
Times.record ~budgetf ~n_domains:1 ~init ~work ()
20+
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"
21+
22+
let run_one ~budgetf ~n_adders ~n_takers () =
23+
let n_domains = n_adders + n_takers in
24+
25+
let n_msgs = 50 * Util.iter_factor in
26+
27+
let t = Queue.create ~padded:true () in
28+
29+
let n_msgs_to_add = Countdown.create ~n_domains:n_adders () in
30+
let n_msgs_to_take = Countdown.create ~n_domains:n_takers () in
31+
32+
let init _ =
33+
assert (
34+
match Queue.pop_exn t with _ -> false | exception Queue.Empty -> true);
35+
Countdown.non_atomic_set n_msgs_to_add n_msgs;
36+
Countdown.non_atomic_set n_msgs_to_take n_msgs
37+
in
38+
let work i () =
39+
if i < n_adders then
40+
let rec work () =
41+
let n = Countdown.alloc n_msgs_to_add ~domain_index:i ~batch:1000 in
42+
if 0 < n then begin
43+
for i = 1 to n do
44+
Queue.push t i
45+
done;
46+
work ()
47+
end
48+
in
49+
work ()
50+
else
51+
let i = i - n_adders in
52+
let rec work () =
53+
let n = Countdown.alloc n_msgs_to_take ~domain_index:i ~batch:1000 in
54+
if 0 < n then
55+
let rec loop n =
56+
if 0 < n then begin
57+
match Queue.pop_exn t with
58+
| _ -> loop (n - 1)
59+
| exception Queue.Empty ->
60+
Backoff.once Backoff.default |> ignore;
61+
loop n
62+
end
63+
else work ()
64+
in
65+
loop n
66+
in
67+
work ()
68+
in
69+
70+
let config =
71+
let format role n =
72+
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
73+
in
74+
Printf.sprintf "%s, %s"
75+
(format "nb adder" n_adders)
76+
(format "nb taker" n_takers)
77+
in
78+
Times.record ~budgetf ~n_domains ~init ~work ()
79+
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
80+
81+
let run_suite ~budgetf =
82+
run_one_domain ~budgetf ()
83+
@ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ]
84+
|> List.concat_map @@ fun (n_adders, n_takers) ->
85+
if Picos_domain.recommended_domain_count () < n_adders + n_takers then []
86+
else run_one ~budgetf ~n_adders ~n_takers ())

bench/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
(run %{test} -brief "Picos DLS")
1111
(run %{test} -brief "Yield with Picos_std_sync")
1212
(run %{test} -brief "Picos Spawn")
13+
(run %{test} -brief "Picos Queue")
1314
(run %{test} -brief "Picos Yield")
1415
(run %{test} -brief "Picos Cancel_after with Picos_select")
1516
(run %{test} -brief "Ref with Picos_std_sync")

bench/main.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ let benchmarks =
88
("Picos TLS", Bench_tls.run_suite);
99
("Picos DLS", Bench_dls.run_suite);
1010
("Yield with Picos_std_sync", Bench_lock_yield.run_suite);
11+
("Picos Queue", Bench_queue.run_suite);
1112
("Picos Spawn", Bench_spawn.run_suite);
1213
("Picos Yield", Bench_yield.run_suite);
1314
("Picos Cancel_after with Picos_select", Bench_cancel_after.run_suite);

lib/picos_std.sync/picos_std_sync.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ module Lazy = Lazy
88
module Latch = Latch
99
module Barrier = Barrier
1010
module Ivar = Ivar
11+
module Queue = Queue
1112
module Stream = Stream

lib/picos_std.sync/picos_std_sync.mli

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,81 @@ module Ivar : sig
695695
variable has either been assigned a value or has been poisoned. *)
696696
end
697697

698+
module Queue : sig
699+
(** A lock-free multi-producer, multi-consumer queue. *)
700+
701+
(** {1 API} *)
702+
703+
type !'a t
704+
(** A multi-producer, multi-consumer queue. *)
705+
706+
val create : ?padded:bool -> unit -> 'a t
707+
(** [create ()] returns a new empty multi-producer, multi-consumer queue. *)
708+
709+
val push : 'a t -> 'a -> unit
710+
(** [push queue value] adds the [value] to the tail of the [queue]. *)
711+
712+
val push_head : 'a t -> 'a -> unit
713+
(** [push_head queue value] adds the [value] to the head of the [queue]. *)
714+
715+
exception Empty
716+
(** Raised by {!pop_exn} in case it finds the queue empty. *)
717+
718+
val pop_exn : 'a t -> 'a
719+
(** [pop_exn queue] tries to remove the value at the head of the [queue].
720+
Returns the removed value or raises {!Empty} in case the queue was empty.
721+
722+
@raise Empty in case the queue was empty. *)
723+
724+
val pop_opt : 'a t -> 'a option
725+
(** [pop_opt queue] tries to remove the value at the head of the [queue].
726+
Returns the removed value or [None] in case the queue was empty. *)
727+
728+
val pop : 'a t -> 'a
729+
(** [pop queue] waits until the queue is not empty, removes the value at the
730+
head of the [queue], and returns it. *)
731+
732+
val length : 'a t -> int
733+
(** [length queue] returns the length or the number of values in the [queue].
734+
*)
735+
736+
val is_empty : 'a t -> bool
737+
(** [is_empty queue] is equivalent to {{!length} [length queue = 0]}. *)
738+
739+
(** {1 Examples}
740+
741+
An example top-level session:
742+
{[
743+
# let q : int Queue.t =
744+
Queue.create ()
745+
val q : int Picos_std_sync.Queue.t = <abstr>
746+
747+
# Queue.push q 42
748+
- : unit = ()
749+
750+
# Queue.push_head q 76
751+
- : unit = ()
752+
753+
# Queue.length q
754+
- : int = 2
755+
756+
# Queue.push q 101
757+
- : unit = ()
758+
759+
# Queue.pop_exn q
760+
- : int = 76
761+
762+
# Queue.pop_exn q
763+
- : int = 42
764+
765+
# Queue.pop_exn q
766+
- : int = 101
767+
768+
# Queue.pop_exn q
769+
Exception: Picos_std_sync__Queue.Empty.
770+
]} *)
771+
end
772+
698773
module Stream : sig
699774
(** A lock-free, poisonable, many-to-many, stream.
700775
@@ -768,6 +843,8 @@ end
768843
val push : 'a t -> 'a -> unit
769844
val pop : 'a t -> 'a
770845
end = struct
846+
module Queue = Stdlib.Queue
847+
771848
type 'a t = {
772849
lock : Lock.t;
773850
queue : 'a Queue.t;

0 commit comments

Comments
 (0)