Skip to content

Commit ae18085

Browse files
committed
Use adaptive backoff in mpmcq
1 parent 6aaf75f commit ae18085

File tree

13 files changed

+221
-105
lines changed

13 files changed

+221
-105
lines changed

bench/bench_binaries.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ let paths =
1515
lib "picos";
1616
lib "picos.domain";
1717
lib "picos.thread";
18+
lib "picos_aux.adaptive_backoff";
1819
lib "picos_aux.htbl";
1920
lib "picos_aux.mpmcq";
2021
lib "picos_aux.mpscq";

bench/bench_mpmcq.ml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,7 @@ let run_one ~budgetf ~n_adders ~n_takers () =
5656
if 0 < n then begin
5757
match Mpmcq.pop_exn t with
5858
| _ -> loop (n - 1)
59-
| exception Mpmcq.Empty ->
60-
Backoff.once Backoff.default |> ignore;
61-
loop n
59+
| exception Mpmcq.Empty -> loop n
6260
end
6361
else work ()
6462
in

bench/bench_mpscq.ml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,7 @@ let run_one ~budgetf ~n_adders () =
5151
if 0 < n then
5252
match Mpscq.pop_exn t with
5353
| _ -> loop (n - 1)
54-
| exception Mpscq.Empty ->
55-
Backoff.once Backoff.default |> ignore;
56-
loop n
54+
| exception Mpscq.Empty -> loop n
5755
in
5856
loop n_msgs
5957
in
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
let has_domains = 1 < Domain.recommended_domain_count ()
2+
let n = 128 * 4
3+
let counters = Array.init n (fun _ -> Atomic.make 0)
4+
5+
let[@inline never] once_at counter ~log_scale =
6+
if has_domains then begin
7+
let n_contending_threads = Atomic.fetch_and_add counter 1 + 1 in
8+
let n = ref (Random.int ((n_contending_threads lsl log_scale) + 0)) in
9+
while 0 <= !n do
10+
Domain.cpu_relax ();
11+
decr n
12+
done;
13+
Atomic.decr counter
14+
end
15+
16+
let[@inline never] once ~random_key ~log_scale =
17+
let i = random_key land (n - 1) in
18+
let counter = Array.unsafe_get counters i in
19+
once_at counter ~log_scale
20+
21+
let[@inline] once_unless_alone ~random_key ~log_scale =
22+
let i = random_key land (n - 1) in
23+
let counter = Array.unsafe_get counters i in
24+
if 0 <> Atomic.get counter then once_at counter ~log_scale
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
val once : random_key:int -> log_scale:int -> unit
2+
val once_unless_alone : random_key:int -> log_scale:int -> unit
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
let cpu_relax = Fun.id
2+
let recommended_domain_count () = 1
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
(library
2+
(name picos_aux_adaptive_backoff)
3+
(public_name picos_aux.adaptive_backoff))
4+
5+
(rule
6+
(package picos_aux)
7+
(targets domain.ml)
8+
(deps domain.ocaml4.ml)
9+
(enabled_if
10+
(< %{ocaml_version} 5.0.0))
11+
(action
12+
(progn
13+
(copy domain.ocaml4.ml domain.ml))))

lib/picos_aux.mpmcq/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(library
22
(name picos_aux_mpmcq)
33
(public_name picos_aux.mpmcq)
4-
(libraries backoff multicore-magic))
4+
(libraries picos_aux.adaptive_backoff multicore-magic))
55

66
(mdx
77
(package picos_meta)
Lines changed: 110 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1+
open Picos_aux_adaptive_backoff
12
module Atomic = Multicore_magic.Transparent_atomic
23

3-
type 'a t = { head : 'a head Atomic.t; tail : 'a tail Atomic.t }
4+
type 'a t = {
5+
random_key : int;
6+
head : 'a head Atomic.t;
7+
tail : 'a tail Atomic.t;
8+
}
49

510
and ('a, _) tdt =
611
| Cons : {
@@ -27,14 +32,15 @@ and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed]
2732
and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed]
2833

2934
let create ?padded () =
35+
let random_key = Int64.to_int (Random.bits64 ()) in
3036
let head =
3137
Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as ?padded
3238
in
3339
let tail =
3440
Atomic.make (T (Tail { counter = 0; move = Used }))
3541
|> Multicore_magic.copy_as ?padded
3642
in
37-
Multicore_magic.copy_as ?padded { head; tail }
43+
Multicore_magic.copy_as ?padded { random_key; head; tail }
3844

