In [1]:
import os
import json
import argparse
from datetime import datetime

import numpy as np
import pandas as pd
import joblib

from sklearn.metrics import accuracy_score, roc_auc_score, confusion_matrix

# ---------- Defaults ----------
# Fallback values for the --model / --input / --outdir options parsed in main().
# NOTE(review): these are absolute, machine-specific Windows paths; pass the
# options explicitly when running on any other machine.
DEFAULT_MODEL  = r"C:\Users\sagni\Downloads\Cardio Track\cardiotrack_model.pkl"
DEFAULT_INPUT  = r"C:\Users\sagni\Downloads\Cardio Track\archive\heart.csv"
DEFAULT_OUTDIR = r"C:\Users\sagni\Downloads\Cardio Track"

# Column names that guess_target_column() tries, in this order, when looking
# for the ground-truth label column. Matching is exact and case-sensitive,
# which is why several spellings of the same name are listed.
POSSIBLE_TARGETS = [
    "target", "Target", "TARGET",
    "output", "Output",
    "diagnosis", "Diagnosis",
    "label", "Label",
    "HeartDisease", "heart_disease"
]

# ---------- Helpers ----------
def guess_target_column(df: pd.DataFrame):
    """Guess which column of *df* holds the ground-truth label.

    Names from ``POSSIBLE_TARGETS`` are tried first, in list order. If none is
    present, the last column is returned when it has at most 10 distinct
    non-null values.

    Args:
        df: Input table; a frame without any columns is accepted.

    Returns:
        The label of the guessed column, or ``None`` when nothing qualifies.
    """
    for col in POSSIBLE_TARGETS:
        if col in df.columns:
            return col

    # A frame with no columns has no "last column"; indexing [-1] would raise.
    if len(df.columns) == 0:
        return None

    # Heuristic fallback: a low-cardinality last column is treated as the label.
    last = df.columns[-1]
    try:
        if df[last].dropna().nunique() <= 10:
            return last
    except (TypeError, ValueError):
        # TypeError: cells are unhashable, so distinct values cannot be counted.
        # ValueError: a duplicated column label makes df[last] a DataFrame and
        # the comparison result ambiguous. Either way there is no usable guess.
        pass
    return None

def extract_binary_proba(pipe, X):
    """Return per-row class probabilities from a classifier or pipeline.

    Args:
        pipe: Object exposing ``predict_proba``. Class labels are read from
            ``pipe.classes_`` when present; otherwise from a ``"model"`` entry
            of ``pipe.named_steps`` and then from the last entry of
            ``pipe.steps``.
        X: Feature rows, passed unchanged to ``pipe.predict_proba``.

    Returns:
        A tuple ``(proba_1, proba_0)``:

        * ``(None, None)`` when ``pipe`` has no ``predict_proba``.
        * Two known classes: the probability of the positive class and its
          complement. The positive class is label ``1`` if present, otherwise
          the largest label.
        * Labels unknown but exactly two probability columns: the second
          column is taken as the positive class.
        * Anything else: ``(row-wise maximum probability, None)``.
    """
    if not hasattr(pipe, "predict_proba"):
        return None, None
    probs = np.asarray(pipe.predict_proba(X))

    classes = getattr(pipe, "classes_", None)
    if classes is None and hasattr(pipe, "named_steps"):
        named = pipe.named_steps
        if "model" in named:
            classes = getattr(named["model"], "classes_", None)
        if classes is None:
            # Fall back to the final step regardless of what it is called.
            pipeline_steps = getattr(pipe, "steps", None)
            if pipeline_steps:
                classes = getattr(pipeline_steps[-1][1], "classes_", None)

    if classes is not None and len(classes) == 2:
        # A plain list keeps the membership test a simple element-wise ==.
        labels = list(classes)
        idx = labels.index(1) if 1 in labels else int(np.argmax(labels))
    elif classes is None and probs.ndim == 2 and probs.shape[1] == 2:
        # Without labels, max(p0, p1) would never drop below 0.5; assume the
        # conventional column order [negative, positive] instead.
        idx = 1
    else:
        # Multiclass (or single-class) output: highest probability as pseudo-risk.
        return probs.max(axis=1), None

    proba_1 = probs[:, idx]
    return proba_1, 1.0 - proba_1

def assign_risk_level(p):
    """Translate a probability into a coarse risk label.

    ``None`` yields "Unknown". Otherwise the first band whose lower bound is
    met wins: >= 0.80 is "High", >= 0.50 is "Moderate", anything else "Low".
    """
    if p is None:
        return "Unknown"
    # Bands run from the highest floor downwards, so the first hit is final.
    for floor, label in ((0.80, "High"), (0.50, "Moderate")):
        if p >= floor:
            return label
    return "Low"

