-
Notifications
You must be signed in to change notification settings - Fork 9
/
stoptoken.d
364 lines (301 loc) · 10.3 KB
/
stoptoken.d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
module concurrency.stoptoken;
// This code originally comes from https://github.com/josuttis/jthread by Nicolai Josuttis.
// It is licensed under the Creative Commons Attribution 4.0 International License http://creativecommons.org/licenses/by/4.0
/// Owner of the shared stop state. Requesting a stop through any
/// reference (shared or not) is visible to every token/callback
/// attached to this source.
class StopSource {
	private stop_state state;

	/// Requests a stop. Returns true if this call performed the
	/// transition, false if stop had already been requested.
	bool stop() nothrow @safe {
		return state.request_stop();
	}

	/// ditto — shared overload; the cast is @trusted because stop_state
	/// performs all of its own synchronization atomically.
	bool stop() nothrow @trusted shared {
		auto self = cast(StopSource) this;
		return self.state.request_stop();
	}

	/// True once stop() has succeeded on this source.
	bool isStopRequested() nothrow @safe @nogc {
		return state.is_stop_requested();
	}

	/// ditto — shared overload, same @trusted cast rationale as stop().
	bool isStopRequested() nothrow @trusted @nogc shared {
		auto self = cast(StopSource) this;
		return self.isStopRequested();
	}
}
/// Lightweight view onto a StopSource through which consumers observe
/// stop requests.
struct StopToken {
	private StopSource source;

	/// Binds this token to `source`; stops requested on the source are
	/// observable through this token.
	this(StopSource source) nothrow @safe @nogc {
		this.source = source;
	}

	/// True once stop has been requested on the underlying source.
	/// A default-constructed token (StopToken.init) has no source and
	/// reports false instead of dereferencing null, which the previous
	/// implementation did.
	bool isStopRequested() nothrow @safe @nogc {
		return source !is null && source.isStopRequested();
	}

	/// Compile-time flag: this token type can (in principle) be stopped,
	/// letting generic code `static if` away stop handling for
	/// NeverStopToken.
	enum isStopPossible = true;
}
/// A token that can never observe a stop. Both members are compile-time
/// constants, so generic code testing `isStopPossible` via `static if`
/// folds all stop-handling away entirely.
struct NeverStopToken {
	enum isStopRequested = false;
	enum isStopPossible = false;
}
/// Trait gating onStop's template constraint. Currently a placeholder
/// that accepts every type; the TODO is to constrain it to types that
/// expose `isStopRequested` and `isStopPossible` (both StopToken and
/// NeverStopToken in this module would satisfy such a check).
enum isStopToken(T) = true; // TODO:
/// Registers `callback` to run when a stop is requested on `stopToken`.
/// If stop was already requested, the callback runs synchronously inside
/// this call (via try_add_callback). Returns the StopCallback handle;
/// dispose() it to deregister.
StopCallback onStop(StopToken)(auto ref StopToken stopToken, void delegate() nothrow @safe shared callback) nothrow @safe if (isStopToken!StopToken) {
	import std.traits : hasMember;

	auto handle = new StopCallback(callback);
	// Only tokens that can actually stop — and that carry a source to
	// register with — get wired into the source's callback list; for
	// everything else (e.g. NeverStopToken) the handle stays detached.
	static if (stopToken.isStopPossible && hasMember!(StopToken, "source")) {
		const registered = stopToken.source.state.try_add_callback(handle, true);
		if (registered)
			handle.source = stopToken.source;
	}
	return handle;
}
/// Handle for a callback registered via onStop. Lives on the source's
/// intrusive doubly-linked list until it is executed by request_stop()
/// or removed via dispose().
class StopCallback {
	/// Deregisters this callback from its StopSource, if still attached.
	/// Safe to call from any thread and more than once: only the caller
	/// that wins the CAS clearing `source` performs the actual removal.
	void dispose() nothrow @trusted @nogc {
		import core.atomic : cas;

		if (source is null)
			return;
		auto local = source;
		// Depending on the druntime version, cas may or may not accept an
		// unshared pointer; try the direct form first and fall back to
		// casting through shared.
		static if (__traits(compiles, cas(&source, local, null))) {
			if (!cas(&source, local, null)) {
				// Lost the race: another thread already cleared `source`
				// and owns (or has done) the removal.
				assert(source is null);
				return;
			}
		} else {
			if (!cas(cast(shared)&source, cast(shared)local, null)) {
				assert(source is null);
				return;
			}
		}
		local.state.remove_callback(this);
	}

private:
	this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc {
		this.callback = callback;
	}

