In [None]:
# ==========================================
# [0] Package installation and imports
# ==========================================
!pip install xgboost lightgbm catboost optuna

import pandas as pd
import numpy as np
import io
import optuna
import re
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Model libraries
import xgboost as xgb
import lightgbm as lgb
from catboost import CatBoostRegressor

In [None]:
# ==========================================
# [1] Data loading and preprocessing
# ==========================================
def load_data(path='Dataset.csv'):
    """
    Load the harmonized dataset for baseline ensemble experiments.

    The function first checks whether ``path`` already exists relative to
    the current working directory. If it does not, and the code is running
    in Google Colab, the user is prompted to upload the file manually and
    the first uploaded file is parsed as CSV.

    Parameters
    ----------
    path : str, default 'Dataset.csv'
        Location of the CSV file to read.

    Returns
    -------
    pandas.DataFrame
        The parsed dataset.

    Raises
    ------
    FileNotFoundError
        If ``path`` does not exist and either Google Colab is unavailable
        or the upload dialog returned no file.
    """
    import os

    # A local copy always wins; no Colab interaction is needed in that case.
    if os.path.exists(path):
        return pd.read_csv(path)

    # Only the missing Colab module is treated as "not running in Colab".
    # Any other failure (e.g. a malformed CSV) must surface unchanged instead
    # of being masked by a second, misleading read attempt.
    try:
        from google.colab import files
    except ImportError as exc:
        raise FileNotFoundError(
            f"{path} was not found in the working directory and the "
            "Google Colab upload helper is not available."
        ) from exc

    print(f"=== [Google Colab] Please upload {path} ===")
    uploaded = files.upload()
    if not uploaded:
        raise FileNotFoundError(
            f"{path} was not found and no file was uploaded."
        )

    # Use the first uploaded file, whatever name the user gave it.
    filename = next(iter(uploaded))
    return pd.read_csv(io.BytesIO(uploaded[filename]))

df = load_data()

# Column-name clean-up.
# Every run of characters outside [A-Za-z0-9_] is collapsed into a single
# underscore so that LightGBM does not choke on special characters in
# feature names further down the pipeline.
_unsafe_chars = re.compile(r'[^A-Za-z0-9_]+')
df.columns = [_unsafe_chars.sub('_', name) for name in df.columns]
print("Cleaned columns:", df.columns.tolist())

In [None]:
# ==========================================
# [1-1] Basic preprocessing
# ==========================================
# Drop redundant identifier column if present
# (Country_Name is redundant if Country_Code is already available)
if 'Country_Name' in df.columns:
    df = df.drop('Country_Name', axis=1)

# Define column groups
cat_cols = ['Country_Code', 'Continent']
target_col = 'Maternal_Mortality_Ratio'
# Every column except the split key (Year) and the target is a model feature.
feature_names = [c for c in df.columns if c not in ['Year', target_col]]

# Handle missing values
# - categorical variables: replaced by the literal category "Unknown"
# - numerical variables: median imputation
for col in cat_cols:
    df[col] = df[col].fillna("Unknown")

num_cols = [c for c in df.columns if c not in cat_cols + ['Year', target_col]]

# The imputation medians are estimated on the hyperparameter-optimization
# training window only. Computing them over all years would let values from
# the 2015 validation year and the 2016 held-out test year influence the
# features the models are trained on (information leakage).
IMPUTE_FIT_YEARS = (2011, 2014)
impute_fit_mask = df['Year'].between(*IMPUTE_FIT_YEARS)

for col in num_cols:
    fill_value = df.loc[impute_fit_mask, col].median()
    if pd.isna(fill_value):
        # No observed value inside the fitting window: fall back to the
        # all-years median so the column is still imputed rather than
        # left with missing values.
        fill_value = df[col].median()
    df[col] = df[col].fillna(fill_value)

In [None]:
# ==========================================
# [1-2] Label encoding for categorical identifiers
# ==========================================
# Work on a copy so that `df` keeps the original string categories.
df_encoded = df.copy()
for col in cat_cols:
    # Values are cast to str first so that every entry of the column is
    # encoded as text; a fresh encoder is fitted per column.
    as_text = df[col].astype(str)
    le = LabelEncoder()
    le.fit(as_text)
    df_encoded[col] = le.transform(as_text)

In [None]:
# ==========================================
# [1-3] Temporal split design
# ==========================================
# All three phases are carved out of df_encoded by calendar year.
year = df_encoded['Year']

