## O que é Event Sourcing?

Event Sourcing garante que todo o estado de um objeto possa ser representado por um Fluxo de Eventos.

## Eventos de domínio

Os eventos podem ser implementados como qualquer estrutura de dados, sendo o único requisito que o Event Store os consiga ler e escrever.

In [1]:
from collections import namedtuple

BankAccountCreated = namedtuple('BankAccountCreated', ['id', 'owner'])
DepositPerformed = namedtuple('DepositPerformed', ['id', 'amount'])
OwnerChanged = namedtuple('OwnerChanged', ['id', 'new_owner'])
WithdrawalPerformed = namedtuple('WithdrawalPerformed', ['id', 'amount'])
TransferDebited = namedtuple('TransferDebited', ['id', 'amount', 'recipient_id'])
TransferCredited = namedtuple('TransferCredited', ['id', 'amount', 'source_id'])

## Objeto orientado a eventos

Precisamos de uma classe que consiga chamar os métodos apropriados para cada evento. Os eventos precisam ser enfileirados em um fluxo de eventos.

In [4]:
class MutationMixin:
  def __init__(self, events=None, *args, **kwargs):
    super().__init__(*args, **kwargs)
    if events is not None:
      for event in events:
        self.mutate(event)
  
  def mutate(self, event):
    method_name = f'when_{event.__class__.__name__.lower()}'
    method = getattr(self, method_name)
    method(event)

Agora nós precisamos definir um objeto para representar a conta, porém iremos separar a sua definição em duas classes: uma classe de alteração de estado e outra classe para comportamentos.

In [3]:
class BankAccountState(MutationMixin, ChangesMixin):
  def when_bankaccountcreated(self, event):
    self.id = event.id
    self.owner = event.owner
    self.balance = 0 

  def when_depositperformed(self, event):
    self.balance += event.amount

  def when_withdrawalperformed(self, event):
    self.balance -= event.amount

  def when_ownerchanged(self, event):
    self.owner = event.owner

  def when_transferdebited(self, event):
    self.balance -= event.amount

  def when_transfercredited(self, event):
    self.balance += event.amount

Agora precisamos de uma classe que aplique os eventos no objeto e os armazene em uma lista interna do objeto.

In [5]:
class ChangesMixin:
  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.changes = []

  def apply(self, event):
    self.changes.append(event)
    self.mutate(event)

Agora preicaos de uma classe que contém apenas os comportamentos e suas regras de negócio.

In [6]:
from uuid import uuid4

class BankAccount(BankAccountState, ChangesMixin):
  def __repr__(self):
    data = {
      'id': self.id,
      'owner': self.owner,
      'balance': self.balance
    }
    return f'<BankAccount: {data}>'

  def open_account(self, owner):
    self.apply(BankAccountCreated(
      id=uuid4(),
      owner=owner,
    ))
  
  def deposit(self, amount):
    self.apply(DepositPerformed(
      id=self.id,
      amount=amount,
    ))

  def change_owner(self, new_owner):
    self.apply(OwnerChanged(
      id=self.id,
      new_owner=new_owner,
    ))

  def withdraw(self, amount):
    if self.balance - amount < 0:
      raise ValueError(f"'{amount}' is not available")
    self.apply(WithdrawalPerformed(
      id=self.id,
      amount=amount,
    ))

  def receive_transfer(self, amount, source):
    self.apply(TransferCredited(
      id=self.id,
      amount=amount,
      source_id=source,
    ))

  def make_transfer(self, amount, recipient):
    if self.balance - amount < 0:
      raise ValueError(f"'{amount}' is not available")
    self.apply(TransferDebited(
      id=self.id,
      amount=amount,
      recipient_id=recipient.id,
    ))
    recipient.receive_transfer(amount, self)

## Event Store

Agora precisamos implementar um mecanismo de armazenamento de eventos. O Armazenamento de Eventos é dividido entre duas classes: `EventStore` e `AppendOnlyStore`. O `EventStore` é uma interface entre a aplicação e o mecanismo de persistência enquanto o `AppendOnlyStore` é uma interface para o mecanismo de persistência.

