In [None]:
import mlflow
import mlflow.xgboost
import mlflow.pyfunc

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, hour, to_timestamp, lit, udf
from pyspark.sql.types import StringType

from xgboost import XGBClassifier

from sklearn.metrics import log_loss
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.calibration import CalibratedClassifierCV

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pandas.api.types import CategoricalDtype
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK, SparkTrials

import joblib
import logging
import sys
import json
import traceback
from typing import List, Dict, Any, Tuple
import yaml
import time
from datetime import datetime, timedelta


# Execute the shared `reference` notebook inline. NOTE(review): the rest of this
# notebook relies on it defining ENV_VARS and MODELS_NAME (printed below, used
# throughout) — confirm. Databricks documents that `%run` should sit in a cell
# by itself; consider moving it out of the imports cell.
%run reference
print(ENV_VARS, MODELS_NAME)

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# MLflow run id published by the upstream `data_ingestion_task` job task;
# main() logs metrics, the model and artifacts into this run.
RUN_ID = dbutils.jobs.taskValues.get(taskKey="data_ingestion_task", key="run_id")
print(f"RUN_ID: {RUN_ID}")

In [None]:
def initialize_spark():
    """Create (or reuse) the SparkSession used by this job.

    Returns:
        The SparkSession from ``getOrCreate()`` with app name "ClickModel" and
        4g executor/driver memory requested.

    Raises:
        Exception: re-raised after logging if the session cannot be built.

    NOTE(review): when a session already exists (e.g. on a running Databricks
    cluster) ``getOrCreate`` returns it, so static settings such as driver
    memory presumably have no effect — confirm.
    """
    try:
        builder = (
            SparkSession.builder
            .appName("ClickModel")
            .config("spark.executor.memory", "4g")
            .config("spark.driver.memory", "4g")
        )
        session = builder.getOrCreate()
    except Exception as exc:
        logger.error(f"Failed to initialize Spark session: {str(exc)}")
        raise
    return session


def get_current_user_email():
    """Return the Databricks 'user' tag (the user's email) for this notebook.

    If the notebook context cannot be queried, the error is logged and a
    placeholder address is returned instead of raising.
    """
    fallback_email = "unknown_user@fluentco.com"
    try:
        context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
        user_email = context.tags().apply('user')
    except Exception as exc:
        logger.error(f"Failed to get user email: {str(exc)}")
        user_email = fallback_email
    return user_email


def get_params_from_experiment(experiment_name):
    """Fetch the most recently tuned hyperparameters from an MLflow experiment.

    Looks up ``experiment_name``, picks the run with the latest ``end_time`` and
    parses its ``params.<MODELS_NAME["MODEL1"]>_hyperparams`` value, which is
    expected to be the ``str()`` of a dict.

    Args:
        experiment_name: Full MLflow experiment path.

    Returns:
        The hyperparameter dict, or ``None`` when the experiment does not exist,
        has no runs, its latest run lacks the parameter, or the value cannot be
        parsed as a dict. Callers treat ``None`` as "use default hyperparameters".
    """
    import ast  # local import: only this helper needs it

    param_column = f'params.{MODELS_NAME["MODEL1"]}_hyperparams'
    try:
        experiment = mlflow.get_experiment_by_name(experiment_name)
        if experiment is None:
            logger.warning(f"Experiment {experiment_name} not found.")
            return None

        runs = mlflow.search_runs(experiment_ids=[experiment.experiment_id])
        if runs.empty:
            # Previously this path left the result unassigned and crashed with
            # UnboundLocalError at the return statement.
            print("No runs found.")
            return None

        # Latest finished run first (sorted by end time; NaT end times sort last).
        latest_run = runs.sort_values(by='end_time', ascending=False).iloc[0]
        raw_params = latest_run[param_column]

        # literal_eval accepts only Python literals; eval() would execute any
        # code stored in the MLflow param value.
        hyper_params = ast.literal_eval(raw_params)
        if not isinstance(hyper_params, dict):
            logger.warning(f"Hyperparameters in {experiment_name} are not a dict; ignoring.")
            return None
        return hyper_params
    except Exception as exc:
        # Best-effort lookup: any failure means "fall back to defaults", but it
        # is no longer silent and no longer swallows KeyboardInterrupt/SystemExit.
        logger.warning(f"Could not load hyperparameters from {experiment_name}: {exc!r}")
        return None


