In [1]:
!pip install fastapi pydantic sqlalchemy pandas numpy



In [2]:
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
app = FastAPI(title="Real-Time Product Recommendation System")
# Database Setup
DATABASE_URL = "sqlite:///./recommendations.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

  Base = declarative_base()


In [7]:
class Interaction(Base):
    __tablename__ = "interactions"
    __table_args__ = {'extend_existing': True}
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, index=True)
    product_id = Column(Integer, index=True)
    rating = Column(Float, default=5.0)  # 1-5 scale

class Product(Base):
    __tablename__ = "products"
    __table_args__ = {'extend_existing': True}
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, unique=True)

Base.metadata.create_all(bind=engine)

  class Interaction(Base):
  class Product(Base):


In [8]:
class InteractionCreate(BaseModel):
    user_id: int
    product_id: int
    rating: float

class ProductCreate(BaseModel):
    name: str

In [9]:
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

In [10]:
@app.post("/products/")
def add_product(product: ProductCreate, db=Depends(get_db)):
    existing = db.query(Product).filter(Product.name == product.name).first()
    if existing:
        raise HTTPException(status_code=400, detail="Product already exists")
    db_product = Product(name=product.name)
    db.add(db_product)
    db.commit()
    db.refresh(db_product)
    return {"product_id": db_product.id, "name": db_product.name}

@app.post("/interactions/")
def add_interaction(interaction: InteractionCreate, db=Depends(get_db)):
    db_interaction = Interaction(
        user_id=interaction.user_id,
        product_id=interaction.product_id,
        rating=interaction.rating
    )
    db.add(db_interaction)
    db.commit()
    db.refresh(db_interaction)
    return {
        "interaction_id": db_interaction.id,
        "user_id": db_interaction.user_id,
        "product_id": db_interaction.product_id,
        "rating": db_interaction.rating
    }

In [11]:
def get_recommendations(user_id: int, db, top_n=5):
    # Load all interactions
    interactions = pd.read_sql(db.query(Interaction).statement, db.bind)
    if interactions.empty:
        return []

    # Create user-item matrix
    user_item_matrix = interactions.pivot_table(
        index='user_id', columns='product_id', values='rating', fill_value=0
    )

    # Compute cosine similarity between users
    from sklearn.metrics.pairwise import cosine_similarity
    similarity = cosine_similarity(user_item_matrix)
    similarity_df = pd.DataFrame(similarity, index=user_item_matrix.index, columns=user_item_matrix.index)

    # Get similar users
    if user_id not in similarity_df.index:
        return []

    similar_users = similarity_df[user_id].sort_values(ascending=False)[1:]  # exclude self
    weighted_ratings = {}
    for other_user, score in similar_users.items():
        other_ratings = user_item_matrix.loc[other_user]
        for product_id, rating in other_ratings.items():
            if user_item_matrix.loc[user_id, product_id] == 0:  # only recommend unseen products
                if product_id not in weighted_ratings:
                    weighted_ratings[product_id] = 0
                weighted_ratings[product_id] += score * rating

    # Top N recommendations
    top_products = sorted(weighted_ratings.items(), key=lambda x: x[1], reverse=True)[:top_n]
    return [int(pid) for pid, _ in top_products]

In [12]:
@app.get("/recommend/{user_id}")
def recommend_products(user_id: int, db=Depends(get_db), top_n: int = 5):
    product_ids = get_recommendations(user_id, db, top_n)
    if not product_ids:
        return {"user_id": user_id, "recommended_products": []}

    products = db.query(Product).filter(Product.id.in_(product_ids)).all()
    product_names = [p.name for p in products]
    return {"user_id": user_id, "recommended_products": product_names}

In [13]:
from fastapi.testclient import TestClient
client = TestClient(app)
import time

# Add products
client.post("/products/", json={"name": "Laptop"})
client.post("/products/", json={"name": "Phone"})
client.post("/products/", json={"name": "Headphones"})
client.post("/products/", json={"name": "Keyboard"})
client.post("/products/", json={"name": "Mouse"})

# Add interactions
client.post("/interactions/", json={"user_id": 1, "product_id": 1, "rating": 5})
client.post("/interactions/", json={"user_id": 1, "product_id": 2, "rating": 4})
client.post("/interactions/", json={"user_id": 2, "product_id": 2, "rating": 5})
client.post("/interactions/", json={"user_id": 2, "product_id": 3, "rating": 4})
client.post("/interactions/", json={"user_id": 3, "product_id": 1, "rating": 5})
client.post("/interactions/", json={"user_id": 3, "product_id": 3, "rating": 5})

# Get recommendations for user 1
res = client.get("/recommend/1")
print(res.json())

{'user_id': 1, 'recommended_products': ['Headphones']}
