Skip to content

Commit

Permalink
Optimize MPMC unbounded queue with status bits
Browse files Browse the repository at this point in the history
  • Loading branch information
art-w committed Mar 20, 2023
1 parent ed55d65 commit 6fb1955
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 125 deletions.
7 changes: 4 additions & 3 deletions bench/mpmc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module Bench (Q : QUEUE) = struct
let num_of_elements = ref 2_100_000
let num_of_pushers = ref 4
let num_of_takers = ref 4
let num_of_iterations = ref 10
let num_of_iterations = ref 20

let taker queue num_of_elements () =
let i = ref 0 in
Expand Down Expand Up @@ -100,9 +100,10 @@ module Bench (Q : QUEUE) = struct

let bench : (unit -> _) list =
[
benchmark ~takers:1 ~pushers:1;
benchmark ~takers:4 ~pushers:4;
benchmark ~takers:1 ~pushers:8;
benchmark ~takers:8 ~pushers:1;
benchmark ~takers:1 ~pushers:7;
benchmark ~takers:7 ~pushers:1;
]
end

Expand Down
197 changes: 81 additions & 116 deletions src/mpmc_queue.ml
Original file line number Diff line number Diff line change
@@ -1,145 +1,110 @@
let default_capacity = 4096
let spinlock_iterations = 16

type 'a cell =
| Empty
| Tombstone
| Value of 'a

type 'a s =
{ buffer : 'a cell Atomic.t array
; head : int Atomic.t
; tail : int Atomic.t
; rest : 'a s option Atomic.t
module Array = struct
include Array

let get = unsafe_get
let set = unsafe_set
end

let default_capacity = 512

type 'a s = {
status : int Atomic.t array;
buffer : 'a array;
head : int Atomic.t;
tail : int Atomic.t;
rest : 'a s option Atomic.t;
}

type 'a t = { first : 'a s Atomic.t; last : 'a s Atomic.t; dummy : 'a }

let pack_size = Sys.int_size / 2

let make_s ~capacity ~dummy =
{
head = Atomic.make 0;
tail = Atomic.make (-1);
buffer = Array.make capacity dummy;
status = Array.init (1 + (capacity / pack_size)) (fun _ -> Atomic.make 0);
rest = Atomic.make None;
}

type 'a t =
{ first : 'a s Atomic.t
; last : 'a s Atomic.t
}

let make_s ~capacity () =
{ head = Atomic.make 0
; tail = Atomic.make (-1)
; buffer = Array.init capacity (fun _ -> Atomic.make Empty)
; rest = Atomic.make None
}

let make ?(capacity = default_capacity) () =
let s = make_s ~capacity () in
{ first = Atomic.make s
; last = Atomic.make s
}
let make ?(capacity = default_capacity) ~dummy () =
let s = make_s ~capacity ~dummy in
{ first = Atomic.make s; last = Atomic.make s; dummy }

let rec gift_rest t some_s =
if Atomic.compare_and_set t.rest None some_s
then ()
else follow_rest t some_s
if not (Atomic.compare_and_set t.rest None some_s) then follow_rest t some_s

and follow_rest t some_s =
match Atomic.get t.rest with
| None -> gift_rest t some_s
| Some t -> follow_rest t some_s

let force_rest t =
let force_rest ~dummy t =
match Atomic.get t.rest with
| Some s -> s
| None ->
let s = make_s ~capacity:(Array.length t.buffer) () in
| None -> (
let s = make_s ~capacity:(Array.length t.buffer) ~dummy in
let some_s = Some s in
if Atomic.compare_and_set t.rest None some_s
then s
else match Atomic.get t.rest with
| None -> assert false
| Some rest ->
gift_rest rest some_s ;
rest

let rec push_s t x =
if Atomic.compare_and_set t.rest None some_s then s
else
match Atomic.get t.rest with
| None -> assert false
| Some rest ->
gift_rest rest some_s;
rest)

let mark t i =
let status = t.status.(i / pack_size) in
let shift = 2 * (i mod pack_size) in
let status = Atomic.fetch_and_add status (1 lsl shift) in
(status lsr shift) land 1 = 0

