# Getting Started with RxPY

ReactiveX、つまりRxは、observableイベントストリームでプログラミングするためのAPIです。

"Rx = Observables + LINQ + Schedulers"     
３つの組み合わせを使用することでコードを簡潔にかける。



```python
subscription = Observable.subscribe(observer)
```


## インストール

pipを使ってRxPYをインストール

In [2]:
%%bash
pip install rx

Collecting rx
  Using cached Rx-1.6.0-py2.py3-none-any.whl
Installing collected packages: rx
Successfully installed rx-1.6.0


## Rxモジュールをインポート

In [1]:
import rx
from rx import Observable, Observer

## シーケンスの生成

一連のイベントを生成するには多くの方法があります。 開始する最も簡単な方法はfrom_iterable（）演算子を使用することです。この演算子はfrom_とも呼ばれます。 just、generate、create、rangeなどのシーケンスを生成することができる。
ObserverクラスのMyObserverを生成。range(10)を引数に入れると１行処理するごとにon_nextが呼ばれる。最後の処理が終わったらon_completedが呼ばれる。

In [2]:
class MyObserver(Observer):
    def on_next(self, x):
        print("Got: %s" % x)
        
    def on_error(self, e):
        print("Got error: %s" % e)
        
    def on_completed(self):
        print("Sequence completed")

xs = Observable.from_iterable(range(3))
d = xs.subscribe(MyObserver())

Got: 0
Got: 1
Got: 2
Sequence completed


from_iterableもfrom_も同じじゃない？

In [3]:
xs = Observable.from_iterable(range(3))
d = xs.subscribe(print)

0
1
2


In [4]:
xs = Observable.from_(range(3))
d = xs.subscribe(print)

0
1
2


**NOTE:** subscribeメソッドは、on_next（）、on_error（）およびon_completed（）を渡すためのオブザーバまたは1〜3つのコールバックをとります。 匿名オブザーバのためのon_next（）ハンドラなので、上記の例ではオブザーバとして直接printを使用できます。


# シーケンスのフィルタリング

In [20]:
xs = Observable.from_(range(5))
d = xs.filter(
        lambda x: x % 2
    ).subscribe(print)

1
3


In [21]:
xs = Observable.from_(range(5))
d = xs.filter(
        lambda x: x == 2
    ).subscribe(print)

2


## シーケンスの変換

In [22]:
xs = Observable.from_(range(3))
d = xs.map(
        lambda x: x * 2
    ).subscribe(print)

0
2
4


In [23]:
xs = Observable.from_(range(3))
d = xs.map(
        lambda x: x + 2
    ).subscribe(print)

2
3
4


**NOTE: **  マッパー関数の2番目のパラメータとしてインデックスを取ることもできます：

In [8]:
xs = Observable.from_(range(10, 20, 2))
d = xs.map(
        lambda x, i: "%s: %s" % (i, x * 2)
    ).subscribe(print)

0: 20
1: 24
2: 28
3: 32
4: 36


## マージ

マージ演算子を使用して2つのシーケンスを単一のシーケンスにマージする

In [6]:
xs = Observable.range(1, 5)
ys = Observable.from_("abcde")
zs = xs.merge(ys).subscribe(print)

a
1
b
2
c
3
d
4
e
5


## Rxの時代

上記の例では、すべてのイベントが同じ瞬間に発生します。イベントは、注文によってのみ分離されます。これは、上記のマージ操作の結果が次のようないくつかの有効な結果を持つ可能性があるため、多くの新規参入者をRxに混乱させます。

    a1b2c3d4e5
    1a2b3c4d5e
    ab12cd34e5
    abcde12345
    
あなたが持っている唯一の保証は、1はxsの2より前ですが、xsの1はysの前後です。 どのイベントを先に進めるべきかを決定するのはスケジューラのソートの安定性です。 リアルタイムのデータストリームの場合、イベントは実際の時間で区切られるため、問題はありません。 あなたが "期待している"結果を確実に得るためには、Rxで遊ぶときにイベントの間に時間を入れておくことをお勧めします。

## マーブルとマーブルダイアグラム

こちらの画像を見ると何となくわかる。
http://rxmarbles.com/