def rule_based_flags(row):
    """
    Collect threshold-based warning labels for one record.

    *row* is any mapping-like object supporting ``in`` and ``[]`` (a pandas
    Series or a dict). A rule is skipped when its column is absent or its
    value is null. Column names follow the UCI-style heart dataset:

      - trestbps >= 140        -> high blood pressure
      - chol >= 240            -> high cholesterol
      - thalach < 100          -> low exercise capacity
      - fbs == 1               -> high fasting blood sugar
      - restecg in {1, 2}      -> abnormal resting ECG
      - oldpeak >= 2.0         -> elevated ST depression
      - ca > 0                 -> major vessels observed
      - thal >= 6              -> thal defect
      - cp in {3, 4}           -> angina-type chest pain

    NOTE(review): the thal / cp / restecg codes assume one particular
    encoding of the dataset - confirm against the CSV actually used.

    Returns the triggered labels as a list, in the order shown above.
    """
    # (column, test on the non-null value, label emitted when the test passes)
    checks = (
        ("trestbps", lambda v: v >= 140, "High BP (trestbps≥140)"),
        ("chol", lambda v: v >= 240, "High Cholesterol (chol≥240)"),
        ("thalach", lambda v: v < 100, "Low exercise capacity (thalach<100)"),
        ("fbs", lambda v: v == 1, "High fasting blood sugar (fbs=1)"),
        ("restecg", lambda v: v in [1, 2], "Abnormal resting ECG (restecg)"),
        ("oldpeak", lambda v: v >= 2.0, "ST depression elevated (oldpeak≥2)"),
        ("ca", lambda v: v > 0, "Major vessels observed (ca>0)"),
        ("thal", lambda v: v >= 6, "Thal defect (thal≥6)"),
        ("cp", lambda v: v in [3, 4], "Angina-type chest pain (cp∈{3,4})"),
    )

    triggered = []
    for column, is_risky, label in checks:
        if column not in row:
            continue
        value = row[column]
        if pd.notna(value) and is_risky(value):
            triggered.append(label)
    return triggered

def anomaly_flags(df_num: pd.DataFrame, row: pd.Series, z_thresh=3.0):
    """Flag features of *row* that are z-score outliers relative to *df_num*.

    Args:
        df_num: Numeric reference data; its column means and population
            standard deviations (``ddof=0``) define the z-scores.
        row: Record to check; must support ``.get(column, default)``.
        z_thresh: Minimum absolute z-score that counts as anomalous.

    Returns:
        One label per outlying column, in column order. The list is empty
        when *df_num* has fewer than 5 rows. Columns with zero spread and
        values missing from *row* are skipped.
    """
    flags = []
    if df_num.shape[0] < 5:
        # Too few rows for a meaningful mean / standard deviation.
        return flags
    means = df_num.mean()
    # A zero standard deviation becomes NaN so constant columns are skipped
    # instead of dividing by zero.
    stds = df_num.std(ddof=0).replace(0, np.nan)
    for col in df_num.columns:
        val = row.get(col, np.nan)
        if pd.isna(val) or pd.isna(stds[col]):
            continue
        z = (val - means[col]) / stds[col]
        if abs(z) >= z_thresh:
            # ":g" prints the threshold that was really applied (2.5 -> "2.5")
            # while the default 3.0 still renders as "3".
            flags.append(f"Anomalous {col} (|z|≥{z_thresh:g})")
    return flags

def short_recommendations(risk_level, flags):
    """Compose a short, non-diagnostic advice string.

    A "Moderate" or "High" *risk_level* adds a clinician referral. Each
    further tip is added when any entry of *flags* contains one of its
    keywords. A generic lifestyle tip is used when nothing else applies.
    Tips are joined with "; " in a fixed order.
    """
    def flagged(keyword):
        return any(keyword in flag for flag in flags)

    advice = []
    if risk_level in ("Moderate", "High"):
        advice.append("Consult a clinician for a formal evaluation.")

    # (keywords, tip): the tip is added when any keyword occurs in any flag.
    keyword_tips = (
        (("BP",), "Monitor blood pressure; reduce sodium, manage stress."),
        (("Cholesterol",), "Review diet (fiber, unsaturated fats); consider lipid panel follow-up."),
        (("exercise capacity",), "Build gradual cardio routine (as advised by a professional)."),
        (("ST depression", "Thal defect"), "Abnormal exercise/ECG indicators—seek medical advice."),
    )
    for keywords, tip in keyword_tips:
        if any(flagged(keyword) for keyword in keywords):
            advice.append(tip)

    if not advice:
        advice.append("Maintain regular activity, balanced diet, adequate sleep.")
    # Every tip is distinct and added at most once, so no de-duplication is needed.
    return "; ".join(advice)

