 # Quick Classification Pipeline for Drug Synergy
 - Encodes categorical features (Drug1, Drug2, Cell line)
 - Uses synergy score (ZIP) as numerical feature
 - Trains RandomForestClassifier as baseline
 - Evaluates with Confusion Matrix & Classification Report

In [None]:
from __future__ import annotations
from pathlib import Path
import numpy as np
import pandas as pd
from typing import Tuple, Dict
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import GroupShuffleSplit, GroupKFold
from sklearn.experimental import enable_hist_gradient_boosting  # noqa: F401
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.metrics import (
    classification_report, confusion_matrix, roc_auc_score,
    average_precision_score
)
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

# =========================
# CONFIG
# =========================
INPUT_PATH = Path(r"C:\Users\46762\VSCODE\BIG_PHARMA\data\raw\Syner&Antag_zip.csv")  # <-- your CSV
OUT_DIR    = Path(r"C:\Users\46762\VSCODE\BIG_PHARMA\data\interim")
OUT_DIR.mkdir(parents=True, exist_ok=True)

RANDOM_SEED = 42
TEST_SIZE   = 0.2

# If your classification label was derived from ZIP cutoffs, keep this False to avoid trivial leakage.
INCLUDE_ZIP_AS_FEATURE = False

# Optional: run a label-shuffle sanity check to detect hidden leakage
RUN_RANDOM_LABEL_TEST = True

# =========================
# Cleaning helpers (mirrors your style)
# =========================
def norm_str(x):
    """Normalize strings: lowercase, trim, unify dashes."""
    if pd.isna(x): return np.nan
    s = str(x).strip().lower().replace("–", "-")
    return " ".join(s.split())

def canonicalize_pairs(df: pd.DataFrame) -> pd.DataFrame:
    pairs = df[["drug1", "drug2"]].apply(lambda r: tuple(sorted([r["drug1"], r["drug2"]])), axis=1)
    df["drug_min"] = [p[0] for p in pairs]
    df["drug_max"] = [p[1] for p in pairs]
    return df

def add_label_encodings(df: pd.DataFrame) -> Tuple[pd.DataFrame, LabelEncoder, LabelEncoder]:
    le_cell, le_drug = LabelEncoder(), LabelEncoder()
    df["cell_id"] = le_cell.fit_transform(df["cell_line"])
    all_drugs = pd.Index(df["drug_min"]).append(pd.Index(df["drug_max"])).unique()
    le_drug.fit(all_drugs)
    df["drug_min_id"] = le_drug.transform(df["drug_min"])
    df["drug_max_id"] = le_drug.transform(df["drug_max"])
    return df, le_cell, le_drug

def add_frequency_features(df: pd.DataFrame) -> pd.DataFrame:
    dmin_freq = df["drug_min"].value_counts().to_dict()
    dmax_freq = df["drug_max"].value_counts().to_dict()
    cell_freq = df["cell_line"].value_counts().to_dict()

    df["drug_min_freq"] = df["drug_min"].map(dmin_freq)
    df["drug_max_freq"] = df["drug_max"].map(dmax_freq)
    df["cell_freq"]     = df["cell_line"].map(cell_freq)

    for c in ["drug_min_freq", "drug_max_freq", "cell_freq"]:
        df[f"{c}_log"] = np.log1p(df[c])
    return df

def clean_input(path: Path) -> pd.DataFrame:
    """Load and clean the classified dataset in your style."""
    df = pd.read_csv(path)

    # --- Standardize column names ---
    rename = {
        "Drug1": "drug1",
        "Drug2": "drug2",
        "Cell line": "cell_line",
        "ZIP": "synergy_zip",         # keep as numeric but exclude from features by default
        "classification": "label"
    }
    df = df.rename(columns=rename)

    # --- Normalize strings ---
    for c in ["drug1", "drug2", "cell_line"]:
        df[c] = df[c].apply(norm_str)

    # --- Drop NA and self-pairs ---
    df = df.dropna(subset=["drug1", "drug2", "cell_line", "label"])
    df = df[df["drug1"] != df["drug2"]]

    # --- Canonical pair & pair_id ---
    df = canonicalize_pairs(df)
    df["pair_id"] = df["drug_min"] + "_" + df["drug_max"]

    # --- Label encodings (IDs) ---
    df, _, _ = add_label_encodings(df)

    # --- Frequency features ---
    df = add_frequency_features(df)

    # Make sure synergy_zip is numeric if present
    if "synergy_zip" in df.columns:
        df["synergy_zip"] = pd.to_numeric(df["synergy_zip"], errors="coerce")

    # Save cleaned snapshot
    cleaned_path = OUT_DIR / "classified_clean.csv"
    df.to_csv(cleaned_path, index=False)
    print(f"[INFO] Saved cleaned dataset → {cleaned_path} | shape={df.shape}")
    return df

