# In [5]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Heart Disease — Comprehensive ML Pipeline (Single Python Script)
- Data loading (UCI Cleveland)
- Preprocessing & EDA (saves figures)
- PCA (plots cumulative explained variance)
- Feature selection (RF importance, RFE, Chi²)
- Supervised models (LogReg, Decision Tree, RandomForest, SVM) + metrics & ROC
- Unsupervised (KMeans elbow, Hierarchical dendrogram sample)
- Hyperparameter tuning (RF randomized, SVM grid)
- Export best model (joblib)
Tested on: Python 3.9+
"""
from pathlib import Path

# Settings
class Args:
    """Default run configuration for driving the pipeline without the CLI.

    The attribute names mirror the command-line options declared in the
    ``__main__`` section of this file, so an instance can be passed to
    ``main`` in place of a parsed ``argparse`` namespace.
    """

    # Input / output locations.
    data: "str | None" = None   # CSV path; None makes load_data fall back to UCI_URL
    outdir: str = "artifacts"   # root folder for figures/, models/ and results/

    # Pipeline stages to run.
    eda: bool = True
    pca: bool = True
    feat: bool = True
    supervised: bool = True
    unsupervised: bool = True
    tune: bool = True
    export: bool = True         # main only exports inside the tuning branch

    # Only consulted by the CLI section; main itself never reads it.
    do_all: bool = False

# Run the whole pipeline with the defaults declared on Args.
# NOTE(review): `main` (and every import it needs) is only defined further down
# in this file, so in a fresh interpreter this call raises NameError before any
# pipeline code is loaded. It presumably worked because `main` was already bound
# from an earlier execution in the same notebook session — confirm, and consider
# moving this call below the definitions.
main(Args())


import argparse
import io
import os
from pathlib import Path
import warnings

warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split, RandomizedSearchCV, GridSearchCV
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, MinMaxScaler
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.decomposition import PCA
from sklearn.feature_selection import RFE, chi2
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score,
                             roc_auc_score, roc_curve, classification_report, confusion_matrix)
from sklearn.base import clone

from sklearn.cluster import KMeans
from scipy.cluster.hierarchy import dendrogram, linkage

from joblib import dump

# Remote location of the processed Cleveland file; load_data reads it when no
# usable local CSV path is supplied.
UCI_URL = (
    "https://archive.ics.uci.edu/ml/machine-learning-databases/"
    "heart-disease/processed.cleveland.data"
)

# Column names applied to the remote file (read with header=None). 'num' is the
# raw label column that load_data collapses into a binary 'target'.
COLUMNS = [
    'age', 'sex', 'cp', 'trestbps', 'chol', 'fbs', 'restecg',
    'thalach', 'exang', 'oldpeak', 'slope', 'ca', 'thal', 'num',
]

# --------------------------------
# Helper Functions
# --------------------------------

def ensure_dirs(outdir: Path):
    """Create the ``figures``, ``models`` and ``results`` folders under *outdir*.

    Missing parent directories are created as well, and folders that already
    exist are left untouched.
    """
    for subfolder in ("figures", "models", "results"):
        (outdir / subfolder).mkdir(parents=True, exist_ok=True)

def load_data(path: str = None) -> pd.DataFrame:
    """Load the heart-disease table and make sure it carries a binary ``target``.

    Args:
        path: Optional path to a local CSV file with a header row. When it is
            falsy or does not point to an existing file, the data is read from
            ``UCI_URL`` instead (no error is raised for a missing path).

    Returns:
        A DataFrame in which the raw ``num`` label, when present, has been
        replaced by ``target`` (1 if ``num > 0`` else 0). A local file that
        already has a ``target`` column, or that has neither column, is
        returned as parsed.
    """
    if path and Path(path).exists():
        # Treat '?' as missing here as well, exactly like the remote branch,
        # so columns containing it stay numeric instead of becoming strings.
        df = pd.read_csv(path, na_values='?')
        if "target" not in df.columns and "num" in df.columns:
            df["target"] = (df["num"] > 0).astype(int)
            df = df.drop(columns=["num"])
        return df

    # Remote file has no header row; COLUMNS supplies the names.
    df = pd.read_csv(UCI_URL, header=None, names=COLUMNS, na_values='?')
    df['target'] = (df['num'] > 0).astype(int)
    return df.drop(columns=['num'])

def build_preprocessor(X: pd.DataFrame):
    """Assemble the unfitted preprocessing transformer for the feature frame *X*.

    Columns with a numeric dtype are median-imputed and passed through
    ``StandardScaler``; every other column is imputed with its most frequent
    value and one-hot encoded (unknown categories are ignored at transform
    time).

    Returns:
        ``(preprocessor, numeric_features, categorical_features)`` where the
        last two are lists of column names.
    """
    numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()
    categorical_features = [col for col in X.columns if col not in numeric_features]

    num_branch = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ])
    cat_branch = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("onehot", OneHotEncoder(handle_unknown="ignore")),
    ])

    # Order matters: feature_names_after_pre looks the "cat" branch up by position.
    branches = [
        ("num", num_branch, numeric_features),
        ("cat", cat_branch, categorical_features),
    ]
    return ColumnTransformer(transformers=branches), numeric_features, categorical_features

def do_eda(df: pd.DataFrame, outdir: Path):
    """Write basic exploratory summaries and plots for *df*.

    Produces ``results/data_info.txt`` (``DataFrame.info`` output),
    ``results/missing_values.txt`` (per-column NaN counts),
    ``figures/histograms.png`` and ``figures/correlation_heatmap.png``.
    """
    results_dir = outdir / "results"
    figures_dir = outdir / "figures"

    # Text summaries.
    info_text = io.StringIO()
    df.info(buf=info_text)
    (results_dir / "data_info.txt").write_text(info_text.getvalue())
    missing_counts = df.isna().sum()
    (results_dir / "missing_values.txt").write_text(str(missing_counts))

    # One histogram per column.
    df.hist(bins=20, figsize=(14, 10))
    plt.tight_layout()
    plt.savefig(figures_dir / "histograms.png", dpi=160)
    plt.close()

    # Correlation heatmap over the numeric columns only.
    numeric_corr = df.select_dtypes(include=[np.number]).corr()
    plt.figure(figsize=(9, 7))
    sns.heatmap(numeric_corr, annot=False)
    plt.title("Correlation Heatmap")
    plt.tight_layout()
    plt.savefig(figures_dir / "correlation_heatmap.png", dpi=160)
    plt.close()

def do_pca(preprocessor, X_train, outdir: Path):
    """Fit preprocessing + full PCA on *X_train* and report explained variance.

    Saves the cumulative explained-variance curve to
    ``figures/pca_cumulative_variance.png`` and writes the number of
    components needed to reach at least 95% variance to
    ``results/pca_variance.txt``.

    Args:
        preprocessor: Transformer used as the first pipeline step.
        X_train: Training features the pipeline is fitted on.
        outdir: Root output folder (must already contain figures/ and results/).
    """
    pca_pipeline = Pipeline(steps=[("pre", preprocessor),
                                  ("pca", PCA(n_components=None, random_state=42))])
    pca_pipeline.fit(X_train)
    pca = pca_pipeline.named_steps["pca"]
    explained = pca.explained_variance_ratio_
    cum = np.cumsum(explained)

    plt.figure()
    plt.plot(range(1, len(explained)+1), cum, marker="o")
    plt.xlabel("Number of components")
    plt.ylabel("Cumulative explained variance")
    plt.title("PCA — Cumulative Explained Variance")
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(outdir / "figures" / "pca_cumulative_variance.png", dpi=160)
    plt.close()

    # argmax on the boolean mask gives the first index where the threshold is
    # met; +1 turns that index into a component count.
    comps_95 = int((cum>=0.95).argmax()+1) if len(cum)>0 else 0
    # The message contains "≥", which the default locale encoding cannot
    # represent on some platforms (e.g. cp1252); write UTF-8 explicitly.
    (outdir / "results" / "pca_variance.txt").write_text(
        f"Components to reach ≥95% variance: {comps_95}\n",
        encoding="utf-8",
    )

def feature_names_after_pre(preprocessor, numeric_features, categorical_features):
    """Return output column names of a fitted preprocessor: numeric, then one-hot.

    The one-hot encoder is taken from the "onehot" step of the second entry in
    ``preprocessor.transformers_``. When *categorical_features* is empty the
    preprocessor is not inspected at all.
    """
    names = list(numeric_features)
    if not categorical_features:
        return names

    encoder = preprocessor.transformers_[1][1].named_steps["onehot"]
    if encoder is not None:
        names = names + list(encoder.get_feature_names_out(categorical_features))
    return names

def feature_selection(preprocessor, X_train, y_train, numeric_features, categorical_features, outdir: Path):
    """Rank and select features with three methods and save the outcomes.

    1. Random-forest ``feature_importances_`` -> ``results/feature_importances.csv``
       plus a bar chart of the top 20 in ``figures/rf_feature_importances.png``.
    2. RFE with logistic regression (keeps up to 15 features)
       -> ``results/rfe_features.txt``.
    3. Chi-squared scores on min-max scaled data -> ``results/chi2_features.csv``.
    """
    results_dir = outdir / "results"

    # --- 1) Random-forest importances ------------------------------------
    forest_pipe = Pipeline(steps=[
        ("pre", preprocessor),
        ("model", RandomForestClassifier(n_estimators=300, random_state=42)),
    ])
    forest_pipe.fit(X_train, y_train)
    forest_names = feature_names_after_pre(
        forest_pipe.named_steps["pre"], numeric_features, categorical_features
    )
    forest_scores = forest_pipe.named_steps["model"].feature_importances_
    importance_table = pd.DataFrame({"feature": forest_names, "importance": forest_scores})
    importance_table = importance_table.sort_values("importance", ascending=False)
    importance_table.to_csv(results_dir / "feature_importances.csv", index=False)

    # Re-sort ascending so the most important feature ends up at the top of the bars.
    top_twenty = importance_table.head(20).sort_values("importance")
    top_twenty.plot.barh(x="feature", y="importance")
    plt.title("Top 20 Feature Importances (RandomForest)")
    plt.tight_layout()
    plt.savefig(outdir / "figures" / "rf_feature_importances.png", dpi=160)
    plt.close()

    # --- 2) Recursive feature elimination --------------------------------
    rfe_pre = clone(preprocessor).fit(X_train, y_train)
    X_rfe = rfe_pre.transform(X_train)
    keep_count = min(15, X_rfe.shape[1])
    rfe = RFE(estimator=LogisticRegression(max_iter=2000),
              n_features_to_select=keep_count, step=1)
    rfe.fit(X_rfe, y_train)
    rfe_names = np.array(feature_names_after_pre(rfe_pre, numeric_features, categorical_features))
    kept_names = rfe_names[rfe.get_support()]
    (results_dir / "rfe_features.txt").write_text("\n".join(map(str, kept_names.tolist())))

    # --- 3) Chi-squared scores -------------------------------------------
    # Uses MinMaxScaler instead of StandardScaler, presumably because chi2
    # expects non-negative inputs.
    chi_numeric = Pipeline([("imputer", SimpleImputer(strategy="median")),
                            ("scale", MinMaxScaler())])
    chi_categorical = Pipeline([("imputer", SimpleImputer(strategy="most_frequent")),
                                ("onehot", OneHotEncoder(handle_unknown="ignore"))])
    chi_pre = ColumnTransformer(transformers=[
        ("num", chi_numeric, numeric_features),
        ("cat", chi_categorical, categorical_features),
    ])
    X_chi = chi_pre.fit_transform(X_train)
    chi_scores, chi_pvalues = chi2(X_chi, y_train)
    chi_table = pd.DataFrame({
        "feature": feature_names_after_pre(chi_pre, numeric_features, categorical_features),
        "chi2": chi_scores,
        "p_value": chi_pvalues,
    }).sort_values("chi2", ascending=False)
    chi_table.to_csv(results_dir / "chi2_features.csv", index=False)

def train_supervised(preprocessor, X_train, X_test, y_train, y_test, outdir: Path):
    """Fit four baseline classifiers, score them on the test split and plot ROC.

    For each model a classification report is written to
    ``results/classification_report_<name>.txt``. Accuracy, precision, recall,
    F1 and (when the model exposes ``predict_proba``) ROC AUC are collected in
    ``results/supervised_summary.csv``, and all ROC curves are drawn together
    in ``figures/roc_curves.png``.
    """
    candidates = {
        "LogReg": LogisticRegression(max_iter=2000),
        "DecisionTree": DecisionTreeClassifier(random_state=42),
        "RandomForest": RandomForestClassifier(n_estimators=300, random_state=42),
        "SVM_RBF": SVC(kernel="rbf", probability=True, random_state=42)
    }

    summary = {}
    roc_points = {}

    for label, estimator in candidates.items():
        model = Pipeline([("pre", preprocessor), ("clf", estimator)])
        model.fit(X_train, y_train)
        predicted = model.predict(X_test)

        scores = {
            "accuracy": accuracy_score(y_test, predicted),
            "precision": precision_score(y_test, predicted),
            "recall": recall_score(y_test, predicted),
            "f1": f1_score(y_test, predicted)
        }
        summary[label] = scores

        # ROC needs class probabilities; column 1 is the positive class.
        if hasattr(model.named_steps["clf"], "predict_proba"):
            positive_prob = model.predict_proba(X_test)[:, 1]
            scores["roc_auc"] = roc_auc_score(y_test, positive_prob)
            false_pos, true_pos, _ = roc_curve(y_test, positive_prob)
            roc_points[label] = (false_pos, true_pos)

        report_text = classification_report(y_test, predicted, digits=3)
        report_path = outdir / "results" / f"classification_report_{label}.txt"
        report_path.write_text(report_text)

    # One row per model, one column per metric.
    pd.DataFrame(summary).T.to_csv(outdir / "results" / "supervised_summary.csv")

    plt.figure()
    for label, (false_pos, true_pos) in roc_points.items():
        plt.plot(false_pos, true_pos, label=label)
    plt.plot([0,1],[0,1],'--')  # chance-level diagonal
    plt.xlabel("FPR")
    plt.ylabel("TPR")
    plt.title("ROC Curves")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(outdir / "figures" / "roc_curves.png", dpi=160)
    plt.close()

def do_unsupervised(preprocessor, X, y, outdir: Path):
    """Run the unsupervised analyses on the preprocessed full feature set.

    Saves a KMeans elbow plot (k = 1..9) to ``figures/kmeans_elbow.png`` and a
    Ward-linkage dendrogram of a random sample of at most 100 rows to
    ``figures/hierarchical_dendrogram.png``.

    Args:
        preprocessor: Transformer fitted here on all of *X*.
        X: Feature frame (train and test rows together).
        y: Accepted for signature symmetry with the other stages; not used.
        outdir: Root output folder (must already contain figures/).
    """
    pre_all = Pipeline([("pre", preprocessor)]).fit(X)
    X_all = pre_all.transform(X)
    # With one-hot encoded columns the transformer may return a SciPy sparse
    # matrix, which scipy's linkage() below cannot consume; work on a dense
    # array throughout.
    if hasattr(X_all, "toarray"):
        X_all = X_all.toarray()

    inertias, Ks = [], list(range(1, 10))
    for k in Ks:
        km = KMeans(n_clusters=k, n_init=10, random_state=42)
        km.fit(X_all)
        inertias.append(km.inertia_)

    plt.figure()
    plt.plot(Ks, inertias, marker="o")
    plt.xlabel("k"); plt.ylabel("Inertia")
    plt.title("KMeans — Elbow")
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(outdir / "figures" / "kmeans_elbow.png", dpi=160)
    plt.close()

    # Fixed-seed sample keeps the dendrogram readable and reproducible.
    idx = np.random.RandomState(42).choice(X_all.shape[0], size=min(100, X_all.shape[0]), replace=False)
    Z = linkage(X_all[idx], method="ward")
    plt.figure(figsize=(10,5))
    dendrogram(Z, truncate_mode="level", p=5)
    plt.title("Hierarchical Clustering — Dendrogram (sample)")
    plt.tight_layout()
    plt.savefig(outdir / "figures" / "hierarchical_dendrogram.png", dpi=160)
    plt.close()

def tune_models(preprocessor, X_train, y_train, X_test, y_test, outdir: Path):
    """Tune a random forest and an RBF SVM with 5-fold CV, optimising F1.

    The forest uses a randomized search (25 draws), the SVM an exhaustive grid
    over ``C`` and ``gamma``. *X_test*, *y_test* and *outdir* are accepted but
    not used here.

    Returns:
        ``(rf_best, svm_best)`` — the best pipeline found by each search.
    """
    from scipy.stats import randint

    # Random forest: sample integer hyperparameters from uniform ranges.
    forest_space = {
        "clf__n_estimators": randint(100, 600),
        "clf__max_depth": randint(2, 20),
        "clf__min_samples_split": randint(2, 20),
        "clf__min_samples_leaf": randint(1, 10)
    }
    forest_search = RandomizedSearchCV(
        Pipeline([("pre", preprocessor),
                  ("clf", RandomForestClassifier(random_state=42))]),
        forest_space,
        n_iter=25,
        scoring="f1",
        cv=5,
        random_state=42,
        n_jobs=-1,
        verbose=1,
    )
    forest_search.fit(X_train, y_train)

    # SVM: 4 x 4 grid over regularisation strength and kernel width.
    svm_space = {
        "clf__C": [0.1, 1, 10, 100],
        "clf__gamma": ["scale", 0.1, 0.01, 0.001]
    }
    svm_search = GridSearchCV(
        Pipeline([("pre", preprocessor),
                  ("clf", SVC(kernel="rbf", probability=True, random_state=42))]),
        svm_space,
        scoring="f1",
        cv=5,
        n_jobs=-1,
        verbose=1,
    )
    svm_search.fit(X_train, y_train)

    return forest_search.best_estimator_, svm_search.best_estimator_

def export_best_model(rf_best, svm_best, X_test, y_test, outdir: Path):
    """Persist whichever tuned model has the higher test-set F1 score.

    The random forest wins ties. The chosen pipeline is dumped to
    ``models/heart_best_model.joblib`` and its F1 is noted in
    ``results/best_model.txt``.
    """
    rf_f1 = f1_score(y_test, rf_best.predict(X_test))
    svm_f1 = f1_score(y_test, svm_best.predict(X_test))

    if rf_f1 >= svm_f1:
        winner = rf_best
    else:
        winner = svm_best

    model_path = outdir / "models" / "heart_best_model.joblib"
    dump(winner, model_path)

    note_path = outdir / "results" / "best_model.txt"
    note_path.write_text(f"Selected model F1: {max(rf_f1, svm_f1)}")

# --------------------------------
# Main Execution
# --------------------------------

def main(args):
    """Run the pipeline stages selected on *args*.

    Args:
        args: Object exposing ``data``, ``outdir`` and the boolean stage flags
            ``eda``, ``pca``, ``feat``, ``supervised``, ``unsupervised``,
            ``tune`` and ``export`` (an ``argparse`` namespace or ``Args``).

    Data loading, the preprocessor build and the stratified 80/20 split always
    run; every other stage is gated by its flag. Export needs the tuned
    estimators, so it only happens when ``tune`` is set as well.
    """
    outdir = Path(args.outdir)
    ensure_dirs(outdir)
    df = load_data(args.data)
    if args.eda:
        do_eda(df, outdir)
    X = df.drop(columns=["target"])
    y = df["target"]
    preprocessor, num_cols, cat_cols = build_preprocessor(X)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
    if args.pca:
        do_pca(preprocessor, X_train, outdir)
    if args.feat:
        feature_selection(preprocessor, X_train, y_train, num_cols, cat_cols, outdir)
    if args.supervised:
        train_supervised(preprocessor, X_train, X_test, y_train, y_test, outdir)
    if args.unsupervised:
        # Clustering deliberately sees all rows, not just the training split.
        do_unsupervised(preprocessor, X, y, outdir)
    if args.tune:
        rf_best, svm_best = tune_models(preprocessor, X_train, y_train, X_test, y_test, outdir)
        if args.export:
            export_best_model(rf_best, svm_best, X_test, y_test, outdir)
    elif args.export:
        # Previously this combination exported nothing without any feedback.
        print("Note: export was requested without tune; no model was exported "
              "(export needs the tuned estimators).")
    print(f"✅ Done. Results in {outdir.resolve()}")

if __name__ == "__main__":
    # Command-line entry point: every stage is opt-in, --do_all enables them all.
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, default=None)
    parser.add_argument("--outdir", type=str, default="artifacts")

    stage_flags = ("eda", "pca", "feat", "supervised", "unsupervised", "tune", "export")
    for flag in stage_flags + ("do_all",):
        parser.add_argument(f"--{flag}", action="store_true")

    args = parser.parse_args()
    if args.do_all:
        for flag in stage_flags:
            setattr(args, flag, True)
    main(args)


# Captured notebook output:
# Fitting 5 folds for each of 25 candidates, totalling 125 fits
# Fitting 5 folds for each of 16 candidates, totalling 80 fits
# ✅ Done. Results in /content/artifacts
# ✅ Done. Results in /content/artifacts