# ---------- Main ----------
def _to_builtin(value):
    """Return *value* as a plain Python object when it is a NumPy scalar."""
    return value.item() if isinstance(value, np.generic) else value

def _compute_metrics(y_true, y_pred, proba_1):
    """Return accuracy / ROC AUC / confusion matrix for the labelled rows.

    Rows whose label is null are excluded. Each metric is best-effort: a
    failure is reported as a warning and that metric is left out.
    """
    import warnings

    labelled = y_true.notna().to_numpy()
    if not labelled.any():
        warnings.warn("Target column has no non-null labels; metrics are skipped.")
        return {}
    y_eval = y_true[labelled]
    pred_eval = y_pred[labelled]

    # Re-code the truth only when the model speaks a different label set.
    # {1, 2} becomes {0, 1}; replacing just 2 -> 1 would merge both classes.
    true_labels = set(pd.unique(y_eval))
    pred_labels = set(pd.unique(pred_eval))
    if not pred_labels <= true_labels:
        if true_labels == {1, 2}:
            y_eval = y_eval.replace({1: 0, 2: 1})
        elif true_labels == {0, 2}:
            y_eval = y_eval.replace({2: 1})

    metrics = {}
    try:
        metrics["accuracy"] = float(accuracy_score(y_eval, pred_eval))
    except Exception as exc:
        warnings.warn(f"Accuracy not computed: {exc}")
    try:
        if proba_1 is not None and y_eval.nunique() == 2:
            proba_eval = proba_1[labelled]
            # A constant score carries no ranking information; skip the AUC.
            if not np.allclose(np.min(proba_eval), np.max(proba_eval)):
                metrics["roc_auc"] = float(roc_auc_score(y_eval, proba_eval))
    except Exception as exc:
        warnings.warn(f"ROC AUC not computed: {exc}")
    try:
        labels_sorted = np.unique(np.concatenate([y_eval.values, pred_eval]))
        cm = confusion_matrix(y_eval, pred_eval, labels=labels_sorted).tolist()
        metrics["labels"] = [_to_builtin(x) for x in labels_sorted]
        metrics["confusion_matrix"] = cm
    except Exception as exc:
        warnings.warn(f"Confusion matrix not computed: {exc}")
    return metrics

