In [6]:
import sys
import os
import json
import pickle
import pandas as pd
import numpy as np

from sklearn.preprocessing import MinMaxScaler, StandardScaler
from importlib import reload

from sklearn.metrics import roc_auc_score,  accuracy_score, f1_score, precision_score, recall_score, auc
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

sys.path.append(os.path.join(os.path.dirname(os.getcwd()), 'dags', 'src'))

import config

reload(config)

# Names of the engineered columns, grouped by variable type.
engineered_vars = dict(
    categorical=[
        "application_year",
        "application_month",
        "application_week",
        "application_day",
        "application_season",
    ],
    numerical=["current_credit_balance_ratio"],
    date=["application_date"],
)

# Identifier of the job whose datasets and models are used by the cells below.
job_id = "04782775f4d4426f8b5256546c1e2960"

# Path of the collected CSV for that job.
filename = f"../dags/data/collected/{job_id}.csv"

In [7]:
## helpers.py methods
import pandas as pd

def check_dataset_sanity(df: pd.DataFrame) -> bool:
    """
    Verify that a pandas DataFrame contains no null values.

    Parameters:
    df (pd.DataFrame): The DataFrame to inspect.

    Returns:
    bool: True when no column holds a null value. The function never returns False;
          a failed check is signalled through the exception described below.

    Raises:
    Exception: If at least one column contains null values. The message lists the affected columns.
    """
    # Number of missing entries per column
    null_counts = df.isnull().sum()

    # Columns for which at least one missing entry was counted
    offending = [column for column, count in null_counts.items() if count > 0]

    if offending:
        raise Exception(f"There are null values in the training dataset: {offending}")
    return True


def persist_deploy_report(job_id: str, model_name: str) -> None:
    """
    Persist the deploy report of a job as `deploy_report.json` in `config.PATH_DIR_MODELS`.

    The report records the job ID together with the file names of the artifacts
    associated with the job: the purpose-to-int mapping, the missing-values model,
    the prediction model and the train report.

    Parameters:
    job_id (str): The ID of the job.
    model_name (str): The name of the prediction model (stored as `<model_name>.pkl`).

    Returns:
    None
    """
    # Create a dictionary representing the deploy report
    report = {
        "job_id": job_id,
        "purpose_to_int": f"{job_id}_purpose_to_int_model.json",
        "missing_values": f"{job_id}_missing_values_model.pkl",
        "prediction_model": f"{model_name}.pkl",
        "train_report": f"{job_id}_train_report.json",
    }

    # Build the destination once so the written path and the logged path cannot diverge
    report_path = os.path.join(config.PATH_DIR_MODELS, "deploy_report.json")

    # Use a context manager so the file is flushed and closed deterministically
    with open(report_path, "w") as f:
        json.dump(report, f)

    # Print the path where the deployment report is saved
    print(f'[INFO] Deployment report saved as {report_path}')


    
def save_dataset(df: pd.DataFrame, path: str) -> None:
    """
    Write a DataFrame to disk as a CSV file and log the destination.

    Args:
        df: The DataFrame to write. Its index is not included in the file.
        path: Destination path of the CSV file.

    Returns:
        None
    """
    df.to_csv(path_or_buf=path, index=False)
    message = f"[INFO] Dataset saved to {path}"
    print(message)

def load_dataset(path: str) -> pd.DataFrame:
    """
    Read a CSV file into a DataFrame.

    Args:
        path: Path of the CSV file to read.

    Returns:
        The DataFrame parsed from the file.
    """
    dataset = pd.read_csv(path)
    return dataset


def save_model_as_pickle(model, model_name, directory=None) -> None:
    """
    Serialize a model to `<model_name>.pkl` with pickle.

    Args:
        model: The object to serialize.
        model_name: Base name of the file (the `.pkl` extension is appended).
        directory: Target directory. When omitted (or otherwise falsy),
                   `config.PATH_DIR_MODELS` is used instead.

    Returns:
        None
    """
    # Fall back to the configured models directory when no directory is supplied
    target_dir = directory if directory else config.PATH_DIR_MODELS
    filename = os.path.join(target_dir, model_name + ".pkl")

    with open(filename, "wb") as f:
        pickle.dump(model, f)

    print(f"[INFO] Model saved as pickle file: {filename}")
    
    
