# Model Training Comparison
Train multiple regressors on melting point features and compare metrics.

In [7]:
import json
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple

import numpy as np
import pandas as pd
from sklearn.base import clone
from sklearn.compose import TransformedTargetRegressor
from sklearn.model_selection import (
    KFold,
    RandomizedSearchCV,
    cross_val_predict,
    train_test_split,
)
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import PowerTransformer
from sklearn.utils import check_random_state

# Models
from lightgbm import LGBMRegressor
from sklearn.ensemble import RandomForestRegressor, ExtraTreesRegressor, GradientBoostingRegressor
from sklearn.ensemble import HistGradientBoostingRegressor

from rdkit import Chem
from rdkit.Chem.Scaffolds import MurckoScaffold

In [8]:
def expand_bitstring_column(df: pd.DataFrame, column: str, prefix: str) -> Tuple[pd.DataFrame, List[str]]:
    """Expand a bitstring column into individual 0/1 columns."""
    if column not in df.columns:
        return df, []
    bit_lengths = df[column].dropna().map(len)
    if bit_lengths.empty:
        return df.drop(columns=[column]), []
    bit_len = int(bit_lengths.max())
    filled = df[column].fillna("0" * bit_len)
    bits_df = filled.apply(lambda s: pd.Series([int(ch) for ch in s[:bit_len]])).astype(np.int8)
    bits_df.columns = [f"{prefix}_{i}" for i in bits_df.columns]
    df = df.drop(columns=[column])
    df = pd.concat([df, bits_df], axis=1)
    return df, list(bits_df.columns)


def expand_tuple_column(df: pd.DataFrame, column: str, prefix: str, expected: Optional[int] = None) -> Tuple[pd.DataFrame, List[str]]:
    """Expand a tuple-like column (e.g., PMI1/2/3) into numeric columns."""
    if column not in df.columns:
        return df, []
    def to_seq(val: object) -> Sequence:
        if isinstance(val, str):
            try:
                parsed = json.loads(val)
                if isinstance(parsed, list):
                    return parsed
            except Exception:
                return []
        if isinstance(val, (list, tuple)):
            return val
        return []
    seq_series = df[column].apply(to_seq)
    width = expected or int(seq_series.map(len).max() or 0)
    cols: List[str] = []
    if width:
        expanded = pd.DataFrame(seq_series.apply(lambda seq: list(seq)[:width]).tolist(), index=df.index)
        expanded = expanded.add_prefix(f"{prefix}_").astype(np.float32)
        cols = list(expanded.columns)
        df = pd.concat([df.drop(columns=[column]), expanded], axis=1)
    else:
        df = df.drop(columns=[column])
    return df, cols


def load_and_prepare_features(data_path: Path) -> Tuple[pd.DataFrame, pd.Series, Optional[pd.Series]]:
    train_data = pd.read_csv(data_path)
    TARGET_COLUMN = "Tm"
    if TARGET_COLUMN not in train_data.columns:
        raise KeyError(f"Missing target column '{TARGET_COLUMN}'")

    smiles_col = "canonical_smiles" if "canonical_smiles" in train_data.columns else None
    smiles: Optional[pd.Series] = None

    drop_raw = ["id", "gasteiger_charges"]
    df = train_data.drop(columns=[c for c in drop_raw if c in train_data.columns])
    if smiles_col:
        df = df.rename(columns={smiles_col: "__smiles__"})

    df, morgan_cols = expand_bitstring_column(df, "morgan_fingerprint_bits", "morgan")
    df, maccs_cols = expand_bitstring_column(df, "maccs_keys_bits", "maccs")
    df, pmi_cols = expand_tuple_column(df, "principal_moments_3d", "pmi", expected=3)

    target = df[TARGET_COLUMN].astype(np.float32)
    features = df.drop(columns=[TARGET_COLUMN])
    if "__smiles__" in features.columns:
        smiles = features.pop("__smiles__")

    features = features.apply(pd.to_numeric, errors="coerce")

    all_nan_cols = features.columns[features.isna().all()]
    const_cols = [c for c in features.columns if features[c].nunique(dropna=False) <= 1]
    drop_cols = sorted(set(all_nan_cols.tolist() + const_cols))
    features = features.drop(columns=drop_cols)
    median_values = features.median(numeric_only=True)
    features = features.fillna(median_values).astype(np.float32)

    print(
        f"Prepared features: {features.shape[0]} rows, {features.shape[1]} cols ("
        f"+{len(morgan_cols)} morgan, +{len(maccs_cols)} maccs, +{len(pmi_cols)} pmi; "
        f"dropped {len(all_nan_cols)} all-NaN cols, {len(const_cols)} constant cols)"
    )
    return features, target, smiles