# Phase 1 - hyperparameter optimization
#   training rows:   2011 through 2014 (both ends included)
#   validation rows: 2015
train_opt = df_encoded[year.between(2011, 2014)]
val_opt   = df_encoded[year == 2015]

# Phase 2 - retraining after model selection
#   training rows: 2011 through 2015 (both ends included)
train_retrain = df_encoded[year.between(2011, 2015)]

# Phase 3 - final held-out evaluation
#   test rows: 2016
test = df_encoded[year == 2016]

# Helper that splits a frame into its feature matrix and target vector
def get_xy(data):
    """Return ``(X, y)`` for ``data``.

    ``X`` holds the columns listed in the module-level ``feature_names`` and
    ``y`` is the column named by the module-level ``target_col``.
    """
    features = data[feature_names]
    target = data[target_col]
    return features, target

# Phase 1: optimization training / validation sets
X_opt_t, y_opt_t = get_xy(train_opt)
X_opt_v, y_opt_v = get_xy(val_opt)
# Phase 2: retraining set
X_final_t, y_final_t = get_xy(train_retrain)
# Phase 3: held-out test set
X_test, y_test = get_xy(test)

In [None]:
# ==========================================
# [2] Unified ensemble training function
# ==========================================
def run_ensemble(model_name, n_trials=10, seed=42):
    """
    Run a complete baseline experiment for a selected ensemble model.

    Workflow:
    1. Hyperparameter optimization on the 2011-2014 / 2015 split
    2. Retraining on the full pretest period (2011-2015)
    3. Evaluation on the held-out year 2016
    4. Extraction of model-based feature importance

    The function reads the module-level splits (``X_opt_t``/``y_opt_t``,
    ``X_opt_v``/``y_opt_v``, ``X_final_t``/``y_final_t``, ``X_test``/``y_test``)
    as well as ``feature_names`` and ``cat_cols``.

    Parameters
    ----------
    model_name : str
        One of 'XGBoost', 'LightGBM' or 'CatBoost'.
    n_trials : int, default 10
        Number of Optuna trials used for the hyperparameter search.
    seed : int, default 42
        Seed passed to the Optuna sampler and to the model constructors so
        that repeated runs are reproducible.

    Returns
    -------
    dict
        'Metrics'    -> dict with 'MAE', 'RMSE' and 'R2' on the 2016 test set
        'Importance' -> pandas Series of feature importances, sorted descending
        'Preds'      -> predictions of the retrained model for ``X_test``

    Raises
    ------
    ValueError
        If ``model_name`` is not one of the supported models.
    """
    # Settings that are NOT tuned but must be identical during tuning and
    # retraining. study.best_trial.params only contains the values sampled
    # through trial.suggest_*, so these have to be re-applied explicitly;
    # otherwise the final model silently falls back to the library's default
    # loss instead of the absolute-error loss that was tuned.
    fixed_params_by_model = {
        'XGBoost': {
            'objective': 'reg:absoluteerror',
            'n_jobs': -1,
            'random_state': seed,
        },
        'LightGBM': {
            'objective': 'regression_l1',
            # Row subsampling is only active in LightGBM when the bagging
            # frequency is non-zero; without it the tuned `subsample` value
            # would have no effect.
            'subsample_freq': 1,
            'verbose': -1,
            'n_jobs': -1,
            'random_state': seed,
        },
        'CatBoost': {
            'loss_function': 'MAE',
            'verbose': 0,
            'thread_count': -1,
            'random_seed': seed,
        },
    }
    if model_name not in fixed_params_by_model:
        raise ValueError(
            f"Unsupported model_name {model_name!r}; "
            f"expected one of {list(fixed_params_by_model)}"
        )
    fixed_params = fixed_params_by_model[model_name]

    # Name of the "number of boosting rounds" parameter for this library.
    rounds_key = 'iterations' if model_name == 'CatBoost' else 'n_estimators'

    # Positions of the categorical columns inside the feature matrix (CatBoost).
    cat_features_idx = [i for i, c in enumerate(feature_names) if c in cat_cols]

    print(f"\n{'='*10} Processing {model_name} {'='*10}")

    # --- Step 1: Optuna optimization ---
    def objective(trial):
        """Fit one candidate with early stopping; return its 2015 MAE."""
        if model_name == 'XGBoost':
            params = {
                **fixed_params,
                'n_estimators': trial.suggest_int('n_estimators', 100, 500),
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
                'max_depth': trial.suggest_int('max_depth', 3, 9),
                'subsample': trial.suggest_float('subsample', 0.6, 1.0),
                'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
                'eval_metric': 'mae',
                'early_stopping_rounds': 20,
            }
            model = xgb.XGBRegressor(**params)
            model.fit(X_opt_t, y_opt_t, eval_set=[(X_opt_v, y_opt_v)], verbose=False)
            # best_iteration is zero-based; fall back to the sampled upper
            # bound if the attribute is not available.
            best_iter = getattr(model, 'best_iteration', None)
            n_rounds = params['n_estimators'] if best_iter is None else best_iter + 1

        elif model_name == 'LightGBM':
            params = {
                **fixed_params,
                'n_estimators': trial.suggest_int('n_estimators', 100, 500),
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
                'num_leaves': trial.suggest_int('num_leaves', 20, 100),
                'subsample': trial.suggest_float('subsample', 0.6, 1.0),
                'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            }
            model = lgb.LGBMRegressor(**params)

            # Use LightGBM callback-based early stopping
            model.fit(
                X_opt_t, y_opt_t,
                eval_set=[(X_opt_v, y_opt_v)],
                eval_metric='mae',
                callbacks=[lgb.early_stopping(stopping_rounds=20, verbose=False)]
            )
            n_rounds = model.best_iteration_ or params['n_estimators']

        else:  # CatBoost (model_name was validated above)
            params = {
                **fixed_params,
                'iterations': trial.suggest_int('iterations', 100, 500),
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
                'depth': trial.suggest_int('depth', 4, 10),
                'l2_leaf_reg': trial.suggest_float('l2_leaf_reg', 1, 10),
            }
            model = CatBoostRegressor(**params)
            model.fit(
                X_opt_t, y_opt_t,
                eval_set=(X_opt_v, y_opt_v),
                cat_features=cat_features_idx,
                early_stopping_rounds=20,
                verbose=0
            )
            # Number of trees actually kept in the fitted model.
            n_rounds = max(1, model.tree_count_)

        # Remember how many boosting rounds produced the score reported for
        # this trial so retraining can use the same model size.
        trial.set_user_attr('n_rounds', int(n_rounds))

        preds = model.predict(X_opt_v)
        return mean_absolute_error(y_opt_v, preds)

    print(">> [Step 1] Optimizing hyperparameters...")
    study = optuna.create_study(
        direction='minimize',
        sampler=optuna.samplers.TPESampler(seed=seed),
    )
    study.optimize(objective, n_trials=n_trials)
    best_params = study.best_trial.params
    best_rounds = study.best_trial.user_attrs.get('n_rounds', best_params[rounds_key])
    print(f"   Best parameters: {best_params}")
    print(f"   Boosting rounds kept after early stopping: {best_rounds}")

    # --- Step 2: Retraining on 2011-2015 ---
    print(">> [Step 2] Retraining on the full pretest period (2011-2015)...")

    # Same fixed settings as during tuning + tuned values + the round count
    # selected by early stopping. No eval_set is used here, so the
    # early-stopping options are intentionally not passed.
    final_params = {**fixed_params, **best_params}
    final_params[rounds_key] = best_rounds

    if model_name == 'XGBoost':
        final_model = xgb.XGBRegressor(**final_params)
        final_model.fit(X_final_t, y_final_t)
    elif model_name == 'LightGBM':
        final_model = lgb.LGBMRegressor(**final_params)
        final_model.fit(X_final_t, y_final_t)
    else:  # CatBoost
        final_model = CatBoostRegressor(**final_params)
        final_model.fit(X_final_t, y_final_t, cat_features=cat_features_idx, verbose=0)

    # --- Step 3: Evaluation on the held-out year 2016 ---
    print(">> [Step 3] Evaluating on the held-out test year (2016)...")
    preds = final_model.predict(X_test)

    mae = mean_absolute_error(y_test, preds)
    rmse = np.sqrt(mean_squared_error(y_test, preds))
    r2 = r2_score(y_test, preds)

    # --- Step 4: Extract feature importance ---
    # Each library reports its own default importance measure, so the values
    # should be compared within a model, not across models.
    if model_name == 'CatBoost':
        imp_vals = final_model.get_feature_importance()
    else:  # XGBoost and LightGBM expose the scikit-learn style attribute
        imp_vals = final_model.feature_importances_

    importances = pd.Series(imp_vals, index=feature_names).sort_values(ascending=False)

    return {
        'Metrics': {'MAE': mae, 'RMSE': rmse, 'R2': r2},
        'Importance': importances,
        'Preds': preds
    }

In [None]:
# ==========================================
# [3] Run all baseline models
# ==========================================
# Models to benchmark, in the order they are reported later on.
models_list = ['XGBoost', 'LightGBM', 'CatBoost']

# Maps each model name to the dictionary returned by run_ensemble
# (metrics, feature importances and test-set predictions).
results = {}
for m in models_list:
    results.update({m: run_ensemble(m)})

In [None]:
# ==========================================
# [4] Result comparison and visualization
# ==========================================

# 1. Summary performance table
# One row per model, one column per metric, in a fixed column order.
metrics_by_model = {m: results[m]['Metrics'] for m in models_list}
metrics_df = pd.DataFrame.from_dict(metrics_by_model, orient='index')[['MAE', 'RMSE', 'R2']]

banner = "=" * 40
print("\n" + banner)
print(" FINAL RESULTS (Held-out Test Year: 2016) ")
print(banner)
print(metrics_df)

# 2. Performance comparison plot
# Left axis: error metrics (MAE / RMSE). Right axis: R2.
fig, ax1 = plt.subplots(figsize=(12, 6))
metrics_df[['MAE', 'RMSE']].plot(
    kind='bar',
    ax=ax1,
    width=0.4,
    position=1,
    color=['#74b9ff', '#a29bfe'],
    legend=False  # a single merged legend is drawn below
)
ax1.set_ylabel("Error (MAE / RMSE)", fontsize=12)
ax1.set_title("Ensemble Baseline Performance Comparison (Held-out Test Year: 2016)", fontsize=14)

ax2 = ax1.twinx()
metrics_df['R2'].plot(
    kind='bar',
    ax=ax2,
    width=0.2,
    position=0,
    color='#ff7675',
    label='R2 Score'
)
ax2.set_ylabel("R2 Score", color='#ff7675', fontsize=12)

# R2 on a held-out year can be negative (worse than predicting the mean).
# Extend the lower limit when that happens so such bars are not clipped.
r2_floor = min(0.0, float(metrics_df['R2'].min()))
ax2.set_ylim(r2_floor * 1.1, 1.1)
if r2_floor < 0:
    ax2.axhline(0, color='grey', linewidth=0.8)

# Merge the entries of both axes into one legend so that the error bars and
# the R2 bars are all labelled and two legends cannot overlap.
handles1, labels1 = ax1.get_legend_handles_labels()
handles2, labels2 = ax2.get_legend_handles_labels()
ax2.legend(handles1 + handles2, labels1 + labels2, loc='upper right')
plt.show()

# 3. Feature importance comparison (Top 10)
# One horizontal bar chart per model, side by side.
fig, axs = plt.subplots(1, 3, figsize=(20, 6))
colors = ['#74b9ff', '#a29bfe', '#ff7675']

for panel, model, shade in zip(axs, models_list, colors):
    # Keep the ten largest importances; sort ascending so the largest bar
    # ends up at the top of the horizontal chart.
    top_ten = results[model]['Importance'].nlargest(10).sort_values()
    top_ten.plot(kind='barh', ax=panel, color=shade)
    panel.set_title(f"{model} Feature Importance")

plt.tight_layout()
plt.show()

# 4. Optional: prediction scatter plots
# Actual vs. predicted values on the held-out test set, one panel per model.
scatter_fig = plt.figure(figsize=(18, 5))
for i, model in enumerate(models_list):
    panel = scatter_fig.add_subplot(1, 3, i + 1)
    sns.scatterplot(
        x=y_test,
        y=results[model]['Preds'],
        alpha=0.6,
        color=colors[i],
        ax=panel
    )
    # Dashed red diagonal = perfect prediction (predicted == actual).
    actual_range = [y_test.min(), y_test.max()]
    panel.plot(actual_range, actual_range, 'r--')
    panel.set_title(f"{model} Predictions (R2: {results[model]['Metrics']['R2']:.3f})")
    panel.set_xlabel("Actual")
    panel.set_ylabel("Predicted")

plt.tight_layout()
plt.show()