In [1]:
# ============================================================
# Cell 1 — Imports and Global Configuration (Colab Compatible)
# ============================================================

# Install OpenML (Colab does not include this library)
!pip install --quiet openml

# ------------------------------------------------------------
# Core Python
# ------------------------------------------------------------
import os
import time
import json
import random
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple

# ------------------------------------------------------------
# Numerical / data handling
# ------------------------------------------------------------
import numpy as np
import pandas as pd

# ------------------------------------------------------------
# OpenML
# ------------------------------------------------------------
import openml

# ------------------------------------------------------------
# Scikit-learn utilities
# ------------------------------------------------------------
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.metrics import (
    accuracy_score,
    balanced_accuracy_score,
    f1_score,
)

# ------------------------------------------------------------
# Reproducibility utilities
# ------------------------------------------------------------

def set_global_seed(seed: int) -> None:
    """
    Seed the random number sources this notebook relies on.

    Seeds Python's ``random`` module and NumPy's legacy global generator, and
    stores the seed (as a string) in the ``PYTHONHASHSEED`` environment
    variable.

    Args:
        seed: integer seed applied to every source above.
    """
    # NOTE(review): presumably this only influences interpreters started
    # after this point, not hashing in the current kernel — confirm if needed.
    os.environ["PYTHONHASHSEED"] = str(seed)

    # The two seeders are independent of each other, so order is irrelevant.
    for apply_seed in (random.seed, np.random.seed):
        apply_seed(seed)


# ------------------------------------------------------------
# Global configuration dataclass
# ------------------------------------------------------------

@dataclass
class AutoMLConfig:
    """
    Central settings object for the AutoML pipeline.

    Every field has a default, so ``AutoMLConfig()`` is a complete
    configuration. ``fidelity_levels`` is filled in by ``__post_init__`` when
    the caller leaves it as ``None``.
    """
    # --- Reproducibility -------------------------------------------------
    random_seed: int = 42

    # --- Train/test split and cross-validation ---------------------------
    test_size: float = 0.2
    n_splits_cv: int = 5

    # --- Sample-count boundaries between the small/medium/large regimes --
    small_n_samples: int = 10_000
    medium_n_samples: int = 100_000

    # --- Trial budget per regime -----------------------------------------
    max_trials_small: int = 100
    max_trials_medium: int = 60
    max_trials_large: int = 30

    # --- HPO behaviour ----------------------------------------------------
    initial_random_trials: int = 20

    # --- Fidelity schedule (None -> built in __post_init__) ---------------
    fidelity_levels: List[Dict[str, Any]] = None

    # --- Label written to the output CSV ----------------------------------
    approach_name: str = "AutoML-BOHB"

    def __post_init__(self):
        """Install the default three-step fidelity schedule if none was given."""
        if self.fidelity_levels is not None:
            return

        # (name, row_fraction, n_splits); only the top level uses the full CV.
        schedule = (
            ("low", 0.25, 3),
            ("medium", 0.50, 3),
            ("high", 1.00, self.n_splits_cv),
        )
        self.fidelity_levels = [
            {"name": label, "row_fraction": fraction, "n_splits": folds}
            for label, fraction, folds in schedule
        ]


def get_default_config() -> AutoMLConfig:
    """
    Build an ``AutoMLConfig`` populated entirely with its default values.

    Returns:
        A newly constructed ``AutoMLConfig``; every call creates a new object.
    """
    default_config = AutoMLConfig()
    return default_config


# ------------------------------------------------------------
# Pretty printing for progress banners
# ------------------------------------------------------------

def print_banner(message: str) -> None:
    """
    Print ``message`` framed by two rules of ``=`` characters.

    The rule is eight characters wider than the message and never shorter
    than ten characters. A blank line is emitted before and after the banner.

    Args:
        message: text shown on the middle line, prefixed with ``>>> ``.
    """
    width = len(message) + 8
    if width < 10:
        width = 10
    rule = "=" * width

    # Leading/trailing empty items produce the surrounding newlines.
    print("\n".join(["", rule, f">>> {message}", rule, ""]))


In [2]:
# ============================================================
# Cell 2 — Dataset Retrieval and Splitting
# ============================================================

from dataclasses import dataclass
from typing import List, Dict, Any, Tuple


@dataclass
class DatasetMeta:
    """
    Metadata for one OpenML dataset, used throughout the pipeline.

    Instances are created by ``load_openml_dataset``. All fields are required
    and have no defaults, so positional construction follows the declaration
    order below.
    """
    # Identification
    dataset_id: int      # OpenML dataset id
    dataset_name: str    # dataset name as reported by OpenML
    target_name: str     # name of the target attribute

    # Size statistics
    n_samples: int       # number of rows in the feature frame
    n_features: int      # number of columns in the feature frame
    n_classes: int       # number of distinct target labels

    # Feature names grouped by type
    categorical_features: List[str]
    numerical_features: List[str]

    # Lengths of the two lists above
    n_categorical: int
    n_numerical: int

    # Percentage (0-100) of feature cells that are missing
    missing_value_pct: float
    # Smallest class count divided by largest class count
    class_imbalance_ratio: float


