/
queue_mpsc.zig
116 lines (101 loc) · 3.95 KB
/
queue_mpsc.zig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
const std = @import("std");
const assert = std.debug.assert;
/// An intrusive MPSC (multi-provider, single consumer) queue implementation.
/// The type T must have a field "next" of type `?*T`.
///
/// This is an implementatin of a Vyukov Queue[1].
/// TODO(mitchellh): I haven't audited yet if I got all the atomic operations
/// correct. I was short term more focused on getting something that seemed
/// to work; I need to make sure it actually works.
///
/// For those unaware, an intrusive variant of a data structure is one in which
/// the data type in the list has the pointer to the next element, rather
/// than a higher level "node" or "container" type. The primary benefit
/// of this (and the reason we implement this) is that it defers all memory
/// management to the caller: the data structure implementation doesn't need
/// to allocate "nodes" to contain each element. Instead, the caller provides
/// the element and how its allocated is up to them.
///
/// [1]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
pub fn Intrusive(comptime T: type) type {
return struct {
const Self = @This();
/// Head is the front of the queue and tail is the back of the queue.
head: *T,
tail: *T,
stub: T,
/// Initialize the queue. This requires a stable pointer to itself.
/// This must be called before the queue is used concurrently.
pub fn init(self: *Self) void {
self.head = &self.stub;
self.tail = &self.stub;
self.stub.next = null;
}
/// Push an item onto the queue. This can be called by any number
/// of producers.
pub fn push(self: *Self, v: *T) void {
@atomicStore(?*T, &v.next, null, .unordered);
const prev = @atomicRmw(*T, &self.head, .Xchg, v, .acq_rel);
@atomicStore(?*T, &prev.next, v, .release);
}
/// Pop the first in element from the queue. This must be called
/// by only a single consumer at any given time.
pub fn pop(self: *Self) ?*T {
var tail = @atomicLoad(*T, &self.tail, .unordered);
var next_ = @atomicLoad(?*T, &tail.next, .acquire);
if (tail == &self.stub) {
const next = next_ orelse return null;
@atomicStore(*T, &self.tail, next, .unordered);
tail = next;
next_ = @atomicLoad(?*T, &tail.next, .acquire);
}
if (next_) |next| {
@atomicStore(*T, &self.tail, next, .release);
tail.next = null;
return tail;
}
const head = @atomicLoad(*T, &self.head, .unordered);
if (tail != head) return null;
self.push(&self.stub);
next_ = @atomicLoad(?*T, &tail.next, .acquire);
if (next_) |next| {
@atomicStore(*T, &self.tail, next, .unordered);
tail.next = null;
return tail;
}
return null;
}
};
}
test Intrusive {
const testing = std.testing;
// Types
const Elem = struct {
const Self = @This();
next: ?*Self = null,
};
const Queue = Intrusive(Elem);
var q: Queue = undefined;
q.init();
// Elems
var elems: [10]Elem = .{.{}} ** 10;
// One
try testing.expect(q.pop() == null);
q.push(&elems[0]);
try testing.expect(q.pop().? == &elems[0]);
try testing.expect(q.pop() == null);
// Two
try testing.expect(q.pop() == null);
q.push(&elems[0]);
q.push(&elems[1]);
try testing.expect(q.pop().? == &elems[0]);
try testing.expect(q.pop().? == &elems[1]);
try testing.expect(q.pop() == null);
// // Interleaved
try testing.expect(q.pop() == null);
q.push(&elems[0]);
try testing.expect(q.pop().? == &elems[0]);
q.push(&elems[1]);
try testing.expect(q.pop().? == &elems[1]);
try testing.expect(q.pop() == null);
}