Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions app/allocation/domain/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from dataclasses import dataclass
from datetime import date
from uuid import UUID


class Command:
pass


@dataclass
class CreateBatch(Command):
id: UUID
sku: str
qty: int
eta: date = None


@dataclass
class ChangeBatchQuantity(Command):
id: UUID
qty: int


@dataclass
class Allocate(Command):
order_id: UUID
sku: str
qty: int
23 changes: 0 additions & 23 deletions app/allocation/domain/events.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,10 @@
from dataclasses import dataclass
from datetime import date
from uuid import UUID


class Event:
pass


@dataclass
class BatchCreated(Event):
id: UUID
sku: str
qty: int
eta: date = None


@dataclass
class BatchQuantityChanged(Event):
id: UUID
qty: int


@dataclass
class AllocationRequired(Event):
order_id: UUID
sku: str
qty: int


@dataclass
class OutOfStock(Event):
sku: str
6 changes: 3 additions & 3 deletions app/allocation/domain/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import date
from uuid import UUID, uuid4

from app.allocation.domain import events
from app.allocation.domain import commands, events


@dataclass(unsafe_hash=True, kw_only=True)
Expand Down Expand Up @@ -68,7 +68,7 @@ class Product:
sku: str
batches: list[Batch]
version_number: int = 0
events: list[events.Event] = field(default_factory=lambda: [])
events: list[events.Event | commands.Command] = field(default_factory=lambda: [])

def allocate(self, line: OrderLine) -> UUID:
try:
Expand All @@ -85,7 +85,7 @@ def change_batch_quantity(self, id: UUID, qty: int) -> None:
batch.qty = qty
while batch.available_quantity < 0:
line = batch.deallocate_one()
self.events.append(events.AllocationRequired(line.id, line.sku, line.qty))
self.events.append(commands.Allocate(line.id, line.sku, line.qty))

def __hash__(self) -> int:
return hash(self.sku)
10 changes: 5 additions & 5 deletions app/allocation/routers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from fastapi import Body, Depends, FastAPI, HTTPException

from app.allocation.adapters.repository import AbstractProductRepository
from app.allocation.domain import events
from app.allocation.domain import commands
from app.allocation.routers.dependencies import batch_uow
from app.allocation.service_layer import messagebus
from app.allocation.service_layer.handlers import InvalidSku
Expand All @@ -27,8 +27,8 @@ async def add_batch(
eta: date = Body(default=None),
uow: AbstractUnitOfWork[AbstractProductRepository] = Depends(batch_uow),
) -> dict[str, str]:
event = events.BatchCreated(batch_id, sku, quantity, eta)
await messagebus.handle(event, uow)
cmd = commands.CreateBatch(batch_id, sku, quantity, eta)
await messagebus.handle(cmd, uow)
return {"message": "success"}


Expand All @@ -40,8 +40,8 @@ async def allocate(
uow: AbstractUnitOfWork[AbstractProductRepository] = Depends(batch_uow),
) -> dict[str, str]:
try:
event = events.AllocationRequired(line_id, sku, quantity)
results = await messagebus.handle(event, uow)
cmd = commands.Allocate(line_id, sku, quantity)
results = await messagebus.handle(cmd, uow)
batch_id = results[0]
except InvalidSku as e:
raise HTTPException(status_code=400, detail=str(e))
Expand Down
22 changes: 10 additions & 12 deletions app/allocation/service_layer/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from app.allocation.adapters import email
from app.allocation.adapters.repository import AbstractProductRepository
from app.allocation.domain import events, models
from app.allocation.domain import commands, events, models
from app.allocation.service_layer import unit_of_work


Expand All @@ -11,25 +11,23 @@ class InvalidSku(Exception):


async def add_batch(
event: events.BatchCreated,
cmd: commands.CreateBatch,
uow: unit_of_work.AbstractUnitOfWork[AbstractProductRepository],
) -> None:
async with uow:
product = await uow.repo.get(event.sku)
product = await uow.repo.get(cmd.sku)
if product is None:
product = models.Product(sku=event.sku, batches=[])
product = models.Product(sku=cmd.sku, batches=[])
await uow.repo.add(product)
product.batches.append(
models.Batch(id=event.id, sku=event.sku, qty=event.qty, eta=event.eta)
)
product.batches.append(models.Batch(id=cmd.id, sku=cmd.sku, qty=cmd.qty, eta=cmd.eta))
await uow.commit()


