Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ repos:
rev: v0.982
hooks:
- id: "mypy"
additional_dependencies: ["fastapi", "pytest"]
additional_dependencies: [fastapi, pytest, types-redis]
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ ENV PYTHONHASHSEED=random \
POETRY_HOME="/opt/poetry" \
POETRY_VIRTUALENVS_CREATE=false \
WORKDIR=/code
ENV PATH="$POETRY_HOME/bin:$PATH"
ENV PATH="$POETRY_HOME/bin:$PATH" \
PYTHONPATH=$WORKDIR:$PYTHONPATH
WORKDIR $WORKDIR

FROM base as poetry_installer
Expand Down
4 changes: 1 addition & 3 deletions app/allocation/adapters/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
from sqlalchemy.ext.asyncio import AsyncSession, async_scoped_session, create_async_engine
from sqlalchemy.orm import sessionmaker

from app.config import get_config

config = get_config()
from app.config import config

ENGINE = create_async_engine(config.PG_DSN, echo=False)
SESSION_FACTORY = async_scoped_session(
Expand Down
18 changes: 18 additions & 0 deletions app/allocation/adapters/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# types-redis의 Generic 타입 이슈로 인해 아래와 같이 사용
# https://github.com/python/typeshed/issues/8242

from typing import TYPE_CHECKING

from redis.asyncio.client import Redis as Redis_

from app.config import config

if TYPE_CHECKING:
Redis = Redis_[bytes]
else:
Redis = Redis_


__all__ = ["Redis"]

redis = Redis.from_url(config.REDIS_DSN)
1 change: 0 additions & 1 deletion app/allocation/adapters/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ async def _get(self, sku: str) -> models.Product:
selectinload(models.Product.batches).options(selectinload(models.Batch.allocations))
)
)
# TODO: joinedload?
return result.scalar_one_or_none()

async def _get_by_batch_id(self, batch_id: UUID) -> models.Product:
Expand Down
2 changes: 2 additions & 0 deletions app/allocation/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
LINE_ALLOCATED_CHANNEL = "allocation:line_allocated:v1"
BATCH_QUANTITY_CHANGED_CHANNEL = "allocation:batch_quantity_changed:v1"
1 change: 0 additions & 1 deletion app/allocation/domain/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,5 @@ class ChangeBatchQuantity(Command):

@dataclass
class Allocate(Command):
order_id: UUID
sku: str
qty: int
9 changes: 9 additions & 0 deletions app/allocation/domain/events.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
from dataclasses import dataclass
from uuid import UUID


class Event:
pass


@dataclass
class Allocated(Event):
order_id: UUID
sku: str
qty: int
batch_id: UUID


@dataclass
class OutOfStock(Event):
sku: str
10 changes: 9 additions & 1 deletion app/allocation/domain/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ def allocate(self, line: OrderLine) -> UUID:
batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
batch.allocate(line)
self.version_number += 1
self.events.append(
events.Allocated(
order_id=line.id,
sku=line.sku,
qty=line.qty,
batch_id=batch.id,
)
)
return batch.id
except StopIteration:
self.events.append(events.OutOfStock(sku=line.sku))
Expand All @@ -85,7 +93,7 @@ def change_batch_quantity(self, id: UUID, qty: int) -> None:
batch.qty = qty
while batch.available_quantity < 0:
line = batch.deallocate_one()
self.events.append(commands.Allocate(line.id, line.sku, line.qty))
self.events.append(commands.Allocate(line.sku, line.qty))

def __hash__(self) -> int:
return hash(self.sku)
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from fastapi import Body, Depends, FastAPI, HTTPException

from app.allocation.adapters.orm import start_mappers
from app.allocation.adapters.repository import AbstractProductRepository
from app.allocation.domain import commands
from app.allocation.routers.dependencies import batch_uow
Expand All @@ -11,7 +12,7 @@
from app.allocation.service_layer.unit_of_work import AbstractUnitOfWork

app = FastAPI()
# start_mappers() # TODO: 운영환경에서는 실행되어야 함
start_mappers()


