Skip to content

Commit 12ffd79

Browse files
committed
Use adaptive backoff in mpmcq
1 parent 21448e7 commit 12ffd79

File tree

13 files changed

+200
-96
lines changed

13 files changed

+200
-96
lines changed

bench/bench_binaries.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ let paths =
1515
lib "picos";
1616
lib "picos.domain";
1717
lib "picos.thread";
18+
lib "picos_aux.adaptive_backoff";
1819
lib "picos_aux.htbl";
1920
lib "picos_aux.mpmcq";
2021
lib "picos_aux.mpscq";

bench/bench_mpmcq.ml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,7 @@ let run_one ~budgetf ~n_adders ~n_takers () =
5656
if 0 < n then begin
5757
match Mpmcq.pop_exn t with
5858
| _ -> loop (n - 1)
59-
| exception Mpmcq.Empty ->
60-
Backoff.once Backoff.default |> ignore;
61-
loop n
59+
| exception Mpmcq.Empty -> loop n
6260
end
6361
else work ()
6462
in

bench/bench_mpscq.ml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,7 @@ let run_one ~budgetf ~n_adders () =
5151
if 0 < n then
5252
match Mpscq.pop_exn t with
5353
| _ -> loop (n - 1)
54-
| exception Mpscq.Empty ->
55-
Backoff.once Backoff.default |> ignore;
56-
loop n
54+
| exception Mpscq.Empty -> loop n
5755
in
5856
loop n_msgs
5957
in
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
let has_domains = 1 < Domain.recommended_domain_count ()
2+
let n = 128 * 4
3+
let counters = Array.init n (fun _ -> Atomic.make 0)
4+
5+
let[@inline never] once_at counter ~log_scale =
6+
if has_domains then begin
7+
let n_contending_threads = Atomic.fetch_and_add counter 1 + 1 in
8+
let n = ref (Random.int ((n_contending_threads lsl log_scale) + 0)) in
9+
while 0 <= !n do
10+
Domain.cpu_relax ();
11+
decr n
12+
done;
13+
Atomic.decr counter
14+
end
15+
16+
let[@inline never] once ~random_key ~log_scale =
17+
let i = random_key land (n - 1) in
18+
let counter = Array.unsafe_get counters i in
19+
once_at counter ~log_scale
20+
21+
let[@inline] once_unless_alone ~random_key ~log_scale =
22+
let i = random_key land (n - 1) in
23+
let counter = Array.unsafe_get counters i in
24+
if 0 <> Atomic.get counter then once_at counter ~log_scale
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
val once : random_key:int -> log_scale:int -> unit
2+
val once_unless_alone : random_key:int -> log_scale:int -> unit
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
let cpu_relax = Fun.id
2+
let recommended_domain_count () = 1
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
(library
2+
(name picos_aux_adaptive_backoff)
3+
(public_name picos_aux.adaptive_backoff))
4+
5+
(rule
6+
(package picos_aux)
7+
(targets domain.ml)
8+
(deps domain.ocaml4.ml)
9+
(enabled_if
10+
(< %{ocaml_version} 5.0.0))
11+
(action
12+
(progn
13+
(copy domain.ocaml4.ml domain.ml))))

lib/picos_aux.mpmcq/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(library
22
(name picos_aux_mpmcq)
33
(public_name picos_aux.mpmcq)
4-
(libraries backoff multicore-magic))
4+
(libraries picos_aux.adaptive_backoff multicore-magic))
55

66
(mdx
77
(package picos_meta)
Lines changed: 102 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1+
open Picos_aux_adaptive_backoff
12
module Atomic = Multicore_magic.Transparent_atomic
23

3-
type 'a t = { head : 'a head Atomic.t; tail : 'a tail Atomic.t }
4+
type 'a t = {
5+
random_key : int;
6+
head : 'a head Atomic.t;
7+
tail : 'a tail Atomic.t;
8+
}
49

510
and ('a, _) tdt =
611
| Cons : {
@@ -27,14 +32,15 @@ and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed]
2732
and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed]
2833

2934
let create ?padded () =
35+
let random_key = Int64.to_int (Random.bits64 ()) in
3036
let head =
3137
Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as ?padded
3238
in
3339
let tail =
3440
Atomic.make (T (Tail { counter = 0; move = Used }))
3541
|> Multicore_magic.copy_as ?padded
3642
in
37-
Multicore_magic.copy_as ?padded { head; tail }
43+
Multicore_magic.copy_as ?padded { random_key; head; tail }
3844

