Skip to content

Commit

Permalink
MPMC 2-stack queue
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Apr 2, 2024
1 parent 7065809 commit fa499cd
Show file tree
Hide file tree
Showing 10 changed files with 360 additions and 0 deletions.
78 changes: 78 additions & 0 deletions bench/bench_two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
open Multicore_bench
module Queue = Saturn_lockfree.Two_stack_queue

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Queue.create () in

let op push = if push then Queue.push t 101 else Queue.pop_opt t |> ignore in

let init _ =
assert (Queue.length t = 0);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2)
?(n_msgs = 50 * Util.iter_factor) () =
let n_domains = n_adders + n_takers in

let t = Queue.create () in

let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in

let init _ =
assert (Queue.length t = 0);
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in
let work i () =
if i < n_adders then
let rec work () =
let n = Util.alloc n_msgs_to_add in
if 0 < n then begin
for i = 1 to n do
Queue.push t i
done;
work ()
end
in
work ()
else
let rec work () =
let n = Util.alloc n_msgs_to_take in
if n <> 0 then
let rec loop n =
if 0 < n then
match Queue.pop_opt t with
| None ->
Domain.cpu_relax ();
loop n
| Some _ -> loop (n - 1)
else work ()
in
loop n
in
work ()
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s"
(format "nb adder" n_adders)
(format "nb taker" n_takers)
in

Times.record ~budgetf ~n_domains ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2 ] [ 1; 2 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
run_one ~budgetf ~n_adders ~n_takers ())
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ let benchmarks =
("Saturn_lockfree Size", Bench_size.run_suite);
("Saturn_lockfree Skiplist", Bench_skiplist.run_suite);
("Saturn_lockfree Stack", Bench_stack.run_suite);
("Saturn_lockfree Two_stack_queue", Bench_two_stack_queue.run_suite);
("Saturn_lockfree Work_stealing_deque", Bench_ws_deque.run_suite);
]

Expand Down
1 change: 1 addition & 0 deletions src/saturn.ml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ module Single_prod_single_cons_queue =
module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Saturn_lockfree.Skiplist
module Two_stack_queue = Saturn_lockfree.Two_stack_queue
1 change: 1 addition & 0 deletions src/saturn.mli
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ module Single_prod_single_cons_queue =
module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Saturn_lockfree.Skiplist
module Two_stack_queue = Saturn_lockfree.Two_stack_queue
1 change: 1 addition & 0 deletions src_lockfree/saturn_lockfree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ module Single_consumer_queue = Mpsc_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Size = Size
module Skiplist = Skiplist
module Two_stack_queue = Two_stack_queue
1 change: 1 addition & 0 deletions src_lockfree/saturn_lockfree.mli
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ module Single_consumer_queue = Mpsc_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Skiplist
module Size = Size
module Two_stack_queue = Two_stack_queue
176 changes: 176 additions & 0 deletions src_lockfree/two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
(* Copyright (c) 2023, Vesa Karvonen <vesa.a.j.k@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE. *)

module Atomic = Transparent_atomic

type 'a t = { head : 'a head Atomic.t; tail : 'a tail Atomic.t }

