# Event-Driven Architecture

In [1]:
from typing import Callable

## Simple Events


### Using a message broker

In [2]:
# Before continuing, install and import the 'pypubsub' library,
# which implements a pub/sub architecture, as follows

"""
!pip install pypubsub
"""
from pubsub import pub


In [3]:
# To subscribe to a topic, we specify the topic 
# and the function to be called, as follows:

class Order:
    """Placeholder type for a meal order; it defines no fields or behavior."""

def notify_customer_that_meal_is_done(order: Order):
    """Tell the customer that their meal is ready.

    The real notification logic is snipped out of this example, so the
    function is a stub that does nothing.

    Args:
        order: The order whose meal has been completed.

    Returns:
        None.
    """
    return None

# Left as a bare call so the notebook echoes the return value: a
# (Listener, bool) pair, as the cell output shows.
pub.subscribe(notify_customer_that_meal_is_done, "meal-done")


(<pubsub.core.listener.Listener at 0x7f56713629a0>, True)

In [4]:
# To publish to a topic, we do the following:

def complete_order(order: Order):
    """Package a finished order and announce it on the "meal-done" topic.

    Every listener subscribed to "meal-done" is called with the order.

    Args:
        order: The order that has just been completed.
    """
    # `package_order` is not defined in this notebook; it is assumed to be
    # provided elsewhere. (The call was previously misspelled `packge_order`.)
    package_order(order)
    # pypubsub publishes with `sendMessage`, not `publish`. The payload is
    # passed by keyword, and the keyword has to match the listeners'
    # parameter name (`order`).
    pub.sendMessage("meal-done", order=order)


In [5]:
# Adding additional subscribers to a topic is easy:
# just subscribe new functions to the topic, as follows:

def schedule_pick_up_for_meal(order: Order):
    """Schedule a drone pick-up for a completed meal.

    The scheduling logic is snipped out of this example, so the function is
    a stub that does nothing.

    Args:
        order: The order whose meal is ready to be collected.

    Returns:
        None.
    """
    return None
    
# Second listener on the same "meal-done" topic. Left as a bare call so the
# notebook echoes the returned (Listener, bool) pair.
pub.subscribe(schedule_pick_up_for_meal, "meal-done")

(<pubsub.core.listener.Listener at 0x7f5671362e50>, True)

### The Observer Pattern


In [6]:
# We can rewrite the previous publisher function example
# to use the Observer Pattern, as follows:

def complete_order(order: Order, observers: list[Callable]):
    """Package a finished order, then hand it to each observer in turn.

    Args:
        order: The order that has just been completed.
        observers: Callables invoked one after another, in list order,
            each receiving ``order`` as its only argument.
    """
    package_order(order)
    # Direct calls replace the message broker: the caller decides who is told.
    for notify in observers:
        notify(order)


## Streaming events

In [7]:
# Before continuing, install and import the 'rx' library,
# which supports reactive programming, as follows

"""
!pip install rx
"""
import rx


In [11]:
# With the RxPY library, we define a stream of data
# (an observable, in RxPY parlance) as follows:

class LocationData:
    """A location reading made of three coordinates.

    Attributes are stored exactly as passed in; no validation or unit
    conversion is performed.
    """

    def __init__(self, x, y, z) -> None:
        """Store the coordinates on the instance.

        Args:
            x: First coordinate.
            y: Second coordinate.
            z: Third coordinate.
        """
        # Previously the arguments were discarded, so subscribers receiving
        # an instance had nothing to read from it.
        self.x = x
        self.y = y
        self.z = z

class BatteryLevel:
    """A battery reading expressed as a percentage.

    The value is stored exactly as passed in; no range check is performed.
    """

    def __init__(self, percent) -> None:
        """Store the reading on the instance.

        Args:
            percent: The battery charge level.
        """
        # Previously the argument was discarded, so subscribers receiving an
        # instance had nothing to read from it.
        self.percent = percent

# The stream is built from two sample readings: one location, one battery level.
observable = rx.of(
    LocationData(x=3, y=12, z=40),
    BatteryLevel(percent=95),
)
# Bare expression: the notebook echoes the object's repr as the cell output.
observable

<rx.core.observable.observable.Observable at 0x7f56712fd220>

In [9]:
# Once we have an observable, observers can subscribe to it, 
# similar to the pub/sub mechanism:

def handle_drone_data(value):
    """Handle one item received from the observable.

    The real handling logic is snipped out of this example, so the function
    is a stub that does nothing.

    Args:
        value: The item delivered to this subscriber.

    Returns:
        None.
    """
    return None

# Left as a bare call so the notebook echoes the return value, which the
# cell output shows to be a Disposable.
observable.subscribe(handle_drone_data)

<rx.disposable.disposable.Disposable at 0x7f56715466d0>

In [15]:
# But unlike pub/sub patterns, RxPY allows us to pipe operations together, as follows:

class CurrentWeight:
    """A weight reading, in grams.

    The averaging pipeline in this notebook reads the ``grams`` attribute
    of every ``CurrentWeight`` it sees, so the attribute must exist.
    """

    def __init__(self, grams: float = 0.0) -> None:
        """Store the reading on the instance.

        Args:
            grams: The measured weight. Optional so that the previous
                no-argument construction ``CurrentWeight()`` keeps working.
        """
        self.grams = grams

# NOTE: `observable` as defined in this notebook only carries a LocationData
# and a BatteryLevel item, so nothing gets past the isinstance filter below.
get_average_weight = observable.pipe(
    # Keep only the CurrentWeight items ...
    rx.operators.filter(lambda data: isinstance(data, CurrentWeight)),
    # ... read the `grams` attribute of each one ...
    rx.operators.map(lambda cw: cw.grams),
    # ... and average those values.
    rx.operators.average()
)

def save_average_weight(average_weight=None):
    """Do something with the final data.

    For example: save it to a database, print it to the screen, etc. The
    body is a stub in this example and does nothing.

    Args:
        average_weight: The value delivered by the observable. A subscriber
            callback is invoked with the emitted item, so the function has
            to accept one argument. It is optional so that existing
            no-argument calls keep working.

    Returns:
        None.
    """
    return None

# get_average_weight.subscribe(save_average_weight)
