In [1]:
# !pip install -r ../src/orchestration/requirements.txt


In [None]:
import os
from datetime import datetime, timedelta
from typing import Dict, List

import mlflow
import pandas as pd
import pendulum

import ray
from ray import tune
from ray.train.xgboost import XGBoostTrainer
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.optuna import OptunaSearch


# Connect to the Ray cluster through Ray Client. The guard keeps this cell safe to
# re-run in the same kernel: calling ray.init again on an existing Ray Client
# connection errors out.
# NOTE(review): log_to_driver=False hides worker stdout/stderr (trial logs) here.
if not ray.is_initialized():
    ray.init("ray://localhost:10001", namespace="experiment-1", log_to_driver=False)


In [2]:
# Training Configuration
# NOTE(review): only "num_boost_round" is read by the visible cells; "model_path",
# "test_size", "num_workers", "resources_per_worker" and "use_gpu" are not passed to
# any XGBoostTrainer (no ScalingConfig / validation split is built) — confirm intent.
TRAINING_CONFIG = {
    "model_path": "model-checkpoints/final-model/xgb_model",
    "test_size": 0.3,
    "num_workers": 1,
    "resources_per_worker": {"CPU": 4},
    "use_gpu": False,
    "num_boost_round": 1,  # single boosting round; presumably a smoke-test value
}

# XGBoost Parameters
# Base parameters; tuned configs are merged on top via {**XGBOOST_PARAMS, **config},
# so keys also present in TUNE_SEARCH_SPACE (max_depth, subsample, colsample_bytree)
# are overridden. Note "learning_rate" in the search space is XGBoost's alias for "eta".
XGBOOST_PARAMS = {
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error", "rmse", "mae", "auc"],
    "tree_method": "hist",
    "max_depth": 1,
    "eta": 0.3,
    "subsample": 0.8,
    "colsample_bytree": 0.8,
}

# Feature Configuration
# Columns kept for training; includes the label column "is_purchased".
FEATURE_COLUMNS = [
    "brand",
    "price",
    "event_weekday",
    "category_code_level1",
    "category_code_level2",
    "activity_count",
    "is_purchased",
]

# Columns label-encoded to integer codes before training.
CATEGORICAL_COLUMNS = [
    "brand",
    "event_weekday",
    "category_code_level1",
    "category_code_level2",
]

# DAG Configuration
# NOTE(review): Airflow default_args; not used by the visible notebook cells
# (presumably kept in sync with the orchestration DAG).
DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
    "execution_timeout": timedelta(hours=2),
    "start_date": pendulum.datetime(2024, 1, 1, tz="UTC"),
}

# Tune Configuration
# NOTE(review): "model_path", "max_epochs" and "grace_period" are not passed to the
# ASHAScheduler / tune.run calls in the visible cells.
TUNE_CONFIG = {
    "model_path": "model-checkpoints/hyperparameter-tuning/xgb_model",
    "num_trials": 4,  # Number of trials for hyperparameter search
    "max_epochs": 4,  # Maximum epochs per trial
    "grace_period": 4,  # Minimum epochs before pruning
    # 0.0.0.0 is a server *bind* address, not a portable client destination (it is not
    # routable on e.g. Windows); clients should target localhost by default.
    "mlflow_tracking_uri": os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5001"),
}

# Tune Search Space
TUNE_SEARCH_SPACE = {
    # tune.randint's upper bound is exclusive: max_depth is sampled from {3, 4}.
    "max_depth": tune.randint(3, 5),
    "learning_rate": tune.loguniform(1e-4, 1e-1),
    "min_child_weight": tune.choice([1, 2, 3, 4, 5]),
    "subsample": tune.uniform(0.5, 1.0),
    "colsample_bytree": tune.uniform(0.5, 1.0),
    "gamma": tune.uniform(0, 1),
}

# Model Configuration
# Name under which the final model is registered in the MLflow model registry.
MODEL_NAME = "purchase_prediction_model"


