In [None]:
!pip install rx

Collecting rx
[?25l  Downloading https://files.pythonhosted.org/packages/90/6c/5f1839d9ae2a8c85d119c51acaff1f1382f68691cb0f1cb3d0c9fdd32a93/Rx-3.1.1-py3-none-any.whl (197kB)
[K     |█▋                              | 10kB 15.8MB/s eta 0:00:01[K     |███▎                            | 20kB 15.3MB/s eta 0:00:01[K     |█████                           | 30kB 10.6MB/s eta 0:00:01[K     |██████▋                         | 40kB 8.4MB/s eta 0:00:01[K     |████████▎                       | 51kB 5.4MB/s eta 0:00:01[K     |██████████                      | 61kB 5.7MB/s eta 0:00:01[K     |███████████▋                    | 71kB 5.6MB/s eta 0:00:01[K     |█████████████▎                  | 81kB 6.1MB/s eta 0:00:01[K     |███████████████                 | 92kB 6.5MB/s eta 0:00:01[K     |████████████████▋               | 102kB 6.8MB/s eta 0:00:01[K     |██████████████████▎             | 112kB 6.8MB/s eta 0:00:01[K     |████████████████████            | 122kB 6.8MB/s eta 0:00:01[

In [None]:
import rx
import time
import threading
import random

In [None]:
# Subscription function handed to rx.create below
def push_strings(observer, scheduler):
    """Emit five fixed strings to ``observer`` and then signal completion.

    Args:
        observer: Object exposing ``on_next`` and ``on_completed``.
        scheduler: Accepted as the second argument but not used here.
    """
    greek_letters = ("ALPHA", "BETA", "GAMMA", "DELTA", "EPSILON")
    for letter in greek_letters:
        observer.on_next(letter)
    observer.on_completed()
 
 
# Observer written as a subclass of rx's Disposable
class PrintObserver(rx.disposable.Disposable):
    """Observer that prints every notification it receives."""

    def on_next(self, value="UNDEFINED"):
        """Print an emitted value."""
        message = f"Received {value}"
        print(message)

    def on_error(self, error="UNDEFINED"):
        """Print the error that was reported."""
        message = f"Error: {error}"
        print(message)

    def on_completed(self):
        """Print a notice that the sequence has finished."""
        print("Completed")
 
 
source = rx.create(push_strings)  # Creates an observable from the function above
source.subscribe(PrintObserver())  # Subscribes our observer to the observable

Received ALPHA
Received BETA
Received GAMMA
Received DELTA
Received EPSILON
Completed


<rx.disposable.disposable.Disposable at 0x7f0cdd33c748>

In [None]:
# Observable factories

# Build an observable straight from a fixed sequence of values
source = rx.of("ALPHA", "BETA", "GAMMA", "DELTA", "EPSILON")

# Register one callback per notification type
source.subscribe(
    on_next=lambda item: print(f"Received {item}"),
    on_error=lambda err: print(f"Error: {err}"),
    on_completed=lambda: print("Completed!"),
)

Received ALPHA
Received BETA
Received GAMMA
Received DELTA
Received EPSILON
Completed!


<rx.disposable.disposable.Disposable at 0x7f0cdd33c128>

In [None]:
# Lazy demo: these are only the initial prices. They are not real-time
# quotes; later in this cell they are overwritten with random numbers.
stocks = [
    {"TCKR": ticker, "PRICE": price}
    for ticker, price in (
        ("APPL", 170.00),
        ("GOGL", 90.00),
        ("TSLA", 120.00),
        ("MCSF", 150.00),
        ("INTL", 70.00),
    )
]
 
 
def low_price(observer, scheduler):
    """Emit the ticker of each stock priced at 100 or less, then complete.

    Prices are read from the module-level ``stocks`` list at call time.

    Args:
        observer: Object exposing ``on_next`` and ``on_completed``.
        scheduler: Accepted as the second argument but not used here.
    """
    # Generator (not a list) so each price is checked right before its emit.
    cheap_tickers = (row["TCKR"] for row in stocks if row["PRICE"] <= 100)
    for ticker in cheap_tickers:
        observer.on_next(ticker)
    observer.on_completed()
 
 
class StockObserver(rx.disposable.Disposable):
    """Observer that reports a purchase for every ticker it is notified about.

    Attributes:
        name: Label printed in front of this observer's messages.
        bought: Number of ``on_next`` notifications handled so far.
    """

    # Shared by all instances. Observers are driven from several threads at
    # once, and unsynchronized print calls interleave their output mid-line.
    _print_lock = threading.Lock()

    def __init__(self, name):
        """Store the label and start the purchase counter at zero.

        Args:
            name: Label used as a prefix in the printed messages.
        """
        # Run the Disposable initializer too; skipping it leaves the base
        # class without the state its own methods rely on.
        super().__init__()
        self.name = name
        self.bought = 0

    def on_next(self, value):
        """Count one more purchase and print both report lines together."""
        self.bought += 1
        with self._print_lock:
            print(f'{self.name}: The condition was met, buying "{value}" stock!')
            print(f'{self.name}: I already bought: {self.bought} stocks')

    def on_error(self, error):
        """Print the reported error."""
        with self._print_lock:
            print(f'Error: {error}')

    def on_completed(self):
        """Print a notice that no more tickers will arrive."""
        with self._print_lock:
            print("All buying instructions are done!")
 
 
source = rx.create(low_price)


def checking(name, delay=5):
    """Wait ``delay`` seconds, then subscribe a new StockObserver to ``source``.

    Args:
        name: Label handed to the StockObserver.
        delay: Seconds to sleep before subscribing. Defaults to 5, the value
            that used to be hard-coded.
    """
    time.sleep(delay)
    source.subscribe(StockObserver(name))
 
 
threads = []
names = ["Lucas", "João", "Ana", "Arthur", "Pedro",
         "Matheus", "Cristiano", "Kauan", "Isabel", "Rose"]

# Draw five distinct names in a single call. This replaces the manual
# pick-then-remove loop and leaves `names` unmodified.
for name in random.sample(names, 5):
    thread = threading.Thread(target=checking, name=name, args=(name,))
    threads.append(thread)
    thread.start()

time.sleep(0.5)
print("Waiting to update stock...")
time.sleep(0.5)
 
def update_stock(timer=1):
    """Overwrite the price of every entry in ``stocks`` with a random integer.

    Each new price comes from ``random.randrange(0, 200)``.

    Args:
        timer: Not used by the function body.
    """
    for stock in stocks:
        stock["PRICE"] = random.randrange(0, 200)
 
# Re-randomize the prices once per second, five times.
for _ in range(5):
    time.sleep(1)
    update_stock()

# Block until every subscriber thread has finished.
for thread in threads:
    thread.join()

# No extra sleep here: join() has already returned for every thread.
print('END.')

Waiting to update stock...
Kauan: The condition was met, buying "APPL" stock!
Kauan: I already bought: 1 stocks
Kauan: The condition was met, buying "MCSF" stock!
Kauan: I already bought: 2 stocks
All buying instructions are done!
Cristiano: The condition was met, buying "APPL" stock!
Cristiano: I already bought: 1 stocks
Lucas: The condition was met, buying "APPL" stock!
Lucas: I already bought: 1 stocks
Lucas: The condition was met, buying "MCSF" stock!
Lucas: I already bought: 2 stocks
All buying instructions are done!
Rose: The condition was met, buying "TSLA" stock!
Cristiano: The condition was met, buying "TSLA" stock!Rose: I already bought: 1 stocks
Rose: The condition was met, buying "MCSF" stock!
Rose: I already bought: 2 stocks
All buying instructions are done!
João: The condition was met, buying "TSLA" stock!

Cristiano: I already bought: 2 stocks
Cristiano: The condition was met, buying "MCSF" stock!
Cristiano: I already bought: 3 stocks
All buying instructions are done!
Jo

In [None]:
# Fixed prices, one record per ticker
stocks = [
    dict(TCKR="APPL", PRICE=170.00),
    dict(TCKR="GOGL", PRICE=90.00),
    dict(TCKR="TSLA", PRICE=120.00),
    dict(TCKR="MCSF", PRICE=150.00),
    dict(TCKR="INTL", PRICE=70.00),
]
 
 
def low_price(observer, scheduler):
    """Emit the ticker of each stock priced at 100 or less, then complete.

    Prices are read from the module-level ``stocks`` list at call time.

    Args:
        observer: Object exposing ``on_next`` and ``on_completed``.
        scheduler: Accepted as the second argument but not used here.
    """
    for entry in stocks:
        # Guard clause: skip anything that fails the price test.
        if not entry["PRICE"] <= 100:
            continue
        observer.on_next(entry["TCKR"])
    observer.on_completed()
 
 
# Wrap the price filter in an observable and print every ticker it emits
source = rx.create(low_price)
source.subscribe(on_next=lambda ticker: print(f'Buying {ticker}'))

Buying GOGL
Buying INTL


<rx.disposable.disposable.Disposable at 0x7f0cdd266cf8>

In [None]:
# Fixed prices read by low_price below
stocks = [
    {"TCKR": "APPL", "PRICE": 170.00},
    {"TCKR": "GOGL", "PRICE": 90.00},
    {"TCKR": "TSLA", "PRICE": 120.00},
    {"TCKR": "MCSF", "PRICE": 150.00},
    {"TCKR": "INTL", "PRICE": 70.00},
]
 
def low_price(observer, scheduler):
    """Emit the ticker of each stock priced at 100 or less, then complete.

    Prices are read from the module-level ``stocks`` list at call time.

    Args:
        observer: Object exposing ``on_next`` and ``on_completed``.
        scheduler: Accepted as the second argument but not used here.
    """
    # filter() is lazy, so each price is tested right before its ticker is sent.
    affordable = filter(lambda quote: quote["PRICE"] <= 100, stocks)
    for quote in affordable:
        observer.on_next(quote["TCKR"])
    observer.on_completed()
 
# Same observable as before, now with a callback for every notification type
source = rx.create(low_price)
source.subscribe(
    on_next=lambda ticker: print(f"Buying {ticker}"),
    on_error=lambda err: print(f"Error: {err}"),
    on_completed=lambda: print("Completed."),
)

Buying GOGL
Buying INTL
Completed.


<rx.disposable.disposable.Disposable at 0x7f0cdd2649b0>