def load_data(spark, lookback_days: int = 22):
    """Load recent click-training rows for the current environment.

    Args:
        spark: Active SparkSession.
        lookback_days: Number of days before ``now()`` to include. Defaults to
            22, the window that was previously hard-coded in the query.

    Returns:
        Lazy Spark DataFrame with all columns of
        ``centraldata_<ENV>.datascience_model_deployment.adflow_click_training_data_version_2``
        where ``timestamp`` falls inside the lookback window.

    Raises:
        ValueError: if ``lookback_days`` is negative (logged, then re-raised).
        Exception: any other failure is logged and re-raised.
    """
    try:
        # Coerce to int so only a number is interpolated into the SQL. Reject
        # negatives: "-{days}" would become "--N", which starts a SQL comment.
        days = int(lookback_days)
        if days < 0:
            raise ValueError(f"lookback_days must be non-negative, got {lookback_days}")
        query = """
        SELECT * 
        FROM centraldata_{env}.datascience_model_deployment.adflow_click_training_data_version_2
        WHERE timestamp > date_add(DAY, -{days}, now())
        """.format(env=ENV_VARS['ENV'], days=days)
        df_spark = spark.sql(query)
        return df_spark
    except Exception as e:
        logger.error(f"Failed to load data: {str(e)}")
        raise


def test_train_split(df_spark, train_test_ratio=0.8):
    try:
        train_test_splits = df_spark.randomSplit([train_test_ratio, 1 - train_test_ratio], seed=1234)
        train_data = train_test_splits[0]
        test_data = train_test_splits[1]
        return train_data.toPandas(), test_data.toPandas()

    except Exception as e:
        logger.error(f"Error in train-test split: {str(e)}")
        raise


def get_encodings(df, features):
    """Capture the category levels of each requested feature present in ``df``.

    Args:
        df: pandas DataFrame to learn the categories from (main() passes the
            train split).
        features: Column names to encode; names missing from ``df`` are skipped.

    Returns:
        Dict mapping column name to the pandas Index of that column's categories.
    """
    present_features = [name for name in features if name in df.columns]
    return {
        name: df[name].astype("category").cat.categories
        for name in present_features
    }


def encode_features(df, le_dict):
    """Cast columns to fixed categorical dtypes learned by ``get_encodings``.

    Values outside a column's learned categories become NaN. ``df`` is modified
    in place and also returned. Columns listed in ``le_dict`` but absent from
    ``df`` only produce a warning.
    """
    for name, categories in le_dict.items():
        if name not in df.columns:
            logger.warning(f"Column {name} not found in DataFrame")
            continue
        df[name] = df[name].astype(CategoricalDtype(categories=categories))
    return df


def apply_feature_types(df: pd.DataFrame, features_types: Dict[str, type]) -> pd.DataFrame:
    """Cast each declared feature column to its type, mutating ``df`` in place.

    Features declared as ``str`` become the pandas ``category`` dtype so they
    stay categorical. Every other declared type is passed straight to
    ``astype``. Returns the same DataFrame.
    """
    for feature, declared_type in features_types.items():
        target_dtype = 'category' if declared_type == str else declared_type
        df[feature] = df[feature].astype(target_dtype)
    return df


def convert_integers_with_missing_to_float(df: pd.DataFrame, features_types: Dict[str, type]) -> pd.DataFrame:
    """Turn integer feature columns that contain nulls into ``float64``, in place.

    Only the keys of ``features_types`` are used; the declared types are
    ignored, and the column's actual dtype decides. Returns the same DataFrame.
    """
    for feature in features_types:
        column = df[feature]
        if pd.api.types.is_integer_dtype(column) and column.isnull().any():
            df[feature] = column.astype('float64')
    return df


