diff --git a/bench/mpmc_queue.ml b/bench/mpmc_queue.ml index f852296e..924b1624 100644 --- a/bench/mpmc_queue.ml +++ b/bench/mpmc_queue.ml @@ -11,7 +11,7 @@ module Bench (Q : QUEUE) = struct let num_of_elements = ref 2_100_000 let num_of_pushers = ref 4 let num_of_takers = ref 4 - let num_of_iterations = ref 10 + let num_of_iterations = ref 20 let taker queue num_of_elements () = let i = ref 0 in @@ -100,9 +100,10 @@ module Bench (Q : QUEUE) = struct let bench : (unit -> _) list = [ + benchmark ~takers:1 ~pushers:1; benchmark ~takers:4 ~pushers:4; - benchmark ~takers:1 ~pushers:8; - benchmark ~takers:8 ~pushers:1; + benchmark ~takers:1 ~pushers:7; + benchmark ~takers:7 ~pushers:1; ] end diff --git a/src/mpmc_queue.ml b/src/mpmc_queue.ml index 676a2247..8427f52c 100644 --- a/src/mpmc_queue.ml +++ b/src/mpmc_queue.ml @@ -1,145 +1,110 @@ -let default_capacity = 4096 -let spinlock_iterations = 16 - -type 'a cell = - | Empty - | Tombstone - | Value of 'a - -type 'a s = - { buffer : 'a cell Atomic.t array - ; head : int Atomic.t - ; tail : int Atomic.t - ; rest : 'a s option Atomic.t +module Array = struct + include Array + + let get = unsafe_get + let set = unsafe_set +end + +let default_capacity = 512 + +type 'a s = { + status : int Atomic.t array; + buffer : 'a array; + head : int Atomic.t; + tail : int Atomic.t; + rest : 'a s option Atomic.t; +} + +type 'a t = { first : 'a s Atomic.t; last : 'a s Atomic.t; dummy : 'a } + +let pack_size = Sys.int_size / 2 + +let make_s ~capacity ~dummy = + { + head = Atomic.make 0; + tail = Atomic.make (-1); + buffer = Array.make capacity dummy; + status = Array.init (1 + (capacity / pack_size)) (fun _ -> Atomic.make 0); + rest = Atomic.make None; } -type 'a t = - { first : 'a s Atomic.t - ; last : 'a s Atomic.t - } - -let make_s ~capacity () = - { head = Atomic.make 0 - ; tail = Atomic.make (-1) - ; buffer = Array.init capacity (fun _ -> Atomic.make Empty) - ; rest = Atomic.make None - } - -let make ?(capacity = default_capacity) () = - let s = make_s ~capacity () in - { first = Atomic.make s - ; last = Atomic.make s - } +let make ?(capacity = default_capacity) ~dummy () = + let s = make_s ~capacity ~dummy in + { first = Atomic.make s; last = Atomic.make s; dummy } let rec gift_rest t some_s = - if Atomic.compare_and_set t.rest None some_s - then () - else follow_rest t some_s + if not (Atomic.compare_and_set t.rest None some_s) then follow_rest t some_s and follow_rest t some_s = match Atomic.get t.rest with | None -> gift_rest t some_s | Some t -> follow_rest t some_s -let force_rest t = +let force_rest ~dummy t = match Atomic.get t.rest with | Some s -> s - | None -> - let s = make_s ~capacity:(Array.length t.buffer) () in + | None -> ( + let s = make_s ~capacity:(Array.length t.buffer) ~dummy in let some_s = Some s in - if Atomic.compare_and_set t.rest None some_s - then s - else match Atomic.get t.rest with - | None -> assert false - | Some rest -> - gift_rest rest some_s ; - rest - -let rec push_s t x = + if Atomic.compare_and_set t.rest None some_s then s + else + match Atomic.get t.rest with + | None -> assert false + | Some rest -> + gift_rest rest some_s; + rest) + +let mark t i = + let status = t.status.(i / pack_size) in + let shift = 2 * (i mod pack_size) in + let status = Atomic.fetch_and_add status (1 lsl shift) in + (status lsr shift) land 1 = 0 + +let rec push_s ~dummy t x = let i = Atomic.fetch_and_add t.tail 1 in - if i < 0 - then (let _ = force_rest t in push_s t x) - else if i >= Array.length t.buffer - then false - else begin - let cell = Array.unsafe_get t.buffer i in - match Atomic.get cell with - | Empty -> - if Atomic.compare_and_set cell Empty (Value x) - then true - else begin - assert (Atomic.get cell = Tombstone) ; - push_s t x - end - | Tombstone -> - push_s t x - | Value _ -> assert false - end - -let rec push t x = - let last = Atomic.get t.last in - if push_s last x - then () - else begin - let rest = force_rest last in - let _ : bool = Atomic.compare_and_set t.last last rest in + if i < 0 then + let _ = force_rest ~dummy t in + push_s ~dummy t x + else if i >= Array.length t.buffer then false + else ( + t.buffer.(i) <- x; + if mark t i then true + else ( + t.buffer.(i) <- dummy; + let hd = Atomic.get t.head in + let (_ : bool) = Atomic.compare_and_set t.tail (i + 1) (hd + 1) in + push_s ~dummy t x)) + +let rec push ({ last; dummy; _ } as t) x = + let last_s = Atomic.get last in + if not (push_s ~dummy last_s x) then + let rest = force_rest ~dummy last_s in + let (_ : bool) = Atomic.compare_and_set last last_s rest in push t x - end - -type 'a pop_result = - | Is_empty - | Wait_for_it - | Pop of 'a +type 'a pop_result = Is_empty | Wait_for_it | Pop of 'a -let rec pop_s t = +let rec pop_s ~dummy t = let current_head = Atomic.get t.head in - if current_head >= Array.length t.buffer - then Is_empty - else if current_head >= Atomic.get t.tail - then Wait_for_it + if current_head >= Array.length t.buffer then Is_empty + else if current_head >= Atomic.get t.tail then Wait_for_it else let i = Atomic.fetch_and_add t.head 1 in - if i >= Array.length t.buffer - then Is_empty + if i >= Array.length t.buffer then Is_empty + else if mark t i then pop_s ~dummy t else - let cell = Array.unsafe_get t.buffer i in - if i >= Atomic.get t.tail - then tombstone t cell - else spinlock ~retry:spinlock_iterations t cell - -and tombstone t cell = - if Atomic.compare_and_set cell Empty Tombstone - then pop_s t - else begin match Atomic.get cell with - | (Value v) as old -> - assert (Atomic.compare_and_set cell old Tombstone) ; - Pop v - | _ -> assert false - end - -and spinlock ~retry t cell = - match Atomic.get cell with - | (Value v) as old -> - assert (Atomic.compare_and_set cell old Tombstone) ; + let v = t.buffer.(i) in + t.buffer.(i) <- dummy; Pop v - | Empty when retry <= 0 -> - tombstone t cell - | Empty -> - Domain.cpu_relax () ; - spinlock ~retry:(retry - 1) t cell - | Tombstone -> - assert false let rec pop t = let first = Atomic.get t.first in - match pop_s first with + match pop_s ~dummy:t.dummy first with | Pop v -> Some v | Wait_for_it -> None - | Is_empty -> - begin match Atomic.get first.rest with + | Is_empty -> ( + match Atomic.get first.rest with | None -> None | Some rest -> - let _ : bool = Atomic.compare_and_set t.first first rest in - pop t - end + let (_ : bool) = Atomic.compare_and_set t.first first rest in + pop t) diff --git a/src/mpmc_queue.mli b/src/mpmc_queue.mli index 033de040..70f6f174 100644 --- a/src/mpmc_queue.mli +++ b/src/mpmc_queue.mli @@ -3,13 +3,15 @@ type 'a t (** A queue of items of type ['a]. *) -val make : ?capacity:int -> unit -> 'a t -(** [make ()] creates a new empty queue. +val make : ?capacity:int -> dummy:'a -> unit -> 'a t +(** [make ~dummy ()] creates a new empty queue. - The optional parameter [?capacity] defaults to 4096 and is used to size the - internal buffers of the queue: Choosing a small number lower the pause - durations caused by allocations, but a larger capacity provides overall - faster operations. *) + - The [dummy] element is a placeholder for ['a] values. + - The optional parameter [?capacity] defaults to 512 and is used to size the + internal buffers of the queue: Choosing a small number lower the pause + durations caused by allocations, but a larger capacity can provide overall + faster operations. +*) val push : 'a t -> 'a -> unit (** [push t x] adds [x] to the tail of the queue. *)