	void delegate() nothrow shared @safe callback; // user callback, invoked on stop
	StopSource source; // non-null while registered; CAS-cleared in dispose()
	StopCallback next_ = null; // intrusive list: next callback
	StopCallback* prev_ = null; // pointer to the link pointing at us; null once dequeued
	bool* isRemoved_ = null; // stack flag of the thread running execute(); see stop_state.request_stop
	shared bool callbackFinishedExecuting = false; // lets remove_callback on another thread wait for execute()
	void execute() nothrow @safe {
		callback();
	}
}
/// One polite iteration of a spin-wait: cedes the remainder of the
/// current timeslice back to the scheduler.
void spin_yield() nothrow @trusted @nogc {
	// TODO: the x86 `pause` instruction would be cheaper here; LDC
	// exposes it as an intrinsic, but DMD has no equivalent.
	import core.thread : Thread;

	Thread.yield();
}
/// Shared state behind StopSource/StopToken/StopCallback. Everything is
/// packed into a single 64-bit word (`state_`): a stop-requested flag, a
/// spin-lock bit, and two reference counts (layout documented at the
/// bottom). Callbacks live on an intrusive doubly-linked list guarded by
/// the lock bit.
private struct stop_state {
	import core.thread : Thread;
	import core.atomic : atomicOp, atomicStore, atomicLoad, MemoryOrder;

	// Use druntime's casWeak when available; otherwise emulate it with a
	// strong cas (a strong CAS is a valid implementation of a weak one).
	static if (__traits(compiles, () { import core.atomic : casWeak; }) && __traits(compiles, () {
			import core.internal.atomic : atomicCompareExchangeWeakNoResult;
		}))
		import core.atomic : casWeak;
	else
		auto casWeak(MemoryOrder M1, MemoryOrder M2, T, V1, V2)(T* here, V1 ifThis, V2 writeThis) pure nothrow @nogc @safe {
			import core.atomic : cas;

			static if (__traits(compiles, cas!(M1, M2)(here, ifThis, writeThis)))
				return cas!(M1, M2)(here, ifThis, writeThis);
			else
				return cas(here, ifThis, writeThis);
		}

public:
	/// Adds one to the token reference count (bits 2-32).
	void add_token_reference() nothrow @safe @nogc {
		state_.atomicOp!"+="(token_ref_increment);
	}

	/// Releases one token reference. In the manually-managed original the
	/// last release deleted the state; under the GC the branch is
	/// intentionally empty.
	void remove_token_reference() nothrow @safe @nogc {
		auto newState = state_.atomicOp!"-="(token_ref_increment);
		if (newState < (token_ref_increment)) {
			// delete this;
		}
	}

	/// Adds one to the source reference count (bits 33-63).
	void add_source_reference() nothrow @safe @nogc {
		state_.atomicOp!"+="(source_ref_increment);
	}

	/// Releases one source reference.
	/// NOTE(review): this compares against token_ref_increment, while the
	/// upstream jthread code compares the pre-decrement value against
	/// source_ref_increment + token_ref_increment — confirm intent if the
	/// commented-out `delete this` is ever revived; as written the branch
	/// is dead code.
	void remove_source_reference() nothrow @safe @nogc {
		auto newState = state_.atomicOp!"-="(source_ref_increment);
		if (newState < (token_ref_increment)) {
			// delete this;
		}
	}