@app.get("/")
Expand All @@ -34,13 +35,12 @@ async def add_batch(

@app.post("/allocate", response_model=dict[str, str], status_code=201)
async def allocate(
line_id: UUID = Body(),
sku: str = Body(),
quantity: int = Body(),
uow: AbstractUnitOfWork[AbstractProductRepository] = Depends(batch_uow),
) -> dict[str, str]:
try:
cmd = commands.Allocate(line_id, sku, quantity)
cmd = commands.Allocate(sku, quantity)
results = await messagebus.handle(cmd, uow)
batch_id = results[0]
except InvalidSku as e:
Expand Down
4 changes: 1 addition & 3 deletions app/allocation/routers/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
from app.allocation.adapters.db import DB
from app.allocation.adapters.repository import AbstractProductRepository, PGProductRepository
from app.allocation.service_layer.unit_of_work import AbstractUnitOfWork, ProductUnitOfWork
from app.config import get_config

config = get_config()
from app.config import config


@functools.lru_cache
Expand Down
30 changes: 30 additions & 0 deletions app/allocation/routers/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import asyncio
from uuid import UUID

import orjson

from app.allocation.adapters.orm import start_mappers
from app.allocation.adapters.redis import redis
from app.allocation.constants import BATCH_QUANTITY_CHANGED_CHANNEL
from app.allocation.domain import commands
from app.allocation.service_layer import messagebus, unit_of_work

start_mappers()


async def main() -> None:
pubsub = redis.pubsub(ignore_subscribe_messages=True)
await pubsub.subscribe(BATCH_QUANTITY_CHANGED_CHANNEL)

async for m in pubsub.listen():
channel = m["channel"].decode("utf-8")
data = orjson.loads(m["data"])
if channel == BATCH_QUANTITY_CHANGED_CHANNEL:
await messagebus.handle(
commands.ChangeBatchQuantity(id=UUID(data["id"]), qty=data["qty"]),
uow=unit_of_work.ProductUnitOfWork(),
)


if __name__ == "__main__":
asyncio.run(main())
26 changes: 21 additions & 5 deletions app/allocation/service_layer/handlers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from uuid import UUID
from uuid import UUID, uuid4

import orjson

from app.allocation.adapters import email
from app.allocation.adapters.redis import redis
from app.allocation.adapters.repository import AbstractProductRepository
from app.allocation.constants import LINE_ALLOCATED_CHANNEL
from app.allocation.domain import commands, events, models
from app.allocation.service_layer import unit_of_work

Expand All @@ -27,7 +31,7 @@ async def allocate(
cmd: commands.Allocate,
uow: unit_of_work.AbstractUnitOfWork[AbstractProductRepository],
) -> UUID:
line = models.OrderLine(id=cmd.order_id, sku=cmd.sku, qty=cmd.qty)
line = models.OrderLine(id=uuid4(), sku=cmd.sku, qty=cmd.qty)
async with uow:
product = await uow.repo.get(line.sku)
if product is None:
Expand All @@ -47,7 +51,19 @@ async def change_batch_quantity(
await uow.commit()


def send_out_of_stock_notification(
event: events.OutOfStock, uow: unit_of_work.AbstractUnitOfWork[AbstractProductRepository]
) -> None:
def send_out_of_stock_notification(event: events.OutOfStock) -> None:
email.send("stock@made.com", f"Out of stock for {event.sku}")


async def publish_allocated_event(event: events.Allocated) -> None:
await redis.publish(
LINE_ALLOCATED_CHANNEL,
orjson.dumps(
dict(
order_id=str(event.order_id),
sku=event.sku,
qty=event.qty,
batch_id=str(event.batch_id),
)
),
)
4 changes: 3 additions & 1 deletion app/allocation/service_layer/messagebus.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ async def handle_event(
) -> None:
try:
if isinstance(event, events.OutOfStock):
handlers.send_out_of_stock_notification(event, uow)
handlers.send_out_of_stock_notification(event)
elif isinstance(event, events.Allocated):
await handlers.publish_allocated_event(event)
queue.extend(uow.collect_new_events())
except Exception:
return
Expand Down
5 changes: 1 addition & 4 deletions app/allocation/service_layer/unit_of_work.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
)
from app.allocation.domain.commands import Command
from app.allocation.domain.events import Event
from app.config import get_config

config = get_config()

Repo = TypeVar("Repo", bound=AbstractProductRepository)

Expand All @@ -33,7 +30,7 @@ async def commit(self) -> None:
await self._commit()

def collect_new_events(self) -> Generator[Event | Command, None, None]:
for product in self.repo._seen:
for product in self.repo.seen:
while product.events:
yield product.events.pop(0)

Expand Down
7 changes: 2 additions & 5 deletions app/config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from functools import lru_cache

from pydantic import BaseSettings


class Config(BaseSettings):
PG_DSN: str
REDIS_DSN: str


@lru_cache
def get_config() -> Config:
return Config()
config = Config()
29 changes: 28 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ services:
container_name: app
depends_on:
- db
- redis
ports:
- 8000:8000
env_file:
Expand All @@ -19,6 +20,23 @@ services:
- python-packages:/usr/local/lib/python3.10/site-packages
- python-bin:/usr/local/bin
command: /bin/bash scripts/start-dev.sh

worker:
build:
context: .
dockerfile: Dockerfile
target: dev
restart: always
container_name: worker
depends_on:
- redis
env_file:
- ./secrets/.env
volumes:
- .:/code
- python-packages:/usr/local/lib/python3.10/site-packages
- python-bin:/usr/local/bin
command: /bin/bash scripts/start-worker.sh

db:
image: postgres
Expand All @@ -32,10 +50,19 @@ services:
POSTGRES_PASSWORD: password
volumes:
- db-volume:/data/postgres/:/var/lib/postgresql/data


redis:
image: redis
restart: always
container_name: redis
ports:
- "6379:6379"
volumes:
- redis-volume:/data/redis/:/data
volumes:
python-packages:
python-bin:
db-volume:
redis-volume:


Loading