Skip to content

Commit

Permalink
Add cancel of TestCaseExecutor interruption job if test succeeds (#590)
Browse files Browse the repository at this point in the history
* Add cancel of TestCaseExecutor interruption job if test succeeds

As it is, the current TestCaseExecutor will by mistake recycle a thread from a threadpool between multiple test executions. That shouldn't be a problem. However, the scheduler will cancel the executor after a timeout has passed, and if another thread is running there, it will be interrupted.

This interruption is undesired, as it had nothing to do with the thread that successfully passed without a timeout.

This commit cancels the interruption job if the test ran successfully, avoiding this issue.

Fixes #588

* Implemented new "junit" style approach to the test timeouts #588
  • Loading branch information
LeoColman authored and sksamuel committed Jan 19, 2019
1 parent ce9c13a commit 71eb4d3
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 43 deletions.
Expand Up @@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicReference
* The executor can be shared between multiple tests as it is thread safe.
*/
class TestCaseExecutor(private val listener: TestEngineListener,
private val listenerExecutor: ExecutorService,
private val executor: ExecutorService,
private val scheduler: ScheduledExecutorService) {

private val logger = LoggerFactory.getLogger(this.javaClass)
Expand All @@ -48,7 +48,8 @@ class TestCaseExecutor(private val listener: TestEngineListener,

try {

context.launch(listenerExecutor.asCoroutineDispatcher()) {
// invoke the "before" callbacks here on the main executor
context.launch(executor.asCoroutineDispatcher()) {
before(testCase)
}.join()

Expand All @@ -58,20 +59,10 @@ class TestCaseExecutor(private val listener: TestEngineListener,

// get active status here in case calling this function is expensive (eg
runExtensions(testCase, context, extensions) { result ->

// it's possible the listenerExecutor has been shut down here.
// If it has, we can only run them on another thread, better than a slap in the face
// but will run foul of https://github.com/kotlintest/kotlintest/issues/447
if (listenerExecutor.isShutdown) {
context.launch {
after(testCase, result)
}.join()
} else {
context.launch(listenerExecutor.asCoroutineDispatcher()) {
after(testCase, result)
}.join()
}

// invoke the "after" callbacks here on the main executor
context.launch(executor.asCoroutineDispatcher()) {
after(testCase, result)
}.join()
onResult(result)
}

Expand Down Expand Up @@ -103,12 +94,12 @@ class TestCaseExecutor(private val listener: TestEngineListener,

listener.beforeTestCaseExecution(testCase)

// if we have more than one requested thread, we run the tests inside an executor,
// otherwise we run on the same thread as the listeners to avoid issues where before/after listeners
// require the same thread as the test case.
// if we have more than one requested thread, we run the tests inside a clean executor;
// otherwise we run on the same thread as the listeners to avoid issues where the before and
// after listeners require the same thread as the test case.
// @see https://github.com/kotlintest/kotlintest/issues/447
val executor = when (testCase.config.threads) {
1 -> listenerExecutor
1 -> executor
else -> Executors.newFixedThreadPool(testCase.config.threads)!!
}

Expand Down Expand Up @@ -137,19 +128,24 @@ class TestCaseExecutor(private val listener: TestEngineListener,
}
}

// we need to interrupt the threads in the executor in order to effect the timeout
scheduler.schedule({
error.compareAndSet(null, TimeoutException("Execution of test took longer than ${testCase.config.timeout}"))
// this will ruin the listener executor so after() won't run if the test times out but I don't
// know how else to interupt the coroutine context effectively. job.cancel() won't cut the mustard here.
executor.shutdownNow()
}, testCase.config.timeout.toMillis(), TimeUnit.MILLISECONDS)
// we schedule a timeout, (if timeout has been configured) which will fail the test with a timed-out status
if (testCase.config.timeout.nano > 0) {
scheduler.schedule({
error.compareAndSet(null, TimeoutException("Execution of test took longer than ${testCase.config.timeout}"))
}, testCase.config.timeout.toMillis(), TimeUnit.MILLISECONDS)
}

supervisorJob.invokeOnCompletion { e ->
error.compareAndSet(null, e)
}

supervisorJob.join()

// if the tests had their own special executor (ie threads > 1) then we need to shut it down
if (testCase.config.threads > 1) {
executor.shutdown()
}

val result = buildTestResult(error.get(), context.metaData())

listener.afterTestCaseExecution(testCase, result)
Expand Down
Expand Up @@ -7,8 +7,6 @@ import io.kotlintest.Spec
import io.kotlintest.runner.jvm.internal.NamedThreadFactory
import io.kotlintest.runner.jvm.spec.SpecExecutor
import org.slf4j.LoggerFactory
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
Expand All @@ -25,9 +23,11 @@ class TestEngine(val classes: List<KClass<out Spec>>,
// the main executor is used to parallelize the execution of specs
// inside a spec, tests themselves are executed as coroutines
private val executor = Executors.newFixedThreadPool(parallelism, NamedThreadFactory("kotlintest-engine-%d"))
private val listenerExecutors = ConcurrentLinkedQueue<ExecutorService>()

// the scheduler executor is used for notifications on when a test case timeout has been reached
private val scheduler = Executors.newSingleThreadScheduledExecutor()
private val specExecutor = SpecExecutor(listener, listenerExecutors, scheduler)

private val specExecutor = SpecExecutor(listener, scheduler)


private fun afterAll() = Try {
Expand Down
Expand Up @@ -7,7 +7,6 @@ import io.kotlintest.Spec
import io.kotlintest.runner.jvm.TestEngineListener
import io.kotlintest.internal.topLevelTests
import org.slf4j.LoggerFactory
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
Expand All @@ -18,19 +17,23 @@ import java.util.concurrent.ScheduledExecutorService
* to instantiate fresh specs based on the [IsolationMode] of the spec.
*/
class SpecExecutor(private val engineListener: TestEngineListener,
private val listenerExecutors: ConcurrentLinkedQueue<ExecutorService>,
private val scheduler: ScheduledExecutorService) {

private val logger = LoggerFactory.getLogger(this.javaClass)

private fun withListenerExecutor(thunk: (ExecutorService) -> Unit) {
val listenerExecutor = listenerExecutors.poll() ?: Executors.newSingleThreadExecutor()
// each spec has it's own "main thread" (courtesy of an executor)
// this main thread is always used to execute the before and after callbacks, and also tests
// where config has threads = 1 (the default). In tests where threads > 1, then a seperate executor is required.

private fun withExecutor(thunk: (ExecutorService) -> Unit) {
val listenerExecutor = Executors.newSingleThreadExecutor()
thunk(listenerExecutor)
listenerExecutors.add(listenerExecutor)
// only on exiting the spec can the listener executor can be shutdown
listenerExecutor.shutdown()
}

fun execute(spec: Spec) = Try {
withListenerExecutor { listenerExecutor ->
withExecutor { listenerExecutor ->

engineListener.beforeSpecClass(spec.description(), spec::class)

Expand Down
@@ -0,0 +1,45 @@
package com.sksamuel.kotlintest

import io.kotlintest.milliseconds
import io.kotlintest.specs.FreeSpec

class MultipleTestTimeoutTest : FreeSpec() {

// The test executor was failing because as it reutilizes some threads from a thread pool.
// When using that thread pool, a task to cancel the thread is created, so that the engine can interrupt
// a test that is going forever.
// However, if the task is not cancelled, it will eventually interrupt the thread when it's running another task
// in the thread pool, interrupting a test that hasn't timed out yet, which is undesired.

init {
// 100 millis sleep will "accumulate" between tests. If the context is still shared, one of them will fail
// due to timeout.
"Test 1".config(timeout = 300.milliseconds) {
Thread.sleep(100)
}

"Test 2".config(timeout = 300.milliseconds) {
Thread.sleep(100)
}

"Test 3".config(timeout = 300.milliseconds) {
Thread.sleep(100)
}

"Test 4".config(timeout = 300.milliseconds) {
Thread.sleep(100)
}

"Test 5".config(timeout = 300.milliseconds) {
Thread.sleep(100)
}

"Test 6".config(timeout = 300.milliseconds) {
Thread.sleep(100)
}

"Test 7".config(timeout = 300.milliseconds) {
Thread.sleep(100)
}
}
}
Expand Up @@ -35,13 +35,13 @@ class TestCaseExecutorListenerAfterTimeoutTest : FunSpec() {

init {

test("tests which block should timeout but still run after listeners").config {
test("tests which timeout should still run the 'after test' listeners").config {
val listenerExecutor = Executors.newSingleThreadExecutor()
val listener = mock<TestEngineListener> {}
val executor = TestCaseExecutor(listener, listenerExecutor, scheduler)

val testCase = TestCase(Description.root("wibble"), this@TestCaseExecutorListenerAfterTimeoutTest, {
Thread.sleep(100000000L)
Thread.sleep(500)
}, 0, TestType.Test, TestCaseConfig(true, invocations = 1, threads = 1, timeout = 100.milliseconds))

val context = object : TestContext(GlobalScope.coroutineContext) {
Expand Down
Expand Up @@ -221,13 +221,13 @@ class TestCaseExecutorTest : FunSpec() {
then(listener).should().exitTestCase(argThat { description == Description.root("wibble") }, argThat { status == TestStatus.Error })
}

test("tests which block should timeout should error").config {
test("tests which timeout should error").config {
val listenerExecutor = Executors.newSingleThreadExecutor()
val listener = mock<TestEngineListener> {}
val executor = TestCaseExecutor(listener, listenerExecutor, scheduler)

val testCase = TestCase(Description.root("wibble"), this@TestCaseExecutorTest, {
Thread.sleep(100000000L)
Thread.sleep(10000)
}, 0, TestType.Test, TestCaseConfig(true, invocations = 1, threads = 1, timeout = 100.milliseconds))

val context = object : TestContext(GlobalScope.coroutineContext) {
Expand Down Expand Up @@ -267,7 +267,7 @@ class TestCaseExecutorTest : FunSpec() {
)
}

test("test with infinite loop but failure should complete with TestStatus.Failure") {
test("test with infinite loop but invocations = 1 should complete with TestStatus.Failure") {

val listenerExecutor = Executors.newSingleThreadExecutor()
val listener = mock<TestEngineListener> {}
Expand All @@ -287,5 +287,27 @@ class TestCaseExecutorTest : FunSpec() {

then(listener).should().exitTestCase(argThat { description == Description.root("wibble") }, argThat { status == TestStatus.Failure })
}

test("test with infinite loop but invocations > 1 should complete with TestStatus.Failure") {

val listenerExecutor = Executors.newSingleThreadExecutor()
val listener = mock<TestEngineListener> {}
val executor = TestCaseExecutor(listener, listenerExecutor, scheduler)

val testCase = TestCase(Description.root("wibble"), this@TestCaseExecutorTest, {
while (true) {
"this" shouldBe "that"
}
}, 0, TestType.Test, TestCaseConfig(true, invocations = 2, threads = 1))

val context = object : TestContext(GlobalScope.coroutineContext) {
override suspend fun registerTestCase(testCase: TestCase) {}
override fun description(): Description = Description.root("wibble")
}
executor.execute(testCase, context)

then(listener).should().exitTestCase(argThat { description == Description.root("wibble") }, argThat { status == TestStatus.Failure })
}
}
}
}

0 comments on commit 71eb4d3

Please sign in to comment.