Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ws deque benchmark #24

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@

A collection of Concurrent Lockfree Data Structures for OCaml 5. It contains:

* [Chase-Lev Work-Stealing Queue](src/ws_deque.mli)
* [Chase-Lev Work-Stealing Queue](src/ws_deque.mli) Single-producer, multi-consumer dynamic-size queue. Ideal for throughput-focused scheduling using per-core queue.

* [SPMC Queue](src/spmc_queue.mli) Single-producer, multi-consumer dynamic-size queue. Ideal for latency-focused scheduling using per-core stacks.

* [MPMC Queue](src/mpmc_queue.mli) Multi-producer, multi-consumer fixed-size queue. Optimised for high number of threads.

## Usage

lockfree cam be installed from `opam`: `opam install lockfree`. Sample usage of
Expand Down
3 changes: 3 additions & 0 deletions bench/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Collection of benchmarks for the lockfree structures.

Use `./run_all.sh` to execute the usual set.
19 changes: 19 additions & 0 deletions bench/benchmarks_output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Mon 17 Oct 17:41:38 BST 2022
./../_build/default/bench/mpmc_queue.exe -takers 4 -pushers 4
iterations: 10, mean: 0.833760s, stddev: 0.062153s
./../_build/default/bench/mpmc_queue.exe -takers 1 pushers 8
iterations: 10, mean: 1.418691s, stddev: 0.015645s
./../_build/default/bench/mpmc_queue.exe -takers 8 pushers 1
iterations: 10, mean: 0.703808s, stddev: 0.026074s
./../_build/default/bench/mpmc_queue.exe -takers 4 -pushers 4 -use-cas
iterations: 10, mean: 2.426262s, stddev: 0.080545s
./../_build/default/bench/mpmc_queue.exe -takers 1 pushers 8 -use-cas
iterations: 10, mean: 1.982850s, stddev: 0.049347s
./../_build/default/bench/mpmc_queue.exe -takers 8 pushers 1 -use-cas
iterations: 10, mean: 3.250525s, stddev: 0.118707s
./../_build/default/bench/spmc_queue.exe -stealers 0
iterations: 10, mean: 0.150843s, stddev: 0.003683s
./../_build/default/bench/spmc_queue.exe -stealers 2
iterations: 10, mean: 0.293674s, stddev: 0.007478s
./../_build/default/bench/spmc_queue.exe -stealers 4
iterations: 10, mean: 0.431867s, stddev: 0.015829s
3 changes: 3 additions & 0 deletions bench/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
(executables
(names mpmc_queue spmc_queue ws_deque)
(libraries lockfree unix))
64 changes: 64 additions & 0 deletions bench/mpmc_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
open Lockfree.Mpmc_queue

let num_of_elements = ref 10_000_000
let num_of_pushers = ref 4
let num_of_takers = ref 4
let iterations = ref 10
let use_cas_intf = ref false

let pop = ref Lockfree.Mpmc_queue.pop
let push = ref Lockfree.Mpmc_queue.push

let taker queue num_of_elements ~round:_ =
let i = ref 0 in
while !i < num_of_elements do
if Option.is_some (!pop queue) then i := !i + 1
done

let pusher queue num_of_elements ~round:_ =
let i = ref 0 in
while !i < num_of_elements do
if !push queue !i then i := !i + 1
done


let run_bench () =
let queue = create ~size_exponent:10 () in
let orchestrator =
Orchestrator.init ~total_domains:(!num_of_takers + !num_of_pushers)
in
(* define function to start domains *)
let start_n_domains n f =
assert (!num_of_elements mod n == 0);
let items_per_pusher = !num_of_elements / n in
List.init n (fun _ ->
Domain.spawn (fun () ->
Orchestrator.worker orchestrator (f queue items_per_pusher)))
in
(* start domains *)
let _domains =
let takers = start_n_domains !num_of_takers taker in
let pushers = start_n_domains !num_of_pushers pusher in
Sys.opaque_identity (pushers @ takers)
in
(* run test *)
let results = Orchestrator.run orchestrator !iterations in
Printf.printf "iterations: %d, mean: %fs, stddev: %fs" (!iterations) (Stats.mean results) (Stats.stddev results)
;;

let speclist =
[ ("-items", Arg.Set_int num_of_elements, "number of items to insert and remove");
("-pushers", Arg.Set_int num_of_pushers, "number of domains pushing items");
("-takers", Arg.Set_int num_of_takers, "number of domains taking times");
("-iterations", Arg.Set_int iterations, "run the benchmark this many times");
("-use-cas", Arg.Set use_cas_intf, "use CAS instead of FAD")
]

let () =
Arg.parse speclist
(fun _ -> ())
"mpmc_queue.exe [-items INT] [-pushers INT] [-takers INT] [-iterations INT] [-use-cas]";
if !use_cas_intf then
(push := Lockfree.Mpmc_queue.CAS_interface.push;
pop := Lockfree.Mpmc_queue.CAS_interface.pop);
run_bench ();
43 changes: 43 additions & 0 deletions bench/orchestrator.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
type t = { ready : int Atomic.t; total_domains : int; round : int Atomic.t }

let init ~total_domains =
{ ready = Atomic.make 0; total_domains; round = Atomic.make 0 }
;;

let wait_until_all_ready ?(round=0) { ready; total_domains; _ } =
while Atomic.get ready < total_domains*(round+1) do
()
done
;;