3945
let rec rev (suffix : (_, [< `Cons ]) tdt) = function
4046
| T (Snoc { counter; prefix; value }) ->
@@ -47,139 +53,184 @@ let rev = function
4753
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
4854
prefix
4955

50-
let rec push t value backoff = function
56+
let log_scale = 10
57+
58+
let[@inline] backoff { random_key; _ } =
59+
Adaptive_backoff.once ~random_key ~log_scale
60+
61+
let[@inline] backoff_unless_alone { random_key; _ } =
62+
Adaptive_backoff.once_unless_alone ~random_key ~log_scale
63+
64+
let rec push t value = function
5165
| T (Snoc snoc_r) as prefix ->
5266
let after = Snoc { counter = snoc_r.counter + 1; prefix; value } in
53-
if not (Atomic.compare_and_set t.tail prefix (T after)) then
54-
let backoff = Backoff.once backoff in
55-
push t value backoff (Atomic.fenceless_get t.tail)
67+
if not (Atomic.compare_and_set t.tail prefix (T after)) then begin
68+
backoff t;
69+
push t value (Atomic.fenceless_get t.tail)
70+
end
5671
| T (Tail tail_r) as prefix -> begin
5772
match tail_r.move with
5873
| Used ->
5974
let after = Snoc { counter = tail_r.counter + 1; prefix; value } in
60-
if not (Atomic.compare_and_set t.tail prefix (T after)) then
61-
let backoff = Backoff.once backoff in
62-
push t value backoff (Atomic.fenceless_get t.tail)
75+
if not (Atomic.compare_and_set t.tail prefix (T after)) then begin
76+
backoff t;
77+
push t value (Atomic.fenceless_get t.tail)
78+
end
6379
| Snoc move_r as move ->
6480
begin match Atomic.get t.head with
6581
| H (Head head_r as head) when head_r.counter < move_r.counter ->
6682
let after = rev move in
67-
if
68-
Atomic.fenceless_get t.head == H head
69-
&& Atomic.compare_and_set t.head (H head) (H after)
70-
then tail_r.move <- Used
83+
if Atomic.fenceless_get t.head == H head then
84+
if Atomic.compare_and_set t.head (H head) (H after) then
85+
tail_r.move <- Used
86+
else backoff t
7187
| _ -> tail_r.move <- Used
7288
end;
73-
push t value backoff (Atomic.get t.tail)
89+
push t value (Atomic.get t.tail)
7490
end
7591

7692
exception Empty
7793

78-
let rec pop t backoff = function
94+
let rec pop t = function
7995
| H (Cons cons_r as cons) ->
8096
if Atomic.compare_and_set t.head (H cons) cons_r.suffix then cons_r.value
81-
else
82-
let backoff = Backoff.once backoff in
83-
pop t backoff (Atomic.fenceless_get t.head)
97+
else begin
98+
backoff t;
99+
pop t (Atomic.fenceless_get t.head)
100+
end
84101
| H (Head head_r as head) -> begin
85102
match Atomic.get t.tail with
86103
| T (Snoc snoc_r as move) ->
87104
if head_r.counter = snoc_r.counter then
88105
if Atomic.compare_and_set t.tail (T move) snoc_r.prefix then
89106
snoc_r.value
90-
else pop t backoff (Atomic.fenceless_get t.head)
107+
else begin
108+
backoff t;
109+
pop t (Atomic.fenceless_get t.head)
110+
end
91111
else
92112
let (Tail tail_r as tail : (_, [ `Tail ]) tdt) =
93113
Tail { counter = snoc_r.counter; move }
94114
in
95115
let new_head = Atomic.get t.head in
96-
if new_head != H head then pop t backoff new_head
116+
if new_head != H head then begin
117+
(* backoff t; *)
118+
pop t new_head
119+
end
97120
else if Atomic.compare_and_set t.tail (T move) (T tail) then
98121
let (Cons cons_r) = rev move in
99122
let after = cons_r.suffix in
100123
let new_head = Atomic.get t.head in
101-
if new_head != H head then pop t backoff new_head
124+
if new_head != H head then begin
125+
(* backoff t; *)
126+
pop t new_head
127+
end
102128
else if Atomic.compare_and_set t.head (H head) after then begin
103129
tail_r.move <- Used;
104130
cons_r.value
105131
end
106-
else
107-
let backoff = Backoff.once backoff in
108-
pop t backoff (Atomic.fenceless_get t.head)
109-
else pop t backoff (Atomic.fenceless_get t.head)
132+
else begin
133+
backoff t;
134+
pop t (Atomic.fenceless_get t.head)
135+
end
136+
else begin
137+
(* backoff t; *)
138+
pop t (Atomic.fenceless_get t.head)
139+
end
110140
| T (Tail tail_r) -> begin
111141
match tail_r.move with
112142
| Used ->
113143
let new_head = Atomic.get t.head in
114-
if new_head != H head then pop t backoff new_head
115-
else raise_notrace Empty
144+
if new_head != H head then begin
145+
(* backoff t; *)
146+
pop t new_head
147+
end
148+
else begin
149+
backoff_unless_alone t;
150+
raise_notrace Empty
151+
end
116152
| Snoc move_r as move ->
117153
if head_r.counter < move_r.counter then
118154
let (Cons cons_r) = rev move in
119155
let after = cons_r.suffix in
120156
let new_head = Atomic.get t.head in
121-
if new_head != H head then pop t backoff new_head
157+
if new_head != H head then begin
158+
(* backoff t; *)
159+
pop t new_head
160+
end
122161
else if Atomic.compare_and_set t.head (H head) after then begin
123162
tail_r.move <- Used;
124163
cons_r.value
125164
end
126-
else
127-
let backoff = Backoff.once backoff in
128-
pop t backoff (Atomic.fenceless_get t.head)
165+
else begin
166+
backoff t;
167+
pop t (Atomic.fenceless_get t.head)
168+
end
129169
else
130170
let new_head = Atomic.get t.head in
131-
if new_head != H head then pop t backoff new_head
132-
else raise_notrace Empty
171+
if new_head != H head then begin
172+
(* backoff t; *)
173+
pop t new_head
174+
end
175+
else begin
176+
backoff_unless_alone t;
177+
raise_notrace Empty
178+
end
133179
end
134180
end
135181

