In [1]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd
import numpy as np
from github import Github
import pickle
import base64
import uvicorn
import joblib
from typing import List, Dict
import requests
from io import BytesIO
import psutil
import datetime
import os
from enum import Enum
from fastapi.middleware.cors import CORSMiddleware
import nest_asyncio

# Execution Flow

The application is designed to be run as a FastAPI server:

## 1. Initialize the FastAPI App
- Sets up middleware for **Cross-Origin Resource Sharing (CORS)**.
- Adds routes for predictions and health checks.

---

## 2. Model Loading
- Loads the trained ML model from GitHub upon service initialization.

---

## 3. Run Server
- Uses `uvicorn` to run the FastAPI app on **port 8000** by default.


In [3]:
# Initialize FastAPI app
app = FastAPI(
    title="Retail Sales Prediction API",
    description="API for predicting retail sales using ML model",
    version="1.0"
)

nest_asyncio.apply()

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class HealthStatus(str, Enum):
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    DEGRADED = "degraded"

class SystemHealth(BaseModel):
    status: HealthStatus
    timestamp: str
    model_status: dict
    system_info: dict
    memory_usage: dict
    disk_usage: dict
    uptime: str

class PredictionInput(BaseModel):
    num_purchases: float
    total_quantity: float
    avg_quantity: float
    std_quantity: float
    avg_transaction: float
    std_transaction: float
    avg_unit_price: float
    std_unit_price: float
    country: str




# 1. Class: ModelService

This class encapsulates all logic for managing the machine learning model, making predictions, and monitoring the system's health.

## Purpose
To load the model, handle predictions, and monitor metrics like prediction count, success rate, and resource usage.

## Key Methods

### `__init__()`
Initializes the service:
- Loads the model from a GitHub repository.
- Defines the expected feature columns for prediction.
- Tracks system uptime, prediction count, and failure count.

#### Attributes:
- **`model`**: Holds the machine learning model loaded from GitHub.
- **`feature_columns`**: Expected input features for predictions.
- **`start_time`**: Timestamp when the service started (for uptime tracking).
- **`model_loaded`**: Boolean indicating whether the model was loaded successfully.
- **`last_prediction_time`**: Tracks the timestamp of the last successful prediction.
- **`total_predictions`**: Total number of predictions made.
- **`failed_predictions`**: Total number of prediction failures.

---

### `load_model_from_github()`
Downloads and loads the machine learning model (a `.pkl` file) from a GitHub repository.

#### Steps:
1. Sends a GET request to the raw URL of the model file on GitHub.
2. Raises an error if the request fails.
3. Deserializes the model using `pickle`.

#### Error Handling:
- Catches exceptions and sets `model_loaded` to `False` if loading fails.

---

### `predict(input_data: PredictionInput)`
- Takes input data, processes it into a DataFrame, and makes a prediction using the loaded model.

#### Updates Prediction Metrics:
- **`last_prediction_time`**
- **`total_predictions`**
- **`failed_predictions`** (if an exception occurs)

#### Error Handling:
- If the prediction fails, it increments `failed_predictions`.

---

### `get_health_status()`
Returns a detailed health status of the system and model, including:

#### Memory Usage:
- Total, available, and percentage memory usage (via `psutil`).

#### Disk Usage:
- Total, free, and percentage disk usage.

#### Model Status:
- Tracks if the model is loaded, the total predictions, failed predictions, and success rate.

#### System Info:
- CPU usage, core count, Python version, and uptime.

#### Overall Status:
- Marks the system as **HEALTHY**, **DEGRADED**, or **UNHEALTHY** based on resource usage and model availability.


