In [1]:
# !pip install fastapi uvicorn mysql-connector-python python-dotenv
# !pip install watchgod

In [None]:
import requests

response = requests.get("http://localhost:3001/api/sales?year=2023")
if response.status_code == 200:
    print("Monthly Sales:", response.json())
else:
    print("Error:", response.status_code, response.text)


In [2]:
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import mysql.connector
from dotenv import load_dotenv
import os

load_dotenv()

app = FastAPI()

# Enable CORS (adjust origins as needed)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, use the specific domain
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Database connection
def get_connection():
    return mysql.connector.connect(
        host=os.getenv("DB_HOST"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
        database=os.getenv("DB_NAME"),
    )

@app.get("/api/sales")
def get_sales_by_year(year: int):
    try:
        conn = get_connection()
        cursor = conn.cursor(dictionary=True)

        query = """
            SELECT 
              MONTH(date) AS month,
              SUM(total_earn) AS total_earn
            FROM sales
            WHERE YEAR(date) = %s
            GROUP BY MONTH(date)
            ORDER BY MONTH(date);
        """
        cursor.execute(query, (year,))
        results = cursor.fetchall()

        # Fill missing months with 0
        monthly_totals = [0.0] * 12
        for row in results:
            monthly_totals[row["month"] - 1] = float(row["total_earn"])

        return monthly_totals

    except mysql.connector.Error as err:
        print("MySQL Error:", err)
        raise HTTPException(status_code=500, detail="Database error")
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()


In [None]:
from fastapi import FastAPI, HTTPException
from typing import List
import mysql.connector
from mysql.connector import errorcode
from pydantic import BaseModel
import os

app = FastAPI()

# Load your RDS connection info from environment variables or config
RDS_HOST = os.getenv("RDS_HOST", "your-rds-host")
RDS_USER = os.getenv("RDS_USER", "your-username")
RDS_PASSWORD = os.getenv("RDS_PASSWORD", "your-password")
RDS_DB = os.getenv("RDS_DB", "your-database")

class ProductStock(BaseModel):
    ProductID: str
    Balance: int

def get_db_connection():
    try:
        conn = mysql.connector.connect(
            host=RDS_HOST,
            user=RDS_USER,
            password=RDS_PASSWORD,
            database=RDS_DB
        )
        return conn
    except mysql.connector.Error as err:
        print(f"Error connecting to DB: {err}")
        return None

@app.get("/inventory/{store_id}", response_model=List[ProductStock])
def get_inventory(store_id: str):
    conn = get_db_connection()
    if not conn:
        raise HTTPException(status_code=500, detail="Database connection error")

    cursor = conn.cursor(dictionary=True)

    # Query to get latest date per store, and get all products' balances for that date
    # Adjust table/column names to your actual schema
    query = """
        SELECT ProductID, Balance FROM inventory_data
        WHERE StoreID = %s
          AND Date = (
            SELECT MAX(Date) FROM inventory_data WHERE StoreID = %s
          )
        ORDER BY ProductID
    """

    cursor.execute(query, (store_id, store_id))
    results = cursor.fetchall()
    cursor.close()
    conn.close()

    # Return list of dict {ProductID, Balance}
    return [ProductStock(**row) for row in results]


FileNotFoundError: [Errno 2] No such file or directory: 'inventory_data.csv'