### Event Sourcing

In [17]:
from dataclasses import dataclass
from datetime import datetime
from pprint import pprint


class Event:
    
    def as_dict(self):
        pass

@dataclass
class UserCreatedEvent(Event):
    id: int
    username: str
    balance: int
    created_at: datetime = datetime.now()

    def as_dict(self):
        return {
            "id": self.id,
            "username": self.username,
            "balance": self.balance,
        }

@dataclass
class UserBalanceChangedEvent(Event):
    id: int
    balance_change: int
    created_at: datetime = datetime.now()

    def as_dict(self):
        return {
            "id": self.id,
            "balance_change": self.balance_change,
        }

# models.py
class User:
    def __init__(self):
        self.id = None
        self.username = None
        self.balance = None
        self.events = []
    
    def __repr__(self):
        return (
            f"<User(id={self.id}, meta="
            f"['username': {self.username}, 'balance': {self.balance}, "
            f"'events': {self.events}])>"
        )

    def apply(self, event: Event):
        if isinstance(event, UserCreatedEvent):
            self.id = event.id
            self.username = event.username
            self.balance = event.balance
        elif isinstance(event, UserBalanceChangedEvent):
            self.balance += event.balance_change
        self.events.append(event)
    
    def init(self, id: int, username: str, balance: int):
        self.apply(UserCreatedEvent(id, username, balance))
    
    def change_balance(self, balance_change: int):
        if self.balance + balance_change < 0:
            raise ValueError("Insufficient balance")
        self.balance += balance_change
        self.apply(UserBalanceChangedEvent(self.id, balance_change))


# repository.py
class UserRepository:
    saved_users: dict[str, list[Event]] = {}
    
    def save(self, user: User):
        self.saved_users[user.id] = list(user.events)

    def get(self, id: int) -> User:
        return self.saved_users.get(id)
    
    def replay(self, id: int, to_: datetime | None=None) -> list[Event] | None:
        events = self.saved_users.get(id, [])
        if not events:
            return None
        if to_:
            events = [event for event in events if event.created_at <= to_]
        return [
            {
                "event": event.as_dict(),
                "event_type": type(event).__name__,
                "event_id": event.id,
                "created_at": event.created_at
            }
            for event in events
        ]

    def delete(self, id: int):
        if id in self.saved_users:
            del self.saved_users[id]
    
    def gen_user_id(self):
        return len(self.saved_users) + 1


# main.py
def main():
    user_repo = UserRepository()
    user_id = user_repo.gen_user_id()
    user = User()
    user.init(user_id, "test_user", 100)
    user_repo.save(user)
    import time

    user.change_balance(10)
    user.change_balance(250)
    time.sleep(1)
    user.change_balance(-50)

    user_repo.save(user)
    replay_events = user_repo.replay(user_id, to_=datetime.now())
    pprint(replay_events, indent=2)

main()

[ { 'created_at': datetime.datetime(2025, 5, 7, 16, 8, 34, 633698),
    'event': {'balance': 100, 'id': 1, 'username': 'test_user'},
    'event_id': 1,
    'event_type': 'UserCreatedEvent'},
  { 'created_at': datetime.datetime(2025, 5, 7, 16, 8, 34, 638528),
    'event': {'balance_change': 10, 'id': 1},
    'event_id': 1,
    'event_type': 'UserBalanceChangedEvent'},
  { 'created_at': datetime.datetime(2025, 5, 7, 16, 8, 34, 638528),
    'event': {'balance_change': 250, 'id': 1},
    'event_id': 1,
    'event_type': 'UserBalanceChangedEvent'},
  { 'created_at': datetime.datetime(2025, 5, 7, 16, 8, 34, 638528),
    'event': {'balance_change': -50, 'id': 1},
    'event_id': 1,
    'event_type': 'UserBalanceChangedEvent'}]


In [18]:
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Any
from uuid import UUID, uuid4
import json


# --- 1. EVENTS ---
# Base Event class
@dataclass
class Event:
    event_id: UUID
    timestamp: datetime
    event_type: str
    entity_id: UUID
    data: Dict[str, Any]
    
    @classmethod
    def create(cls, entity_id: UUID, data: Dict[str, Any], event_type: Optional[str] = None):
        event_type = event_type or cls.__name__
        return cls(
            event_id=uuid4(),
            timestamp=datetime.now(),
            event_type=event_type,
            entity_id=entity_id,
            data=data
        )


# Concrete Event types
@dataclass
class ShipRegistered(Event): pass

@dataclass
class ShipDocked(Event): pass

@dataclass
class ShipDeparted(Event): pass

@dataclass
class CargoLoaded(Event): pass

@dataclass
class CargoUnloaded(Event): pass