let rec push_s ~dummy t x =
let i = Atomic.fetch_and_add t.tail 1 in
if i < 0
then (let _ = force_rest t in push_s t x)
else if i >= Array.length t.buffer
then false
else begin
let cell = Array.unsafe_get t.buffer i in
match Atomic.get cell with
| Empty ->
if Atomic.compare_and_set cell Empty (Value x)
then true
else begin
assert (Atomic.get cell = Tombstone) ;
push_s t x
end
| Tombstone ->
push_s t x
| Value _ -> assert false
end

let rec push t x =
let last = Atomic.get t.last in
if push_s last x
then ()
else begin
let rest = force_rest last in
let _ : bool = Atomic.compare_and_set t.last last rest in
if i < 0 then
let _ = force_rest ~dummy t in
push_s ~dummy t x
else if i >= Array.length t.buffer then false
else (
t.buffer.(i) <- x;
if mark t i then true
else (
t.buffer.(i) <- dummy;
let hd = Atomic.get t.head in
let (_ : bool) = Atomic.compare_and_set t.tail (i + 1) (hd + 1) in
push_s ~dummy t x))

let rec push ({ last; dummy; _ } as t) x =
let last_s = Atomic.get last in
if not (push_s ~dummy last_s x) then
let rest = force_rest ~dummy last_s in
let (_ : bool) = Atomic.compare_and_set last last_s rest in
push t x
end


type 'a pop_result =
| Is_empty
| Wait_for_it
| Pop of 'a
type 'a pop_result = Is_empty | Wait_for_it | Pop of 'a

let rec pop_s t =
let rec pop_s ~dummy t =
let current_head = Atomic.get t.head in
if current_head >= Array.length t.buffer
then Is_empty
else if current_head >= Atomic.get t.tail
then Wait_for_it
if current_head >= Array.length t.buffer then Is_empty
else if current_head >= Atomic.get t.tail then Wait_for_it
else
let i = Atomic.fetch_and_add t.head 1 in
if i >= Array.length t.buffer
then Is_empty
if i >= Array.length t.buffer then Is_empty
else if mark t i then pop_s ~dummy t
else
let cell = Array.unsafe_get t.buffer i in
if i >= Atomic.get t.tail
then tombstone t cell
else spinlock ~retry:spinlock_iterations t cell

and tombstone t cell =
if Atomic.compare_and_set cell Empty Tombstone
then pop_s t
else begin match Atomic.get cell with
| (Value v) as old ->
assert (Atomic.compare_and_set cell old Tombstone) ;
Pop v
| _ -> assert false
end

and spinlock ~retry t cell =
match Atomic.get cell with
| (Value v) as old ->
assert (Atomic.compare_and_set cell old Tombstone) ;
let v = t.buffer.(i) in
t.buffer.(i) <- dummy;
Pop v
| Empty when retry <= 0 ->
tombstone t cell
| Empty ->
Domain.cpu_relax () ;
spinlock ~retry:(retry - 1) t cell
| Tombstone ->
assert false

let rec pop t =
let first = Atomic.get t.first in
match pop_s first with
match pop_s ~dummy:t.dummy first with
| Pop v -> Some v
| Wait_for_it -> None
| Is_empty ->
begin match Atomic.get first.rest with
| Is_empty -> (
match Atomic.get first.rest with
| None -> None
| Some rest ->
let _ : bool = Atomic.compare_and_set t.first first rest in
pop t
end
let (_ : bool) = Atomic.compare_and_set t.first first rest in
pop t)
14 changes: 8 additions & 6 deletions src/mpmc_queue.mli
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
type 'a t
(** A queue of items of type ['a]. *)

val make : ?capacity:int -> unit -> 'a t
(** [make ()] creates a new empty queue.
val make : ?capacity:int -> dummy:'a -> unit -> 'a t
(** [make ~dummy ()] creates a new empty queue.
The optional parameter [?capacity] defaults to 4096 and is used to size the
internal buffers of the queue: Choosing a small number lower the pause
durations caused by allocations, but a larger capacity provides overall
faster operations. *)
- The [dummy] element is a placeholder for ['a] values.
- The optional parameter [?capacity] defaults to 512 and is used to size the
internal buffers of the queue: Choosing a small number lower the pause
durations caused by allocations, but a larger capacity can provide overall
faster operations.
*)

val push : 'a t -> 'a -> unit
(** [push t x] adds [x] to the tail of the queue. *)
Expand Down

0 comments on commit 6fb1955

Please sign in to comment.