In [3]:
class ModelPipeline:
    """Purchase-prediction pipeline: load DWH data, tune XGBoost, train and register.

    Reads the module-level configuration from the config cell (TUNE_CONFIG,
    TRAINING_CONFIG, XGBOOST_PARAMS, FEATURE_COLUMNS, CATEGORICAL_COLUMNS,
    TUNE_SEARCH_SPACE, MODEL_NAME).
    """

    # Ray Train's XGBoost integration reports eval metrics as "<dataset>-<metric>",
    # so RMSE on the "train" dataset is "train-rmse" (hyphen, not underscore).
    TUNE_METRIC = "train-rmse"
    TUNE_MODE = "min"

    def __init__(self):
        """Resolve the warehouse connection string and point MLflow at the tracking server."""
        # Prefer a connection string from the environment; the literal is only a
        # local-development fallback so credentials need not live in the notebook.
        self.postgres_conn = os.getenv(
            "DWH_POSTGRES_URI", "postgresql://dwh:dwh@localhost:5433/dwh"
        )
        mlflow.set_tracking_uri(TUNE_CONFIG["mlflow_tracking_uri"])

    def load_training_data(self) -> dict:
        """Load the purchase-prediction view and label-encode categorical columns.

        Returns:
            {"data": list of row dicts restricted to FEATURE_COLUMNS (label included),
             "category_mappings": {column: {raw_value: integer_code}}}.
            NULLs in a categorical column are encoded as -1.

        Raises:
            KeyError: if the view lacks any column listed in FEATURE_COLUMNS.
        """
        df = pd.read_sql(
            "SELECT * FROM dwh.vw_ml_purchase_prediction", self.postgres_conn
        )

        print(f"Loaded {len(df)} rows of data")

        missing = [col for col in FEATURE_COLUMNS if col not in df.columns]
        if missing:
            raise KeyError(f"vw_ml_purchase_prediction is missing columns: {missing}")
        # Keep only the configured features + label so stray columns (ids,
        # timestamps, ...) never reach XGBoost as extra features.
        df = df[FEATURE_COLUMNS].copy()

        # Data preprocessing
        df["price"] = df["price"].astype(float)

        # Deterministic label encoding: codes follow the sorted order of the
        # distinct non-null values, so NULLs fall through to -1.
        category_mappings = {}
        for col in CATEGORICAL_COLUMNS:
            category_mapping = {
                val: idx for idx, val in enumerate(sorted(df[col].dropna().unique()))
            }
            category_mappings[col] = category_mapping
            df[col] = df[col].map(category_mapping).fillna(-1)

        return {
            "data": df.to_dict(orient="records"),
            "category_mappings": category_mappings,
        }

    def tune_hyperparameters(self, data: dict) -> dict:
        """Search TUNE_SEARCH_SPACE with Ray Tune (Optuna + ASHA), minimising train RMSE.

        Each trial fits an XGBoostTrainer on the full dataset, with the trial's
        config overriding XGBOOST_PARAMS.

        Args:
            data: output of ``load_training_data`` (only ``data["data"]`` is used).

        Returns:
            {"best_config": best trial's hyperparameters,
             "best_metrics": best trial's last reported result}.

        Raises:
            RuntimeError: if no trial reported ``TUNE_METRIC``.
        """
        experiment_name = f"experiment-1_xgb_tune_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        # set_experiment creates the experiment if needed and is safe to re-run,
        # unlike create_experiment, which raises when the name already exists.
        mlflow.set_experiment(experiment_name)

        dataset = ray.data.from_pandas(pd.DataFrame(data["data"]))

        def train_xgboost(config):
            # Ray Tune trainable; `dataset` is captured from the enclosing scope.
            trainer = XGBoostTrainer(
                label_column="is_purchased",
                num_boost_round=TRAINING_CONFIG["num_boost_round"],
                params={**XGBOOST_PARAMS, **config},
                datasets={"train": dataset},
            )
            ray.train.report(trainer.fit().metrics)

        analysis = ray.tune.run(
            train_xgboost,
            config=TUNE_SEARCH_SPACE,
            num_samples=TUNE_CONFIG["num_trials"],
            scheduler=ASHAScheduler(metric=self.TUNE_METRIC, mode=self.TUNE_MODE),
            search_alg=OptunaSearch(metric=self.TUNE_METRIC, mode=self.TUNE_MODE),
        )

        best_trial = analysis.get_best_trial(self.TUNE_METRIC, self.TUNE_MODE)
        if best_trial is None:
            raise RuntimeError(
                f"No Ray Tune trial reported {self.TUNE_METRIC!r}; check the trial logs."
            )
        return {
            "best_config": best_trial.config,
            "best_metrics": best_trial.last_result,
        }

    def train_final_model(self, data: dict, best_params: dict) -> dict:
        """Train the final XGBoost model with tuned params and register it in MLflow.

        Args:
            data: output of ``load_training_data``.
            best_params: output of ``tune_hyperparameters``; its ``best_config``
                overrides XGBOOST_PARAMS.

        Returns:
            {"metrics", "checkpoint_path", "mlflow_run_id", "mlflow_model_uri"}; the
            URI points at the newly registered model version when MLflow reports it,
            otherwise at the run-relative model artifact.
        """
        experiment_name = f"xgb_final_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        # set_experiment (not create_experiment alone) so the run below is recorded
        # in this experiment rather than whichever one happens to be active.
        mlflow.set_experiment(experiment_name)

        with mlflow.start_run() as run:
            model_params = {**XGBOOST_PARAMS, **best_params["best_config"]}
            mlflow.log_params(model_params)

            trainer = XGBoostTrainer(
                label_column="is_purchased",
                num_boost_round=TRAINING_CONFIG["num_boost_round"],
                params=model_params,
                datasets={"train": ray.data.from_pandas(pd.DataFrame(data["data"]))},
            )
            result = trainer.fit()

            # result.metrics may also carry non-numeric bookkeeping fields; MLflow
            # metrics must be numeric, and bools are excluded on purpose.
            mlflow.log_metrics(
                {
                    key: value
                    for key, value in result.metrics.items()
                    if isinstance(value, (int, float)) and not isinstance(value, bool)
                }
            )

            # NOTE(review): with the Ray >= 2.7 APIs used in this file (ray.train.report,
            # Checkpoint.path) the checkpoint is framework-agnostic and has no
            # get_model(); XGBoostTrainer.get_model deserialises the booster.
            booster = XGBoostTrainer.get_model(result.checkpoint)
            model_info = mlflow.xgboost.log_model(
                booster,
                "model",
                registered_model_name=MODEL_NAME,
            )

            # Log category mappings if available. JSON object keys must be strings,
            # while raw category values may be numpy scalars or Decimals.
            if "category_mappings" in data:
                mlflow.log_dict(
                    {
                        col: {str(raw): code for raw, code in mapping.items()}
                        for col, mapping in data["category_mappings"].items()
                    },
                    "category_mappings.json",
                )

            # The new version is never transitioned to a stage, so a ".../Staging"
            # URI would not resolve; point at the registered version instead.
            version = getattr(model_info, "registered_model_version", None)
            if version is not None:
                model_uri = f"models:/{MODEL_NAME}/{version}"
            else:
                model_uri = model_info.model_uri

            return {
                "metrics": result.metrics,
                "checkpoint_path": result.checkpoint.path,
                "mlflow_run_id": run.info.run_id,
                "mlflow_model_uri": model_uri,
            }


