Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the context used to start workers configurable for tests. #940

Merged
merged 1 commit into from
Feb 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* Don't use this typealias for the public API, better to just use the function directly so it's
Expand Down Expand Up @@ -123,6 +125,7 @@ internal fun <PropsT, StateT, OutputT : Any, RenderingT, RunnerT> launchWorkflow
props: Flow<PropsT>,
initialSnapshot: Snapshot?,
initialState: StateT?,
workerContext: CoroutineContext = EmptyCoroutineContext,
beforeStart: Configurator<OutputT, RenderingT, RunnerT>
): RunnerT {
val renderingsAndSnapshots = ConflatedBroadcastChannel<RenderingAndSnapshot<RenderingT>>()
Expand All @@ -143,6 +146,7 @@ internal fun <PropsT, StateT, OutputT : Any, RenderingT, RunnerT> launchWorkflow
props,
initialSnapshot = initialSnapshot,
initialState = initialState,
workerContext = workerContext,
onRendering = renderingsAndSnapshots::send,
onOutput = outputs::send,
diagnosticListener = diagnosticListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.plus
import kotlin.coroutines.CoroutineContext

/**
* Launches a new coroutine that is a child of this node's scope, and calls
Expand All @@ -44,7 +45,8 @@ internal fun <T> CoroutineScope.launchWorker(
key: String,
workerDiagnosticId: Long,
workflowDiagnosticId: Long,
diagnosticListener: WorkflowDiagnosticListener?
diagnosticListener: WorkflowDiagnosticListener?,
workerContext: CoroutineContext
): ReceiveChannel<ValueOrDone<T>> = worker.runWithNullCheck()
.wireUpDebugger(workerDiagnosticId, workflowDiagnosticId, diagnosticListener)
.transformToValueOrDone()
Expand All @@ -58,7 +60,7 @@ internal fun <T> CoroutineScope.launchWorker(
// produceIn implicitly creates a buffer (it uses a Channel to bridge between contexts). This
// operator is required to override the default buffer size.
.buffer(RENDEZVOUS)
.produceIn(createWorkerScope(worker, key))
.produceIn(createWorkerScope(worker, key, workerContext))

/**
* In unit tests, if you use a mocking library to create a Worker, the run method will return null
Expand Down Expand Up @@ -131,8 +133,9 @@ private fun <T> Flow<T>.transformToValueOrDone(): Flow<ValueOrDone<T>> = flow {

private fun CoroutineScope.createWorkerScope(
worker: Worker<*>,
key: String
): CoroutineScope = this + CoroutineName(worker.debugName(key)) + Unconfined
key: String,
workerContext: CoroutineContext
): CoroutineScope = this + CoroutineName(worker.debugName(key)) + Unconfined + workerContext
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, I see what you meant when you were describing CouroutineScope at your talk.


private fun Worker<*>.debugName(key: String) =
toString().let { if (key.isBlank()) it else "$it:$key" }
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.selects.select
import org.jetbrains.annotations.TestOnly
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

internal interface WorkflowLoop {

Expand All @@ -44,6 +46,7 @@ internal interface WorkflowLoop {
props: Flow<PropsT>,
initialSnapshot: Snapshot?,
initialState: StateT? = null,
workerContext: CoroutineContext = EmptyCoroutineContext,
onRendering: suspend (RenderingAndSnapshot<RenderingT>) -> Unit,
onOutput: suspend (OutputT) -> Unit,
diagnosticListener: WorkflowDiagnosticListener? = null
Expand All @@ -59,6 +62,7 @@ internal open class RealWorkflowLoop : WorkflowLoop {
props: Flow<PropsT>,
initialSnapshot: Snapshot?,
initialState: StateT?,
workerContext: CoroutineContext,
onRendering: suspend (RenderingAndSnapshot<RenderingT>) -> Unit,
onOutput: suspend (OutputT) -> Unit,
diagnosticListener: WorkflowDiagnosticListener?
Expand All @@ -78,6 +82,7 @@ internal open class RealWorkflowLoop : WorkflowLoop {
initialProps = input,
snapshot = initialSnapshot?.bytes?.takeUnless { it.size == 0 },
baseContext = coroutineContext,
workerContext = workerContext,
parentDiagnosticId = null,
diagnosticListener = diagnosticListener,
idCounter = idCounter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.selects.SelectBuilder
import okio.ByteString
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* A node in a state machine tree. Manages the actual state for a given [Workflow].
Expand All @@ -55,7 +56,8 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
parentDiagnosticId: Long? = null,
private val diagnosticListener: WorkflowDiagnosticListener? = null,
private val idCounter: IdCounter? = null,
initialState: StateT? = null
initialState: StateT? = null,
private val workerContext: CoroutineContext = EmptyCoroutineContext
) : CoroutineScope, WorkerRunner<StateT, OutputT> {

/**
Expand Down Expand Up @@ -270,7 +272,8 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
workerId = idCounter.createId()
diagnosticListener.onWorkerStarted(workerId, diagnosticId, key, worker.toString())
}
val workerChannel = launchWorker(worker, key, workerId, diagnosticId, diagnosticListener)
val workerChannel =
launchWorker(worker, key, workerId, diagnosticId, diagnosticListener, workerContext)
return WorkerChildNode(worker, key, workerChannel, handler = handler)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ fun <PropsT, StateT, OutputT : Any, RenderingT, RunnerT> launchWorkflowForTestFr
props,
initialState = initialState,
initialSnapshot = initialSnapshot,
beforeStart = beforeStart
beforeStart = beforeStart,
workerContext = testParams.workerContext
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import com.squareup.workflow.testing.WorkflowTestParams.StartMode.StartFromCompl
import com.squareup.workflow.testing.WorkflowTestParams.StartMode.StartFromState
import com.squareup.workflow.testing.WorkflowTestParams.StartMode.StartFromWorkflowSnapshot
import org.jetbrains.annotations.TestOnly
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* Defines configuration for workflow testing infrastructure such as `testRender`, `testFromStart`.
Expand All @@ -34,11 +36,15 @@ import org.jetbrains.annotations.TestOnly
* for any given state, so performing side effects in `render` will almost always result in bugs.
* It is recommended to leave this on, but if you need to debug a test and don't want to have to
* deal with the extra passes, you can temporarily set it to false.
* @param workerContext Used to customize the context in which workers are started for tests.
* Default is [EmptyCoroutineContext], which means the workers will use the context from their
* workflow and use the [Unconfined][kotlinx.coroutines.Dispatchers.Unconfined] dispatcher.
*/
@TestOnly
data class WorkflowTestParams<out StateT>(
val startFrom: StartMode<StateT> = StartFresh,
val checkRenderIdempotence: Boolean = true
val checkRenderIdempotence: Boolean = true,
val workerContext: CoroutineContext = EmptyCoroutineContext
) {
/**
* Defines how to start the workflow for tests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.yield
import org.junit.Test
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
import kotlin.test.AfterTest
import kotlin.test.assertEquals
import kotlin.test.assertFails
Expand Down Expand Up @@ -545,6 +546,7 @@ private fun simpleLoop(
props: Flow<PropsT>,
initialSnapshot: Snapshot?,
initialState: StateT?,
workerContext: CoroutineContext,
onRendering: suspend (RenderingAndSnapshot<RenderingT>) -> Unit,
onOutput: suspend (OutputT) -> Unit,
diagnosticListener: WorkflowDiagnosticListener?
Expand All @@ -562,6 +564,7 @@ private object HangingLoop : WorkflowLoop {
props: Flow<PropsT>,
initialSnapshot: Snapshot?,
initialState: StateT?,
workerContext: CoroutineContext,
onRendering: suspend (RenderingAndSnapshot<RenderingT>) -> Unit,
onOutput: suspend (OutputT) -> Unit,
diagnosticListener: WorkflowDiagnosticListener?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.yield
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.test.Test
import kotlin.test.assertEquals
Expand Down Expand Up @@ -85,7 +86,7 @@ class WorkersTest {
val listener = RecordingDiagnosticListener()

runBlocking {
val outputs = launchWorker(worker, "", workerId, workflowId, listener)
val outputs = launchWorker(worker, "", workerId, workflowId, listener, EmptyCoroutineContext)

// Start event is sent by WorkflowNode.
yield()
Expand Down Expand Up @@ -220,7 +221,8 @@ class WorkersTest {
key = key,
workerDiagnosticId = 0,
workflowDiagnosticId = 0,
diagnosticListener = null
diagnosticListener = null,
workerContext = EmptyCoroutineContext
)

private class ExpectedException : RuntimeException()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ package com.squareup.workflow

import com.squareup.workflow.WorkflowAction.Companion.noAction
import com.squareup.workflow.testing.WorkerSink
import com.squareup.workflow.testing.WorkflowTestParams
import com.squareup.workflow.testing.testFromStart
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.Channel
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
Expand Down Expand Up @@ -238,4 +242,17 @@ class WorkerCompositionIntegrationTest {
}
}
}

@Test fun `worker coroutine uses test worker context`() {
val worker = Worker.from { coroutineContext }
val workflow = Workflow.stateless<Unit, CoroutineContext, Unit> {
runningWorker(worker) { context -> action { setOutput(context) } }
}
val workerContext = CoroutineName("worker context")

workflow.testFromStart(testParams = WorkflowTestParams(workerContext = workerContext)) {
val actualWorkerContext = awaitNextOutput()
assertEquals("worker context", actualWorkerContext[CoroutineName]!!.name)
}
}
}