# Getting Started with RxPY

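RxPY is a port of ReactiveX to Python. Python is a particularly pleasant language for learning Rx because it removes much of the clutter that comes with statically typed languages. The examples below use the RxPY 1.x API, where operators are methods chained directly on an Observable.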

## Importing the Rx module

In [2]:
import rx
from rx import Observable, Observer
from rx.subjects import Subject, BehaviorSubject, ReplaySubject, AsyncSubject
from rx.testing import marbles  # imported for its side effect: adds Observable.from_marbles

## Generating an observable

In [3]:
observable = Observable.from_marbles("1-2-3-4-5-|")

## Generating an observer

In [4]:
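class MyObserver(Observer):
    """Prints every notification it receives."""

    def on_next(self, x):
        # Called once for each value in the sequence.
        print("Got: %s" % x)

    def on_error(self, e):
        # Called if the sequence terminates with an error.
        print("Got error: %s" % e)

    def on_completed(self):
        # Called when the sequence ends normally.
        print("Sequence completed")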

In [5]:
observable.subscribe(MyObserver())

Got: 1


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10cda1090>

Got: 2
Got: 3
Got: 4
Got: 5
Sequence completed


## Operators

### Transforming operators

In [6]:
observable = Observable.from_marbles("1-2-3-4-5-|")
observable.map(
    lambda x, i: int(x)
).map(
    lambda x, i: x ** 2
).subscribe(MyObserver())

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10cdc3ed0>

Got: 1
Got: 4
Got: 9
Got: 16
Got: 25
Sequence completed


### Filtering operators

In [7]:
observable = Observable.from_marbles("1-2-3-4-5-|")
observable.map(
    lambda x, i: int(x)
).map(
    lambda x, i: x ** 2
).filter(
    lambda x: x < 20
).take(3).subscribe(MyObserver())

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10cdd1f10>

Got: 1
Got: 4
Got: 9
Sequence completed
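
For comparison, the same pipeline can be written over a plain Python list. This is only a sketch of what each stage computes, not of how RxPY evaluates it: `squares_below` is a hypothetical helper name, and unlike the observable version it runs synchronously over data that is already available.

```python
from itertools import islice

def squares_below(values, limit, count):
    """Square each value, keep those below `limit`, stop after `count` items."""
    squared = (int(v) ** 2 for v in values)    # the two map() steps
    kept = (v for v in squared if v < limit)   # the filter() step
    return list(islice(kept, count))           # the take() step

result = squares_below(["1", "2", "3", "4", "5"], limit=20, count=3)
print(result)  # [1, 4, 9]
```

Four squares (1, 4, 9, 16) pass the filter, but the final stage stops after the first three, which matches the output above.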


### Combining operators

In [8]:
observable = Observable.from_marbles("1-2-3-4-5-6-7-8-9-10|")
observable.map(
    lambda x, i: int(x)
).map(
    lambda x, i: x ** 2
).take_last(8).merge(Observable.from_marbles("1-2-3-4-5-|")).subscribe(MyObserver())

Got: 1


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10cdce8d0>

Got: 2
Got: 3
Got: 4
Got: 5
Got: 9
Got: 16
Got: 25
Got: 36
Got: 49
Got: 64
Got: 81
Got: 100
Sequence completed
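
The order of the output above is consistent with how `take_last` works: it cannot know which values are the last eight until the source completes, so it holds them back until then, while the merged sequence is emitted as it arrives. A minimal plain-Python sketch of that buffering, using a bounded `deque` (the function below is a stand-in written for illustration, not RxPY's implementation):

```python
from collections import deque

def take_last(values, count):
    """Return the final `count` items; the result exists only once `values` is exhausted."""
    buffer = deque(maxlen=count)   # older items drop off the left as new ones arrive
    for v in values:
        buffer.append(v)
    return list(buffer)

squares = [n ** 2 for n in range(1, 11)]
last_eight = take_last(squares, 8)
print(last_eight)  # [9, 16, 25, 36, 49, 64, 81, 100]
```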


In [10]:
observable = Observable.from_marbles("1-2-3-4-5-6-7-8-9-10-|")
observable.sample(sampler=Observable.from_marbles("1-2-3-4-5-|")).subscribe(MyObserver())

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10ce06190>

Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
Got: 6


In [11]:
observable = Observable.from_marbles("1-2-3-4-5-6-7-8-9-10-|")
observable.sample(500).subscribe(MyObserver())

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10ce2c490>

Got: 5
Got: 10
Sequence completed


### Mathematical operators

In [12]:
observable = Observable.from_marbles("1-2-3-4-5-6-7-8-9-10|")
observable.map(
    lambda x, i: int(x)
).map(
    lambda x, i: x ** 2
).take_last(8).average().subscribe(MyObserver())

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10ce2cdd0>

Got: 47.5
Sequence completed
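
The result can be checked by hand: the last eight squares of 1 through 10 run from 9 to 100, they sum to 380, and 380 / 8 = 47.5. The same arithmetic in plain Python (no RxPY involved):

```python
squares = [n ** 2 for n in range(1, 11)]   # 1, 4, 9, ..., 100
last_eight = squares[-8:]                  # drops 1 and 4
total = sum(last_eight)
mean = total / len(last_eight)
print(total, mean)  # 380 47.5
```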


## Subjects (Observers and Observables)

A Subject is both an Observable and an Observer, so you can subscribe to it and also push events into it by calling on_next. This makes it an obvious candidate when you need to publish values into an observable stream for processing:

In [14]:
stream = Subject()
stream.on_next(41)

d = stream.subscribe(MyObserver())

stream.on_next(42)

d.dispose()
stream.on_next(43)

Got: 42
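
To see why only 42 is printed, here is a minimal pure-Python sketch of the idea behind a subject. `MiniSubject` is a hypothetical class written for illustration, not RxPY's implementation: it ignores errors, completion, and thread safety. It keeps a list of subscriber callbacks and forwards each value only to whoever is subscribed at that moment.

```python
class MiniSubject:
    """Illustrative stand-in for a subject: both a sink and a source of values."""

    def __init__(self):
        self._callbacks = []

    def subscribe(self, callback):
        self._callbacks.append(callback)

        # Return a "dispose" function that removes this subscriber again.
        def dispose():
            if callback in self._callbacks:
                self._callbacks.remove(callback)
        return dispose

    def on_next(self, value):
        # Iterate over a copy so a callback may dispose during delivery.
        for callback in list(self._callbacks):
            callback(value)


received = []
stream = MiniSubject()
stream.on_next(41)                          # nobody is subscribed yet: dropped
dispose = stream.subscribe(received.append)
stream.on_next(42)                          # delivered
dispose()
stream.on_next(43)                          # subscriber is gone: dropped
print(received)  # [42]
```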


In [15]:
observable = Observable.from_marbles("1-2-3-4-5-6-7-8-9-10-|")
stream = Subject()
d = stream.subscribe(MyObserver())
observable.subscribe(stream)

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10ce42890>

Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
Got: 6
Got: 7
Got: 8
Got: 9
Got: 10
Sequence completed


In [13]:
d.dispose()

In [17]:
observable = Observable.from_marbles("1-2-3-4-5-6-7-8-9-10-|")
stream = ReplaySubject()
d = stream.subscribe(MyObserver())
observable.subscribe(stream)

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10ce6ed50>

Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
Got: 6
Got: 7
Got: 8
Got: 9
Got: 10
Sequence completed


In [18]:
d.dispose()

In [21]:
d = stream.map(lambda x, i: int(x) ** 2).subscribe(MyObserver())

Got: 1
Got: 4
Got: 9
Got: 16
Got: 25
Got: 36
Got: 49
Got: 64
Got: 81
Got: 100
Sequence completed


In [22]:
d.dispose()

In [23]:
observable = Observable.from_marbles("1-2-3-4-5-6-7-8-9-10-|")
stream = AsyncSubject()
d = stream.subscribe(MyObserver())
observable.subscribe(stream)

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10ce55250>

Got: 10
Sequence completed


In [24]:
observable = Observable.from_marbles("1-2-3-4-5-6-7-8-9-10-|")
stream = BehaviorSubject(2000)
d = stream.subscribe(MyObserver())
observable.subscribe(stream)

Got: 2000


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10cea7810>

Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
Got: 6
Got: 7
Got: 8
Got: 9
Got: 10
Sequence completed
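
The four subject types differ mainly in what a subscriber receives relative to when it subscribes. The following pure-Python sketch mimics that difference. The `Mini*` classes and the `run` helper are hypothetical and heavily simplified (no error handling, no disposal, no schedulers, unbounded replay history); they are not RxPY's own code.

```python
class MiniSubject:
    """Forwards each value only to the subscribers present at that moment."""
    def __init__(self):
        self.observers = []

    def subscribe(self, on_next):
        self.observers.append(on_next)

    def on_next(self, value):
        for obs in list(self.observers):
            obs(value)

    def on_completed(self):
        pass  # completion is not modelled for the plain subject


class MiniReplaySubject(MiniSubject):
    """Remembers every value and replays the history to late subscribers."""
    def __init__(self):
        super().__init__()
        self.history = []

    def subscribe(self, on_next):
        for value in self.history:
            on_next(value)
        super().subscribe(on_next)

    def on_next(self, value):
        self.history.append(value)
        super().on_next(value)


class MiniBehaviorSubject(MiniSubject):
    """Holds a current value; every new subscriber receives it immediately."""
    def __init__(self, initial):
        super().__init__()
        self.current = initial

    def subscribe(self, on_next):
        on_next(self.current)
        super().subscribe(on_next)

    def on_next(self, value):
        self.current = value
        super().on_next(value)


class MiniAsyncSubject(MiniSubject):
    """Emits only the last value, and only when the source completes."""
    def __init__(self):
        super().__init__()
        self.last = None
        self.has_value = False

    def on_next(self, value):
        self.last, self.has_value = value, True   # store, do not forward

    def on_completed(self):
        if self.has_value:
            for obs in list(self.observers):
                obs(self.last)


def run(subject):
    """Subscribe one observer early and one after two values were published."""
    early, late = [], []
    subject.subscribe(early.append)
    subject.on_next(1)
    subject.on_next(2)
    subject.subscribe(late.append)
    subject.on_next(3)
    subject.on_completed()
    return early, late


print(run(MiniSubject()))            # ([1, 2, 3], [3])
print(run(MiniReplaySubject()))      # ([1, 2, 3], [1, 2, 3])
print(run(MiniBehaviorSubject(0)))   # ([0, 1, 2, 3], [2, 3])
print(run(MiniAsyncSubject()))       # ([3], [3])
```

In the cells above every observer subscribes before any value is published, which is why the Subject and ReplaySubject outputs look identical there; the difference only shows up for a late subscriber, as in the `In [21]` cell.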


## Now, shit got real.