In [38]:
import io
from datetime import datetime

import pandas as pd
from bson import ObjectId
from fastapi import (
    Depends,
    FastAPI,
    File,
    Form,
    HTTPException,
    Query,
    Request,
    UploadFile,
)
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.security.api_key import APIKeyHeader
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from pymongo import MongoClient, errors

In [39]:
# Application instance; every route below is registered on it via decorators.
app = FastAPI()

In [40]:
# MongoDB client for the server at localhost:27017.
client = MongoClient("mongodb://localhost:27017/")
# Database handle used by the route handlers (students, courses, enrollments).
db = client["fastapi_db"]

In [41]:
# Ask MongoDB to enforce one student per e-mail address. If the existing
# data already contains duplicates the index cannot be built, so report it
# and carry on without the constraint.
try:
    db["students"].create_index("email", unique=True)
except errors.DuplicateKeyError:
    print("Duplicate emails exist, index not created. Clean data before retrying.")

In [42]:
def clean_document(doc):
    """Replace a document's ``_id`` with its string form and return it.

    The mapping is modified in place and the same object is returned, so
    callers can use the result directly in a JSON response.

    Args:
        doc: Mapping that contains an ``_id`` key.

    Returns:
        The same ``doc`` object, with ``doc["_id"]`` converted via ``str``.

    Raises:
        KeyError: If ``doc`` has no ``_id`` key.
    """
    raw_id = doc["_id"]
    doc["_id"] = str(raw_id)
    return doc


In [43]:
class Student(BaseModel):
    """Request-body schema for a student record; all fields are required."""

    # Application-level identifier used in the /students/{student_id} routes.
    student_id: int
    name: str
    age: int
    grade: str
    # A unique index on this field is requested for the students collection.
    email: str


class Course(BaseModel):
    """Request-body schema for a course record."""

    # Application-level course identifier referenced by enrollments.
    course_id: int
    course_name: str


class Enrollment(BaseModel):
    """Request-body schema linking one student to one course."""

    # Matches Student.student_id.
    student_id: int
    # Matches Course.course_id.
    course_id: int

In [44]:
@app.post("/students")
def create_student(student: Student):
    """Insert a new student document.

    Args:
        student: Validated student payload from the request body.

    Returns:
        ``{"inserted_id": <str>}`` on success, or a 409 JSON response with
        ``{"detail": "Email already exists"}`` when the insert hits a
        duplicate key.
    """
    payload = student.model_dump()
    try:
        outcome = db.students.insert_one(payload)
    except errors.DuplicateKeyError:
        return JSONResponse(status_code=409, content={"detail": "Email already exists"})
    return {"inserted_id": str(outcome.inserted_id)}


@app.get("/students")
def get_students():
    """Return every student document with ``_id`` rendered as a string."""
    all_students = []
    for record in db.students.find():
        all_students.append(clean_document(record))
    return all_students


In [45]:
@app.get("/students/paginated")
def paginated_students(page: int = Query(1, ge=1), limit: int = Query(10, ge=1)):
    """Return one page of student documents.

    Args:
        page: 1-based page number.
        limit: Maximum number of documents per page.

    Returns:
        A list of at most ``limit`` students, each with a string ``_id``.
    """
    # Pages are 1-based, so page 1 starts at offset 0.
    offset = limit * (page - 1)
    cursor = db.students.find().skip(offset).limit(limit)
    return [clean_document(record) for record in cursor]


In [46]:
@app.get("/students/filter")
def filter_students(
    min_age: int = Query(0, ge=0, description="Minimum age filter"),
    # ``pattern`` is the current keyword for regular-expression validation;
    # ``regex`` is deprecated in the FastAPI releases that support the
    # pydantic v2 API (``model_dump``) used elsewhere in this file.
    sort: str = Query("asc", pattern="^(asc|desc)$", description="Sort order (asc/desc)"),
):
    """Return students aged ``min_age`` or older, ordered by age.

    Args:
        min_age: Inclusive lower bound on the ``age`` field.
        sort: ``"asc"`` for youngest first, ``"desc"`` for oldest first.

    Returns:
        A list of matching student documents, each with a string ``_id``.
    """
    sort_order = 1 if sort == "asc" else -1
    students = list(
        db.students.find({"age": {"$gte": min_age}}).sort("age", sort_order)
    )
    return [clean_document(s) for s in students]