136-
let rec push_head t value backoff =
182+
let rec push_head t value =
137183
match Atomic.get t.head with
138184
| H (Cons cons_r) as suffix ->
139185
let after = Cons { counter = cons_r.counter - 1; value; suffix } in
140-
if not (Atomic.compare_and_set t.head suffix (H after)) then
141-
push_head t value (Backoff.once backoff)
186+
if not (Atomic.compare_and_set t.head suffix (H after)) then begin
187+
backoff t;
188+
push_head t value
189+
end
142190
| H (Head head_r) as head -> begin
143191
match Atomic.get t.tail with
144192
| T (Snoc snoc_r as move) ->
145-
if Atomic.get t.head != head then push_head t value backoff
193+
if Atomic.get t.head != head then push_head t value
146194
else if head_r.counter = snoc_r.counter then begin
147195
let prefix = T (Snoc { snoc_r with value }) in
148196
let after =
149197
Snoc { snoc_r with counter = snoc_r.counter + 1; prefix }
150198
in
151-
if not (Atomic.compare_and_set t.tail (T move) (T after)) then
152-
push_head t value (Backoff.once backoff)
199+
if not (Atomic.compare_and_set t.tail (T move) (T after)) then begin
200+
backoff t;
201+
push_head t value
202+
end
153203
end
154-
else
204+
else begin
155205
let tail = Tail { counter = snoc_r.counter; move } in
156-
let backoff =
157-
if Atomic.compare_and_set t.tail (T move) (T tail) then backoff
158-
else Backoff.once backoff
159-
in
160-
push_head t value backoff
206+
if not (Atomic.compare_and_set t.tail (T move) (T tail)) then
207+
backoff t;
208+
push_head t value
209+
end
161210
| T (Tail tail_r) as prefix -> begin
162211
match tail_r.move with
163212
| Used ->
164213
if Atomic.get t.head == head then begin
165214
let tail =
166215
Snoc { counter = tail_r.counter + 1; value; prefix }
167216
in
168-
if not (Atomic.compare_and_set t.tail prefix (T tail)) then
169-
push_head t value (Backoff.once backoff)
217+
if not (Atomic.compare_and_set t.tail prefix (T tail)) then begin
218+
backoff t;
219+
push_head t value
220+
end
170221
end
171-
else push_head t value backoff
222+
else push_head t value
172223
| Snoc move_r as move ->
173224
begin match Atomic.get t.head with
174225
| H (Head head_r as head) when head_r.counter < move_r.counter ->
175226
let after = rev move in
176-
if
177-
Atomic.fenceless_get t.head == H head
178-
&& Atomic.compare_and_set t.head (H head) (H after)
179-
then tail_r.move <- Used
227+
if Atomic.fenceless_get t.head == H head then
228+
if Atomic.compare_and_set t.head (H head) (H after) then
229+
tail_r.move <- Used
230+
else backoff t
180231
| _ -> tail_r.move <- Used
181232
end;
182-
push_head t value backoff
233+
push_head t value
183234
end
184235
end
185236

@@ -193,20 +244,18 @@ let[@inline] length t =
193244
tail := Atomic.fenceless_get t_tail;
194245
!head != Atomic.get t_head
195246
do
196-
()
247+
backoff_unless_alone t
197248
done;
198249
let head_at =
199250
match !head with H (Cons r) -> r.counter | H (Head r) -> r.counter
200251
in
201252
let tail_at =
202253
match !tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
203254
in
204-
tail_at - head_at + 1
255+
let n = tail_at - head_at + 1 in
256+
if n = 0 then backoff_unless_alone t;
257+
n
205258

206259
let[@inline] is_empty t = length t == 0
207-
let[@inline] pop_exn t = pop t Backoff.default (Atomic.fenceless_get t.head)
208-
209-
let[@inline] push t value =
210-
push t value Backoff.default (Atomic.fenceless_get t.tail)
211-
212-
let[@inline] push_head t value = push_head t value Backoff.default
260+
let[@inline] pop_exn t = pop t (Atomic.fenceless_get t.head)
261+
let[@inline] push t value = push t value (Atomic.fenceless_get t.tail)

lib/picos_aux.mpscq/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(library
22
(name picos_aux_mpscq)
33
(public_name picos_aux.mpscq)
4-
(libraries backoff multicore-magic))
4+
(libraries picos_aux.adaptive_backoff multicore-magic))
55

66
(mdx
77
(package picos_meta)

0 commit comments

Comments
 (0)