3945
let rec rev (suffix : (_, [< `Cons ]) tdt) = function
4046
| T (Snoc { counter; prefix; value }) ->
@@ -47,139 +53,179 @@ let rev = function
4753
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
4854
prefix
4955

50-
let rec push t value backoff = function
56+
let[@inline] backoff { random_key; _ } =
57+
Adaptive_backoff.once ~random_key ~log_scale:10
58+
59+
let[@inline] backoff_unless_alone { random_key; _ } =
60+
Adaptive_backoff.once_unless_alone ~random_key ~log_scale:10
61+
62+
let rec push t value = function
5163
| T (Snoc snoc_r) as prefix ->
5264
let after = Snoc { counter = snoc_r.counter + 1; prefix; value } in
53-
if not (Atomic.compare_and_set t.tail prefix (T after)) then
54-
let backoff = Backoff.once backoff in
55-
push t value backoff (Atomic.fenceless_get t.tail)
65+
if not (Atomic.compare_and_set t.tail prefix (T after)) then begin
66+
backoff t;
67+
push t value (Atomic.fenceless_get t.tail)
68+
end
5669
| T (Tail tail_r) as prefix -> begin
5770
match tail_r.move with
5871
| Used ->
5972
let after = Snoc { counter = tail_r.counter + 1; prefix; value } in
60-
if not (Atomic.compare_and_set t.tail prefix (T after)) then
61-
let backoff = Backoff.once backoff in
62-
push t value backoff (Atomic.fenceless_get t.tail)
73+
if not (Atomic.compare_and_set t.tail prefix (T after)) then begin
74+
backoff t;
75+
push t value (Atomic.fenceless_get t.tail)
76+
end
6377
| Snoc move_r as move ->
6478
begin match Atomic.get t.head with
6579
| H (Head head_r as head) when head_r.counter < move_r.counter ->
6680
let after = rev move in
67-
if
68-
Atomic.fenceless_get t.head == H head
69-
&& Atomic.compare_and_set t.head (H head) (H after)
70-
then tail_r.move <- Used
81+
if Atomic.fenceless_get t.head == H head then
82+
if Atomic.compare_and_set t.head (H head) (H after) then
83+
tail_r.move <- Used
84+
else backoff t
7185
| _ -> tail_r.move <- Used
7286
end;
73-
push t value backoff (Atomic.get t.tail)
87+
push t value (Atomic.get t.tail)
7488
end
7589

7690
exception Empty
7791

78-
let rec pop t backoff = function
92+
let rec pop t = function
7993
| H (Cons cons_r as cons) ->
8094
if Atomic.compare_and_set t.head (H cons) cons_r.suffix then cons_r.value
81-
else
82-
let backoff = Backoff.once backoff in
83-
pop t backoff (Atomic.fenceless_get t.head)
95+
else begin
96+
backoff t;
97+
pop t (Atomic.fenceless_get t.head)
98+
end
8499
| H (Head head_r as head) -> begin
85100
match Atomic.get t.tail with
86101
| T (Snoc snoc_r as move) ->
87102
if head_r.counter = snoc_r.counter then
88103
if Atomic.compare_and_set t.tail (T move) snoc_r.prefix then
89104
snoc_r.value
90-
else pop t backoff (Atomic.fenceless_get t.head)
105+
else begin
106+
backoff t;
107+
pop t (Atomic.fenceless_get t.head)
108+
end
91109
else
92110
let (Tail tail_r as tail : (_, [ `Tail ]) tdt) =
93111
Tail { counter = snoc_r.counter; move }
94112
in
95113
let new_head = Atomic.get t.head in
96-
if new_head != H head then pop t backoff new_head
114+
if new_head != H head then begin
115+
backoff t;
116+
pop t new_head
117+
end
97118
else if Atomic.compare_and_set t.tail (T move) (T tail) then
98119
let (Cons cons_r) = rev move in
99120
let after = cons_r.suffix in
100121
let new_head = Atomic.get t.head in
101-
if new_head != H head then pop t backoff new_head
122+
if new_head != H head then begin
123+
backoff t;
124+
pop t new_head
125+
end
102126
else if Atomic.compare_and_set t.head (H head) after then begin
103127
tail_r.move <- Used;
104128
cons_r.value
105129
end
106-
else
107-
let backoff = Backoff.once backoff in
108-
pop t backoff (Atomic.fenceless_get t.head)
109-
else pop t backoff (Atomic.fenceless_get t.head)
130+
else begin
131+
backoff t;
132+
pop t (Atomic.fenceless_get t.head)
133+
end
134+
else pop t (Atomic.fenceless_get t.head)
110135
| T (Tail tail_r) -> begin
111136
match tail_r.move with
112137
| Used ->
113138
let new_head = Atomic.get t.head in
114-
if new_head != H head then pop t backoff new_head
115-
else raise_notrace Empty
139+
if new_head != H head then begin
140+
backoff t;
141+
pop t new_head
142+
end
143+
else begin
144+
backoff_unless_alone t;
145+
raise_notrace Empty
146+
end
116147
| Snoc move_r as move ->
117148
if head_r.counter < move_r.counter then
118149
let (Cons cons_r) = rev move in
119150
let after = cons_r.suffix in
120151
let new_head = Atomic.get t.head in
121-
if new_head != H head then pop t backoff new_head
152+
if new_head != H head then begin
153+
backoff t;
154+
pop t new_head
155+
end
122156
else if Atomic.compare_and_set t.head (H head) after then begin
123157
tail_r.move <- Used;
124158
cons_r.value
125159
end
126-
else
127-
let backoff = Backoff.once backoff in
128-
pop t backoff (Atomic.fenceless_get t.head)
160+
else begin
161+
backoff t;
162+
pop t (Atomic.fenceless_get t.head)
163+
end
129164
else
130165
let new_head = Atomic.get t.head in
131-
if new_head != H head then pop t backoff new_head
132-
else raise_notrace Empty
166+
if new_head != H head then begin
167+
backoff t;
168+
pop t new_head
169+
end
170+
else begin
171+
backoff_unless_alone t;
172+
raise_notrace Empty
173+
end
133174
end
134175
end
135176

