In [1]:
!pip install fastapi pydantic sqlalchemy



In [2]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import threading
import time
from queue import Queue
app = FastAPI(title="Scalable Order Processing System")
# Database Setup
DATABASE_URL = "sqlite:///./orders.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

  Base = declarative_base()


In [3]:
class Order(Base):
    __tablename__ = "orders"
    id = Column(Integer, primary_key=True, index=True)
    item = Column(String, nullable=False)
    quantity = Column(Integer, nullable=False)
    status = Column(String, default="Pending")  # Pending, Processing, Completed, Failed
Base.metadata.create_all(bind=engine)

In [4]:
class OrderCreate(BaseModel):
    item: str
    quantity: int

In [5]:
# Simulated async order queue
order_queue = Queue()
def order_worker():
    while True:
        order_id = order_queue.get()
        db = SessionLocal()
        order = db.query(Order).filter(Order.id == order_id).first()
        if order:
            try:
                # Simulate payment + inventory check
                time.sleep(2)  # simulate processing delay
                order.status = "Completed"
                db.commit()
            except Exception as e:
                order.status = "Failed"
                db.commit()
        db.close()
        order_queue.task_done()
# Start worker thread
threading.Thread(target=order_worker, daemon=True).start()

In [6]:
from fastapi import Depends
# Dependency to get DB session
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
# Place a new order
@app.post("/orders/")
def place_order(order: OrderCreate, db=Depends(get_db)):
    db_order = Order(item=order.item, quantity=order.quantity, status="Pending")
    db.add(db_order)
    db.commit()
    db.refresh(db_order)
    # Add order to processing queue
    order_queue.put(db_order.id)
    return {"order_id": db_order.id, "status": db_order.status}
# Check order status
@app.get("/orders/{order_id}")
def check_order_status(order_id: int, db=Depends(get_db)):
    order = db.query(Order).filter(Order.id == order_id).first()
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")
    return {"order_id": order.id, "item": order.item, "quantity": order.quantity, "status": order.status}

In [7]:
from fastapi.testclient import TestClient
client = TestClient(app)
# Place a new order
res = client.post("/orders/", json={"item": "Laptop", "quantity": 2})
print(res.json())
order_id = res.json()["order_id"]
# Wait a few seconds to let the worker process the order
import time
time.sleep(3)
# Check order status
res = client.get(f"/orders/{order_id}")
print(res.json())

{'order_id': 1, 'status': 'Pending'}
{'order_id': 1, 'item': 'Laptop', 'quantity': 2, 'status': 'Completed'}


In [8]:
class Inventory(Base):
    __tablename__ = "inventory"
    item = Column(String, primary_key=True, index=True)
    quantity = Column(Integer, default=0)
Base.metadata.create_all(bind=engine)
# Initialize inventory for demo
db = SessionLocal()
items = [("Laptop", 5), ("Phone", 10), ("Headphones", 15)]
for item_name, qty in items:
    if not db.query(Inventory).filter(Inventory.item == item_name).first():
        db.add(Inventory(item=item_name, quantity=qty))
db.commit()
db.close()

In [9]:
def order_worker():
    while True:
        order_id = order_queue.get()
        db = SessionLocal()
        order = db.query(Order).filter(Order.id == order_id).first()
        if order:
            try:
                # Check inventory
                inv = db.query(Inventory).filter(Inventory.item == order.item).first()
                if not inv or inv.quantity < order.quantity:
                    order.status = "Failed: Insufficient stock"
                    db.commit()
                else:
                    # Deduct stock and complete order
                    inv.quantity -= order.quantity
                    order.status = "Completed"
                    db.commit()
            except Exception as e:
                order.status = f"Failed: {str(e)}"
                db.commit()
        db.close()
        order_queue.task_done()

In [10]:
from collections import defaultdict
RATE_LIMIT = 5  # requests per 60 sec per client
WINDOW = 60
client_requests = defaultdict(list)
def rate_limited(client_ip):
    import time
    now = time.time()
    timestamps = client_requests[client_ip]
    client_requests[client_ip] = [t for t in timestamps if now - t < WINDOW]
    if len(client_requests[client_ip]) >= RATE_LIMIT:
        return True
    client_requests[client_ip].append(now)
    return False

In [11]:
from fastapi import Request
@app.post("/orders/")
def place_order(order: OrderCreate, request: Request, db=Depends(get_db)):
    client_ip = request.client.host if request.client else "unknown"
    if rate_limited(client_ip):
        raise HTTPException(status_code=429, detail="Too many requests")
    db_order = Order(item=order.item, quantity=order.quantity, status="Pending")
    db.add(db_order)
    db.commit()
    db.refresh(db_order)
    order_queue.put(db_order.id)
    return {"order_id": db_order.id, "status": db_order.status}
@app.get("/orders/{order_id}")
def check_order_status(order_id: int, request: Request, db=Depends(get_db)):
    client_ip = request.client.host if request.client else "unknown"
    if rate_limited(client_ip):
        raise HTTPException(status_code=429, detail="Too many requests")
    order = db.query(Order).filter(Order.id == order_id).first()
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")
    return {"order_id": order.id, "item": order.item, "quantity": order.quantity, "status": order.status}


In [12]:
client = TestClient(app)
# Place an order
res = client.post("/orders/", json={"item": "Laptop", "quantity": 2})
print(res.json())
order_id = res.json()["order_id"]
# Place an order exceeding stock
res = client.post("/orders/", json={"item": "Laptop", "quantity": 10})
print(res.json())
# Wait for worker to process
time.sleep(3)
# Check status
res = client.get(f"/orders/{order_id}")
print(res.json())

{'order_id': 2, 'status': 'Pending'}
{'order_id': 3, 'status': 'Pending'}
{'order_id': 2, 'item': 'Laptop', 'quantity': 2, 'status': 'Completed'}