let worker ({ ready; round; _ } as t) f =
Atomic.incr ready;
wait_until_all_ready t;
(* all domains are up at this point *)
for i = 1 to Int.max_int do
(* wait for signal to start work *)
while Atomic.get round < i do
()
done;
f ~round:i;
(* signal that we're done *)
Atomic.incr ready;
done
;;

let run ({ ready = _; round; total_domains = _ } as t) ?(drop_first = true) rounds =
wait_until_all_ready t;
(* all domains are up, can start benchmarks *)
let results = ref [] in
for i = 1 to rounds do

let start_time = Unix.gettimeofday () in
Atomic.incr round;
wait_until_all_ready ~round:i t;
let end_time = Unix.gettimeofday () in

let diff = end_time -. start_time in
if drop_first && i == 1 then () else results := diff :: !results
done;
!results
;;
8 changes: 8 additions & 0 deletions bench/orchestrator.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
(** Helper library that ensures all workers have started before any
starts making progress on the benchmark. *)

type t

val init : total_domains:int -> t
val worker : t -> (round:int -> unit) -> unit
val run : t -> ?drop_first:bool -> int -> float List.t
37 changes: 37 additions & 0 deletions bench/run_all.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/bin/bash
set -x

date=$(date)

echo "$date" > benchmarks_output.txt

# first bench
with_cas=("" "-use-cas")
rw_balance=("-takers 4 -pushers 4" "-takers 1 pushers 8" "-takers 8 pushers 1")
path="./../_build/default/bench/mpmc_queue.exe "

for i in "${with_cas[@]}"
do
for j in "${rw_balance[@]}"
do
cmd="$path $j $i"
output=$($cmd)

echo "$cmd" >> benchmarks_output.txt
echo "$output" >> benchmarks_output.txt
done
done


# second bench
rw_balance=("-stealers 0" "-stealers 2" "-stealers 4")
path="./../_build/default/bench/spmc_queue.exe "

for j in "${rw_balance[@]}"
do
cmd="$path $j "
output=$($cmd)

echo "$cmd" >> benchmarks_output.txt
echo "$output" >> benchmarks_output.txt
done
71 changes: 71 additions & 0 deletions bench/spmc_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
open Lockfree

let num_of_elements = ref 2_000_000
let num_of_stealers = ref 0
let iterations = ref 10
let owner_adds_only = ref false

let round_done = Atomic.make 0


let owner queue ~round:_ =
let left = ref !num_of_elements in
while !left > 0 do
(* insert items as owner*)
for _ = 0 to Random.int 100 do
if Spmc_queue.Local.push queue 0
then
left := !left - 1;
done;
(* pop *)
if not !owner_adds_only then (
while Option.is_some (Spmc_queue.Local.pop queue) do () done);
done;
(* drain whatever remains in the queue *)
while Option.is_some (Spmc_queue.Local.pop queue) do () done;
Atomic.incr round_done;
;;

let stealer victim_queue ~round =
let queue = Spmc_queue.create ~size_exponent:10 () in
while Atomic.get round_done < round do
Spmc_queue.Local.steal ~from:victim_queue queue |> ignore;
while Option.is_some (Spmc_queue.Local.pop queue) do () done;
done;;

let run_bench () =
let queue = Spmc_queue.create ~size_exponent:10 () in
let orchestrator =
Orchestrator.init ~total_domains:(!num_of_stealers + 1)
in
(* define function to start domains *)
let start_n_domains n f =
List.init n (fun _ ->
Domain.spawn (fun () ->
Orchestrator.worker orchestrator (f queue)))
in
(* start domains *)
let _domains =
let owner = start_n_domains 1 owner in
let takers = start_n_domains !num_of_stealers stealer in
Sys.opaque_identity (owner @ takers)
in
(* run test *)
let results = Orchestrator.run orchestrator !iterations in
Printf.printf "iterations: %d, mean: %fs, stddev: %fs" (!iterations) (Stats.mean results) (Stats.stddev results)
;;

let speclist =
[ ("-items", Arg.Set_int num_of_elements, "number of items to insert and remove");
("-stealers", Arg.Set_int num_of_stealers, "number of domains stealing items");
("-iterations", Arg.Set_int iterations, "run the benchmark this many times");
("-owner-adds-only", Arg.Set owner_adds_only, "queue owner is only adding items; all pops are by thieves")
]

let f () =
Arg.parse speclist
(fun _ -> ())
"spmc_queue.exe [-items INT] [-stealers INT] [-iterations INT] [-owner-adds-only]";
run_bench ();;

f ()
28 changes: 28 additions & 0 deletions bench/stats.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
let mean data =
let sum = List.fold_left (fun curr_sum b -> curr_sum +. b) 0. data in
let n = Int.to_float (List.length data) in
sum /. n

let stddev data =
let mean = mean data in
let sum =
List.fold_left
(fun curr_sum datapoint ->
let squared_diff = Float.pow (datapoint -. mean) 2. in
curr_sum +. squared_diff)
0. data
in
let n = Int.to_float (List.length data) in
Float.sqrt (sum /. n)

let cmp ?(epsilon=0.01) a b =
Float.abs (a -. b) < epsilon;;

let sanity_checks () =
assert (cmp (mean [1.; 5.]) 3.);
assert (cmp (mean [1.; 3.; 7.]) 3.6666);
assert (cmp (stddev [1.; 5.]) 2.);
assert (cmp (stddev [1.; 3.; 7.]) 2.4944);
;;

sanity_checks ()
2 changes: 2 additions & 0 deletions bench/stats.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
val mean : float List.t -> float
val stddev : float List.t -> float
Loading