In [9]:
data_path = Path("result/data/melting_point_features.csv")
if not data_path.exists():
    raise FileNotFoundError(f"Data file not found at {data_path.resolve()}")
X, y, smiles = load_and_prepare_features(data_path)

# Keep a quick holdout for sanity checks alongside CV
X_train, X_valid, y_train, y_valid = train_test_split(
    X, y, test_size=0.2, random_state=42, shuffle=True
)
print(f"Holdout: {X_train.shape[0]} train rows, {X_valid.shape[0]} valid rows, {X_train.shape[1]} features")

Prepared features: 2660 rows, 2185 cols (+2048 morgan, +167 maccs, +0 pmi; dropped 2 all-NaN cols, 134 constant cols)
Holdout: 2128 train rows, 532 valid rows, 2185 features


In [10]:
USE_TARGET_TRANSFORM = True
TARGET_TRANSFORMER = PowerTransformer(method="yeo-johnson", standardize=True)

def maybe_wrap(model):
    if not USE_TARGET_TRANSFORM:
        return model
    return TransformedTargetRegressor(regressor=model, transformer=TARGET_TRANSFORMER)


def generate_scaffold(smiles: str) -> str:
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return "parse_failed"
    try:
        return MurckoScaffold.MurckoScaffoldSmiles(mol=mol) or "no_scaffold"
    except Exception:
        return "no_scaffold"


def make_scaffold_folds(smiles: Optional[pd.Series], n_splits: int = 5, seed: int = 42) -> List[Tuple[np.ndarray, np.ndarray]]:
    if smiles is None:
        kf = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
        return [(train_idx, val_idx) for train_idx, val_idx in kf.split(X)]
    rng = check_random_state(seed)
    smiles_seq = smiles.reset_index(drop=True)
    scaffold_to_indices: Dict[str, List[int]] = {}
    for idx, smi in enumerate(smiles_seq):
        scaffold = generate_scaffold(smi)
        scaffold_to_indices.setdefault(scaffold, []).append(idx)
    # Largest scaffolds first, then round-robin assign to folds
    sorted_scaffolds = sorted(scaffold_to_indices.items(), key=lambda kv: len(kv[1]), reverse=True)
    folds: List[List[int]] = [[] for _ in range(n_splits)]
    for i, (_, idxs) in enumerate(sorted_scaffolds):
        target_fold = i % n_splits
        rng.shuffle(idxs)
        folds[target_fold].extend(idxs)
    split_indices: List[Tuple[np.ndarray, np.ndarray]] = []
    all_indices = np.arange(len(smiles_seq))
    for fold in folds:
        val_idx = np.array(sorted(fold))
        train_idx = np.setdiff1d(all_indices, val_idx)
        split_indices.append((train_idx, val_idx))
    return split_indices


def evaluate_holdout(name, model, X_train, y_train, X_valid, y_valid):
    mdl = clone(model)
    mdl.fit(X_train, y_train)
    y_pred = mdl.predict(X_valid)
    mae = mean_absolute_error(y_valid, y_pred)
    rmse = np.sqrt(mean_squared_error(y_valid, y_pred))
    r2 = r2_score(y_valid, y_pred)
    return {"model": name, "mae_holdout": float(mae), "rmse_holdout": float(rmse), "r2_holdout": float(r2)}