async def allocate(
event: events.AllocationRequired,
cmd: commands.Allocate,
uow: unit_of_work.AbstractUnitOfWork[AbstractProductRepository],
) -> UUID:
line = models.OrderLine(id=event.order_id, sku=event.sku, qty=event.qty)
line = models.OrderLine(id=cmd.order_id, sku=cmd.sku, qty=cmd.qty)
async with uow:
product = await uow.repo.get(line.sku)
if product is None:
Expand All @@ -40,12 +38,12 @@ async def allocate(


async def change_batch_quantity(
event: events.BatchQuantityChanged,
cmd: commands.ChangeBatchQuantity,
uow: unit_of_work.AbstractUnitOfWork[AbstractProductRepository],
) -> None:
async with uow:
product = await uow.repo.get_by_batch_id(event.id)
product.change_batch_quantity(event.id, event.qty)
product = await uow.repo.get_by_batch_id(cmd.id)
product.change_batch_quantity(cmd.id, cmd.qty)
await uow.commit()


Expand Down
59 changes: 43 additions & 16 deletions app/allocation/service_layer/messagebus.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,57 @@
from typing import Any

from app.allocation.domain import events
from app.allocation.domain import commands, events
from app.allocation.service_layer import handlers, unit_of_work

Message = commands.Command | events.Event


# TODO: 이렇게 하는거 맞나?
async def handle(
event: events.Event,
message: Message,
uow: unit_of_work.AbstractUnitOfWork[unit_of_work.AbstractProductRepository],
) -> list[Any]:
results = []
queue = [event]
queue = [message]
while queue:
event = queue.pop(0)
message = queue.pop(0)
result = None
if isinstance(message, events.Event):
await handle_event(message, queue, uow)
elif isinstance(message, commands.Command):
result = await handle_command(message, queue, uow)
results.append(result)
else:
raise Exception(f"Unknown message {message}")
return results


async def handle_event(
event: events.Event,
queue: list[Message],
uow: unit_of_work.AbstractUnitOfWork[unit_of_work.AbstractProductRepository],
) -> None:
try:
if isinstance(event, events.OutOfStock):
handlers.send_out_of_stock_notification(event, uow)
elif isinstance(event, events.BatchQuantityChanged):
await handlers.change_batch_quantity(event, uow)
elif isinstance(event, events.AllocationRequired):
result = await handlers.allocate(event, uow)
elif isinstance(event, events.BatchCreated):
await handlers.add_batch(event, uow)
else:
raise Exception(f"Unknown event {event}")
if result:
results.append(result)
queue.extend(uow.collect_new_events())
return results
except Exception:
return


async def handle_command(
command: commands.Command,
queue: list[Message],
uow: unit_of_work.AbstractUnitOfWork[unit_of_work.AbstractProductRepository],
) -> Any:
try:
result = None
if isinstance(command, commands.CreateBatch):
await handlers.add_batch(command, uow)
elif isinstance(command, commands.ChangeBatchQuantity):
await handlers.change_batch_quantity(command, uow)
elif isinstance(command, commands.Allocate):
result = await handlers.allocate(command, uow)
queue.extend(uow.collect_new_events())
return result
except Exception:
raise
5 changes: 3 additions & 2 deletions app/allocation/service_layer/unit_of_work.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from __future__ import annotations

import abc
from typing import Any, Generic, TypeVar
from collections.abc import Generator
from typing import Any, Generic, TypeVar

from app.allocation.adapters.db import SESSION_FACTORY
from app.allocation.adapters.repository import (
AbstractProductRepository,
PGProductRepository,
)
from app.allocation.domain.commands import Command
from app.allocation.domain.events import Event
from app.config import get_config

Expand All @@ -31,7 +32,7 @@ def repo(self) -> Repo:
async def commit(self) -> None:
await self._commit()

def collect_new_events(self) -> Generator[Event, None, None]:
def collect_new_events(self) -> Generator[Event | Command, None, None]:
for product in self.repo._seen:
while product.events:
yield product.events.pop(0)
Expand Down
39 changes: 19 additions & 20 deletions tests/unit/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest

from app.allocation.adapters import repository
from app.allocation.domain import events, models
from app.allocation.domain import commands, models
from app.allocation.service_layer import handlers, messagebus, unit_of_work


Expand Down Expand Up @@ -48,18 +48,19 @@ async def rollback(self) -> None:
class TestAddBatch:
async def test_for_new_product(self) -> None:
uow = FakeUnitOfWork()
await messagebus.handle(events.BatchCreated(uuid4(), "CRUNCHY-ARMCHAIR", 100), uow)
await messagebus.handle(commands.CreateBatch(uuid4(), "CRUNCHY-ARMCHAIR", 100), uow)
assert uow.repo.get("CRUNCHY-ARMCHAIR") is not None
assert uow.committed

async def test_for_existing_product(self) -> None:
uow = FakeUnitOfWork()
await messagebus.handle(
events.BatchCreated(UUID("5103f447-4568-4615-bc28-70447d1a7436"), "GARISH-RUG", 100),
commands.CreateBatch(UUID("5103f447-4568-4615-bc28-70447d1a7436"), "GARISH-RUG", 100),
uow,
)
await messagebus.handle(
events.BatchCreated(UUID("6c381ae2-c9fc-4b52-aacb-3e496f0aacef"), "GARISH-RUG", 99), uow
commands.CreateBatch(UUID("6c381ae2-c9fc-4b52-aacb-3e496f0aacef"), "GARISH-RUG", 99),
uow,
)
assert UUID("5103f447-4568-4615-bc28-70447d1a7436") in [
b.id for b in (await uow.repo.get("GARISH-RUG")).batches
Expand All @@ -73,32 +74,30 @@ class TestAllocate:
async def test_returns_allocation(self) -> None:
uow = FakeUnitOfWork()
await messagebus.handle(
events.BatchCreated(
commands.CreateBatch(
UUID("b4cf5213-6e1f-46cc-8302-aac1f12ac617"), "COMPLICATED-LAMP", 100
),
uow,
)
results = await messagebus.handle(
events.AllocationRequired(uuid4(), "COMPLICATED-LAMP", 10), uow
)
results = await messagebus.handle(commands.Allocate(uuid4(), "COMPLICATED-LAMP", 10), uow)
assert results.pop(0) == UUID("b4cf5213-6e1f-46cc-8302-aac1f12ac617")

async def test_errors_for_invalid_sku(self) -> None:
uow = FakeUnitOfWork()
with pytest.raises(handlers.InvalidSku, match="Invalid sku NONEXISTENTSKU"):
await messagebus.handle(events.AllocationRequired(uuid4(), "NONEXISTENTSKU", 10), uow)
await messagebus.handle(commands.Allocate(uuid4(), "NONEXISTENTSKU", 10), uow)

async def test_commits(self) -> None:
uow = FakeUnitOfWork()
await messagebus.handle(events.BatchCreated(uuid4(), "OMINOUS-MIRROR", 100, None), uow)
await messagebus.handle(events.AllocationRequired(uuid4(), "OMINOUS-MIRROR", 10), uow)
await messagebus.handle(commands.CreateBatch(uuid4(), "OMINOUS-MIRROR", 100, None), uow)
await messagebus.handle(commands.Allocate(uuid4(), "OMINOUS-MIRROR", 10), uow)
assert uow.committed

async def test_sends_email_on_out_of_stock_error(self) -> None:
uow = FakeUnitOfWork()
await messagebus.handle(events.BatchCreated(uuid4(), "POPULAR-CURTAINS", 9, None), uow)
await messagebus.handle(commands.CreateBatch(uuid4(), "POPULAR-CURTAINS", 9, None), uow)
with mock.patch("app.allocation.adapters.email.send") as mock_send_mail:
await messagebus.handle(events.AllocationRequired(uuid4(), "POPULAR-CURTAINS", 10), uow)
await messagebus.handle(commands.Allocate(uuid4(), "POPULAR-CURTAINS", 10), uow)
assert mock_send_mail.call_args == mock.call(
"stock@made.com", "Out of stock for POPULAR-CURTAINS"
)
Expand All @@ -108,7 +107,7 @@ class TestChangeBatchQuantity:
async def test_changes_available_quantity(self) -> None:
uow = FakeUnitOfWork()
await messagebus.handle(
events.BatchCreated(
commands.CreateBatch(
UUID("05e2c957-154b-4dcf-9d24-d172f26e4b12"), "ADORABLE-SETTEE", 100, None
),
uow,
Expand All @@ -117,20 +116,20 @@ async def test_changes_available_quantity(self) -> None:
assert batch.available_quantity == 100

await messagebus.handle(
events.BatchQuantityChanged(UUID("05e2c957-154b-4dcf-9d24-d172f26e4b12"), 50), uow
commands.ChangeBatchQuantity(UUID("05e2c957-154b-4dcf-9d24-d172f26e4b12"), 50), uow
)

assert batch.available_quantity == 50

async def test_reallocates_if_necessary(self) -> None:
uow = FakeUnitOfWork()
event_history = [
events.BatchCreated(
commands.CreateBatch(
UUID("874c6d0d-84e6-4307-b9d5-e23ec78bb727"), "INDIFFERENT-TABLE", 50, None
),
events.BatchCreated(uuid4(), "INDIFFERENT-TABLE", 50, date.today()),
events.AllocationRequired(uuid4(), "INDIFFERENT-TABLE", 20),
events.AllocationRequired(uuid4(), "INDIFFERENT-TABLE", 20),
commands.CreateBatch(uuid4(), "INDIFFERENT-TABLE", 50, date.today()),
commands.Allocate(uuid4(), "INDIFFERENT-TABLE", 20),
commands.Allocate(uuid4(), "INDIFFERENT-TABLE", 20),
]
for e in event_history:
await messagebus.handle(e, uow)
Expand All @@ -139,7 +138,7 @@ async def test_reallocates_if_necessary(self) -> None:
assert batch2.available_quantity == 50

await messagebus.handle(
events.BatchQuantityChanged(UUID("874c6d0d-84e6-4307-b9d5-e23ec78bb727"), 25), uow
commands.ChangeBatchQuantity(UUID("874c6d0d-84e6-4307-b9d5-e23ec78bb727"), 25), uow
)

# order1 or order2 will be deallocated, so we'll have 25 - 20
Expand Down