In [7]:
DataWithVersion = namedtuple('DataWithVersion', ['version', 'data'])
DataWithName = namedtuple('DataWithName', ['name', 'data'])

class AppendOnlyStoreConcurrencyException(Exception):
  def __init__(self, version, expected_version, name):
    self.version = version
    self.expected_version = expected_version
    self.name = name
    message = f"'{expected_version}' expected but was '{version}'"
    super().__init__(message)

class AppendOnlyStore:
  def __init__(self, conn):
    self.conn = conn

  def append(self, name, data, expected_version=-1):
    c = self.conn.cursor()
    sql = """
    SELECT COALESCE(MAX(Version), 0)
    FROM 'ES_Events'
    WHERE Name = :name
    """
    c.execute(sql, {'name': str(name)})
    (version,) = c.fetchone()
    if expected_version != -1 and version != expected_version:
      raise AppendOnlyStoreConcurrencyException(version, expected_version, name)

    txt = """
    INSERT INTO 'ES_Events' (Name, Version, Data)
    VALUES(:name, :version, :data)
    """
    c.execute(txt, {'name': str(name), 'version': version + 1, 'data': data})
    c.close()
  
  def fetchone(self, name, after_version, max_count):
    c = self.conn.cursor()
    sql = """
    SELECT Data, Version FROM 'ES_Events'
    WHERE Name = :name AND Version > :version
    ORDER BY Version
    LIMIT 0, :max_count
    """
    c.execute(sql, {'name': str(name), 'version': after_version, 'max_count': max_count})
    for row in c.fetchall():
      data, version = row
      yield DataWithVersion(version, data)
    c.close()

  def fetchall(self, after_version, max_count):
    c = self.conn.cursor()
    sql = """
    SELECT Data, Name FROM 'ES_Events'
    WHERE Version > :version
    ORDER BY Version
    LIMIT 0, :max_count
    """
    c.execute(sql, {'version': after_version, 'max_count': max_count})
    for row in c.fetchall():
      data, name = row
      yield DataWithName(name, data)
    c.close()

  def close(self):
    self.conn.close()

O EventStore precisa de algum formato de serialização para persistir o Fluxo de Eventos (JSON, XML, Protocol Buffers…).

In [11]:
import pickle

class OptimisticConcurrencyException(Exception):
  def __init__(self, version, expected_version, identity, events):
    self.version = version
    self.expected_version = expected_version
    self.identity = identity
    self.events = events
    message = f"'{expected_version}' expected but was '{version}'"
    super().__init__(message)

class EventStream:
  def __init__(self, version=-1, events=None):
    self.version = version
    if events is None:
      self.events = []
    else:
      self.events = list(events)

class EventStore():
  def __init__(self, append_only_store):
    self.append_only_store = append_only_store

  def save(self, identity: str, events: list, expected_version: int = -1):
    data = self.serialize_event(events)
    try:
      self.append_only_store.append(identity, data, expected_version)
    except AppendOnlyStoreConcurrencyException as e:
      # carrega os eventos do servidor
      server = self.load(identity)
      raise OptimisticConcurrencyException(server.version, e.expected_version, identity, server.events)

  def load(self, identity: str, skip_events: int = 0, max_count: int = -1):
    records = self.append_only_store.fetchone(identity, skip_events, max_count)
    stream = EventStream()
    for record in records:
      stream.events.extend(self.deserialize_event(record.data))
      stream.version = record.version
    return stream

  def serialize_event(self, event):
    return pickle.dumps(event)

  def deserialize_event(self, event):
    return pickle.loads(event)

Por fim, agora vamos fazer o conjunto funcionar.

In [12]:
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(open('sql/events.sql', 'r').read())
# armazenamento de eventos para acessar o fluxo de eventos
append_only_store = AppendOnlyStore(conn)
event_store = EventStore(append_only_store)

