/
TaskFaker.kt
413 lines (345 loc) · 12.5 KB
/
TaskFaker.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
/*
* Copyright (C) 2019 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.concurrent
import assertk.assertThat
import assertk.assertions.isEqualTo
import java.io.Closeable
import java.util.AbstractQueue
import java.util.concurrent.BlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.logging.Logger
import kotlin.concurrent.withLock
import okhttp3.OkHttpClient
import okhttp3.TestUtil.threadFactory
/**
* Runs a [TaskRunner] in a controlled environment so that everything is sequential and
* deterministic.
*
* This class ensures that at most one thread is running at a time. This is initially the JUnit test
* thread, which yields its execution privilege while calling [runTasks], [runNextTask], or
* [advanceUntil]. These functions don't return until the task threads are all idle.
*
* Task threads release their execution privilege in these ways:
*
* * By yielding in [TaskRunner.Backend.coordinatorWait].
* * By yielding in [BlockingQueue.poll].
* * By completing.
*/
class TaskFaker : Closeable {
/**
 * Fails with an [AssertionError] unless the calling thread owns the [taskRunner] lock.
 *
 * Nothing is checked when [assertionsEnabled] is false. The receiver is only used to build the
 * failure message.
 */
@Suppress("NOTHING_TO_INLINE")
internal inline fun Any.assertThreadHoldsLock() {
  if (!assertionsEnabled) return
  if (taskRunner.lock.isHeldByCurrentThread) return
  throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this")
}
/**
 * Fails with an [AssertionError] if the calling thread owns the [taskRunner] lock.
 *
 * Nothing is checked when [assertionsEnabled] is false. The receiver is only used to build the
 * failure message.
 */
@Suppress("NOTHING_TO_INLINE")
internal inline fun Any.assertThreadDoesntHoldLock() {
  if (!assertionsEnabled) return
  if (!taskRunner.lock.isHeldByCurrentThread) return
  throw AssertionError("Thread ${Thread.currentThread().name} MUST NOT hold lock on $this")
}
/**
* Logger passed to [taskRunner]. Its name is "TaskFaker." followed by the current value of the
* shared [instance] counter, which is incremented each time a task faker is constructed.
*/
val logger = Logger.getLogger("TaskFaker." + instance++)
/** Though this executor service may hold many threads, they are not executed concurrently. */
private val tasksExecutor = Executors.newCachedThreadPool(threadFactory("TaskFaker"))
/**
* True if this task faker has ever had multiple tasks scheduled to run concurrently. Guarded by
* [taskRunner].
*
* NOTE(review): the backend's `execute()` assigns `serialTaskQueue.size > 1` on every call, so the
* flag can go back to false; confirm whether "ever" (latching) is the intended contract.
*/
var isParallel = false
/**
* The simulated clock, in nanoseconds. Within this class only [advanceUntil] assigns it. Guarded
* by [taskRunner].
*/
var nanoTime = 0L
private set
/** Backlog of tasks to run. Only one task runs at a time. Guarded by [taskRunner]. */
private val serialTaskQueue = ArrayDeque<SerialTask>()
/** The task that's currently executing. Guarded by [taskRunner]. */
private var currentTask: SerialTask = TestThreadSerialTask
/** The coordinator task if it's waiting, and how it will resume. Guarded by [taskRunner]. */
private var waitingCoordinatorTask: SerialTask? = null
/**
* Set by the task queued in [interruptCoordinatorThread]. When true, the backend's
* `coordinatorWait()` stops waiting and throws [InterruptedException].
*/
private var waitingCoordinatorInterrupted = false
/**
* Set by the task queued in the backend's `coordinatorNotify()`. When true, the backend's
* `coordinatorWait()` stops waiting and returns normally.
*/
private var waitingCoordinatorNotified = false
/** How many times a new task has been started. Guarded by [taskRunner]. */
private var contextSwitchCount = 0
/**
* Number of [RunnableSerialTask]s that have been started and have not yet finished. Guarded by
* [taskRunner].
*/
private var activeThreads = 0
/** A task runner that posts tasks to this fake. Tasks won't be executed until requested. */
val taskRunner: TaskRunner =
TaskRunner(
object : TaskRunner.Backend {
/**
* Wraps [runnable] in a [RunnableSerialTask] and appends it to the serial queue. Nothing is run
* here; the task starts later when the queue is processed. Asserts that the caller holds the
* task runner's lock.
*/
override fun execute(
taskRunner: TaskRunner,
runnable: Runnable,
) {
taskRunner.assertThreadHoldsLock()
val queuedTask = RunnableSerialTask(runnable)
serialTaskQueue += queuedTask
// NOTE(review): this overwrites the flag on every call, so it reflects the queue size at the
// most recent execute() only; the property's KDoc says "ever" - confirm which is intended.
isParallel = serialTaskQueue.size > 1
}
/** Returns this task faker's simulated clock. */
override fun nanoTime() = nanoTime
/**
* Queues a task that wakes the waiting coordinator. Fails with [IllegalStateException] if no
* coordinator is waiting at the time of the call.
*/
override fun coordinatorNotify(taskRunner: TaskRunner) {
taskRunner.assertThreadHoldsLock()
check(waitingCoordinatorTask != null)
// Queue a task to resume the waiting coordinator.
serialTaskQueue +=
object : SerialTask {
override fun start() {
taskRunner.assertThreadHoldsLock()
val coordinatorTask = waitingCoordinatorTask
if (coordinatorTask != null) {
// Hand execution to the coordinator and wake threads awaiting the condition.
waitingCoordinatorNotified = true
currentTask = coordinatorTask
taskRunner.condition.signalAll()
} else {
// The coordinator is no longer waiting by the time this task starts; move on.
startNextTask()
}
}
}
}
/**
* Yields the current task until it is notified, interrupted, or the simulated clock has advanced
* by [nanos]. Returns immediately when [nanos] is zero. Throws [InterruptedException] if the
* wait was ended by [interruptCoordinatorThread].
*/
override fun coordinatorWait(
taskRunner: TaskRunner,
nanos: Long,
) {
taskRunner.assertThreadHoldsLock()
// Only one coordinator may wait at a time.
check(waitingCoordinatorTask == null)
if (nanos == 0L) return
// Yield until notified, interrupted, or the duration elapses.
val waitUntil = nanoTime + nanos
val self = currentTask
waitingCoordinatorTask = self
waitingCoordinatorNotified = false
waitingCoordinatorInterrupted = false
yieldUntil {
waitingCoordinatorNotified || waitingCoordinatorInterrupted || nanoTime >= waitUntil
}
// Clear the waiting state before reporting how the wait ended.
waitingCoordinatorTask = null
waitingCoordinatorNotified = false
if (waitingCoordinatorInterrupted) {
waitingCoordinatorInterrupted = false
throw InterruptedException()
}
}
/** Wraps [queue] so that its timed poll uses this task faker's simulated clock. */
override fun <T> decorate(queue: BlockingQueue<T>) = TaskFakerBlockingQueue(queue)
},
logger = logger,
)
/**
 * Runs every task that is ready at the current simulated time, without moving the clock. Used by
 * the test thread only.
 */
fun runTasks() = advanceUntil(newTime = nanoTime)
/**
 * Sets the simulated clock to [newTime], then runs the tasks that are ready. Used by the test
 * thread only.
 *
 * Fails with [IllegalStateException] if the test thread isn't the current task.
 */
fun advanceUntil(newTime: Long) {
  taskRunner.assertThreadDoesntHoldLock()
  val lock = taskRunner.lock
  lock.withLock {
    check(currentTask == TestThreadSerialTask)
    nanoTime = newTime
    yieldUntil(strategy = ResumePriority.AfterOtherTasks)
  }
}
/**
 * Asserts that no started task is still running, i.e. that the active thread count is zero. Used
 * by the test thread only.
 */
fun assertNoMoreTasks() {
  taskRunner.assertThreadDoesntHoldLock()
  // Read the guarded counter under the lock, then assert on the snapshot.
  val stillActive = taskRunner.lock.withLock { activeThreads }
  assertThat(stillActive).isEqualTo(0)
}
/**
 * Unblock a waiting task thread. Used by the test thread only.
 *
 * Queues a task that flags the waiting coordinator as interrupted and hands execution to it, then
 * calls [runTasks] so the coordinator can observe the interruption. The queued task fails with
 * [IllegalStateException] ("no coordinator waiting") if no coordinator is waiting when it starts.
 *
 * @throws IllegalArgumentException if the test thread isn't the current task.
 */
fun interruptCoordinatorThread() {
  taskRunner.assertThreadDoesntHoldLock()

  // currentTask and serialTaskQueue are documented as guarded by the task runner's lock, so read
  // and mutate them while holding it.
  taskRunner.lock.withLock {
    require(currentTask == TestThreadSerialTask)

    // Queue a task to interrupt the waiting coordinator.
    serialTaskQueue +=
      object : SerialTask {
        override fun start() {
          taskRunner.assertThreadHoldsLock()
          waitingCoordinatorInterrupted = true
          val coordinatorTask = waitingCoordinatorTask ?: error("no coordinator waiting")
          currentTask = coordinatorTask
          taskRunner.condition.signalAll()
        }
      }
  }

  // Let the coordinator process its interruption. This runs after the lock is released because
  // runTasks() asserts that the caller doesn't hold it.
  runTasks()
}
/**
 * Lets one task proceed: yields the test thread, placed at the front of the queue, until at least
 * one context switch has happened. Used by the test thread only.
 */
fun runNextTask() {
  taskRunner.assertThreadDoesntHoldLock()
  taskRunner.lock.withLock {
    val switchesAtStart = contextSwitchCount
    yieldUntil(
      strategy = ResumePriority.BeforeOtherTasks,
      condition = { contextSwitchCount > switchesAtStart },
    )
  }
}
/**
 * Yields the calling task until the simulated clock has advanced by [durationNanos]. For use by
 * the task threads.
 */
fun sleep(durationNanos: Long) {
  val lock = taskRunner.lock
  lock.withLock {
    val wakeTime = nanoTime + durationNanos
    yieldUntil(condition = { nanoTime >= wakeTime })
  }
}
/**
 * Artificially stall until manually resumed by the test thread with [runTasks]. Use this to
 * simulate races in tasks that don't have a deterministic sequence.
 */
fun yield() {
  taskRunner.assertThreadDoesntHoldLock()
  val lock = taskRunner.lock
  lock.withLock { yieldUntil() }
}
/**
* Process the queue until [condition] returns true.
*
* The calling task gives up execution by enqueueing a "yield complete" task that restores it as
* [currentTask], starting the next ready task, and then awaiting the task runner's condition
* until it is the current task again. The caller must hold the task runner's lock.
*
* @param strategy where the resume task is placed in the queue, and whether to keep yielding
* while other tasks are still being started.
* @param condition gates when the resume task is ready to start; defaults to always ready.
*/
private tailrec fun yieldUntil(
strategy: ResumePriority = ResumePriority.AfterEnqueuedTasks,
condition: () -> Boolean = { true },
) {
taskRunner.assertThreadHoldsLock()
val self = currentTask
// The task that hands execution back to the caller once [condition] holds.
val yieldCompleteTask =
object : SerialTask {
override fun isReady() = condition()
override fun start() {
taskRunner.assertThreadHoldsLock()
currentTask = self
taskRunner.condition.signalAll()
}
}
// BeforeOtherTasks resumes ahead of everything queued; the other strategies go to the back.
if (strategy == ResumePriority.BeforeOtherTasks) {
serialTaskQueue.addFirst(yieldCompleteTask)
} else {
serialTaskQueue.addLast(yieldCompleteTask)
}
val startedTask = startNextTask()
// Also true when nothing was ready to start (startedTask is null).
val otherTasksStarted = startedTask != yieldCompleteTask
try {
while (currentTask != self) {
taskRunner.condition.await()
}
} finally {
// Don't leave the resume task queued if it was never started, e.g. when await() throws.
serialTaskQueue.remove(yieldCompleteTask)
}
// If we're yielding until we're exhausted and a task ran, keep going until a task doesn't run.
if (strategy == ResumePriority.AfterOtherTasks && otherTasksStarted) {
return yieldUntil(strategy, condition)
}
}
/** Where a yielding task's resume task is queued by [yieldUntil], and when it stops yielding. */
private enum class ResumePriority {
/**
* Resumes as soon as the condition is satisfied. The resume task is added to the front of the
* queue.
*/
BeforeOtherTasks,
/** Resumes after the already-enqueued tasks. The resume task is added to the back of the queue. */
AfterEnqueuedTasks,
/**
* Resumes after all other tasks, including tasks enqueued while yielding. The yield is repeated
* for as long as a task other than the resume task gets started.
*/
AfterOtherTasks,
}
/**
 * Removes the first ready task from the queue, makes it the current task, counts a context
 * switch and starts it.
 *
 * Returns the task that was started, or null if no queued task was ready.
 */
private fun startNextTask(): SerialTask? {
  taskRunner.assertThreadHoldsLock()
  val readyIndex = serialTaskQueue.indexOfFirst { candidate -> candidate.isReady() }
  if (readyIndex == -1) return null
  return serialTaskQueue.removeAt(readyIndex).also { task ->
    currentTask = task
    contextSwitchCount++
    task.start()
  }
}
/**
* A unit of work held in [serialTaskQueue]. [startNextTask] starts the first queued task whose
* [isReady] returns true.
*/
private interface SerialTask {
/** Returns true if this task is ready to start. Tasks are always ready unless they override this. */
fun isReady() = true
/** Do this task's work, and then start another, such as by calling [startNextTask]. */
fun start()
}
/**
 * Placeholder for the test thread, which is the initial current task. It is never queued, so
 * starting it is a bug.
 */
private object TestThreadSerialTask : SerialTask {
  override fun start(): Nothing {
    throw IllegalStateException("unexpected call")
  }
}
/**
* A queued [Runnable] that is handed to [tasksExecutor] when this task is started.
*
* When the runnable finishes, normally or by throwing, the executor thread takes the task
* runner's lock, decrements [activeThreads] and starts the next ready task.
*/
inner class RunnableSerialTask(
private val runnable: Runnable,
) : SerialTask {
override fun start() {
taskRunner.assertThreadHoldsLock()
require(currentTask == this)
// Counted from here, before the executor thread runs, until the finally block below.
activeThreads++
tasksExecutor.execute {
taskRunner.assertThreadDoesntHoldLock()
// NOTE(review): this check sits outside the try, so if it fails the finally block below is
// skipped: activeThreads isn't decremented and no next task is started. Confirm acceptable.
require(currentTask == this)
try {
runnable.run()
// The runnable may have yielded to other tasks, but this task must be current again on return.
require(currentTask == this) { "unexpected current task: $currentTask" }
} finally {
// Hand execution to the next ready task, if any.
taskRunner.lock.withLock {
activeThreads--
startNextTask()
}
}
}
}
}
/**
 * This blocking queue hooks into a fake clock rather than using regular JVM timing for functions
 * like [poll]. It is only usable within task faker tasks.
 *
 * Only [size], both [poll] overloads and [put] are implemented. The remaining overridden
 * operations throw [IllegalStateException] ("unsupported").
 */
private inner class TaskFakerBlockingQueue<T>(
  val delegate: BlockingQueue<T>,
) : AbstractQueue<T>(), BlockingQueue<T> {
  /**
   * The delegate's current size. This is a getter so it is computed on every read; a property
   * initializer would capture the size once at construction and then go stale, which would also
   * break inherited members that are defined in terms of the size, such as `isEmpty()`.
   */
  override val size: Int
    get() = delegate.size

  /** Incremented on each [put] so that a yielded timed [poll] knows an element may have arrived. */
  private var editCount = 0

  /** Non-blocking poll, forwarded directly to [delegate]. */
  override fun poll(): T = delegate.poll()

  /**
   * Polls [delegate], yielding to other tasks until an element is available or the simulated
   * clock has advanced by [timeout]. Returns null if the timeout elapses first.
   */
  override fun poll(
    timeout: Long,
    unit: TimeUnit,
  ): T? {
    taskRunner.lock.withLock {
      val waitUntil = nanoTime + unit.toNanos(timeout)
      while (true) {
        val result = poll()
        if (result != null) return result
        if (nanoTime >= waitUntil) return null
        // Yield until either the deadline passes or a put() signals that the queue changed.
        val editCountBefore = editCount
        yieldUntil { nanoTime >= waitUntil || editCount > editCountBefore }
      }
    }
  }

  /** Adds [element] to [delegate] and records the edit, both under the task runner's lock. */
  override fun put(element: T) {
    taskRunner.lock.withLock {
      delegate.put(element)
      editCount++
    }
  }

  override fun iterator() = error("unsupported")

  override fun offer(e: T) = error("unsupported")

  override fun peek(): T = error("unsupported")

  override fun offer(
    element: T,
    timeout: Long,
    unit: TimeUnit,
  ) = error("unsupported")

  override fun take() = error("unsupported")

  override fun remainingCapacity() = error("unsupported")

  override fun drainTo(sink: MutableCollection<in T>) = error("unsupported")

  override fun drainTo(
    sink: MutableCollection<in T>,
    maxElements: Int,
  ) = error("unsupported")
}
/**
 * Returns true if no tasks have been scheduled, i.e. [taskRunner] reports no active queues. This
 * runs the coordinator for confirmation.
 */
fun isIdle(): Boolean {
  val activeQueues = taskRunner.activeQueues()
  return activeQueues.isEmpty()
}
/** Shuts down the executor that runs task threads by calling `shutdownNow()` on it. */
override fun close() {
tasksExecutor.shutdownNow()
}
companion object {
/**
* Counter appended to each task faker's [logger] name; incremented once per constructed
* instance. It is a plain `var` with no synchronization.
*/
var instance = 0
/**
* The desired assertion status of the [OkHttpClient] class. The lock assertions in this class are
* skipped when this is false.
*/
@JvmField
val assertionsEnabled: Boolean = OkHttpClient::class.java.desiredAssertionStatus()
}
}