# =========================
# Feature building
# =========================
def build_features(df: pd.DataFrame, include_zip: bool) -> Tuple[pd.DataFrame, pd.Series, pd.Series]:
    """Return X, y, groups where groups = (pair_id + cell_line) to avoid leakage."""
    base_cols = [
        "drug_min_id", "drug_max_id", "cell_id",
        "drug_min_freq_log", "drug_max_freq_log", "cell_freq_log"
    ]
    if include_zip and "synergy_zip" in df.columns:
        base_cols.append("synergy_zip")

    X = df[base_cols].copy()
    y = df["label"].astype(str).copy()

    # Groups to avoid leakage across (drug_min, drug_max, cell_line)
    groups = (df["pair_id"].astype(str) + "__" + df["cell_line"].astype(str))
    return X, y, groups

# =========================
# Train / Evaluate
# =========================
def train_eval_once(X, y, groups, random_state=RANDOM_SEED, test_size=TEST_SIZE) -> Dict[str, float]:
    """Group-aware single split eval."""
    splitter = GroupShuffleSplit(n_splits=1, test_size=test_size, random_state=random_state)
    tr_idx, te_idx = next(splitter.split(X, y, groups=groups))

    Xtr, Xte = X.iloc[tr_idx], X.iloc[te_idx]
    ytr, yte = y.iloc[tr_idx], y.iloc[te_idx]

    # Strong, fast baseline
    clf = HistGradientBoostingClassifier(
        learning_rate=0.08,
        max_depth=None,
        max_leaf_nodes=31,
        l2_regularization=0.0,
        random_state=random_state
    )
    clf.fit(Xtr, ytr)

    yhat = clf.predict(Xte)
    yproba = None
    try:
        yproba = clf.predict_proba(Xte)  # may not be available for HGB in older versions
    except Exception:
        pass

    print("\n=== Classification Report (Group-aware split) ===")
    print(classification_report(yte, yhat, digits=3))

    cm = confusion_matrix(yte, yhat, labels=np.unique(y))
    print("Confusion matrix (rows=true, cols=pred):\n", cm)

    # Compute ROC-AUC and PR-AUC for "synergy" class if probabilities exist and binary problem
    metrics = {}
    if yproba is not None and len(np.unique(y)) == 2:
        # Map positive class as "synergy" if present, else use the second class lexicographically
        classes = clf.classes_
        pos_idx = np.where(classes == "synergy")[0][0] if "synergy" in classes else 1
        proba_pos = yproba[:, pos_idx]
        # Convert labels to binary for metrics
        y_bin = (yte == classes[pos_idx]).astype(int)
        try:
            metrics["roc_auc"] = roc_auc_score(y_bin, proba_pos)
            metrics["pr_auc"]  = average_precision_score(y_bin, proba_pos)
            print(f"ROC-AUC: {metrics['roc_auc']:.3f} | PR-AUC: {metrics['pr_auc']:.3f}")
        except Exception:
            pass

    return metrics

def group_kfold_cv(X, y, groups, n_splits=5) -> None:
    """Extra: GroupKFold CV for stability (no leakage)."""
    gkf = GroupKFold(n_splits=n_splits)
    scores = []
    fold = 1
    for tr_idx, te_idx in gkf.split(X, y, groups):
        clf = HistGradientBoostingClassifier(
            learning_rate=0.08,
            max_depth=None,
            max_leaf_nodes=31,
            random_state=RANDOM_SEED + fold
        )
        clf.fit(X.iloc[tr_idx], y.iloc[tr_idx])
        yhat = clf.predict(X.iloc[te_idx])
        print(f"\n[Fold {fold}]")
        print(classification_report(y.iloc[te_idx], yhat, digits=3))
        fold += 1

def random_label_sanity_check(X, y, groups):
    """Shuffle labels and ensure performance collapses (detects hidden leakage)."""
    y_rand = y.sample(frac=1.0, random_state=999).reset_index(drop=True)
    X_rand = X.reset_index(drop=True)
    groups_rand = groups.reset_index(drop=True)

    print("\n[Sanity] Training with RANDOMIZED labels...")
    _ = train_eval_once(X_rand, y_rand, groups_rand, random_state=777)

# =========================
# Main
# =========================
def main():
    df = clean_input(INPUT_PATH)
    X, y, groups = build_features(df, include_zip=INCLUDE_ZIP_AS_FEATURE)

    print(f"[INFO] Feature matrix shape: {X.shape} | Labels: {y.value_counts().to_dict()}")
    print(f"[INFO] INCLUDE_ZIP_AS_FEATURE = {INCLUDE_ZIP_AS_FEATURE}")

    # Group-aware single split eval
    _ = train_eval_once(X, y, groups)

    # Optional: GroupKFold CV for stability
    print("\n[INFO] Running GroupKFold CV (5 folds) for stability...")
    group_kfold_cv(X, y, groups, n_splits=5)

    # Optional: randomized label test to detect leakage
    if RUN_RANDOM_LABEL_TEST:
        random_label_sanity_check(X, y, groups)

if __name__ == "__main__":
    main()