Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions app/allocation/adapters/dao.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncSession

from app.allocation.adapters.dto import Allocation


async def allocations(sku: str, session: AsyncSession) -> list[Allocation]:
result = await session.execute(
sa.text(
"SELECT allocations_view.order_id, allocations_view.batch_id, "
'allocations_view.sku, "order".qty FROM allocations_view JOIN "order" '
'ON allocations_view.order_id = "order".id WHERE allocations_view.sku = :sku'
),
dict(sku=sku),
)
return [Allocation(**dict(r)) for r in result.fetchall()]
10 changes: 10 additions & 0 deletions app/allocation/adapters/dto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from uuid import UUID

from pydantic import BaseModel


class Allocation(BaseModel):
order_id: UUID
sku: str
qty: int
batch_id: UUID
35 changes: 21 additions & 14 deletions app/allocation/adapters/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
mapper_registry = registry()
metadata = mapper_registry.metadata

order_line_table = sa.Table(
"order_line",
order_table = sa.Table(
"order",
metadata,
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column("sku", sa.String),
Expand All @@ -19,7 +19,7 @@
"batch",
metadata,
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column("sku", sa.ForeignKey("products.sku")),
sa.Column("sku", sa.ForeignKey("product.sku")),
sa.Column("qty", sa.Integer),
sa.Column("eta", sa.Date, nullable=True),
)
Expand All @@ -28,32 +28,39 @@
"allocation",
metadata,
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("order_line_id", sa.ForeignKey("order_line.id")),
sa.Column("order_id", sa.ForeignKey("order.id")),
sa.Column("batch_id", sa.ForeignKey("batch.id")),
sa.UniqueConstraint("order_id", "batch_id"),
)

