/
lock_free_queue.h
106 lines (84 loc) · 4.1 KB
/
lock_free_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// Copyright (C) 2014-2016,2019 Sean Middleditch, all rights reserverd.
// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
#pragma once
#include <type_traits>
#include <atomic>
#include <cstdint>
namespace up {
template <typename T, std::size_t CacheLineWidth = 64>
struct alignas(CacheLineWidth) AlignedAtomic : std::atomic<T> {
using std::atomic<T>::atomic;
char _padding[CacheLineWidth - sizeof(std::atomic<T>)];
};
template <typename T, std::size_t Capacity = 512, std::size_t CacheLineWidth = 64>
class LockFreeQueue {
static constexpr std::uint32_t kBufferSize = Capacity;
static constexpr std::uint32_t kBufferMask = kBufferSize - 1;
static_assert((kBufferSize & kBufferMask) == 0, "LockFreeQueue size must be a power of 2");
public:
LockFreeQueue();
~LockFreeQueue();
LockFreeQueue(LockFreeQueue const&) = delete;
LockFreeQueue& operator=(LockFreeQueue const&) = delete;
constexpr auto capacity() const noexcept { return Capacity; }
template <typename InsertT>
[[nodiscard]] inline bool tryEnque(InsertT&& value);
[[nodiscard]] inline bool tryDeque(T& out);
private:
AlignedAtomic<std::uint32_t, CacheLineWidth> _enque;
AlignedAtomic<std::uint32_t, CacheLineWidth> _deque;
AlignedAtomic<std::uint32_t, CacheLineWidth>* _sequence = nullptr;
std::aligned_storage_t<sizeof(T), alignof(T)>* _buffer = nullptr;
};
template <typename T, std::size_t Capacity, std::size_t CacheLineWidth>
LockFreeQueue<T, Capacity, CacheLineWidth>::LockFreeQueue()
: _enque(0)
, _deque(0)
, _sequence(new AlignedAtomic<std::uint32_t, CacheLineWidth>[kBufferSize])
, _buffer(new std::aligned_storage_t<sizeof(T), alignof(T)>[kBufferSize])
{
for (std::uint32_t i = 0; i != kBufferSize; ++i)
_sequence[i].store(i, std::memory_order_relaxed);
}
template <typename T, std::size_t Capacity, std::size_t CacheLineWidth>
LockFreeQueue<T, Capacity, CacheLineWidth>::~LockFreeQueue() {
delete[] _sequence;
delete[] _buffer;
}
template <typename T, std::size_t Capacity, std::size_t CacheLineWidth>
template <typename InsertT>
bool LockFreeQueue<T, Capacity, CacheLineWidth>::tryEnque(InsertT&& value) {
std::uint32_t target = _enque.load(std::memory_order_relaxed);
std::uint32_t id = _sequence[target & kBufferMask].load(std::memory_order_acquire);
std::int32_t delta = id - target;
while (!(delta == 0 && _enque.compare_exchange_weak(target, target + 1, std::memory_order_relaxed))) {
if (delta < 0) {
return false;
}
target = _enque.load(std::memory_order_relaxed);
id = _sequence[target & kBufferMask].load(std::memory_order_acquire);
delta = id - target;
}
new (&_buffer[target & kBufferMask]) T(std::forward<InsertT>(value));
_sequence[target & kBufferMask].store(target + 1, std::memory_order_release);
return true;
}
template <typename T, std::size_t Capacity, std::size_t CacheLineWidth>
bool LockFreeQueue<T, Capacity, CacheLineWidth>::tryDeque(T& out) {
std::uint32_t target = _deque.load(std::memory_order_relaxed);
std::uint32_t id = _sequence[target & kBufferMask].load(std::memory_order_acquire);
std::int32_t delta = id - (target + 1);
while (!(delta == 0 && _deque.compare_exchange_weak(target, target + 1, std::memory_order_relaxed))) {
if (delta < 0) {
return false;
}
target = _deque.load(std::memory_order_relaxed);
id = _sequence[target & kBufferMask].load(std::memory_order_acquire);
delta = id - (target + 1);
}
out = std::move(reinterpret_cast<T&>(_buffer[target & kBufferMask]));
reinterpret_cast<T&>(_buffer[target & kBufferMask]).~T();
_sequence[target & kBufferMask].store(target + kBufferMask + 1, std::memory_order_release);
return true;
}
} // namespace up