Skip to content

Commit

Permalink
Rename Stream to Action.
Browse files Browse the repository at this point in the history
  • Loading branch information
Laimiux committed Mar 23, 2022
1 parent 059425a commit 4fed82d
Show file tree
Hide file tree
Showing 27 changed files with 448 additions and 354 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- **Breaking**: Restructuring `Transition` type into `Transition<State, Event>` and `Transition.Result<State>`
- **Breaking**: Make `Formula` abstract class instead of an interface.
- **Breaking**: Introducing `Snapshot` type and changing `evaluate` signature.
- Renaming `Stream` to `Action` (non-breaking change for now)

## [0.7.0] - June 30, 2021
- **Breaking**: Remove `events(observable) { }` utility function.
Expand Down
19 changes: 10 additions & 9 deletions docs/Declarative-API.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ disposables += fetchUserObservable.subscribe { userResult ->

Formula does things a bit differently. It manages the lifecycle of the asynchronous actions for you. Instead of manually subscribing and unsubscribing,
you define the conditions for which the asynchronous action should run and the listener which handles events produced by the action.

```kotlin
val fetchUserStream = RxStream.fromObservable { repository.fetchUser() }
fetchUserStream.onEvent { userResult ->
// Do something
val fetchUserAction = RxAction.fromObservable { repository.fetchUser() }
fetchUserAction.onEvent { userResult ->
// Do something
}
```

Expand All @@ -22,8 +23,8 @@ is deferred. It might not be clear why this is useful just from this example, bu
For example, we can add conditional logic to only fetch user when user id is set.
```kotlin
if (state.userId != null) {
val fetchUserStream = RxStream.fromObservable { repository.fetchUser(state.userId) }
fetchUserStream.onEvent { userResult ->
val fetchUserAction = RxAction.fromObservable { repository.fetchUser(state.userId) }
fetchUserAction.onEvent { userResult ->
// Do something with the result
}
}
Expand All @@ -43,8 +44,8 @@ What if for some unusual reason the userId could change and we would want to ref
define this behavior using `key` parameter
```kotlin
if (state.isUserFetchEnabled && state.userId != null) {
val fetchUserStream = RxStream.fromObservable(key = state.userId) { repository.fetchUser(state.userId) }
fetchUserStream.onEvent { userResult ->
val fetchUserAction = RxAction.fromObservable(key = state.userId) { repository.fetchUser(state.userId) }
fetchUserAction.onEvent { userResult ->
// Do something with the result
}
}
Expand Down Expand Up @@ -96,8 +97,8 @@ to `State`:
This means that similarly to `State`, we can also use `Input` to define action conditions.
```kotlin
if (input.userId != null) {
val fetchUserStream = RxStream.fromObservable { repository.fetchUser(input.userId) }
fetchUserStream.onEvent { userResult ->
val fetchUserAction = RxAction.fromObservable { repository.fetchUser(input.userId) }
fetchUserAction.onEvent { userResult ->
// Do something with the result
}
}
Expand Down
56 changes: 28 additions & 28 deletions docs/async_events.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,45 +13,45 @@ All asynchronous events have to be declared within `Formula.evaluate` function.
override fun Snapshot<Input, State>.evaluate(): Evaluation<Output> {
return Evaluation(
output = createRenderModel(state.taskList),
// All async events need to be declared within "context.updates" block.
updates = context.updates {
// Convert RxJava observable to a Formula Stream.
val taskStream = RxStream.fromObservable(taskRepo::tasks)
// All async events need to be declared within "context.actions" block.
actions = context.actions {
// Convert RxJava observable to a Formula Action.
val taskAction = RxAction.fromObservable { taskRepo.tasks() }
// Tell Formula that you want to listen to these events
events(taskStream) { newTaskList ->
// update our state
transition(state.copy(taskList = newTaskList))
taskAction.onEvent { newTaskList ->
// update our state
transition(state.copy(taskList = newTaskList))
}
}
)
}

```

Formula uses a `Stream` interface to define an asynchronous event producers/sources.
Formula uses a `Action` interface to define an asynchronous event producers/sources.
```kotlin
interface Stream<Event> {
interface Action<Event> {
fun start(send: (Event) -> Unit): Cancelable?
}
```

In this example we used an `RxStream.fromObservable` to convert from an `Observable` to a `Stream` instance.
In this example we used an `RxAction.fromObservable` to convert from an `Observable` to a `Action` instance.

Instead of us subscribing to the observable/stream directly, the runtime manages the subscriptions for us.
It will subscribe the first time `events` is called and unsubscribe when our Formula is removed or
if we don't return it anymore. For example, it is okay to have conditional logic.
Instead of us subscribing to the observable directly, the runtime manages the subscriptions for us.
It will subscribe the first time the action is returned as part of evaluation output and unsubscribe
when our Formula is removed or if we don't return it anymore. For example, it is okay to have conditional logic.
```kotlin
context.updates {
context.actions {
if (state.locationTrackingEnabled) {
val locationStream = RxStream.fromObservable { locationManager.updates() }
events(locationStream) { event ->
val locationAction = RxAction.fromObservable { locationManager.updates() }
events(locationAction) { event ->
transition(state.copy(location = event.location))
}
}
}
```

If `state.locationTrackingEnabled` changes from `true` to `false`, we won't return this `Stream`
If `state.locationTrackingEnabled` changes from `true` to `false`, we won't return this `Action`
anymore and the runtime will unsubscribe.

### Fetching data
Expand All @@ -76,8 +76,8 @@ class TaskFormula(val taskRepo: TaskRepo): Formula {

override fun Snapshot<Input, State>.evaluate(): Evaluation<Output> {
return Evaluation(
updates = context.updates {
val fetchTask = RxStream.fromObservable(key = input.taskId) { taskRepo.fetchTask(input.taskId) }
actions = context.actions {
val fetchTask = RxAction.fromObservable(key = input.taskId) { taskRepo.fetchTask(input.taskId) }
events(fetchTask) { taskResponse ->
transition(state.copy(task = taskResponse))
}
Expand All @@ -87,28 +87,28 @@ class TaskFormula(val taskRepo: TaskRepo): Formula {
}
```

The `key` parameter enables us to distinguish between different streams. If `input.taskId` changes, we will
cancel the currently running `Stream` and start a new one.
The `key` parameter enables us to distinguish between different actions. If `input.taskId` changes, we will
cancel the currently running `Action` and start a new one.

```
Note: we are not handling errors in this example. The best practice is to emit errors as data using the onNext instead
of emitting them through onError.
```

### Extending Stream Interface
If you need to use a different mechanism for asynchronous events, you can extend `Stream` interface.
### Extending Action Interface
If you need to use a different mechanism for asynchronous events, you can extend `Action` interface.
```kotlin
interface Stream<Event> {
interface Action<Event> {
fun start(send: (Event) -> Unit): Cancelable?
}
```


For example, let's say we want to track network status (I'm going to use mock network status APIs).
```kotlin
class NetworkStatusStream(
class NetworkStatusAction(
val manager: NetworkStatusManager
) : Stream<NetworkStatus> {
) : Action<NetworkStatus> {

override fun start(send: (NetworkStatus) -> Unit): Cancelable? {
val listener = object: NetworkStatusListener {
Expand All @@ -125,11 +125,11 @@ class NetworkStatusStream(

We can now hook this up within our Formula:
```kotlin
class MyFormula(val networkStatus: NetworkStatusStream): Formula<Input, State, Output> {
class MyFormula(val networkStatus: NetworkStatusAction): Formula<Input, State, Output> {

override fun Snapshot<Input, State>.evaluate(): Evaluation<Output> {
return Evaluation(
updates = context.updates {
actions = context.actions {
events(networkStatus) { status ->
val updated = status.copy(isOnline = status.isOnline)
transition(updated)
Expand Down
2 changes: 1 addition & 1 deletion docs/diffing.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ TODO
### Diffing
Given that we recompute everything with each state change, there is an internal diffing mechanism with Formula. This
mechanism ensures that:
1. RxJava streams are only subscribed to once.
1. RxJava actions are only subscribed to once.
2. Children state is persisted across every processing pass.
6 changes: 3 additions & 3 deletions docs/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class TaskDetailFormula @Inject constructor(
return Evaluation(
output = state.task,
updates = context.updates {
RxStream.fromObservable { repo.fetchTask(input.taskId) }.onEvent { task ->
RxAction.fromObservable { repo.fetchTask(input.taskId) }.onEvent { task ->
val renderModel = TaskDetailRenderModel(
title = task.title,
// Don't do: calling context.onEvent within "onEvent" will cause a crash described above
Expand All @@ -60,7 +60,7 @@ class TaskDetailFormula @Inject constructor(
}
}
```
which the render model and then stores it in the `State`, we would store the fetched task from the RxStream in
which the render model and then stores it in the `State`, we would store the fetched task from the RxAction in
the state and then construct the render model in the `evaluation` function itself:
```
class TaskDetailFormula @Inject constructor(
Expand Down Expand Up @@ -91,7 +91,7 @@ class TaskDetailFormula @Inject constructor(
return Evaluation(
output = renderModel,
updates = context.updates {
RxStream.fromObservable { repo.fetchTask(input.taskId) }.onEvent { task ->
RxAction.fromObservable { repo.fetchTask(input.taskId) }.onEvent { task ->
transition(state.copy(task = renderModel))
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
package com.instacart.formula.coroutines

import com.instacart.formula.Action
import com.instacart.formula.Cancelable
import com.instacart.formula.Stream
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

/**
* Formula [Stream] adapter to enable coroutine's Flow use.
* Formula [Action] adapter to enable coroutine's Flow use.
*/
interface FlowStream<Event> : Stream<Event> {
interface FlowAction<Event> : Action<Event> {

companion object {
/**
* Creates a [Stream] from a [Flow] factory [create].
* Creates an [Action] from a [Flow] factory [create].
*
* ```
* events(FlowStream.fromFlow { locationManager.updates() }) { event ->
* events(FlowAction.fromFlow { locationManager.updates() }) { event ->
* transition()
* }
* ```
*/
inline fun <Event> fromFlow(
scope: CoroutineScope = MainScope(),
crossinline create: () -> Flow<Event>
): Stream<Event> {
return object : FlowStream<Event> {
): Action<Event> {
return object : FlowAction<Event> {

override val scope: CoroutineScope = scope

Expand All @@ -40,22 +40,22 @@ interface FlowStream<Event> : Stream<Event> {
}

/**
* Creates a [Stream] from a [Flow] factory [create].
* Creates an [Action] from a [Flow] factory [create].
*
* ```
* events(FlowStream.fromFlow(itemId) { repo.fetchItem(itemId) }) { event ->
* events(FlowAction.fromFlow(itemId) { repo.fetchItem(itemId) }) { event ->
* transition()
* }
* ```
*
* @param key Used to distinguish this [Stream] from other streams.
* @param key Used to distinguish this [Action] from other actions.
*/
inline fun <Event> fromFlow(
scope: CoroutineScope = MainScope(),
key: Any?,
crossinline create: () -> Flow<Event>
): Stream<Event> {
return object : FlowStream<Event> {
): Action<Event> {
return object : FlowAction<Event> {
override val scope: CoroutineScope = scope

override fun flow(): Flow<Event> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package com.instacart.formula.coroutines

import com.instacart.formula.Stream
import com.instacart.formula.StreamFormula
import com.instacart.formula.Action
import com.instacart.formula.ActionFormula
import kotlinx.coroutines.flow.Flow

abstract class FlowFormula<Input : Any, Output : Any> : StreamFormula<Input, Output>() {
abstract class FlowFormula<Input : Any, Output : Any> : ActionFormula<Input, Output>() {

abstract override fun initialValue(input: Input): Output

abstract fun flow(input: Input): Flow<Output>

final override fun stream(input: Input): Stream<Output> {
return FlowStream.fromFlow {
final override fun action(input: Input): Action<Output> {
return FlowAction.fromFlow {
flow(input)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.instacart.formula.rxjava3

import com.instacart.formula.Action
import com.instacart.formula.Cancelable
import io.reactivex.rxjava3.core.Observable

/**
* Adapter which maps RxJava types to [Action] type. Take a look
* at [RxAction.fromObservable].
*/
interface RxAction<Event> : Action<Event> {
companion object {
/**
* Creates an [Action] from an [Observable] factory [create].
*
* ```
* RxAction.fromObservable { locationManager.updates() }.onEvent { event ->
* transition()
* }
* ```
*/
inline fun <Event> fromObservable(
crossinline create: () -> Observable<Event>
): Action<Event> {
return object : RxAction<Event> {

override fun observable(): Observable<Event> {
return create()
}

override fun key(): Any = Unit
}
}

/**
* Creates an [Action] from an [Observable] factory [create].
*
* ```
* RxAction.fromObservable(itemId) { repo.fetchItem(itemId) }.onEvent { event ->
* transition()
* }
* ```
*
* @param key Used to distinguish this [Action] from other actions.
*/
inline fun <Event> fromObservable(
key: Any?,
crossinline create: () -> Observable<Event>
): Action<Event> {
return object : RxAction<Event> {

override fun observable(): Observable<Event> {
return create()
}

override fun key(): Any? = key
}
}
}

fun observable(): Observable<Event>

override fun start(send: (Event) -> Unit): Cancelable? {
val disposable = observable().subscribe(send)
return Cancelable(disposable::dispose)
}
}
Loading

0 comments on commit 4fed82d

Please sign in to comment.