前のセクションで見たように、RxとRxPYで遊ぶときに時間を追加するといいです。 RxPYを探検する素晴らしい方法は、大理石図を使って遊ぶことができる大理石テストモジュールを使うことです。 Marblesモジュールは、Observableに2つの新しい拡張メソッドを追加します。 メソッドはfrom_marbles（）とto_marbles（）です。


Examples:
1.  `res = rx.Observable.from_marbles("1-2-3-|")`
2.  `res = rx.Observable.from_marbles("1-2-3-x", rx.Scheduler.timeout)`

マーブルの文字列は、いくつかの特殊文字

```
    - = Timespan of 100 ms
    x = on_error()
    | = on_completed()
```

他のすべての文字は、文字列上にある特定の瞬間にon_next（）イベントとして扱われます。 複数の文字値を表現する必要がある場合は、「1-（42）-3」などの角括弧でグループ化できます。

Lets try it out:

In [24]:
from rx.testing import marbles

xs = Observable.from_marbles("a-b-c-|")
xs.to_blocking().to_marbles()

'a-b-c-|'

関数の処理たちのことをストリームと呼んでる<br>
大文字の文字列にxを挿入することで、偶数ストリームにエラーを追加するのは簡単です：

In [25]:
xs = Observable.from_marbles("1-2-3-x-5")
ys = Observable.from_marbles("1-2-3-4-5")
xs.merge(ys).to_blocking().to_marbles()

'11-22-3-34x'

## サブジェクトとストリーム

ストリームを作成する簡単な方法は、サブジェクトを使用することです。 

SubjectはObservableとObserverの両方であるため、イベントに参加することもon_nextすることもできます。

In [29]:
from rx.subjects import Subject

stream = Subject()
stream.on_next(41)

d = stream.subscribe(lambda x: print("Got: %s" % x))
stream.on_next(42)

Got: 42


In [33]:
d = stream.subscribe(lambda x: print("Got: %s" % x))
stream.on_next(43)

Got: 43


In [34]:
d.dispose()
stream.on_next(44)

# チュートリアル

In [None]:
#https://tutorialedge.net/python/python-event-driven-rxpy-tutorial/

In [5]:
stocks = [
  { 'TCKR' : 'APPL', 'PRICE': 200},
  { 'TCKR' : 'GOOG', 'PRICE': 90},
  { 'TCKR' : 'TSLA', 'PRICE': 120},
  { 'TCKR' : 'MSFT', 'PRICE': 150},
  { 'TCKR' : 'INTL', 'PRICE': 70},
]

In [6]:
from rx import Observable
def buy_stock_events(observer):
    for stock in stocks:
        if(stock['PRICE'] > 100):
            observer.on_next(stock['TCKR'])
        elif(stock['PRICE'] <= 0):
            observer.on_error(stock['TCKR'])
    observer.on_completed()

source = Observable.create(buy_stock_events) 

In [7]:
#メソッドを突っ込んで発行

In [8]:
source.subscribe(on_next=lambda value: print("購入 {0}".format(value)),
                on_completed=lambda: print("取引終了"),
                on_error=lambda e: print(e))

SyntaxError: invalid syntax (<ipython-input-8-f6119096764a>, line 1)

# Observablesの作成

## import

In [5]:
from random import randint
from datetime import datetime
import rx
from rx import Observable, Observer
from rx.concurrency import AsyncIOScheduler

class MyObserver(Observer):
    def on_next(self, x):
        print("Got: %s" % x)
        
    def on_error(self, e):
        print("Got error: %s" % e)
        
    def on_completed(self):
        print("Sequence completed")

def rand():
    return randint(100, 999)

## create

オブザーバメソッドを呼び出す

In [6]:
def obs1(observer):
    observer.on_next(1)
    observer.on_next(2)
res = rx.Observable.create(obs1)
res.subscribe(on_next=lambda value: print("{0}".format(value)),
                on_completed=lambda: print("終了"),
                on_error=lambda e: print(e))

1
2


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x102785898>

## generate

