# Reactive Programming

Reactive programming is a design paradigm that relies on asynchronous programming logic to handle real-time updates to otherwise static content. A reactive application must be:

- **Responsive**: it takes action in a timely manner
- **Resilient**: it can handle failures and keep working
- **Elastic**: it stays responsive under varying workload
- **Message Driven**: it relies on asynchronous message passing

## Observer Pattern
The *Observer-Observable* pattern ensures that a group of objects is alerted whenever another object's state changes.

- **Observable**: notifies other objects of a change in its state
- **Observer**: receives the notification and carries out the appropriate work

> To receive notifications, the Observer must *subscribe* to the Observable.
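
As a rough illustration of the pattern, here is a minimal sketch in plain Python. The `Observable` class, its `notify` method and the `temperature` example are hypothetical names chosen for this sketch; they are not part of any library.

```python
class Observable:
    """Hypothetical minimal Observable: keeps a list of subscribed observers."""

    def __init__(self):
        self._observers = []

    def subscribe(self, observer):
        # An observer is simply a callable that accepts the new value
        self._observers.append(observer)

    def notify(self, value):
        # Alert every subscribed observer about the state change
        for observer in self._observers:
            observer(value)


received = []
temperature = Observable()
temperature.subscribe(lambda value: received.append(("display", value)))
temperature.subscribe(lambda value: received.append(("logger", value)))

temperature.notify(21.5)
print(received)
```

Both observers are alerted by a single `notify` call, in the order in which they subscribed.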

### Observables
Based on their behaviour, Observables can be distinguished into:

- *cold Observables*: they start publishing data only when an Observer subscribes, and each Observer receives the data stream from its beginning (pull relation).
- *hot Observables*: they publish data independently of the number of Observers. An Observer receives only the data published after it subscribed.

> With hot Observables, data can be lost. This approach is typically used when data becomes irrelevant if it is not processed immediately.
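
The difference can be sketched in plain Python. `ColdSource` and `HotSource` are hypothetical classes written only for this illustration; real libraries implement the same idea with more machinery.

```python
class ColdSource:
    """Hypothetical cold source: replays its items for every new subscriber."""

    def __init__(self, items):
        self._items = list(items)

    def subscribe(self, observer):
        for item in self._items:
            observer(item)


class HotSource:
    """Hypothetical hot source: emits regardless of who is listening."""

    def __init__(self):
        self._observers = []

    def subscribe(self, observer):
        self._observers.append(observer)

    def emit(self, item):
        for observer in self._observers:
            observer(item)


cold_seen = []
cold = ColdSource([1, 2, 3])
cold.subscribe(cold_seen.append)      # receives the whole sequence

hot_seen = []
hot = HotSource()
hot.emit(1)                           # nobody is subscribed: this item is lost
hot.subscribe(hot_seen.append)
hot.emit(2)
hot.emit(3)

print(cold_seen, hot_seen)
```

The late subscriber of the hot source never sees the first item, which is the data loss described above.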

## ReactiveX
The best-known implementation of the Reactive Programming paradigm is ReactiveX, which is built on the *Observer-Observable* pattern.
Its Python implementation is RxPY.
```
import rx
```

An Observable can be created from both static and dynamic data sources:

```
observable = rx.from_list([1,2,3,4,5])
```

An Observer can be *subscribed* using the `subscribe` method:
```
subscribe(observer=None, on_error=None, on_completed=None, on_next=None, *, scheduler=None)
```
- `observer` The object that is to receive notifications.
- `on_error` Action to invoke upon exceptional termination of the observable sequence.
- `on_completed` Action to invoke upon graceful termination of the observable sequence.
- `on_next` Action to invoke for each element in the observable sequence.
- `scheduler` The default scheduler to use for this subscription.


In [3]:
import rx

def print_(val):
    print(f'(from method) Value received {val}\n',end='')


class DataObserver:
    def on_next(self, value):
        print(f'(from DataObserver) Value received {value}\n',end='')
    
    def on_error(self, error):
        print(f'(from DataObserver) Error {error}\n',end='')
    
    def on_completed(self):
        print(f'(from DataObserver) Completed!\n',end='')


observable = rx.from_list([1,2,3,4,5])
# use lambda
observable.subscribe(lambda val: print(f'(from lambda) Value received: {val}\n', end=''))

# use named function
observable.subscribe(print_)

# use an Observer object
observable.subscribe(DataObserver())

(from lambda) Value received: 1
(from lambda) Value received: 2
(from lambda) Value received: 3
(from lambda) Value received: 4
(from lambda) Value received: 5
(from method) Value received 1
(from method) Value received 2
(from method) Value received 3
(from method) Value received 4
(from method) Value received 5
(from DataObserver) Value received 1
(from DataObserver) Value received 2
(from DataObserver) Value received 3
(from DataObserver) Value received 4
(from DataObserver) Value received 5
(from DataObserver) Completed!


<rx.disposable.disposable.Disposable at 0x26619a5fb20>

### Subjects
A *Subject* is both an Observable and an Observer. Typically it is used to gather data from a source, modify it, and forward it to the final Observers. Note also that in the previous example each Observer is served separately: the Observable delivers the whole sequence to one Observer before moving on to the next. A Subject, by contrast, delivers each data item to all of its subscribed Observers before the next item is processed.

In [23]:
import rx
from rx.subject import Subject
from datetime import datetime
from time import sleep
class TimeStampSubject(Subject):
    def on_next(self, value):
        now = datetime.now() # current date and time
        sleep(1)
        date_time = now.strftime("%m/%d/%Y, %H:%M:%S:%f")
        super().on_next((value, date_time))
    
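    def on_error(self, error):
        print(f'(from DataObserver) Error {error}\n',end='')
        super().on_error(error)  # forward the error to the subscribed Observers
    
    def on_completed(self):
        print(f'(from DataObserver) Completed!\n',end='')
        super().on_completed()  # forward the completion to the subscribed Observers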



def print_(val):
    val, date_time = val
    print(f'(from method) {date_time} - Value received: {val}\n',end='')



observable = rx.from_list([1,2,3,4,5])

subject = TimeStampSubject()
subject.subscribe(print_)
subject.subscribe(lambda val: print(f'(from lambda) {val[1]} - Value received: {val[0]}\n', end=''))

# The Observers subscribe to the subject before the subject subscribes to the Observable, so that no data is lost
observable.subscribe(subject)

(from method) 10/20/2021, 19:28:18:918224 - Value received: 1
(from lambda) 10/20/2021, 19:28:18:918224 - Value received: 1
(from method) 10/20/2021, 19:28:19:932714 - Value received: 2
(from lambda) 10/20/2021, 19:28:19:932714 - Value received: 2
(from method) 10/20/2021, 19:28:20:942003 - Value received: 3
(from lambda) 10/20/2021, 19:28:20:942003 - Value received: 3
(from method) 10/20/2021, 19:28:21:946258 - Value received: 4
(from lambda) 10/20/2021, 19:28:21:946258 - Value received: 4
(from method) 10/20/2021, 19:28:22:953453 - Value received: 5
(from lambda) 10/20/2021, 19:28:22:953453 - Value received: 5
(from DataObserver) Completed!


<rx.disposable.disposable.Disposable at 0x1f45e7043a0>

### Concurrency
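By default RxPY is single-threaded. To change this, pass a scheduler when calling `subscribe`, or use the `subscribe_on` and `observe_on` operators. Several schedulers are available. From `rx.scheduler` (named `rx.concurrency` in RxPY releases before 3.0):

- `ImmediateScheduler`
- `CurrentThreadScheduler`
- `TimeoutScheduler`
- `NewThreadScheduler`
- `ThreadPoolScheduler`

`rx.scheduler.eventloop` and `rx.scheduler.mainloop` (a single `rx.concurrency.mainloopscheduler` module before 3.0) define schedulers that integrate with external event loops:
- `IOLoopScheduler` (in `rx.scheduler.eventloop`)
- `PyGameScheduler` (in `rx.scheduler.mainloop`)
- `WxScheduler` (in `rx.scheduler.mainloop`)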

## Operators
It is possible to apply operators to the data stream. Operators are defined in the `rx.operators` module.
```
from rx import operators as op
```
To apply operators, call the method `Observable.pipe()` with one or more operators. The operators are applied in the order given, and the result is a new Observable.

**Creating Observables**

| Operator | Description |
| :--- | :--- | 
| `create` | Create an Observable from scratch by calling observer methods programmatically. |
| `empty` | Creates an Observable that emits no item and completes immediately.|
| `never` | Creates an Observable that never emits an item and never completes.|
| `throw` | Creates an Observable that terminates with an error.|
| `from_` | Convert some other object or data structure into an Observable.|
|`interval`| Create an Observable that emits a sequence of integers spaced by a particular time interval.|
| `just` | Convert an object or a set of objects into an Observable that emits that object or those objects.|
| `range`| Create an Observable that emits a range of sequential integers.|
| `repeat_value` | Create an Observable that emits a particular item or sequence of items repeatedly.|
| `start`| Create an Observable that emits the return value of a function.|
| `timer`| Create an Observable that emits a single item after a given delay.|

In [1]:
import rx
from datetime import datetime
from time import sleep
res = rx.interval(1.0)
res.subscribe(lambda val: print(f'{datetime.now().strftime("%m/%d/%Y, %H:%M:%S")} - {val}\n', end=''))

<rx.disposable.disposable.Disposable at 0x2c3a4ee9f70>

10/21/2021, 15:51:18 - 0
10/21/2021, 15:51:19 - 1
10/21/2021, 15:51:20 - 2
10/21/2021, 15:51:21 - 3
10/21/2021, 15:51:22 - 4
10/21/2021, 15:51:23 - 5
10/21/2021, 15:51:24 - 6


**Transforming Observables**

| Operator | Description |
| :--- | :--- | 
| `buffer` | Periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time.|
| `flat_map` | Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable.|
|`group_by` | Divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key.|
| `map` |Transform the items emitted by an Observable by applying a function to each item.|
|`scan` | Apply a function to each item emitted by an Observable, sequentially, and emit each successive value.|
|`window` | Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.|

In [5]:
import rx
import rx.operators as op

sentences = ['Hello world', 'Reactive Programming is cool', 'Computer science']
r = rx.from_(sentences)
words = r.pipe(op.flat_map(lambda x: x.split()))

words.subscribe(lambda word: print(word))

Hello
world
Reactive
Programming
is
cool
Computer
science


<rx.disposable.disposable.Disposable at 0x2c3a5030a00>


**Filtering Observables**

| Operator | Description |
| :--- | :--- | 
|`debounce`|Only emit an item from an Observable if a particular timespan has passed without it emitting another item.|
|`distinct`|Suppress duplicate items emitted by an Observable.|
|`element_at`|Emit only item n emitted by an Observable.|
|`filter`|Emit only those items from an Observable that pass a predicate test.|
|`first`|Emit only the first item, or the first item that meets a condition, from an Observable.|
|`ignore_elements`|Do not emit any items from an Observable but mirror its termination notification.|
|`last`|Emit only the last item emitted by an Observable.|
|`sample`|Emit the most recent item emitted by an Observable within periodic time intervals.|
|`skip`|Suppress the first n items emitted by an Observable.|
|`skip_last`|Suppress the last n items emitted by an Observable.|
|`take`|Emit only the first n items emitted by an Observable.|
|`take_last`|Emit only the last n items emitted by an Observable.|

**Combining Observables**

| Operator | Description |
| :--- | :--- | 
|`combine_latest`|When an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function.|
|`join`|Combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable.|
|`merge`|Combine multiple Observables into one by merging their emissions.|
|`start_with`|Emit a specified sequence of items before beginning to emit the items from the source Observable.|
|`switch_latest`|Convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables.|
|`zip`|Combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function.|
|`fork_join`|Wait for Observables to complete and then combine last values they emitted into a tuple.|

**Error Handling**

| Operator | Description |
| :--- | :--- | 
|`catch`|Continues observable sequences which are terminated with an exception by switching over to the next observable sequence.|
|`retry`|If a source Observable sends an onError notification, resubscribe to it in the hopes that it will complete without error.|

**Utility Operators**

| Operator | Description |
| :--- | :--- | 
|`delay`|Shift the emissions from an Observable forward in time by a particular amount.|
|`do`|Register an action to take upon a variety of Observable lifecycle events.|
|`materialize`|Materializes the implicit notifications of an observable sequence as explicit notification values.|
|`dematerialize`|Dematerializes the explicit notification values of an observable sequence as implicit notifications.|
|`observe_on`|Specify the scheduler on which an observer will observe this Observable.|
|`subscribe`|Operate upon the emissions and notifications from an Observable.|
|`subscribe_on`|Specify the scheduler an Observable should use when it is subscribed to.|
|`time_interval`|Convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions.|
|`timeout`|Mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items.|
|`timestamp`|Attach a timestamp to each item emitted by an Observable.|

**Conditional and Boolean Operators**

| Operator | Description |
| :--- | :--- | 
|`all`|Determine whether all items emitted by an Observable meet some criteria.|
|`amb`|Given two or more source Observables, emit all of the items from only the first of these Observables to emit an item.|
|`contains`|Determine whether an Observable emits a particular item or not.|
|`default_if_empty`|Emit items from the source Observable, or a default item if the source Observable emits nothing.|
|`sequence_equal`|Determine whether two Observables emit the same sequence of items.|
|`skip_until`|Discard items emitted by an Observable until a second Observable emits an item.|
|`skip_while`|Discard items emitted by an Observable until a specified condition becomes false.|
|`take_until`|Discard items emitted by an Observable after a second Observable emits an item or terminates.|
|`take_while`|Discard items emitted by an Observable after a specified condition becomes false.|

**Mathematical and Aggregate Operators**

| Operator | Description |
| :--- | :--- | 
|`average`|Calculates the average of numbers emitted by an Observable and emits this average.|
|`concat`|Emit the emissions from two or more Observables without interleaving them.|
|`count`|Count the number of items emitted by the source Observable and emit only this value.|
|`max`|Determine, and emit, the maximum-valued item emitted by an Observable.|
|`min`|Determine, and emit, the minimum-valued item emitted by an Observable.|
|`reduce`|Apply a function to each item emitted by an Observable, sequentially, and emit the final value.|
|`sum`|Calculate the sum of numbers emitted by an Observable and emit this sum.|

**Connectable Observable Operators**

| Operator | Description |
| :--- | :--- | 
|`connect`|Instruct a connectable Observable to begin emitting items to its subscribers.|
|`publish`|Convert an ordinary Observable into a connectable Observable.|
|`ref_count`|Make a Connectable Observable behave like an ordinary Observable.|
|`replay`|Ensure that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items.|