# --- 2. EVENT STORE ---
class EventStore:
    def __init__(self):
        self.events: List[Event] = []
        
    def append(self, event: Event) -> None:
        """Add an event to the store"""
        self.events.append(event)
        
    def get_events_for_entity(self, entity_id: UUID) -> List[Event]:
        """Get all events for a specific entity"""
        return [e for e in self.events if e.entity_id == entity_id]
    
    def persist(self, filename: str) -> None:
        """Save events to a file"""
        with open(filename, 'w') as f:
            serialized = [
                {
                    'event_id': str(e.event_id),
                    'timestamp': e.timestamp.isoformat(),
                    'event_type': e.event_type,
                    'entity_id': str(e.entity_id),
                    'data': e.data
                }
                for e in self.events
            ]
            json.dump(serialized, f, indent=2)
    
    def load(self, filename: str) -> None:
        """Load events from a file"""
        with open(filename, 'r') as f:
            serialized = json.load(f)
            self.events = [
                Event(
                    event_id=UUID(e['event_id']),
                    timestamp=datetime.fromisoformat(e['timestamp']),
                    event_type=e['event_type'],
                    entity_id=UUID(e['entity_id']),
                    data=e['data']
                )
                for e in serialized
            ]


# --- 3. ENTITY (AGGREGATE) ---
@dataclass
class CargoShip:
    id: UUID
    name: str
    current_port: Optional[str] = None
    status: str = "registered"
    cargo: Dict[str, int] = None
    
    def __post_init__(self):
        if self.cargo is None:
            self.cargo = {}
    
    @classmethod
    def create(cls, name: str) -> tuple['CargoShip', Event]:
        """Factory method to create a new ship"""
        ship_id = uuid4()
        ship = cls(id=ship_id, name=name)
        
        # Create and return the registration event
        event = ShipRegistered.create(
            entity_id=ship_id,
            data={'name': name}
        )
        
        return ship, event
    
    def dock(self, port: str) -> Event:
        """Dock the ship at a port"""
        self.current_port = port
        self.status = "docked"
        
        return ShipDocked.create(
            entity_id=self.id,
            data={'port': port}
        )
    
    def depart(self) -> Event:
        """Depart from current port"""
        if not self.current_port:
            raise ValueError("Ship is not docked at any port")
        
        departed_from = self.current_port
        self.current_port = None
        self.status = "at sea"
        
        return ShipDeparted.create(
            entity_id=self.id,
            data={'departed_from': departed_from}
        )
    
    def load_cargo(self, item: str, quantity: int) -> Event:
        """Load cargo onto the ship"""
        if not self.current_port:
            raise ValueError("Ship must be docked to load cargo")
        
        if item in self.cargo:
            self.cargo[item] += quantity
        else:
            self.cargo[item] = quantity
            
        return CargoLoaded.create(
            entity_id=self.id,
            data={
                'item': item,
                'quantity': quantity,
                'port': self.current_port
            }
        )
    
    def unload_cargo(self, item: str, quantity: int) -> Event:
        """Unload cargo from the ship"""
        if not self.current_port:
            raise ValueError("Ship must be docked to unload cargo")
        
        if item not in self.cargo:
            raise ValueError(f"No {item} on the ship")
        
        if self.cargo[item] < quantity:
            raise ValueError(f"Not enough {item} on the ship")
        
        self.cargo[item] -= quantity
        if self.cargo[item] == 0:
            del self.cargo[item]
            
        return CargoUnloaded.create(
            entity_id=self.id,
            data={
                'item': item,
                'quantity': quantity,
                'port': self.current_port
            }
        )


# --- 4. REPOSITORY (for materializing the current state) ---
class ShipRepository:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self.ships: Dict[UUID, CargoShip] = {}
        
    def save(self, ship: CargoShip, event: Event) -> None:
        """Save a ship and its latest event"""
        self.event_store.append(event)
        self.ships[ship.id] = ship
        
    def get(self, ship_id: UUID) -> Optional[CargoShip]:
        """Get a ship by ID or rebuild it from events"""
        # Return from cache if available
        if ship_id in self.ships:
            return self.ships[ship_id]
        
        # Rebuild from events if not in cache
        events = self.event_store.get_events_for_entity(ship_id)
        if not events:
            return None
            
        # Get the first event to initialize the ship
        first_event = events[0]
        if first_event.event_type != "ShipRegistered":
            raise ValueError("First event must be ShipRegistered")
        
        ship = CargoShip(
            id=ship_id,
            name=first_event.data['name']
        )
        
        # Apply subsequent events to rebuild the current state
        for event in events[1:]:
            self._apply_event(ship, event)
            
        # Cache and return the ship
        self.ships[ship_id] = ship
        return ship
    
    def _apply_event(self, ship: CargoShip, event: Event) -> None:
        """Apply an event to update a ship's state"""
        if event.event_type == "ShipDocked":
            ship.current_port = event.data['port']
            ship.status = "docked"
        elif event.event_type == "ShipDeparted":
            ship.current_port = None
            ship.status = "at sea"
        elif event.event_type == "CargoLoaded":
            item = event.data['item']
            quantity = event.data['quantity']
            if item in ship.cargo:
                ship.cargo[item] += quantity
            else:
                ship.cargo[item] = quantity
        elif event.event_type == "CargoUnloaded":
            item = event.data['item']
            quantity = event.data['quantity']
            ship.cargo[item] -= quantity
            if ship.cargo[item] == 0:
                del ship.cargo[item]


