Skip to content

Commit

Permalink
Create a Worker.transform operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
zach-klippenstein committed Aug 20, 2019
1 parent a584ebf commit dbe799c
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ import com.squareup.workflow.StatefulWorkflow
import com.squareup.workflow.Worker
import com.squareup.workflow.WorkflowAction.Companion.enterState
import com.squareup.workflow.onWorkerOutput
import kotlinx.coroutines.flow.collect
import java.util.UUID
import com.squareup.workflow.transform
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.transform
import kotlin.random.Random
import kotlin.reflect.KClass

Expand Down Expand Up @@ -95,12 +96,12 @@ private fun <T : Enum<T>> Random.nextEnum(enumClass: KClass<T>): T {
/**
* Scales the tick frequency by a random amount to make direction changes look more arbitrary.
*/
@UseExperimental(ExperimentalCoroutinesApi::class)
private fun Worker<Long>.createDirectionTicker(random: Random): Worker<Unit> =
Worker.create(key = UUID.randomUUID().toString()) {
run()
.collect { tick ->
if (tick % random.nextInt(2, 5) == 0L) {
emit(Unit)
}
}
transform { flow ->
flow.transform { tick ->
if (tick % random.nextInt(2, 5) == 0L) {
emit(Unit)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ import kotlinx.coroutines.delay
import kotlin.math.roundToLong
import kotlin.random.Random

private val PAUSED_TICKER = Worker.createSideEffect(key = "noop") {}

class GameWorkflow(
private val playerWorkflow: PlayerWorkflow,
private val aiWorkflows: List<ActorWorkflow>,
Expand Down Expand Up @@ -104,7 +102,7 @@ class GameWorkflow(
): GameRendering {
val running = !state.game.isPlayerEaten
// Stop actors from ticking if the game is paused or finished.
val ticker = if (running) createTickerWorker(input.ticksPerSecond) else PAUSED_TICKER
val ticker = if (running) createTickerWorker(input.ticksPerSecond) else Worker.finished()
val game = state.game
val board = input.board

Expand Down
69 changes: 69 additions & 0 deletions kotlin/workflow-core/src/main/java/com/squareup/workflow/Worker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
import kotlin.experimental.ExperimentalTypeInference
import kotlin.reflect.KClass

/**
* Represents a unit of asynchronous work that can have zero, one, or multiple outputs.
*
* [Worker]s are effectively [Flow]s that can be compared to determine equivalence.
*
* A [Workflow] uses [Worker]s to perform asynchronous work during the render pass by calling
* [RenderContext.onWorkerOutput] or [RenderContext.runningWorker]. When equivalent [Worker]s are
* passed in subsequent render passes, [doesSameWorkAs] is used to calculate which [Worker]s are
Expand Down Expand Up @@ -183,6 +186,11 @@ interface Worker<out T> {
block: suspend () -> Unit
): Worker<Nothing> = TypedWorker(Nothing::class, key, flow { block() })

/**
* Returns a [Worker] that finishes immediately without emitting anything.
*/
fun finished(): Worker<Nothing> = FinishedWorker

/**
* Creates a [Worker] from a function that returns a single value.
*
Expand Down Expand Up @@ -290,6 +298,49 @@ inline fun <reified T> ReceiveChannel<T>.asWorker(
}
}

/**
* Returns a [Worker] that transforms this [Worker]'s [Flow] by calling [transform].
*
* The returned worker is considered equivalent with any other worker returned by this function
* with the same receiver and the same [key].
*
* ## Examples
*
* ### Same source and key are equivalent
*
* ```
* val secondsWorker = millisWorker.transform {
* it.map { millis -> millis / 1000 }.distinctUntilChanged()
* }
*
* val otherSecondsWorker = millisWorker.transform {
* it.map { millis -> millis.toSeconds() }
* }
*
* assert(secondsWorker.doesSameWorkAs(otherSecondsWorker))
* ```
*
* ### Different sources are not equivalent
*
* ```
* val secondsWorker = millisWorker.transform {
* it.map { millis -> millis / 1000 }.distinctUntilChanged()
* }
*
* val otherSecondsWorker = secondsWorker.transform { it }
*
* assert(!secondsWorker.doesSameWorkAs(otherSecondsWorker))
* ```
*/
fun <T, R> Worker<T>.transform(
key: String = "",
transform: (Flow<T>) -> Flow<R>
): Worker<R> = WorkerWrapper(
wrapped = this,
flow = transform(run()),
key = key
)

/**
* A generic [Worker] implementation that defines equivalent workers as those having equivalent
* [key]s and equivalent [type]s. This is used by all the [Worker] builder functions.
Expand Down Expand Up @@ -327,3 +378,21 @@ private class TimerWorker(
override fun doesSameWorkAs(otherWorker: Worker<*>): Boolean =
otherWorker is TimerWorker && otherWorker.key == key
}

private object FinishedWorker : Worker<Nothing> {
override fun run(): Flow<Nothing> = emptyFlow()
override fun doesSameWorkAs(otherWorker: Worker<*>): Boolean = otherWorker === FinishedWorker
}

private class WorkerWrapper<T, R>(
private val wrapped: Worker<T>,
private val flow: Flow<R>,
private val key: String
) : Worker<R> {
override fun run(): Flow<R> = flow
override fun doesSameWorkAs(otherWorker: Worker<*>): Boolean {
return otherWorker is WorkerWrapper<*, *> &&
key == otherWorker.key &&
wrapped.doesSameWorkAs(otherWorker.wrapped)
}
}

0 comments on commit dbe799c

Please sign in to comment.