Skip to content

Commit

Permalink
Workers: Simpler stateless helper workflows.
Browse files Browse the repository at this point in the history
This should make it relatively easy to write network calls that need to inject
services. The main problem is that the API is not very discoverable. It uses
some syntactic magic to have the launcher act like a callable function. The workflow
interface is a little too general for this – we need to have a state type and output
some sort of state (can't use `Nothing`) but this type of workflow doesn't need to
have any state emitted.
  • Loading branch information
zach-klippenstein committed Dec 1, 2018
1 parent dae10e6 commit dd81f57
Show file tree
Hide file tree
Showing 7 changed files with 337 additions and 15 deletions.
97 changes: 97 additions & 0 deletions workflow-core/src/main/java/com/squareup/workflow/Worker.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.squareup.workflow

import com.squareup.workflow.WorkflowPool.Id
import com.squareup.workflow.WorkflowPool.Type
import kotlinx.coroutines.experimental.Deferred

/**
* Defines a discrete task that a [Workflow] can execute asynchronously via [WorkflowPool],
* possibly across state changes.
*
* # Defining Workers
*
* There are a few ways to define a worker:
* - Call [worker] and pass a suspending block:
* ```
* val echoWorker = worker { input ->
* withContext(IO) {
* echoService.echo(input)
* }
* }
* ```
*
* - Convert a [Deferred] to a worker directly:
* ```
* val deferredWorker = deferred.asWorker()
* ```
*
* - Implement the `Worker` interface and implement [call]:
* ```
* class EchoWorker(
* private val echoService: EchoService,
* private val ioContext: CoroutineContext
* ) : Worker<String, String> {
* override suspend fun call(input: String): String =
* withContext(ioContext) {
* echoService.echo(input)
* }
* }
* ```
*
* # Running Workers
*
* To use, pass an instance of your [Worker] to [WorkflowPool.awaitWorkerResult] along with its
* input value. You may also pass an optional `name` `String` if you need to distinguish between
* multiple invocations. E.g.:
*
* ```
* class MainReactor(private val echo: EchoWorker) : Reactor<…> {
* override suspend fun onReact(
* …,
* workflows: WorkflowPool
* ): Reaction<…, …> {
* …
* workflows.awaitWorkerResult(echo, "hello world")
* .let { result: String ->
* // React to result.
* }
* }
* }
* ```
*/
interface Worker<in I : Any, out O : Any> {
/**
* Perform some asynchronous work.
*/
suspend fun call(input: I): O
}

/**
* Creates a [Worker] that passes [block] its input value and uses its return value as the result.
*/
fun <I : Any, O : Any> worker(block: suspend (I) -> O): Worker<I, O> = object : Worker<I, O> {
override suspend fun call(input: I): O = block(input)
}

/**
* Creates a [Worker] that will report the [Deferred]'s eventual value as its result.
*/
fun <T : Any> Deferred<T>.asWorker(): Worker<Unit, T> = worker { await() }

/**
* Uniquely identifies the [Worker] across the [WorkflowPool].
* See [WorkflowPool.Type.makeId] for details.
*/
@Suppress("unused")
inline fun <reified I : Any, reified O : Any> Worker<I, O>.makeId(
name: String = ""
): Id<I, Nothing, O> = workflowType.makeId(name)

/**
* Returns the [Type] of the [Worker] for the [WorkflowPool]
*/
@PublishedApi
@Suppress("unused")
internal inline val <reified I : Any, reified O : Any>
Worker<I, O>.workflowType: Type<I, Nothing, O>
get() = Type(I::class, Nothing::class, O::class)
108 changes: 94 additions & 14 deletions workflow-core/src/main/java/com/squareup/workflow/WorkflowPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.Dispatchers.Unconfined
import kotlinx.coroutines.experimental.GlobalScope
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import org.jetbrains.annotations.TestOnly
import kotlin.reflect.KClass

Expand All @@ -22,7 +23,7 @@ class WorkflowPool {
* - [Launcher.workflowType]
* - `KClass<Launcher>.workflowType`
*/
class Type<S : Any, E : Any, out O : Any>(
class Type<S : Any, in E : Any, out O : Any>(
stateType: KClass<S>,
eventType: KClass<E>,
outputType: KClass<O>
Expand Down Expand Up @@ -62,7 +63,7 @@ class WorkflowPool {
* [completed][Workflow.invokeOnCompletion], including via calls to
* [WorkflowPool.abandonDelegate].
*/
interface Launcher<S : Any, E : Any, out O : Any> {
interface Launcher<S : Any, in E : Any, out O : Any> {
fun launch(
initialState: S,
workflows: WorkflowPool
Expand All @@ -77,15 +78,15 @@ class WorkflowPool {
* - `KClass<Launcher>.makeId()`
* - [Delegating.makeId]
*/
data class Id<S : Any, E : Any, out O : Any>
data class Id<S : Any, in E : Any, out O : Any>
internal constructor(
val name: String,
val workflowType: Type<S, E, O>
)

private class WorkflowEntry(val workflow: Workflow<*, *, *>)

private val factories = mutableMapOf<Type<*, *, *>, Launcher<*, *, *>>()
private val launchers = mutableMapOf<Type<*, *, *>, Launcher<*, *, *>>()
private val workflows = mutableMapOf<Id<*, *, *>, WorkflowEntry>()

@get:TestOnly val peekWorkflowsCount get() = workflows.values.size
Expand All @@ -96,15 +97,11 @@ class WorkflowPool {
* matching [Launcher] will be replaced, with the intention of allowing redundant calls
* to be safe.
*/
inline fun <reified S : Any, reified E : Any, reified O : Any> register(
factory: Launcher<S, E, O>
) = register(factory, factory.workflowType)

@PublishedApi internal fun <S : Any, E : Any, O : Any> register(
factory: Launcher<S, E, O>,
fun <S : Any, E : Any, O : Any> register(
launcher: Launcher<S, E, O>,
type: Type<S, E, O>
) {
factories[type] = factory
launchers[type] = launcher
}

/**
Expand Down Expand Up @@ -134,6 +131,41 @@ class WorkflowPool {
return state?.let(::EnterState) ?: FinishWith(workflow.await())
}

/**
* Starts the required [Worker] if it wasn't already running. Suspends until the worker
* completes and then returns its result.
*
* If the worker was not already running, it is started with the given input.
*
* @throws kotlinx.coroutines.experimental.CancellationException If the worker is
* [abandoned][abandonDelegate].
* @see workerResult
*/
suspend inline fun <reified I : Any, reified O : Any> awaitWorkerResult(
worker: Worker<I, O>,
input: I,
name: String = ""
): O = awaitWorkerResult(worker, input, name, worker.workflowType)

/**
* Hides the actual logic of starting processes from being inline in external code.
*/
@PublishedApi
internal suspend fun <I : Any, O : Any> awaitWorkerResult(
worker: Worker<I, O>,
input: I,
name: String,
type: Type<I, Nothing, O>
): O {
register(worker.asReactor(), type)
val delegating = object : Delegating<I, Nothing, O> {
override val id: Id<I, Nothing, O> = type.makeId(name)
override val delegateState: I get() = input
}
return requireWorkflow(delegating)
.await()
}

/**
* Returns a [WorkflowInput] that will route events to the identified [Workflow],
* if it is running. That check is made each time an event is sent: if the workflow
Expand Down Expand Up @@ -165,10 +197,10 @@ class WorkflowPool {
workflows.values.forEach { it.workflow.cancel() }
}

private fun <S : Any, E : Any, O : Any> factory(
private fun <S : Any, E : Any, O : Any> launcher(
type: Type<S, E, O>
): Launcher<S, E, O> {
val launcher = factories[type]
val launcher = launchers[type]
check(launcher != null) {
"Expected launcher for \"$type\". Did you forget to call WorkflowPool.register()?"
}
Expand All @@ -188,7 +220,7 @@ class WorkflowPool {
var workflow = workflows[delegating.id]?.workflow as Workflow<S, E, O>?

if (workflow == null) {
workflow = factory(delegating.id.workflowType).launch(delegating.delegateState, this)
workflow = launcher(delegating.id.workflowType).launch(delegating.delegateState, this)

val entry = WorkflowEntry(workflow)
workflows[delegating.id] = entry
Expand All @@ -200,6 +232,18 @@ class WorkflowPool {
}
}

/**
* Registers the [Launcher] to be used to create workflows that match its [Launcher.workflowType],
* in response to calls to [nextDelegateReaction]. A previously registered
* matching [Launcher] will be replaced, with the intention of allowing redundant calls
* to be safe.
*/
// Note: This is defined as an extension function so that custom register functions can be defined
// that can implement custom behavior for specific launcher sub-types.
inline fun <reified S : Any, reified E : Any, reified O : Any> WorkflowPool.register(
launcher: Launcher<S, E, O>
) = register(launcher, launcher.workflowType)

/**
* This is a convenience method that wraps
* [awaitNextDelegateReaction][WorkflowPool.awaitNextDelegateReaction] in a [Deferred] so it can
Expand All @@ -213,6 +257,29 @@ fun <S : Any, O : Any> WorkflowPool.nextDelegateReaction(
awaitNextDelegateReaction(delegating)
}

/**
* This is a convenience method that wraps [awaitWorkerResult][WorkflowPool.awaitWorkerResult]
* in a [Deferred] so it can be selected on.
*
* @see WorkflowPool.awaitWorkerResult
*/
inline fun <reified I : Any, reified O : Any> WorkflowPool.workerResult(
worker: Worker<I, O>,
input: I,
name: String = ""
): Deferred<O> = workerResult(worker, input, name, worker.workflowType)

/**
* Hides the implementation of [workerResult] above from being inlined in public code.
*/
@PublishedApi
internal fun <I : Any, O : Any> WorkflowPool.workerResult(
worker: Worker<I, O>,
input: I,
name: String = "",
type: Type<I, Nothing, O>
) = GlobalScope.async(Unconfined) { awaitWorkerResult(worker, input, name, type) }

/**
* Returns the [Type] that represents this [Launcher]'s type parameters.
*/
Expand Down Expand Up @@ -241,3 +308,16 @@ inline fun <reified S : Any, reified E : Any, reified O : Any>
inline val <reified S : Any, reified E : Any, reified O : Any>
KClass<out Launcher<S, E, O>>.workflowType: Type<S, E, O>
get() = Type(S::class, E::class, O::class)

private fun <I : Any, O : Any> Worker<I, O>.asReactor() = object : Reactor<I, Nothing, O> {
override fun launch(
initialState: I,
workflows: WorkflowPool
): Workflow<@UnsafeVariance I, Nothing, O> = doLaunch(initialState, workflows)

override suspend fun onReact(
state: I,
events: ReceiveChannel<Nothing>,
workflows: WorkflowPool
): Reaction<@UnsafeVariance I, O> = FinishWith(call(state))
}
55 changes: 55 additions & 0 deletions workflow-core/src/test/java/com/squareup/workflow/WorkerTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.squareup.workflow

import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.CompletableDeferred
import org.junit.Test
import java.io.IOException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertTrue

class WorkerTest {

private val pool = WorkflowPool()
private val deferred = CompletableDeferred<Unit>()
private val worker = deferred.asWorker()

@Test fun whenCallSucceeds() {
val reaction = pool.workerResult(worker, Unit)
assertFalse(reaction.isCompleted)

deferred.complete(Unit)

assertEquals(Unit, reaction.getCompleted())
}

@Test fun whenCallFails() {
val reaction = pool.workerResult(worker, Unit)
assertFalse(reaction.isCompleted)

deferred.completeExceptionally(IOException("network fail"))

val failure = reaction.getCompletionExceptionOrNull()!!
assertTrue(failure is ReactorException)
assertTrue(failure.cause is IOException)
}

@Test fun whenInternalCoroutineCancelled() {
val reaction = pool.workerResult(worker, Unit)
assertFalse(reaction.isCompleted)

deferred.cancel()

assertFailsWith<CancellationException> { reaction.getCompleted() }
}

@Test fun whenWorkflowCancelled() {
val reaction = pool.workerResult(worker, Unit)
assertFalse(reaction.isCompleted)

pool.abandonDelegate(worker.makeId())

assertFailsWith<CancellationException> { reaction.getCompleted() }
}
}
18 changes: 18 additions & 0 deletions workflow-rx2/src/main/java/com/squareup/workflow/rx2/Workers.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.squareup.workflow.rx2

import com.squareup.workflow.Worker
import com.squareup.workflow.worker
import io.reactivex.Single
import kotlinx.coroutines.experimental.rx2.await

/**
* Creates a [Worker] that will pass its input value to [block], then subscribe to the returned
* [Single] and report the value it emits as the worker result.
*/
fun <I : Any, O : Any> singleWorker(block: (I) -> Single<O>): Worker<I, O> =
worker { block(it).await() }

/**
* Creates a [Worker] that will report the [Single]'s eventual value as its result.
*/
fun <T : Any> Single<T>.asWorker(): Worker<Unit, T> = worker { await() }

0 comments on commit dd81f57

Please sign in to comment.