In [1]:
from fastapi import FastAPI, HTTPException, File, UploadFile
from pydantic import BaseModel
from typing import Optional, Union
import nest_asyncio
from uvicorn import Config, Server
from pymongo import MongoClient
import pandas as pd
from io import BytesIO
from dotenv import load_dotenv
import os

In [2]:
# Load environment variables (including the ``Mongo-url`` connection string
# read by mongo_connection) from a dotenv file.  The location can be
# overridden with the GLASSIFY_ENV_FILE environment variable so the notebook
# also runs on machines where the original absolute path does not exist; the
# original path remains the default.
load_dotenv(
    dotenv_path=os.getenv(
        "GLASSIFY_ENV_FILE",
        "/Users/yashasvipamu/Documents/Web Applications/Glassify/Backend/config.env",
    )
)

# Module-level handle that the request handlers inspect in their ``finally``
# blocks to decide whether there is a MongoDB client to close.
client = None

In [3]:
def mongo_connection():
    """Open a MongoDB connection and return the inventory collection.

    The connection string is read from the ``Mongo-url`` environment
    variable.  The newly created client is stored in the module-level
    ``client`` variable so that the request handlers, which close ``client``
    in their ``finally`` blocks, actually release the connection.

    Returns:
        The ``Inventory`` collection of the ``Glassify`` database.
    """
    # Without this declaration the assignment below creates a *local*
    # variable: the module-level ``client`` would stay ``None`` forever, every
    # handler's ``client.close()`` would be skipped, and each request would
    # leak a MongoClient (and its connection pool).
    global client
    client = MongoClient(os.getenv("Mongo-url"))
    db = client["Glassify"]
    return db["Inventory"]


In [4]:
# Patch asyncio so that an event loop can be re-entered.  Presumably needed
# because the notebook kernel already has a running loop when the uvicorn
# server is started with `server.run()` in the last cell -- TODO confirm.
nest_asyncio.apply()

In [5]:
# The FastAPI application object; the `@app.get/put/post/delete` decorators in
# the following cells register their handlers on it, and it is the app handed
# to the uvicorn `Config` in the last cell.
app = FastAPI()

In [6]:
class item(BaseModel):
    """Pydantic model for one inventory item.

    Used as the request body of the PUT endpoint and as the response model of
    the item-returning endpoints in this notebook.
    """
    # Application-level integer identifier (the routes filter on this field,
    # not on Mongo's ``_id``).
    id : int
    # Item name; the lookup-by-name route matches on this field.
    name: str
    # Optional free-text description; defaults to None when omitted.
    description : Optional[str] = None
    # May be a string or a number (the upload route stores a string
    # placeholder when the spreadsheet cell is blank).
    quantity: Union[str, int, float]
    # The three fields below are plain strings; they appear to describe where
    # the item is stored -- NOTE(review): exact meaning of each not shown here.
    cabinet: str
    room: str
    location: str

In [7]:
@app.get("/items", response_model=list[item])
async def get_items():
    """Return every document in the inventory collection.

    Returns:
        A list of item documents.  Each document's Mongo ``_id`` is converted
        to ``str`` before it is returned.

    Raises:
        HTTPException: 500 if connecting to or reading from MongoDB fails.
    """
    try:
        collection = mongo_connection()
        results = []
        # The loop variable is named ``doc`` (not ``item``) so it does not
        # shadow the ``item`` model class inside this function.  The former
        # debug ``print`` of the cursor object has been removed: it only
        # emitted the cursor's repr, never the data.
        for doc in collection.find():
            doc["_id"] = str(doc["_id"])  # Convert ObjectId to string for JSON serialization
            results.append(doc)
        return results

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")

    finally:
        # ``client`` is the module-level handle; compare against None
        # explicitly rather than relying on the client object's truthiness.
        if client is not None:
            client.close()


In [8]:
@app.get("/items/name/{item_name}", response_model=item)
async def get_item_by_name(item_name: str):
    """Return the first inventory document whose ``name`` equals ``item_name``.

    Args:
        item_name: Exact value to match against the ``name`` field.

    Returns:
        The matching document, with its Mongo ``_id`` converted to ``str``.

    Raises:
        HTTPException: 404 if no document matches; 500 if the database
            operation fails.
    """
    try:
        collection = mongo_connection()
        doc = collection.find_one({"name": item_name})
        if doc is None:
            raise HTTPException(status_code=404, detail="Item not found")
        doc["_id"] = str(doc["_id"])  # Convert ObjectId to string for JSON serialization
        return doc

    except HTTPException:
        # HTTPException is itself an Exception subclass; without this clause
        # the generic handler below would catch the deliberate 404 and turn
        # it into a 500.
        raise

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")

    finally:
        if client is not None:
            client.close()

In [None]:
@app.delete("/items/{item_id}", response_model=dict)
async def delete_item(item_id: int):
    """Delete the inventory document whose ``id`` field equals ``item_id``.

    Args:
        item_id: Application-level integer id of the item to delete.

    Returns:
        A dict with a single ``message`` key confirming the deletion.

    Raises:
        HTTPException: 404 if nothing was deleted; 500 if the database
            operation fails.
    """
    try:
        collection = mongo_connection()
        result = collection.delete_one({"id": item_id})

        if result.deleted_count == 0:
            raise HTTPException(status_code=404, detail=f"Item with ID {item_id} not found.")

        return {"message": f"Item with ID {item_id} successfully deleted."}

    except HTTPException:
        # Re-raise the deliberate 404 untouched; the generic handler below
        # would otherwise report it to the client as a 500.
        raise

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")

    finally:
        if client is not None:
            client.close()