	/// Requests a stop. Returns false if stop was already requested.
	/// On success, drains the callback list, executing each callback with
	/// the lock released so callbacks may deregister themselves (or
	/// others) without deadlocking.
	bool request_stop() nothrow @safe {
		if (!try_lock_and_signal_until_signalled()) {
			// Stop has already been requested.
			return false;
		}

		// Set the 'stop_requested' signal and acquired the lock.
		signallingThread_ = Thread.getThis();

		while (head_ !is null) {
			// Dequeue the head of the queue
			auto cb = head_;
			head_ = cb.next_;
			const bool anyMore = head_ !is null;
			if (anyMore) {
				(() @trusted => head_.prev_ = &head_)(); // compiler 2.091.1 complains "address of variable this assigned to this with longer lifetime". But this is this, how can it have a longer lifetime...
			}
			// Mark this item as removed from the list.
			cb.prev_ = null;

			// Don't hold lock while executing callback
			// so we don't block other threads from deregistering callbacks.
			unlock();

			// TRICKY: Need to store a flag on the stack here that the callback
			// can use to signal that the destructor was executed inline
			// during the call. If the destructor was executed inline then
			// it's not safe to dereference cb after execute() returns.
			// If the destructor runs on some other thread then the other
			// thread will block waiting for this thread to signal that the
			// callback has finished executing.
			bool isRemoved = false;
			(() @trusted => cb.isRemoved_ = &isRemoved)(); // the pointer to the stack here is removed 3 lines down.

			cb.execute();

			if (!isRemoved) {
				cb.isRemoved_ = null;
				cb.callbackFinishedExecuting.atomicStore!(MemoryOrder.rel)(true);
			}

			if (!anyMore) {
				// This was the last item in the queue when we dequeued it.
				// No more items should be added to the queue after we have
				// marked the state as interrupted, only removed from the queue.
				// Avoid acquiring/releasing the lock in this case.
				return true;
			}

			lock();
		}

		unlock();

		return true;
	}

	/// True once request_stop() has succeeded (acquire load).
	bool is_stop_requested() nothrow @safe @nogc {
		return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq));
	}

	/// True while a stop could still happen: already requested, or at
	/// least one source reference remains.
	bool is_stop_requestable() nothrow @safe @nogc {
		return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq));
	}

	/// Attempts to register `cb`. If stop was already requested, `cb` is
	/// executed synchronously here and false is returned; false is also
	/// returned when no stop can ever be requested. On success the
	/// callback is pushed onto the intrusive list under the lock and
	/// true is returned.
	bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe {
		ulong oldState;
		do {
			// The goto jumps straight to the load on the first pass;
			// later passes yield first. This implements "load, then spin
			// while locked" without duplicating the load/checks.
			goto load_state;
			do {
				spin_yield();
			load_state:
				oldState = state_.atomicLoad!(MemoryOrder.acq);
				if (is_stop_requested(oldState)) {
					cb.execute();
					return false;
				}
				else if (!is_stop_requestable(oldState)) {
					return false;
				}
			}
			while (is_locked(oldState));
		}
		while (!casWeak!(MemoryOrder.acq, MemoryOrder.acq)(&state_, oldState, oldState | locked_flag));

		// Push callback onto callback list.
		cb.next_ = head_;
		if (cb.next_ !is null) {
			cb.next_.prev_ = &cb.next_;
		}
		cb.prev_ = &head_;
		head_ = cb;

		if (incrementRefCountIfSuccessful) {
			unlock_and_increment_token_ref_count();
		}
		else {
			unlock();
		}

		// Successfully added the callback.
		return true;
	}

	/// Deregisters `cb`. If it is still queued it is simply unlinked;
	/// otherwise it has run (or is running) in request_stop() and this
	/// either flags the in-callback self-removal case or blocks until the
	/// other thread finishes executing it.
	void remove_callback(StopCallback cb) nothrow @safe @nogc {
		lock();

		if (cb.prev_ !is null) {
			// Still registered, not yet executed
			// Just remove from the list.
			*cb.prev_ = cb.next_;
			if (cb.next_ !is null) {
				cb.next_.prev_ = cb.prev_;
			}

			unlock_and_decrement_token_ref_count();

			return;
		}

		unlock();

		// Callback has either already executed or is executing
		// concurrently on another thread.
		if (signallingThread_ is Thread.getThis()) {
			// Callback executed on this thread or is still currently executing
			// and is deregistering itself from within the callback.
			if (cb.isRemoved_ !is null) {
				// Currently inside the callback, let the request_stop() method
				// know the object is about to be destructed and that it should
				// not try to access the object when the callback returns.
				*cb.isRemoved_ = true;
			}
		}
		else {
			// Callback is currently executing on another thread,
			// block until it finishes executing.
			while (!cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq)) {
				spin_yield();
			}
		}

		remove_token_reference();
	}