def load_openml_dataset(dataset_id: int) -> Tuple[pd.DataFrame, pd.Series, DatasetMeta]:
    """
    Load a dataset from OpenML and compute meta-information:
    - feature types (categorical vs numerical)
    - basic size stats
    - missing value percentage
    - class imbalance ratio

    Class statistics are computed over labels that actually occur in the
    data: missing labels and declared-but-unobserved categories are ignored,
    so ``n_classes`` and ``class_imbalance_ratio`` always describe the same
    set of classes.

    Args:
        dataset_id: OpenML dataset identifier.

    Returns:
        X: pandas DataFrame of features
        y: pandas Series of target labels
        meta: DatasetMeta object

    Raises:
        ValueError: if the dataset does not declare a default target attribute.
    """
    print(f"Loading OpenML dataset {dataset_id} ...")
    dataset = openml.datasets.get_dataset(dataset_id)

    # Without a target, get_data would hand back y=None and the statistics
    # below would fail with an unhelpful error; fail early and clearly instead.
    target_name = dataset.default_target_attribute
    if not target_name:
        raise ValueError(
            f"OpenML dataset {dataset_id} has no default target attribute; "
            "cannot load it as a supervised classification dataset."
        )

    # Get data as pandas DataFrame
    X, y, categorical_indicator, attribute_names = dataset.get_data(
        target=target_name,
        dataset_format="dataframe",
    )

    # Basic shape
    n_samples, n_features = X.shape

    # Feature type lists (categorical_indicator is aligned with attribute_names)
    categorical_features = [
        name for name, is_cat in zip(attribute_names, categorical_indicator) if is_cat
    ]
    numerical_features = [
        name for name, is_cat in zip(attribute_names, categorical_indicator) if not is_cat
    ]
    n_categorical = len(categorical_features)
    n_numerical = len(numerical_features)

    # Per-class counts. value_counts() drops missing labels, but for a
    # categorical Series it also lists declared categories that never occur
    # (count 0); drop those so they neither inflate n_classes nor force the
    # imbalance ratio to 0.
    class_counts = pd.Series(y).value_counts()
    class_counts = class_counts[class_counts > 0]
    n_classes = int(len(class_counts))

    # Missing value percentage (features only)
    total_entries = n_samples * n_features
    missing_count = X.isna().sum().sum()
    missing_value_pct = (
        float(missing_count / total_entries * 100.0) if total_entries > 0 else 0.0
    )

    # Class imbalance ratio: min_count / max_count
    if n_classes > 0:
        class_imbalance_ratio = float(class_counts.min() / class_counts.max())
    else:
        class_imbalance_ratio = 1.0  # degenerate case: no observed labels

    meta = DatasetMeta(
        dataset_id=int(dataset_id),
        dataset_name=dataset.name,
        target_name=target_name,
        n_samples=n_samples,
        n_features=n_features,
        n_classes=n_classes,
        categorical_features=categorical_features,
        numerical_features=numerical_features,
        n_categorical=n_categorical,
        n_numerical=n_numerical,
        missing_value_pct=missing_value_pct,
        class_imbalance_ratio=class_imbalance_ratio,
    )

    print(
        f"Loaded dataset {meta.dataset_id} ({meta.dataset_name}) "
        f"with {meta.n_samples} samples, {meta.n_features} features "
        f"({meta.n_numerical} numerical, {meta.n_categorical} categorical)."
    )
    print(
        f"Missing values: {meta.missing_value_pct:.2f}% | "
        f"Class imbalance ratio: {meta.class_imbalance_ratio:.3f}"
    )

    return X, y, meta