def load_model_from_pickle(model_name: str):
    """
    Load a pickled model from `config.PATH_DIR_MODELS`.

    Args:
        model_name: Base name of the model; the file read is `<model_name>.pkl`.

    Returns:
        The unpickled model object.

    Raises:
        OSError: If the file cannot be opened (e.g. it does not exist).
        Exception: Whatever `pickle.load` raises when the content cannot be unpickled.
    """
    path = os.path.join(config.PATH_DIR_MODELS, model_name + ".pkl")

    # SECURITY NOTE: unpickling can execute arbitrary code. Only load files that
    # were written by this pipeline (e.g. via save_model_as_pickle).
    with open(path, "rb") as f:
        model = pickle.load(f)

    # Report success only once the object has actually been deserialized
    print(f"[INFO] Model loaded: {model_name}")
    return model



def save_model_as_json(model:dict, model_name:str, directory:str=None):
    """
    Serialize a model (a JSON-compatible dict) to `<model_name>.json`.

    Args:
        model: The dictionary to serialize.
        model_name: Base name of the file (the `.json` extension is appended).
        directory: Target directory for the JSON file. When omitted (or otherwise
                   falsy), `config.PATH_DIR_MODELS` is used instead.

    Returns:
        None
    """
    # Fall back to the configured models directory when no directory is supplied
    target_dir = directory if directory else config.PATH_DIR_MODELS
    filename = os.path.join(target_dir, model_name+".json")

    with open(filename, "w") as f:
        json.dump(model, f)

    print("[INFO] Model saved as json file:", filename)

