# Deploying AI into production with FastAPI

## Chapter 1 - Introduction to FastAPI for Model Deployment

### Section 1.1 - GET and POST requests for AI

#### GET endpoint for model information

You're part of a machine learning team that has developed several machine learning models, each designed for different tasks such as sentiment analysis, product categorization, and customer churn prediction. You're working on deploying these models, and you need to create an endpoint that provides basic information about each model.

Your task is to implement a GET endpoint at route `/model-info/{model_id}` that retrieves and returns this essential model information.

In [9]:
%%writefile main.py
from fastapi import FastAPI, HTTPException

app = FastAPI()

# Add model_id as a path parameter in the route
@app.get("/model-info/{model_id}")
# Pass on the model id as an argument
async def get_model_info(model_id: int):
    # Check if the passed model id is 0
    if model_id == 0:
        # Raise the right status code for not found
        raise HTTPException(status_code=404, detail="Model not found")
    # get_model_details is not defined in these notes (assumed to be provided elsewhere)
    model_info = get_model_details(model_id)
    # Return the model id and info in the dict
    return {"model_id": model_id, "model_name": model_info}

Overwriting main.py


#### POST endpoint for model registration

While the GET endpoint you created earlier allows users to retrieve information about existing models, you now need a way for authorized team members to register new models or update information about existing ones.

To do this, create a POST endpoint that stores the submitted model information on the server.

In [10]:
%%writefile main.py
from pydantic import BaseModel
from fastapi import FastAPI

app = FastAPI()

model_db = {}

class ModelInfo(BaseModel):
    model_id: int
    model_name: str
    description: str

# Specify the status code for successful POST request
@app.post("/register-model", status_code=201)
# Pass the model info from the request as function parameter 
def register_model(model_info: ModelInfo):
    # Add new model's information dictionary to the model database
    model_db[model_info.model_id] = model_info.model_dump()
    # Return the model info; the 201 status code is already set by the decorator
    return {"message": "Model registered successfully", "model": model_info}

Overwriting main.py


In [11]:
import requests

data = {
    "model_id": 1,
    "model_name": "cnn",
    "description": "convolutional nn"}

url = "http://localhost:8000/register-model"
# requests sets this header automatically when json= is used
headers = {"Content-Type": "application/json"}
response = requests.post(url, json=data, headers=headers)
print(response.json())

{'message': 'Model registered successfully', 'model': {'model_id': 1, 'model_name': 'cnn', 'description': 'convolutional nn'}}
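
FastAPI validates the request body against `ModelInfo` before `register_model` runs and answers with a 422 error when the body does not fit. The same check can be reproduced with Pydantic alone. This is a sketch assuming Pydantic v2 (which the use of `model_dump()` suggests); `bad_payload` is a made-up example.

```python
from pydantic import BaseModel, ValidationError

class ModelInfo(BaseModel):
    model_id: int
    model_name: str
    description: str

# Made-up payload: non-numeric id and a missing description
bad_payload = {"model_id": "not-a-number", "model_name": "cnn"}

try:
    ModelInfo.model_validate(bad_payload)
    failed_fields = []
except ValidationError as exc:
    # Each error entry records the location of the offending field
    failed_fields = sorted(err["loc"][0] for err in exc.errors())

print(failed_fields)
```

Some Pydantic v2 releases warn that field names starting with `model_` clash with a protected namespace; the warning does not affect validation here.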


### Section 1.2 - FastAPI prediction with a pre-trained model

#### Preparation

The model from the exercises does not work, so here we quickly train and save our own model for use in the exercises.

In [12]:
from datasets import load_dataset

ds = load_dataset("SIH/palmer-penguins")

In [13]:
df = ds['train'].to_pandas()
df.head()

| Unnamed: 0 | species | island | bill_length_mm | bill_depth_mm | flipper_length_mm | body_mass_g | sex | year |
|---|---|---|---|---|---|---|---|---|
| 0 | Adelie | Torgersen | 39.1 | 18.7 | 181.0 | 3750.0 | male | 2007 |
| 1 | Adelie | Torgersen | 39.5 | 17.4 | 186.0 | 3800.0 | female | 2007 |
| 2 | Adelie | Torgersen | 40.3 | 18.0 | 195.0 | 3250.0 | female | 2007 |
| 3 | Adelie | Torgersen | | | | | | 2007 |
| 4 | Adelie | Torgersen | 36.7 | 19.3 | 193.0 | 3450.0 | female | 2007 |


In [15]:
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import numpy as np
import joblib

# Only include numerical features + target
features = ['species', 'bill_length_mm', 'bill_depth_mm',
            'flipper_length_mm', 'body_mass_g']

# 0. Prepare the dataset
df_dataset = df[features].dropna()
X = df_dataset.drop("species", axis=1)
y = df_dataset["species"]

# 1. Split first
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# 2. Define pipeline (scaler + classifier)
pipe = make_pipeline(
    StandardScaler(),
    LogisticRegression(max_iter=1000)
)

# 3. Perform cross-validation only on the training set
cv_scores = cross_val_score(pipe, X_train, y_train, cv=5)

# 4. Report CV accuracy
print("Cross-validation accuracy scores:", cv_scores)
print("Mean CV accuracy:", np.mean(cv_scores))

# 5. Fit the pipeline on the full training set
pipe.fit(X_train, y_train)

# 6. Evaluate on the test set
y_pred = pipe.predict(X_test)
print("Test set accuracy:", accuracy_score(y_test, y_pred))

# 7. Save the pipeline (includes scaler + model)
joblib.dump(pipe, "penguin_classifier.pkl")

# 8. Load the pipeline
model = joblib.load("penguin_classifier.pkl")
print("Loaded model type:", type(model))

# 9. Use the loaded model for prediction
# (Optional — uncomment to test)
# y_pred = model.predict(X_test)


Cross-validation accuracy scores: [0.98181818 1.         1.         0.98148148 1.        ]
Mean CV accuracy: 0.9926599326599327
Test set accuracy: 0.9855072463768116
Loaded model type: <class 'sklearn.pipeline.Pipeline'>


#### Load the pre-trained model

You're a data scientist at an animal conservation company. You've been given a pre-trained machine learning model that predicts penguin species.

Your task is to load this model so it can be used in an API. The model has been saved using `joblib`.

A pre-trained ML model is stored in the pickle file: `penguin_classifier.pkl`

Write a script to load the pickle file as a model. Test your script by running `python3 solution.py` in the terminal.

In [16]:
# Import the necessary module
import joblib

# Load the pre-trained model
model = joblib.load('penguin_classifier.pkl')

# Print the type of the loaded model
print(f"Loaded model type: {type(model)}")

Loaded model type: <class 'sklearn.pipeline.Pipeline'>
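
Printing the type confirms that the file loads, but not that the restored object behaves like the one that was saved. A quick round-trip check can be sketched as follows. It uses a toy pipeline fitted on synthetic data (not the penguin data) and a temporary file, so every name and value here is illustrative.

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in data: 4 numeric features, binary target
rng = np.random.default_rng(0)
X = rng.normal(size=(40, 4))
y = (X[:, 0] > 0).astype(int)

pipe = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000))
pipe.fit(X, y)

# Save to a temporary file and load it back
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "toy_classifier.pkl")
    joblib.dump(pipe, path)
    restored = joblib.load(path)

# The restored pipeline should reproduce the original predictions exactly
same = bool(np.array_equal(pipe.predict(X), restored.predict(X)))
print(same)
```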


#### Create the prediction endpoint

In this exercise, you'll create a prediction endpoint that uses a pre-trained model to estimate diabetes progression.

The original exercise model was trained on a dataset with three features (`age`, `bmi` and `blood_pressure`) and predicts a diabetes progression score, which helps assess how the condition may develop over time.

You'll use FastAPI to create a POST endpoint that accepts patient data and returns a prediction of diabetes progression. Because that model is not available here, the code below substitutes the penguin classifier trained earlier in this section: the endpoint accepts four penguin measurements and returns the predicted species, while the function name and response key keep the `progression` naming of the original exercise.

In [23]:
%%writefile main.py
from fastapi import FastAPI
from pydantic import BaseModel
import pandas as pd
import joblib

# Load the pre-trained model
model = joblib.load('penguin_classifier.pkl')

# Print the type of the loaded model
print(f"Loaded model type: {type(model)}")

class PengiunFeatures(BaseModel):
    bill_length_mm: float
    bill_depth_mm: float
    flipper_length_mm: float
    body_mass_g: float
    
# Create FastAPI instance
app = FastAPI()

# Create a POST request endpoint at the route "/predict"
@app.post("/predict")
async def predict_progression(features: PengiunFeatures):
    input_data = pd.DataFrame([features.model_dump()])
    
    prediction = model.predict(input_data)
    return {"predicted_progression": prediction[0]}

Overwriting main.py


In [24]:
from pydantic import BaseModel
import requests

class PengiunFeatures(BaseModel):
    bill_length_mm: float
    bill_depth_mm: float
    flipper_length_mm: float
    body_mass_g: float


pengiun = PengiunFeatures(bill_length_mm=39.1,
                          bill_depth_mm=18.7,
                          flipper_length_mm=181.0,
                          body_mass_g=3750.0)

url = "http://localhost:8000/predict"
data = pengiun.model_dump()
response = requests.post(url, json=data)
print(response.json())

{'predicted_progression': 'Adelie'}
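
The endpoint returns `prediction[0]`, which is a NumPy scalar. Species labels are strings and serialize without trouble, but a model with numeric output would return values such as `np.int64`, which Python's standard `json` module rejects; web frameworks can run into the same problem. A small sketch of a conversion helper follows; `to_builtin` is a hypothetical name, not part of FastAPI or NumPy.

```python
import json

import numpy as np

def to_builtin(value):
    """Convert a NumPy scalar to the equivalent built-in Python object."""
    return value.item() if isinstance(value, np.generic) else value

label = to_builtin(np.str_("Adelie"))  # becomes a plain str
count = to_builtin(np.int64(3))        # becomes a plain int

# The raw NumPy integer cannot be encoded by the json module
try:
    json.dumps({"prediction": np.int64(3)})
    raw_int_serializable = True
except TypeError:
    raw_int_serializable = False

encoded = json.dumps({"label": label, "count": count})
print(encoded, raw_int_serializable)
```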


#### Running the FastAPI app

Your FastAPI app has been saved in a Python file called `main.py`. You would like to run the app from a Python script using uvicorn.

To serve your FastAPI app directly via the Python script in `solution.py`, you need to finish adding the code block that sets up the host and port of the server where the API will run.

In [25]:
%%writefile solution.py
# Import the server module
import uvicorn
from main import app

if __name__ == "__main__":
    # Start the uvicorn server
    uvicorn.run(
        app,
        # Configure the host
        host="0.0.0.0",
        # Configure the port
        port=8080)

Overwriting solution.py
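
Hard-coding the host and port is fine for an exercise. One possible variation reads them from environment variables so the same script can run on different machines. The variable names `API_HOST` and `API_PORT` are assumptions made for this sketch, not a uvicorn convention.

```python
import os

def server_settings(env=None):
    """Return (host, port), falling back to the values used in these notes."""
    env = os.environ if env is None else env
    host = env.get("API_HOST", "0.0.0.0")
    port = int(env.get("API_PORT", "8080"))
    return host, port

# Passing a plain dict makes the behaviour easy to check
default_settings = server_settings({})
custom_settings = server_settings({"API_HOST": "127.0.0.1", "API_PORT": "9000"})
print(default_settings, custom_settings)
```

The result would be passed on as `uvicorn.run(app, host=host, port=port)`. Note that `0.0.0.0` listens on all network interfaces, while `127.0.0.1` accepts local connections only.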


With the snippet below we can start the server using Python. It is best to do this in a terminal, because the command blocks until the server is stopped.

In [None]:
!python3 solution.py

Now post to the server on port 8080.

In [26]:
pengiun = PengiunFeatures(bill_length_mm=39.1,
                          bill_depth_mm=18.7,
                          flipper_length_mm=181.0,
                          body_mass_g=3750.0)

url = "http://localhost:8080/predict"
data = pengiun.model_dump()
response = requests.post(url, json=data)
print(response.json())

{'predicted_progression': 'Adelie'}


### Section 1.3 - Create a Pydantic model for ML input

#### Create a Pydantic model for ML input

You're developing a FastAPI application to deploy a machine learning model that predicts the quality score of coffee based on attributes including aroma, flavor, and altitude.

The first step is to create a Pydantic model to validate the input request data for your ML model and ensure that only valid data flows through the model for successful model prediction.

In [27]:
# Import the base class from pydantic
from pydantic import BaseModel

class CoffeeQualityInput(BaseModel):
    # Use an appropriate data type for each attribute of coffee quality
    aroma: float  
    flavor: float  
    altitude: int
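
To see what "only valid data flows through" means in practice, the model can be instantiated directly. In this sketch the input values are made up: Pydantic coerces compatible values to the declared types and raises `ValidationError` for values it cannot convert.

```python
from pydantic import BaseModel, ValidationError

class CoffeeQualityInput(BaseModel):
    aroma: float
    flavor: float
    altitude: int

# Numeric strings and ints are coerced to the declared types
sample = CoffeeQualityInput(aroma="7.5", flavor=8, altitude="1200")

# A value that cannot be read as an integer is rejected
try:
    CoffeeQualityInput(aroma=7.5, flavor=8.0, altitude="high")
    rejected = False
except ValidationError:
    rejected = True

print(sample, rejected)
```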

#### Validate request and response for ML prediction

Building on your work as a data scientist at the coffee company, you now need to create a FastAPI endpoint that validates the incoming request with the `CoffeeQualityInput` model and the outgoing response with a `QualityPrediction` model.

This endpoint will accept coffee data and return a quality prediction along with the confidence score.

The model is already loaded into a function called `predict_quality` for this exercise.

In [28]:
class CoffeeQualityInput(BaseModel):
    aroma: float
    flavor: float
    altitude: int
    
class QualityPrediction(BaseModel):
    quality_score: float 
    confidence: float

# Specify the data model to validate response
@app.post("/predict", response_model=QualityPrediction) 
# Specify the data model to validate input request
def predict(coffee_data: CoffeeQualityInput):
    prediction = predict_quality(coffee_data)
    return prediction 
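
Setting `response_model` means the returned value is validated and reduced to the fields declared on `QualityPrediction`. The effect can be approximated with Pydantic alone, as sketched below. `predict_quality` here is a hypothetical stand-in with made-up numbers, since the course's preloaded function is not shown in these notes.

```python
from pydantic import BaseModel

class QualityPrediction(BaseModel):
    quality_score: float
    confidence: float

def predict_quality(coffee_data: dict) -> dict:
    """Hypothetical stand-in returning made-up values plus an extra key."""
    return {"quality_score": 82.5, "confidence": 0.91, "debug_info": "internal"}

raw = predict_quality({"aroma": 7.5, "flavor": 8.0, "altitude": 1200})

# Validating against the response model keeps only the declared fields
public = QualityPrediction.model_validate(raw).model_dump()
print(public)
```

This is why a response model is useful beyond documentation: internal details that happen to be in the raw prediction do not leak into the API response.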

## Chapter 2 - Integrating AI Model

#### Handle textual request data

Another requirement in the content moderation system is to take into account user comments' sentiment. The system needs to identify specific problematic phrases to help moderators review potentially inappropriate content.

You'll create an endpoint that analyzes text coming from users and extracts standardized moderation flags.

In [31]:
%%writefile main.py
from fastapi import FastAPI

app = FastAPI()

@app.post("/analyze_comment")
def analyze_comment(text: str):
    problem_keywords = ["spam", "hate", "offensive", "abuse"]
    
    # Convert the input text to lowercase
    text_lower = text.lower()
    # Extract matching flags using list comprehension
    found_issues = [keyword for keyword in problem_keywords if keyword in text_lower]
    # Return the dictionary with required keys
    return {
        "issues": found_issues,
        "issue_count": len(found_issues),
        "original_text": text
    }

Overwriting main.py


In [37]:
response = requests.post(
    "http://localhost:8000/analyze_comment",
    # `text` is a query parameter; params= takes care of URL encoding
    params={"text": "This is a spam with spam which I hate and abusive message"}
)
print(response.json())

{'issues': ['spam', 'hate'], 'issue_count': 2, 'original_text': 'This is a spam with spam which I hate and abusive message'}
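
The `keyword in text_lower` test matches substrings, not whole words. That is why "abusive" is not flagged as "abuse" above, and it also means that harmless words can trigger a flag. The sketch below contrasts the substring check with a word-based check; `find_issues_words` and its simple tokenizing regex are illustrative choices, not part of the exercise.

```python
import re

PROBLEM_KEYWORDS = ["spam", "hate", "offensive", "abuse"]

def find_issues_substring(text: str) -> list[str]:
    """Same logic as the endpoint: keyword appears anywhere in the text."""
    text_lower = text.lower()
    return [kw for kw in PROBLEM_KEYWORDS if kw in text_lower]

def find_issues_words(text: str) -> list[str]:
    """Alternative: keyword must appear as a whole word."""
    words = set(re.findall(r"[a-z']+", text.lower()))
    return [kw for kw in PROBLEM_KEYWORDS if kw in words]

comment = "Whatever you say"
print(find_issues_substring(comment))  # "whatever" contains the letters h-a-t-e
print(find_issues_words(comment))
```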


#### Handle numerical request data

You're building a content moderation system. The system needs to calculate a trust score for each user comment based on numerical features - `length`, `user_reputation`, and `report_count`. You'll create an endpoint that processes these features to make them compatible for the moderation model.

Note that the ML model and the `CommentMetrics` Pydantic model with `length` (int), `user_reputation` (int) and `report_count` (int) are already created and loaded for you.

In [38]:
%%writefile scorer.py
import numpy as np
from pydantic import BaseModel

class CommentMetrics(BaseModel):
    length: int
    user_reputation: int
    report_count: int

class CommentScorer:
    def predict(self, features: np.ndarray) -> float:
        """
        Predict trust score based on comment metrics
        features: [[length, user_reputation, report_count]]
        """
        # Unpack features
        length, reputation, reports = features[0]
        
        # Calculate trust score
        score = (0.3 * (length/500) +        # Normalize length
                 0.5 * (reputation/100) +    # Normalize reputation
                 -0.2 * reports)             # Reports reduce score
        
        return float(max(min(score * 100, 100), 0))  # Scale to 0-100

Writing scorer.py


In [39]:
%%writefile main.py
import numpy as np
from scorer import CommentMetrics, CommentScorer
from fastapi import FastAPI

app = FastAPI()
model = CommentScorer()

@app.post("/predict_trust")
def predict_trust(comment: CommentMetrics):
    # Convert input and extract comment metrics
    features = np.array([[
        comment.length,
        comment.user_reputation,
        comment.report_count
    ]])
    # Get prediction from model 
    score = model.predict(features)
    return {
        "trust_score": round(score, 2),
        # model_dump() is the Pydantic v2 replacement for the deprecated dict()
        "comment_metrics": comment.model_dump()
    }
    
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

Overwriting main.py


In [46]:
import requests

url = "http://localhost:8080/predict_trust"
data = {
    "length": 150,
    "user_reputation": 100,
    "report_count": 0}

headers = {"Content-Type": "application/json"}

response = requests.post(url, json=data, headers=headers)
print(response.json())

{'trust_score': 59.0, 'comment_metrics': {'length': 150, 'user_reputation': 100, 'report_count': 0}}
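
The score of 59.0 follows directly from the formula in `scorer.py`: 0.3 × (150/500) + 0.5 × (100/100) − 0.2 × 0 = 0.59, scaled to 59. The sketch below restates the same arithmetic as a plain function, without NumPy, to show the clamping at both ends of the 0-100 range. The extra inputs are made up for illustration.

```python
def trust_score(length: int, reputation: int, reports: int) -> float:
    """Same formula as CommentScorer.predict, written for scalar inputs."""
    raw = 0.3 * (length / 500) + 0.5 * (reputation / 100) - 0.2 * reports
    # Scale to 0-100 and clamp values that fall outside the range
    return float(max(min(raw * 100, 100), 0))

print(round(trust_score(150, 100, 0), 2))  # the request shown above
print(trust_score(150, 100, 3))            # three reports push the score below 0
print(trust_score(5000, 100, 0))           # a very long comment exceeds 100
```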


### Section 2.2 - Input validation in FastAPI

#### Field validation

You are building on the user comment moderation service. Your goal is to create a Pydantic `User` model that ensures data integrity across all users.

Implement validations for the `username` field (minimum 5 and maximum 20 characters, the limits used in the code below).

Use Pydantic's `Field` function to add these constraints, and test your model with both valid and invalid user data to ensure it correctly handles various scenarios.

In [47]:
# Import the base model and the Field function from Pydantic
from pydantic import BaseModel, Field

# Inherit Pydantic's base model
class User(BaseModel): 
    # Set minimum and maximum name length
    username: str = Field(..., min_length=5, max_length=20)
    email: str
    age: int

user = User(username="john_doe", email="john@mode360.com", age=25)
print(user)

username='john_doe' email='john@mode360.com' age=25
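
The cell above only exercises the valid case. A sketch of the invalid case follows, using a made-up username that is too short; the error list returned by Pydantic names the field that failed.

```python
from pydantic import BaseModel, Field, ValidationError

class User(BaseModel):
    username: str = Field(..., min_length=5, max_length=20)
    email: str
    age: int

try:
    # "joe" has 3 characters, below the minimum of 5
    User(username="joe", email="joe@mode360.com", age=25)
    error_fields = []
except ValidationError as exc:
    error_fields = [err["loc"] for err in exc.errors()]

print(error_fields)
```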


#### Adding custom validators

Mode360 Solutions is the organization behind the comment moderation system, and you're given a task to create a validation service for all employees. The system should be able to validate input details (`username`, `email` and `age`) such that only employees with an official email address can register.

You need to define a Pydantic `User` model using the `@field_validator` decorator (the Pydantic v2 replacement for `@validator`) on `email` to check that the entered email ends with `@mode360.com`.

Validators like this reject invalid data before it reaches the rest of the system.

In [51]:
from pydantic import BaseModel, field_validator, Field

class User(BaseModel):
    username: str = Field(..., min_length=5, max_length=20)  
    email: str
    age: int

    # Add the Pydantic decorator to validate
    @field_validator('email')
    @classmethod
    def email_must_be_example_domain(cls, user_email):
        # Use the endswith method to validate the email ends with @mode360.com
        if not user_email.endswith("@mode360.com"):
            raise ValueError('Email must be from the mode360.com domain')
        return user_email

#### Testing custom validators

After defining custom validators, you need to add that validator to the API endpoint and ensure it is working as expected.

The system should ensure that only users with valid email addresses can register. Create a simple endpoint that expects user details (`username`, `email` and `age`) in the request, add the Pydantic model to the endpoint, and test the endpoint with an invalid email address. The original exercise uses a `cURL` command for this; these notes use Python's `requests` instead.

In [53]:
%%writefile main.py
from fastapi import FastAPI
from pydantic import BaseModel, Field, field_validator

app = FastAPI()

class User(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: str
    age: int

    # field_validator is the Pydantic v2 replacement for the deprecated validator
    @field_validator('email')
    @classmethod
    def email_must_be_example_domain(cls, user_email):
        if not user_email.endswith("@mode360.com"):
            raise ValueError('Email must be from the mode360.com domain')
        return user_email

# Create a post request endpoint
@app.post("/register")
# Validate incoming user data with a pydantic model
def register_user(user: User):
    return {"status": "success", "user": user.model_dump()}
  
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

Overwriting main.py


**Remark** - Change the `data` below so that it violates the validation rules (for example, an email outside `@mode360.com`) to see validation fail and the errors the API returns.

In [60]:
import requests

url = "http://localhost:8080/register"
data = {
    "username": "jane_doe", 
    "email": "jane@mode360.com", 
    "age": 30}

headers = {"Content-Type": "application/json"}

response = requests.post(url, json=data, headers=headers)
print(response.json())

{'status': 'success', 'user': {'username': 'jane_doe', 'email': 'jane@mode360.com', 'age': 30}}
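
The same validation rules can be explored without a running server by instantiating the model directly. The sketch below assumes Pydantic v2 and collects the error messages for a valid and an invalid payload; `validation_messages` is a hypothetical helper and the payloads are made up.

```python
from pydantic import BaseModel, Field, ValidationError, field_validator

class User(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: str
    age: int

    @field_validator("email")
    @classmethod
    def email_must_be_example_domain(cls, user_email: str) -> str:
        if not user_email.endswith("@mode360.com"):
            raise ValueError("Email must be from the mode360.com domain")
        return user_email

def validation_messages(payload: dict) -> list[str]:
    """Return the validation error messages for a payload (empty if valid)."""
    try:
        User(**payload)
        return []
    except ValidationError as exc:
        return [err["msg"] for err in exc.errors()]

ok = validation_messages({"username": "jane_doe", "email": "jane@mode360.com", "age": 30})
bad = validation_messages({"username": "jane_doe", "email": "jane@gmail.com", "age": 30})
print(ok, bad)
```

When the endpoint receives such a payload over HTTP, FastAPI turns the validation error into a 422 response instead of calling `register_user`.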


### Section 2.3 - Loading a pre-trained model

#### Loading AI model at server startup

You have to deploy a trained sentiment analysis model that helps in moderating comments from users. To ensure zero downtime, the API needs to be ready to analyze user comments as soon as it starts up.

In this exercise, you'll implement FastAPI's lifespan events to load your model efficiently to build the comment moderation systems. The `SentimentAnalyzer` model class is already defined and imported for you.
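
A lifespan handler is an async context manager: the code before `yield` runs once at startup and the code after it runs once at shutdown. The mechanics can be seen in isolation with the standard library only. In this sketch `simulate_server` is a hypothetical stand-in for what the server does around its lifetime, and the event strings are placeholders for real loading and cleanup work.

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def lifespan(app):
    events.append("load model")     # startup: runs before any request
    yield
    events.append("release model")  # shutdown: runs after the last request

async def simulate_server():
    # Stand-in for the server entering and leaving the lifespan context
    async with lifespan(app=None):
        events.append("serve requests")

asyncio.run(simulate_server())
print(events)
```

Note that the decorated function has to be declared with `async def` for `asynccontextmanager` to work.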

In [61]:
# Import the context manager decorator from contextlib module
from contextlib import asynccontextmanager
from fastapi import FastAPI

sentiment_model = None

def load_model():
    global sentiment_model
    sentiment_model = SentimentAnalyzer("sentiment_model.joblib")

# Use FastAPI's context manager to define lifespan event
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Call the function to load the model
    load_model()
    yield

#### Health-check API for model loading

After loading the model at server startup, you need to develop a testing endpoint to build a monitoring system that can detect if the model is ready to analyze user comments.

In this exercise, you'll create a health check endpoint that allows you to verify your API's status and trigger alerts if the `sentiment_model` isn't available.

Note: `sentiment_model` is already pre-loaded for you.

In [62]:
%%writefile  model.py
import joblib
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression


# Model creation
def train_and_save_model():
    data = {
        "review": [
            "I love this product, it's fantastic!",
            "Really satisfied with the quality!",
            "Terrible, I hate it.",
            "Not happy with the purchase.",
            "Absolutely amazing and wonderful!",
            "Worst experience ever.",
            "I am very pleased with my purchase.",
            "Disappointed, it didn't work as expected.",
            "The best thing I've ever bought.",
            "Totally awful, will not buy again."
        ],
        "label": [1, 1, 0, 0, 1, 0, 1, 0, 1, 0]  # 1 = Positive, 0 = Negative
    }

    df = pd.DataFrame(data)

    positive_words = ["love", "satisfied", "amazing", "fantastic", "wonderful", "pleased", "best"]
    negative_words = ["hate", "terrible", "worst", "disappointed", "awful"]

    df["num_words"] = df["review"].apply(lambda x: len(x.split()))
    df["num_positive_words"] = df["review"].apply(lambda x: sum(word in x.lower() for word in positive_words))
    df["num_complaints"] = df["review"].apply(lambda x: sum(word in x.lower() for word in negative_words))

    X = df[["num_words", "num_positive_words", "num_complaints"]]
    y = df["label"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    model = LogisticRegression(solver='lbfgs')
    model.fit(X_train, y_train)

    # Save model using joblib instead of pickle
    joblib.dump(model, 'sentiment_model.joblib', compress=3)

# Define a callable class
class SentimentAnalyzer:
    def __init__(self, model_path):
        # Train and save the model first so the file exists, then load it with joblib
        train_and_save_model()
        self.model = joblib.load(model_path)
        self.positive_words = ["love", "satisfied", "amazing", "fantastic", "wonderful", "pleased", "best"]
        self.negative_words = ["hate", "terrible", "worst", "disappointed", "awful"]

    def __call__(self, text):
        num_words = len(text.split())
        num_positive_words = sum(word in text.lower() for word in self.positive_words)
        num_complaints = sum(word in text.lower() for word in self.negative_words)
        features = [[num_words, num_positive_words, num_complaints]]
        
        # Get prediction and confidence score
        prediction = self.model.predict(features)
        confidence_scores = self.model.predict_proba(features)
        
        # Return dictionary directly instead of JSON string
        result = {
            "label": "Positive" if prediction[0] == 1 else "Negative",
            "confidence": float(confidence_scores[0][prediction[0]])  
        }
        return result 


Writing model.py


In [63]:
%%writefile main.py
from model import SentimentAnalyzer
from fastapi import FastAPI
from contextlib import asynccontextmanager

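# Define the global up front so the health check never hits an undefined name
sentiment_model = None

def load_model():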
    global sentiment_model
    sentiment_model = SentimentAnalyzer("sentiment_model.joblib")

@asynccontextmanager
async def lifespan(app: FastAPI):
    load_model()
    yield

app = FastAPI(title="Sentiment Analysis API", lifespan=lifespan)

# Define a GET endpoint at route "/health"
@app.get("/health")
def health_check():
    # Check whether sentiment_model is loaded or not
    if sentiment_model is not None:
        return {
            # Mark status as healthy and loaded boolean to True
            "status": "healthy",
            "model_loaded": True
        }
    # Mark status as unhealthy and loaded boolean to False
    return {
        "status": "unhealthy",
        "model_loaded": False
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

Overwriting main.py


import requests

url = "http://localhost:8080/health"
headers = {"accept": "application/json"}

response = requests.get(url, headers=headers)
print(response.json())


### Section 2.4 - Returning structured prediction response

#### Returning structured output from API

You're building a content moderation system where you need to define a POST endpoint to test a pre-trained sentiment analysis model on user comments.

You need to create an endpoint that leverages `pydantic` models to return predictions in a structured format.

Note: Pydantic models - `CommentRequest` and `CommentResponse` are already created for you to use along with the pre-trained `sentiment_model` from pre-defined `SentimentAnalyzer` class.

#### Testing the endpoint for structured response

Now that the `analyze_comment` endpoint is created, you need to test it using Python's `requests` library.

You'll create a script to send a `POST` request containing sample text to your API and handle the responses.

Note: the API code is already provided in the `api.py` script. For this exercise, you will be working in `main.py`.

In [77]:
%%writefile api.py
# File: api.py
import joblib
import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from contextlib import asynccontextmanager
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression


# model creation
def train_and_save_model():
    data = {
        "review": [
            "I love this product, it's fantastic!",
            "Really satisfied with the quality!",
            "Terrible, I hate it.",
            "Not happy with the purchase.",
            "Absolutely amazing and wonderful!",
            "Worst experience ever.",
            "I am very pleased with my purchase.",
            "Disappointed, it didn't work as expected.",
            "The best thing I've ever bought.",
            "Totally awful, will not buy again."
        ],
        "label": [1, 1, 0, 0, 1, 0, 1, 0, 1, 0]  # 1 = Positive, 0 = Negative
    }

    df = pd.DataFrame(data)

    positive_words = ["love", "satisfied", "amazing", "fantastic", "wonderful", "pleased", "best"]
    negative_words = ["hate", "terrible", "worst", "disappointed", "awful"]

    df["num_words"] = df["review"].apply(lambda x: len(x.split()))
    df["num_positive_words"] = df["review"].apply(lambda x: sum(word in x.lower() for word in positive_words))
    df["num_complaints"] = df["review"].apply(lambda x: sum(word in x.lower() for word in negative_words))

    X = df[["num_words", "num_positive_words", "num_complaints"]]
    y = df["label"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    model = LogisticRegression(solver='lbfgs')
    model.fit(X_train, y_train)

    # Save model using joblib instead of pickle
    joblib.dump(model, 'sentiment_model.joblib', compress=3)

# Train and save the model
train_and_save_model()

sentiment_model = None

# Define a callable class
class SentimentAnalyzer:
    def __init__(self, model_path):
        # Load the model using joblib
        self.model = joblib.load(model_path)
        self.positive_words = ["love", "satisfied", "amazing", "fantastic", "wonderful", "pleased", "best"]
        self.negative_words = ["hate", "terrible", "worst", "disappointed", "awful"]

    def __call__(self, text):
        num_words = len(text.split())
        num_positive_words = sum(word in text.lower() for word in self.positive_words)
        num_complaints = sum(word in text.lower() for word in self.negative_words)
        features = [[num_words, num_positive_words, num_complaints]]
        
        # Get prediction and confidence score
        prediction = self.model.predict(features)
        confidence_scores = self.model.predict_proba(features)
        
        # Return dictionary directly instead of JSON string
        result = {
            "label": "Positive" if prediction[0] == 1 else "Negative",
            "confidence": float(confidence_scores[0][prediction[0]])  
        }
        return result 

# Define model loading function
def load_model():
    global sentiment_model
    sentiment_model = SentimentAnalyzer("sentiment_model.joblib")

# Initialize FastAPI app with lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize our mock model on startup
    global sentiment_model
    load_model()
    yield


# Initialize FastAPI app
app = FastAPI(title="Sentiment Analysis API", lifespan=lifespan)

# Initialize model variable
sentiment_model = None

# Define request/response models
class CommentRequest(BaseModel):
    text: str

class CommentResponse(BaseModel):
    text: str
    sentiment: str
    confidence: float
    

@app.post("/analyze")
def analyze_comment(request: CommentRequest):
    if sentiment_model is None:
        raise HTTPException(
            status_code=503,
            detail="Model not loaded"
        )
    
    if not request.text.strip():
        raise HTTPException(
            status_code=400,
            detail="Empty text provided"
        )
        
    result = sentiment_model(request.text)
    return CommentResponse(
        text=request.text,
        sentiment=result["label"],
        confidence=result["confidence"]
    )

@app.get("/health")
def health_check():
    """Check if model is loaded and ready"""
    return {
        "status": "healthy" if sentiment_model is not None else "unhealthy",
        "model_loaded": sentiment_model is not None
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

Writing api.py


In [78]:
import requests

url = "http://localhost:8080/analyze"
data = {"text": "This is great, I can totally relate."}

# Send post request and pass the sample request data
response = requests.post(url, json=data)

# Print prediction response
print(response.json())

{'text': 'This is great, I can totally relate.', 'sentiment': 'Negative', 'confidence': 0.6244529524435871}


## Chapter 3 - API Key Authentication

### Securing APIs with key authentication

You're building a secure API and need to implement API key verification. The API will check for a key in the `X-API-Key` header of each request and verify it against a predefined secret. You'll use FastAPI's built-in security features to implement this authentication system.

The `FastAPI` and `HTTPException` classes have been pre-imported.

In [80]:
# Import the function that handles dependencies
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import APIKeyHeader

# Create the API key instance
api_key_header = APIKeyHeader(name="X-API-Key")
API_KEY = "your_secret_key"

# Pass the APIKeyHeader instance and verify against input api_key
def verify_api_key(api_key: str = Depends(api_key_header)):
    if api_key != API_KEY:  
        # Raise the HTTP exception here
        raise HTTPException(status_code=403, detail="Invalid API Key")  
    return api_key
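
Two optional hardening steps are common for this kind of check: reading the secret from the environment instead of the source code, and comparing keys in constant time. A sketch under stated assumptions follows; the environment variable name `MODEL_API_KEY` and the helper `keys_match` are hypothetical, while `secrets.compare_digest` is part of the Python standard library.

```python
import os
import secrets

# Hypothetical variable name; falls back to the placeholder used in these notes
API_KEY = os.environ.get("MODEL_API_KEY", "your_secret_key")

def keys_match(candidate: str, expected: str = API_KEY) -> bool:
    """Compare two keys in constant time to avoid leaking timing information."""
    return secrets.compare_digest(candidate.encode(), expected.encode())

print(keys_match("your_secret_key", "your_secret_key"))
print(keys_match("wrong", "your_secret_key"))
```

Inside `verify_api_key`, the condition `api_key != API_KEY` could then be written as `not keys_match(api_key)`.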

#### Secure the API endpoint

You have a sentiment analysis model API that needs to be protected with API key authentication.

You've already set up the `verify_api_key` function, and now you need to create a protected endpoint that accepts the request with text input and returns predictions from the model. This endpoint should only be accessible to users with valid API keys.

You will be working within `main.py` to complete this exercise.

All the supporting code including `verify_api_key` and model loading is in `model.py`

In [88]:
%%writefile model.py
import joblib
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from fastapi import Depends, HTTPException
from fastapi.security import APIKeyHeader

# Model creation
def train_and_save_model():
    data = {
        "review": [
            "I love this product, it's fantastic!",
            "Really satisfied with the quality!",
            "Terrible, I hate it.",
            "Not happy with the purchase.",
            "Absolutely amazing and wonderful!",
            "Worst experience ever.",
            "I am very pleased with my purchase.",
            "Disappointed, it didn't work as expected.",
            "The best thing I've ever bought.",
            "Totally awful, will not buy again."
        ],
        "label": [1, 1, 0, 0, 1, 0, 1, 0, 1, 0]  # 1 = Positive, 0 = Negative
    }

    df = pd.DataFrame(data)

    positive_words = ["love", "satisfied", "amazing", "fantastic", "wonderful", "pleased", "best"]
    negative_words = ["hate", "terrible", "worst", "disappointed", "awful"]

    df["num_words"] = df["review"].apply(lambda x: len(x.split()))
    df["num_positive_words"] = df["review"].apply(lambda x: sum(word in x.lower() for word in positive_words))
    df["num_complaints"] = df["review"].apply(lambda x: sum(word in x.lower() for word in negative_words))

    X = df[["num_words", "num_positive_words", "num_complaints"]]
    y = df["label"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    model = LogisticRegression(solver='lbfgs')
    model.fit(X_train, y_train)

    # Save model using joblib instead of pickle
    joblib.dump(model, 'sentiment_model.joblib', compress=3)

# Define a callable class
class SentimentAnalyzer:
    def __init__(self, model_path):
        # Load the model using joblib
        train_and_save_model()
        self.model = joblib.load(model_path)
        self.positive_words = ["love", "satisfied", "amazing", "fantastic", "wonderful", "pleased", "best"]
        self.negative_words = ["hate", "terrible", "worst", "disappointed", "awful"]

    def __call__(self, text):
        num_words = len(text.split())
        num_positive_words = sum(word in text.lower() for word in self.positive_words)
        num_complaints = sum(word in text.lower() for word in self.negative_words)
        features = [[num_words, num_positive_words, num_complaints]]
        
        # Get prediction and confidence score
        prediction = self.model.predict(features)
        confidence_scores = self.model.predict_proba(features)
        
        # Return dictionary directly instead of JSON string
        result = {
            "label": "Positive" if prediction[0] == 1 else "Negative",
            "confidence": float(confidence_scores[0][prediction[0]])  
        }
        return result 
      
      
api_key_header = APIKeyHeader(name="X-API-Key")
API_KEY = "your_secret_key"

# Pass the variable containing the APIKeyHeader
def verify_api_key(api_key: str = Depends(api_key_header)):  
    # Verify the API key
    if api_key != API_KEY:  
        # Raise the HTTP exception here
        raise HTTPException(status_code=403, detail="Invalid API Key")
    return api_key

Overwriting model.py


In [85]:
%%writefile main.py
from model import SentimentAnalyzer, verify_api_key
from fastapi import FastAPI, Depends
from pydantic import BaseModel

app = FastAPI()

class SentimentRequest(BaseModel):
    text: str
    
@app.post("/predict")
def get_prediction(
    request: SentimentRequest,
    # Authenticate the incoming API key using verify_api_key function
    api_key: str = Depends(verify_api_key)
):
    sentiment_model = SentimentAnalyzer("sentiment_model.joblib")
    result = sentiment_model(request.text)
    return {
        "text": request.text,
        "sentiment": result,
        "status": "success"
    }
    
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

Overwriting main.py


In [97]:
import requests

url = "http://localhost:8080/predict" 
headers = {"X-API-Key": "your_secret_key", "Content-Type": "application/json"}
data = {"text": "This is not a good product"}

response = requests.post(url, headers=headers, json=data)
print(response.json())

{'text': 'This is not a good product', 'sentiment': {'label': 'Negative', 'confidence': 0.6826307822443081}, 'status': 'success'}


In [98]:
import requests

url = "http://localhost:8080/predict" 
headers = {"X-API-Key": "wrong_key", "Content-Type": "application/json"}
data = {"text": "This is not a good product"}

response = requests.post(url, headers=headers, json=data)
print(response.json())

{'detail': 'Invalid API Key'}


#### Implementing rate limiter

You're building a sentiment analysis API where users can analyze the sentiment of texts. To prevent abuse, you need to implement rate limiting that allows only `5` requests per minute per API key. The `RateLimiter` class has already been created; you need to add the `is_rate_limited` method, which checks how many requests have been made by an API key within the last one-minute window.

The `datetime` and `timedelta` classes from the `datetime` module have been pre-imported.

In [99]:
from datetime import datetime, timedelta

In [100]:
def is_rate_limited(self, api_key: str) -> bool:
    # Get current time and the timestamp for one minute ago
    now = datetime.now()
    minute_ago = now - timedelta(minutes=1)
    
    # Remove requests older than 1 minute
    self.requests[api_key] = [
        req_time for req_time in self.requests[api_key]
        if req_time > minute_ago]
    
    # Check whether the number of recent requests has reached the limit
    if len(self.requests[api_key]) >= self.requests_per_minute:
        return True
    self.requests[api_key].append(now)
    return False
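
The method above is meant to live inside the `RateLimiter` class. The following is a minimal, self-contained sketch of how the pieces fit together, assuming the class stores request timestamps per API key in a `defaultdict(list)` (the constructor is not shown in this exercise, so that part is an assumption). The key names `key-a` and `key-b` are made up for the demonstration.

```python
from collections import defaultdict
from datetime import datetime, timedelta


class RateLimiter:
    def __init__(self, requests_per_minute: int = 5):
        self.requests_per_minute = requests_per_minute
        self.requests = defaultdict(list)  # request timestamps per API key

    def is_rate_limited(self, api_key: str) -> bool:
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)
        # Keep only the timestamps inside the one-minute window
        self.requests[api_key] = [
            t for t in self.requests[api_key] if t > minute_ago
        ]
        # Block once the allowed number of requests has been used up
        if len(self.requests[api_key]) >= self.requests_per_minute:
            return True
        # Otherwise record this request and let it through
        self.requests[api_key].append(now)
        return False


limiter = RateLimiter(requests_per_minute=5)
results = [limiter.is_rate_limited("key-a") for _ in range(6)]
print(results)  # [False, False, False, False, False, True]
print(limiter.is_rate_limited("key-b"))  # False: each key has its own window
```

Five calls are accepted and the sixth is rejected, while a different key is unaffected because its timestamps are tracked separately.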

#### Add rate limiting to endpoint

The next step in securing the API endpoint is to add rate limiting to the `test_api_key` function so that it both validates the API key and enforces the rate limit. You need to integrate the logic defined in the `RateLimiter` class and raise an HTTP exception when the limit is exceeded.

You will be updating the `test_api_key` function in the `main.py` script here.

All the supporting code for model creation and loading is in `model.py`.

In [101]:
%%writefile model.py
import joblib
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from datetime import datetime, timedelta
from collections import defaultdict
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")
API_KEY = "your_secret_key"

# Model creation
def train_and_save_model():
    data = {
        "review": [
            "I love this product, it's fantastic!",
            "Really satisfied with the quality!",
            "Terrible, I hate it.",
            "Not happy with the purchase.",
            "Absolutely amazing and wonderful!",
            "Worst experience ever.",
            "I am very pleased with my purchase.",
            "Disappointed, it didn't work as expected.",
            "The best thing I've ever bought.",
            "Totally awful, will not buy again."
        ],
        "label": [1, 1, 0, 0, 1, 0, 1, 0, 1, 0]  # 1 = Positive, 0 = Negative
    }

    df = pd.DataFrame(data)

    positive_words = ["love", "satisfied", "amazing", "fantastic", "wonderful", "pleased", "best"]
    negative_words = ["hate", "terrible", "worst", "disappointed", "awful"]

    df["num_words"] = df["review"].apply(lambda x: len(x.split()))
    df["num_positive_words"] = df["review"].apply(lambda x: sum(word in x.lower() for word in positive_words))
    df["num_complaints"] = df["review"].apply(lambda x: sum(word in x.lower() for word in negative_words))

    X = df[["num_words", "num_positive_words", "num_complaints"]]
    y = df["label"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    model = LogisticRegression(solver='lbfgs')
    model.fit(X_train, y_train)

    # Save model using joblib instead of pickle
    joblib.dump(model, 'sentiment_model.joblib', compress=3)

# Define a callable class
class SentimentAnalyzer:
    def __init__(self, model_path):
        # Load the model using joblib
        train_and_save_model()
        self.model = joblib.load(model_path)
        self.positive_words = ["love", "satisfied", "amazing", "fantastic", "wonderful", "pleased", "best"]
        self.negative_words = ["hate", "terrible", "worst", "disappointed", "awful"]

    def __call__(self, text):
        num_words = len(text.split())
        num_positive_words = sum(word in text.lower() for word in self.positive_words)
        num_complaints = sum(word in text.lower() for word in self.negative_words)
        features = [[num_words, num_positive_words, num_complaints]]
        
        # Get prediction and confidence score
        prediction = self.model.predict(features)
        confidence_scores = self.model.predict_proba(features)
        
        # Return dictionary directly instead of JSON string
        result = {
            "label": "Positive" if prediction[0] == 1 else "Negative",
            "confidence": float(confidence_scores[0][prediction[0]])  
        }
        return result 

class RateLimiter:
    def __init__(self, requests_per_minute: int = 10):
        self.requests_per_minute = requests_per_minute
        self.requests = defaultdict(list)  # Store request timestamps per API key

    def is_rate_limited(self, api_key: str) -> tuple[bool, int]:
        """
        Check if the request should be rate limited
        Returns (is_limited, requests_remaining)
        """
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)
        
        # Remove requests older than 1 minute
        self.requests[api_key] = [
            req_time for req_time in self.requests[api_key]
            if req_time > minute_ago
        ]
        
        # Check if rate limit is exceeded
        recent_requests = len(self.requests[api_key])
        if recent_requests >= self.requests_per_minute:
            return True, 0
            
        # Add new request timestamp
        self.requests[api_key].append(now)
        return False, self.requests_per_minute - recent_requests - 1


Overwriting model.py


In [102]:
%%writefile main.py
from model import SentimentAnalyzer, RateLimiter, API_KEY, api_key_header
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel

app = FastAPI()
rate_limiter = RateLimiter(requests_per_minute=2)

def test_api_key(api_key: str = Depends(api_key_header)):
    if api_key != API_KEY:
        raise HTTPException(
            status_code=403, detail="Invalid API key")
    
    # Check the rate limit for the API key sent with the request
    is_limited, _ = rate_limiter.is_rate_limited(api_key)
    # Check if returned boolean by is_rate_limited is True
    if is_limited:
        # Raise the http exception with status code
        raise HTTPException(status_code=429,
            detail="Rate limit exceeded. Please try again later."
        )
    return api_key

class SentimentRequest(BaseModel):
    text: str

@app.post("/predict")
def get_prediction(
    request: SentimentRequest,
    api_key: str = Depends(test_api_key)
):
    sentiment_model = SentimentAnalyzer("sentiment_model.joblib")
    result = sentiment_model(request.text)
    return {
        "text": request.text,
        "sentiment": result,
        "status": "success"
    }
    
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

Overwriting main.py


Send the request three times with curl. With `requests_per_minute=2`, the third request within a minute is rejected with the rate-limit error.

In [109]:
!curl -X POST  http://localhost:8080/predict -H "X-API-Key: your_secret_key" -H "Content-Type: application/json"  -d '{"text": "This is not a good product"}'

{"detail":"Rate limit exceeded. Please try again later."}

In [122]:
import requests

url = "http://localhost:8080/predict"
headers = { "X-API-Key": "your_secret_key",
            "Content-Type": "application/json"}
data = {"text": "This is not a good product"}

response = requests.post(url, headers=headers, json=data)
print(response.json())

{'text': 'This is not a good product', 'sentiment': {'label': 'Negative', 'confidence': 0.6826307822443081}, 'status': 'success'}
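
A rejected key is not locked out for good: requests are accepted again once earlier timestamps fall out of the one-minute window. The sketch below reproduces the tuple-returning `is_rate_limited` from `model.py` with one change made purely for illustration, an optional `now` argument (an assumption, not part of the exercise code) so that the passage of time can be simulated without waiting.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Optional


class RateLimiter:
    def __init__(self, requests_per_minute: int = 2):
        self.requests_per_minute = requests_per_minute
        self.requests = defaultdict(list)

    def is_rate_limited(
        self, api_key: str, now: Optional[datetime] = None
    ) -> tuple[bool, int]:
        # `now` is injectable only so the demo can simulate elapsed time
        now = now or datetime.now()
        minute_ago = now - timedelta(minutes=1)
        self.requests[api_key] = [
            t for t in self.requests[api_key] if t > minute_ago
        ]
        recent = len(self.requests[api_key])
        if recent >= self.requests_per_minute:
            return True, 0
        self.requests[api_key].append(now)
        return False, self.requests_per_minute - recent - 1


start = datetime(2024, 1, 1, 12, 0, 0)
limiter = RateLimiter(requests_per_minute=2)
print(limiter.is_rate_limited("k", now=start))                          # (False, 1)
print(limiter.is_rate_limited("k", now=start + timedelta(seconds=10)))  # (False, 0)
print(limiter.is_rate_limited("k", now=start + timedelta(seconds=20)))  # (True, 0)
print(limiter.is_rate_limited("k", now=start + timedelta(seconds=61)))  # (False, 0)
```

At 61 seconds the first timestamp has expired, so one slot is free again and the request goes through.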


### Section 3.3 - Asynchronous processing

#### Create an async sentiment analysis endpoint

You're building a social media analytics platform that needs to analyze the sentiment of reviews. To handle high traffic efficiently, you need to implement an `async` endpoint. The sentiment analysis model is already loaded and available as `sentiment_model`.

In [123]:
import asyncio

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class Review(BaseModel):
    text: str

# Create async endpoint at /analyze route
@app.post("/analyze")
# Write an asynchronous function to process review's text
async def analyze_review(review: Review):
    # Run the blocking model call in a worker thread so the event loop is not blocked
    result = await asyncio.to_thread(sentiment_model, review.text)
    # The preloaded model's output is indexed as a list of dicts here
    return {"sentiment": result[0]["label"]}
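
To see why `asyncio.to_thread` helps, here is a small standard-library sketch with no FastAPI involved. `slow_model` is a hypothetical stand-in for a blocking model call, and its keyword rule is invented for the demo. Two analyses are started together, and because each blocking call runs in its own worker thread, they overlap instead of running back to back. `asyncio.to_thread` requires Python 3.9 or later.

```python
import asyncio
import time


def slow_model(text: str) -> list:
    """Hypothetical stand-in for a blocking model call."""
    time.sleep(0.3)  # simulate slow, blocking inference
    return [{"label": "POSITIVE" if "good" in text else "NEGATIVE"}]


async def analyze(text: str) -> str:
    # The blocking call runs in a worker thread; the event loop stays free
    result = await asyncio.to_thread(slow_model, text)
    return result[0]["label"]


async def main():
    start = time.perf_counter()
    labels = await asyncio.gather(
        analyze("good product"), analyze("bad product")
    )
    elapsed = time.perf_counter() - start
    return labels, elapsed


labels, elapsed = asyncio.run(main())
print(labels)  # ['POSITIVE', 'NEGATIVE']
print(f"elapsed: {elapsed:.2f}s")
```

The elapsed time should be close to a single 0.3-second call rather than the 0.6 seconds two sequential calls would take, although the exact figure depends on the machine.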

#### Implementing background tasks

Your sentiment analysis API is receiving requests to process batches of hundreds of reviews at once. To handle these efficiently without making users wait, you'll implement background task processing so that the reviews are processed after the response has been sent to the client.

In [125]:
import asyncio
from typing import List

# Import the background task class
from fastapi import BackgroundTasks
from pydantic import BaseModel

# Request body holding a batch of review texts (model name chosen here)
class Reviews(BaseModel):
    texts: List[str]

@app.post("/analyze_batch")
async def analyze_batch(
    reviews: Reviews,
    # Create a background task dependency
    background_tasks: BackgroundTasks
):
    async def process_reviews(texts: List[str]):
        for text in texts:
            # Run each prediction in a worker thread so the event loop stays free
            result = await asyncio.to_thread(sentiment_model, text)
            print(f"Processed: {result[0]['label']}")
    # Add the task of analysing reviews' texts to the background
    background_tasks.add_task(process_reviews, reviews.texts)
    return {"message": "Processing started"}
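
The "respond first, process afterwards" ordering can be illustrated with plain `asyncio`. This is only an analogy under stated assumptions, not how FastAPI implements `BackgroundTasks`: `handle_request` is a hypothetical stand-in for the endpoint, and `asyncio.create_task` stands in for `background_tasks.add_task`.

```python
import asyncio

processed = []  # texts that have been handled so far


async def process_reviews(texts):
    for text in texts:
        await asyncio.sleep(0.01)  # stand-in for model inference
        processed.append(text)


async def handle_request(texts, pending):
    # Schedule the work and return right away, as the endpoint does
    pending.append(asyncio.create_task(process_reviews(texts)))
    return {"message": "Processing started"}


async def main():
    pending = []
    response = await handle_request(["great", "awful"], pending)
    done_at_response = list(processed)  # snapshot when the response is ready
    await asyncio.gather(*pending)      # let the background work finish
    return response, done_at_response, processed


response, done_at_response, final = asyncio.run(main())
print(response)          # {'message': 'Processing started'}
print(done_at_response)  # []
print(final)             # ['great', 'awful']
```

Nothing has been processed when the response is produced; the work completes afterwards, which is the behaviour the batch endpoint is after.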

#### Handling timeout errors

To make the review analysis system more robust and user-friendly, you now need to add error handling to the `/analyze_reviews` endpoint. It should handle a timeout if the model cannot process the text in time, and an internal server error if something goes wrong while making the prediction.

To simulate a timeout, a delay of 11 seconds (just over the 10-second limit used by the endpoint) has been added to the model prediction in `model.py`.

In [126]:
%%writefile model.py
import asyncio
import joblib
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

# Model creation
def train_and_save_model():
    data = {
        "review": [
            "I love this product, it's fantastic!",
            "Really satisfied with the quality!",
            "Terrible, I hate it.",
            "Not happy with the purchase.",
            "Absolutely amazing and wonderful!",
            "Worst experience ever.",
            "I am very pleased with my purchase.",
            "Disappointed, it didn't work as expected.",
            "The best thing I've ever bought.",
            "Totally awful, will not buy again."
        ],
        "label": [1, 1, 0, 0, 1, 0, 1, 0, 1, 0]  # 1 = Positive, 0 = Negative
    }

    df = pd.DataFrame(data)

    positive_words = ["love", "satisfied", "amazing", "fantastic", "wonderful", "pleased", "best"]
    negative_words = ["hate", "terrible", "worst", "disappointed", "awful"]

    df["num_words"] = df["review"].apply(lambda x: len(x.split()))
    df["num_positive_words"] = df["review"].apply(lambda x: sum(word in x.lower() for word in positive_words))
    df["num_complaints"] = df["review"].apply(lambda x: sum(word in x.lower() for word in negative_words))

    X = df[["num_words", "num_positive_words", "num_complaints"]]
    y = df["label"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    model = LogisticRegression(solver='lbfgs')
    model.fit(X_train, y_train)

    # Save model using joblib instead of pickle
    joblib.dump(model, 'sentiment_model.joblib', compress=3)

# Define a callable class
class SentimentAnalyzer:
    def __init__(self):
        # Load the model using joblib
        train_and_save_model()
        self.model = joblib.load("sentiment_model.joblib")
        self.positive_words = ["love", "satisfied", "amazing", "fantastic", "wonderful", "pleased", "best"]
        self.negative_words = ["hate", "terrible", "worst", "disappointed", "awful"]

    async def __call__(self, text):
        # Simulate a long-running operation
        await asyncio.sleep(11)  # This will trigger the timeout since it's > 10 seconds
        
        num_words = len(text.split())
        num_positive_words = sum(word in text.lower() for word in self.positive_words)
        num_complaints = sum(word in text.lower() for word in self.negative_words)
        features = [[num_words, num_positive_words, num_complaints]]
        
        # Get prediction and confidence score
        prediction = self.model.predict(features)
        confidence_scores = self.model.predict_proba(features)
        
        # Return dictionary directly instead of JSON string
        result = {
            "label": "Positive" if prediction[0] == 1 else "Negative",
            "confidence": float(confidence_scores[0][prediction[0]])
        }
        return result

Overwriting model.py


In [134]:
%%writefile main.py
import asyncio
from fastapi import FastAPI, HTTPException
from model import SentimentAnalyzer
from pydantic import BaseModel

app = FastAPI()

class Review(BaseModel):
    text: str

@app.post("/analyze_reviews")
async def analyze_reviews(review: Review):
    try:
        sentiment_model = SentimentAnalyzer()
        # Set model input and timeout limit
        result = await asyncio.wait_for(
            sentiment_model(review.text),
            timeout=10
        )
        return {"sentiment": result["label"]}      
    except asyncio.TimeoutError:
        # Raise HTTP status code for timeout error
        raise HTTPException(status_code=408, detail="Analysis timed out")
    except Exception:
        # Raise HTTP status code for internal error
        raise HTTPException(status_code=500, detail="Analysis failed")
        
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

Overwriting main.py


To get a successful response, shorten the `asyncio.sleep` delay in `model.py` to less than 10 seconds.

In [133]:
!curl -X POST http://localhost:8080/analyze_reviews -H "Content-Type: application/json" -d '{"text": "This is a test"}'

{"sentiment":"Negative"}

In [137]:
import requests

url = "http://localhost:8080/analyze_reviews"
headers = { "Content-Type": "application/json"}
data = {"text": "This is a test"}

response = requests.post(url, headers=headers, json=data)
print(response.json())

{'sentiment': 'Negative'}
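
Both branches of the timeout handling can be exercised without waiting ten seconds. The sketch below uses only `asyncio`: `slow_analysis` is a hypothetical stand-in for the delayed model call, and the function returns a `(status_code, body)` tuple instead of raising `HTTPException`, so it runs without a server. The delays and timeouts are scaled down for the demo.

```python
import asyncio


async def slow_analysis(text: str, delay: float) -> dict:
    """Hypothetical stand-in for the delayed model call."""
    await asyncio.sleep(delay)
    return {"label": "Negative"}


async def analyze_with_timeout(
    text: str, delay: float, timeout: float
) -> tuple[int, dict]:
    try:
        # Cancel the analysis if it takes longer than `timeout` seconds
        result = await asyncio.wait_for(
            slow_analysis(text, delay), timeout=timeout
        )
        return 200, {"sentiment": result["label"]}
    except asyncio.TimeoutError:
        return 408, {"detail": "Analysis timed out"}
    except Exception:
        return 500, {"detail": "Analysis failed"}


print(asyncio.run(analyze_with_timeout("This is a test", delay=0.01, timeout=0.5)))
# (200, {'sentiment': 'Negative'})
print(asyncio.run(analyze_with_timeout("This is a test", delay=0.5, timeout=0.05)))
# (408, {'detail': 'Analysis timed out'})
```

When the delay is shorter than the timeout the result comes back normally; when it is longer, `asyncio.wait_for` cancels the call and the timeout branch is taken.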


## Chapter 3 - API Versioning, Monitoring and logging

### Section 3.1 - API versioning and documentation