136-
let rec push_head t value backoff =
177+
let rec push_head t value =
137178
match Atomic.get t.head with
138179
| H (Cons cons_r) as suffix ->
139180
let after = Cons { counter = cons_r.counter - 1; value; suffix } in
140-
if not (Atomic.compare_and_set t.head suffix (H after)) then
141-
push_head t value (Backoff.once backoff)
181+
if not (Atomic.compare_and_set t.head suffix (H after)) then begin
182+
backoff t;
183+
push_head t value
184+
end
142185
| H (Head head_r) as head -> begin
143186
match Atomic.get t.tail with
144187
| T (Snoc snoc_r as move) ->
145-
if Atomic.get t.head != head then push_head t value backoff
188+
if Atomic.get t.head != head then push_head t value
146189
else if head_r.counter = snoc_r.counter then begin
147190
let prefix = T (Snoc { snoc_r with value }) in
148191
let after =
149192
Snoc { snoc_r with counter = snoc_r.counter + 1; prefix }
150193
in
151-
if not (Atomic.compare_and_set t.tail (T move) (T after)) then
152-
push_head t value (Backoff.once backoff)
194+
if not (Atomic.compare_and_set t.tail (T move) (T after)) then begin
195+
backoff t;
196+
push_head t value
197+
end
153198
end
154-
else
199+
else begin
155200
let tail = Tail { counter = snoc_r.counter; move } in
156-
let backoff =
157-
if Atomic.compare_and_set t.tail (T move) (T tail) then backoff
158-
else Backoff.once backoff
159-
in
160-
push_head t value backoff
201+
if not (Atomic.compare_and_set t.tail (T move) (T tail)) then
202+
backoff t;
203+
push_head t value
204+
end
161205
| T (Tail tail_r) as prefix -> begin
162206
match tail_r.move with
163207
| Used ->
164208
if Atomic.get t.head == head then begin
165209
let tail =
166210
Snoc { counter = tail_r.counter + 1; value; prefix }
167211
in
168-
if not (Atomic.compare_and_set t.tail prefix (T tail)) then
169-
push_head t value (Backoff.once backoff)
212+
if not (Atomic.compare_and_set t.tail prefix (T tail)) then begin
213+
backoff t;
214+
push_head t value
215+
end
170216
end
171-
else push_head t value backoff
217+
else push_head t value
172218
| Snoc move_r as move ->
173219
begin match Atomic.get t.head with
174220
| H (Head head_r as head) when head_r.counter < move_r.counter ->
175221
let after = rev move in
176-
if
177-
Atomic.fenceless_get t.head == H head
178-
&& Atomic.compare_and_set t.head (H head) (H after)
179-
then tail_r.move <- Used
222+
if Atomic.fenceless_get t.head == H head then
223+
if Atomic.compare_and_set t.head (H head) (H after) then
224+
tail_r.move <- Used
225+
else backoff t
180226
| _ -> tail_r.move <- Used
181227
end;
182-
push_head t value backoff
228+
push_head t value
183229
end
184230
end
185231

@@ -193,7 +239,7 @@ let[@inline] length t =
193239
tail := Atomic.fenceless_get t_tail;
194240
!head != Atomic.get t_head
195241
do
196-
()
242+
backoff_unless_alone t
197243
done;
198244
let head_at =
199245
match !head with H (Cons r) -> r.counter | H (Head r) -> r.counter
@@ -204,9 +250,5 @@ let[@inline] length t =
204250
tail_at - head_at + 1
205251

206252
let[@inline] is_empty t = length t == 0
207-
let[@inline] pop_exn t = pop t Backoff.default (Atomic.fenceless_get t.head)
208-
209-
let[@inline] push t value =
210-
push t value Backoff.default (Atomic.fenceless_get t.tail)
211-
212-
let[@inline] push_head t value = push_head t value Backoff.default
253+
let[@inline] pop_exn t = pop t (Atomic.fenceless_get t.head)
254+
let[@inline] push t value = push t value (Atomic.fenceless_get t.tail)

lib/picos_aux.mpscq/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(library
22
(name picos_aux_mpscq)
33
(public_name picos_aux.mpscq)
4-
(libraries backoff multicore-magic))
4+
(libraries picos_aux.adaptive_backoff multicore-magic))
55

66
(mdx
77
(package picos_meta)

0 commit comments

Comments
 (0)