Skip to content

Commit

Permalink
Rename Stream to Action.
Browse files Browse the repository at this point in the history
  • Loading branch information
Laimiux committed Mar 22, 2022
1 parent 059425a commit ffd7db3
Show file tree
Hide file tree
Showing 25 changed files with 431 additions and 316 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- **Breaking**: Restructuring `Transition` type into `Transition<State, Event>` and `Transition.Result<State>`
- **Breaking**: Make `Formula` abstract class instead of an interface.
- **Breaking**: Introducing `Snapshot` type and changing `evaluate` signature.
- Renaming `Stream` to `Action` (non-breaking change for now)

## [0.7.0] - June 30, 2021
- **Breaking**: Remove `events(observable) { }` utility function.
Expand Down
56 changes: 28 additions & 28 deletions docs/async_events.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,45 +13,45 @@ All asynchronous events have to be declared within `Formula.evaluate` function.
override fun Snapshot<Input, State>.evaluate(): Evaluation<Output> {
return Evaluation(
output = createRenderModel(state.taskList),
// All async events need to be declared within "context.updates" block.
updates = context.updates {
// Convert RxJava observable to a Formula Stream.
val taskStream = RxStream.fromObservable(taskRepo::tasks)
// All async events need to be declared within "context.actions" block.
actions = context.actions {
// Convert RxJava observable to a Formula Action.
val taskAction = RxAction.fromObservable { taskRepo.tasks() }
// Tell Formula that you want to listen to these events
events(taskStream) { newTaskList ->
// update our state
transition(state.copy(taskList = newTaskList))
taskAction.onEvent { newTaskList ->
// update our state
transition(state.copy(taskList = newTaskList))
}
}
)
}

```

Formula uses a `Stream` interface to define an asynchronous event producers/sources.
Formula uses a `Action` interface to define an asynchronous event producers/sources.
```kotlin
interface Stream<Event> {
interface Action<Event> {
fun start(send: (Event) -> Unit): Cancelable?
}
```

In this example we used an `RxStream.fromObservable` to convert from an `Observable` to a `Stream` instance.
In this example we used an `RxAction.fromObservable` to convert from an `Observable` to a `Action` instance.

Instead of us subscribing to the observable/stream directly, the runtime manages the subscriptions for us.
It will subscribe the first time `events` is called and unsubscribe when our Formula is removed or
if we don't return it anymore. For example, it is okay to have conditional logic.
Instead of us subscribing to the observable directly, the runtime manages the subscriptions for us.
It will subscribe the first time the action is returned as part of evaluation output and unsubscribe
when our Formula is removed or if we don't return it anymore. For example, it is okay to have conditional logic.
```kotlin
context.updates {
context.actions {
if (state.locationTrackingEnabled) {
val locationStream = RxStream.fromObservable { locationManager.updates() }
events(locationStream) { event ->
val locationAction = RxAction.fromObservable { locationManager.updates() }
events(locationAction) { event ->
transition(state.copy(location = event.location))
}
}
}
```

If `state.locationTrackingEnabled` changes from `true` to `false`, we won't return this `Stream`
If `state.locationTrackingEnabled` changes from `true` to `false`, we won't return this `Action`
anymore and the runtime will unsubscribe.

### Fetching data
Expand All @@ -76,8 +76,8 @@ class TaskFormula(val taskRepo: TaskRepo): Formula {

override fun Snapshot<Input, State>.evaluate(): Evaluation<Output> {
return Evaluation(
updates = context.updates {
val fetchTask = RxStream.fromObservable(key = input.taskId) { taskRepo.fetchTask(input.taskId) }
actions = context.actions {
val fetchTask = RxAction.fromObservable(key = input.taskId) { taskRepo.fetchTask(input.taskId) }
events(fetchTask) { taskResponse ->
transition(state.copy(task = taskResponse))
}
Expand All @@ -87,28 +87,28 @@ class TaskFormula(val taskRepo: TaskRepo): Formula {
}
```

The `key` parameter enables us to distinguish between different streams. If `input.taskId` changes, we will
cancel the currently running `Stream` and start a new one.
The `key` parameter enables us to distinguish between different actions. If `input.taskId` changes, we will
cancel the currently running `Action` and start a new one.

```
Note: we are not handling errors in this example. The best practice is to emit errors as data using the onNext instead
of emitting them through onError.
```

### Extending Stream Interface
If you need to use a different mechanism for asynchronous events, you can extend `Stream` interface.
### Extending Action Interface
If you need to use a different mechanism for asynchronous events, you can extend `Action` interface.
```kotlin
interface Stream<Event> {
interface Action<Event> {
fun start(send: (Event) -> Unit): Cancelable?
}
```


For example, let's say we want to track network status (I'm going to use mock network status APIs).
```kotlin
class NetworkStatusStream(
class NetworkStatusAction(
val manager: NetworkStatusManager
) : Stream<NetworkStatus> {
) : Action<NetworkStatus> {

override fun start(send: (NetworkStatus) -> Unit): Cancelable? {
val listener = object: NetworkStatusListener {
Expand All @@ -125,11 +125,11 @@ class NetworkStatusStream(

We can now hook this up within our Formula:
```kotlin
class MyFormula(val networkStatus: NetworkStatusStream): Formula<Input, State, Output> {
class MyFormula(val networkStatus: NetworkStatusAction): Formula<Input, State, Output> {

override fun Snapshot<Input, State>.evaluate(): Evaluation<Output> {
return Evaluation(
updates = context.updates {
actions = context.actions {
events(networkStatus) { status ->
val updated = status.copy(isOnline = status.isOnline)
transition(updated)
Expand Down
2 changes: 1 addition & 1 deletion docs/diffing.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ TODO
### Diffing
Given that we recompute everything with each state change, there is an internal diffing mechanism with Formula. This
mechanism ensures that:
1. RxJava streams are only subscribed to once.
1. RxJava actions are only subscribed to once.
2. Children state is persisted across every processing pass.
6 changes: 3 additions & 3 deletions docs/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class TaskDetailFormula @Inject constructor(
return Evaluation(
output = state.task,
updates = context.updates {
RxStream.fromObservable { repo.fetchTask(input.taskId) }.onEvent { task ->
RxAction.fromObservable { repo.fetchTask(input.taskId) }.onEvent { task ->
val renderModel = TaskDetailRenderModel(
title = task.title,
// Don't do: calling context.onEvent within "onEvent" will cause a crash described above
Expand All @@ -60,7 +60,7 @@ class TaskDetailFormula @Inject constructor(
}
}
```
which the render model and then stores it in the `State`, we would store the fetched task from the RxStream in
which the render model and then stores it in the `State`, we would store the fetched task from the RxAction in
the state and then construct the render model in the `evaluation` function itself:
```
class TaskDetailFormula @Inject constructor(
Expand Down Expand Up @@ -91,7 +91,7 @@ class TaskDetailFormula @Inject constructor(
return Evaluation(
output = renderModel,
updates = context.updates {
RxStream.fromObservable { repo.fetchTask(input.taskId) }.onEvent { task ->
RxAction.fromObservable { repo.fetchTask(input.taskId) }.onEvent { task ->
transition(state.copy(task = renderModel))
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
package com.instacart.formula.coroutines

import com.instacart.formula.Action
import com.instacart.formula.Cancelable
import com.instacart.formula.Stream
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

/**
* Formula [Stream] adapter to enable coroutine's Flow use.
* Formula [Action] adapter to enable coroutine's Flow use.
*/
interface FlowStream<Event> : Stream<Event> {
interface FlowAction<Event> : Action<Event> {

companion object {
/**
* Creates a [Stream] from a [Flow] factory [create].
* Creates an [Action] from a [Flow] factory [create].
*
* ```
* events(FlowStream.fromFlow { locationManager.updates() }) { event ->
* events(FlowAction.fromFlow { locationManager.updates() }) { event ->
* transition()
* }
* ```
*/
inline fun <Event> fromFlow(
scope: CoroutineScope = MainScope(),
crossinline create: () -> Flow<Event>
): Stream<Event> {
return object : FlowStream<Event> {
): Action<Event> {
return object : FlowAction<Event> {

override val scope: CoroutineScope = scope

Expand All @@ -40,22 +40,22 @@ interface FlowStream<Event> : Stream<Event> {
}

/**
* Creates a [Stream] from a [Flow] factory [create].
* Creates an [Action] from a [Flow] factory [create].
*
* ```
* events(FlowStream.fromFlow(itemId) { repo.fetchItem(itemId) }) { event ->
* events(FlowAction.fromFlow(itemId) { repo.fetchItem(itemId) }) { event ->
* transition()
* }
* ```
*
* @param key Used to distinguish this [Stream] from other streams.
* @param key Used to distinguish this [Action] from other actions.
*/
inline fun <Event> fromFlow(
scope: CoroutineScope = MainScope(),
key: Any?,
crossinline create: () -> Flow<Event>
): Stream<Event> {
return object : FlowStream<Event> {
): Action<Event> {
return object : FlowAction<Event> {
override val scope: CoroutineScope = scope

override fun flow(): Flow<Event> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ abstract class FlowFormula<Input : Any, Output : Any> : StreamFormula<Input, Out
abstract fun flow(input: Input): Flow<Output>

final override fun stream(input: Input): Stream<Output> {
return FlowStream.fromFlow {
return FlowAction.fromFlow {
flow(input)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.instacart.formula.rxjava3

import com.instacart.formula.Action
import com.instacart.formula.Cancelable
import io.reactivex.rxjava3.core.Observable

/**
* Formula [Action] adapter to enable RxJava use.
*/
interface RxAction<Event> : Action<Event> {
companion object {
/**
* Creates an [Action] from an [Observable] factory [create].
*
* ```
* RxAction.fromObservable { locationManager.updates() }.onEvent { event ->
* transition()
* }
* ```
*/
inline fun <Event> fromObservable(
crossinline create: () -> Observable<Event>
): Action<Event> {
return object : RxAction<Event> {

override fun observable(): Observable<Event> {
return create()
}

override fun key(): Any = Unit
}
}

/**
* Creates an [Action] from an [Observable] factory [create].
*
* ```
* RxAction.fromObservable(itemId) { repo.fetchItem(itemId) }.onEvent { event ->
* transition()
* }
* ```
*
* @param key Used to distinguish this [Action] from other actions.
*/
inline fun <Event> fromObservable(
key: Any?,
crossinline create: () -> Observable<Event>
): Action<Event> {
return object : RxAction<Event> {

override fun observable(): Observable<Event> {
return create()
}

override fun key(): Any? = key
}
}
}

fun observable(): Observable<Event>

override fun start(send: (Event) -> Unit): Cancelable? {
val disposable = observable().subscribe(send)
return Cancelable(disposable::dispose)
}
}
Original file line number Diff line number Diff line change
@@ -1,66 +1,4 @@
package com.instacart.formula.rxjava3

import com.instacart.formula.Cancelable
import com.instacart.formula.Stream
import io.reactivex.rxjava3.core.Observable

/**
* Formula [Stream] adapter to enable RxJava use.
*/
interface RxStream<Event> : Stream<Event> {
companion object {
/**
* Creates a [Stream] from an [Observable] factory [create].
*
* ```
* events(RxStream.fromObservable { locationManager.updates() }) { event ->
* transition()
* }
* ```
*/
inline fun <Event> fromObservable(
crossinline create: () -> Observable<Event>
): Stream<Event> {
return object : RxStream<Event> {

override fun observable(): Observable<Event> {
return create()
}

override fun key(): Any = Unit
}
}

/**
* Creates a [Stream] from an [Observable] factory [create].
*
* ```
* events(RxStream.fromObservable(itemId) { repo.fetchItem(itemId) }) { event ->
* transition()
* }
* ```
*
* @param key Used to distinguish this [Stream] from other streams.
*/
inline fun <Event> fromObservable(
key: Any?,
crossinline create: () -> Observable<Event>
): Stream<Event> {
return object : RxStream<Event> {

override fun observable(): Observable<Event> {
return create()
}

override fun key(): Any? = key
}
}
}

fun observable(): Observable<Event>

override fun start(send: (Event) -> Unit): Cancelable? {
val disposable = observable().subscribe(send)
return Cancelable(disposable::dispose)
}
}
@Deprecated("use RxAction")
typealias RxStream<Event> = RxAction<Event>
Loading

0 comments on commit ffd7db3

Please sign in to comment.