and ('a, _) tdt =
| Cons : {
counter : int;
value : 'a;
suffix : 'a head;
}
-> ('a, [> `Cons ]) tdt
| Head : { counter : int } -> ('a, [> `Head ]) tdt
| Snoc : {
counter : int;
prefix : 'a tail;
value : 'a;
}
-> ('a, [> `Snoc ]) tdt
| Tail : {
counter : int;
mutable move : ('a, [ `Snoc | `Used ]) tdt;
}
-> ('a, [> `Tail ]) tdt
| Used : ('a, [> `Used ]) tdt

and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed]
and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed]

let create () =
let head =
Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as_padded
in
let tail =
Atomic.make (T (Tail { counter = 0; move = Obj.magic () }))
|> Multicore_magic.copy_as_padded
in
{ head; tail } |> Multicore_magic.copy_as_padded

let rec rev (suffix : (_, [< `Cons ]) tdt) = function
| T (Snoc { counter; prefix; value }) ->
rev (Cons { counter; value; suffix = H suffix }) prefix
| T (Tail _) -> suffix

let rev = function
| (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tdt) ->
rev
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
prefix

let rec push t value backoff = function
| T (Snoc snoc_r as snoc) -> push_with t value backoff snoc_r.counter (T snoc)
| T (Tail tail_r as tail) -> begin
match tail_r.move with
| Used -> push_with t value backoff tail_r.counter (T tail)
| Snoc move_r as move ->
begin
match Atomic.get t.head with
| H (Head head_r as head) when head_r.counter < move_r.counter ->
let after = rev move in
if
Atomic.fenceless_get t.head == H head
&& Atomic.compare_and_set t.head (H head) (H after)
then tail_r.move <- Used
| _ -> ()
end;
let new_tail = Atomic.fenceless_get t.tail in
if new_tail != T tail then push t value backoff new_tail
else push_with t value backoff tail_r.counter (T tail)
end

and push_with t value backoff counter prefix =
let after = Snoc { counter = counter + 1; prefix; value } in
let new_tail = Atomic.fenceless_get t.tail in
if new_tail != prefix then push t value backoff new_tail
else if not (Atomic.compare_and_set t.tail prefix (T after)) then
let backoff = Backoff.once backoff in
push t value backoff (Atomic.fenceless_get t.tail)

let push t value = push t value Backoff.default (Atomic.fenceless_get t.tail)

type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly

exception Empty

let rec pop_as : type a r. a t -> _ -> (a, r) poly -> a head -> r =
fun t backoff poly -> function
| H (Cons cons_r as cons) ->
if Atomic.compare_and_set t.head (H cons) cons_r.suffix then
match poly with Value -> cons_r.value | Option -> Some cons_r.value
else
let backoff = Backoff.once backoff in
pop_as t backoff poly (Atomic.fenceless_get t.head)
| H (Head head_r as head) -> begin
match Atomic.fenceless_get t.tail with
| T (Snoc snoc_r as move) ->
if head_r.counter = snoc_r.counter then
if Atomic.compare_and_set t.tail (T move) snoc_r.prefix then
match poly with
| Value -> snoc_r.value
| Option -> Some snoc_r.value
else pop_as t backoff poly (Atomic.fenceless_get t.head)
else
let tail = Tail { counter = snoc_r.counter; move } in
let new_head = Atomic.get t.head in
if new_head != H head then pop_as t backoff poly new_head
else if Atomic.compare_and_set t.tail (T move) (T tail) then
pop_moving_as t backoff poly head move tail
else pop_as t backoff poly (Atomic.fenceless_get t.head)
| T (Tail tail_r as tail) -> begin
match tail_r.move with
| Used -> pop_emptyish_as t backoff poly head
| Snoc _ as move -> pop_moving_as t backoff poly head move tail
end
end

and pop_moving_as :
type a r.
a t ->
_ ->
(a, r) poly ->
(a, [< `Head ]) tdt ->
(a, [ `Snoc ]) tdt ->
(a, [< `Tail ]) tdt ->
r =
fun t backoff poly (Head head_r as head) (Snoc move_r as move) (Tail tail_r) ->
if head_r.counter < move_r.counter then
match rev move with
| Cons cons_r ->
let after = cons_r.suffix in
let new_head = Atomic.get t.head in
if new_head != H head then pop_as t backoff poly new_head
else if Atomic.compare_and_set t.head (H head) after then begin
tail_r.move <- Used;
match poly with Value -> cons_r.value | Option -> Some cons_r.value
end
else
let backoff = Backoff.once backoff in
pop_as t backoff poly (Atomic.fenceless_get t.head)
else pop_emptyish_as t backoff poly head

and pop_emptyish_as : type a r. a t -> _ -> (a, r) poly -> (a, _) tdt -> r =
fun t backoff poly head ->
let new_head = Atomic.get t.head in
if new_head == H head then
match poly with Value -> raise_notrace Empty | Option -> None
else pop_as t backoff poly new_head

let pop t = pop_as t Backoff.default Value (Atomic.fenceless_get t.head)
let pop_opt t = pop_as t Backoff.default Option (Atomic.fenceless_get t.head)

let rec length t =
let head = Atomic.get t.head in
let tail = Atomic.fenceless_get t.tail in
if head != Atomic.get t.head then length t
else
let head_at =
match head with H (Cons r) -> r.counter | H (Head r) -> r.counter
in
let tail_at =
match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
in
tail_at - head_at + 1
20 changes: 20 additions & 0 deletions src_lockfree/two_stack_queue.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
type !'a t
(** *)

val create : unit -> 'a t
(** *)

val push : 'a t -> 'a -> unit
(** *)

exception Empty
(** Raised by {!pop} in case the queue is empty. *)

val pop : 'a t -> 'a
(** *)

val pop_opt : 'a t -> 'a option
(** *)

val length : 'a t -> int
(** *)
7 changes: 7 additions & 0 deletions test/two_stack_queue/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
(test
(package saturn_lockfree)
(name stm_two_stack_queue)
(modules stm_two_stack_queue)
(libraries saturn_lockfree qcheck-core qcheck-stm.stm stm_run)
(action
(run %{test} --verbose)))
74 changes: 74 additions & 0 deletions test/two_stack_queue/stm_two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
module Queue = Saturn_lockfree.Two_stack_queue

let () =
let q = Queue.create () in
Queue.push q 101;
Queue.push q 42;
assert (Queue.pop_opt q = Some 101);
Queue.push q 76;
assert (Queue.pop_opt q = Some 42);
assert (Queue.pop_opt q = Some 76);
assert (Queue.pop_opt q = None)

module Spec = struct
type cmd = Push of int | Pop_opt | Length

let show_cmd = function
| Push x -> "Push " ^ string_of_int x
| Pop_opt -> "Pop_opt"
| Length -> "Length"

module State = struct
type t = int list * int list

let push x (h, t) = if h == [] then ([ x ], []) else (h, x :: t)
let peek_opt (h, _) = match h with x :: _ -> Some x | [] -> None
let length (h, t) = List.length h + List.length t

let drop ((h, t) as s) =
match h with [] -> s | [ _ ] -> (List.rev t, []) | _ :: h -> (h, t)
end

type state = State.t
type sut = int Queue.t

let arb_cmd _s =
let open QCheck in
[
Gen.int_range 1 10000 |> Gen.map (fun x -> Push x);
Gen.return Pop_opt;
Gen.return Length;
]
|> Gen.oneof |> make ~print:show_cmd

let init_state = ([], [])
let init_sut () = Queue.create ()
let cleanup _ = ()

let next_state c s =
match c with
| Push x -> State.push x s
| Pop_opt -> State.drop s
| Length -> s

let precond _ _ = true

let run c d =
let open STM in
match c with
| Push x -> Res (unit, Queue.push d x)
| Pop_opt -> Res (option int, Queue.pop_opt d)
| Length -> Res (int, Queue.length d)

let postcond c (s : state) res =
let open STM in
match (c, res) with
| Push _x, Res ((Unit, _), ()) -> true
| Pop_opt, Res ((Option Int, _), res) -> res = State.peek_opt s
| Length, Res ((Int, _), res) -> res = State.length s
| _, _ -> false
end

let () =
Stm_run.run ~count:1000 ~verbose:true ~name:"Two_stack_queue" (module Spec)
|> exit

0 comments on commit fa499cd

Please sign in to comment.