In [5]:
class ModelService:
    def __init__(self):
        self.model = self.load_model_from_github()
        self.feature_columns = [
            'num_purchases', 'total_quantity', 'avg_quantity', 
            'std_quantity', 'avg_transaction', 'std_transaction', 
            'avg_unit_price', 'std_unit_price', 'country'
        ]
        self.start_time = datetime.datetime.now()
        self.model_loaded = False
        self.last_prediction_time = None
        self.total_predictions = 0
        self.failed_predictions = 0

    def load_model_from_github(self):
        """Load model from GitHub repository"""
        try:
            # GitHub repository details
            repo_owner = "Group8MLUL2"
            repo_name = "Group8_CT1"
            model_path = "models/best_model.pkl"
            
            # Step 1: Provide the raw URL of the .pkl file on GitHub
            url = "https://raw.githubusercontent.com/Group8MLUL2/Group8_CT1/main/models/best_model.pkl"
            
            # Step 2: Download the file
            response = requests.get(url)
            response.raise_for_status()  # Check if the request was successful
            
            # Step 3: Load the pickle file into a Python object
            model = pickle.loads(response.content)
            self.model_loaded = True
            print("Model loaded successfully!")
            return model
            
        except Exception as e:
            print(f"Error loading model: {str(e)}")
            self.model_loaded = False
            raise

    def predict(self, input_data: PredictionInput) -> float:
        """Make prediction using the loaded model"""
        try:
            # Convert input to DataFrame
            input_dict = input_data.dict()
            df = pd.DataFrame([input_dict])
            
            # Make prediction
            prediction = self.model.predict(df)
            
            # Update metrics
            self.last_prediction_time = datetime.datetime.now()
            self.total_predictions += 1
            
            return float(prediction[0])
            
        except Exception as e:
            self.failed_predictions += 1
            print(f"Error making prediction: {str(e)}")
            raise

    def get_health_status(self) -> SystemHealth:
        """Get comprehensive system health status"""
        current_time = datetime.datetime.now()
        
        # Calculate uptime
        uptime = current_time - self.start_time
        
        # Get memory usage
        memory = psutil.virtual_memory()
        memory_usage = {
            "total": f"{memory.total / (1024 * 1024 * 1024):.2f} GB",
            "available": f"{memory.available / (1024 * 1024 * 1024):.2f} GB",
            "percent": f"{memory.percent}%",
        }
        
        # Get disk usage
        disk = psutil.disk_usage('/')
        disk_usage = {
            "total": f"{disk.total / (1024 * 1024 * 1024):.2f} GB",
            "free": f"{disk.free / (1024 * 1024 * 1024):.2f} GB",
            "percent": f"{disk.percent}%",
        }
        
        # Get model status
        model_status = {
            "loaded": self.model_loaded,
            "total_predictions": self.total_predictions,
            "failed_predictions": self.failed_predictions,
            "last_prediction": str(self.last_prediction_time) if self.last_prediction_time else "Never",
            "success_rate": f"{((self.total_predictions - self.failed_predictions) / self.total_predictions * 100):.2f}%" if self.total_predictions > 0 else "N/A"
        }
        
        # Get system info
        system_info = {
            "cpu_percent": f"{psutil.cpu_percent()}%",
            "cpu_count": psutil.cpu_count(),
            "python_version": os.sys.version,
        }
        
        # Determine overall status
        status = HealthStatus.HEALTHY
        if not self.model_loaded:
            status = HealthStatus.UNHEALTHY
        elif memory.percent > 90 or disk.percent > 90:
            status = HealthStatus.DEGRADED
        
        return SystemHealth(
            status=status,
            timestamp=str(current_time),
            model_status=model_status,
            system_info=system_info,
            memory_usage=memory_usage,
            disk_usage=disk_usage,
            uptime=str(uptime)
        )

# # Initialize FastAPI app
# app = FastAPI(
#     title="Retail Sales Prediction API",
#     description="API for predicting retail sales using ML model",
#     version="1.0"
# )

# Initialize model service
model_service = ModelService()

Model loaded successfully!


# FastAPI Endpoints

The application exposes several RESTful endpoints for health checks and predictions.

## Endpoints

### Root (`/`)
- **Method**: `GET`
- **Description**: Returns basic information about the API and available endpoints.

---

### Health Check (`/health`)
- **Method**: `GET`
- **Description**: Performs a basic health check.
- **Response**:
  - **`healthy`**: If the model is loaded successfully.
  - **`unhealthy`**: Otherwise.

---

### Detailed Health Check (`/health/full`)
- **Method**: `GET`
- **Description**: Returns a detailed health report, including:
  - System resource usage.
  - Uptime.
  - Model status.
- **Response Model**: Uses the `SystemHealth` class to structure the response.

---

### Prediction (`/predict`)
- **Method**: `POST`
- **Description**: Accepts input data and returns the model's prediction.
- **Input**: JSON object matching the `PredictionInput` structure.
- **Output**: JSON object containing:
  - The prediction result.
  - The input data.
  - A timestamp for when the prediction was made.


In [7]:
@app.get("/")
async def root():
    """Root endpoint"""
    return {
        "message": "Retail Sales Prediction API",
        "status": "active",
        "endpoints": {
            "/predict": "Make predictions",
            "/health": "Check API health",
            "/health/full": "Detailed health check"
        }
    }

@app.get("/health")
async def health():
    """Basic health check endpoint"""
    return {
        "status": "healthy" if model_service.model_loaded else "unhealthy",
        "timestamp": datetime.datetime.now().isoformat()
    }

@app.get("/health/full", response_model=SystemHealth)
async def detailed_health():
    """Detailed health check endpoint"""
    return model_service.get_health_status()

@app.post("/predict")
async def predict(input_data: PredictionInput):
    """Make prediction"""
    try:
        prediction = model_service.predict(input_data)
        return {
            "prediction": prediction,
            "input_data": input_data.dict(),
            "timestamp": datetime.datetime.now().isoformat()
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

In [8]:
def run_server():
    """Run the FastAPI server with logging"""
    try:
        # logger.info("Starting API server")
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=8000)
    except Exception as e:
        logger.error(f"Server startup failed: {str(e)}")
        raise

if __name__ == "__main__":
    run_server()

INFO:     Started server process [30276]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:51752 - "GET /health HTTP/1.1" 200 OK
INFO:     127.0.0.1:51756 - "GET /health HTTP/1.1" 200 OK
INFO:     127.0.0.1:51759 - "POST /predict HTTP/1.1" 200 OK


C:\Users\divye\AppData\Local\Temp\ipykernel_30276\1496101971.py:45: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.5/migration/
  input_dict = input_data.dict()
C:\Users\divye\AppData\Local\Temp\ipykernel_30276\2657409222.py:34: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.5/migration/
  "input_data": input_data.dict(),


INFO:     127.0.0.1:51775 - "GET /health HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [30276]
