Skip to content

Commit

Permalink
Give worker coroutines descriptive names for debugging.
Browse files Browse the repository at this point in the history
Also fills out the worker docs some more.

Closes #848.
  • Loading branch information
zach-klippenstein committed Dec 24, 2019
1 parent 3b8937d commit 5467ed0
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 27 deletions.
50 changes: 37 additions & 13 deletions kotlin/workflow-core/src/main/java/com/squareup/workflow/Worker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ import kotlin.reflect.KClass
/**
* Represents a unit of asynchronous work that can have zero, one, or multiple outputs.
*
* [Worker]s are effectively [Flow]s that can be compared to determine equivalence.
* Workers allow you to execute arbitrary, possibly asynchronous tasks in a declarative manner. To
* perform their tasks, workers return a [Flow]. Workers are effectively [Flow]s that can be
* [compared][doesSameWorkAs] to determine equivalence between render passes. A [Workflow] uses
* Workers to perform asynchronous work during the render pass by calling
* [RenderContext.runningWorker].
*
* A [Workflow] uses [Worker]s to perform asynchronous work during the render pass by calling
* [RenderContext.onWorkerOutput] or [RenderContext.runningWorker]. When equivalent [Worker]s are
* passed in subsequent render passes, [doesSameWorkAs] is used to calculate which [Worker]s are
* new and should be started, and which ones are continuations from the last render pass and
* should be allowed to continue working. [Worker]s that are not included in a render pass are
* cancelled.
* See the documentation on [run] for more information on the returned [Flow] is consumed and how
* to implement asynchronous work.
*
* See the documentation on [doesSameWorkAs] for more details on how and when workers are compared
* and the worker lifecycle.
*
* ## Example: Network request
*
Expand All @@ -56,7 +59,7 @@ import kotlin.reflect.KClass
* }
* ```
*
* The first step is to define a [Worker] that can call this service, and maybe an extension
* The first step is to define a Worker that can call this service, and maybe an extension
* function on your service class:
* ```
* fun TimeService.getTimeWorker(timezone: String): Worker<Long> = TimeWorker(timezone, this)
Expand All @@ -73,7 +76,7 @@ import kotlin.reflect.KClass
* }
* ```
*
* You also need to define how to determine if a previous [Worker] is already doing the same work.
* You also need to define how to determine if a previous Worker is already doing the same work.
* This will ensure that if the same request is made by the same [Workflow] in adjacent render
* passes, we'll keep the request alive from the first pass.
* ```
Expand All @@ -91,7 +94,7 @@ import kotlin.reflect.KClass
* ```
*
* Alternatively, if the response is a unique type, unlikely to be shared by any other workers,
* you don't even need to create your own [Worker] class, you can use a builder, and the worker
* you don't even need to create your own Worker class, you can use a builder, and the worker
* will automatically be distinguished by that response type:
* ```
* interface TimeService {
Expand All @@ -113,9 +116,30 @@ interface Worker<out OutputT> {
/**
* Returns a [Flow] to execute the work represented by this worker.
*
* The [Flow] is collected in the context of the workflow runtime. When this [Worker], its parent
* [Workflow], or any ancestor [Workflow]s are torn down, the coroutine in which this [Flow] is
* being collected will be cancelled.
* [Flow] is "a cold asynchronous data stream that sequentially emits values and completes
* normally or with an exception", although it may not emit any values. It is common to use
* workers to perform some side effect that should only be executed when a state is entered – in
* this case, the worker never emits anything (and will have type `Worker<Nothing>`).
*
* ## Coroutine Context
*
* When a worker is started, a coroutine is launched to [collect][Flow.collect] the flow. This
* coroutine is launched in the same scope as the workflow runtime, with the addition a
* [CoroutineName][kotlinx.coroutines.CoroutineName] that describes the `Worker` instance
* (via `toString`) and the key specified by the workflow running the worker. When the worker is
* torn down, the coroutine is cancelled.
*
* ## Exceptions
*
* If a worker needs to report an error to the workflow running it, it *must not* throw it as an
* exception – any exceptions thrown by a worker's [Flow] will cancel the entire workflow runtime.
* Instead, the worker's [OutputT] type should be capable of expressing errors itself, and the
* worker's logic should wrap any relevant exceptions into an output value (e.g. using the
* [catch][kotlinx.coroutines.flow.catch] operator).
*
* While this might seem restrictive, this design decision keeps the [RenderContext.runningWorker]
* API simpler, since it does not need to handle exceptions itself. It also discourages the code
* smell of relying on exceptions to handle control flow.
*/
fun run(): Flow<OutputT>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.squareup.workflow.internal
import com.squareup.workflow.VeryExperimentalWorkflow
import com.squareup.workflow.Worker
import com.squareup.workflow.diagnostic.WorkflowDiagnosticListener
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
Expand All @@ -29,6 +30,7 @@ import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.plus

