Skip to content

Commit

Permalink
Merge pull request #851 from square/zachklipp/unconfined-workers
Browse files Browse the repository at this point in the history
Make all workers run on the Unconfined dispatcher.
  • Loading branch information
zach-klippenstein committed Jan 8, 2020
2 parents 943c8a0 + ebfa3d3 commit a678c73
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 11 deletions.
16 changes: 11 additions & 5 deletions kotlin/workflow-core/src/main/java/com/squareup/workflow/Worker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,17 @@ interface Worker<out OutputT> {
*
* ## Coroutine Context
*
* When a worker is started, a coroutine is launched to [collect][Flow.collect] the flow. This
* coroutine is launched in the same scope as the workflow runtime, with the addition a
* [CoroutineName][kotlinx.coroutines.CoroutineName] that describes the `Worker` instance
* (via `toString`) and the key specified by the workflow running the worker. When the worker is
* torn down, the coroutine is cancelled.
* When a worker is started, a coroutine is launched to [collect][Flow.collect] the flow.
* When the worker is torn down, the coroutine is cancelled.
* This coroutine is launched in the same scope as the workflow runtime, with a few changes:
*
* - The dispatcher is always set to [Unconfined][kotlinx.coroutines.Dispatchers.Unconfined] to
* minimize overhead for workers that don't care which thread they're executed on (e.g. logging
* side effects, workers that wrap third-party reactive libraries, etc.). If your work cares
* which thread it runs on, use [withContext][kotlinx.coroutines.withContext] or
* [flowOn][kotlinx.coroutines.flow.flowOn] to specify a dispatcher.
* - A [CoroutineName][kotlinx.coroutines.CoroutineName] that describes the `Worker` instance
* (via `toString`) and the key specified by the workflow running the worker.
*
* ## Exceptions
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.squareup.workflow.Worker
import com.squareup.workflow.diagnostic.WorkflowDiagnosticListener
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers.Unconfined
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
Expand Down Expand Up @@ -131,7 +132,7 @@ private fun <T> Flow<T>.transformToValueOrDone(): Flow<ValueOrDone<T>> = flow {
private fun CoroutineScope.createWorkerScope(
worker: Worker<*>,
key: String
): CoroutineScope = this + CoroutineName(worker.debugName(key))
): CoroutineScope = this + CoroutineName(worker.debugName(key)) + Unconfined

private fun Worker<*>.debugName(key: String) =
toString().let { if (key.isBlank()) it else "$it:$key" }
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.squareup.workflow.asWorker
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.flow.Flow
Expand All @@ -32,10 +33,12 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.yield
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.coroutineContext
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertSame
import kotlin.test.assertTrue

class WorkersTest {
Expand All @@ -52,18 +55,18 @@ class WorkersTest {
launch(start = UNDISPATCHED) {
assertEquals(0, counter.getAndIncrement())
channel.send("a")
assertEquals(2, counter.getAndIncrement())
assertEquals(1, counter.getAndIncrement())
channel.send("b")
assertEquals(4, counter.getAndIncrement())
assertEquals(3, counter.getAndIncrement())
channel.close()
assertEquals(5, counter.getAndIncrement())
assertEquals(4, counter.getAndIncrement())
}
yield()
assertEquals(1, counter.getAndIncrement())
assertEquals(2, counter.getAndIncrement())

assertEquals("a", workerOutputs.poll()!!.value)
yield()
assertEquals(3, counter.getAndIncrement())
assertEquals(5, counter.getAndIncrement())

assertEquals("b", workerOutputs.poll()!!.value)
yield()
Expand Down Expand Up @@ -197,6 +200,18 @@ class WorkersTest {
assertEquals("CoroutineNameWorker.toString:foo", output)
}

@Test fun `launchWorker dispatcher is unconfined`() {
val worker = Worker.from { coroutineContext[ContinuationInterceptor] }

runBlocking {
val interceptor = launchWorker(worker)
.consume { receive() }
.value

assertSame(Dispatchers.Unconfined, interceptor)
}
}

private fun <T> CoroutineScope.launchWorker(
worker: Worker<T>,
key: String = ""
Expand Down

0 comments on commit a678c73

Please sign in to comment.