In [1]:
import rx
from rx import Observable, Observer

## Hello World

In [3]:
class MyObserver(Observer):
    def on_next(self, x):
        print("Got: %s" % x)
        
    def on_error(self, e):
        print("Got error: %s" % e)
        
    def on_completed(self):
        print("Sequence completed")

xs = Observable.from_iterable(range(5)) \
               .map(lambda x: x*2) 

d = xs.subscribe(MyObserver())

Got: 0
Got: 2
Got: 4
Got: 6
Got: 8
Sequence completed


## Aggregations

In [None]:
xs = Observable.from_iterable(range(5)) \
               .window_with_count(2) \
               .subscribe(lambda x: x.subscribe(lambda val: print("Next: ", val), 
                                                lambda err: print("Err: ", err), 
                                                lambda: print("Complete!")
                )) 

In [18]:
xs = Observable.from_iterable(range(5)) \
               .buffer_with_count(2) \
               .subscribe(lambda val: print("Next: ", val), 
                          lambda err: print("Err: ", err), 
                          lambda: print("Complete!")
                          ) 

Next:  [0, 1]
Next:  [2, 3]
Next:  [4]
Complete!


## Concurrency

In [19]:
import multiprocessing
import random
import time
from threading import current_thread

from rx import Observable
from rx.concurrency import ThreadPoolScheduler


def intense_calculation(value):
    # sleep for a random short duration between 0.5 to 2.0 seconds to simulate a long-running calculation
    time.sleep(random.randint(5, 20) * .1)
    return value

# calculate number of CPU's, then create a ThreadPoolScheduler with that number of threads
optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)

# Create Process 1
Observable.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon") \
    .map(lambda s: intense_calculation(s)) \
    .subscribe_on(pool_scheduler) \
    .subscribe(on_next=lambda s: print("PROCESS 1: {0} {1}".format(current_thread().name, s)),
               on_error=lambda e: print(e),
               on_completed=lambda: print("PROCESS 1 done!"))

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x177c82cd978>

PROCESS 1: ThreadPoolExecutor-1_0 Alpha
PROCESS 1: ThreadPoolExecutor-1_0 Beta
PROCESS 1: ThreadPoolExecutor-1_0 Gamma
PROCESS 1: ThreadPoolExecutor-1_0 Delta
PROCESS 1: ThreadPoolExecutor-1_0 Epsilon
PROCESS 1 done!


## sandbox

In [21]:
def emit():
    for i in range(5):
        yield i

In [24]:
Observable.from_(emit()) \
    .map(lambda x: x*2) \
    .subscribe(print)                

0
2
4
6
8


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x177c82cd518>