In [9]:
@app.put("/items/{item_id}", response_model=item)
async def add_or_update_item(item_id: int, item: item):
    """Replace the item with id ``item_id``, inserting it if it does not exist.

    The id in the URL path is authoritative: the stored document's ``id`` is
    set to ``item_id`` even if the request body carries a different value, so
    the document always remains reachable under the URL it was written to.

    Args:
        item_id: Application-level integer id taken from the path.
        item: New field values for the item.

    Returns:
        The document as it was written (with ``id`` equal to ``item_id``).

    Raises:
        HTTPException: 400 if the write neither matched nor inserted a
            document; 500 if the database operation fails.
    """
    try:
        collection = mongo_connection()
        document = item.dict()
        # Previously the body was stored verbatim, so PUT /items/5 with
        # ``"id": 7`` in the body replaced item 5 by a document claiming to be
        # item 7 (possibly duplicating an existing id 7).
        document["id"] = item_id
        result = collection.replace_one({"id": item_id}, document, upsert=True)
        if result.matched_count == 0 and not result.upserted_id:
            raise HTTPException(status_code=400, detail="Failed to update or insert item")
        return document

    except HTTPException:
        # Keep the deliberate 400 above from being rewrapped as a 500 by the
        # generic handler below.
        raise

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")

    finally:
        if client is not None:
            client.close()

In [10]:
def _blank_to_default(value, default):
    """Return ``default`` for a blank spreadsheet cell (NaN/None or ""), else ``value``."""
    if pd.isna(value) or value == "":
        return default
    return value


def _next_item_id(collection):
    """Return the first unused integer item id for ``collection``.

    Uses the highest stored ``id`` plus one, so ids freed by earlier deletions
    do not cause new items to collide with existing ones.  Falls back to the
    document count plus one when the collection is empty or the highest ``id``
    is not an integer.
    """
    newest = collection.find_one(sort=[("id", -1)])
    highest = newest.get("id") if newest else None
    if isinstance(highest, int):
        return highest + 1
    return collection.estimated_document_count() + 1


@app.post("/upload/")
async def upload_excel(file: UploadFile = File(...)):
    """Bulk-insert inventory items from an uploaded ``.xlsx`` spreadsheet.

    The sheet must contain the columns ``Name``, ``Quantity``, ``Cabinet``,
    ``Room`` and ``Location``; ``Description`` is optional.  Each row becomes
    one document with a freshly generated integer ``id``.

    Args:
        file: The uploaded spreadsheet.

    Returns:
        A dict with a single ``message`` key stating how many items were
        inserted.

    Raises:
        HTTPException: 400 if the file is not ``.xlsx`` or required columns
            are missing; 500 if parsing or the database operation fails.
    """
    # ``filename`` may be absent on the upload, and the extension check should
    # not depend on letter case.  Validate before opening a DB connection so
    # a rejected upload does not leave a connection behind.
    if not (file.filename or "").lower().endswith(".xlsx"):
        raise HTTPException(status_code=400, detail="Invalid file format. Only .xlsx files are supported.")

    try:
        # Read the Excel file into a Pandas DataFrame
        df = pd.read_excel(BytesIO(await file.read()))

        # Validate required columns
        required_columns = {"Name", "Quantity", "Cabinet", "Room", "Location"}
        if not required_columns.issubset(df.columns):
            raise HTTPException(status_code=400, detail="Excel file is missing required columns")

        collection = mongo_connection()
        # Determine the starting id once instead of querying the database for
        # every row; nothing is inserted until after the loop, so the value
        # cannot change in between.
        first_id = _next_item_id(collection)

        items = []
        # Number rows by position rather than by DataFrame index label.
        for offset, (_, row) in enumerate(df.iterrows()):
            items.append({
                "id": first_id + offset,  # Auto-generate item ID
                "name": row["Name"],
                # A blank Description cell is read as NaN, which is not a
                # valid ``Optional[str]``; store an empty string instead (the
                # same value used when the column is missing entirely).
                "description": _blank_to_default(row.get("Description", ""), ""),
                # Blank quantities keep the original placeholder string.
                "quantity": _blank_to_default(row["Quantity"], "too many"),
                # NOTE(review): blank cells in the required columns are still
                # stored as-is; decide whether such rows should be rejected.
                "cabinet": row["Cabinet"],
                "room": row["Room"],
                "location": row["Location"],
            })

        if items:
            collection.insert_many(items)

        return {"message": f"Successfully uploaded and inserted {len(items)} items into the database."}

    except HTTPException:
        # Let the deliberate 400 above through instead of rewrapping it as a
        # 500 in the generic handler below.
        raise

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")

    finally:
        if client is not None:
            client.close()


In [None]:
# Serve the FastAPI app with uvicorn on the loopback interface, port 8000.
# `server.run()` is the last statement of the notebook: it starts the server
# from inside this cell.
config = Config(
    app=app,
    host="127.0.0.1",
    port=8000,
)
server = Server(config=config)
server.run()