products = sa.Table(
"products",
product_table = sa.Table(
"product",
metadata,
sa.Column("sku", sa.String, primary_key=True),
sa.Column("version_number", sa.Integer, nullable=False, server_default="0"),
)

allocations_view = sa.Table(
"allocations_view",
metadata,
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("order_id", UUID),
sa.Column("sku", sa.String(255)),
sa.Column("batch_id", UUID),
sa.UniqueConstraint("order_id", "batch_id"),
)


def start_mappers() -> None:
line_mapper = mapper_registry.map_imperatively(models.OrderLine, order_line_table)
order_mapper = mapper_registry.map_imperatively(models.Order, order_table)
batches_mapper = mapper_registry.map_imperatively(
models.Batch,
batch_table,
properties={
"allocations": relationship(
line_mapper, secondary=allocation_table, collection_class=set
)
},
properties={"allocations": relationship(order_mapper, secondary=allocation_table, collection_class=set)},
)
mapper_registry.map_imperatively(
models.Product,
products,
product_table,
properties={"batches": relationship(batches_mapper)},
version_id_col=products.c.version_number,
version_id_col=product_table.c.version_number,
)
3 changes: 2 additions & 1 deletion app/allocation/constants.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
LINE_ALLOCATED_CHANNEL = "allocation:line_allocated:v1"
ORDER_DEALLOCATED_CHANNEL = "allocation:order_deallocated:v1"
ORDER_ALLOCATED_CHANNEL = "allocation:order_allocated:v1"
BATCH_QUANTITY_CHANGED_CHANNEL = "allocation:batch_quantity_changed:v1"
1 change: 1 addition & 0 deletions app/allocation/domain/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ class ChangeBatchQuantity(Command):

@dataclass
class Allocate(Command):
order_id: UUID
sku: str
qty: int
7 changes: 7 additions & 0 deletions app/allocation/domain/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,10 @@ class Allocated(Event):
@dataclass
class OutOfStock(Event):
sku: str


@dataclass
class Deallocated(Event):
order_id: UUID
sku: str
qty: int
38 changes: 17 additions & 21 deletions app/allocation/domain/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


@dataclass(unsafe_hash=True, kw_only=True)
class OrderLine:
class Order:
id: UUID = field(default_factory=uuid4)
sku: str
qty: int
Expand All @@ -18,7 +18,7 @@ class Batch:
sku: str
eta: date = None
qty: int
allocations: set[OrderLine] = field(default_factory=lambda: set())
allocations: set[Order] = field(default_factory=lambda: set())

def __repr__(self) -> str:
return f"<Batch {self.id}>"
Expand All @@ -38,27 +38,23 @@ def __gt__(self, other: Batch) -> bool:
return True
return self.eta > other.eta

def allocate(self, line: OrderLine) -> None:
if self.can_allocate(line):
self.allocations.add(line)
def allocate(self, order: Order) -> None:
if self.can_allocate(order):
self.allocations.add(order)

def deallocate_one(self) -> OrderLine:
def deallocate_one(self) -> Order:
return self.allocations.pop()

@property
def allocated_quantity(self) -> int:
return sum(line.qty for line in self.allocations)
return sum(order.qty for order in self.allocations)

@property
def available_quantity(self) -> int:
return self.qty - self.allocated_quantity

def can_allocate(self, line: OrderLine) -> bool:
return (
self.sku == line.sku
and self.available_quantity >= line.qty
and line not in self.allocations
)
def can_allocate(self, order: Order) -> bool:
return self.sku == order.sku and self.available_quantity >= order.qty and order not in self.allocations


@dataclass(kw_only=True)
Expand All @@ -67,23 +63,23 @@ class Product:
batches: list[Batch]
version_number: int = 0

def allocate(self, line: OrderLine) -> UUID:
def allocate(self, order: Order) -> UUID:
try:
batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
batch.allocate(line)
batch = next(b for b in sorted(self.batches) if b.can_allocate(order))
batch.allocate(order)
self.version_number += 1
return batch.id
except StopIteration:
return None

def change_batch_quantity(self, id: UUID, qty: int) -> list[OrderLine]:
def change_batch_quantity(self, id: UUID, qty: int) -> list[Order]:
batch = next(b for b in self.batches if b.id == id)
batch.qty = qty
deallocated_lines = []
deallocated_orders = []
while batch.available_quantity < 0:
line = batch.deallocate_one()
deallocated_lines.append(line)
return deallocated_lines
order = batch.deallocate_one()
deallocated_orders.append(order)
return deallocated_orders

def __hash__(self) -> int:
return hash(self.sku)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from app.allocation.adapters.db import DB
from app.allocation.adapters.repository import AbstractProductRepository, PGProductRepository
from app.allocation.service_layer.unit_of_work import AbstractUnitOfWork, ProductUnitOfWork
from app.allocation.service_layer.unit_of_work import AbstractUnitOfWork, PGUnitOfWork
from app.config import config


Expand All @@ -26,5 +26,5 @@ def repository(
return PGProductRepository(session)


def batch_uow() -> AbstractUnitOfWork[AbstractProductRepository]:
return ProductUnitOfWork()
def batch_uow() -> AbstractUnitOfWork:
return PGUnitOfWork()
56 changes: 56 additions & 0 deletions app/allocation/entrypoints/restapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from datetime import date
from uuid import uuid4

from fastapi import Body, Depends, FastAPI, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

from app.allocation.adapters import dao
from app.allocation.adapters.dto import Allocation
from app.allocation.adapters.orm import start_mappers
from app.allocation.domain import commands
from app.allocation.entrypoints.dependencies import batch_uow, session
from app.allocation.service_layer import handlers
from app.allocation.service_layer.handlers import InvalidSku
from app.allocation.service_layer.unit_of_work import AbstractUnitOfWork

app = FastAPI()
start_mappers()


@app.get("/")
async def root() -> dict[str, str]:
return {"message": "Hello World"}


@app.post("/batches", status_code=201)
async def add_batch(
sku: str = Body(),
quantity: int = Body(),
eta: date = Body(default=None),
uow: AbstractUnitOfWork = Depends(batch_uow),
) -> dict[str, str]:
cmd = commands.CreateBatch(uuid4(), sku, quantity, eta)
await handlers.CreateBatchCmdHandler(uow).handle(cmd)
return {"batch_id": str(cmd.id)}


@app.post("/allocate", response_model=dict[str, str], status_code=201)
async def allocate(
sku: str = Body(),
quantity: int = Body(),
uow: AbstractUnitOfWork = Depends(batch_uow),
) -> dict[str, str]:
try:
cmd = commands.Allocate(uuid4(), sku, quantity)
batch_id = await handlers.AllocateCmdHandler(uow).handle(cmd)
except InvalidSku as e:
raise HTTPException(status_code=400, detail=str(e))
return {"batch_id": str(batch_id)}


@app.get("/allocations/{sku}", status_code=200)
async def allocations_view_endpoint(sku: str, session: AsyncSession = Depends(session)) -> list[Allocation]:
result = await dao.allocations(sku, session)
if not result:
raise HTTPException(status_code=404, detail="not found")
return result
71 changes: 71 additions & 0 deletions app/allocation/entrypoints/run_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import asyncio
from uuid import UUID

import orjson
import sqlalchemy as sa

from app.allocation.adapters.db import DB
from app.allocation.adapters.orm import start_mappers
from app.allocation.adapters.redis import redis
from app.allocation.constants import BATCH_QUANTITY_CHANGED_CHANNEL, ORDER_ALLOCATED_CHANNEL, ORDER_DEALLOCATED_CHANNEL
from app.allocation.domain import commands, events
from app.allocation.service_layer import handlers, unit_of_work
from app.config import config

start_mappers()
db = DB(config.PG_DSN)

# TODO: prevent event loss
# TODO: use same transaction, or make idempotent
async def main() -> None:
pubsub = redis.pubsub(ignore_subscribe_messages=True)
await pubsub.subscribe(BATCH_QUANTITY_CHANGED_CHANNEL, ORDER_ALLOCATED_CHANNEL, ORDER_DEALLOCATED_CHANNEL)

async for m in pubsub.listen():
channel = m["channel"].decode("utf-8")
data = orjson.loads(m["data"])
if channel == BATCH_QUANTITY_CHANGED_CHANNEL:
await handlers.ChangeBatchQuantityCmdHandler(unit_of_work.PGUnitOfWork()).handle(
commands.ChangeBatchQuantity(id=UUID(data["id"]), qty=data["qty"]),
)
elif channel == ORDER_ALLOCATED_CHANNEL:
await _add_allocation_to_read_model(
events.Allocated(
order_id=UUID(data["order_id"]), sku=data["sku"], qty=data["qty"], batch_id=UUID(data["batch_id"])
),
)
elif channel == ORDER_DEALLOCATED_CHANNEL:
event = events.Deallocated(order_id=UUID(data["order_id"]), sku=data["sku"], qty=data["qty"])
await _remove_allocation_from_read_model(event)
await _remove_order(event)
await handlers.AllocateCmdHandler(unit_of_work.PGUnitOfWork()).handle(
commands.Allocate(order_id=event.order_id, sku=event.sku, qty=event.qty)
)


async def _add_allocation_to_read_model(event: events.Allocated) -> None:
async with db.session() as session:
await session.execute(
sa.text("INSERT INTO allocations_view (order_id, sku, batch_id) VALUES (:order_id, :sku, :batch_id)"),
dict(order_id=event.order_id, sku=event.sku, batch_id=event.batch_id),
)


async def _remove_order(event: events.Deallocated) -> None:
async with db.session() as session:
await session.execute(
sa.text('DELETE FROM "order" WHERE id = :order_id'),
dict(order_id=event.order_id),
)


async def _remove_allocation_from_read_model(event: events.Deallocated) -> None:
async with db.session() as session:
await session.execute(
sa.text("DELETE FROM allocations_view WHERE order_id = :order_id AND sku = :sku"),
dict(order_id=event.order_id, sku=event.sku),
)


if __name__ == "__main__":
asyncio.run(main())
Loading