private:
	static bool is_locked(ulong state) nothrow @safe @nogc {
		return (state & locked_flag) != 0;
	}

	static bool is_stop_requested(ulong state) nothrow @safe @nogc {
		return (state & stop_requested_flag) != 0;
	}

	static bool is_stop_requestable(ulong state) nothrow @safe @nogc {
		// Interruptible if it has already been interrupted or if there are
		// still interrupt_source instances in existence.
		return is_stop_requested(state) || (state >= source_ref_increment);
	}

	/// Atomically sets both stop_requested_flag and locked_flag.
	/// Returns false (without acquiring anything) if stop was already
	/// requested; spins while another thread holds the lock.
	bool try_lock_and_signal_until_signalled() nothrow @safe @nogc {
		ulong oldState = state_.atomicLoad!(MemoryOrder.acq);
		do {
			if (is_stop_requested(oldState))
				return false;
			while (is_locked(oldState)) {
				spin_yield();
				oldState = state_.atomicLoad!(MemoryOrder.acq);
				if (is_stop_requested(oldState))
					return false;
			}
		}
		while (!casWeak!(MemoryOrder.seq, MemoryOrder.acq)(&state_, oldState,
				oldState | stop_requested_flag | locked_flag));
		return true;
	}

	/// Spin-acquires the lock bit.
	void lock() nothrow @safe @nogc {
		auto oldState = state_.atomicLoad!(MemoryOrder.raw);
		do {
			while (is_locked(oldState)) {
				spin_yield();
				oldState = state_.atomicLoad!(MemoryOrder.raw);
			}
		}
		while (!casWeak!(MemoryOrder.acq, MemoryOrder.raw)((&state_), oldState,
				oldState | locked_flag));
	}

	/// Releases the lock bit (the bit is known set, so subtraction clears it).
	void unlock() nothrow @safe @nogc {
		state_.atomicOp!"-="(locked_flag);
	}

	/// Releases the lock and adds a token reference in one atomic op
	/// (subtracting locked_flag - token_ref_increment does both at once).
	void unlock_and_increment_token_ref_count() nothrow @safe @nogc {
		state_.atomicOp!"-="(locked_flag - token_ref_increment);
	}

	/// Releases the lock and drops a token reference in one atomic op.
	void unlock_and_decrement_token_ref_count() nothrow @safe @nogc {
		auto newState = state_.atomicOp!"-="(locked_flag + token_ref_increment);
		// Check if new state is less than token_ref_increment which would
		// indicate that this was the last reference.
		if (newState < (locked_flag + token_ref_increment)) {
			// delete this;
		}
	}

	enum stop_requested_flag = 1L;
	enum locked_flag = 2L;
	enum token_ref_increment = 4L;
	enum source_ref_increment = 1L << 33u;

	// bit 0 - stop-requested
	// bit 1 - locked
	// bits 2-32 - token ref count (31 bits)
	// bits 33-63 - source ref count (31 bits)
	shared ulong state_ = source_ref_increment;
	StopCallback head_ = null; // intrusive list of registered callbacks, guarded by locked_flag
	Thread signallingThread_; // thread that won request_stop(); used to detect in-callback self-removal
}