# --- 5. EVENT PROCESSOR ---
class ShipEventProcessor:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self.ship_repository = ShipRepository(event_store)
        
    def handle(self, command_name: str, **kwargs) -> Any:
        """Process commands and generate events"""
        command_handlers = {
            'register_ship': self._register_ship,
            'dock_ship': self._dock_ship,
            'depart_ship': self._depart_ship,
            'load_cargo': self._load_cargo,
            'unload_cargo': self._unload_cargo,
        }
        
        if command_name not in command_handlers:
            raise ValueError(f"Unknown command: {command_name}")
            
        return command_handlers[command_name](**kwargs)
        
    def _register_ship(self, name: str) -> UUID:
        """Register a new ship"""
        ship, event = CargoShip.create(name)
        self.ship_repository.save(ship, event)
        return ship.id
    
    def _dock_ship(self, ship_id: UUID, port: str) -> None:
        """Dock a ship at a port"""
        ship = self.ship_repository.get(ship_id)
        if not ship:
            raise ValueError(f"Ship with ID {ship_id} not found")
            
        event = ship.dock(port)
        self.ship_repository.save(ship, event)
    
    def _depart_ship(self, ship_id: UUID) -> None:
        """Depart a ship from its current port"""
        ship = self.ship_repository.get(ship_id)
        if not ship:
            raise ValueError(f"Ship with ID {ship_id} not found")
            
        event = ship.depart()
        self.ship_repository.save(ship, event)
    
    def _load_cargo(self, ship_id: UUID, item: str, quantity: int) -> None:
        """Load cargo onto a ship"""
        ship = self.ship_repository.get(ship_id)
        if not ship:
            raise ValueError(f"Ship with ID {ship_id} not found")
            
        event = ship.load_cargo(item, quantity)
        self.ship_repository.save(ship, event)
    
    def _unload_cargo(self, ship_id: UUID, item: str, quantity: int) -> None:
        """Unload cargo from a ship"""
        ship = self.ship_repository.get(ship_id)
        if not ship:
            raise ValueError(f"Ship with ID {ship_id} not found")
            
        event = ship.unload_cargo(item, quantity)
        self.ship_repository.save(ship, event)


# --- 6. DEMO USAGE ---
def run_demo():
    # Initialize event store and processor
    event_store = EventStore()
    processor = ShipEventProcessor(event_store)
    
    # Register a new ship
    ship_id = processor.handle('register_ship', name="Pacific Explorer")
    print(f"Registered ship with ID: {ship_id}")
    
    # Dock the ship
    processor.handle('dock_ship', ship_id=ship_id, port="Singapore")
    print("Ship docked at Singapore")
    
    # Load cargo
    processor.handle('load_cargo', ship_id=ship_id, item="electronics", quantity=500)
    print("Loaded 500 electronics")
    
    # Depart
    processor.handle('depart_ship', ship_id=ship_id)
    print("Ship departed from Singapore")
    
    # Dock at another port
    processor.handle('dock_ship', ship_id=ship_id, port="Hong Kong")
    print("Ship docked at Hong Kong")
    
    # Unload some cargo
    processor.handle('unload_cargo', ship_id=ship_id, item="electronics", quantity=200)
    print("Unloaded 200 electronics")
    
    # Get current ship state
    ship = processor.ship_repository.get(ship_id)
    print(f"\nCurrent ship state:")
    print(f"Name: {ship.name}")
    print(f"Status: {ship.status}")
    print(f"Current port: {ship.current_port}")
    print(f"Cargo: {ship.cargo}")
    
    # View all events
    print("\nEvent log:")
    for i, event in enumerate(event_store.events):
        print(f"{i+1}. {event.event_type} at {event.timestamp}: {event.data}")
    
    # Save events to file
    event_store.persist("ship_events.json")
    print("\nEvents saved to ship_events.json")

run_demo()

Registered ship with ID: 4ea11601-df83-486a-adb5-e2c58809dfd8
Ship docked at Singapore
Loaded 500 electronics
Ship departed from Singapore
Ship docked at Hong Kong
Unloaded 200 electronics

Current ship state:
Name: Pacific Explorer
Status: docked
Current port: Hong Kong
Cargo: {'electronics': 300}

Event log:
1. ShipRegistered at 2025-05-07 16:19:38.512175: {'name': 'Pacific Explorer'}
2. ShipDocked at 2025-05-07 16:19:38.512319: {'port': 'Singapore'}
3. CargoLoaded at 2025-05-07 16:19:38.512336: {'item': 'electronics', 'quantity': 500, 'port': 'Singapore'}
4. ShipDeparted at 2025-05-07 16:19:38.512347: {'departed_from': 'Singapore'}
5. ShipDocked at 2025-05-07 16:19:38.512355: {'port': 'Hong Kong'}
6. CargoUnloaded at 2025-05-07 16:19:38.512364: {'item': 'electronics', 'quantity': 200, 'port': 'Hong Kong'}

Events saved to ship_events.json
