Skip to content

Commit

Permalink
Make the context used to start workers configurable for tests.
Browse files Browse the repository at this point in the history
This is helpful for tests that need to be able to control time.
  • Loading branch information
zach-klippenstein committed Feb 6, 2020
1 parent 482f4b2 commit 6c6adda
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* Don't use this typealias for the public API, better to just use the function directly so it's
Expand Down Expand Up @@ -123,6 +125,7 @@ internal fun <PropsT, StateT, OutputT : Any, RenderingT, RunnerT> launchWorkflow
props: Flow<PropsT>,
initialSnapshot: Snapshot?,
initialState: StateT?,
workerContext: CoroutineContext = EmptyCoroutineContext,
beforeStart: Configurator<OutputT, RenderingT, RunnerT>
): RunnerT {
val renderingsAndSnapshots = ConflatedBroadcastChannel<RenderingAndSnapshot<RenderingT>>()
Expand All @@ -143,6 +146,7 @@ internal fun <PropsT, StateT, OutputT : Any, RenderingT, RunnerT> launchWorkflow
props,
initialSnapshot = initialSnapshot,
initialState = initialState,
workerContext = workerContext,
onRendering = renderingsAndSnapshots::send,
onOutput = outputs::send,
diagnosticListener = diagnosticListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.plus
import kotlin.coroutines.CoroutineContext

/**
* Launches a new coroutine that is a child of this node's scope, and calls
Expand All @@ -44,7 +45,8 @@ internal fun <T> CoroutineScope.launchWorker(
key: String,
workerDiagnosticId: Long,
workflowDiagnosticId: Long,
diagnosticListener: WorkflowDiagnosticListener?
diagnosticListener: WorkflowDiagnosticListener?,
workerContext: CoroutineContext
): ReceiveChannel<ValueOrDone<T>> = worker.runWithNullCheck()
.wireUpDebugger(workerDiagnosticId, workflowDiagnosticId, diagnosticListener)
.transformToValueOrDone()
Expand All @@ -58,7 +60,7 @@ internal fun <T> CoroutineScope.launchWorker(
// produceIn implicitly creates a buffer (it uses a Channel to bridge between contexts). This
// operator is required to override the default buffer size.
.buffer(RENDEZVOUS)
.produceIn(createWorkerScope(worker, key))
.produceIn(createWorkerScope(worker, key, workerContext))

/**
* In unit tests, if you use a mocking library to create a Worker, the run method will return null
Expand Down Expand Up @@ -131,8 +133,9 @@ private fun <T> Flow<T>.transformToValueOrDone(): Flow<ValueOrDone<T>> = flow {

private fun CoroutineScope.createWorkerScope(
worker: Worker<*>,
key: String
): CoroutineScope = this + CoroutineName(worker.debugName(key)) + Unconfined
key: String,
workerContext: CoroutineContext
): CoroutineScope = this + CoroutineName(worker.debugName(key)) + Unconfined + workerContext

private fun Worker<*>.debugName(key: String) =
toString().let { if (key.isBlank()) it else "$it:$key" }
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.selects.select
import org.jetbrains.annotations.TestOnly
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

internal interface WorkflowLoop {

Expand All @@ -44,6 +46,7 @@ internal interface WorkflowLoop {
props: Flow<PropsT>,
initialSnapshot: Snapshot?,
initialState: StateT? = null,
workerContext: CoroutineContext = EmptyCoroutineContext,
onRendering: suspend (RenderingAndSnapshot<RenderingT>) -> Unit,
onOutput: suspend (OutputT) -> Unit,
diagnosticListener: WorkflowDiagnosticListener? = null
Expand All @@ -59,6 +62,7 @@ internal open class RealWorkflowLoop : WorkflowLoop {
props: Flow<PropsT>,
initialSnapshot: Snapshot?,
initialState: StateT?,
workerContext: CoroutineContext,
onRendering: suspend (RenderingAndSnapshot<RenderingT>) -> Unit,
onOutput: suspend (OutputT) -> Unit,
diagnosticListener: WorkflowDiagnosticListener?
Expand All @@ -78,6 +82,7 @@ internal open class RealWorkflowLoop : WorkflowLoop {
initialProps = input,
snapshot = initialSnapshot?.bytes?.takeUnless { it.size == 0 },
baseContext = coroutineContext,
workerContext = workerContext,
parentDiagnosticId = null,
diagnosticListener = diagnosticListener,
idCounter = idCounter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.selects.SelectBuilder
import okio.ByteString
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* A node in a state machine tree. Manages the actual state for a given [Workflow].
Expand All @@ -55,7 +56,8 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
parentDiagnosticId: Long? = null,
private val diagnosticListener: WorkflowDiagnosticListener? = null,
private val idCounter: IdCounter? = null,
initialState: StateT? = null
initialState: StateT? = null,
private val workerContext: CoroutineContext = EmptyCoroutineContext
) : CoroutineScope, WorkerRunner<StateT, OutputT> {

/**
Expand Down Expand Up @@ -270,7 +272,8 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
workerId = idCounter.createId()
diagnosticListener.onWorkerStarted(workerId, diagnosticId, key, worker.toString())
}
val workerChannel = launchWorker(worker, key, workerId, diagnosticId, diagnosticListener)
val workerChannel =
launchWorker(worker, key, workerId, diagnosticId, diagnosticListener, workerContext)
return WorkerChildNode(worker, key, workerChannel, handler = handler)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ fun <PropsT, StateT, OutputT : Any, RenderingT, RunnerT> launchWorkflowForTestFr
props,
initialState = initialState,
initialSnapshot = initialSnapshot,
beforeStart = beforeStart
beforeStart = beforeStart,
workerContext = testParams.workerContext
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import com.squareup.workflow.testing.WorkflowTestParams.StartMode.StartFromCompl
import com.squareup.workflow.testing.WorkflowTestParams.StartMode.StartFromState
import com.squareup.workflow.testing.WorkflowTestParams.StartMode.StartFromWorkflowSnapshot
import org.jetbrains.annotations.TestOnly
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* Defines configuration for workflow testing infrastructure such as `testRender`, `testFromStart`.
Expand All @@ -34,11 +36,15 @@ import org.jetbrains.annotations.TestOnly
* for any given state, so performing side effects in `render` will almost always result in bugs.
* It is recommended to leave this on, but if you need to debug a test and don't want to have to
* deal with the extra passes, you can temporarily set it to false.
* @param workerContext Used to customize the context in which workers are started for tests.
* Default is [EmptyCoroutineContext], which means the workers will use the context from their
* workflow and use the [Unconfined][kotlinx.coroutines.Dispatchers.Unconfined] dispatcher.
*/
@TestOnly
data class WorkflowTestParams<out StateT>(
val startFrom: StartMode<StateT> = StartFresh,
val checkRenderIdempotence: Boolean = true
val checkRenderIdempotence: Boolean = true,
val workerContext: CoroutineContext = EmptyCoroutineContext
) {
/**
* Defines how to start the workflow for tests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.yield
import org.junit.Test
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
import kotlin.test.AfterTest
import kotlin.test.assertEquals
import kotlin.test.assertFails
Expand Down Expand Up @@ -545,6 +546,7 @@ private fun simpleLoop(
props: Flow<PropsT>,
initialSnapshot: Snapshot?,
initialState: StateT?,
workerContext: CoroutineContext,
onRendering: suspend (RenderingAndSnapshot<RenderingT>) -> Unit,
onOutput: suspend (OutputT) -> Unit,
diagnosticListener: WorkflowDiagnosticListener?
Expand All @@ -562,6 +564,7 @@ private object HangingLoop : WorkflowLoop {
props: Flow<PropsT>,
initialSnapshot: Snapshot?,
initialState: StateT?,
workerContext: CoroutineContext,
onRendering: suspend (RenderingAndSnapshot<RenderingT>) -> Unit,
onOutput: suspend (OutputT) -> Unit,
diagnosticListener: WorkflowDiagnosticListener?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.yield
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.test.Test
import kotlin.test.assertEquals
Expand Down Expand Up @@ -85,7 +86,7 @@ class WorkersTest {
val listener = RecordingDiagnosticListener()

runBlocking {
val outputs = launchWorker(worker, "", workerId, workflowId, listener)
val outputs = launchWorker(worker, "", workerId, workflowId, listener, EmptyCoroutineContext)

// Start event is sent by WorkflowNode.
yield()
Expand Down Expand Up @@ -220,7 +221,8 @@ class WorkersTest {
key = key,
workerDiagnosticId = 0,
workflowDiagnosticId = 0,
diagnosticListener = null
diagnosticListener = null,
workerContext = EmptyCoroutineContext
)

private class ExpectedException : RuntimeException()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ package com.squareup.workflow

import com.squareup.workflow.WorkflowAction.Companion.noAction
import com.squareup.workflow.testing.WorkerSink
import com.squareup.workflow.testing.WorkflowTestParams
import com.squareup.workflow.testing.testFromStart
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.Channel
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
Expand Down Expand Up @@ -238,4 +242,17 @@ class WorkerCompositionIntegrationTest {
}
}
}

@Test fun `worker coroutine uses test worker context`() {
val worker = Worker.from { coroutineContext }
val workflow = Workflow.stateless<Unit, CoroutineContext, Unit> {
runningWorker(worker) { context -> action { setOutput(context) } }
}
val workerContext = CoroutineName("worker context")

workflow.testFromStart(testParams = WorkflowTestParams(workerContext = workerContext)) {
val actualWorkerContext = awaitNextOutput()
assertEquals("worker context", actualWorkerContext[CoroutineName]!!.name)
}
}
}

0 comments on commit 6c6adda

Please sign in to comment.