@app.get("/students/{student_id}")
def get_student(student_id: int):
    """Fetch a single student by its application-level ``student_id``.

    Args:
        student_id: Value of the ``student_id`` field to look up (this is
            not the MongoDB ``_id``).

    Returns:
        The matching student document with ``_id`` converted to a string.

    Raises:
        HTTPException: 404 when no student has the given ``student_id``.
    """
    student = db.students.find_one({"student_id": student_id})
    if student is None:
        # Signal a missing resource through the status code. The response
        # body stays {"detail": "Not found"}, as before.
        raise HTTPException(status_code=404, detail="Not found")
    return clean_document(student)


@app.put("/students/{student_id}")
def update_student(student_id: int, student: Student):
    """Overwrite the fields of an existing student.

    Args:
        student_id: ``student_id`` of the document to update.
        student: New field values taken from the request body.

    Returns:
        ``{"message": "Student updated"}`` when a document was matched, or
        a 409 JSON response when the update hits a duplicate key (same
        response shape as ``POST /students``).

    Raises:
        HTTPException: 404 when no student has the given ``student_id``.
    """
    # NOTE(review): the body's own ``student_id`` is written as-is, so a
    # payload whose id differs from the path renumbers the student —
    # confirm whether that is intended.
    update_data = student.model_dump(exclude_unset=True)
    try:
        result = db.students.update_one(
            {"student_id": student_id}, {"$set": update_data}
        )
    except errors.DuplicateKeyError:
        return JSONResponse(status_code=409, content={"detail": "Email already exists"})
    if result.matched_count == 0:
        # Nothing matched the filter: do not report a successful update.
        raise HTTPException(status_code=404, detail="Not found")
    return {"message": "Student updated"}


@app.delete("/students/{student_id}")
def delete_student(student_id: int):
    """Delete a student that has no enrollments.

    Args:
        student_id: ``student_id`` of the document to delete.

    Returns:
        ``{"message": "Student deleted"}`` when a document was removed.

    Raises:
        HTTPException: 409 when the student still has an enrollment, 404
            when no student has the given ``student_id``.
    """
    # Refuse to delete while an enrollment still references the student.
    if db.enrollments.find_one({"student_id": student_id}):
        raise HTTPException(status_code=409, detail="Student is enrolled in a course")
    result = db.students.delete_one({"student_id": student_id})
    if result.deleted_count == 0:
        # Nothing was removed, so do not claim success.
        raise HTTPException(status_code=404, detail="Not found")
    return {"message": "Student deleted"}


In [47]:
# The route must be an absolute URL path; the previous "./courses.csv"
# could never match an incoming request path.
@app.post("/courses")
def create_course(course: Course):
    """Insert a new course document.

    Args:
        course: Validated course payload from the request body.

    Returns:
        ``{"inserted_id": <str>}`` with the new document's ``_id``.
    """
    result = db.courses.insert_one(course.model_dump())
    return {"inserted_id": str(result.inserted_id)}


# The route must be an absolute URL path; the previous "./courses.csv"
# could never match an incoming request path.
@app.get("/courses")
def get_courses():
    """Return every course document with ``_id`` rendered as a string."""
    return [clean_document(c) for c in db.courses.find()]


In [48]:
# The route must be an absolute URL path; the previous "./enrollments.csv"
# could never match an incoming request path.
@app.post("/enrollments")
def enroll_student(enrollment: Enrollment):
    """Record that a student is enrolled in a course.

    Args:
        enrollment: Validated ``student_id`` / ``course_id`` pair from the
            request body.

    Returns:
        ``{"inserted_id": <str>}`` with the new document's ``_id``.
    """
    # NOTE(review): neither id is checked against the students/courses
    # collections before inserting — confirm whether that is acceptable.
    result = db.enrollments.insert_one(enrollment.model_dump())
    return {"inserted_id": str(result.inserted_id)}


# The route must be an absolute URL path; the previous "./enrollments.csv"
# could never match an incoming request path.
@app.get("/enrollments")
def get_enrollments():
    """Return every enrollment document with ``_id`` rendered as a string."""
    return [clean_document(e) for e in db.enrollments.find()]


In [49]:
@app.get("/stats/grades")
def grade_stats():
    """Count students per grade.

    Returns:
        A mapping of each distinct ``grade`` value to the number of student
        documents that carry it.
    """
    # Aggregation stage names, field paths and accumulators all need the
    # "$" prefix: "$grade" refers to the document field, whereas a bare
    # "grade" would be a constant string.
    pipeline = [{"$group": {"_id": "$grade", "count": {"$sum": 1}}}]
    return {doc["_id"]: doc["count"] for doc in db.students.aggregate(pipeline)}


