Skip to content

Commit

Permalink
Rename Stream to Action. (#229)
Browse files Browse the repository at this point in the history
  • Loading branch information
Laimiux committed Mar 24, 2022
1 parent 059425a commit 2447967
Show file tree
Hide file tree
Showing 32 changed files with 488 additions and 366 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- **Breaking**: Restructuring `Transition` type into `Transition<State, Event>` and `Transition.Result<State>`
- **Breaking**: Make `Formula` abstract class instead of an interface.
- **Breaking**: Introducing `Snapshot` type and changing `evaluate` signature.
- Renaming `Stream` to `Action` (non-breaking change for now)

## [0.7.0] - June 30, 2021
- **Breaking**: Remove `events(observable) { }` utility function.
Expand Down
19 changes: 10 additions & 9 deletions docs/Declarative-API.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ disposables += fetchUserObservable.subscribe { userResult ->

Formula does things a bit differently. It manages the lifecycle of the asynchronous actions for you. Instead of manually subscribing and unsubscribing,
you define the conditions for which the asynchronous action should run and the listener which handles events produced by the action.

```kotlin
val fetchUserStream = RxStream.fromObservable { repository.fetchUser() }
fetchUserStream.onEvent { userResult ->
// Do something
val fetchUserAction = RxAction.fromObservable { repository.fetchUser() }
fetchUserAction.onEvent { userResult ->
// Do something
}
```

Expand All @@ -22,8 +23,8 @@ is deferred. It might not be clear why this is useful just from this example, bu
For example, we can add conditional logic to only fetch user when user id is set.
```kotlin
if (state.userId != null) {
val fetchUserStream = RxStream.fromObservable { repository.fetchUser(state.userId) }
fetchUserStream.onEvent { userResult ->
val fetchUserAction = RxAction.fromObservable { repository.fetchUser(state.userId) }
fetchUserAction.onEvent { userResult ->
// Do something with the result
}
}
Expand All @@ -43,8 +44,8 @@ What if for some unusual reason the userId could change and we would want to ref
define this behavior using `key` parameter
```kotlin
if (state.isUserFetchEnabled && state.userId != null) {
val fetchUserStream = RxStream.fromObservable(key = state.userId) { repository.fetchUser(state.userId) }
fetchUserStream.onEvent { userResult ->
val fetchUserAction = RxAction.fromObservable(key = state.userId) { repository.fetchUser(state.userId) }
fetchUserAction.onEvent { userResult ->
// Do something with the result
}
}
Expand Down Expand Up @@ -96,8 +97,8 @@ to `State`:
This means that similarly to `State`, we can also use `Input` to define action conditions.
```kotlin
if (input.userId != null) {
val fetchUserStream = RxStream.fromObservable { repository.fetchUser(input.userId) }
fetchUserStream.onEvent { userResult ->
val fetchUserAction = RxAction.fromObservable { repository.fetchUser(input.userId) }
fetchUserAction.onEvent { userResult ->
// Do something with the result
}
}
Expand Down
60 changes: 30 additions & 30 deletions docs/async_events.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ To show how Formula handles asynchronous events, we'll use a task app example. L
a task repository that exposes an RxJava `Observable<List<Task>>`.
```kotlin
interface TaskRepo {
fun tasks(): Observable<List<Task>>
fun getTaskList(): Observable<List<Task>>
}
```

Expand All @@ -13,45 +13,45 @@ All asynchronous events have to be declared within `Formula.evaluate` function.
override fun Snapshot<Input, State>.evaluate(): Evaluation<Output> {
return Evaluation(
output = createRenderModel(state.taskList),
// All async events need to be declared within "context.updates" block.
updates = context.updates {
// Convert RxJava observable to a Formula Stream.
val taskStream = RxStream.fromObservable(taskRepo::tasks)
// All async events need to be declared within "context.actions" block.
actions = context.actions {
// Convert RxJava observable to a Formula Action.
val taskAction = RxAction.fromObservable { taskRepo.getTaskList() }
// Tell Formula that you want to listen to these events
events(taskStream) { newTaskList ->
// update our state
transition(state.copy(taskList = newTaskList))
taskAction.onEvent { newTaskList ->
// update our state
transition(state.copy(taskList = newTaskList))
}
}
)
}

```

Formula uses a `Stream` interface to define an asynchronous event producers/sources.
Formula uses a `Action` interface to define an asynchronous event producers/sources.
```kotlin
interface Stream<Event> {
interface Action<Event> {
fun start(send: (Event) -> Unit): Cancelable?
}
```

In this example we used an `RxStream.fromObservable` to convert from an `Observable` to a `Stream` instance.
In this example we used an `RxAction.fromObservable` to convert from an `Observable` to a `Action` instance.

Instead of us subscribing to the observable/stream directly, the runtime manages the subscriptions for us.
It will subscribe the first time `events` is called and unsubscribe when our Formula is removed or
if we don't return it anymore. For example, it is okay to have conditional logic.
Instead of us subscribing to the observable directly, the runtime manages the subscriptions for us.
It will subscribe the first time the action is returned as part of evaluation output and unsubscribe
when our Formula is removed or if we don't return it anymore. For example, it is okay to have conditional logic.
```kotlin
context.updates {
context.actions {
if (state.locationTrackingEnabled) {
val locationStream = RxStream.fromObservable { locationManager.updates() }
events(locationStream) { event ->
val locationAction = RxAction.fromObservable { locationManager.updates() }
events(locationAction) { event ->
transition(state.copy(location = event.location))
}
}
}
```

If `state.locationTrackingEnabled` changes from `true` to `false`, we won't return this `Stream`
If `state.locationTrackingEnabled` changes from `true` to `false`, we won't return this `Action`
anymore and the runtime will unsubscribe.

### Fetching data
Expand All @@ -76,8 +76,8 @@ class TaskFormula(val taskRepo: TaskRepo): Formula {

override fun Snapshot<Input, State>.evaluate(): Evaluation<Output> {
return Evaluation(
updates = context.updates {
val fetchTask = RxStream.fromObservable(key = input.taskId) { taskRepo.fetchTask(input.taskId) }
actions = context.actions {
val fetchTask = RxAction.fromObservable(key = input.taskId) { taskRepo.fetchTask(input.taskId) }
events(fetchTask) { taskResponse ->
transition(state.copy(task = taskResponse))
}
Expand All @@ -87,28 +87,28 @@ class TaskFormula(val taskRepo: TaskRepo): Formula {
}
```

The `key` parameter enables us to distinguish between different streams. If `input.taskId` changes, we will
cancel the currently running `Stream` and start a new one.
The `key` parameter enables us to distinguish between different actions. If `input.taskId` changes, we will
cancel the currently running `Action` and start a new one.

```
Note: we are not handling errors in this example. The best practice is to emit errors as data using the onNext instead
of emitting them through onError.
```

### Extending Stream Interface
If you need to use a different mechanism for asynchronous events, you can extend `Stream` interface.
### Extending Action Interface
If you need to use a different mechanism for asynchronous events, you can extend `Action` interface.
```kotlin
interface Stream<Event> {
interface Action<Event> {
fun start(send: (Event) -> Unit): Cancelable?
}
```


For example, let's say we want to track network status (I'm going to use mock network status APIs).
```kotlin
class NetworkStatusStream(
class GetNetworkStatusAction(
val manager: NetworkStatusManager
) : Stream<NetworkStatus> {
) : Action<NetworkStatus> {

override fun start(send: (NetworkStatus) -> Unit): Cancelable? {
val listener = object: NetworkStatusListener {
Expand All @@ -125,12 +125,12 @@ class NetworkStatusStream(

We can now hook this up within our Formula:
```kotlin
class MyFormula(val networkStatus: NetworkStatusStream): Formula<Input, State, Output> {
class MyFormula(val getNetworkStatusAction: GetNetworkStatusAction): Formula<Input, State, Output> {

override fun Snapshot<Input, State>.evaluate(): Evaluation<Output> {
return Evaluation(
updates = context.updates {
events(networkStatus) { status ->
actions = context.actions {
getNetworkStatusAction.onEvent { status ->
val updated = status.copy(isOnline = status.isOnline)
transition(updated)
}
Expand Down
2 changes: 1 addition & 1 deletion docs/diffing.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ TODO
### Diffing
Given that we recompute everything with each state change, there is an internal diffing mechanism with Formula. This
mechanism ensures that:
1. RxJava streams are only subscribed to once.
1. RxJava observables are only subscribed to once.
2. Children state is persisted across every processing pass.
6 changes: 3 additions & 3 deletions docs/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,17 @@ Evaluation(
output = ...,
updates = context.updates {
// Performs a side effect when formula is initialized
Stream.onInit().onEvent {
Action.onInit().onEvent {
transition { analytics.trackScreenOpen() }
}

// Performs a side effect when formula is terminated
Stream.onTerminate().onEvent {
Action.onTerminate().onEvent {
transition { analytics.trackClose() }
}

// Performs a side-effect when data changes
Stream.onData(state.itemId).onEvent {
Action.onData(state.itemId).onEvent {
// This will call api.fetchItem for each unique itemId
transition { api.fetchItem(state.itemId) }
}
Expand Down
6 changes: 3 additions & 3 deletions docs/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class TaskDetailFormula @Inject constructor(
return Evaluation(
output = state.task,
updates = context.updates {
RxStream.fromObservable { repo.fetchTask(input.taskId) }.onEvent { task ->
RxAction.fromObservable { repo.fetchTask(input.taskId) }.onEvent { task ->
val renderModel = TaskDetailRenderModel(
title = task.title,
// Don't do: calling context.onEvent within "onEvent" will cause a crash described above
Expand All @@ -60,7 +60,7 @@ class TaskDetailFormula @Inject constructor(
}
}
```
which the render model and then stores it in the `State`, we would store the fetched task from the RxStream in
which the render model and then stores it in the `State`, we would store the fetched task from the RxAction in
the state and then construct the render model in the `evaluation` function itself:
```
class TaskDetailFormula @Inject constructor(
Expand Down Expand Up @@ -91,7 +91,7 @@ class TaskDetailFormula @Inject constructor(
return Evaluation(
output = renderModel,
updates = context.updates {
RxStream.fromObservable { repo.fetchTask(input.taskId) }.onEvent { task ->
RxAction.fromObservable { repo.fetchTask(input.taskId) }.onEvent { task ->
transition(state.copy(task = renderModel))
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,34 @@
package com.instacart.formula.coroutines

import com.instacart.formula.Action
import com.instacart.formula.Cancelable
import com.instacart.formula.Stream
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

/**
* Formula [Stream] adapter to enable coroutine's Flow use.
* Adapter which allows creating Formula [Action] from Kotlin coroutine's. Take a
* look at [FlowAction.fromFlow] to create an [Action] from [Flow] type.
*/
interface FlowStream<Event> : Stream<Event> {
interface FlowAction<Event> : Action<Event> {

companion object {
/**
* Creates a [Stream] from a [Flow] factory [create].
* Creates an [Action] which will launch a [Flow] created by factory function [create].
*
* ```
* events(FlowStream.fromFlow { locationManager.updates() }) { event ->
* FlowAction.fromFlow { locationManager.updates() }.onEvent { event ->
* transition()
* }
* ```
*/
inline fun <Event> fromFlow(
scope: CoroutineScope = MainScope(),
crossinline create: () -> Flow<Event>
): Stream<Event> {
return object : FlowStream<Event> {
): Action<Event> {
return object : FlowAction<Event> {

override val scope: CoroutineScope = scope

Expand All @@ -40,22 +41,22 @@ interface FlowStream<Event> : Stream<Event> {
}

/**
* Creates a [Stream] from a [Flow] factory [create].
* Creates an [Action] which will launch a [Flow] created by factory function [create].
*
* ```
* events(FlowStream.fromFlow(itemId) { repo.fetchItem(itemId) }) { event ->
* FlowAction.fromFlow(itemId) { repo.fetchItem(itemId) }.onEvent { event ->
* transition()
* }
* ```
*
* @param key Used to distinguish this [Stream] from other streams.
* @param key Used to distinguish this [Action] from other actions.
*/
inline fun <Event> fromFlow(
scope: CoroutineScope = MainScope(),
key: Any?,
crossinline create: () -> Flow<Event>
): Stream<Event> {
return object : FlowStream<Event> {
): Action<Event> {
return object : FlowAction<Event> {
override val scope: CoroutineScope = scope

override fun flow(): Flow<Event> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package com.instacart.formula.coroutines

import com.instacart.formula.Stream
import com.instacart.formula.StreamFormula
import com.instacart.formula.Action
import com.instacart.formula.ActionFormula
import kotlinx.coroutines.flow.Flow

abstract class FlowFormula<Input : Any, Output : Any> : StreamFormula<Input, Output>() {
abstract class FlowFormula<Input : Any, Output : Any> : ActionFormula<Input, Output>() {

abstract override fun initialValue(input: Input): Output

abstract fun flow(input: Input): Flow<Output>

final override fun stream(input: Input): Stream<Output> {
return FlowStream.fromFlow {
final override fun action(input: Input): Action<Output> {
return FlowAction.fromFlow {
flow(input)
}
}
Expand Down
Loading

0 comments on commit 2447967

Please sign in to comment.