def clean_os(os_name: str) -> str:
    """Bucket an OS name into one of the known values or "Other".

    Exact, case-sensitive matches of "Android", "iOS", "Windows" and "Mac" are
    returned unchanged. Anything else, including ``None``, becomes "Other".
    """
    known_systems = ("Android", "iOS", "Windows", "Mac")
    return os_name if os_name in known_systems else "Other"


def clean_gender(gender: str) -> str:
    """Normalise a free-text gender value to "F", "M" or ``None``.

    Matching is case-insensitive on "f"/"female" and "m"/"male". Any other
    value maps to ``None``, as do non-string inputs such as nulls (``None``
    from Spark, ``NaN`` from pandas).
    """
    if not isinstance(gender, str):
        # .lower() on None/NaN raised AttributeError, which failed the whole
        # Spark UDF stage or pandas .apply on any missing gender.
        return None
    normalized = gender.lower()
    if normalized in ("f", "female"):
        return "F"
    if normalized in ("m", "male"):
        return "M"
    return None


def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise ``os_name`` and ``gender`` in a pandas DataFrame, in place.

    Applies ``clean_os`` to ``os_name`` and then ``clean_gender`` to ``gender``,
    and returns the same DataFrame.

    NOTE(review): a Spark-based ``clean_data`` is defined in a later cell under
    the same name. Once that cell runs, this pandas version is no longer
    reachable by name.
    """
    for column, cleaner in (("os_name", clean_os), ("gender", clean_gender)):
        df[column] = df[column].apply(cleaner)
    return df


def create_robust_signature(X_test: pd.DataFrame, model: XGBClassifier, n_examples: int = 5) -> tuple:
    """Build an MLflow signature and input example for ``model``.

    Input example: ``n_examples`` rows of ``X_test`` sampled with
    ``random_state=42``, with every column of the first sampled row set to NaN.
    That row apparently documents that missing values are accepted.

    Signature: inferred from the full ``X_test`` and ``model.predict_proba``
    on it.

    Returns:
        ``(signature, input_example)``.

    Raises:
        Exception: re-raised after logging.
    """
    try:
        example = X_test.sample(n=n_examples, random_state=42).copy()
        first_label = example.index[0]
        for column_name in example.columns:
            example.loc[first_label, column_name] = np.nan

        probabilities = model.predict_proba(X_test)
        signature = mlflow.models.infer_signature(X_test, probabilities)
        return signature, example
    except Exception as e:
        logger.error(f"Error in create_robust_signature: {str(e)}")
        raise

In [None]:
clean_os_udf = udf(clean_os, StringType())
clean_gender_udf = udf(clean_gender, StringType())


def clean_data(df_spark):
    """Normalise ``os_name`` and ``gender`` on a Spark DataFrame.

    Uses built-in Spark expressions that mirror ``clean_os`` / ``clean_gender``:
      * ``os_name``: kept when exactly "Android", "iOS", "Windows" or "Mac";
        anything else, including null, becomes "Other".
      * ``gender``: case-insensitive "f"/"female" -> "F" and "m"/"male" -> "M";
        anything else, including null, becomes null.

    The expressions run in the JVM and never call Python on null values, so
    there is no per-row Python UDF round trip. The ``clean_*_udf`` objects above
    are kept for any other callers.

    NOTE(review): assumes ``os_name`` is a string column — confirm.
    """
    os_col = F.col("os_name")
    gender_lower = F.lower(F.col("gender"))
    return (
        df_spark
        .withColumn(
            "os_name",
            # isin() on null yields null, which when() treats as false -> "Other".
            F.when(os_col.isin("Android", "iOS", "Windows", "Mac"), os_col)
            .otherwise(F.lit("Other")),
        )
        .withColumn(
            "gender",
            F.when(gender_lower.isin("f", "female"), F.lit("F"))
            .when(gender_lower.isin("m", "male"), F.lit("M"))
            .otherwise(F.lit(None).cast(StringType())),
        )
    )

In [None]:
def log_model_and_metrics(model: XGBClassifier, X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, 
                          hyperparams: Dict, features_types: Dict, pip_requirements: List[str], run_id):
    """Evaluate ``model`` and log metrics, params and the model to MLflow.

    Opens the run ``run_id`` with ``mlflow.start_run``; MLflow creates a new
    run when ``run_id`` is ``None``. Inside that run it logs:
      * train/test log loss;
      * hyperparameters and training metadata;
      * feature types;
      * the model in XGBoost JSON format, with a signature and input example.

    Args:
        model: Fitted classifier.
        X_train, y_train: Training features/labels (for train log loss).
        X_test, y_test: Test features/labels (for test log loss and the signature).
        hyperparams: Hyperparameters used for training (logged as a string).
        features_types: Feature name -> declared type (logged as JSON of ``str(type)``).
        pip_requirements: Pinned requirements stored with the logged model.
        run_id: MLflow run to log into, or ``None`` to start a new one.

    Returns:
        ``(run_id, model_uri)``, where ``run_id`` is the id of the run actually
        logged into. It equals the argument when one was given.

    Raises:
        Exception: re-raised after logging.
    """
    try:
        prefix = MODELS_NAME['MODEL1']
        start_time = time.time()
        with mlflow.start_run(run_id=run_id) as run:
            active_run_id = run.info.run_id

            test_log_loss = log_loss(y_test, model.predict_proba(X_test)[:, 1])
            train_log_loss = log_loss(y_train, model.predict_proba(X_train)[:, 1])

            mlflow.log_param(f"{prefix}_version", 1)
            mlflow.log_param(f"{prefix}_hyperparams", hyperparams)
            mlflow.log_metric(f"{prefix}_test_log_loss", test_log_loss)
            mlflow.log_metric(f"{prefix}_train_log_loss", train_log_loss)

            # NOTE(review): uses the notebook-global `spark` session, not the one
            # created inside main().
            mlflow.log_param(f"{prefix}_training_environment", f"https://{spark.conf.get('spark.databricks.workspaceUrl')}")
            mlflow.log_param(f"{prefix}_training_timestamp", datetime.now().isoformat())
            # NOTE(review): despite the param name, this measures time spent in
            # this function so far (predictions + logging), not the model fit.
            training_duration = timedelta(seconds=time.time() - start_time)
            mlflow.log_param(f"{prefix}_training_duration", str(training_duration))
            mlflow.log_param(f"{prefix}_training_run_id", active_run_id)

            features_types_str = {k: str(v) for k, v in features_types.items()}
            mlflow.log_param(f"{prefix}_features_types", json.dumps(features_types_str))
            mlflow.log_param(f"{prefix}_encoding_scheme", "CategoricalDtype")
            mlflow.log_param(f"{prefix}_scaling_strategy", "None")

            signature, input_example = create_robust_signature(X_test, model)

            model_info = mlflow.xgboost.log_model(
                model,
                artifact_path="xgboost-model",
                signature=signature,
                input_example=input_example,
                model_format="json",
                pip_requirements=pip_requirements
            )

            # Return the id of the run actually used: returning the `run_id`
            # argument gave None whenever MLflow had to create a new run.
            return active_run_id, model_info.model_uri
    except Exception as e:
        logger.error(f"Error in logging model and metrics: {str(e)}")
        raise

In [None]:
def train_model(X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, hyperparams: Dict) -> XGBClassifier:
    """Fit an ``XGBClassifier`` constructed from ``hyperparams``.

    ``(X_test, y_test)`` is the only ``eval_set``, so any
    ``early_stopping_rounds`` in ``hyperparams`` is monitored on the test split.

    NOTE(review): main() later reports test log loss on this same split, so that
    metric is optimistically biased. Consider a separate validation split for
    early stopping.

    Raises:
        Exception: re-raised after logging.
    """
    evaluation_sets = [(X_test, y_test)]
    try:
        classifier = XGBClassifier(**hyperparams)
        classifier.fit(X_train, y_train, eval_set=evaluation_sets)
    except Exception as exc:
        logger.error(f"Error in model training: {str(exc)}")
        raise
    return classifier

In [None]:
def save_artifacts(model, encodings, features_types, run_id: str):
    """Serialise production artifacts and attach them to an existing MLflow run.

    Each of ``model``, ``encodings`` and ``features_types`` is written with
    ``joblib.dump(..., compress=3)`` as ``<name>.sav``. It is then logged under
    the ``prod_artifacts`` artifact directory of run ``run_id``.

    Raises:
        Exception: re-raised after logging.
    """
    import os
    import tempfile

    # Explicit mapping instead of eval(artifact_name) on local variable names.
    artifacts = {
        "model": model,
        "encodings": encodings,
        "features_types": features_types,
    }
    try:
        with mlflow.start_run(run_id=run_id):  # Attach to the existing run
            # Write into a private temp dir rather than the CWD: no stray files
            # left behind, and no clashes between runs sharing a driver.
            with tempfile.TemporaryDirectory() as tmp_dir:
                for artifact_name, artifact in artifacts.items():
                    path = os.path.join(tmp_dir, f"{artifact_name}.sav")
                    joblib.dump(artifact, path, compress=3)
                    mlflow.log_artifact(path, "prod_artifacts")
    except Exception as e:
        logger.error(f"Error in saving artifacts: {str(e)}")
        raise

In [None]:
def _resolve_hyperparams(tuned):
    """Return XGBoost hyperparameters from tuned values, or defaults if unusable."""
    fixed = {
        'enable_categorical': True,
        'eval_metric': 'logloss',
        'tree_method': 'hist',
        'max_cat_to_onehot': 1,
    }
    defaults = {
        'colsample_bytree': 0.6421218939603257,
        'early_stopping_rounds': 9,
        'learning_rate': 0.03711165197308055,
        'max_depth': 8,
        'n_estimators': 369,
        'subsample': 0.9173736479523668,
        **fixed,
    }
    if tuned is None:  # no experiment/run available to fetch hyper-parameter values
        return defaults
    try:
        return {
            'colsample_bytree': tuned['colsample_bytree'],
            'early_stopping_rounds': int(tuned['early_stopping_rounds']),
            'learning_rate': tuned['learning_rate'],
            'max_depth': int(tuned['max_depth']),
            'n_estimators': int(tuned['n_estimators']),
            'subsample': tuned['subsample'],
            **fixed,
        }
    except (KeyError, TypeError, ValueError) as exc:
        logger.warning(f"Tuned hyperparameters unusable ({exc!r}); falling back to defaults.")
        return defaults


def _inject_null_samples(df_spark, columns, fraction=0.05, seed=1234):
    """Append, per column, a seeded row sample with that column set to null."""
    augmented = df_spark
    for offset, column in enumerate(columns):
        # Distinct seed per column: a shared seed would null the same rows each time.
        nulled = df_spark.sample(False, fraction, seed=seed + offset).withColumn(column, F.lit(None))
        augmented = augmented.union(nulled)
    return augmented


def main(run_id):
    """Train the click model end to end and publish its run/model info.

    Steps:
      1. Load and clean data.
      2. Resolve hyperparameters (latest tuning run, else defaults).
      3. Append seeded null-injected rows for robustness.
      4. Split, type and encode the data, then train.
      5. Log the model and metrics to MLflow and save the prod artifacts.
      6. Set the Databricks task values ``run_id``, ``model_name`` and ``model_uri``.

    Args:
        run_id: MLflow run id to log into (provided by ``data_ingestion_task``).

    Raises:
        Exception: any failure is logged with its traceback and re-raised.
    """
    try:
        # Initialize Spark
        spark = initialize_spark()
        features_types = {
            "gender": str,
            "campaign_id": str,
            "traffic_partner_id": str,
            "traffic_source_id": float,  # Updated from Int64 to float64
            "os_name": str,
            "age": float,  # Updated from Int64 to float64
            "hour": pd.Int64Dtype(),
            "household_income": float,
            "position": pd.Int64Dtype(),  # Retains Int64 as it doesn't have nulls
        }
        features = list(features_types.keys())
        categorical_features = [
            "gender", "campaign_id", "traffic_partner_id", "traffic_source_id",
            "os_name", "hour", "position",
        ]
        target = "click"

        # Load and preprocess data
        logger.info("Loading data...")
        df_spark = clean_data(load_data(spark))

        # Hyperparameters come from the latest run in the user's tuning experiment.
        user_email = get_current_user_email()
        experiment_name = f"/Users/{user_email}/{MODELS_NAME['MODEL1']}_{ENV_VARS['ENV']}_hyperparams"
        hyperparams = _resolve_hyperparams(get_params_from_experiment(experiment_name))
        print("hyperparams", hyperparams)

        # Add null values for robustness testing (seeded so reruns are reproducible)
        df_spark = _inject_null_samples(
            df_spark, ["campaign_id", "traffic_source_id", "traffic_partner_id"]
        )

        logger.info("Splitting data into train and test sets...")
        train_pd, test_pd = test_train_split(df_spark, train_test_ratio=0.8)

        train_pd = apply_feature_types(train_pd, features_types)
        test_pd = apply_feature_types(test_pd, features_types)
        logger.info(f"Train DataFrame dtypes after applying features_types:\n{train_pd.dtypes}")
        logger.info(f"Test DataFrame dtypes after applying features_types:\n{test_pd.dtypes}")

        train_pd = convert_integers_with_missing_to_float(train_pd, features_types)
        test_pd = convert_integers_with_missing_to_float(test_pd, features_types)
        logger.info(f"Train DataFrame dtypes after converting integers with missing values to float:\n{train_pd.dtypes}")
        logger.info(f"Test DataFrame dtypes after converting integers with missing values to float:\n{test_pd.dtypes}")

        # Categories are learned on train only and imposed on test, so levels
        # unseen in train become NaN in test rather than new categories.
        encodings = get_encodings(train_pd, categorical_features)
        train_pd = encode_features(train_pd, encodings)
        test_pd = encode_features(test_pd, encodings)

        X_train, y_train = train_pd[features], train_pd[target]
        X_test, y_test = test_pd[features], test_pd[target]

        # Train model
        logger.info("Training the model...")
        model = train_model(X_train, y_train, X_test, y_test, hyperparams)

        # Define pip requirements
        pip_requirements = [
            "mlflow==2.11.3", "scikit-learn==1.3.0", "scipy==1.10.0",
            "psutil==5.9.0", "pandas==1.5.3", "cloudpickle==2.2.1",
            "numpy==1.23.5", "category-encoders==2.6.3", "xgboost==2.0.3",
            "lz4==4.3.2", "typing-extensions==4.10.0"
        ]

        # Log model and metrics
        logger.info("Logging model and metrics...")
        run_id, model_uri = log_model_and_metrics(
            model=model,
            X_train=X_train,
            y_train=y_train,
            X_test=X_test,
            y_test=y_test,
            hyperparams=hyperparams,
            pip_requirements=pip_requirements,
            features_types=features_types,
            run_id=run_id  # Use the retrieved run_id
        )

        # Save artifacts
        save_artifacts(model, encodings, features_types, run_id)

        dbutils.jobs.taskValues.set("run_id", run_id)
        dbutils.jobs.taskValues.set("model_name", MODELS_NAME['MODEL1'])
        dbutils.jobs.taskValues.set("model_uri", model_uri)

        logger.info("Model training and logging completed successfully.")
        logger.info(f"run_id received {run_id}")
        logger.info(f"model_name received {MODELS_NAME['MODEL1']}")
        logger.info(f"model_uri received {model_uri}")

    except Exception as e:
        error_message = f"An error occurred in main execution: {str(e)}\nTraceback: {traceback.format_exc()}"
        logger.error(error_message)
        raise

    logger.info("Script execution completed.")


if __name__ == "__main__":
    # Entry point: train using the run id published by data_ingestion_task,
    # logging any failure as critical before letting it propagate.
    try:
        main(run_id=RUN_ID)
    except Exception as exc:
        details = traceback.format_exc()
        logger.critical(f"Critical error in main script execution: {str(exc)}\nTraceback: {details}")
        raise