Skip to content

Commit

Permalink
Merge pull request #950 from square/zachklipp/worker-context-propagation
Browse files Browse the repository at this point in the history
Plumb workerContext to child workflows.
  • Loading branch information
zach-klippenstein committed Feb 11, 2020
2 parents c4249b0 + f375ff0 commit 5641ba6
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 5 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Change Log

### Kotlin

* Make the context used to start workers configurable for tests. (#940, #943)
* Make the context used to start workers configurable for tests. (#940, #943, #950)

### Swift

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.squareup.workflow.diagnostic.WorkflowDiagnosticListener
import kotlinx.coroutines.selects.SelectBuilder
import okio.ByteString
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* Responsible for tracking child workflows, starting them and tearing them down when necessary.
Expand Down Expand Up @@ -95,7 +96,8 @@ internal class SubtreeManager<StateT, OutputT : Any>(
private val emitActionToParent: (WorkflowAction<StateT, OutputT>) -> Any?,
private val parentDiagnosticId: Long,
private val diagnosticListener: WorkflowDiagnosticListener? = null,
private val idCounter: IdCounter? = null
private val idCounter: IdCounter? = null,
private val workerContext: CoroutineContext = EmptyCoroutineContext
) : RealRenderContext.Renderer<StateT, OutputT> {

/**
Expand Down Expand Up @@ -197,7 +199,8 @@ internal class SubtreeManager<StateT, OutputT : Any>(
::acceptChildOutput,
parentDiagnosticId,
diagnosticListener,
idCounter
idCounter,
workerContext = workerContext
)
return WorkflowChildNode(child, handler, workflowNode)
.also { node = it }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
internal val diagnosticId = idCounter.createId()

private val subtreeManager = SubtreeManager<StateT, OutputT>(
coroutineContext, ::applyAction, diagnosticId, diagnosticListener, idCounter
coroutineContext, ::applyAction, diagnosticId, diagnosticListener, idCounter, workerContext
)

private val workers = ActiveStagingList<WorkerChildNode<*, *, *>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@ import com.squareup.workflow.WorkflowAction.Companion.noAction
import com.squareup.workflow.testing.WorkerSink
import com.squareup.workflow.testing.testFromStart
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers.Unconfined
import kotlinx.coroutines.Job
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.Channel
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertNotSame
import kotlin.test.assertSame
import kotlin.test.assertTrue
import kotlin.test.fail

Expand Down Expand Up @@ -259,14 +264,41 @@ class WorkerCompositionIntegrationTest {

@Test fun `worker context job is ignored`() {
val worker = Worker.from { coroutineContext }
val workflow = Workflow.stateless<Unit, CoroutineContext, Unit> {
val leafWorkflow = Workflow.stateless<Unit, CoroutineContext, Unit> {
runningWorker(worker) { context -> action { setOutput(context) } }
}
val workflow = Workflow.stateless<Unit, CoroutineContext, Unit> {
renderChild(leafWorkflow) { action { setOutput(it) } }
}
val job: Job = Job()

workflow.testFromStart(context = job) {
val actualWorkerContext = awaitNextOutput()
assertNotSame(job, actualWorkerContext[Job])
}
}

@Test fun `worker context is used for workers`() {
val worker = Worker.from { coroutineContext }
val leafWorkflow = Workflow.stateless<Unit, CoroutineContext, Unit> {
runningWorker(worker) { context -> action { setOutput(context) } }
}
val workflow = Workflow.stateless<Unit, CoroutineContext, Unit> {
renderChild(leafWorkflow) { action { setOutput(it) } }
}
val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher() {
override fun isDispatchNeeded(context: CoroutineContext): Boolean =
Unconfined.isDispatchNeeded(context)

override fun dispatch(
context: CoroutineContext,
block: Runnable
) = Unconfined.dispatch(context, block)
}

workflow.testFromStart(context = dispatcher) {
val actualWorkerContext = awaitNextOutput()
assertSame(dispatcher, actualWorkerContext[ContinuationInterceptor])
}
}
}

0 comments on commit 5641ba6

Please sign in to comment.