<a href="https://colab.research.google.com/github/jairathnishant/MLOps_flask_CI-CD/blob/main/MLOps_Pipieline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Problem Statement:


You are tasked with building a Wealth Management/Investment prediction system. This involves generating synthetic data, training a machine learning model, deploying the model using an API, and ensuring automated model retraining when performance falls below a threshold.





## Steps:
### **1. Generating the Dataset (Wealth Management/Investment dataset)**
You will generate a synthetic dataset related to wealth management or investment portfolios. The dataset should include features such as:

* **investment_amount**: The amount invested in a portfolio.
* **years_of_investment**: Number of years the money has been invested.
* **risk_tolerance**: Risk appetite of the investor (Low, Medium, High).
* **age**: The age of the investor.
* **annual_income**: The investor’s annual income.
* **portfolio_type**: Type of portfolio (Stocks, Bonds, Mixed).
* **profitable** (target variable): whether the investment is **profitable** (1) or **not profitable** (0), which makes this a binary classification task.

### **2. Data Cleaning, Feature Extraction, etc.**
* Perform any necessary cleaning, such as handling missing values or outliers.
* Convert categorical features like portfolio_type and risk_tolerance into numerical values using one-hot encoding.
* Extract relevant features for model training.

### **3. Training a Model**

* Train a Random Forest Classifier or any other suitable model to predict whether an investment will be profitable.

* Split the dataset into training and testing sets, evaluate model performance, and save the trained model for deployment.


### **4. Create Flask API or FastAPI for Predictions**
* Build a prediction API (using Flask or FastAPI) to serve predictions. The API should accept a JSON input and return a prediction (profitable/not profitable).
* Example input:
```
{
"investment_amount": 50000,
"years_of_investment": 5,
"risk_tolerance": "Medium",
"age": 45,
"annual_income": 120000,
"portfolio_type": "Stocks"
}
```

### **5. Set Up Automated Retraining Based on Performance**
* Monitor the model’s accuracy and set up an automated retraining mechanism when accuracy drops below a specified threshold (e.g., 80%). Retraining should use the same dataset for simplicity but allow for new data to be incorporated.
* Ensure that the model is saved and reloaded during the retraining process.

### **6. Logging Requests and Responses**
* Log each prediction request and response, including the input features, prediction, and the time taken to process the request.

### **7. Running the Notebook (Prediction via the API)**
* Ensure the notebook is runnable, allowing you to test predictions and see the entire workflow from dataset generation to API deployment and predictions.

# Generating the Dataset (Wealth Management/Investment dataset) - 1,000 data points

In [1]:
import pandas as pd
import numpy as np

# Function to create a synthetic wealth management dataset.
# `seed` resets NumPy's global RNG, so identical arguments always return identical rows.
def create_wealth_management_dataset(n_samples=1000, seed=42):
    np.random.seed(seed)
    data = {
        'investment_amount': np.random.randint(1000, 100000, n_samples),
        'years_of_investment': np.random.randint(1, 30, n_samples),
        'risk_tolerance': np.random.choice(['Low', 'Medium', 'High'], n_samples),
        'age': np.random.randint(25, 70, n_samples),
        'annual_income': np.random.randint(30000, 200000, n_samples),
        'portfolio_type': np.random.choice(['Stocks', 'Bonds', 'Mixed'], n_samples)
    }
    df = pd.DataFrame(data)

    # Generate the target variable (1: profitable, 0: not profitable).
    # It is drawn independently of every feature, so it is pure noise:
    # no model can be expected to beat ~50% accuracy on it.
    df['profitable'] = np.random.randint(0, 2, n_samples)
    return df

# Create the dataset
df = create_wealth_management_dataset()
df.head()

| | investment_amount | years_of_investment | risk_tolerance | age | annual_income | portfolio_type | profitable |
|---|---|---|---|---|---|---|---|
| 0 | 16795 | 1 | High | 28 | 166865 | Stocks | 0 |
| 1 | 1860 | 21 | Medium | 64 | 166643 | Stocks | 1 |
| 2 | 77820 | 22 | High | 59 | 177281 | Mixed | 0 |
| 3 | 55886 | 21 | Medium | 56 | 143550 | Bonds | 1 |
| 4 | 7265 | 6 | Medium | 66 | 121218 | Bonds | 0 |
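Because `profitable` is drawn with `np.random.randint(0, 2, n_samples)` independently of every feature, no model can be expected to beat about 50% accuracy on it (the tuned forest below reaches 0.535), and an 80% retraining threshold can be expected to keep firing. If a learnable target is wanted, one option is to derive the label from the features plus noise. The sketch below is hypothetical: `create_dataset_with_signal` and its scoring weights are illustrative assumptions, not part of the original task, and it uses NumPy's `default_rng` generator instead of the global seed.

```python
import numpy as np
import pandas as pd


def create_dataset_with_signal(n_samples=1000, seed=42):
    """Hypothetical variant of the generator where 'profitable' depends on the features."""
    rng = np.random.default_rng(seed)
    df = pd.DataFrame({
        'investment_amount': rng.integers(1000, 100000, n_samples),
        'years_of_investment': rng.integers(1, 30, n_samples),
        'risk_tolerance': rng.choice(['Low', 'Medium', 'High'], n_samples),
        'age': rng.integers(25, 70, n_samples),
        'annual_income': rng.integers(30000, 200000, n_samples),
        'portfolio_type': rng.choice(['Stocks', 'Bonds', 'Mixed'], n_samples),
    })
    # Illustrative scoring rule (an assumption, not real market behaviour):
    # longer horizons and stock-heavy portfolios raise the chance of a profit.
    score = (0.15 * (df['years_of_investment'] - 15)
             + df['portfolio_type'].map({'Stocks': 0.8, 'Mixed': 0.3, 'Bonds': -0.5})
             + rng.normal(0, 1, n_samples))  # noise keeps the task from being trivial
    df['profitable'] = (score > 0).astype(int)
    return df


df_signal = create_dataset_with_signal()
print(df_signal['profitable'].mean())
```

The rest of the pipeline (`split_df`, `label_encode`, `one_hot_encode`) works on this frame unchanged, since the column names and category labels are the same.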


In [2]:
# Train test split

from sklearn.model_selection import train_test_split

def split_df(df):

    X = df.drop('profitable', axis = 1)
    y = df['profitable']

    return train_test_split(X, y, test_size = 0.2, random_state = 42)

X_train, X_test, y_train, y_test = split_df(df)

In [3]:
print(X_train.shape, y_train.shape)

(800, 6) (800,)


# Data Cleaning and Preprocessing
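The synthetic data has no missing values and every numeric column is drawn from a bounded uniform range, so the notebook moves straight to encoding. For data that does need the cleaning described in step 2, a minimal sketch follows. Median imputation and clipping at 1.5 times the IQR are assumed choices, not requirements of the task, and the helper names `fit_cleaning_params` and `apply_cleaning` are hypothetical. The fill values and fences are learned on the training split only, so the test split does not leak into them.

```python
import numpy as np
import pandas as pd

NUMERIC_COLUMNS = ['investment_amount', 'years_of_investment', 'age', 'annual_income']


def fit_cleaning_params(df_train, columns=NUMERIC_COLUMNS, k=1.5):
    """Learn a median fill value and IQR clipping fences per numeric column."""
    params = {}
    for col in columns:
        q1, q3 = df_train[col].quantile([0.25, 0.75])  # NaNs are skipped
        iqr = q3 - q1
        params[col] = (df_train[col].median(), q1 - k * iqr, q3 + k * iqr)
    return params


def apply_cleaning(df, params):
    """Return a copy with missing values filled and outliers clipped to the fences."""
    df = df.copy()
    for col, (median, low, high) in params.items():
        df[col] = df[col].fillna(median).clip(lower=low, upper=high)
    return df
```

In the notebook it would run right after `split_df`: `params = fit_cleaning_params(X_train)`, then `apply_cleaning(X_train, params)` and `apply_cleaning(X_test, params)`.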

In [4]:
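# Label encoding: risk_tolerance is ordinal (Low < Medium < High), so map it to 0/1/2.
# The mapping is applied in place to the DataFrame passed in.

def label_encode(df):
    df['risk_tolerance'] = df['risk_tolerance'].map({'Low': 0, 'Medium': 1, 'High': 2})
    return df

label_encode(X_train)
label_encode(X_test)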

In [5]:
X_train.head()

| | investment_amount | years_of_investment | risk_tolerance | age | annual_income | portfolio_type |
|---|---|---|---|---|---|---|
| 29 | 3747 | 17 | 0 | 55 | 135426 | Mixed |
| 535 | 60275 | 21 | 1 | 41 | 114526 | Mixed |
| 695 | 10435 | 10 | 1 | 25 | 122708 | Mixed |
| 557 | 95297 | 5 | 0 | 57 | 92120 | Bonds |
| 836 | 38131 | 15 | 0 | 33 | 101696 | Mixed |


In [6]:
# One-hot encoding for portfolio_type (nominal, so no order is implied).
# drop='first' drops the 'Bonds' column, which is implied when Mixed and Stocks are both 0.
from sklearn.preprocessing import OneHotEncoder

enc = OneHotEncoder(handle_unknown = 'ignore', drop = 'first')

def one_hot_encode(df_train, df_test):
    # Fit the encoder on the training split only, then apply the same mapping to the test split
    enc_train = pd.DataFrame(enc.fit_transform(df_train[['portfolio_type']]).toarray())
    enc_test = pd.DataFrame(enc.transform(df_test[['portfolio_type']]).toarray())
    enc_train.columns = enc_test.columns = enc.get_feature_names_out()

    # reset_index aligns rows positionally with the freshly built encoded frames
    df_enc_train = pd.concat([df_train.reset_index(drop=True), enc_train], axis = 1)
    df_enc_test = pd.concat([df_test.reset_index(drop=True), enc_test], axis = 1)

    # Drop the original text column now that it is encoded
    df_enc_train.drop('portfolio_type', axis = 1, inplace = True)
    df_enc_test.drop('portfolio_type', axis = 1, inplace = True)
    return df_enc_train, df_enc_test

X_train_enc, X_test_enc = one_hot_encode(X_train, X_test)

# Training the Model

In [7]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import randint
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [8]:
rfmodel = RandomForestClassifier()
param_dist = {'n_estimators': randint(50, 500),
              'max_depth': randint(1, 20)}

rand_search = RandomizedSearchCV(rfmodel,
                                 param_distributions = param_dist,
                                 n_iter = 5,
                                 cv = 5)

In [9]:
X_train_enc.shape

(800, 7)

In [10]:
X_train_enc

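| | investment_amount | years_of_investment | risk_tolerance | age | annual_income | portfolio_type_Mixed | portfolio_type_Stocks |
|---|---|---|---|---|---|---|---|
| 0 | 3747 | 17 | 0 | 55 | 135426 | 1.0 | 0.0 |
| 1 | 60275 | 21 | 1 | 41 | 114526 | 1.0 | 0.0 |
| 2 | 10435 | 10 | 1 | 25 | 122708 | 1.0 | 0.0 |
| 3 | 95297 | 5 | 0 | 57 | 92120 | 0.0 | 0.0 |
| 4 | 38131 | 15 | 0 | 33 | 101696 | 1.0 | 0.0 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 795 | 40504 | 28 | 2 | 69 | 193317 | 1.0 | 0.0 |
| 796 | 86981 | 1 | 0 | 29 | 181094 | 0.0 | 0.0 |
| 797 | 74553 | 13 | 2 | 40 | 159048 | 0.0 | 0.0 |
| 798 | 4051 | 1 | 1 | 43 | 43689 | 1.0 | 0.0 |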


In [11]:
def rfmodel_best_train(X_train, y_train):
    rand_search.fit(X_train, y_train)
    return rand_search.best_estimator_

best_rf = rfmodel_best_train(X_train_enc, y_train)

In [12]:
def rfmodel_prediction(X_test, best_rf):
    return best_rf.predict(X_test)

y_pred = rfmodel_prediction(X_test_enc, best_rf)

In [13]:
def model_performance(y_test, y_pred):

    print(f'Accuracy: {accuracy_score(y_test, y_pred)}')
    print(f'Precision: {precision_score(y_test, y_pred)}')
    print(f'Recall: {recall_score(y_test, y_pred)}')

model_performance(y_test, y_pred)

Accuracy: 0.535
Precision: 0.5227272727272727
Recall: 0.4742268041237113
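The three scores above pin down the confusion matrix on the 200 test rows (20% of 1,000), and from it the F1 score, which `f1_score` is imported for but never printed. Recall is $46/97$ and precision is $46/88$, so:

$$
\begin{aligned}
TP &= 46, \qquad FN = 97 - 46 = 51, \qquad FP = 88 - 46 = 42, \qquad TN = 200 - 97 - 42 = 61,\\
\text{Accuracy} &= \frac{TP + TN}{200} = \frac{46 + 61}{200} = 0.535,\\
F_1 &= \frac{2\,TP}{2\,TP + FP + FN} = \frac{92}{92 + 42 + 51} = \frac{92}{185} \approx 0.497.
\end{aligned}
$$

The test split therefore holds 97 profitable and 103 unprofitable rows, so always predicting "not profitable" would already score $103/200 = 0.515$; the tuned forest's 0.535 is barely above that, as expected for a label drawn at random.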


In [14]:
import pickle

model = best_rf

# Save the trained model to disk
with open('rf_model.pkl', 'wb') as file:
    pickle.dump(model, file)

# Create Flask API or FastAPI for Predictions
### Make sure to run the Flask API in the background so that other cells can execute after the server starts.

You can use Python's threading module to run the Flask app as a background task. This way, the Flask server runs in the background without blocking the execution of other cells.

In [15]:
# !pip install flask

In [16]:
!pip install flask-ngrok
!pip install pyngrok==4.1.1
# Replace the placeholder with your own ngrok authtoken; never share a real token in a notebook
!ngrok authtoken "YOUR_NGROK_AUTHTOKEN"

Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml


In [17]:
# from google.colab.output import eval_js
# print(eval_js("google.colab.kernel.proxyPort(5000)"))

In [18]:
# Build a prediction API (using Flask) to serve predictions. The API accepts a JSON input and returns a prediction (profitable/not profitable).

import pickle
from flask import Flask, request, jsonify
import threading
import joblib
from flask_ngrok import run_with_ngrok

In [19]:

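app = Flask(__name__)
run_with_ngrok(app)  # flask-ngrok: app.run() will also open an ngrok tunnel


# Column order must match X_train_enc, the frame the model was trained on
FEATURE_COLUMNS = ['investment_amount', 'years_of_investment', 'risk_tolerance', 'age',
                   'annual_income', 'portfolio_type_Mixed', 'portfolio_type_Stocks']
RISK_MAP = {'Low': 0, 'Medium': 1, 'High': 2}  # same ordinal mapping as label_encode

# Load pre-trained model (joblib can read the file written with pickle.dump above)
try:
    model = joblib.load("rf_model.pkl")
except FileNotFoundError:
    model = None
    print("Model file not found. Please make sure it exists.")


def parse_features(data):
    """Turn a JSON payload into a one-row DataFrame of model features.

    Accepts raw categories ("risk_tolerance": "Medium", "portfolio_type": "Stocks")
    or pre-encoded values ("risk_tolerance": 1, "portfolio_type_Mixed": 0, "portfolio_type_Stocks": 1).
    """
    risk = data.get('risk_tolerance')
    risk = RISK_MAP[risk] if isinstance(risk, str) else int(risk)
    if 'portfolio_type' in data:
        portfolio = data['portfolio_type']
        if portfolio not in ('Stocks', 'Bonds', 'Mixed'):
            raise ValueError(f'Unknown portfolio_type: {portfolio}')
        mixed, stocks = int(portfolio == 'Mixed'), int(portfolio == 'Stocks')  # Bonds -> (0, 0)
    else:
        mixed = int(data.get('portfolio_type_Mixed'))
        stocks = int(data.get('portfolio_type_Stocks'))
    row = [int(data.get('investment_amount')), int(data.get('years_of_investment')), risk,
           int(data.get('age')), float(data.get('annual_income')), mixed, stocks]
    return pd.DataFrame([row], columns=FEATURE_COLUMNS)


# Define route for prediction
@app.route('/predict', methods=['POST'])
def predict():

    # Ensure the model is loaded
    if model is None:
        return jsonify({'error': 'Model not loaded'}), 503

    # Get input data from request (silent=True returns None instead of raising on bad JSON)
    data = request.get_json(silent=True)
    if not data:
        return jsonify({'error': 'No input data provided'}), 400

    # Parse and encode features; reject missing or malformed fields
    try:
        features = parse_features(data)
    except (KeyError, ValueError, TypeError, AttributeError):
        return jsonify({'error': 'Invalid input data'}), 400

    # Make prediction (a DataFrame keeps the feature names the model was fitted with)
    prediction = model.predict(features)
    result = 'profitable' if prediction[0] == 1 else 'not profitable'

    # Return prediction as JSON
    return jsonify({'prediction': result})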

In [20]:
# Run the application

import threading

def run_app():
    app.run()

if __name__ == "__main__":
    flask_thread = threading.Thread(target=run_app)
    flask_thread.start()

 * Serving Flask app '__main__'



In [21]:
!curl -X POST -H "Content-Type: application/json" -d '{"investment_amount": 50000, "years_of_investment": 5, "risk_tolerance": 0, "age": 45, "annual_income": 120000, "portfolio_type_Mixed": 0,"portfolio_type_Stocks": 1}' http://127.0.0.1:5000/predict

 * Debug mode: off


 * Running on http://127.0.0.1:5000
INFO:werkzeug:Press CTRL+C to quit
INFO:werkzeug:127.0.0.1 - - [24/Nov/2024 16:11:00] "POST /predict HTTP/1.1" 200 -


{"prediction":"not profitable"}


# Set Up Automated Retraining Based on Performance

In [None]:
# Sourcing data for model performance testing.
# Note: create_wealth_management_dataset() reseeds NumPy with 42 when called with its defaults,
# so this reproduces the original dataset exactly; a different seed is needed for genuinely new data.
new_data = create_wealth_management_dataset()

# Split the data into train and test
X_train_new, X_test_new, y_train_new, y_test_new = split_df(new_data)

# Label encode the ordinal feature in both train and test
label_encode(X_train_new)
label_encode(X_test_new)

# One-hot encode the non-ordinal categorical feature
X_train_enc_new, X_test_enc_new = one_hot_encode(X_train_new, X_test_new)

In [None]:
# Evaluate model performance on test data

def evaluate_model(X_test, y_test, model):

    if model is None:
        return None

    y_pred = model.predict(X_test)
    return accuracy_score(y_test, y_pred)

In [21]:
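# Retrain the model, save it, and reload it from disk

def retrain_model(X_train, y_train):

    global model  # rebinding the global makes /predict serve the new model
    if X_train is None:
        print("No training data available")
        return
    print("Retraining the model...")

    new_model = RandomForestClassifier()
    new_model.fit(X_train, y_train)
    joblib.dump(new_model, "rf_model.pkl")  # save to disk
    model = joblib.load("rf_model.pkl")     # reload, as the retraining step requires
    print("Model retrained successfully")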

In [None]:
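# Kick off retraining
import time

ACCURACY_THRESHOLD = 0.8      # retrain when test accuracy drops below this
CHECK_INTERVAL_SECONDS = 60   # pause between checks so the loop does not spin the CPU

def monitor_and_retrain():

    while True:
        test_accuracy = evaluate_model(X_test_enc_new, y_test_new, model)
        # print(f"Test Accuracy: {test_accuracy}")
        # evaluate_model returns None when no model is loaded; treat that as "needs training"
        if test_accuracy is None or test_accuracy < ACCURACY_THRESHOLD:
            retrain_model(X_train_enc_new, y_train_new)
        time.sleep(CHECK_INTERVAL_SECONDS)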

In [None]:
# Start monitoring in a separate thread

monitor_thread = threading.Thread(target = monitor_and_retrain, daemon = True)
monitor_thread.start()
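The monitoring loop above runs until the kernel restarts, because the daemon thread has no way to be told to stop. A stoppable variant, sketched under the assumption that accuracy checks and retraining are passed in as callables, uses a `threading.Event`: `Event.wait(timeout)` doubles as the sleep between checks and returns early as soon as the event is set. The name `monitor_loop` and the `max_retrains` cap are hypothetical additions, not part of the original notebook.

```python
import threading


def monitor_loop(evaluate, retrain, stop_event, threshold=0.8, interval_s=60.0, max_retrains=None):
    """Check accuracy every interval_s seconds and retrain when it falls below threshold.

    evaluate:   callable returning the current accuracy, or None if no model is loaded
    retrain:    callable that retrains, saves, and reloads the model
    stop_event: threading.Event; setting it ends the loop promptly
    Returns the number of retrains performed.
    """
    retrains = 0
    while not stop_event.is_set():
        accuracy = evaluate()
        if accuracy is None or accuracy < threshold:
            retrain()
            retrains += 1
            if max_retrains is not None and retrains >= max_retrains:
                break
        stop_event.wait(interval_s)  # sleeps, but wakes immediately on stop_event.set()
    return retrains

# Possible wiring in the notebook (the lambdas read the global `model` at call time):
# stop_monitoring = threading.Event()
# monitor_thread = threading.Thread(
#     target=monitor_loop,
#     args=(lambda: evaluate_model(X_test_enc_new, y_test_new, model),
#           lambda: retrain_model(X_train_enc_new, y_train_new),
#           stop_monitoring),
#     daemon=True)
# monitor_thread.start()
# ...later: stop_monitoring.set(); monitor_thread.join()
```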

# Logging Requests and Responses
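This section has no code yet. One way to meet step 6 is a decorator that times the prediction call and writes one JSON line per request containing the input features, the prediction, and the elapsed time. The sketch assumes a logger named `prediction_api` and uses a stand-in `fake_predict` so it runs without the trained model; both names are hypothetical.

```python
import functools
import json
import logging
import time

# Hypothetical logger name; attach logging.FileHandler("predictions.log") to persist entries
logger = logging.getLogger("prediction_api")
logger.setLevel(logging.INFO)


def log_request_response(func):
    """Decorator: log input features, prediction, and processing time for each call."""
    @functools.wraps(func)
    def wrapper(features):
        start = time.perf_counter()
        prediction = func(features)
        elapsed_ms = (time.perf_counter() - start) * 1000
        # One JSON object per line keeps the log easy to parse later
        logger.info(json.dumps(
            {"input": features, "prediction": prediction, "elapsed_ms": round(elapsed_ms, 3)},
            default=str,  # tolerate NumPy scalars and other non-JSON types
        ))
        return prediction
    return wrapper


@log_request_response
def fake_predict(features):
    # Stand-in for model.predict so the sketch runs without the trained model
    return "profitable" if features["years_of_investment"] > 10 else "not profitable"
```

In the Flask app, the same record could be built around the `model.predict` call inside `predict()`, or with `@app.before_request` / `@app.after_request` hooks that store the start time on `flask.g`. The file name in the comment above is only an example.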

# Running the Notebook (Prediction)
### Use Python’s requests library or a tool like Postman to send requests to the API for prediction.
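A minimal client sketch using only the standard library's `urllib.request` (with `requests` installed, `requests.post(API_URL, json=payload, timeout=10)` does the same). It assumes the Flask thread started above is listening on `http://127.0.0.1:5000` and sends the encoded fields that the `/predict` route reads (`risk_tolerance` as 0/1/2 plus the two `portfolio_type_*` flags), matching the `curl` call above. The helper names are hypothetical.

```python
import json
import urllib.request

# Local address of the Flask server started earlier (assumption: default port 5000)
API_URL = "http://127.0.0.1:5000/predict"


def build_prediction_request(payload, url=API_URL):
    """Wrap a feature dict in a POST request with a JSON body."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


def send_prediction_request(payload, url=API_URL, timeout=10):
    """Send the request and decode the JSON response, e.g. {"prediction": "not profitable"}."""
    with urllib.request.urlopen(build_prediction_request(payload, url), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


sample_payload = {
    "investment_amount": 50000, "years_of_investment": 5, "risk_tolerance": 0,
    "age": 45, "annual_income": 120000, "portfolio_type_Mixed": 0, "portfolio_type_Stocks": 1,
}

# With the Flask thread running:
# print(send_prediction_request(sample_payload))
```

Note that `urlopen` raises `urllib.error.HTTPError` for 4xx/5xx responses, so an error reply from the API surfaces as an exception rather than a returned dict.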

# Stopping the Flask Thread (Optional)

In [22]:
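# Wait up to 1 second for the Flask thread to finish. This does not stop the server:
# app.run() blocks inside the thread, so the thread keeps serving afterwards.

flask_thread.join(timeout=1)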
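Because `app.run()` blocks inside its thread with no handle for stopping it, `join(timeout=1)` simply returns while the server keeps running. If the server must be stoppable, one option is to serve the app through a server object that exposes `shutdown()`. The sketch uses the standard library's `wsgiref.simple_server.make_server` (a Flask `app` is a WSGI callable, so it can be passed in directly; `werkzeug.serving.make_server` supports the same `serve_forever()`/`shutdown()` pattern). The helper names and the `hello_app` stand-in are hypothetical, and serving this way bypasses the ngrok tunnel set up by `run_with_ngrok`.

```python
import threading
from wsgiref.simple_server import make_server


def start_background_server(wsgi_app, host="127.0.0.1", port=5000):
    """Serve wsgi_app from a daemon thread; return (server, thread) so it can be stopped."""
    server = make_server(host, port, wsgi_app)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    return server, thread


def stop_background_server(server, thread, timeout=5):
    """Stop serve_forever(), release the socket, and report whether the thread exited."""
    server.shutdown()       # blocks until serve_forever() returns
    server.server_close()   # close the listening socket
    thread.join(timeout=timeout)
    return not thread.is_alive()


def hello_app(environ, start_response):
    # Minimal WSGI stand-in for the Flask app, used only to exercise the helpers
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"ok"]
```

Usage with the notebook's app might look like `server, thread = start_background_server(app, port=5001)` (a port the original Flask thread is not already using), followed later by `stop_background_server(server, thread)`.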