Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate asWorker() extension on deferreds and channels. #19

Merged
merged 1 commit into from
Jun 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class TerminalWorkflowRunner(
@Suppress("BlockingMethodInNonBlockingContext")
fun run(workflow: TerminalWorkflow): ExitCode = runBlocking {
val keyStrokesChannel = screen.listenForKeyStrokesOn(this + ioDispatcher)
@Suppress("DEPRECATION")
val keyStrokesWorker = keyStrokesChannel.asWorker()
val resizes = screen.terminal.listenForResizesOn(this)

Expand Down
13 changes: 12 additions & 1 deletion workflow-core/src/main/java/com/squareup/workflow/Worker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ interface Worker<out OutputT> {
*/
@OptIn(FlowPreview::class)
inline fun <reified OutputT> from(noinline block: suspend () -> OutputT): Worker<OutputT> =
block.asFlow().asWorker()
block.asFlow()
.asWorker()

/**
* Creates a [Worker] from a function that returns a single value.
Expand Down Expand Up @@ -285,6 +286,13 @@ inline fun <reified OutputT> Flow<OutputT>.asWorker(): Worker<OutputT> =
* Worker.from { doThing().await() }
* ```
*/
@Deprecated(
"Use Worker.from { await() }",
ReplaceWith(
"Worker.from { this.await() }",
"com.squareup.workflow.Worker"
)
)
inline fun <reified OutputT> Deferred<OutputT>.asWorker(): Worker<OutputT> =
from { await() }

Expand All @@ -295,6 +303,8 @@ inline fun <reified OutputT> Deferred<OutputT>.asWorker(): Worker<OutputT> =
FlowPreview::class,
ExperimentalCoroutinesApi::class
)
@Suppress("DeprecatedCallableAddReplaceWith")
@Deprecated("Use SharedFlow or StateFlow with Flow.asWorker()")
inline fun <reified OutputT> BroadcastChannel<OutputT>.asWorker(): Worker<OutputT> =
asFlow().asWorker()

Expand All @@ -314,6 +324,7 @@ inline fun <reified OutputT> BroadcastChannel<OutputT>.asWorker(): Worker<Output
* True by default.
*/
@OptIn(ExperimentalCoroutinesApi::class)
@Deprecated("Use consumeAsFlow() or receiveAsFlow() with Flow.asWorker().")
inline fun <reified OutputT> ReceiveChannel<OutputT>.asWorker(
closeOnCancel: Boolean = true
): Worker<OutputT> = create {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
Expand Down Expand Up @@ -140,7 +141,10 @@ class RenderWorkflowInTest {
@Test fun `onOutput called when output emitted`() {
val trigger = Channel<String>()
val workflow = Workflow.stateless<Unit, String, Unit> {
runningWorker(trigger.asWorker()) { action { setOutput(it) } }
runningWorker(
trigger.consumeAsFlow()
.asWorker()
) { action { setOutput(it) } }
}
val receivedOutputs = mutableListOf<String>()
renderWorkflowIn(workflow, scope, MutableStateFlow(Unit)) { receivedOutputs += it }
Expand Down Expand Up @@ -206,7 +210,7 @@ class RenderWorkflowInTest {
val workflow = Workflow.stateful<Unit, Boolean, Nothing, Unit>(
initialState = { false },
render = { _, throwNow ->
runningWorker(trigger.asWorker()) { action { nextState = true } }
runningWorker(Worker.from { trigger.await() }) { action { nextState = true } }
if (throwNow) {
throw ExpectedException()
}
Expand All @@ -226,7 +230,7 @@ class RenderWorkflowInTest {
val trigger = CompletableDeferred<Unit>()
// Throws an exception when trigger is completed.
val workflow = Workflow.stateless<Unit, Nothing, Unit> {
runningWorker(trigger.asWorker()) {
runningWorker(Worker.from { trigger.await() }) {
action {
throw ExpectedException()
}
Expand Down Expand Up @@ -324,7 +328,7 @@ class RenderWorkflowInTest {
val trigger = CompletableDeferred<Unit>()
// Emits a Unit when trigger is completed.
val workflow = Workflow.stateless<Unit, Unit, Unit> {
runningWorker(trigger.asWorker()) { action { setOutput(Unit) } }
runningWorker(Worker.from { trigger.await() }) { action { setOutput(Unit) } }
}
renderWorkflowIn(workflow, scope, MutableStateFlow(Unit)) {
throw ExpectedException()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
Expand All @@ -46,7 +47,8 @@ class WorkersTest {

@Test fun `launchWorker propagates backpressure`() {
val channel = Channel<String>()
val worker = channel.asWorker()
val worker = channel.consumeAsFlow()
.asWorker()
// Used to assert ordering.
val counter = AtomicInteger(0)

Expand Down Expand Up @@ -110,7 +112,10 @@ class WorkersTest {
val channel = Channel<String>(capacity = 1)

runBlocking {
val workerOutputs = launchWorker(channel.asWorker())
val workerOutputs = launchWorker(
channel.consumeAsFlow()
.asWorker()
)
assertTrue(workerOutputs.isEmpty)

channel.close()
Expand All @@ -122,7 +127,10 @@ class WorkersTest {
val channel = Channel<String>(capacity = 1)

runBlocking {
val workerOutputs = launchWorker(channel.asWorker())
val workerOutputs = launchWorker(
channel.consumeAsFlow()
.asWorker()
)
assertTrue(workerOutputs.isEmpty)

channel.send("foo")
Expand All @@ -140,7 +148,10 @@ class WorkersTest {
// Needed so that cancelling the channel doesn't cancel our job, which means receive will
// throw the JobCancellationException instead of the actual channel failure.
supervisorScope {
val workerOutputs = launchWorker(channel.asWorker())
val workerOutputs = launchWorker(
channel.consumeAsFlow()
.asWorker()
)
assertTrue(workerOutputs.isEmpty)

channel.close(ExpectedException())
Expand All @@ -153,7 +164,10 @@ class WorkersTest {
val channel = Channel<String>(capacity = 1)

runBlocking {
val workerOutputs = launchWorker(channel.asWorker())
val workerOutputs = launchWorker(
channel.consumeAsFlow()
.asWorker()
)
channel.close()
assertTrue(workerOutputs.receive().isDone)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.test.runBlockingTest
Expand Down Expand Up @@ -167,7 +168,8 @@ class WorkflowDiagnosticListenerIntegrationTest {
@Test fun `workflow updates emit events in order`() {
val props = MutableStateFlow("initial props")
val channel = Channel<String>()
val worker = channel.asWorker()
val worker = channel.consumeAsFlow()
.asWorker()
fun action(value: String) = action<Nothing, String> { setOutput("output:$value") }
val workflow = Workflow.stateless<String, String, Unit> {
runningWorker(worker, "key", ::action)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ class WorkflowDiagnosticListenerLegacyIntegrationTest {
@Test fun `workflow updates emit events in order`() {
val propsChannel = Channel<String>(1).apply { offer("initial props") }
val channel = Channel<String>()
val worker = channel.asWorker()
val worker = channel.consumeAsFlow()
.asWorker()
fun action(value: String) = action<Nothing, String> { setOutput("output:$value") }
val workflow = Workflow.stateless<String, String, Unit> {
runningWorker(worker, "key", ::action)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
Expand Down Expand Up @@ -156,7 +157,10 @@ class WorkerCompositionIntegrationTest {
@Test fun `runningWorker gets error`() {
val channel = Channel<String>()
val workflow = Workflow.stateless<Unit, String, Unit> {
runningWorker(channel.asWorker()) { action { setOutput(it) } }
runningWorker(
channel.consumeAsFlow()
.asWorker()
) { action { setOutput(it) } }
}

assertFailsWith<ExpectedException> {
Expand All @@ -173,7 +177,10 @@ class WorkerCompositionIntegrationTest {
@Test fun `onWorkerOutput does nothing when worker finished`() {
val channel = Channel<Unit>()
val workflow = Workflow.stateless<Unit, Unit, Unit> {
runningWorker(channel.asWorker()) { fail("Expected handler to not be invoked.") }
runningWorker(
channel.consumeAsFlow()
.asWorker()
) { fail("Expected handler to not be invoked.") }
}

workflow.testFromStart {
Expand Down Expand Up @@ -270,7 +277,7 @@ class WorkerCompositionIntegrationTest {
val workflow = Workflow.stateless<Unit, CoroutineContext, Unit> {
renderChild(leafWorkflow) { action { setOutput(it) } }
}
val job: Job = Job()
val job = Job()

workflow.testFromStart(context = job) {
val actualWorkerContext = awaitNextOutput()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ class WorkerStressTest {
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
@Test fun `multiple subscriptions to single channel when closed`() {
val channel = Channel<Unit>()
val workers = List(WORKER_COUNT / 2) { channel.asWorker() }
val workers = List(WORKER_COUNT / 2) {
channel.consumeAsFlow()
.asWorker()
}
val finishedWorkers = List(WORKER_COUNT / 2) {
channel.asWorker()
channel.consumeAsFlow()
.asWorker()
.transform { it.onCompletion { emit(Unit) } }
}
val action = action<Nothing, Unit> { setOutput(Unit) }
Expand Down Expand Up @@ -65,6 +69,8 @@ class WorkerStressTest {
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
@Test fun `multiple subscriptions to single channel when emits`() {
val channel = ConflatedBroadcastChannel(Unit)

@Suppress("DEPRECATION")
val workers = List(WORKER_COUNT) { channel.asWorker() }
val action = action<Nothing, Int> { setOutput(1) }
val workflow = Workflow.stateless<Unit, Int, Unit> {
Expand All @@ -88,4 +94,33 @@ class WorkerStressTest {
coroutineContext.cancelChildren()
}
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
@Test fun `multiple subscriptions to single StateFlow when emits`() {
val flow = MutableStateFlow(Unit)

@Suppress("DEPRECATION")
val workers = List(WORKER_COUNT) { flow.asWorker() }
val action = action<Nothing, Int> { setOutput(1) }
val workflow = Workflow.stateless<Unit, Int, Unit> {
// Run lots of workers that will all see the same conflated channel value.
workers.forEachIndexed { i, worker ->
runningWorker(worker, key = i.toString()) { action }
}
}

runBlocking {
val outputs = Channel<Int>()
renderWorkflowIn(workflow, this, MutableStateFlow(Unit)) {
outputs.send(it)
}
val sum = outputs.consumeAsFlow()
.take(workers.size)
.reduce { sum, value -> sum + value }
assertEquals(WORKER_COUNT, sum)

// Cancel the runtime so the test can finish.
coroutineContext.cancelChildren()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.squareup.workflow

import com.squareup.workflow.testing.test
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.conflate
Expand Down Expand Up @@ -153,51 +152,6 @@ class WorkerTest {
}
}

@Test fun `ReceiveChannel asWorker emits`() {
val channel = Channel<String>(capacity = 1)
val worker = channel.asWorker()

worker.test {
channel.send("hello")
assertEquals("hello", nextOutput())

channel.send("world")
assertEquals("world", nextOutput())
}
}

@Test fun `ReceiveChannel asWorker finishes`() {
val channel = Channel<String>()
val worker = channel.asWorker()

worker.test {
channel.close()
assertFinished()
}
}

@Test fun `ReceiveChannel asWorker does close channel when cancelled`() {
val channel = Channel<Unit>(capacity = 1)
val worker = channel.asWorker()

worker.test {
cancelWorker()
assertTrue(channel.isClosedForReceive)
}
}

@Test
fun `ReceiveChannel asWorker doesn't close channel when cancelled when closeOnCancel false`() {
val channel = Channel<Unit>(capacity = 1)
val worker = channel.asWorker(closeOnCancel = false)

worker.test {
cancelWorker()
channel.send(Unit)
assertEquals(Unit, channel.receive())
}
}

@Test fun `timer returns equivalent workers keyed`() {
val worker1 = Worker.timer(1, "key")
val worker2 = Worker.timer(1, "key")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package com.squareup.workflow.testing

import com.squareup.workflow.Snapshot
import com.squareup.workflow.Worker
import com.squareup.workflow.Workflow
import com.squareup.workflow.asWorker
import com.squareup.workflow.internal.util.rethrowingUncaughtExceptions
import com.squareup.workflow.stateful
import com.squareup.workflow.stateless
Expand Down Expand Up @@ -57,7 +57,7 @@ class WorkflowTesterTest {
}

rethrowingUncaughtExceptions {
assertFailsWith<ExpectedException>() {
assertFailsWith<ExpectedException> {
workflow.testFromStart {
// Nothing to do.
}
Expand Down Expand Up @@ -166,7 +166,7 @@ class WorkflowTesterTest {
val deferred = CompletableDeferred<Unit>()
deferred.completeExceptionally(ExpectedException())
val workflow = Workflow.stateless<Unit, Unit, Unit> {
runningWorker(deferred.asWorker()) { fail("Shouldn't get here.") }
runningWorker(Worker.from { deferred.await() }) { fail("Shouldn't get here.") }
}

rethrowingUncaughtExceptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
Expand Down Expand Up @@ -162,7 +163,10 @@ class TracingDiagnosticListenerTest {
if (props == 0) return "initial"
if (props in 1..6) context.renderChild(this, 0) { bubbleUp(it) }
if (props in 4..5) context.renderChild(this, props = 1, key = "second") { bubbleUp(it) }
if (props in 2..3) context.runningWorker(channel.asWorker(false)) { bubbleUp(it) }
if (props in 2..3) context.runningWorker(
channel.receiveAsFlow()
.asWorker()
) { bubbleUp(it) }

return if (props > 10) "final" else "rendering"
}
Expand Down
Loading