Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ Reference https://github.com/cosmicpython/code
- [4️⃣ CHAPTER_04 usecase](https://github.com/sawaca96/architecture-patterns-with-python/pull/1)
- [6️⃣ CHAPTER_06 unit of work](https://github.com/sawaca96/architecture-patterns-with-python/pull/2#pullrequestreview-1265028411)
- [7️⃣ CHAPTER_07 aggregate](https://github.com/sawaca96/architecture-patterns-with-python/pull/3)
- [8️⃣ CHAPTER_08 event and messagebug](https://github.com/sawaca96/architecture-patterns-with-python/pull/4)
4 changes: 2 additions & 2 deletions app/allocation/adapters/email.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any


def send_mail(*args: Any) -> None:
def send(*args: Any) -> None:
"""Send an email to the user."""
print("Sending email to user")
print("Sending email to user", *args)
23 changes: 22 additions & 1 deletion app/allocation/adapters/repository.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import abc
from uuid import UUID

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import selectinload, subqueryload

from app.allocation.domain import models

Expand All @@ -25,6 +26,12 @@ async def get(self, sku: str) -> models.Product:
self._seen.add(product)
return product

async def get_by_batch_id(self, batch_id: UUID) -> models.Product:
product = await self._get_by_batch_id(batch_id)
if product:
self._seen.add(product)
return product

@abc.abstractmethod
async def _add(self, product: models.Product) -> None:
raise NotImplementedError
Expand All @@ -33,6 +40,10 @@ async def _add(self, product: models.Product) -> None:
async def _get(self, sku: str) -> models.Product:
raise NotImplementedError

@abc.abstractmethod
async def _get_by_batch_id(self, batch_id: UUID) -> models.Product:
raise NotImplementedError


class PGProductRepository(AbstractProductRepository):
def __init__(self, session: AsyncSession) -> None:
Expand All @@ -55,3 +66,13 @@ async def _get(self, sku: str) -> models.Product:
)
# TODO: joinedload?
return result.scalar_one_or_none()

async def _get_by_batch_id(self, batch_id: UUID) -> models.Product:
result = await self._session.execute(
sa.select(models.Product)
.where(models.Product.batches.any(models.Batch.id == batch_id)) # type: ignore
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

정적분석시에는 batches가 orm이 아니기 때문에 타입에러 발생

.options(
subqueryload(models.Product.batches).options(subqueryload(models.Batch.allocations))
)
)
return result.scalar_one_or_none()
23 changes: 23 additions & 0 deletions app/allocation/domain/events.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,33 @@
from dataclasses import dataclass
from datetime import date
from uuid import UUID


class Event:
pass


@dataclass
class BatchCreated(Event):
id: UUID
sku: str
qty: int
eta: date = None


@dataclass
class BatchQuantityChanged(Event):
id: UUID
qty: int


@dataclass
class AllocationRequired(Event):
order_id: UUID
sku: str
qty: int


@dataclass
class OutOfStock(Event):
sku: str
12 changes: 9 additions & 3 deletions app/allocation/domain/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ def allocate(self, line: OrderLine) -> None:
if self.can_allocate(line):
self.allocations.add(line)

def deallocate(self, line: OrderLine) -> None:
if line in self.allocations:
self.allocations.remove(line)
def deallocate_one(self) -> OrderLine:
return self.allocations.pop()

@property
def allocated_quantity(self) -> int:
Expand Down Expand Up @@ -81,5 +80,12 @@ def allocate(self, line: OrderLine) -> UUID:
self.events.append(events.OutOfStock(sku=line.sku))
return None

def change_batch_quantity(self, id: UUID, qty: int) -> None:
batch = next(b for b in self.batches if b.id == id)
batch.qty = qty
while batch.available_quantity < 0:
line = batch.deallocate_one()
self.events.append(events.AllocationRequired(line.id, line.sku, line.qty))

def __hash__(self) -> int:
return hash(self.sku)
13 changes: 9 additions & 4 deletions app/allocation/routers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
from fastapi import Body, Depends, FastAPI, HTTPException

from app.allocation.adapters.repository import AbstractProductRepository
from app.allocation.domain import events
from app.allocation.routers.dependencies import batch_uow
from app.allocation.service_layer import services
from app.allocation.service_layer import messagebus
from app.allocation.service_layer.handlers import InvalidSku
from app.allocation.service_layer.unit_of_work import AbstractUnitOfWork

app = FastAPI()
Expand All @@ -25,7 +27,8 @@ async def add_batch(
eta: date = Body(default=None),
uow: AbstractUnitOfWork[AbstractProductRepository] = Depends(batch_uow),
) -> dict[str, str]:
await services.add_batch(batch_id, sku, quantity, eta, uow)
event = events.BatchCreated(batch_id, sku, quantity, eta)
await messagebus.handle(event, uow)
return {"message": "success"}


Expand All @@ -37,7 +40,9 @@ async def allocate(
uow: AbstractUnitOfWork[AbstractProductRepository] = Depends(batch_uow),
) -> dict[str, str]:
try:
batch_id = await services.allocate(line_id, sku, quantity, uow)
except services.InvalidSku as e:
event = events.AllocationRequired(line_id, sku, quantity)
results = await messagebus.handle(event, uow)
batch_id = results[0]
except InvalidSku as e:
raise HTTPException(status_code=400, detail=str(e))
return {"batch_id": str(batch_id)}
55 changes: 55 additions & 0 deletions app/allocation/service_layer/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from uuid import UUID

from app.allocation.adapters import email
from app.allocation.adapters.repository import AbstractProductRepository
from app.allocation.domain import events, models
from app.allocation.service_layer import unit_of_work


class InvalidSku(Exception):
pass


async def add_batch(
event: events.BatchCreated,
uow: unit_of_work.AbstractUnitOfWork[AbstractProductRepository],
) -> None:
async with uow:
product = await uow.repo.get(event.sku)
if product is None:
product = models.Product(sku=event.sku, batches=[])
await uow.repo.add(product)
product.batches.append(
models.Batch(id=event.id, sku=event.sku, qty=event.qty, eta=event.eta)
)
await uow.commit()


async def allocate(
event: events.AllocationRequired,
uow: unit_of_work.AbstractUnitOfWork[AbstractProductRepository],
) -> UUID:
line = models.OrderLine(id=event.order_id, sku=event.sku, qty=event.qty)
async with uow:
product = await uow.repo.get(line.sku)
if product is None:
raise InvalidSku(f"Invalid sku {line.sku}")
batch_id = product.allocate(line)
await uow.commit()
return batch_id


async def change_batch_quantity(
event: events.BatchQuantityChanged,
uow: unit_of_work.AbstractUnitOfWork[AbstractProductRepository],
) -> None:
async with uow:
product = await uow.repo.get_by_batch_id(event.id)
product.change_batch_quantity(event.id, event.qty)
await uow.commit()


def send_out_of_stock_notification(
event: events.OutOfStock, uow: unit_of_work.AbstractUnitOfWork[AbstractProductRepository]
) -> None:
email.send("stock@made.com", f"Out of stock for {event.sku}")
42 changes: 27 additions & 15 deletions app/allocation/service_layer/messagebus.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,30 @@
from app.allocation.adapters import email
from app.allocation.domain import events


async def handle(event: events.Event) -> None:
if isinstance(event, events.OutOfStock):
await send_out_of_stock_notification(event)
else:
raise Exception(f"Unknown event {event}")
from typing import Any


async def send_out_of_stock_notification(event: events.OutOfStock) -> None:
email.send_mail("stock@made.com", f"Out of stock for {event.sku}")
from app.allocation.domain import events
from app.allocation.service_layer import handlers, unit_of_work


HANDLERS = {
events.OutOfStock: [send_out_of_stock_notification],
}
# TODO: 이렇게 하는거 맞나?
async def handle(
event: events.Event,
uow: unit_of_work.AbstractUnitOfWork[unit_of_work.AbstractProductRepository],
) -> list[Any]:
results = []
queue = [event]
while queue:
event = queue.pop(0)
result = None
if isinstance(event, events.OutOfStock):
handlers.send_out_of_stock_notification(event, uow)
elif isinstance(event, events.BatchQuantityChanged):
await handlers.change_batch_quantity(event, uow)
elif isinstance(event, events.AllocationRequired):
result = await handlers.allocate(event, uow)
elif isinstance(event, events.BatchCreated):
await handlers.add_batch(event, uow)
else:
raise Exception(f"Unknown event {event}")
if result:
results.append(result)
queue.extend(uow.collect_new_events())
return results
42 changes: 0 additions & 42 deletions app/allocation/service_layer/services.py

This file was deleted.

9 changes: 4 additions & 5 deletions app/allocation/service_layer/unit_of_work.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

import abc
from typing import Any, Generic, TypeVar
from collections.abc import Generator

from app.allocation.adapters.db import SESSION_FACTORY
from app.allocation.adapters.repository import (
AbstractProductRepository,
PGProductRepository,
)
from app.allocation.service_layer import messagebus
from app.allocation.domain.events import Event
from app.config import get_config

config = get_config()
Expand All @@ -29,13 +30,11 @@ def repo(self) -> Repo:

async def commit(self) -> None:
await self._commit()
await self._publish_events()

async def _publish_events(self) -> None:
def collect_new_events(self) -> Generator[Event, None, None]:
for product in self.repo._seen:
while product.events:
event = product.events.pop(0)
await messagebus.handle(event)
yield product.events.pop(0)

@abc.abstractmethod
async def _commit(self) -> None:
Expand Down
26 changes: 26 additions & 0 deletions tests/integration/test_uow.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,29 @@ async def allocate(order: models.OrderLine) -> None:
dict(sku="RETRO-CLOCK"),
)
assert len(orders.all()) == 1


async def test_get_by_batch_id(session: AsyncSession) -> None:
b1 = models.Batch(
id=UUID("c1dac0a1-b8e8-4052-a7e4-67061204d4d9"), sku="sku1", qty=100, eta=None
)
b2 = models.Batch(
id=UUID("c3f04384-8fdf-4d89-8616-9efec913092f"), sku="sku1", qty=100, eta=None
)
b3 = models.Batch(
id=UUID("cf15ab56-082a-4495-8422-b42cbb5ac1e5"), sku="sku2", qty=100, eta=None
)
p1 = models.Product(sku="sku1", batches=[b1, b2])
p2 = models.Product(sku="sku2", batches=[b3])
session.add(p1)
session.add(p2)
await session.commit()

async with ProductUnitOfWork() as uow:
actual1 = await uow.repo.get_by_batch_id(UUID("c1dac0a1-b8e8-4052-a7e4-67061204d4d9"))
actual2 = await uow.repo.get_by_batch_id(UUID("c3f04384-8fdf-4d89-8616-9efec913092f"))
actual3 = await uow.repo.get_by_batch_id(UUID("cf15ab56-082a-4495-8422-b42cbb5ac1e5"))
assert actual1.sku == "sku1"
assert actual2.sku == "sku1"
assert actual3.sku == "sku2"
await uow.commit()
25 changes: 0 additions & 25 deletions tests/unit/test_batches.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,3 @@ def test_allocation_is_idempotent() -> None:
batch.allocate(line)
# Then
assert batch.available_quantity == 0


def test_deallocate() -> None:
# Given
batch = Batch(sku="SMALL-FORK", qty=10)
line = OrderLine(sku="SMALL-FORK", qty=10)

# When
batch.allocate(line)
batch.deallocate(line)

# Then
assert batch.available_quantity == 10


def test_can_only_deallocate_allocated_lines() -> None:
# Given
batch = Batch(sku="SMALL-FORK", qty=10)
line = OrderLine(sku="SMALL-FORK", qty=10)

# When
batch.deallocate(line)

# Then
assert batch.available_quantity == 10
Loading