# 18章 イベント駆動アーキテクチャ

In [1]:
from pydantic.dataclasses import dataclass
from pubsub import pub

## 単純イベント

### メッセージブローカーの使用(PyPubSub)

In [6]:
@dataclass
class Email:
    address: str
    name: str
    message: str

# トピックをサブスクライブするために、トピックと呼び出してほしい関数を定義する
def notify_customer_send_email_done(email: Email):
    print(email.adress)

pub.subscribe(notify_customer_send_email_done, "send-email")

(<pubsub.core.listener.Listener at 0x7f92b996a880>, True)

In [7]:
# トピックをパブリッシュするコード
def complete_send_email(email: Email):
    pub.publish("send-email", email)

### Observerパターン

In [None]:
from collections.abc import Callable

def notify_customer_send_email_done(email: Email, observers: list[Callable[Order]]):
    email_done(email)
    for observer_func in observers:
        observer_func(email)

# 今回の例だと新しいオブザーバーは引数として渡されるリストに入れるだけで追加できる

In [1]:
import rx

In [2]:
# オブザーバーはオブザーバブルをサブスクライブできる
from dataclasses import dataclass
from enum import Enum

class Direction(Enum):
    NORTH = "NORTH"
    WEST = "WEST"
    SOUTH = "SOUTH"
    EAST = "EAST"

@dataclass
class LocationData:
    x: int
    y: int
    z: int

@dataclass
class BatteryLevel:
    percent: int

@dataclass
class WindData:
    speed: int
    direction: Direction

@dataclass
class CurrentWeight:
    grams: int

def is_close_to_restaurant(*args):
    return False

observable = rx.of(
    LocationData(x=3, y=12, z=40),
    BatteryLevel(percent=95),
    BatteryLevel(percent=94),
    WindData(speed=15, direction=Direction.NORTH),
    LocationData(x=3, y=12, z=35),
    LocationData(x=4, y=12, z=32),
    # ... snip 100s of events
    BatteryLevel(percent=72),
    CurrentWeight(grams=300),
    CurrentWeight(grams=100)
)

In [None]:
#フィルタリング、変換、計算のパイプラインの実装が可能
val = None
def save_value(x):
    global val
    val = x
    
def save_average_weight(data):
    save_value(data)

def save_max_altitude(data):
    save_value(data)

import rx.operators

get_average_weight = observable.pipe(
    rx.operators.filter(lambda data: isinstance(data, CurrentWeight)),
    rx.operators.map(lambda cw: cw.grams),
    rx.operators.average()
)

get_average_weight.subscribe(save_average_weight)

assert val == 200

get_max_altitude = observable.pipe(
    rx.operators.skip_while(is_close_to_restaurant),
    rx.operators.filter(lambda data: isinstance(data, LocationData)),
    rx.operators.map(lambda loc: loc.z),
    rx.operators.max()
)

get_max_altitude.subscribe(save_max_altitude)

assert val == 40