In [None]:
pipeline = ModelPipeline()

# Load and preprocess data
data = pipeline.load_training_data()

# Fail fast here rather than deep inside Ray/XGBoost if the view came back empty or
# the label column is missing.
assert data["data"], "dwh.vw_ml_purchase_prediction returned no rows"
assert "is_purchased" in data["data"][0], "label column 'is_purchased' is missing"


In [None]:
# Create (or reuse) the tuning experiment and preview the data as a Ray Dataset.
# set_experiment is idempotent and makes this experiment the active one;
# create_experiment alone raises if the name already exists and is then never used.
experiment_name = f"experiment-1_xgb_tune_{datetime.now().strftime('%Y%m%d%H%M%S')}"
mlflow.set_experiment(experiment_name)

df = pd.DataFrame(data["data"])
dataset = ray.data.from_pandas(df)

dataset.show(1)


In [None]:

# Hyperparameter search: every Ray Tune trial fits an XGBoostTrainer on `dataset`
# (from the previous cell) with the sampled config overriding XGBOOST_PARAMS.
# Ray Train's XGBoost integration reports eval metrics as "<dataset>-<metric>", so RMSE
# on the "train" dataset is "train-rmse"; "train_rmse" is never reported, and Tune's
# metric check rejects the trial results.
TUNE_METRIC = "train-rmse"
TUNE_MODE = "min"


def train_xgboost(config):
    """Ray Tune trainable: train one XGBoost model and report its final metrics."""
    training_params = {**XGBOOST_PARAMS, **config}
    trainer = XGBoostTrainer(
        label_column="is_purchased",
        num_boost_round=TRAINING_CONFIG["num_boost_round"],
        params=training_params,
        datasets={"train": dataset},
    )
    results = trainer.fit()
    ray.train.report(results.metrics)


# ray.tune.run returns an ExperimentAnalysis (name `tuner` kept for existing references).
tuner = ray.tune.run(
    train_xgboost,
    config=TUNE_SEARCH_SPACE,
    num_samples=TUNE_CONFIG["num_trials"],
    scheduler=ASHAScheduler(metric=TUNE_METRIC, mode=TUNE_MODE),
    search_alg=OptunaSearch(metric=TUNE_METRIC, mode=TUNE_MODE),
)

best_trial = tuner.get_best_trial(TUNE_METRIC, TUNE_MODE)
if best_trial is None:
    raise RuntimeError(f"No trial reported {TUNE_METRIC!r}; check the trial logs.")


In [None]:

# Train final model with the best hyperparameters found by the tuning cell above.
# train_final_model expects the {"best_config", "best_metrics"} shape that
# ModelPipeline.tune_hyperparameters returns; build it from `best_trial` so this cell
# does not depend on a variable no cell defines.
best_params = {
    "best_config": best_trial.config,
    "best_metrics": best_trial.last_result,
}
results = pipeline.train_final_model(data, best_params)

print("Training completed successfully!")
print(f"Final metrics: {results['metrics']}")
print(f"Model URI: {results['mlflow_model_uri']}")
