From dd81f574fd3bd732f23be06c85cf5920f83a2c1f Mon Sep 17 00:00:00 2001 From: Zachary Klippenstein Date: Fri, 23 Nov 2018 15:08:46 -0800 Subject: [PATCH] Workers: Simpler stateless helper workflows. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This should make it relatively easy to write network calls that need to inject services. The main problem is that the API is not very discoverable. It uses some syntactic magic to have the launcher act like a callable function. The workflow interface is a little too general for this – we need to have a state type and output some sort of state (can't use `Nothing`) but this type of workflow doesn't need to have any state emitted. --- .../main/java/com/squareup/workflow/Worker.kt | 97 ++++++++++++++++ .../com/squareup/workflow/WorkflowPool.kt | 108 +++++++++++++++--- .../java/com/squareup/workflow/WorkerTest.kt | 55 +++++++++ .../java/com/squareup/workflow/rx2/Workers.kt | 18 +++ .../squareup/workflow/rx2/WorkflowPools.kt | 21 +++- .../rx2/ComposedReactorIntegrationTest.kt | 1 + .../workflow/rx2/WorkerIntegrationTest.kt | 52 +++++++++ 7 files changed, 337 insertions(+), 15 deletions(-) create mode 100644 workflow-core/src/main/java/com/squareup/workflow/Worker.kt create mode 100644 workflow-core/src/test/java/com/squareup/workflow/WorkerTest.kt create mode 100644 workflow-rx2/src/main/java/com/squareup/workflow/rx2/Workers.kt create mode 100644 workflow-rx2/src/test/java/com/squareup/workflow/rx2/WorkerIntegrationTest.kt diff --git a/workflow-core/src/main/java/com/squareup/workflow/Worker.kt b/workflow-core/src/main/java/com/squareup/workflow/Worker.kt new file mode 100644 index 000000000..7219299fe --- /dev/null +++ b/workflow-core/src/main/java/com/squareup/workflow/Worker.kt @@ -0,0 +1,97 @@ +package com.squareup.workflow + +import com.squareup.workflow.WorkflowPool.Id +import com.squareup.workflow.WorkflowPool.Type +import kotlinx.coroutines.experimental.Deferred + +/** + * Defines a discrete task that a [Workflow] can execute asynchronously via [WorkflowPool], + * possibly across state changes. + * + * # Defining Workers + * + * There are a few ways to define a worker: + * - Call [worker] and pass a suspending block: + * ``` + * val echoWorker = worker { input -> + * withContext(IO) { + * echoService.echo(input) + * } + * } + * ``` + * + * - Convert a [Deferred] to a worker directly: + * ``` + * val deferredWorker = deferred.asWorker() + * ``` + * + * - Implement the `Worker` interface and implement [call]: + * ``` + * class EchoWorker( + * private val echoService: EchoService, + * private val ioContext: CoroutineContext + * ) : Worker { + * override suspend fun call(input: String): String = + * withContext(ioContext) { + * echoService.echo(input) + * } + * } + * ``` + * + * # Running Workers + * + * To use, pass an instance of your [Worker] to [WorkflowPool.awaitWorkerResult] along with its + * input value. You may also pass an optional `name` `String` if you need to distinguish between + * multiple invocations. E.g.: + * + * ``` + * class MainReactor(private val echo: EchoWorker) : Reactor<…> { + * override suspend fun onReact( + * …, + * workflows: WorkflowPool + * ): Reaction<…, …> { + * … + * workflows.awaitWorkerResult(echo, "hello world") + * .let { result: String -> + * // React to result. + * } + * } + * } + * ``` + */ +interface Worker { + /** + * Perform some asynchronous work. + */ + suspend fun call(input: I): O +} + +/** + * Creates a [Worker] that passes [block] its input value and uses its return value as the result. + */ +fun worker(block: suspend (I) -> O): Worker = object : Worker { + override suspend fun call(input: I): O = block(input) +} + +/** + * Creates a [Worker] that will report the [Deferred]'s eventual value as its result. + */ +fun Deferred.asWorker(): Worker = worker { await() } + +/** + * Uniquely identifies the [Worker] across the [WorkflowPool]. + * See [WorkflowPool.Type.makeId] for details. + */ +@Suppress("unused") +inline fun Worker.makeId( + name: String = "" +): Id = workflowType.makeId(name) + +/** + * Returns the [Type] of the [Worker] for the [WorkflowPool] + */ +@PublishedApi +@Suppress("unused") +internal inline val + Worker.workflowType: Type + get() = Type(I::class, Nothing::class, O::class) diff --git a/workflow-core/src/main/java/com/squareup/workflow/WorkflowPool.kt b/workflow-core/src/main/java/com/squareup/workflow/WorkflowPool.kt index 021a3c0d7..57cc965f4 100644 --- a/workflow-core/src/main/java/com/squareup/workflow/WorkflowPool.kt +++ b/workflow-core/src/main/java/com/squareup/workflow/WorkflowPool.kt @@ -7,6 +7,7 @@ import kotlinx.coroutines.experimental.Deferred import kotlinx.coroutines.experimental.Dispatchers.Unconfined import kotlinx.coroutines.experimental.GlobalScope import kotlinx.coroutines.experimental.async +import kotlinx.coroutines.experimental.channels.ReceiveChannel import org.jetbrains.annotations.TestOnly import kotlin.reflect.KClass @@ -22,7 +23,7 @@ class WorkflowPool { * - [Launcher.workflowType] * - `KClass.workflowType` */ - class Type( + class Type( stateType: KClass, eventType: KClass, outputType: KClass @@ -62,7 +63,7 @@ class WorkflowPool { * [completed][Workflow.invokeOnCompletion], including via calls to * [WorkflowPool.abandonDelegate]. */ - interface Launcher { + interface Launcher { fun launch( initialState: S, workflows: WorkflowPool @@ -77,7 +78,7 @@ class WorkflowPool { * - `KClass.makeId()` * - [Delegating.makeId] */ - data class Id + data class Id internal constructor( val name: String, val workflowType: Type @@ -85,7 +86,7 @@ class WorkflowPool { private class WorkflowEntry(val workflow: Workflow<*, *, *>) - private val factories = mutableMapOf, Launcher<*, *, *>>() + private val launchers = mutableMapOf, Launcher<*, *, *>>() private val workflows = mutableMapOf, WorkflowEntry>() @get:TestOnly val peekWorkflowsCount get() = workflows.values.size @@ -96,15 +97,11 @@ class WorkflowPool { * matching [Launcher] will be replaced, with the intention of allowing redundant calls * to be safe. */ - inline fun register( - factory: Launcher - ) = register(factory, factory.workflowType) - - @PublishedApi internal fun register( - factory: Launcher, + fun register( + launcher: Launcher, type: Type ) { - factories[type] = factory + launchers[type] = launcher } /** @@ -134,6 +131,41 @@ class WorkflowPool { return state?.let(::EnterState) ?: FinishWith(workflow.await()) } + /** + * Starts the required [Worker] if it wasn't already running. Suspends until the worker + * completes and then returns its result. + * + * If the worker was not already running, it is started with the given input. + * + * @throws kotlinx.coroutines.experimental.CancellationException If the worker is + * [abandoned][abandonDelegate]. + * @see workerResult + */ + suspend inline fun awaitWorkerResult( + worker: Worker, + input: I, + name: String = "" + ): O = awaitWorkerResult(worker, input, name, worker.workflowType) + + /** + * Hides the actual logic of starting processes from being inline in external code. + */ + @PublishedApi + internal suspend fun awaitWorkerResult( + worker: Worker, + input: I, + name: String, + type: Type + ): O { + register(worker.asReactor(), type) + val delegating = object : Delegating { + override val id: Id = type.makeId(name) + override val delegateState: I get() = input + } + return requireWorkflow(delegating) + .await() + } + /** * Returns a [WorkflowInput] that will route events to the identified [Workflow], * if it is running. That check is made each time an event is sent: if the workflow @@ -165,10 +197,10 @@ class WorkflowPool { workflows.values.forEach { it.workflow.cancel() } } - private fun factory( + private fun launcher( type: Type ): Launcher { - val launcher = factories[type] + val launcher = launchers[type] check(launcher != null) { "Expected launcher for \"$type\". Did you forget to call WorkflowPool.register()?" } @@ -188,7 +220,7 @@ class WorkflowPool { var workflow = workflows[delegating.id]?.workflow as Workflow? if (workflow == null) { - workflow = factory(delegating.id.workflowType).launch(delegating.delegateState, this) + workflow = launcher(delegating.id.workflowType).launch(delegating.delegateState, this) val entry = WorkflowEntry(workflow) workflows[delegating.id] = entry @@ -200,6 +232,18 @@ class WorkflowPool { } } +/** + * Registers the [Launcher] to be used to create workflows that match its [Launcher.workflowType], + * in response to calls to [nextDelegateReaction]. A previously registered + * matching [Launcher] will be replaced, with the intention of allowing redundant calls + * to be safe. + */ +// Note: This is defined as an extension function so that custom register functions can be defined +// that can implement custom behavior for specific launcher sub-types. +inline fun WorkflowPool.register( + launcher: Launcher +) = register(launcher, launcher.workflowType) + /** * This is a convenience method that wraps * [awaitNextDelegateReaction][WorkflowPool.awaitNextDelegateReaction] in a [Deferred] so it can @@ -213,6 +257,29 @@ fun WorkflowPool.nextDelegateReaction( awaitNextDelegateReaction(delegating) } +/** + * This is a convenience method that wraps [awaitWorkerResult][WorkflowPool.awaitWorkerResult] + * in a [Deferred] so it can be selected on. + * + * @see WorkflowPool.awaitWorkerResult + */ +inline fun WorkflowPool.workerResult( + worker: Worker, + input: I, + name: String = "" +): Deferred = workerResult(worker, input, name, worker.workflowType) + +/** + * Hides the implementation of [workerResult] above from being inlined in public code. + */ +@PublishedApi +internal fun WorkflowPool.workerResult( + worker: Worker, + input: I, + name: String = "", + type: Type +) = GlobalScope.async(Unconfined) { awaitWorkerResult(worker, input, name, type) } + /** * Returns the [Type] that represents this [Launcher]'s type parameters. */ @@ -241,3 +308,16 @@ inline fun inline val KClass>.workflowType: Type get() = Type(S::class, E::class, O::class) + +private fun Worker.asReactor() = object : Reactor { + override fun launch( + initialState: I, + workflows: WorkflowPool + ): Workflow<@UnsafeVariance I, Nothing, O> = doLaunch(initialState, workflows) + + override suspend fun onReact( + state: I, + events: ReceiveChannel, + workflows: WorkflowPool + ): Reaction<@UnsafeVariance I, O> = FinishWith(call(state)) +} diff --git a/workflow-core/src/test/java/com/squareup/workflow/WorkerTest.kt b/workflow-core/src/test/java/com/squareup/workflow/WorkerTest.kt new file mode 100644 index 000000000..c902546a4 --- /dev/null +++ b/workflow-core/src/test/java/com/squareup/workflow/WorkerTest.kt @@ -0,0 +1,55 @@ +package com.squareup.workflow + +import kotlinx.coroutines.experimental.CancellationException +import kotlinx.coroutines.experimental.CompletableDeferred +import org.junit.Test +import java.io.IOException +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class WorkerTest { + + private val pool = WorkflowPool() + private val deferred = CompletableDeferred() + private val worker = deferred.asWorker() + + @Test fun whenCallSucceeds() { + val reaction = pool.workerResult(worker, Unit) + assertFalse(reaction.isCompleted) + + deferred.complete(Unit) + + assertEquals(Unit, reaction.getCompleted()) + } + + @Test fun whenCallFails() { + val reaction = pool.workerResult(worker, Unit) + assertFalse(reaction.isCompleted) + + deferred.completeExceptionally(IOException("network fail")) + + val failure = reaction.getCompletionExceptionOrNull()!! + assertTrue(failure is ReactorException) + assertTrue(failure.cause is IOException) + } + + @Test fun whenInternalCoroutineCancelled() { + val reaction = pool.workerResult(worker, Unit) + assertFalse(reaction.isCompleted) + + deferred.cancel() + + assertFailsWith { reaction.getCompleted() } + } + + @Test fun whenWorkflowCancelled() { + val reaction = pool.workerResult(worker, Unit) + assertFalse(reaction.isCompleted) + + pool.abandonDelegate(worker.makeId()) + + assertFailsWith { reaction.getCompleted() } + } +} diff --git a/workflow-rx2/src/main/java/com/squareup/workflow/rx2/Workers.kt b/workflow-rx2/src/main/java/com/squareup/workflow/rx2/Workers.kt new file mode 100644 index 000000000..d7b57621b --- /dev/null +++ b/workflow-rx2/src/main/java/com/squareup/workflow/rx2/Workers.kt @@ -0,0 +1,18 @@ +package com.squareup.workflow.rx2 + +import com.squareup.workflow.Worker +import com.squareup.workflow.worker +import io.reactivex.Single +import kotlinx.coroutines.experimental.rx2.await + +/** + * Creates a [Worker] that will pass its input value to [block], then subscribe to the returned + * [Single] and report the value it emits as the worker result. + */ +fun singleWorker(block: (I) -> Single): Worker = + worker { block(it).await() } + +/** + * Creates a [Worker] that will report the [Single]'s eventual value as its result. + */ +fun Single.asWorker(): Worker = worker { await() } diff --git a/workflow-rx2/src/main/java/com/squareup/workflow/rx2/WorkflowPools.kt b/workflow-rx2/src/main/java/com/squareup/workflow/rx2/WorkflowPools.kt index 2bff463b1..733144abd 100644 --- a/workflow-rx2/src/main/java/com/squareup/workflow/rx2/WorkflowPools.kt +++ b/workflow-rx2/src/main/java/com/squareup/workflow/rx2/WorkflowPools.kt @@ -3,13 +3,16 @@ package com.squareup.workflow.rx2 import com.squareup.workflow.Delegating +import com.squareup.workflow.Worker import com.squareup.workflow.Reaction import com.squareup.workflow.WorkflowPool import io.reactivex.Single import kotlinx.coroutines.experimental.CancellationException +import kotlinx.coroutines.experimental.Deferred import kotlinx.coroutines.experimental.Dispatchers.Unconfined import kotlinx.coroutines.experimental.rx2.asSingle import com.squareup.workflow.nextDelegateReaction as nextDelegateReactionCore +import com.squareup.workflow.workerResult as workerResultCore /** * Starts the required nested workflow if it wasn't already running. Returns @@ -27,7 +30,23 @@ import com.squareup.workflow.nextDelegateReaction as nextDelegateReactionCore */ fun WorkflowPool.nextDelegateReaction( delegating: Delegating -): Single> = nextDelegateReactionCore(delegating).asSingle(Unconfined) +): Single> = nextDelegateReactionCore(delegating).asSingleNeverOnCancel() + +/** + * This is a convenience method that wraps + * [awaitWorkerResult][WorkflowPool.awaitWorkerResult] in a [Deferred] so it can + * be selected on. + * + * @see WorkflowPool.awaitWorkerResult + */ +inline fun WorkflowPool.workerResult( + worker: Worker, + input: I, + name: String = "" +): Single = workerResultCore(worker, input, name).asSingleNeverOnCancel() + +@PublishedApi +internal fun Deferred.asSingleNeverOnCancel() = asSingle(Unconfined) .onErrorResumeNext { if (it is CancellationException) { Single.never() diff --git a/workflow-rx2/src/test/java/com/squareup/workflow/rx2/ComposedReactorIntegrationTest.kt b/workflow-rx2/src/test/java/com/squareup/workflow/rx2/ComposedReactorIntegrationTest.kt index 00e0e9fba..6c5d6aae6 100644 --- a/workflow-rx2/src/test/java/com/squareup/workflow/rx2/ComposedReactorIntegrationTest.kt +++ b/workflow-rx2/src/test/java/com/squareup/workflow/rx2/ComposedReactorIntegrationTest.kt @@ -8,6 +8,7 @@ import com.squareup.workflow.Workflow import com.squareup.workflow.WorkflowPool import com.squareup.workflow.WorkflowPool.Id import com.squareup.workflow.makeId +import com.squareup.workflow.register import com.squareup.workflow.rx2.ComposedReactorIntegrationTest.OuterEvent.Background import com.squareup.workflow.rx2.ComposedReactorIntegrationTest.OuterEvent.Cancel import com.squareup.workflow.rx2.ComposedReactorIntegrationTest.OuterEvent.Pause diff --git a/workflow-rx2/src/test/java/com/squareup/workflow/rx2/WorkerIntegrationTest.kt b/workflow-rx2/src/test/java/com/squareup/workflow/rx2/WorkerIntegrationTest.kt new file mode 100644 index 000000000..7d9cd7ecc --- /dev/null +++ b/workflow-rx2/src/test/java/com/squareup/workflow/rx2/WorkerIntegrationTest.kt @@ -0,0 +1,52 @@ +package com.squareup.workflow.rx2 + +import com.squareup.workflow.ReactorException +import com.squareup.workflow.WorkflowPool +import com.squareup.workflow.makeId +import io.reactivex.subjects.SingleSubject +import org.assertj.core.api.Java6Assertions.assertThat +import org.junit.Test +import java.io.IOException + +class WorkerIntegrationTest { + + private val pool = WorkflowPool() + private val single = SingleSubject.create() + private val worker = single.asWorker() + + @Test fun whenCallSucceeds() { + val reaction = pool.workerResult(worker, Unit) + .test() + reaction.assertNotTerminated() + + single.onSuccess(Unit) + + reaction.assertValue(Unit) + } + + @Test fun whenCallFails() { + val reaction = pool.workerResult(worker, Unit) + .test() + reaction.assertNotTerminated() + + single.onError(IOException("network fail")) + + val failure = reaction.errors() + .single() + assertThat(failure).isInstanceOf(ReactorException::class.java) + assertThat(failure.cause is IOException).isTrue() + } + + @Test fun whenWorkflowCancelled() { + val reaction = pool.workerResult(worker, Unit) + .test() + reaction.assertNotTerminated() + + pool.abandonDelegate(worker.makeId()) + + // The rx2 version of nextProcessResult will never complete the single if the workflow is + // cancelled. + reaction.assertNoValues() + reaction.assertNoErrors() + } +}