Skip to content

Commit dae00c0

Browse files
committed
New MPMC queue using cooperative pointer reversal
1 parent 1666b8c commit dae00c0

File tree

1 file changed

+66
-189
lines changed

1 file changed

+66
-189
lines changed
Lines changed: 66 additions & 189 deletions
Original file line numberDiff line numberDiff line change
@@ -1,208 +1,85 @@
11
module Atomic = Multicore_magic.Transparent_atomic
22

3-
type 'a t = { head : 'a head Atomic.t; tail : 'a tail Atomic.t }
3+
type 'a node = { mutable next : 'a node; mutable value : 'a; counter : int }
4+
type 'a t = { head : 'a node Atomic.t; tail : 'a node Atomic.t }
45

5-
and ('a, _) tdt =
6-
| Cons : {
7-
counter : int;
8-
value : 'a;
9-
suffix : 'a head;
10-
}
11-
-> ('a, [> `Cons ]) tdt
12-
| Head : { counter : int } -> ('a, [> `Head ]) tdt
13-
| Snoc : {
14-
counter : int;
15-
prefix : 'a tail;
16-
value : 'a;
17-
}
18-
-> ('a, [> `Snoc ]) tdt
19-
| Tail : {
20-
counter : int;
21-
mutable move : ('a, [ `Snoc | `Used ]) tdt;
22-
}
23-
-> ('a, [> `Tail ]) tdt
24-
| Used : ('a, [> `Used ]) tdt
6+
let[@inline] rec fix next work =
7+
let mystery = work.next in
8+
if mystery != next then begin
9+
work.next <- next;
10+
if mystery.counter = work.counter - 1 then fix work mystery
11+
end
2512

26-
and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed]
27-
and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed]
13+
let[@inline] fix next work =
14+
let mystery = work.next in
15+
if mystery != next then begin
16+
work.next <- next;
17+
if mystery.counter = work.counter - 1 then fix work mystery
18+
end
2819

29-
let create ?padded () =
30-
let head =
31-
Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as ?padded
32-
in
33-
let tail =
34-
Atomic.make (T (Tail { counter = 0; move = Used }))
35-
|> Multicore_magic.copy_as ?padded
36-
in
37-
Multicore_magic.copy_as ?padded { head; tail }
38-
39-
let rec rev (suffix : (_, [< `Cons ]) tdt) = function
40-
| T (Snoc { counter; prefix; value }) ->
41-
rev (Cons { counter; value; suffix = H suffix }) prefix
42-
| T (Tail _) -> suffix
43-
44-
let rev = function
45-
| (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tdt) ->
46-
rev
47-
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
48-
prefix
49-
50-
let rec push t value backoff = function
51-
| T (Snoc snoc_r) as prefix ->
52-
let after = Snoc { counter = snoc_r.counter + 1; prefix; value } in
53-
if not (Atomic.compare_and_set t.tail prefix (T after)) then
54-
let backoff = Backoff.once backoff in
55-
push t value backoff (Atomic.fenceless_get t.tail)
56-
| T (Tail tail_r) as prefix -> begin
57-
match tail_r.move with
58-
| Used ->
59-
let after = Snoc { counter = tail_r.counter + 1; prefix; value } in
60-
if not (Atomic.compare_and_set t.tail prefix (T after)) then
61-
let backoff = Backoff.once backoff in
62-
push t value backoff (Atomic.fenceless_get t.tail)
63-
| Snoc move_r as move ->
64-
begin
65-
match Atomic.get t.head with
66-
| H (Head head_r as head) when head_r.counter < move_r.counter ->
67-
let after = rev move in
68-
if
69-
Atomic.fenceless_get t.head == H head
70-
&& Atomic.compare_and_set t.head (H head) (H after)
71-
then tail_r.move <- Used
72-
| _ -> tail_r.move <- Used
73-
end;
74-
push t value backoff (Atomic.get t.tail)
75-
end
20+
let rec push t value backoff =
21+
let before = Atomic.get t.tail in
22+
let after = { next = before; value; counter = before.counter + 1 } in
23+
if Atomic.compare_and_set t.tail before after then fix after before
24+
else push t value (Backoff.once backoff)
7625

7726
exception Empty
7827

79-
let rec pop t backoff = function
80-
| H (Cons cons_r as cons) ->
81-
if Atomic.compare_and_set t.head (H cons) cons_r.suffix then cons_r.value
82-
else
83-
let backoff = Backoff.once backoff in
84-
pop t backoff (Atomic.fenceless_get t.head)
85-
| H (Head head_r as head) -> begin
86-
match Atomic.get t.tail with
87-
| T (Snoc snoc_r as move) ->
88-
if head_r.counter = snoc_r.counter then
89-
if Atomic.compare_and_set t.tail (T move) snoc_r.prefix then
90-
snoc_r.value
91-
else pop t backoff (Atomic.fenceless_get t.head)
92-
else
93-
let (Tail tail_r as tail : (_, [ `Tail ]) tdt) =
94-
Tail { counter = snoc_r.counter; move }
95-
in
96-
let new_head = Atomic.get t.head in
97-
if new_head != H head then pop t backoff new_head
98-
else if Atomic.compare_and_set t.tail (T move) (T tail) then
99-
let (Cons cons_r) = rev move in
100-
let after = cons_r.suffix in
101-
let new_head = Atomic.get t.head in
102-
if new_head != H head then pop t backoff new_head
103-
else if Atomic.compare_and_set t.head (H head) after then begin
104-
tail_r.move <- Used;
105-
cons_r.value
106-
end
107-
else
108-
let backoff = Backoff.once backoff in
109-
pop t backoff (Atomic.fenceless_get t.head)
110-
else pop t backoff (Atomic.fenceless_get t.head)
111-
| T (Tail tail_r) -> begin
112-
match tail_r.move with
113-
| Used ->
114-
let new_head = Atomic.get t.head in
115-
if new_head != H head then pop t backoff new_head
116-
else raise_notrace Empty
117-
| Snoc move_r as move ->
118-
if head_r.counter < move_r.counter then
119-
let (Cons cons_r) = rev move in
120-
let after = cons_r.suffix in
121-
let new_head = Atomic.get t.head in
122-
if new_head != H head then pop t backoff new_head
123-
else if Atomic.compare_and_set t.head (H head) after then begin
124-
tail_r.move <- Used;
125-
cons_r.value
126-
end
127-
else
128-
let backoff = Backoff.once backoff in
129-
pop t backoff (Atomic.fenceless_get t.head)
130-
else
131-
let new_head = Atomic.get t.head in
132-
if new_head != H head then pop t backoff new_head
133-
else raise_notrace Empty
134-
end
28+
let rec pop_exn t backoff =
29+
let before = Atomic.get t.head in
30+
let after = before.next in
31+
if before.counter = after.counter - 1 then
32+
let value = after.value in
33+
if Atomic.compare_and_set t.head before after then begin
34+
after.value <- Obj.magic ();
35+
value
36+
end
37+
else pop_exn t (Backoff.once backoff)
38+
else
39+
let tail = Atomic.get t.tail in
40+
if tail != before then begin
41+
let mystery = tail.next in
42+
if mystery.counter = tail.counter - 1 then fix tail mystery;
43+
pop_exn t Backoff.default
13544
end
45+
else raise_notrace Empty
13646

13747
let rec push_head t value backoff =
138-
match Atomic.get t.head with
139-
| H (Cons cons_r) as suffix ->
140-
let after = Cons { counter = cons_r.counter - 1; value; suffix } in
141-
if not (Atomic.compare_and_set t.head suffix (H after)) then
142-
push_head t value (Backoff.once backoff)
143-
| H (Head head_r) as head -> begin
144-
match Atomic.get t.tail with
145-
| T (Snoc snoc_r as move) ->
146-
if Atomic.get t.head != head then push_head t value backoff
147-
else if head_r.counter = snoc_r.counter then begin
148-
let prefix = T (Snoc { snoc_r with value }) in
149-
let after =
150-
Snoc { snoc_r with counter = snoc_r.counter + 1; prefix }
151-
in
152-
if not (Atomic.compare_and_set t.tail (T move) (T after)) then
153-
push_head t value (Backoff.once backoff)
154-
end
155-
else
156-
let tail = Tail { counter = snoc_r.counter; move } in
157-
let backoff =
158-
if Atomic.compare_and_set t.tail (T move) (T tail) then backoff
159-
else Backoff.once backoff
160-
in
161-
push_head t value backoff
162-
| T (Tail tail_r) as prefix -> begin
163-
match tail_r.move with
164-
| Used ->
165-
if Atomic.get t.head == head then begin
166-
let tail =
167-
Snoc { counter = tail_r.counter + 1; value; prefix }
168-
in
169-
if not (Atomic.compare_and_set t.tail prefix (T tail)) then
170-
push_head t value (Backoff.once backoff)
171-
end
172-
else push_head t value backoff
173-
| Snoc move_r as move ->
174-
begin
175-
match Atomic.get t.head with
176-
| H (Head head_r as head) when head_r.counter < move_r.counter
177-
->
178-
let after = rev move in
179-
if
180-
Atomic.fenceless_get t.head == H head
181-
&& Atomic.compare_and_set t.head (H head) (H after)
182-
then tail_r.move <- Used
183-
| _ -> tail_r.move <- Used
184-
end;
185-
push_head t value backoff
186-
end
48+
let before = Atomic.get t.head in
49+
let next = before.next in
50+
if before.counter + 1 = next.counter then begin
51+
let node = { value; next; counter = before.counter } in
52+
let after =
53+
{ value = Obj.magic (); next = node; counter = before.counter - 1 }
54+
in
55+
if not (Atomic.compare_and_set t.head before after) then
56+
push_head t value (Backoff.once backoff)
57+
end
58+
else
59+
let tail = Atomic.get t.tail in
60+
if tail != before then begin
61+
let mystery = tail.next in
62+
if mystery.counter = tail.counter - 1 then fix tail mystery;
63+
push_head t value Backoff.default
18764
end
65+
else
66+
let node = { value; next = tail; counter = tail.counter + 1 } in
67+
if Atomic.compare_and_set t.tail tail node then fix node tail
68+
else push_head t value (Backoff.once backoff)
18869

18970
let rec length t =
19071
let head = Atomic.get t.head in
19172
let tail = Atomic.fenceless_get t.tail in
192-
if head != Atomic.get t.head then length t
193-
else
194-
let head_at =
195-
match head with H (Cons r) -> r.counter | H (Head r) -> r.counter
196-
in
197-
let tail_at =
198-
match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
199-
in
200-
tail_at - head_at + 1
73+
if head == Atomic.get t.head then tail.counter - head.counter else length t
20174

202-
let[@inline] is_empty t = length t == 0
203-
let[@inline] pop_exn t = pop t Backoff.default (Atomic.fenceless_get t.head)
204-
205-
let[@inline] push t value =
206-
push t value Backoff.default (Atomic.fenceless_get t.tail)
75+
let create ?padded () =
76+
let node = { next = Obj.magic (); value = Obj.magic (); counter = 0 } in
77+
node.next <- node;
78+
let head = Atomic.make node |> Multicore_magic.copy_as ?padded in
79+
let tail = Atomic.make node |> Multicore_magic.copy_as ?padded in
80+
{ head; tail } |> Multicore_magic.copy_as ?padded
20781

82+
let[@inline] is_empty t = length t == 0
83+
let[@inline] push t value = push t value Backoff.default
84+
let[@inline] pop_exn t = pop_exn t Backoff.default
20885
let[@inline] push_head t value = push_head t value Backoff.default

0 commit comments

Comments
 (0)