def evaluate_cv(name, model, X, y, folds: List[Tuple[np.ndarray, np.ndarray]]):
    preds = np.zeros(len(y), dtype=float)
    for train_idx, val_idx in folds:
        mdl = clone(model)
        mdl.fit(X.iloc[train_idx], y.iloc[train_idx])
        preds[val_idx] = mdl.predict(X.iloc[val_idx])
    mae = mean_absolute_error(y, preds)
    rmse = np.sqrt(mean_squared_error(y, preds))
    r2 = r2_score(y, preds)
    return {"model": name, "mae_cv": float(mae), "rmse_cv": float(rmse), "r2_cv": float(r2)}


def tune_lightgbm(X, y, folds: List[Tuple[np.ndarray, np.ndarray]]):
    base = LGBMRegressor(
        objective="regression_l1",
        random_state=42,
        n_estimators=1600,
        learning_rate=0.03,
        subsample=0.9,
        colsample_bytree=0.9,
        verbose=-1,
    )
    param_dist = {
        "num_leaves": [63, 95, 127, 191],
        "max_depth": [-1, 10, 14, 18, 22],
        "min_child_samples": [10, 20, 40, 80],
        "subsample": [0.7, 0.8, 0.9, 1.0],
        "colsample_bytree": [0.6, 0.75, 0.9, 1.0],
        "reg_alpha": [0.0, 0.05, 0.1, 0.2],
        "reg_lambda": [0.0, 0.1, 0.5, 1.0],
    }
    search = RandomizedSearchCV(
        estimator=base,
        param_distributions=param_dist,
        n_iter=24,
        scoring="neg_mean_absolute_error",
        cv=folds,
        random_state=42,
        n_jobs=-1,
        verbose=0,
    )
    search.fit(X, y)
    print(f"Best LightGBM params: {search.best_params_}")
    return search.best_estimator_, search.best_params_

In [None]:
folds = make_scaffold_folds(smiles, n_splits=5, seed=42)
cv_label = "ScaffoldKFold" if smiles is not None else "KFold"
print(f"Using {cv_label} with {len(folds)} folds")

tuned_lgbm, best_params = tune_lightgbm(X, y, folds)
models = [
    ("lightgbm_tuned", maybe_wrap(tuned_lgbm)),
    ("random_forest", maybe_wrap(RandomForestRegressor(n_estimators=600, random_state=42, n_jobs=-1, max_depth=None))),
    ("extra_trees", maybe_wrap(ExtraTreesRegressor(n_estimators=600, random_state=42, n_jobs=-1, max_depth=None))),
    ("grad_boost", maybe_wrap(GradientBoostingRegressor(random_state=42, n_estimators=800, learning_rate=0.05, max_depth=3))),
    ("hist_grad_boost", maybe_wrap(HistGradientBoostingRegressor(random_state=42, max_depth=12, learning_rate=0.05, max_iter=900))),
    ("lightgbm_baseline", maybe_wrap(LGBMRegressor(objective="regression", random_state=42, n_estimators=1200, learning_rate=0.03, num_leaves=128, subsample=0.85, colsample_bytree=0.85, min_child_samples=20, reg_lambda=0.5, reg_alpha=0.1, verbose=-1))),
 ]

results = []
for name, mdl in models:
    print(f"Training {name}...")
    holdout_metrics = evaluate_holdout(name, mdl, X_train, y_train, X_valid, y_valid)
    cv_metrics = evaluate_cv(name, mdl, X, y, folds)
    merged = {**holdout_metrics, **cv_metrics}
    if name == "lightgbm_tuned":
        merged["best_params"] = best_params
    results.append(merged)

results_df = pd.DataFrame(results).set_index("model")
display(results_df[[c for c in results_df.columns if c.startswith("mae") or c.startswith("rmse") or c.startswith("r2")]])
best = results_df.sort_values("mae_cv").head(3)
print("Best (by CV MAE):")
display(best)

Using ScaffoldKFold with 5 folds


In [None]:
# Save metrics
output_dir = Path("result/data")
output_dir.mkdir(parents=True, exist_ok=True)
metrics_json = output_dir / "model_comparison_metrics.json"
metrics_csv = output_dir / "model_comparison_metrics.csv"
metrics_json.write_text(json.dumps(results, indent=2))
results_df.to_csv(metrics_csv)
print(f"Saved metrics to {metrics_json} and {metrics_csv}")

Saved metrics to result/data/model_comparison_metrics.json and result/data/model_comparison_metrics.csv