In [7]:
res = rx.Observable.generate(0, lambda x: x < 4, lambda x: x + 1, lambda x: x)
res.subscribe(on_next=lambda value: print("{0}".format(value)),
                on_completed=lambda: print("終了"),
                on_error=lambda e: print(e))

0
1
2
3
終了


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x10211de48>

In [8]:
def generator_based_on_previous(x): return x + 1.1
def doubler(x): return 2 * x
res = rx.Observable.generate(0, lambda x: x < 4, generator_based_on_previous, doubler)
res.subscribe(on_next=lambda value: print("{0}".format(value)),
                on_completed=lambda: print("終了"),
                on_error=lambda e: print(e))

0
2.2
4.4
6.6000000000000005
終了


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x102785048>

## defer

In [38]:
# obs1 = rx.Observable.defer(rx.Observable.just(1))
# obs2 = rx.Observable.defer(rx.Observable.just(2))

# def selector():
#     return True

# res = rx.Observable.if_then(selector, obs1, obs2)
# res.subscribe(on_next=lambda value: print("{0}".format(value)),
#                 on_completed=lambda: print("終了"),
#                 on_error=lambda e: print(e))


In [41]:
# obs1 = rx.Observable.defer(rx.Observable.just(1))
# obs2 = rx.Observable.defer(rx.Observable.just(2))

# def selector():
#     return True

# res = rx.Observable.case(selector, { '1': obs1, '2': obs2 })
# res.subscribe(on_next=lambda value: print("{0}".format(value)),
#                 on_completed=lambda: print("終了"),
#                 on_error=lambda e: print(e))

## range

In [23]:
res = rx.Observable.range(0, 3)
res.subscribe(on_next=lambda value: print("{0}".format(value)),
                on_completed=lambda: print("終了"),
                on_error=lambda e: print(e))

0
1
2
終了


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x1027a4048>

## just

単一の指定された要素を含むobservableシーケンスを返します

In [99]:
res = rx.Observable.just(42)
res.subscribe(on_next=lambda value: print("{0}".format(value)),
                on_completed=lambda: print("終了"),
                on_error=lambda e: print(e))

42
終了


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x104867080>

## start

関数の結果値を公開するobservableシーケンス

In [52]:
res = rx.Observable.start(lambda: print('hello1'))
res = rx.Observable.start(lambda: print('hello2'))

hello1hello2



## from

指定されたenumerableシーケンスから要素が引き出されるobservableシーケンス

In [72]:
res = rx.Observable.from_iterable([1,2,3])
res

<rx.core.anonymousobservable.AnonymousObservable at 0x1057eacc0>

In [73]:
res.subscribe(on_next=lambda value: print("{0}".format(value)),
                on_completed=lambda: print("終了"),
                on_error=lambda e: print(e))

1
2
3
終了


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x1058260f0>

## timer

期限が経過してから各期間の後に値を生成するobservableシーケンスを返します

In [82]:
res = Observable.timer(datetime.now())
res = Observable.timer(datetime.now(), 1000)
res = res.time_interval().map(lambda x: 'val:%s dt:%s' % (x.value, x.interval)).take(3)

In [83]:
res.subscribe(on_next=lambda value: print("{0}".format(value)),
                on_completed=lambda: print("終了"),
                on_error=lambda e: print(e))

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x105845c50>

# Observablesの変換

## buffer

Observableシーケンスの各要素を、要素数情報に基づいて生成される0個以上のバッファに投げる

In [89]:
items = ['red', 'yellow', 'green', 'cyan', 'blue', 'purple']
source = rx.Observable.from_(items).buffer_with_count(2)
subscription = source.subscribe(
    lambda value: print("Next:", value),
    lambda error: print("Error:", error),
    lambda: print("Complete!"))

Next: ['red', 'yellow']
Next: ['green', 'cyan']
Next: ['blue', 'purple']
Complete!


# Flat map

In [93]:
items = range(5)
source = rx.Observable.from_(items)
# source.select_many(lambda x: Observable.range(0, x))
source.select_many(lambda x: Observable.range(0, x), lambda x, y: x + y)
subscription = source.subscribe(
    lambda value: print("Next:", value),
    lambda error: print("Error:", error),
    lambda: print("Complete!"))

Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Complete!