def main():
    """Score every CSV row with a saved model and write CSV/JSON/TXT reports.

    Command-line options (all optional): ``--model``, ``--input``,
    ``--outdir``, ``--id_col`` and ``--target``. The files
    ``cardiotrack_cardio_report.csv`` / ``.json`` / ``.txt`` are written to
    the output directory and their paths are printed.

    Raises:
        ValueError: if the input CSV contains no data rows.
    """
    import warnings  # function-scope import: only used to report soft failures

    parser = argparse.ArgumentParser(description="Generate Cardio Report (risk + rule-based flags)")
    parser.add_argument("--model",  default=DEFAULT_MODEL,  help="Path to joblib .pkl model pipeline")
    parser.add_argument("--input",  default=DEFAULT_INPUT,  help="Path to input CSV")
    parser.add_argument("--outdir", default=DEFAULT_OUTDIR, help="Output directory")
    parser.add_argument("--id_col", default=None, help="Optional ID column to carry")
    parser.add_argument("--target", default=None, help="Optional target column if present (for metrics)")

    # parse_known_args ignores unrecognised argv entries instead of aborting.
    args, _ = parser.parse_known_args()

    os.makedirs(args.outdir, exist_ok=True)

    # SECURITY: joblib.load unpickles the file and can therefore execute
    # arbitrary code; only load model files from a trusted source.
    pipe = joblib.load(args.model)
    df = pd.read_csv(args.input).dropna(how="all").reset_index(drop=True)
    if df.empty:
        raise ValueError(f"No data rows found in {args.input!r}.")

    # Identify target/ID
    target_col = args.target or guess_target_column(df)
    if target_col and target_col not in df.columns:
        warnings.warn(f"Target column {target_col!r} not found; metrics are skipped.")
        target_col = None
    id_col = args.id_col if (args.id_col and args.id_col in df.columns) else None
    if args.id_col and id_col is None:
        warnings.warn(f"ID column {args.id_col!r} not found; using the row position as id.")

    # Split features / labels
    # NOTE(review): an id column, when given, stays in X and is passed to the
    # model - confirm the pipeline expects it.
    if target_col:
        y_true = df[target_col].copy()
        X = df.drop(columns=[target_col])
    else:
        y_true = None
        X = df.copy()

    # Numeric-only view used as the reference distribution for anomaly flags
    df_numeric = X.select_dtypes(include=[np.number]).copy()

    # Predict
    y_pred = np.asarray(pipe.predict(X))
    proba_1, proba_0 = extract_binary_proba(pipe, X)

    # Build per-row report
    report_rows = []
    failed_rows = {"Rule-based": 0, "Anomaly": 0}
    for i in range(len(df)):
        row_orig = df.iloc[i]
        row_feat = X.iloc[i]
        rid = row_orig[id_col] if id_col else i  # row position when no id column

        # Model-based risk
        p1 = float(proba_1[i]) if proba_1 is not None else None
        risk = assign_risk_level(p1)

        # Flags are best-effort: a malformed row must not abort the whole run,
        # but failures are counted and reported once after the loop.
        flags = []
        try:
            flags += rule_based_flags(row_orig)
        except Exception:
            failed_rows["Rule-based"] += 1
        try:
            flags += anomaly_flags(df_numeric, row_feat, z_thresh=3.0)
        except Exception:
            failed_rows["Anomaly"] += 1

        flags = list(dict.fromkeys(flags))  # de-dup, keep order
        flags_str = "; ".join(flags) if flags else ""

        row_out = {
            "id": rid,
            "predicted": _to_builtin(y_pred[i]),
            "risk_probability": (None if p1 is None else round(p1, 4)),
            "risk_level": risk,
            "flags_count": len(flags),
            "flags": flags_str,
            "recommendations": short_recommendations(risk, flags),
        }
        if proba_0 is not None:
            row_out["proba_0"] = round(float(proba_0[i]), 4)

        report_rows.append(row_out)

    for kind, count in failed_rows.items():
        if count:
            warnings.warn(f"{kind} flags could not be computed for {count} row(s).")

    report_df = pd.DataFrame(report_rows)

    # Save CSV report
    csv_path = os.path.join(args.outdir, "cardiotrack_cardio_report.csv")
    report_df.to_csv(csv_path, index=False)

    # Optional metrics if ground truth present
    metrics = _compute_metrics(y_true, y_pred, proba_1) if target_col else {}

    # Run-level summary
    level_counts = report_df["risk_level"].value_counts()
    high_ct = int(level_counts.get("High", 0))
    mod_ct = int(level_counts.get("Moderate", 0))
    low_ct = int(level_counts.get("Low", 0))
    # "Unknown" appears when the model offers no probability estimate.
    unknown_ct = int(level_counts.get("Unknown", 0))

    summary = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "model_path": args.model,
        "input_path": args.input,
        "outdir": args.outdir,
        "rows": int(len(df)),
        "risk_counts": {"High": high_ct, "Moderate": mod_ct, "Low": low_ct, "Unknown": unknown_ct},
        "metrics_if_target_present": metrics if metrics else None,
        "notes": "Wellness/risk indicator only — not a medical diagnosis."
    }

    # Save JSON + TXT summaries
    json_path = os.path.join(args.outdir, "cardiotrack_cardio_report.json")
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)

    txt_path = os.path.join(args.outdir, "cardiotrack_cardio_report.txt")
    with open(txt_path, "w", encoding="utf-8") as f:
        f.write("=== CardioTrack Cardio Report (non-diagnostic) ===\n")
        f.write(f"Time: {summary['timestamp']}\n")
        f.write(f"Rows analyzed: {summary['rows']}\n")
        f.write(f"Risk distribution -> High: {high_ct}, Moderate: {mod_ct}, Low: {low_ct}\n")
        if unknown_ct:
            f.write(f"Rows without a probability estimate: {unknown_ct}\n")
        if metrics:
            f.write(f"Accuracy: {metrics.get('accuracy')}\n")
            f.write(f"ROC AUC: {metrics.get('roc_auc')}\n")
        f.write("\nTop High-Risk (up to 10):\n")
        top_high = report_df[report_df["risk_level"] == "High"].head(10)
        if top_high.empty:
            f.write("  (none)\n")
        else:
            for _, r in top_high.iterrows():
                f.write(f"  id={r['id']} | p={r['risk_probability']} | flags={r['flags']}\n")
        f.write("\nNotes: This report provides risk indicators and wellness guidance only. "
                "For symptoms or concerns, consult a qualified clinician.\n")

    print("\n=== Cardio Report Created ===")
    print("Per-person CSV :", csv_path)
    print("Run summary JSON:", json_path)
    print("Readable TXT   :", txt_path)

# Generate the report only when this file is executed as a script; importing
# it as a module just defines the helpers.
if __name__ == "__main__":
    main()



=== Cardio Report Created ===
Per-person CSV : C:\Users\sagni\Downloads\Cardio Track\cardiotrack_cardio_report.csv
Run summary JSON: C:\Users\sagni\Downloads\Cardio Track\cardiotrack_cardio_report.json
Readable TXT   : C:\Users\sagni\Downloads\Cardio Track\cardiotrack_cardio_report.txt
