# Reactive programming

In *reactive programming*, which goes well with functional programming (FRP), we have *Observables* which emit events, and *Observers* which consume them, and we also have *operators*.

    →→e→→→→→→f→→→→g→h→→→→→→
    ↓↓↓↓↓↓ operator ↓↓↓↓↓↓↓
    →→→E→→→→→→→→F→→→G→→→→H→

We are thinking about streams. Almost everything is a stream.

## Resources

Some resources I read are
* [The introduction to Reactive Programming you've been missing](https://gist.github.com/staltz/868e7e9bc2a7b8c1f754)
* [What is Reactive Programming?](https://medium.com/@kevalpatel2106/what-is-reactive-programming-da37c1611382)
* [Code your next android app using RxJava](https://medium.com/@kevalpatel2106/code-your-next-android-app-using-rxjava-d1db30ac9fcc)
* [An introduction to reactive programming](https://codewords.recurse.com/issues/two/an-introduction-to-reactive-programming) examples in Scala, with `Future` and `Promise`, and `async` and `await` constructs too
* [Notes on Reactive Programming Part I: The Reactive Landscape](https://spring.io/blog/2016/06/07/notes-on-reactive-programming-part-i-the-reactive-landscape)
* [Notes on Reactive Programming Part II: Writing Some Code](https://spring.io/blog/2016/06/13/notes-on-reactive-programming-part-ii-writing-some-code)

One of the libraries which brings the paradigm and Reactive Extensions aka Rx to Python is [RxPY](https://github.com/ReactiveX/RxPY).

In [1]:
from rx import Observable, Observer

## Basic observables and observers

Let's define a function for this, though we will soon want to use an interable for this, right?

In [10]:
def push_five_strings(observer):
    observer.on_next("Alpha")
    observer.on_next("Beta")
    observer.on_next("Gamma")
    observer.on_next("Delta")
    observer.on_next("Epsilon")
    observer.on_completed()

A simple observer, which just gets events, and prints them.

In [11]:
class PrintObserver(Observer):
    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error occurred: {0}".format(error))

Lets create the observable with the `Observable.create()` factory, and call it `source`.

In [12]:
source = Observable.create(push_five_strings)

Then let's just connect to it, which will flow the data.

In [14]:
source.subscribe(PrintObserver())

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10e677fd0>

The above tastes redundant, we should be able to create an observable from a Python iterable. We indeed can, with `Observable.from_()`.

In [15]:
source = Observable.from_(['Kitten', 'Giraffe', 'Snake', 'Penguin'])

In [16]:
source.subscribe(PrintObserver())

Received Kitten
Received Giraffe
Received Snake
Received Penguin
Done!


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10e654780>

Instead of actually implementing the Observer via inheritance, we can and often want – says the documentation – to pass three lambdas:

In [21]:
source.subscribe(lambda l: print("Received {}".format(l)))

Received Kitten
Received Giraffe
Received Snake
Received Penguin


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10e82c160>

## Operators and chaining

The RxPY documentation says there are 130 operators, I don't know at all what they do. Map, filter, reduce sound familiar of course.

In [29]:
Observable.from_(['Kitten', 'Puppy', 'Aardvark', 'Murder of crows']) \
    .map(lambda s: len(s)) \
    .filter(lambda l: l > 5) \
    .subscribe(lambda v: print("Received animal of length {}".format('⦿' * v)))

Received animal of length ⦿⦿⦿⦿⦿⦿
Received animal of length ⦿⦿⦿⦿⦿⦿⦿⦿
Received animal of length ⦿⦿⦿⦿⦿⦿⦿⦿⦿⦿⦿⦿⦿⦿⦿


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10e859828>

## Multicasting

Having many observers is useful.

In [30]:
import random

In [32]:
three_emissions = Observable.range(1, 3)
three_random_ints = three_emissions.map(lambda i: random.randint(1, 100))

three_random_ints.subscribe(lambda i: print("S1 received {}".format(i)))
three_random_ints.subscribe(lambda i: print("S2 received {}".format(i)))

S1 received 5
S1 received 10
S1 received 21
S2 received 62
S2 received 25
S2 received 6


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10e82c2b0>

Instead if same stream should go to each subscriber, we can use `publish()` on the observable, and then use `connect()` of the observers to get the events. This defers the emission.

In [33]:
three_emissions = Observable.range(1, 3)
three_random_ints = three_emissions.map(lambda i: random.randint(1, 100)).publish()

three_random_ints.subscribe(lambda i: print("S1 received {}".format(i)))
three_random_ints.subscribe(lambda i: print("S2 received {}".format(i)))

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10e9e83c8>

In [34]:
three_random_ints.connect()

S1 received 96
S2 received 96
S1 received 84
S2 received 84
S1 received 33
S2 received 33


<rx.disposables.compositedisposable.CompositeDisposable at 0x10ea04ef0>

With `auto_connect()` on the observable we can defer emission until *n* observers ("subscribers") are connected.

In [35]:
three_emissions = Observable.range(1, 3)
three_random_ints = three_emissions.map(lambda i: random.randint(1, 100)).publish().auto_connect(2)

three_random_ints.subscribe(lambda i: print("S1 received {}".format(i)))

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10e639e10>

In [36]:
three_random_ints.subscribe(lambda i: print("S2 received {}".format(i)))

S1 received 25
S2 received 25
S1 received 20
S2 received 20
S1 received 42
S2 received 42


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10e3f6358>

## Combining subscribers

There are plenty of factories on `Observable`, for instance `zip`, `concat`, `merge` and `latest`. Let's use `interval` here to do something slowly.

In [37]:
letters = Observable.from_(['Alpha', 'Beta', 'Gamma', 'Delta', 'Epsilon'])
intervals = Observable.interval(1000)

Observable.zip(letters, intervals, lambda s, i: (s, i)) \
    .subscribe(lambda t: print(t))

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10e6bd400>

('Alpha', 0)
('Beta', 1)
('Gamma', 2)
('Delta', 3)
('Epsilon', 4)


`flat_map()` is worth learning about. Concurrencty is achieved with `subscribe_on()` and `observe_on()`. I didn't check that bit.

## Pythonics

We can concatenate observable with `+` and repeat them with `*` and slice with `[start:stop:step]`. As in

In [42]:
Observable.zip(letters[1:4:2] * 2, intervals, lambda s, i: (s, i)) \
    .subscribe(lambda t: print(t))

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10e566588>

('Beta', 0)
('Delta', 1)
('Beta', 2)
('Delta', 3)