/**
* Launches a new coroutine that is a child of this node's scope, and calls
Expand All @@ -38,6 +40,7 @@ import kotlinx.coroutines.flow.produceIn
@UseExperimental(FlowPreview::class, ExperimentalCoroutinesApi::class)
internal fun <T> CoroutineScope.launchWorker(
worker: Worker<T>,
key: String,
workerDiagnosticId: Long,
workflowDiagnosticId: Long,
diagnosticListener: WorkflowDiagnosticListener?
Expand All @@ -54,7 +57,7 @@ internal fun <T> CoroutineScope.launchWorker(
// produceIn implicitly creates a buffer (it uses a Channel to bridge between contexts). This
// operator is required to override the default buffer size.
.buffer(RENDEZVOUS)
.produceIn(this)
.produceIn(createWorkerScope(worker, key))

/**
* In unit tests, if you use a mocking library to create a Worker, the run method will return null
Expand Down Expand Up @@ -124,3 +127,11 @@ private fun <T> Flow<T>.transformToValueOrDone(): Flow<ValueOrDone<T>> = flow {
}
emit(ValueOrDone.done())
}

private fun CoroutineScope.createWorkerScope(
worker: Worker<*>,
key: String
): CoroutineScope = this + CoroutineName(worker.debugName(key))

private fun Worker<*>.debugName(key: String) =
toString().let { if (key.isBlank()) it else "$it:$key" }
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
diagnosticListener
.onWorkerStarted(workerId, diagnosticId, case.key, case.worker.toString())
}
val workerChannel = launchWorker(case.worker, workerId, diagnosticId, diagnosticListener)
val workerChannel =
launchWorker(case.worker, case.key, workerId, diagnosticId, diagnosticListener)
WorkerSession(workerChannel)
},
dispose = { _, session -> session.channel.cancel() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,20 @@ package com.squareup.workflow.internal

import com.squareup.workflow.Worker
import com.squareup.workflow.asWorker
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.yield
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.coroutineContext
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
Expand All @@ -41,12 +47,7 @@ class WorkersTest {
val counter = AtomicInteger(0)

runBlocking {
val workerOutputs = launchWorker(
worker,
workerDiagnosticId = 0,
workflowDiagnosticId = 0,
diagnosticListener = null
)
val workerOutputs = launchWorker(worker)

launch(start = UNDISPATCHED) {
assertEquals(0, counter.getAndIncrement())
Expand Down Expand Up @@ -81,7 +82,7 @@ class WorkersTest {
val listener = RecordingDiagnosticListener()

runBlocking {
val outputs = launchWorker(worker, workerId, workflowId, listener)
val outputs = launchWorker(worker, "", workerId, workflowId, listener)

// Start event is sent by WorkflowNode.
yield()
Expand All @@ -105,7 +106,7 @@ class WorkersTest {
val channel = Channel<String>(capacity = 1)

runBlocking {
val workerOutputs = launchWorker(channel.asWorker(), 0, 0, null)
val workerOutputs = launchWorker(channel.asWorker())
assertTrue(workerOutputs.isEmpty)

channel.close()
Expand All @@ -117,7 +118,7 @@ class WorkersTest {
val channel = Channel<String>(capacity = 1)

runBlocking {
val workerOutputs = launchWorker(channel.asWorker(), 0, 0, null)
val workerOutputs = launchWorker(channel.asWorker())
assertTrue(workerOutputs.isEmpty)

channel.send("foo")
Expand All @@ -135,7 +136,7 @@ class WorkersTest {
// Needed so that cancelling the channel doesn't cancel our job, which means receive will
// throw the JobCancellationException instead of the actual channel failure.
supervisorScope {
val workerOutputs = launchWorker(channel.asWorker(), 0, 0, null)
val workerOutputs = launchWorker(channel.asWorker())
assertTrue(workerOutputs.isEmpty)

channel.close(ExpectedException())
Expand All @@ -148,7 +149,7 @@ class WorkersTest {
val channel = Channel<String>(capacity = 1)

runBlocking {
val workerOutputs = launchWorker(channel.asWorker(), 0, 0, null)
val workerOutputs = launchWorker(channel.asWorker())
channel.close()
assertTrue(workerOutputs.receive().isDone)

Expand All @@ -165,7 +166,7 @@ class WorkersTest {

val error = runBlocking {
assertFailsWith<NullPointerException> {
launchWorker(nullFlowWorker, 0, 0, null)
launchWorker(nullFlowWorker)
}
}

Expand All @@ -176,5 +177,45 @@ class WorkersTest {
)
}

@Test fun `launchWorker coroutine is named without key`() {
val output = runBlocking {
launchWorker(CoroutineNameWorker)
.consume { receive() }
.value
}

assertEquals("CoroutineNameWorker.toString", output)
}

@Test fun `launchWorker coroutine is named with key`() {
val output = runBlocking {
launchWorker(CoroutineNameWorker, key = "foo")
.consume { receive() }
.value
}

assertEquals("CoroutineNameWorker.toString:foo", output)
}

private fun <T> CoroutineScope.launchWorker(
worker: Worker<T>,
key: String = ""
) = launchWorker(
worker,
key = key,
workerDiagnosticId = 0,
workflowDiagnosticId = 0,
diagnosticListener = null
)

private class ExpectedException : RuntimeException()

private object CoroutineNameWorker : Worker<String> {
override fun run(): Flow<String> = flow {
val nameElement = coroutineContext[CoroutineName] as CoroutineName
emit(nameElement.name)
}

override fun toString(): String = "CoroutineNameWorker.toString"
}
}

0 comments on commit 5467ed0

Please sign in to comment.