From 9be76d2507f5ae05567fabb841d9cbf8caaac206 Mon Sep 17 00:00:00 2001 From: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> Date: Mon, 30 Mar 2020 18:54:09 +0300 Subject: [PATCH] Fix DefaultExecutor not being able to exit (#1876) * Fix DefaultExecutor not being able to exit. * Also adds the performance optimization. See the discussion on the PR. * Add a stress test for the DefaultExecutor worker shutting down. * Make `testDelayChannelBackpressure2` not fail: This test could in theory already fail on the second `checkNotEmpty`: after the first `checkNotEmpty` has passed, first, the ticker channel awakens to produce a new element, and then the main thread awakens to check if it's there. However, if the ticker channel is sufficiently slowed down, it may not produce the element in time for the main thread to find it. After introducing the change that allows the worker thread in `DefaultExecutor` to shut down, the initial delay of 2500 ms is sufficient for the worker to shut down (which, by default, happens after 1000 ms of inactivity), and then the aforementioned race condition worsens: additional time is required to launch a new worker thread and it's much easier to miss the deadline. Now, the delays are much shorter, meaning that the worker thread is never inactive long enough to shut down. Fixes #856 --- .../common/src/EventLoop.common.kt | 10 +++++-- .../jvm/src/DefaultExecutor.kt | 19 ++++++------ .../jvm/test/DefaultExecutorStressTest.kt | 29 ++++++++++++++++++- .../jvm/test/channels/TickerChannelTest.kt | 10 +++---- 4 files changed, 50 insertions(+), 18 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index ba331e20df..69ea9fe312 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -52,7 +52,7 @@ internal abstract class EventLoop : CoroutineDispatcher() { */ public open fun processNextEvent(): Long { if (!processUnconfinedEvent()) return Long.MAX_VALUE - return nextTime + return 0 } protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty @@ -251,7 +251,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { override fun processNextEvent(): Long { // unconfined events take priority - if (processUnconfinedEvent()) return nextTime + if (processUnconfinedEvent()) return 0 // queue all delayed tasks that are due to be executed val delayed = _delayed.value if (delayed != null && !delayed.isEmpty) { @@ -269,7 +269,11 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { } } // then process one event from queue - dequeue()?.run() + val task = dequeue() + if (task != null) { + task.run() + return 0 + } return nextTime } diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index 4e107a7b1d..ed84f55e74 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -68,15 +68,13 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { var parkNanos = processNextEvent() if (parkNanos == Long.MAX_VALUE) { // nothing to do, initialize shutdown timeout - if (shutdownNanos == Long.MAX_VALUE) { - val now = nanoTime() - if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS - val tillShutdown = shutdownNanos - now - if (tillShutdown <= 0) return // shut thread down - parkNanos = parkNanos.coerceAtMost(tillShutdown) - } else - parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway - } + val now = nanoTime() + if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS + val tillShutdown = shutdownNanos - now + if (tillShutdown <= 0) return // shut thread down + parkNanos = parkNanos.coerceAtMost(tillShutdown) + } else + shutdownNanos = Long.MAX_VALUE if (parkNanos > 0) { // check if shutdown was requested and bail out in this case if (isShutdownRequested) return @@ -142,4 +140,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { resetAll() // clear queues (this as Object).notifyAll() } + + internal val isThreadPresent + get() = _thread != null } diff --git a/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt b/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt index b4c6aaed4d..bc2de8c998 100644 --- a/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt @@ -4,7 +4,8 @@ package kotlinx.coroutines -import org.junit.* +import org.junit.Test +import kotlin.test.* class DefaultExecutorStressTest : TestBase() { @Test @@ -35,4 +36,30 @@ class DefaultExecutorStressTest : TestBase() { } finish(2 + iterations * 4) } + + @Test + fun testWorkerShutdown() = withVirtualTimeSource { + val iterations = 1_000 * stressTestMultiplier + // wait for the worker to shut down + suspend fun awaitWorkerShutdown() { + val executorTimeoutMs = 1000L + delay(executorTimeoutMs) + while (DefaultExecutor.isThreadPresent) { delay(10) } // hangs if the thread refuses to stop + assertFalse(DefaultExecutor.isThreadPresent) // just to make sure + } + runTest { + awaitWorkerShutdown() // so that the worker shuts down after the initial launch + repeat (iterations) { + val job = launch(Dispatchers.Unconfined) { + // this line runs in the main thread + delay(1) + // this line runs in the DefaultExecutor worker + } + delay(100) // yield the execution, allow the worker to spawn + assertTrue(DefaultExecutor.isThreadPresent) // the worker spawned + job.join() + awaitWorkerShutdown() + } + } + } } diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt index c421bd334a..fcdc6bb4ad 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt @@ -47,17 +47,17 @@ class TickerChannelTest : TestBase() { @Test fun testDelayChannelBackpressure2() = withVirtualTimeSource { runTest { - val delayChannel = ticker(delayMillis = 1000, initialDelayMillis = 0) + val delayChannel = ticker(delayMillis = 200, initialDelayMillis = 0) delayChannel.checkNotEmpty() delayChannel.checkEmpty() - delay(2500) + delay(500) delayChannel.checkNotEmpty() - delay(510) + delay(110) delayChannel.checkNotEmpty() - delay(510) + delay(110) delayChannel.checkEmpty() - delay(510) + delay(110) delayChannel.checkNotEmpty() delayChannel.cancel() }