[BankAccountCreated(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), owner='João Gabriel'), DepositPerformed(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), amount=20000), TransferDebited(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), amount=10000, recipient_id=UUID('e48feaf5-a5f4-4f8e-ae72-a7d8c37dd0a0'))]
[BankAccountCreated(id=UUID('e48feaf5-a5f4-4f8e-ae72-a7d8c37dd0a0'), owner='Henrique'), DepositPerformed(id=UUID('e48feaf5-a5f4-4f8e-ae72-a7d8c37dd0a0'), amount=3000), TransferCredited(id=UUID('e48feaf5-a5f4-4f8e-ae72-a7d8c37dd0a0'), amount=10000, source_id=<BankAccount: {'id': UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), 'owner': 'João Gabriel', 'balance': 10000}>)]


In [22]:
bank_account = BankAccount()
bank_account.open_account(owner='João Gabriel')
bank_account.changes

[BankAccountCreated(id=UUID('9f63d813-52cd-47b9-953f-6f1e0719aa70'), owner='João Gabriel')]

In [25]:
bank_account.deposit(amount=20)
event_store.save(bank_account.id, bank_account.changes)
bank_account.changes

[BankAccountCreated(id=UUID('9f63d813-52cd-47b9-953f-6f1e0719aa70'), owner='João Gabriel'),
 DepositPerformed(id=UUID('9f63d813-52cd-47b9-953f-6f1e0719aa70'), amount=20),
 DepositPerformed(id=UUID('9f63d813-52cd-47b9-953f-6f1e0719aa70'), amount=20),
 DepositPerformed(id=UUID('9f63d813-52cd-47b9-953f-6f1e0719aa70'), amount=20)]

In [26]:
stream = event_store.load(bank_account.id)
BankAccount(stream.events)

<BankAccount: {'id': UUID('9f63d813-52cd-47b9-953f-6f1e0719aa70'), 'owner': 'João Gabriel', 'balance': 60}>

In [None]:
bank_account1 = BankAccount()
bank_account1.open_account(owner='João Gabriel')
bank_account1.changes

In [28]:
bank_account1.deposit(amount=20000)
bank_account1.changes

[BankAccountCreated(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), owner='João Gabriel'),
 DepositPerformed(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), amount=20000),
 TransferDebited(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), amount=10000, recipient_id=UUID('e48feaf5-a5f4-4f8e-ae72-a7d8c37dd0a0')),
 DepositPerformed(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), amount=20000),
 DepositPerformed(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), amount=20000)]

In [29]:
bank_account2 = BankAccount()
bank_account2.open_account(owner='Henrique')
bank_account2.changes

[BankAccountCreated(id=UUID('f1fe136a-6c73-441d-9291-a98d01f9d3b4'), owner='Henrique')]

In [30]:
bank_account2.deposit(amount=3000)
bank_account2.changes

[BankAccountCreated(id=UUID('f1fe136a-6c73-441d-9291-a98d01f9d3b4'), owner='Henrique'),
 DepositPerformed(id=UUID('f1fe136a-6c73-441d-9291-a98d01f9d3b4'), amount=3000)]

In [33]:
bank_account1.make_transfer(10000, bank_account2)
bank_account1.changes, bank_account2.changes

([BankAccountCreated(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), owner='João Gabriel'),
  DepositPerformed(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), amount=20000),
  TransferDebited(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), amount=10000, recipient_id=UUID('e48feaf5-a5f4-4f8e-ae72-a7d8c37dd0a0')),
  DepositPerformed(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), amount=20000),
  DepositPerformed(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), amount=20000),
  TransferDebited(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), amount=10000, recipient_id=UUID('f1fe136a-6c73-441d-9291-a98d01f9d3b4')),
  TransferDebited(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), amount=10000, recipient_id=UUID('f1fe136a-6c73-441d-9291-a98d01f9d3b4')),
  TransferDebited(id=UUID('ca8fab0c-e7d1-45da-9a34-1beec980ef75'), amount=10000, recipient_id=UUID('f1fe136a-6c73-441d-9291-a98d01f9d3b4'))],
 [BankAccountCreated(id=UUID('f1fe136a-6c73-441d-9291-a98d01f9d3b4'), owner='Henrique'),
  De