def train_test_split_dataset(
    X: pd.DataFrame,
    y: pd.Series,
    meta: DatasetMeta,
    config: AutoMLConfig,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
    """
    Produce a stratified, seeded train/test partition of ``(X, y)``.

    Args:
        X: feature DataFrame
        y: target Series; also used as the stratification variable
        meta: DatasetMeta, read only for the log messages
        config: AutoMLConfig supplying ``test_size`` and ``random_seed``

    Returns:
        X_train, X_test, y_train, y_test
    """
    print(
        f"Splitting dataset {meta.dataset_id} ({meta.dataset_name}) "
        f"into train/test with test_size={config.test_size:.2f} ..."
    )

    # Gather the split options once so the call itself stays short.
    split_options = {
        "test_size": config.test_size,
        "random_state": config.random_seed,
        "stratify": y,
    }
    X_train, X_test, y_train, y_test = train_test_split(X, y, **split_options)

    print(
        f"Train size: {len(X_train)} samples | "
        f"Test size: {len(X_test)} samples"
    )

    return X_train, X_test, y_train, y_test


In [3]:
# ============================================================
# Cell 3 — Preprocessing Pipeline Construction
# ============================================================

from typing import Optional
from sklearn.preprocessing import FunctionTransformer


def build_preprocessor(meta: DatasetMeta) -> ColumnTransformer:
    """
    Build the default preprocessing pipeline based on dataset metadata.

    Logic:
      - If the metadata lists any typed features:
          * numerical: SimpleImputer(mean) + StandardScaler
          * categorical: SimpleImputer(most_frequent) + OneHotEncoder(ignore unknown, dense)
        -> combined via ColumnTransformer, one entry per non-empty group
      - If the metadata lists no typed features at all, fall back to:
          * SimpleImputer(mean) + StandardScaler on all columns,
            selected by position ``0 .. meta.n_features - 1``

    Args:
        meta: dataset metadata; reads ``n_numerical``, ``n_categorical``,
            ``numerical_features``, ``categorical_features``, ``n_features``,
            ``dataset_id`` and ``dataset_name``.

    Returns:
        An unfitted ColumnTransformer.
    """
    n_num = meta.n_numerical
    n_cat = meta.n_categorical

    def _numeric_pipeline() -> Pipeline:
        # Mean-impute then standardise; shared by the typed and fallback paths.
        return Pipeline(steps=[
            ("imputer", SimpleImputer(strategy="mean")),
            ("scaler", StandardScaler()),
        ])

    # Fallback: no feature type information
    if (n_num + n_cat) <= 0:
        print(
            f"Feature types unknown for dataset {meta.dataset_id} ({meta.dataset_name}); "
            "using simple numeric preprocessing on all columns."
        )
        return ColumnTransformer(
            transformers=[
                (
                    "all",
                    _numeric_pipeline(),
                    list(range(meta.n_features)),  # assumes positional indexing
                )
            ]
        )

    print(
        f"Building preprocessing pipeline for dataset {meta.dataset_id} "
        f"({meta.dataset_name}) with {n_num} numerical and {n_cat} categorical features."
    )

    # At least one of the two groups is non-empty here, so `transformers`
    # can never end up empty and no further fallback is required.
    transformers = []

    if n_num > 0:
        transformers.append(
            ("num", _numeric_pipeline(), meta.numerical_features)
        )

    if n_cat > 0:
        categorical_transformer = Pipeline(steps=[
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("onehot", OneHotEncoder(
                handle_unknown="ignore",
                sparse_output=False,
            )),
        ])
        transformers.append(
            ("cat", categorical_transformer, meta.categorical_features)
        )

    return ColumnTransformer(transformers=transformers)


# ------------------------------------------------------------
# Optional Feature Engineering Block
# ------------------------------------------------------------

def _light_feature_engineering_transform(X):
    """
    A lightweight, generic feature engineering transform.

    Current behavior:
      - Takes the preprocessed feature matrix X (numpy array).
      - Creates log1p-transformed features on non-negative values and
        horizontally stacks them with the original features.

    This roughly doubles the feature space but stays generic and safe:
      - log1p(0) is defined,
      - we clip X at 0 for the log transform to avoid negative issues.
    """
    X = np.asarray(X)
    # Clip at 0 for log1p to avoid log on negative values
    X_clipped = np.clip(X, a_min=0.0, a_max=None)
    X_log = np.log1p(X_clipped)
    return np.hstack([X, X_log])


def build_feature_engineering_block(mode: str) -> FunctionTransformer:
    """
    Build the feature engineering block based on the selected mode.

    Modes (case-insensitive; ``None`` or an empty string is treated as "none"):
      - "none": identity transform (no additional features).
      - "light": apply a simple log1p-based expansion on the preprocessed features.

    Args:
        mode: name of the feature engineering mode.

    Returns:
        A scikit-learn-compatible transformer to be included in the pipeline.

    Raises:
        ValueError: if ``mode`` is not one of the supported modes.
    """
    mode = (mode or "none").lower()

    if mode == "none":
        print("Feature engineering mode: none (identity transform).")
        # func=None is FunctionTransformer's built-in identity. Unlike a
        # lambda it can be pickled, so pipelines containing this block stay
        # serialisable.
        return FunctionTransformer(
            func=None,
            validate=False,
        )

    if mode == "light":
        print("Feature engineering mode: light (adding log1p-transformed features).")
        return FunctionTransformer(
            func=_light_feature_engineering_transform,
            validate=False,
        )

    raise ValueError(f"Unknown feature engineering mode: {mode!r}")


In [4]:
# ============================================================
# Cell 4 — Model Search Space and Pipeline Assembly
# ============================================================

from typing import Dict, Any

# Model imports
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC, SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import (
    RandomForestClassifier,
    ExtraTreesClassifier,
    HistGradientBoostingClassifier,
)
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB


# ------------------------------------------------------------
# Model search space definition
# ------------------------------------------------------------

def get_model_search_space() -> Dict[str, Any]:
    """
    Return the joint search space as a plain nested dict.

    Top-level keys:
      - "feature_engineering_mode" and "model_type": categorical specs
      - one key per model type, mapping hyperparameter name -> spec

    Spec conventions:
      - "type": one of {"categorical", "int", "float", "logfloat"}
      - "choices": for categorical
      - "low", "high": for numerical ranges

    A new dict (with new nested dicts and lists) is built on every call.
    """

    def choice(*options: Any) -> Dict[str, Any]:
        # Categorical spec over the given options, in the given order.
        return {"type": "categorical", "choices": list(options)}

    def span(kind: str, low: Any, high: Any) -> Dict[str, Any]:
        # Numerical spec of the given kind with its bounds.
        return {"type": kind, "low": low, "high": high}

    return {
        # Top-level hyperparameters
        "feature_engineering_mode": choice("none", "light"),
        "model_type": choice(
            "logistic_regression",
            "linear_svm",
            "rbf_svm",
            "decision_tree",
            "random_forest",
            "extra_trees",
            "hist_gradient_boosting",
            "knn",
            "naive_bayes",
        ),
        # Model-specific hyperparameters
        "logistic_regression": {
            "C": span("logfloat", 1e-4, 1e3),
            # only l2 is offered; l1 would require solver juggling
            "penalty": choice("l2"),
            "class_weight": choice(None, "balanced"),
        },
        "linear_svm": {
            "C": span("logfloat", 1e-4, 1e3),
            "loss": choice("hinge", "squared_hinge"),
            "class_weight": choice(None, "balanced"),
        },
        "rbf_svm": {
            "C": span("logfloat", 1e-3, 1e3),
            "gamma": span("logfloat", 1e-4, 10.0),
            "class_weight": choice(None, "balanced"),
        },
        "decision_tree": {
            "criterion": choice("gini", "entropy"),
            "max_depth": span("int", 2, 30),
            "min_samples_split": span("int", 2, 20),
            "min_samples_leaf": span("int", 1, 20),
            "max_features": span("float", 0.1, 1.0),
        },
        "random_forest": {
            "n_estimators": span("int", 100, 600),
            "criterion": choice("gini", "entropy"),
            "max_depth": span("int", 3, 30),
            "min_samples_split": span("int", 2, 20),
            "min_samples_leaf": span("int", 1, 20),
            "max_features": span("float", 0.2, 1.0),
            "bootstrap": choice(True, False),
            "class_weight": choice(None, "balanced"),
        },
        "extra_trees": {
            "n_estimators": span("int", 100, 600),
            "criterion": choice("gini", "entropy"),
            "max_depth": span("int", 3, 30),
            "min_samples_split": span("int", 2, 20),
            "min_samples_leaf": span("int", 1, 20),
            "max_features": span("float", 0.2, 1.0),
            "bootstrap": choice(True, False),
        },
        "hist_gradient_boosting": {
            "learning_rate": span("logfloat", 0.01, 0.3),
            "max_depth": span("int", 2, 16),
            "max_leaf_nodes": span("int", 16, 256),
            "min_samples_leaf": span("int", 20, 200),
            "l2_regularization": span("float", 0.0, 1.0),
            "max_bins": span("int", 64, 255),
        },
        "knn": {
            "n_neighbors": span("int", 1, 50),
            "weights": choice("uniform", "distance"),
            "p": choice(1, 2),  # Manhattan vs Euclidean
            "leaf_size": span("int", 20, 60),
        },
        "naive_bayes": {
            # GaussianNB's var_smoothing (log-scale)
            "var_smoothing": span("logfloat", 1e-12, 1e-6),
        },
    }


# ------------------------------------------------------------
# Estimator factory
# ------------------------------------------------------------

def _build_estimator(model_type: str, params: Dict[str, Any]):
    """
    Instantiate the unfitted scikit-learn classifier named by ``model_type``.

    Each hyperparameter is looked up in ``params``; a missing key falls back
    to the default written next to the lookup. Keys in ``params`` that the
    selected model does not read are ignored.

    Args:
        model_type: one of the identifiers handled below.
        params: mapping of hyperparameter name -> value.

    Returns:
        A new estimator instance.

    Raises:
        ValueError: if ``model_type`` is not one of the handled identifiers.
    """
    if model_type == "logistic_regression":
        settings = {
            "C": params.get("C", 1.0),
            "penalty": params.get("penalty", "l2"),
            "class_weight": params.get("class_weight", None),
            "solver": "lbfgs",
            "max_iter": 1000,
        }
        estimator = LogisticRegression(**settings)

    elif model_type == "linear_svm":
        settings = {
            "C": params.get("C", 1.0),
            "loss": params.get("loss", "squared_hinge"),
            "class_weight": params.get("class_weight", None),
            "max_iter": 5000,
        }
        estimator = LinearSVC(**settings)

    elif model_type == "rbf_svm":
        settings = {
            "C": params.get("C", 1.0),
            "gamma": params.get("gamma", "scale"),
            "kernel": "rbf",
            "class_weight": params.get("class_weight", None),
            "probability": True,  # useful for probability-based metrics later
        }
        estimator = SVC(**settings)

    elif model_type == "decision_tree":
        settings = {
            "criterion": params.get("criterion", "gini"),
            "max_depth": params.get("max_depth", None),
            "min_samples_split": params.get("min_samples_split", 2),
            "min_samples_leaf": params.get("min_samples_leaf", 1),
            "max_features": params.get("max_features", None),
        }
        estimator = DecisionTreeClassifier(**settings)

    elif model_type in ("random_forest", "extra_trees"):
        is_forest = model_type == "random_forest"
        # Both ensembles share these settings. The bootstrap default is True
        # for the random forest and False for extra trees, i.e. `is_forest`.
        settings = {
            "n_estimators": params.get("n_estimators", 100),
            "criterion": params.get("criterion", "gini"),
            "max_depth": params.get("max_depth", None),
            "min_samples_split": params.get("min_samples_split", 2),
            "min_samples_leaf": params.get("min_samples_leaf", 1),
            "max_features": params.get("max_features", "sqrt"),
            "bootstrap": params.get("bootstrap", is_forest),
            "n_jobs": -1,
        }
        if is_forest:
            # Only the random forest reads class_weight.
            settings["class_weight"] = params.get("class_weight", None)
            estimator = RandomForestClassifier(**settings)
        else:
            estimator = ExtraTreesClassifier(**settings)

    elif model_type == "hist_gradient_boosting":
        settings = {
            "learning_rate": params.get("learning_rate", 0.1),
            "max_depth": params.get("max_depth", None),
            "max_leaf_nodes": params.get("max_leaf_nodes", 31),
            "min_samples_leaf": params.get("min_samples_leaf", 20),
            "l2_regularization": params.get("l2_regularization", 0.0),
            "max_bins": params.get("max_bins", 255),
        }
        estimator = HistGradientBoostingClassifier(**settings)

    elif model_type == "knn":
        settings = {
            "n_neighbors": params.get("n_neighbors", 5),
            "weights": params.get("weights", "uniform"),
            "p": params.get("p", 2),
            "leaf_size": params.get("leaf_size", 30),
            "n_jobs": -1,
        }
        estimator = KNeighborsClassifier(**settings)

    elif model_type == "naive_bayes":
        estimator = GaussianNB(var_smoothing=params.get("var_smoothing", 1e-9))

    else:
        raise ValueError(f"Unknown model_type {model_type!r}")

    return estimator


# ------------------------------------------------------------
# Full pipeline builder
# ------------------------------------------------------------

def build_model_pipeline(
    model_config: Dict[str, Any],
    preprocessor,
    fe_block,
) -> Pipeline:
    """
    Assemble the three-stage scikit-learn Pipeline:

        preprocessor -> feature_engineering -> model

    Args:
        model_config: dict containing at least "model_type". Every key other
            than "model_type" and "feature_engineering_mode" is forwarded to
            the estimator factory as a hyperparameter.
        preprocessor: the preprocessing transformer (used as given)
        fe_block: the feature-engineering transformer (used as given)

    Returns:
        An unfitted Pipeline with steps named "preprocessor",
        "feature_engineering" and "model".

    Raises:
        ValueError: if "model_type" is missing from ``model_config`` or None.
    """
    model_type = model_config.get("model_type")
    if model_type is None:
        raise ValueError("model_config must contain a 'model_type' key.")

    # Copy the config and strip the routing keys; what remains are the
    # model-specific hyperparameters, in their original order.
    hyperparams = dict(model_config)
    for routing_key in ("model_type", "feature_engineering_mode"):
        hyperparams.pop(routing_key, None)

    stages = [
        ("preprocessor", preprocessor),
        ("feature_engineering", fe_block),
        ("model", _build_estimator(model_type, hyperparams)),
    ]
    return Pipeline(steps=stages)


In [5]:
# ============================================================
# Cell 5 — Hyperparameter Optimization (HPO)
# ============================================================

from typing import Dict, Any, Tuple


# ------------------------------------------------------------
# Budget regime selection
# ------------------------------------------------------------

def determine_budget_regime(meta: DatasetMeta, config: AutoMLConfig) -> Dict[str, Any]:
    """
    Pick the HPO trial budget for a dataset from its number of samples.

    Datasets with at most ``config.small_n_samples`` rows are "small", those
    with at most ``config.medium_n_samples`` rows are "medium", and all
    others are "large". The fidelity is always the last entry of
    ``config.fidelity_levels``.

    Returns:
        {
            "regime": "small" | "medium" | "large",
            "max_trials": int,
            "fidelity": dict  # one of config.fidelity_levels
        }
    """
    n = meta.n_samples

    if n <= config.small_n_samples:
        regime, max_trials = "small", config.max_trials_small
    elif n <= config.medium_n_samples:
        regime, max_trials = "medium", config.max_trials_medium
    else:
        regime, max_trials = "large", config.max_trials_large

    # The last fidelity level is the one used for every dataset.
    fidelity = config.fidelity_levels[-1]

    selection = {
        "regime": regime,
        "max_trials": max_trials,
        "fidelity": fidelity,
    }

    print(
        f"Dataset {meta.dataset_id} ({meta.dataset_name}) has {n} samples → "
        f"using '{regime}' budget with max_trials={max_trials} "
        f"and fidelity={fidelity['name']} "
        f"(row_fraction={fidelity['row_fraction']}, n_splits={fidelity['n_splits']})."
    )

    return selection


# ------------------------------------------------------------
# Random sampling from the search space
# ------------------------------------------------------------

def _sample_numeric(param_spec: Dict[str, Any]) -> Any:
    """Sample a numeric hyperparameter from its spec."""
    ptype = param_spec["type"]
    low = param_spec["low"]
    high = param_spec["high"]

    if ptype == "int":
        return int(np.random.randint(low, high + 1))
    elif ptype == "float":
        return float(np.random.uniform(low, high))
    elif ptype == "logfloat":
        log_low = np.log(low)
        log_high = np.log(high)
        value = np.exp(np.random.uniform(log_low, log_high))
        return float(value)
    else:
        raise ValueError(f"Unsupported numeric type: {ptype!r}")


def _sample_categorical(param_spec: Dict[str, Any]) -> Any:
    """Sample a categorical hyperparameter from its spec."""
    choices = param_spec["choices"]
    idx = np.random.randint(0, len(choices))
    return choices[idx]


def sample_random_config(search_space: Dict[str, Any]) -> Dict[str, Any]:
    """
    Draw one complete random configuration from ``search_space``.

    Sampling order: feature engineering mode, then model type, then each
    hyperparameter of the chosen model in the order the search space lists
    them.

    Returns:
        A dict with "feature_engineering_mode", "model_type" and one entry
        per hyperparameter of the selected model.

    Raises:
        ValueError: if a hyperparameter spec has an unrecognised "type".
    """
    # Look up both top-level specs before drawing anything.
    fe_spec = search_space["feature_engineering_mode"]
    model_type_spec = search_space["model_type"]

    config: Dict[str, Any] = {
        "feature_engineering_mode": _sample_categorical(fe_spec),
    }
    model_type = _sample_categorical(model_type_spec)
    config["model_type"] = model_type

    # Model types without an entry in the search space have no extra params.
    for param_name, param_spec in search_space.get(model_type, {}).items():
        ptype = param_spec["type"]
        if ptype == "categorical":
            sampled = _sample_categorical(param_spec)
        elif ptype in ("int", "float", "logfloat"):
            sampled = _sample_numeric(param_spec)
        else:
            raise ValueError(f"Unknown parameter type {ptype!r} for {param_name!r}")
        config[param_name] = sampled

    return config


# ------------------------------------------------------------
# CV evaluation helper
# ------------------------------------------------------------

def _evaluate_config_cv(
    pipeline: Pipeline,
    X_train: pd.DataFrame,
    y_train: pd.Series,
    n_splits: int,
    row_fraction: float,
    base_random_state: int,
) -> Tuple[float, float]:
    """
    Score a pipeline with stratified, shuffled k-fold cross-validation.

    When ``row_fraction`` is below 1.0, a random subset of rows (drawn
    without replacement, at least one row) is used instead of the full
    training data. The same ``pipeline`` object is re-fitted on every fold.

    Args:
        pipeline: scikit-learn Pipeline to evaluate
        X_train, y_train: training data
        n_splits: number of CV folds
        row_fraction: fraction of rows to subsample (1.0 → use all)
        base_random_state: seed for both the subsampling and the CV splitter

    Returns:
        (mean_accuracy, std_accuracy) over the folds; (0.0, 0.0) if no fold
        was scored.
    """
    features, labels = X_train, y_train

    if row_fraction < 1.0:
        total_rows = len(X_train)
        keep_rows = max(1, int(total_rows * row_fraction))
        picked = np.random.RandomState(base_random_state).choice(
            total_rows, size=keep_rows, replace=False
        )
        # Reset the index so the subset is numbered 0..keep_rows-1.
        features = X_train.iloc[picked].reset_index(drop=True)
        labels = y_train.iloc[picked].reset_index(drop=True)

    splitter = StratifiedKFold(
        n_splits=n_splits,
        shuffle=True,
        random_state=base_random_state,
    )

    fold_scores = []
    for fit_rows, holdout_rows in splitter.split(features, labels):
        pipeline.fit(features.iloc[fit_rows], labels.iloc[fit_rows])
        predictions = pipeline.predict(features.iloc[holdout_rows])
        fold_scores.append(accuracy_score(labels.iloc[holdout_rows], predictions))

    if not fold_scores:
        return 0.0, 0.0
    return float(np.mean(fold_scores)), float(np.std(fold_scores))


# ------------------------------------------------------------
# Main HPO loop
# ------------------------------------------------------------

def run_hpo(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    meta: DatasetMeta,
    preprocessor: ColumnTransformer,
    search_space: Dict[str, Any],
    budget_settings: Dict[str, Any],
    config: AutoMLConfig,
) -> Tuple[Dict[str, Any], float, float, int, float]:
    """
    Search for the best configuration on one dataset.

    The search is plain random sampling from ``search_space``: each trial
    draws a configuration, builds its pipeline and scores it by
    cross-validated accuracy at the single fidelity given in
    ``budget_settings``. The highest mean accuracy wins; ties keep the
    earlier trial.

    Args:
        X_train, y_train: training data
        meta: DatasetMeta (used for log messages)
        preprocessor: pre-built preprocessing transformer for this dataset
        search_space: model and FE hyperparameter definitions
        budget_settings: dict with "max_trials" and "fidelity"
            (the latter providing "row_fraction" and "n_splits")
        config: global AutoML config; ``random_seed + trial index`` seeds
            each trial's evaluation

    Returns:
        best_config: dict of best hyperparameters (including model_type, fe_mode);
            empty if no trial was run
        best_cv_mean: float, mean CV accuracy for best_config
        best_cv_std: float, std of CV accuracy for best_config
        n_trials: int, the trial budget ``max_trials``
        optimization_time: float, total time spent in HPO (seconds)
    """
    max_trials = budget_settings["max_trials"]
    fidelity = budget_settings["fidelity"]
    row_fraction, n_splits = fidelity["row_fraction"], fidelity["n_splits"]

    print_banner(
        f"Starting HPO for dataset {meta.dataset_id} ({meta.dataset_name}) "
        f"with max_trials={max_trials}"
    )

    start_time = time.time()

    best_config: Dict[str, Any] = {}
    best_cv_mean: float = -np.inf
    best_cv_std: float = 0.0

    # Progress is reported on the first trial and then about five times.
    report_every = max(1, max_trials // 5)

    for trial_idx in range(1, max_trials + 1):
        config_candidate = sample_random_config(search_space)

        fe_block = build_feature_engineering_block(
            config_candidate.get("feature_engineering_mode", "none")
        )

        # Each trial gets its own seed for subsampling and fold assignment.
        mean_acc, std_acc = _evaluate_config_cv(
            build_model_pipeline(config_candidate, preprocessor, fe_block),
            X_train,
            y_train,
            n_splits=n_splits,
            row_fraction=row_fraction,
            base_random_state=config.random_seed + trial_idx,
        )

        if mean_acc > best_cv_mean:
            best_config, best_cv_mean, best_cv_std = config_candidate, mean_acc, std_acc

        if trial_idx == 1 or trial_idx % report_every == 0:
            print(
                f"[HPO] Trial {trial_idx:3d}/{max_trials} | "
                f"cv_accuracy={mean_acc:.4f} (std={std_acc:.4f}) | "
                f"best={best_cv_mean:.4f} (model={best_config.get('model_type')}, "
                f"fe={best_config.get('feature_engineering_mode')})"
            )

    optimization_time = float(time.time() - start_time)

    print_banner(
        f"HPO finished for dataset {meta.dataset_id} ({meta.dataset_name}) | "
        f"best_cv_accuracy={best_cv_mean:.4f} | "
        f"n_trials={max_trials} | "
        f"optimization_time={optimization_time:.1f}s"
    )

    return best_config, best_cv_mean, best_cv_std, max_trials, optimization_time


In [6]:
# ============================================================
# Cell 6 — Final Training and Test Evaluation
# ============================================================

from typing import Dict, Any, Tuple


def train_final_model(
    best_config: Dict[str, Any],
    X_train: pd.DataFrame,
    y_train: pd.Series,
    preprocessor: ColumnTransformer,
    config: AutoMLConfig,
) -> Tuple[Pipeline, float]:
    """
    Fit the winning HPO configuration on the complete training split.

    The feature-engineering block and the model pipeline are rebuilt from
    `best_config`, then fitted once on all of `X_train` / `y_train`. Only the
    `.fit(...)` call itself is timed; pipeline construction is excluded.

    Args:
        best_config: configuration chosen by HPO. "model_type" and
            "feature_engineering_mode" are read here (with fallbacks
            "unknown" / "none"); the whole dict is forwarded to
            build_model_pipeline.
        X_train, y_train: full training data (no subsampling).
        preprocessor: preprocessing transformer handed to build_model_pipeline.
        config: global configuration. Accepted for signature parity with the
            other pipeline stages; it is not read in this function.

    Returns:
        final_pipeline: the fitted pipeline returned by build_model_pipeline.
        training_time: wall-clock seconds spent inside `.fit(...)`.

    Raises:
        ValueError: if `best_config` is empty or None.
    """
    if not best_config:
        raise ValueError("best_config is empty; HPO must return a non-empty configuration.")

    chosen_model = best_config.get("model_type", "unknown")
    chosen_fe = best_config.get("feature_engineering_mode", "none")

    banner_text = (
        f"Training final model for dataset with model_type={chosen_model}, "
        f"feature_engineering_mode={chosen_fe}"
    )
    print_banner(banner_text)

    # The banner is printed before the FE block is built so that any log line
    # emitted while building appears underneath it.
    final_pipeline = build_model_pipeline(
        best_config,
        preprocessor,
        build_feature_engineering_block(chosen_fe),
    )

    # Time only the fit on the full training data.
    fit_started = time.time()
    final_pipeline.fit(X_train, y_train)
    training_time = float(time.time() - fit_started)

    summary_line = (
        f"Final model trained in {training_time:.2f} seconds "
        f"(model_type={chosen_model}, fe_mode={chosen_fe})."
    )
    print(summary_line)

    return final_pipeline, training_time


def evaluate_on_test(
    final_pipeline: Pipeline,
    X_test: pd.DataFrame,
    y_test: pd.Series,
) -> Dict[str, float]:
    """
    Score the fitted pipeline on the held-out test split.

    Predictions are produced once with `final_pipeline.predict(X_test)` and
    compared against `y_test` with four metrics: plain accuracy, balanced
    accuracy, macro-averaged F1 and weighted-average F1.

    Args:
        final_pipeline: fitted pipeline (as returned by train_final_model).
        X_test, y_test: held-out test features and labels.

    Returns:
        dict with the keys "test_accuracy", "test_balanced_accuracy",
        "test_f1_macro" and "test_f1_weighted", each a plain Python float.
    """
    print(f"Evaluating final model on test set with {len(X_test)} samples...")

    predictions = final_pipeline.predict(X_test)

    # Dict literals evaluate top to bottom, so the metrics are computed in the
    # order they are listed.
    metrics: Dict[str, float] = {
        "test_accuracy": float(accuracy_score(y_test, predictions)),
        "test_balanced_accuracy": float(balanced_accuracy_score(y_test, predictions)),
        "test_f1_macro": float(f1_score(y_test, predictions, average="macro")),
        "test_f1_weighted": float(f1_score(y_test, predictions, average="weighted")),
    }

    report_parts = [
        f"accuracy={metrics['test_accuracy']:.4f}",
        f"balanced_accuracy={metrics['test_balanced_accuracy']:.4f}",
        f"f1_macro={metrics['test_f1_macro']:.4f}",
        f"f1_weighted={metrics['test_f1_weighted']:.4f}",
    ]
    print("Test metrics: " + ", ".join(report_parts))

    return metrics


def _to_json_native(value: Any) -> Any:
    """
    Convert NumPy scalars/arrays (also inside dicts, lists and tuples) to
    native Python values; every other value is returned unchanged.
    """
    if isinstance(value, np.generic):
        # e.g. np.int64 -> int, np.float32 -> float, np.bool_ -> bool
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, dict):
        return {key: _to_json_native(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_to_json_native(item) for item in value]
    if isinstance(value, tuple):
        return tuple(_to_json_native(item) for item in value)
    return value


def summarize_hyperparameters(best_config: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """
    Build:
      - a compact, human-readable hyperparameter string
      - a dict of all hyperparameters that is safe to pass to json.dumps

    No key is dropped from either output. In the string, model_type and
    feature_engineering_mode come first, followed by the remaining keys in
    alphabetical order.

    Args:
        best_config: dict containing at least:
            - "model_type"
            - "feature_engineering_mode"
            - model-specific hyperparameters
            An empty dict or None yields ("", {}).

    Returns:
        hyperparameters_str: string, e.g.
            "model_type=rf, feature_engineering_mode=light, max_depth=10, n_estimators=300"
        hyperparameters_json: copy of best_config (same keys, same order) in
            which NumPy scalars and arrays have been converted to native
            Python values. best_config itself is not modified.
    """
    if not best_config:
        return "", {}

    model_type = best_config.get("model_type", "unknown")
    fe_mode = best_config.get("feature_engineering_mode", "none")

    # Human-readable string: the two header fields first, then every other
    # key sorted by name. The original (unconverted) values are formatted so
    # the text is exactly what it was before the native conversion was added.
    non_param_keys = {"model_type", "feature_engineering_mode"}
    param_items = sorted(
        ((k, v) for k, v in best_config.items() if k not in non_param_keys),
        key=lambda kv: kv[0],
    )
    params_str = ", ".join(f"{k}={v}" for k, v in param_items)

    hyperparameters_str = (
        f"model_type={model_type}, feature_engineering_mode={fe_mode}"
        + (", " + params_str if params_str else "")
    )

    # Dict version: json.dumps rejects NumPy scalars such as np.int64, so
    # convert them here instead of handing back a plain shallow copy.
    hyperparameters_json = {k: _to_json_native(v) for k, v in best_config.items()}

    return hyperparameters_str, hyperparameters_json


In [7]:
# ============================================================
# Cell 7 — Result Row Construction and Saving
# ============================================================

from typing import List, Dict, Any
import json


def build_result_row(
    meta: DatasetMeta,
    best_config: Dict[str, Any],
    cv_mean: float,
    cv_std: float,
    test_metrics: Dict[str, float],
    optimization_time: float,
    training_time: float,
    n_trials: int,
    config: AutoMLConfig,
) -> Dict[str, Any]:
    """
    Assemble one benchmark_results.csv row for a finished dataset run.

    The returned dict has these keys, in this order:

        dataset_id, dataset_name, approach, model_type, hyperparameters,
        cv_accuracy_mean, cv_accuracy_std,
        test_accuracy, test_balanced_accuracy, test_f1_macro, test_f1_weighted,
        optimization_time, training_time, n_trials,
        feature_engineering, rationale, hyperparameters_json

    Args:
        meta: dataset metadata; `dataset_id` and `dataset_name` are read.
        best_config: best hyperparameters (includes model_type and
            feature_engineering_mode).
        cv_mean, cv_std: cross-validation accuracy statistics for best_config.
        test_metrics: dict holding the four test_* metrics.
        optimization_time: total HPO time in seconds.
        training_time: final model training time in seconds.
        n_trials: number of evaluated configurations.
        config: global configuration; `approach_name` is read.

    Returns:
        A dict with keys exactly matching the CSV columns listed above.
        Numeric fields are coerced to float (n_trials to int), "rationale" is
        always the empty string, and "hyperparameters_json" is the
        json.dumps text of the hyperparameter dict.
    """
    # Text and dict views of the chosen hyperparameters.
    hp_text, hp_dict = summarize_hyperparameters(best_config)

    chosen_model = best_config.get("model_type", "unknown")
    chosen_fe = best_config.get("feature_engineering_mode", "none")

    # CSV cells hold strings, so the dict is stored as serialized JSON.
    hp_json_text = json.dumps(hp_dict)

    test_metric_keys = (
        "test_accuracy",
        "test_balanced_accuracy",
        "test_f1_macro",
        "test_f1_weighted",
    )

    row: Dict[str, Any] = {
        "dataset_id": meta.dataset_id,
        "dataset_name": meta.dataset_name,
        "approach": config.approach_name,
        "model_type": chosen_model,
        "hyperparameters": hp_text,
        "cv_accuracy_mean": float(cv_mean),
        "cv_accuracy_std": float(cv_std),
        # The four test metrics, copied over as plain floats.
        **{key: float(test_metrics[key]) for key in test_metric_keys},
        "optimization_time": float(optimization_time),
        "training_time": float(training_time),
        "n_trials": int(n_trials),
        "feature_engineering": chosen_fe,
        # No LLM rationale is produced by this approach, so the column stays blank.
        "rationale": "",
        "hyperparameters_json": hp_json_text,
    }

    headline = f"Built result row for dataset {meta.dataset_id} ({meta.dataset_name}): "
    details = (
        f"test_accuracy={row['test_accuracy']:.4f}, "
        f"model_type={chosen_model}, fe={chosen_fe}"
    )
    print(headline + details)

    return row


def save_results(results_rows: List[Dict[str, Any]], output_path: str) -> pd.DataFrame:
    """
    Turn the collected result rows into a DataFrame and write it as CSV.

    Columns are written in the benchmark_results.csv order:

        dataset_id, dataset_name, approach, model_type, hyperparameters,
        cv_accuracy_mean, cv_accuracy_std,
        test_accuracy, test_balanced_accuracy, test_f1_macro, test_f1_weighted,
        optimization_time, training_time, n_trials,
        feature_engineering, rationale, hyperparameters_json

    Args:
        results_rows: list of result-row dicts from build_result_row.
        output_path: path of the CSV file to write (no index column).

    Returns:
        The DataFrame that was saved. When `results_rows` is empty, nothing is
        written and an empty DataFrame is returned.

    Note:
        Every row must supply all of the columns above; selecting a missing
        column raises KeyError.
    """
    # Nothing collected: report it and skip the file write entirely.
    if not results_rows:
        print("No results to save — results_rows is empty.")
        return pd.DataFrame()

    ordered_columns = [
        # identification
        "dataset_id", "dataset_name", "approach",
        # chosen model
        "model_type", "hyperparameters",
        # cross-validation
        "cv_accuracy_mean", "cv_accuracy_std",
        # held-out test metrics
        "test_accuracy", "test_balanced_accuracy", "test_f1_macro", "test_f1_weighted",
        # cost
        "optimization_time", "training_time", "n_trials",
        # extras
        "feature_engineering", "rationale", "hyperparameters_json",
    ]

    # Build the frame, then select the columns in the expected order.
    df = pd.DataFrame(results_rows)[ordered_columns]

    print_banner(f"Saving results to {output_path}")
    df.to_csv(output_path, index=False)
    print(f"Saved {len(df)} rows to {output_path}")

    return df


In [16]:
# ============================================================
# Cell 8 — Main Pipeline Execution
# ============================================================

# Step 1 — OpenML dataset IDs to benchmark (edit this list to run more datasets)
dataset_ids = [1459]

# Step 2 — global configuration and seeding
config = get_default_config()
set_global_seed(config.random_seed)

print_banner("Starting Full AutoML Pipeline")

# One result-row dict is appended per processed dataset.
all_results = []

# Step 3 — run the whole workflow once per dataset
for dataset_id in dataset_ids:
    print_banner(f"Processing dataset {dataset_id}")

    # Fetch the data, then hold out a test split.
    X, y, meta = load_openml_dataset(dataset_id)
    X_train, X_test, y_train, y_test = train_test_split_dataset(X, y, meta, config)

    # Per-dataset building blocks: preprocessing, search space, trial budget.
    # Kept in this order so the log lines appear in the same sequence.
    preprocessor = build_preprocessor(meta)
    search_space = get_model_search_space()
    budget_settings = determine_budget_regime(meta, config)

    # Hyperparameter search (random-search version).
    best_config, cv_mean, cv_std, n_trials, optimization_time = run_hpo(
        X_train=X_train,
        y_train=y_train,
        meta=meta,
        preprocessor=preprocessor,
        search_space=search_space,
        budget_settings=budget_settings,
        config=config,
    )

    # Refit the best configuration on the full training split.
    final_pipeline, training_time = train_final_model(
        best_config=best_config,
        X_train=X_train,
        y_train=y_train,
        preprocessor=preprocessor,
        config=config,
    )

    # Score on the held-out test split.
    test_metrics = evaluate_on_test(
        final_pipeline=final_pipeline,
        X_test=X_test,
        y_test=y_test,
    )

    # Collapse everything into a single CSV row for this dataset.
    result_row = build_result_row(
        meta=meta,
        best_config=best_config,
        cv_mean=cv_mean,
        cv_std=cv_std,
        test_metrics=test_metrics,
        optimization_time=optimization_time,
        training_time=training_time,
        n_trials=n_trials,
        config=config,
    )
    all_results.append(result_row)

    print_banner(
        f"Finished dataset {dataset_id} — Test Accuracy: {test_metrics['test_accuracy']:.4f}"
    )

# Step 4 — persist every collected row to a single CSV
output_path = "automl_pipeline_results.csv"
df_results = save_results(all_results, output_path)

print_banner("AutoML Pipeline Completed")

# Last expression of the cell: rendered as the cell's rich output.
df_results



>>> Starting Full AutoML Pipeline


>>> Processing dataset 1459

Loading OpenML dataset 1459 ...
Loaded dataset 1459 (artificial-characters) with 10218 samples, 7 features (7 numerical, 0 categorical).
Missing values: 0.00% | Class imbalance ratio: 0.424
Splitting dataset 1459 (artificial-characters) into train/test with test_size=0.20 ...
Train size: 8174 samples | Test size: 2044 samples
Building preprocessing pipeline for dataset 1459 (artificial-characters) with 7 numerical and 0 categorical features.
Dataset 1459 (artificial-characters) has 10218 samples → using 'medium' budget with max_trials=60 and fidelity=high (row_fraction=1.0, n_splits=5).

>>> Starting HPO for dataset 1459 (artificial-characters) with max_trials=60

Feature engineering mode: none (identity transform).
[HPO] Trial   1/60 | cv_accuracy=0.6502 (std=0.0087) | best=0.6502 (model=decision_tree, fe=none)
Feature engineering mode: none (identity transform).
Feature engineering mode: light (adding log1p-transformed



Feature engineering mode: light (adding log1p-transformed features).
Feature engineering mode: light (adding log1p-transformed features).
Feature engineering mode: none (identity transform).
Feature engineering mode: light (adding log1p-transformed features).
Feature engineering mode: light (adding log1p-transformed features).
Feature engineering mode: none (identity transform).
Feature engineering mode: light (adding log1p-transformed features).
Feature engineering mode: light (adding log1p-transformed features).
Feature engineering mode: none (identity transform).
Feature engineering mode: light (adding log1p-transformed features).
[HPO] Trial  36/60 | cv_accuracy=0.7671 (std=0.0113) | best=0.8722 (model=hist_gradient_boosting, fe=light)
Feature engineering mode: light (adding log1p-transformed features).
Feature engineering mode: light (adding log1p-transformed features).
Feature engineering mode: light (adding log1p-transformed features).
Feature engineering mode: light (adding log

Unnamed: 0,dataset_id,dataset_name,approach,model_type,hyperparameters,cv_accuracy_mean,cv_accuracy_std,test_accuracy,test_balanced_accuracy,test_f1_macro,test_f1_weighted,optimization_time,training_time,n_trials,feature_engineering,rationale,hyperparameters_json
0,1459,artificial-characters,AutoML-BOHB,hist_gradient_boosting,"model_type=hist_gradient_boosting, feature_eng...",0.872155,0.009642,0.909491,0.903619,0.903054,0.909516,680.779557,3.702307,60,light,,"{""feature_engineering_mode"": ""light"", ""model_t..."