@app.get("/stats/top-courses")
def top_courses():
    """Rank courses by number of enrollments, most popular first.

    Returns:
        A list of ``{"_id": <course_id>, "count": <int>}`` documents in
        descending ``count`` order.
    """
    # Stage names ("$group", "$sort"), the "$course_id" field path and the
    # "$sum" accumulator all require the "$" prefix.
    pipeline = [
        {"$group": {"_id": "$course_id", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
    ]
    return list(db.enrollments.aggregate(pipeline))


In [50]:
@app.post("/upload-csv")
async def upload_csv(file: bytes = File(...)):
    """Bulk-insert students from an uploaded CSV file.

    NOTE(review): a second ``POST /upload-csv`` handler with the same
    function name is defined later in this file; keep only one of them.

    Args:
        file: Raw bytes of the uploaded CSV. ``File(...)`` makes FastAPI
            read it from the multipart form body; a bare ``bytes``
            annotation is not treated as an uploaded file.

    Returns:
        ``{"message": "CSV uploaded"}`` on success.

    Raises:
        HTTPException: 400 when the payload cannot be parsed as CSV, 409
            when the bulk insert is rejected by the database.
    """
    try:
        df = pd.read_csv(io.BytesIO(file))
    except (pd.errors.EmptyDataError, pd.errors.ParserError, UnicodeDecodeError) as exc:
        raise HTTPException(status_code=400, detail=f"Invalid CSV: {exc}") from exc

    records = df.to_dict(orient="records")
    # insert_many rejects an empty list, so a header-only CSV is a no-op.
    if records:
        try:
            db.students.insert_many(records)
        except errors.BulkWriteError as exc:
            raise HTTPException(
                status_code=409,
                detail="Some rows could not be inserted (for example duplicate emails)",
            ) from exc
    return {"message": "CSV uploaded"}
@app.get("/students/export")
def export_students():
    """Download every student document as a CSV attachment.

    NOTE(review): this route is declared after ``/students/{student_id}``.
    Routes are matched in declaration order, so ``GET /students/export`` is
    likely captured by that earlier route first — move this handler above
    it.

    Returns:
        A ``text/csv`` streaming response named ``students.csv``.
    """
    students = list(db.students.find())
    df = pd.DataFrame(students)
    # An empty collection yields a frame with no columns; indexing "_id"
    # unconditionally would raise KeyError.
    if "_id" in df.columns:
        df["_id"] = df["_id"].astype(str)
    stream = io.StringIO()
    df.to_csv(stream, index=False)
    response = StreamingResponse(iter([stream.getvalue()]), media_type="text/csv")
    response.headers["Content-Disposition"] = "attachment; filename=students.csv"
    return response



In [None]:
# Reuse the ``db`` handle created near the top of the file. Opening a second
# client and rebinding the global ``db`` to another database would silently
# redirect every route handler defined above (they look up ``db`` at request
# time) and bypass the unique e-mail index requested on the original
# students collection.
students_collection = db["students"]

@app.post("/upload-csv")
async def upload_csv(file: UploadFile = File(...)):
    """Validate an uploaded CSV of students and bulk-insert its rows.

    NOTE(review): another ``POST /upload-csv`` handler with the same
    function name is defined earlier in this file; keep only one of them.

    Args:
        file: Uploaded file; its name must end in ``.csv`` and it must
            contain the columns ``name``, ``age``, ``grade`` and ``email``.

    Returns:
        ``{"message": "Inserted <n> students successfully"}``.

    Raises:
        HTTPException: 400 for a non-CSV filename or missing columns, 500
            for any other failure while reading or inserting.
    """
    # The filename may be absent; treat that the same as a wrong extension.
    if not (file.filename or "").endswith(".csv"):
        raise HTTPException(status_code=400, detail="Only CSV files are allowed")

    try:
        # Read file contents
        content = await file.read()
        df = pd.read_csv(io.BytesIO(content))

        # Validate required columns
        required_columns = {"name", "age", "grade", "email"}
        if not required_columns.issubset(df.columns):
            raise HTTPException(
                status_code=400,
                # Sorted so the message is deterministic (sets are unordered).
                detail=f"CSV must contain columns: {', '.join(sorted(required_columns))}",
            )

        # Convert DataFrame to dictionary list
        records = df.to_dict(orient="records")

        # Insert into MongoDB (insert_many rejects an empty list)
        if records:
            students_collection.insert_many(records)

        return {"message": f"Inserted {len(records)} students successfully"}

    except HTTPException:
        # Deliberate client errors raised above must keep their status
        # code instead of being rewrapped as a 500 by the handler below.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error processing file: {str(e)}"
        ) from e


@app.get("/upload-csv")
def upload_csv_info():
    """Explain how to use the upload endpoint to clients that send a GET."""
    usage_hint = "Use POST /upload-csv with a CSV file to upload students"
    return {"message": usage_hint}
