diff --git a/workflow-core/src/main/java/com/squareup/workflow/Worker.kt b/workflow-core/src/main/java/com/squareup/workflow/Worker.kt new file mode 100644 index 000000000..7219299fe --- /dev/null +++ b/workflow-core/src/main/java/com/squareup/workflow/Worker.kt @@ -0,0 +1,97 @@ +package com.squareup.workflow + +import com.squareup.workflow.WorkflowPool.Id +import com.squareup.workflow.WorkflowPool.Type +import kotlinx.coroutines.experimental.Deferred + +/** + * Defines a discrete task that a [Workflow] can execute asynchronously via [WorkflowPool], + * possibly across state changes. + * + * # Defining Workers + * + * There are a few ways to define a worker: + * - Call [worker] and pass a suspending block: + * ``` + * val echoWorker = worker { input -> + * withContext(IO) { + * echoService.echo(input) + * } + * } + * ``` + * + * - Convert a [Deferred] to a worker directly: + * ``` + * val deferredWorker = deferred.asWorker() + * ``` + * + * - Implement the `Worker` interface and implement [call]: + * ``` + * class EchoWorker( + * private val echoService: EchoService, + * private val ioContext: CoroutineContext + * ) : Worker { + * override suspend fun call(input: String): String = + * withContext(ioContext) { + * echoService.echo(input) + * } + * } + * ``` + * + * # Running Workers + * + * To use, pass an instance of your [Worker] to [WorkflowPool.awaitWorkerResult] along with its + * input value. You may also pass an optional `name` `String` if you need to distinguish between + * multiple invocations. E.g.: + * + * ``` + * class MainReactor(private val echo: EchoWorker) : Reactor<…> { + * override suspend fun onReact( + * …, + * workflows: WorkflowPool + * ): Reaction<…, …> { + * … + * workflows.awaitWorkerResult(echo, "hello world") + * .let { result: String -> + * // React to result. + * } + * } + * } + * ``` + */ +interface Worker { + /** + * Perform some asynchronous work. + */ + suspend fun call(input: I): O +} + +/** + * Creates a [Worker] that passes [block] its input value and uses its return value as the result. + */ +fun worker(block: suspend (I) -> O): Worker = object : Worker { + override suspend fun call(input: I): O = block(input) +} + +/** + * Creates a [Worker] that will report the [Deferred]'s eventual value as its result. + */ +fun Deferred.asWorker(): Worker = worker { await() } + +/** + * Uniquely identifies the [Worker] across the [WorkflowPool]. + * See [WorkflowPool.Type.makeId] for details. + */ +@Suppress("unused") +inline fun Worker.makeId( + name: String = "" +): Id = workflowType.makeId(name) + +/** + * Returns the [Type] of the [Worker] for the [WorkflowPool] + */ +@PublishedApi +@Suppress("unused") +internal inline val + Worker.workflowType: Type + get() = Type(I::class, Nothing::class, O::class) diff --git a/workflow-core/src/main/java/com/squareup/workflow/WorkflowPool.kt b/workflow-core/src/main/java/com/squareup/workflow/WorkflowPool.kt index 021a3c0d7..57cc965f4 100644 --- a/workflow-core/src/main/java/com/squareup/workflow/WorkflowPool.kt +++ b/workflow-core/src/main/java/com/squareup/workflow/WorkflowPool.kt @@ -7,6 +7,7 @@ import kotlinx.coroutines.experimental.Deferred import kotlinx.coroutines.experimental.Dispatchers.Unconfined import kotlinx.coroutines.experimental.GlobalScope import kotlinx.coroutines.experimental.async +import kotlinx.coroutines.experimental.channels.ReceiveChannel import org.jetbrains.annotations.TestOnly import kotlin.reflect.KClass @@ -22,7 +23,7 @@ class WorkflowPool { * - [Launcher.workflowType] * - `KClass.workflowType` */ - class Type( + class Type( stateType: KClass, eventType: KClass, outputType: KClass @@ -62,7 +63,7 @@ class WorkflowPool { * [completed][Workflow.invokeOnCompletion], including via calls to * [WorkflowPool.abandonDelegate]. */ - interface Launcher { + interface Launcher { fun launch( initialState: S, workflows: WorkflowPool @@ -77,7 +78,7 @@ class WorkflowPool { * - `KClass.makeId()` * - [Delegating.makeId] */ - data class Id + data class Id internal constructor( val name: String, val workflowType: Type @@ -85,7 +86,7 @@ class WorkflowPool { private class WorkflowEntry(val workflow: Workflow<*, *, *>) - private val factories = mutableMapOf, Launcher<*, *, *>>() + private val launchers = mutableMapOf, Launcher<*, *, *>>() private val workflows = mutableMapOf, WorkflowEntry>() @get:TestOnly val peekWorkflowsCount get() = workflows.values.size @@ -96,15 +97,11 @@ class WorkflowPool { * matching [Launcher] will be replaced, with the intention of allowing redundant calls * to be safe. */ - inline fun register( - factory: Launcher - ) = register(factory, factory.workflowType) - - @PublishedApi internal fun register( - factory: Launcher, + fun register( + launcher: Launcher, type: Type ) { - factories[type] = factory + launchers[type] = launcher } /** @@ -134,6 +131,41 @@ class WorkflowPool { return state?.let(::EnterState) ?: FinishWith(workflow.await()) } + /** + * Starts the required [Worker] if it wasn't already running. Suspends until the worker + * completes and then returns its result. + * + * If the worker was not already running, it is started with the given input. + * + * @throws kotlinx.coroutines.experimental.CancellationException If the worker is + * [abandoned][abandonDelegate]. + * @see workerResult + */ + suspend inline fun awaitWorkerResult( + worker: Worker, + input: I, + name: String = "" + ): O = awaitWorkerResult(worker, input, name, worker.workflowType) + + /** + * Hides the actual logic of starting processes from being inline in external code. + */ + @PublishedApi + internal suspend fun awaitWorkerResult( + worker: Worker, + input: I, + name: String, + type: Type + ): O { + register(worker.asReactor(), type) + val delegating = object : Delegating { + override val id: Id = type.makeId(name) + override val delegateState: I get() = input + } + return requireWorkflow(delegating) + .await() + } + /** * Returns a [WorkflowInput] that will route events to the identified [Workflow], * if it is running. That check is made each time an event is sent: if the workflow @@ -165,10 +197,10 @@ class WorkflowPool { workflows.values.forEach { it.workflow.cancel() } } - private fun factory( + private fun launcher( type: Type ): Launcher { - val launcher = factories[type] + val launcher = launchers[type] check(launcher != null) { "Expected launcher for \"$type\". Did you forget to call WorkflowPool.register()?" } @@ -188,7 +220,7 @@ class WorkflowPool { var workflow = workflows[delegating.id]?.workflow as Workflow? if (workflow == null) { - workflow = factory(delegating.id.workflowType).launch(delegating.delegateState, this) + workflow = launcher(delegating.id.workflowType).launch(delegating.delegateState, this) val entry = WorkflowEntry(workflow) workflows[delegating.id] = entry @@ -200,6 +232,18 @@ class WorkflowPool { } } +/** + * Registers the [Launcher] to be used to create workflows that match its [Launcher.workflowType], + * in response to calls to [nextDelegateReaction]. A previously registered + * matching [Launcher] will be replaced, with the intention of allowing redundant calls + * to be safe. + */ +// Note: This is defined as an extension function so that custom register functions can be defined +// that can implement custom behavior for specific launcher sub-types. +inline fun WorkflowPool.register( + launcher: Launcher +) = register(launcher, launcher.workflowType) + /** * This is a convenience method that wraps * [awaitNextDelegateReaction][WorkflowPool.awaitNextDelegateReaction] in a [Deferred] so it can @@ -213,6 +257,29 @@ fun WorkflowPool.nextDelegateReaction( awaitNextDelegateReaction(delegating) } +/** + * This is a convenience method that wraps [awaitWorkerResult][WorkflowPool.awaitWorkerResult] + * in a [Deferred] so it can be selected on. + * + * @see WorkflowPool.awaitWorkerResult + */ +inline fun WorkflowPool.workerResult( + worker: Worker, + input: I, + name: String = "" +): Deferred = workerResult(worker, input, name, worker.workflowType) + +/** + * Hides the implementation of [workerResult] above from being inlined in public code. + */ +@PublishedApi +internal fun WorkflowPool.workerResult( + worker: Worker, + input: I, + name: String = "", + type: Type +) = GlobalScope.async(Unconfined) { awaitWorkerResult(worker, input, name, type) } + /** * Returns the [Type] that represents this [Launcher]'s type parameters. */ @@ -241,3 +308,16 @@ inline fun inline val KClass>.workflowType: Type get() = Type(S::class, E::class, O::class) + +private fun Worker.asReactor() = object : Reactor { + override fun launch( + initialState: I, + workflows: WorkflowPool + ): Workflow<@UnsafeVariance I, Nothing, O> = doLaunch(initialState, workflows) + + override suspend fun onReact( + state: I, + events: ReceiveChannel, + workflows: WorkflowPool + ): Reaction<@UnsafeVariance I, O> = FinishWith(call(state)) +} diff --git a/workflow-core/src/test/java/com/squareup/workflow/WorkerTest.kt b/workflow-core/src/test/java/com/squareup/workflow/WorkerTest.kt new file mode 100644 index 000000000..c902546a4 --- /dev/null +++ b/workflow-core/src/test/java/com/squareup/workflow/WorkerTest.kt @@ -0,0 +1,55 @@ +package com.squareup.workflow + +import kotlinx.coroutines.experimental.CancellationException +import kotlinx.coroutines.experimental.CompletableDeferred +import org.junit.Test +import java.io.IOException +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class WorkerTest { + + private val pool = WorkflowPool() + private val deferred = CompletableDeferred() + private val worker = deferred.asWorker() + + @Test fun whenCallSucceeds() { + val reaction = pool.workerResult(worker, Unit) + assertFalse(reaction.isCompleted) + + deferred.complete(Unit) + + assertEquals(Unit, reaction.getCompleted()) + } + + @Test fun whenCallFails() { + val reaction = pool.workerResult(worker, Unit) + assertFalse(reaction.isCompleted) + + deferred.completeExceptionally(IOException("network fail")) + + val failure = reaction.getCompletionExceptionOrNull()!! + assertTrue(failure is ReactorException) + assertTrue(failure.cause is IOException) + } + + @Test fun whenInternalCoroutineCancelled() { + val reaction = pool.workerResult(worker, Unit) + assertFalse(reaction.isCompleted) + + deferred.cancel() + + assertFailsWith { reaction.getCompleted() } + } + + @Test fun whenWorkflowCancelled() { + val reaction = pool.workerResult(worker, Unit) + assertFalse(reaction.isCompleted) + + pool.abandonDelegate(worker.makeId()) + + assertFailsWith { reaction.getCompleted() } + } +} diff --git a/workflow-rx2/src/main/java/com/squareup/workflow/rx2/Workers.kt b/workflow-rx2/src/main/java/com/squareup/workflow/rx2/Workers.kt new file mode 100644 index 000000000..d7b57621b --- /dev/null +++ b/workflow-rx2/src/main/java/com/squareup/workflow/rx2/Workers.kt @@ -0,0 +1,18 @@ +package com.squareup.workflow.rx2 + +import com.squareup.workflow.Worker +import com.squareup.workflow.worker +import io.reactivex.Single +import kotlinx.coroutines.experimental.rx2.await + +/** + * Creates a [Worker] that will pass its input value to [block], then subscribe to the returned + * [Single] and report the value it emits as the worker result. + */ +fun singleWorker(block: (I) -> Single): Worker = + worker { block(it).await() } + +/** + * Creates a [Worker] that will report the [Single]'s eventual value as its result. + */ +fun Single.asWorker(): Worker = worker { await() } diff --git a/workflow-rx2/src/main/java/com/squareup/workflow/rx2/WorkflowPools.kt b/workflow-rx2/src/main/java/com/squareup/workflow/rx2/WorkflowPools.kt index 2bff463b1..733144abd 100644 --- a/workflow-rx2/src/main/java/com/squareup/workflow/rx2/WorkflowPools.kt +++ b/workflow-rx2/src/main/java/com/squareup/workflow/rx2/WorkflowPools.kt @@ -3,13 +3,16 @@ package com.squareup.workflow.rx2 import com.squareup.workflow.Delegating +import com.squareup.workflow.Worker import com.squareup.workflow.Reaction import com.squareup.workflow.WorkflowPool import io.reactivex.Single import kotlinx.coroutines.experimental.CancellationException +import kotlinx.coroutines.experimental.Deferred import kotlinx.coroutines.experimental.Dispatchers.Unconfined import kotlinx.coroutines.experimental.rx2.asSingle import com.squareup.workflow.nextDelegateReaction as nextDelegateReactionCore +import com.squareup.workflow.workerResult as workerResultCore /** * Starts the required nested workflow if it wasn't already running. Returns @@ -27,7 +30,23 @@ import com.squareup.workflow.nextDelegateReaction as nextDelegateReactionCore */ fun WorkflowPool.nextDelegateReaction( delegating: Delegating -): Single> = nextDelegateReactionCore(delegating).asSingle(Unconfined) +): Single> = nextDelegateReactionCore(delegating).asSingleNeverOnCancel() + +/** + * This is a convenience method that wraps + * [awaitWorkerResult][WorkflowPool.awaitWorkerResult] in a [Deferred] so it can + * be selected on. + * + * @see WorkflowPool.awaitWorkerResult + */ +inline fun WorkflowPool.workerResult( + worker: Worker, + input: I, + name: String = "" +): Single = workerResultCore(worker, input, name).asSingleNeverOnCancel() + +@PublishedApi +internal fun Deferred.asSingleNeverOnCancel() = asSingle(Unconfined) .onErrorResumeNext { if (it is CancellationException) { Single.never() diff --git a/workflow-rx2/src/test/java/com/squareup/workflow/rx2/ComposedReactorIntegrationTest.kt b/workflow-rx2/src/test/java/com/squareup/workflow/rx2/ComposedReactorIntegrationTest.kt index 00e0e9fba..6c5d6aae6 100644 --- a/workflow-rx2/src/test/java/com/squareup/workflow/rx2/ComposedReactorIntegrationTest.kt +++ b/workflow-rx2/src/test/java/com/squareup/workflow/rx2/ComposedReactorIntegrationTest.kt @@ -8,6 +8,7 @@ import com.squareup.workflow.Workflow import com.squareup.workflow.WorkflowPool import com.squareup.workflow.WorkflowPool.Id import com.squareup.workflow.makeId +import com.squareup.workflow.register import com.squareup.workflow.rx2.ComposedReactorIntegrationTest.OuterEvent.Background import com.squareup.workflow.rx2.ComposedReactorIntegrationTest.OuterEvent.Cancel import com.squareup.workflow.rx2.ComposedReactorIntegrationTest.OuterEvent.Pause diff --git a/workflow-rx2/src/test/java/com/squareup/workflow/rx2/WorkerIntegrationTest.kt b/workflow-rx2/src/test/java/com/squareup/workflow/rx2/WorkerIntegrationTest.kt new file mode 100644 index 000000000..7d9cd7ecc --- /dev/null +++ b/workflow-rx2/src/test/java/com/squareup/workflow/rx2/WorkerIntegrationTest.kt @@ -0,0 +1,52 @@ +package com.squareup.workflow.rx2 + +import com.squareup.workflow.ReactorException +import com.squareup.workflow.WorkflowPool +import com.squareup.workflow.makeId +import io.reactivex.subjects.SingleSubject +import org.assertj.core.api.Java6Assertions.assertThat +import org.junit.Test +import java.io.IOException + +class WorkerIntegrationTest { + + private val pool = WorkflowPool() + private val single = SingleSubject.create() + private val worker = single.asWorker() + + @Test fun whenCallSucceeds() { + val reaction = pool.workerResult(worker, Unit) + .test() + reaction.assertNotTerminated() + + single.onSuccess(Unit) + + reaction.assertValue(Unit) + } + + @Test fun whenCallFails() { + val reaction = pool.workerResult(worker, Unit) + .test() + reaction.assertNotTerminated() + + single.onError(IOException("network fail")) + + val failure = reaction.errors() + .single() + assertThat(failure).isInstanceOf(ReactorException::class.java) + assertThat(failure.cause is IOException).isTrue() + } + + @Test fun whenWorkflowCancelled() { + val reaction = pool.workerResult(worker, Unit) + .test() + reaction.assertNotTerminated() + + pool.abandonDelegate(worker.makeId()) + + // The rx2 version of nextProcessResult will never complete the single if the workflow is + // cancelled. + reaction.assertNoValues() + reaction.assertNoErrors() + } +}