def load_model_from_json(model_name: str) -> dict:
    """
    Load a JSON model from `config.PATH_DIR_MODELS`.

    Args:
        model_name: Base name of the model; the file read is `<model_name>.json`.

    Returns:
        The object decoded from the JSON file (a dict for models written by
        `save_model_as_json`).

    Raises:
        OSError: If the file cannot be opened (e.g. it does not exist).
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    path = os.path.join(config.PATH_DIR_MODELS, model_name+".json")

    with open(path, "r") as f:
        model = json.load(f)

    # Report success only once the file has actually been parsed
    print(f"[INFO] Model loaded: {model_name}")
    return model

In [8]:
## train.py methods 

def performance_report(y_true, y_pred, y_prob):
    """
    Build a dictionary of classification metrics for one set of predictions.

    Parameters:
    y_true (np.array): The true labels. Must expose `.shape` and `.sum()`.
    y_pred (np.array): The predicted labels.
    y_prob (np.array): The predicted scores/probabilities passed to `roc_auc_score`.

    Returns:
    dict: The metrics, in this order: "dataset size", "positive rate", "accuracy",
          "f1", "precision", "recall" and "auc".
    """
    n_samples = y_true.shape[0]

    # Keys are inserted in a fixed order; the size and positive rate come from y_true only,
    # the label-based metrics compare y_true with y_pred, and the AUC uses y_prob.
    return {
        "dataset size": n_samples,
        "positive rate": y_true.sum() / n_samples,
        "accuracy": accuracy_score(y_true, y_pred),
        "f1": f1_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred),
        "recall": recall_score(y_true, y_pred),
        "auc": roc_auc_score(y_true, y_prob),
    }


def select_model(df: pd.DataFrame, metric: str = config.MODEL_PERFORMANCE_METRIC, model_names: list = ["rf", "gb"], performance_thresh: float = config.MODEL_PERFORMANCE_THRESHOLD, degradation_thresh: float = config.MODEL_DEGRADATION_THRESHOLD) -> str:
    """
    Pick the model with the smallest train-to-test degradation among the eligible ones.

    A model is eligible when its train score for `metric` is not below `performance_thresh`
    and its degradation (train score minus test score) is below `degradation_thresh`.

    Parameters:
    df (pd.DataFrame): The performance report, indexed by metric name, with one
                       `<model>_train` and one `<model>_test` column per model.
    metric (str): The metric used for the selection (default: config.MODEL_PERFORMANCE_METRIC).
    model_names (list): The model names to choose from (default: ["rf", "gb"]).
    performance_thresh (float): Minimum train score (default: config.MODEL_PERFORMANCE_THRESHOLD).
    degradation_thresh (float): Exclusive upper bound on the degradation (default: config.MODEL_DEGRADATION_THRESHOLD).

    Returns:
    str: The name of the selected model. On equal degradation the model listed first wins.

    Raises:
    Exception: If no model is eligible, i.e. every model fails the performance check or the degradation check.
    """
    # (model name, degradation) pairs for every eligible model
    candidates = []

    for name in model_names:
        train_column, test_column = f"{name}_train", f"{name}_test"

        # The test score is only looked up for models that pass the performance check
        if not df.loc[metric, train_column] < performance_thresh:
            gap = df.loc[metric, train_column] - df.loc[metric, test_column]
            if gap < degradation_thresh:
                candidates.append((name, gap))

    if not candidates:
        raise Exception("No model selected: all models have performance below the threshold. Possible overfitting.")

    # Smallest degradation wins
    best_name, _ = min(candidates, key=lambda entry: entry[1])
    return best_name


def train(train_dataset_filename:str=None, test_dataset_filename:str=None, job_id="", rescale=False):
    """
    Train the candidate classifiers, evaluate them, and persist the selected one.

    A random forest ("rf") and a gradient boosting classifier ("gb") are fitted on the
    train dataset and scored on both datasets with `performance_report`. `select_model`
    then picks one of them; the selected estimator is pickled as `{job_id}_{model_name}`
    and the full report is saved as `{job_id}_train_report` (JSON).

    Parameters:
    train_dataset_filename (str): The filename of the train dataset. When None, defaults to
                                  `<config.PATH_DIR_DATA>/preprocessed/{job_id}_training.csv`.
    test_dataset_filename (str): The filename of the test dataset. When None, defaults to
                                 `<config.PATH_DIR_DATA>/preprocessed/{job_id}_inference.csv`.
    job_id (str): The job ID (default: "").
    rescale (bool): If True, each predictor is replaced by its `{config.RESCALE_METHOD}_{col}`
                    column wherever that column exists (default: False).

    Returns:
    dict: For each model name, a `{name}_train` and a `{name}_test` performance report,
          plus `final_model` holding the name of the selected model.

    Raises:
    Exception: If either dataset contains null values (from `check_dataset_sanity`), or if
               no model is selected (from `select_model`).
    """
    # Identity check against None (PEP 8) rather than `== None`
    if train_dataset_filename is None:
        train_dataset_filename = os.path.join(config.PATH_DIR_DATA, "preprocessed", f"{job_id}_training.csv")
    if test_dataset_filename is None:
        test_dataset_filename = os.path.join(config.PATH_DIR_DATA, "preprocessed", f"{job_id}_inference.csv")

    tdf = load_dataset(train_dataset_filename)
    vdf = load_dataset(test_dataset_filename)
    check_dataset_sanity(tdf)
    check_dataset_sanity(vdf)

    predictors = config.PREDICTORS
    target = config.TARGET

    if rescale:
        # Overwrite each predictor with its rescaled counterpart where one is available
        for col in predictors:
            scaled_col = f"{config.RESCALE_METHOD}_{col}"
            if scaled_col in tdf.columns:
                tdf[col] = tdf[scaled_col]
            if scaled_col in vdf.columns:
                vdf[col] = vdf[scaled_col]

    rf = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=config.RANDOM_SEED)
    gb = GradientBoostingClassifier(n_estimators=100, max_depth=10, random_state=config.RANDOM_SEED)

    # Feature/target splits are loop-invariant, so slice them once
    X, Y = tdf[predictors], tdf[target]
    X_test, Y_test = vdf[predictors], vdf[target]

    report = dict()
    models = dict()
    for cl, name in [(rf, "rf"), (gb, "gb")]:
        print("[INFO] Training model:", name)
        cl.fit(X, Y)
        t_pred = cl.predict(X)
        v_pred = cl.predict(X_test)
        # Column 1 of predict_proba is used as the score for the AUC
        t_prob = cl.predict_proba(X)[:, 1]
        v_prob = cl.predict_proba(X_test)[:, 1]
        report[f"{name}_train"] = performance_report(Y, t_pred, t_prob)
        report[f"{name}_test"] = performance_report(Y_test, v_pred, v_prob)
        models[name] = cl

    # NOTE(review): the test dataset drives the model selection and is also the one reported on.
    model_name = select_model(pd.DataFrame(report), metric=config.MODEL_PERFORMANCE_METRIC, model_names=list(models.keys()))
    report["final_model"] = model_name
    save_model_as_pickle(models[model_name], f"{job_id}_{model_name}")
    save_model_as_json(report, f"{job_id}_train_report")
    return report


def pick_model_and_deploy(job_id, models, df, metric="auc", predictors=config.PREDICTORS, target=config.TARGET) -> str:
    """
    Evaluate every candidate in `models` on `df`, keep the best one and persist a deploy report for it.

    Parameters:
    job_id (str): The ID of the job.
    models (list): A list of dictionaries, each with a "model" entry (an estimator exposing
                   `predict` and `predict_proba`) and a "model_name" entry.
    df (pd.DataFrame): The DataFrame on which the models are evaluated.
    metric (str): The `performance_report` key used to rank the models (default: "auc").
    predictors (list): The list of predictor variables (default: config.PREDICTORS).
    target (str): The target variable (default: config.TARGET).

    Returns:
    str: The "model_name" of the selected model. The first model is kept unless a
         later one scores strictly higher (and above 0) on `metric`.
    """
    # Every predictor must be available as a column of `df`
    missing = set(predictors).difference(set(df.columns))
    assert not missing, f"{missing} not in {df.columns}"

    # Best score seen so far and the position of the model that achieved it
    best_score, best_index = 0, 0

    for index, candidate in enumerate(models):
        y_true = df[target]
        estimator = candidate["model"]
        y_pred = estimator.predict(df[predictors])
        y_prob = estimator.predict_proba(df[predictors])[:, 1]
        current = performance_report(y_true, y_pred, y_prob)[metric]

        # Strict comparison: ties keep the earlier model
        if current > best_score:
            best_score, best_index = current, index

    selected_name = models[best_index]["model_name"]

    # Record the selected model in the deploy report
    persist_deploy_report(job_id, selected_name)

    return selected_name


In [9]:
# Train both candidate models for `job_id`; train() also persists the selected
# model and the training report. The returned metrics are printed below.
report = train(job_id=job_id)
print(report)

[INFO] Training model: rf
[INFO] Training model: gb
[INFO] Model saved as pickle file: ../dags/models\04782775f4d4426f8b5256546c1e2960_rf.pkl
[INFO] Model saved as json file: ../dags/models\04782775f4d4426f8b5256546c1e2960_train_report.json
{'rf_train': {'dataset size': 15264, 'positive rate': 0.7576650943396226, 'accuracy': 0.8986504192872118, 'f1': 0.9362140766090793, 'precision': 0.894782471626734, 'recall': 0.9816688283614353, 'auc': 0.9669825347451965}, 'rf_test': {'dataset size': 6688, 'positive rate': 0.7764653110047847, 'accuracy': 0.9020633971291866, 'f1': 0.9394471664971803, 'precision': 0.9034495021337127, 'recall': 0.9784325052955902, 'auc': 0.9377489764649738}, 'gb_train': {'dataset size': 15264, 'positive rate': 0.7576650943396226, 'accuracy': 0.9962657232704403, 'f1': 0.9975417259671367, 'precision': 0.9950955085183273, 'recall': 1.0, 'auc': 0.9999966104813035}, 'gb_test': {'dataset size': 6688, 'positive rate': 0.7764653110047847, 'accuracy': 0.8920454545454546, 'f1': 0

In [10]:
# Evaluate the model selected by train() and mark it for deployment.
# The pickle path is derived from the same name that is reported, so the evaluated
# artifact and the deployed name always refer to the same model (the path used to
# hard-code "_rf", which is wrong whenever another model is selected).
final_model_name = f"{job_id}_{report['final_model']}"

# NOTE: unpickling can execute arbitrary code; only load artifacts written by this pipeline.
# The context manager closes the file handle once the model is loaded.
with open(f"../dags/models/{final_model_name}.pkl", "rb") as model_file:
    final_model = pickle.load(model_file)

model = pick_model_and_deploy(
    job_id=job_id,
    df=pd.read_csv(f"../dags/data/preprocessed/{job_id}_inference.csv"),
    models=[{
        "model_name": final_model_name,
        "model": final_model,
    }],
)
print("Deployed model:", model)

[INFO] Deployment report saved as ../dags/models\deploy_report.json
Deployed model: 04782775f4d4426f8b5256546c1e2960_rf
