In [None]:
# ===============================================
# Breast Cancer DBT — Leak-Safe CLIP + MLP Pipeline
# -----------------------------------------------
# • Reproducible: auto-installs deps and downloads data from Kaggle
# • Leak-safe: split BEFORE any augmentation/resampling
# • Imbalance-aware: SMOTETomek + Focal Loss + ENS class weights
# • Exports: JPG figures + Word (.docx) files (caption + description)
# ===============================================


In [None]:
# ---------- [0] Robust installs (works in .py or notebook) ----------
import sys, subprocess, importlib, warnings, os
from pathlib import Path

warnings.filterwarnings("ignore", category=UserWarning)

def _pip_install(args):
    """Run ``pip install --quiet`` with the current interpreter.

    Args:
        args: List of extra pip arguments (package names and/or options).

    Raises:
        subprocess.CalledProcessError: if pip exits with a non-zero status.
    """
    pip_prefix = [sys.executable, "-m", "pip", "install", "--quiet"]
    subprocess.check_call(pip_prefix + args)

def _ensure(pkg, pypi_name=None):
    """Import module ``pkg``, installing it with pip once if the import fails.

    Args:
        pkg: Importable module name (e.g. ``"sklearn"``).
        pypi_name: Distribution name to install when it differs from ``pkg``
            (e.g. ``"scikit-learn"``); ``pkg`` is used when this is falsy.

    Returns:
        The imported module object.
    """
    try:
        module = importlib.import_module(pkg)
    except Exception:
        # Any import failure triggers one install attempt; the retry below may still raise.
        _pip_install([pypi_name or pkg])
        module = importlib.import_module(pkg)
    return module

# Core scientific stack
# Each _ensure(...) call imports the module and pip-installs it first if the import fails.
np = _ensure("numpy")
pd = _ensure("pandas")
matplotlib = _ensure("matplotlib"); plt = importlib.import_module("matplotlib.pyplot")
sns = _ensure("seaborn")
PIL = _ensure("PIL", "Pillow"); from PIL import Image

# ML / metrics
sklearn = _ensure("sklearn", "scikit-learn")
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import label_binarize
from sklearn.metrics import (confusion_matrix, accuracy_score, balanced_accuracy_score, f1_score,
                             precision_score, recall_score, roc_auc_score, roc_curve, auc,
                             precision_recall_curve, average_precision_score)

# Imbalance + aug
imblearn = _ensure("imblearn", "imbalanced-learn"); from imblearn.combine import SMOTETomek
alb = _ensure("albumentations"); import albumentations as A
# Return value is discarded: cv2 is not referenced by name in the visible code.
# Presumably it is needed as a backend by albumentations — TODO confirm.
_ensure("cv2", "opencv-python-headless")

# Word export
docx = _ensure("docx", "python-docx"); from docx import Document
from docx.shared import Inches, Pt

# Torch (try default; if missing, fall back to CPU index)
# Two-stage install: the default index is tried first; only if that pip call
# raises is the CPU-only wheel index used.
try:
    import torch
except Exception:
    try:
        _pip_install(["torch", "torchvision", "torchaudio"])
    except Exception:
        _pip_install(["torch", "torchvision", "torchaudio", "--index-url",
                      "https://download.pytorch.org/whl/cpu"])
    import torch

# CLIP without Git
# The importable module is `clip`; the distribution installed on failure is "clip-anytorch".
try:
    import clip
except Exception:
    _pip_install(["clip-anytorch"])
    import clip

# Kaggle downloader
kgh = _ensure("kagglehub")

In [None]:

# ---------- [1] Configuration ----------
# Project-relative paths: the script's own folder when __file__ is defined
# (run as a .py file), otherwise the current working directory.
ROOT = Path(__file__).resolve().parent if "__file__" in globals() else Path(".").resolve()
DATA_ROOT = ROOT / "data"                # local data folder; only created in section [2], images are read from the kagglehub path
RESULTS_DIR = ROOT / "results"           # all outputs saved here
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

# Class folder names expected under the dataset root; list order defines the integer label ids.
CLASSES = ['Benign', 'Actionable', 'Cancer', 'Normal']
class_to_idx = {c: i for i, c in enumerate(CLASSES)}
idx_to_class = {i: c for c, i in class_to_idx.items()}
# File suffixes (compared lower-cased) that count as images when listing class folders.
IMG_SUFFIXES = {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}

# Reproducibility + device: seed NumPy, Python's random and torch with the same value.
RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)
import random; random.seed(RANDOM_STATE)
torch.manual_seed(RANDOM_STATE)
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"[INFO] Using device: {device}")

In [None]:
# ---------- [2] Download dataset from Kaggle (reproducible) ----------
# Fetch the dataset through kagglehub, pinned to version 1 of the Kaggle dataset.
# NOTE(review): kagglehub presumably returns the existing cache path when the
# dataset was downloaded before — confirm against the kagglehub documentation.
print("[INFO] Downloading dataset via kagglehub …")
dataset_cache_dir = kgh.dataset_download("gabrielcarvalho11/breast-cancer-screening-dbt", version=1)
# The class folders are expected inside this sub-folder of the downloaded path.
DATASET_DIR = Path(dataset_cache_dir) / "Breast-Cancer-Screening-DBT"
# Creates the local data folder only; nothing in the visible code copies or links the dataset into it.
DATA_ROOT.mkdir(exist_ok=True, parents=True)  # optional local mount
print(f"[INFO] Dataset root: {DATASET_DIR}")


In [None]:



# ---------- [3] Export helpers (JPG + Word only) ----------
def save_df_to_docx(df: "pd.DataFrame", path: Path, caption: str, description: str):
    """Save a DataFrame to a Word file with a caption above and a description below.

    Args:
        df: Table to export; column labels become the header row and every
            cell is written as ``str(value)``.
        path: Destination ``.docx`` path.
        caption: Text placed after the ``"Table: "`` prefix (bold, 12 pt).
        description: Italic text placed under the table. May be empty.
    """
    doc = Document()
    cap_run = doc.add_paragraph().add_run(f"Table: {caption}")
    cap_run.bold = True
    cap_run.font.size = Pt(12)

    n_rows, n_cols = df.shape
    table = doc.add_table(rows=n_rows + 1, cols=n_cols)  # +1 for the header row
    table.style = "Light Grid"
    for j, col in enumerate(df.columns):
        table.cell(0, j).text = str(col)
    for i in range(n_rows):
        for j in range(n_cols):
            table.cell(i + 1, j).text = str(df.iloc[i, j])

    doc.add_paragraph("")  # spacer
    # Add the run explicitly: add_paragraph("") creates a paragraph without
    # any run, so indexing runs[0] would fail for an empty description.
    desc_run = doc.add_paragraph().add_run(description)
    desc_run.italic = True
    doc.save(str(path))

def save_figure_jpg_and_docx(fig, jpg_path: Path, docx_path: Path, caption: str, description: str):
    """Save a figure as JPG and wrap it in a Word file with caption and description.

    Args:
        fig: Matplotlib figure to export; it is closed by this function.
        jpg_path: Destination of the 300-dpi JPG image.
        docx_path: Destination of the Word document embedding that image.
        caption: Text placed after the ``"Figure: "`` prefix (bold, 12 pt).
        description: Italic text placed under the picture. May be empty.
    """
    try:
        fig.savefig(str(jpg_path), dpi=300, bbox_inches="tight", format="jpg")
    finally:
        # Release the figure even when saving fails, so figures do not accumulate.
        plt.close(fig)

    doc = Document()
    cap_run = doc.add_paragraph().add_run(f"Figure: {caption}")
    cap_run.bold = True
    cap_run.font.size = Pt(12)
    doc.add_picture(str(jpg_path), width=Inches(6.5))
    doc.add_paragraph("")  # spacer
    # Add the run explicitly: add_paragraph("") creates a paragraph without
    # any run, so indexing runs[0] would fail for an empty description.
    desc_run = doc.add_paragraph().add_run(description)
    desc_run.italic = True
    doc.save(str(docx_path))


In [None]:

# ---------- [4] Dataset listing + sanity checks ----------
def list_images_by_class(root: Path):
    """Collect image paths, integer labels and per-class counts from class folders.

    Each name in CLASSES is looked up as a direct sub-folder of ``root``; only
    regular files whose lower-cased suffix is in IMG_SUFFIXES are kept.

    Args:
        root: Folder that contains one sub-folder per class.

    Returns:
        ``(paths, labels, counts)``: an array of ``Path`` objects, an array of
        label ids (from ``class_to_idx``) in the same order, and a dict
        mapping class name to its number of images.
    """
    paths, labels, counts = [], [], {}
    for c in CLASSES:
        folder = root / c
        # Sort so the listing order does not depend on the filesystem;
        # otherwise the seeded train/val/test split can differ between machines.
        imgs = sorted(
            p for p in folder.glob("*")
            if p.is_file() and p.suffix.lower() in IMG_SUFFIXES
        )
        counts[c] = len(imgs)
        paths.extend(imgs)
        labels.extend([class_to_idx[c]] * len(imgs))
    return np.array(paths), np.array(labels), counts

def plot_class_hist(counts_dict, title):
    """Draw a bar chart of class counts and return the figure.

    Args:
        counts_dict: Mapping of class name to count; bars follow its key order.
        title: Title shown above the plot.

    Returns:
        The newly created Matplotlib figure (left open for the caller).
    """
    fig = plt.figure(figsize=(7.2, 5))
    names = list(counts_dict.keys())
    vals = [counts_dict[name] for name in names]
    sns.barplot(x=names, y=vals)
    plt.ylabel("Number of images")
    plt.xticks(rotation=20)
    plt.title(title)
    plt.tight_layout()
    return fig

# List & visualize all images
all_paths, all_labels, all_counts = list_images_by_class(DATASET_DIR)
# Stop early with a readable message when no class folder yielded any image.
if len(all_paths) == 0:
    raise SystemExit("No images found. Check dataset structure under:\n" + str(DATASET_DIR))

# Export the class distribution of the full dataset (JPG + DOCX) before any splitting.
fig = plot_class_hist(all_counts, "Class distribution (all images)")
save_figure_jpg_and_docx(
    fig,
    RESULTS_DIR / "Figure_Class_Distribution_All.jpg",
    RESULTS_DIR / "Figure_Class_Distribution_All.docx",
    caption="Class distribution across all images.",
    description="Number of images per class before any splitting or resampling."
)


In [None]:


# ---------- [5] Split BEFORE any transforms (leak-safe) ----------
# Stratified hold-out: 20% of all images become the test split.
# NOTE(review): the split is per image file; if several images belong to the
# same patient/volume, a grouped split may be needed — confirm against the dataset.
train_paths, test_paths, y_train, y_test = train_test_split(
    all_paths, all_labels, test_size=0.20, random_state=RANDOM_STATE, stratify=all_labels
)
# 20% of the remaining images become validation (overall 64% train / 16% val / 20% test).
train_paths, val_paths, y_train, y_val = train_test_split(
    train_paths, y_train, test_size=0.20, random_state=RANDOM_STATE, stratify=y_train
)

from collections import Counter
def counts_from_labels(y):
    """Count labels in ``y`` and return ``{class name: count}``.

    Only label ids that occur in ``y`` appear in the result; entries are
    ordered by ascending label id.
    """
    tally = Counter(y)
    by_name = {}
    for label_id in sorted(tally):
        by_name[idx_to_class[label_id]] = tally[label_id]
    return by_name

# Export the class distribution of the training split (JPG + DOCX).
fig = plot_class_hist(counts_from_labels(y_train), "Class distribution (train split)")
save_figure_jpg_and_docx(
    fig,
    RESULTS_DIR / "Figure_Class_Distribution_Train.jpg",
    RESULTS_DIR / "Figure_Class_Distribution_Train.docx",
    caption="Training split class distribution.",
    description="Counts of each class in the training split. All resampling is performed only on this split."
)


In [None]:
# ---------- [6] CLIP features (augment train only) ----------
# Load the CLIP ViT-B/32 model and its matching image preprocessing transform.
clip_model, preprocess = clip.load("ViT-B/32", device=device)
clip_model.eval()  # inference mode: the encoder is only used as a feature extractor here

# Lightweight augmentations; each transform is applied independently with its own probability p.
# NOTE(review): no seed is passed to the augmentation pipeline in the visible
# code — confirm whether augmented features are reproducible between runs.
AUG = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.RandomRotate90(p=0.35),
    A.RandomBrightnessContrast(p=0.35),
    A.ShiftScaleRotate(shift_limit=0.08, scale_limit=0.12, rotate_limit=20, p=0.45),
], p=1.0)

def load_image_preprocess(path, augment=False):
    """Load an image and turn it into a batched CLIP input tensor.

    Args:
        path: Image file to open.
        augment: When true, the image is passed through the AUG pipeline
            before CLIP preprocessing.

    Returns:
        The result of ``preprocess`` with a leading batch dimension of 1,
        moved to ``device``.
    """
    # Close the file handle as soon as the pixels are converted; convert()
    # returns an independent in-memory copy, so it stays valid afterwards.
    with Image.open(path) as src:
        img = src.convert("RGB")
    if augment:
        arr = AUG(image=np.array(img))["image"]
        img = Image.fromarray(arr)
    return preprocess(img).unsqueeze(0).to(device)

@torch.no_grad()
def encode_path(path, augment=False):
    """Encode one image file into a CLIP image-feature vector.

    Args:
        path: Image file to encode.
        augment: Forwarded to ``load_image_preprocess``.

    Returns:
        1-D float32 NumPy array holding the image embedding.
    """
    x = load_image_preprocess(path, augment=augment)
    feat = clip_model.encode_image(x)
    # Cast to float32 so features have the same dtype on every device; the
    # encoder may emit half precision on GPU, which would otherwise reach the
    # resampling step downstream as float16.
    return feat.squeeze(0).float().cpu().numpy()

# Choose minority classes from TRAIN only (for targeted aug):
# sort class names by ascending training count and keep the two rarest.
tr_counts = counts_from_labels(y_train)
minority_classes = [c for c, _ in sorted(tr_counts.items(), key=lambda kv: kv[1])[:2]]

def extract_features(paths, y, augment_for_classes=None):
    """Encode image paths into a CLIP feature matrix.

    Exactly one feature row is produced per path. When a sample's class name
    is listed in ``augment_for_classes``, that row comes from the augmented
    image instead of the original one (no extra rows are added).

    Args:
        paths: Iterable of image paths.
        y: Iterable of integer labels aligned with ``paths``.
        augment_for_classes: Class names whose images are augmented before
            encoding, or ``None`` to disable augmentation.

    Returns:
        ``(X, y_out)``: stacked feature rows and the labels as a NumPy array.
    """
    targeted = () if augment_for_classes is None else augment_for_classes
    samples = list(zip(paths, y))
    feature_rows = [
        encode_path(img_path, augment=idx_to_class[int(lab)] in targeted)
        for img_path, lab in samples
    ]
    kept_labels = [lab for _, lab in samples]
    return np.vstack(feature_rows), np.array(kept_labels)

print("[INFO] Encoding CLIP features (train/val/test)…")
# Only the training split is augmented (minority classes); validation and test
# images are always encoded unmodified.
X_train, y_train = extract_features(train_paths, y_train, augment_for_classes=minority_classes)
X_val,   y_val   = extract_features(val_paths,   y_val,   augment_for_classes=None)
X_test,  y_test  = extract_features(test_paths,  y_test,  augment_for_classes=None)


In [None]:



# ---------- [7] Handle imbalance on TRAIN only ----------
# Resampling works on the CLIP feature vectors (not on images) and is fitted
# on the training split only; validation and test features stay untouched.
print("[INFO] Applying SMOTETomek on training features …")
resampler = SMOTETomek(random_state=RANDOM_STATE)
X_train_res, y_train_res = resampler.fit_resample(X_train, y_train)

# Export the training class distribution before and after resampling.
fig = plot_class_hist(counts_from_labels(y_train), "Training distribution (original)")
save_figure_jpg_and_docx(
    fig, RESULTS_DIR / "Figure_Train_Distribution_Original.jpg", RESULTS_DIR / "Figure_Train_Distribution_Original.docx",
    caption="Original training distribution.",
    description="Class counts in the training set before resampling."
)
fig = plot_class_hist(counts_from_labels(y_train_res), "Training distribution (SMOTETomek)")
save_figure_jpg_and_docx(
    fig, RESULTS_DIR / "Figure_Train_Distribution_Resampled.jpg", RESULTS_DIR / "Figure_Train_Distribution_Resampled.docx",
    caption="Training distribution after SMOTETomek.",
    description="Class counts after applying SMOTETomek to mitigate class imbalance."
)

In [None]:



# ---------- [8] Simple MLP classifier with Focal Loss ----------
import torch.nn as nn, torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

class MammogramDS(Dataset):
    """In-memory dataset pairing feature rows with integer class labels."""

    def __init__(self, X, y):
        # Copy the inputs into tensors once, so indexing later is a cheap lookup.
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        """Number of samples, taken from the label tensor."""
        return len(self.y)

    def __getitem__(self, i):
        """Return the ``(features, label)`` pair stored at position ``i``."""
        features = self.X[i]
        label = self.y[i]
        return features, label

class MLP(nn.Module):
    """Feed-forward classifier: two hidden Linear+BatchNorm+ReLU+Dropout stages
    followed by a linear output layer that produces raw logits."""

    def __init__(self, in_dim=512, hidden=256, num_classes=4):
        super().__init__()
        narrow = hidden // 2  # width of the second hidden layer
        self.fc1 = nn.Linear(in_dim, hidden)
        self.bn1 = nn.BatchNorm1d(hidden)
        self.fc2 = nn.Linear(hidden, narrow)
        self.bn2 = nn.BatchNorm1d(narrow)
        self.fc3 = nn.Linear(narrow, num_classes)
        self.drop = nn.Dropout(0.5)  # one dropout module shared by both hidden stages

    def forward(self, x):
        """Map a ``(batch, in_dim)`` input to ``(batch, num_classes)`` logits."""
        h = self.drop(F.relu(self.bn1(self.fc1(x))))
        h = self.drop(F.relu(self.bn2(self.fc2(h))))
        return self.fc3(h)

class FocalLoss(nn.Module):
    """Multi-class focal loss with optional per-class weights.

    Args:
        alpha: Optional tensor indexed by class id; each sample's loss is
            multiplied by the entry of its target class.
        gamma: Focusing exponent applied to ``(1 - p_t)``.
    """

    def __init__(self, alpha=None, gamma=2.0):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, logits, targets):
        """Return the mean focal loss over the batch."""
        per_sample_ce = F.cross_entropy(logits, targets, reduction="none")
        # exp(-CE) recovers the probability the model assigned to the true class.
        p_true = torch.exp(-per_sample_ce)
        focal = (1 - p_true) ** self.gamma * per_sample_ce
        if self.alpha is None:
            return focal.mean()
        return (self.alpha[targets] * focal).mean()

def effective_number_weights(y, beta=0.999):
    """Compute class weights from the effective number of samples (ENS).

    For a class with ``n`` samples the raw weight is
    ``(1 - beta) / (1 - beta**n)``; weights are then rescaled so they sum to
    ``len(CLASSES)``.

    Classes with zero samples receive weight 0. Without that, their raw weight
    (a division by ~1e-8) would dominate the normalisation and push the
    weights of all present classes to almost zero.

    Args:
        y: Array of non-negative integer labels.
        beta: ENS smoothing factor in ``[0, 1)``.

    Returns:
        float32 tensor with one weight per class id.
    """
    counts = np.bincount(y, minlength=len(CLASSES))
    eff = 1.0 - np.power(beta, counts)
    # The 1e-8 term is kept so results for present classes match the previous formula exactly.
    w = np.where(counts > 0, (1.0 - beta) / (eff + 1e-8), 0.0)
    total = w.sum()
    if total > 0:
        w = w / total * len(CLASSES)
    else:
        # No usable class information at all: fall back to uniform weights.
        w = np.ones(len(counts), dtype=float)
    return torch.tensor(w, dtype=torch.float32)

def train_model(Xtr, ytr, Xva, yva, epochs=30, batch=32, lr=1e-3):
    """Train the MLP with class-weighted focal loss and return the best model.

    The model state with the lowest mean validation loss is restored before
    returning. Loss curves are exported to RESULTS_DIR as JPG + DOCX.

    Args:
        Xtr, ytr: Training features (2-D, ``Xtr.shape[1]`` is the input width)
            and integer labels.
        Xva, yva: Validation features and labels.
        epochs: Number of passes over the training data.
        batch: Training mini-batch size (validation uses 64).
        lr: Initial Adam learning rate, annealed with a cosine schedule.

    Returns:
        The trained ``MLP`` on ``device``.
    """
    ds_tr, ds_va = MammogramDS(Xtr, ytr), MammogramDS(Xva, yva)
    # BatchNorm1d cannot normalise a single-sample batch in training mode, so
    # drop the trailing batch only when it would contain exactly one sample.
    drop_tail = len(ds_tr) > batch and len(ds_tr) % batch == 1
    dl_tr = DataLoader(ds_tr, batch_size=batch, shuffle=True, drop_last=drop_tail)
    dl_va = DataLoader(ds_va, batch_size=64, shuffle=False)

    model = MLP(in_dim=Xtr.shape[1], hidden=256, num_classes=len(CLASSES)).to(device)
    alpha = effective_number_weights(ytr).to(device)
    crit = FocalLoss(alpha=alpha, gamma=2.0)
    opt = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)
    sch = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=epochs)

    best_state, best_val = None, float("inf")
    tr_hist, va_hist = [], []

    for ep in range(epochs):
        model.train()
        tr_loss = 0.0
        for xb, yb in dl_tr:
            xb, yb = xb.to(device), yb.to(device)
            opt.zero_grad()
            loss = crit(model(xb), yb)
            loss.backward()
            opt.step()
            tr_loss += loss.item()
        sch.step()

        model.eval()
        va_loss = 0.0
        with torch.no_grad():
            for xb, yb in dl_va:
                xb, yb = xb.to(device), yb.to(device)
                va_loss += crit(model(xb), yb).item()

        # Mean loss per batch; max(1, ...) guards against an empty loader.
        tr_loss /= max(1, len(dl_tr))
        va_loss /= max(1, len(dl_va))
        tr_hist.append(tr_loss)
        va_hist.append(va_loss)
        if va_loss < best_val:
            best_val = va_loss
            # clone() is required: on a CPU model .cpu() returns the live
            # parameter tensors, so without a copy the snapshot would keep
            # changing with later optimizer steps.
            best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
        if (ep+1) % 5 == 0 or ep == 0:
            print(f"[EPOCH {ep+1:>2}/{epochs}] Train {tr_loss:.4f} | Val {va_loss:.4f}")

    # Loss curves → JPG + DOCX
    fig = plt.figure(figsize=(7.2, 4.5))
    plt.plot(range(1, epochs+1), tr_hist, label="Train")
    plt.plot(range(1, epochs+1), va_hist, label="Validation")
    plt.xlabel("Epoch"); plt.ylabel("Loss"); plt.title("Training/Validation Loss")
    plt.legend(); plt.grid(True); plt.tight_layout()
    save_figure_jpg_and_docx(
        fig, RESULTS_DIR / "Figure_Training_Loss.jpg", RESULTS_DIR / "Figure_Training_Loss.docx",
        caption="Training and validation loss across epochs.",
        description="Curves depict optimization progress and generalization gap; the best model is chosen by minimum validation loss."
    )

    # best_state stays None when no epoch ran or the validation loss was never
    # finite; in that case the model is returned with its current weights.
    if best_state is not None:
        model.load_state_dict({k: v.to(device) for k, v in best_state.items()})
    return model

# Train on the resampled training features; validation features are used unresampled for model selection.
model_t = train_model(X_train_res, y_train_res, X_val, y_val, epochs=30)


In [None]:

# ---------- [9] Evaluation helpers ----------
def predict_proba(model, X):
    """Return softmax class probabilities for every row of ``X``.

    The model is switched to eval mode and all rows are scored in a single
    forward pass without gradient tracking.

    Args:
        model: Module mapping a float32 feature batch to class logits.
        X: Array-like feature matrix.

    Returns:
        NumPy array of shape ``(n_samples, n_classes)``.
    """
    model.eval()
    with torch.no_grad():
        batch = torch.tensor(X, dtype=torch.float32).to(device)
        probabilities = torch.softmax(model(batch), dim=1)
        return probabilities.cpu().numpy()

def per_class_specificity(y_true, y_pred, n_classes):
    """Compute one-vs-rest specificity, TN / (TN + FP), for each class id.

    Args:
        y_true: NumPy array of true label ids.
        y_pred: NumPy array of predicted label ids, aligned with ``y_true``.
        n_classes: Class ids ``0 .. n_classes - 1`` are evaluated.

    Returns:
        List with one specificity value per class id.
    """
    def _one_vs_rest(cls_id):
        negatives = (y_true == cls_id).astype(int) == 0
        flagged = (y_pred == cls_id).astype(int) == 1
        true_neg = np.sum(negatives & ~flagged)
        false_pos = np.sum(negatives & flagged)
        # The tiny constant avoids 0/0 when a class has no negative samples.
        return true_neg / (true_neg + false_pos + 1e-12)

    return [_one_vs_rest(cls_id) for cls_id in range(n_classes)]

def evaluate_split(name, X, y, save_prefix, model=None):
    """Evaluate a classifier on one split and export figures and tables.

    Writes to RESULTS_DIR: a normalised confusion matrix, a combined ROC/PR
    figure (each as JPG + DOCX) and two DOCX tables (overall and per-class
    metrics).

    Args:
        name: Human-readable split name used in titles and captions.
        X: Feature matrix passed to ``predict_proba``.
        y: NumPy array of integer label ids (indices into CLASSES).
        save_prefix: Token inserted into every output file name.
        model: Classifier to evaluate; defaults to the module-level ``model_t``.

    Returns:
        Dict with "Accuracy", "Balanced Accuracy", "Macro F1",
        "Macro Precision", "Macro Recall" and "Macro AUC". AUC is NaN for a
        class that has only positives or only negatives in ``y``; such
        classes are left out of the macro average.
    """
    if model is None:
        model = model_t  # model trained earlier at module level
    class_ids = list(range(len(CLASSES)))

    probs = predict_proba(model, X)
    preds = np.argmax(probs, axis=1)

    metrics = {
        "Accuracy": accuracy_score(y, preds),
        "Balanced Accuracy": balanced_accuracy_score(y, preds),
        "Macro F1": f1_score(y, preds, average="macro", zero_division=0),
        "Macro Precision": precision_score(y, preds, average="macro", zero_division=0),
        "Macro Recall": recall_score(y, preds, average="macro", zero_division=0),
    }

    y_bin = label_binarize(y, classes=class_ids)
    # One-vs-rest ranking metrics are undefined unless both positives and
    # negatives of a class occur in this split.
    rankable = [np.unique(y_bin[:, i]).size == 2 for i in class_ids]

    rows, aucs = [], []
    specs = per_class_specificity(y, preds, len(CLASSES))
    for i, cls in enumerate(CLASSES):
        prc = precision_score(y, preds, labels=[i], average=None, zero_division=0)[0]
        rec = recall_score(y, preds, labels=[i], average=None, zero_division=0)[0]
        f1c = f1_score(y, preds, labels=[i], average=None, zero_division=0)[0]
        # roc_auc_score raises when only one label value is present, so report NaN instead.
        auc_i = roc_auc_score(y_bin[:, i], probs[:, i]) if rankable[i] else float("nan")
        aucs.append(auc_i)
        rows.append([cls, f"{prc:.4f}", f"{rec:.4f}", f"{f1c:.4f}", f"{specs[i]:.4f}", f"{auc_i:.4f}"])
    defined_aucs = [a for a in aucs if not np.isnan(a)]
    metrics["Macro AUC"] = float(np.mean(defined_aucs)) if defined_aucs else float("nan")

    # Confusion matrix; labels= pins the shape to len(CLASSES) x len(CLASSES)
    # so rows/columns always line up with the CLASSES tick labels.
    cm = confusion_matrix(y, preds, labels=class_ids)
    cm_pct = cm.astype(float) / (cm.sum(axis=1, keepdims=True) + 1e-12) * 100.0
    fig = plt.figure(figsize=(7.2, 6))
    sns.heatmap(cm_pct, annot=True, fmt=".1f", cmap="Blues",
                xticklabels=CLASSES, yticklabels=CLASSES, vmin=0, vmax=100)
    plt.xlabel("Predicted"); plt.ylabel("True")
    plt.title(f"{name} — Normalized Confusion Matrix (%)"); plt.tight_layout()
    save_figure_jpg_and_docx(
        fig,
        RESULTS_DIR / f"Figure_{save_prefix}_Confusion_Matrix.jpg",
        RESULTS_DIR / f"Figure_{save_prefix}_Confusion_Matrix.docx",
        caption=f"{name} confusion matrix (normalized %).",
        description="Each cell shows the percentage of images of a true class predicted as each class (rows sum to 100%)."
    )

    # ROC + PR curves (classes without both label values are skipped)
    fig = plt.figure(figsize=(13, 5))
    # ROC
    plt.subplot(1, 2, 1)
    for i, cls in enumerate(CLASSES):
        if not rankable[i]:
            continue
        fpr, tpr, _ = roc_curve(y_bin[:, i], probs[:, i])
        plt.plot(fpr, tpr, label=f"{cls} (AUC={auc(fpr, tpr):.2f})")
    plt.plot([0,1],[0,1],"k--"); plt.xlabel("FPR"); plt.ylabel("TPR"); plt.title(f"{name} ROC"); plt.legend()
    # PR
    plt.subplot(1, 2, 2)
    for i, cls in enumerate(CLASSES):
        if not rankable[i]:
            continue
        prec, rec, _ = precision_recall_curve(y_bin[:, i], probs[:, i])
        ap = average_precision_score(y_bin[:, i], probs[:, i])
        plt.plot(rec, prec, label=f"{cls} (AP={ap:.2f})")
    plt.xlabel("Recall"); plt.ylabel("Precision"); plt.title(f"{name} Precision–Recall"); plt.legend()
    plt.tight_layout()
    save_figure_jpg_and_docx(
        fig,
        RESULTS_DIR / f"Figure_{save_prefix}_ROC_PR.jpg",
        RESULTS_DIR / f"Figure_{save_prefix}_ROC_PR.docx",
        caption=f"{name} ROC and Precision–Recall curves.",
        # The probabilities are plain softmax outputs; no calibration step is applied.
        description="Per-class one-vs-rest ROC and PR curves derived from softmax probabilities."
    )

    # Tables
    overall_df = pd.DataFrame([{k: f"{v:.4f}" for k, v in metrics.items()}])
    save_df_to_docx(
        overall_df,
        RESULTS_DIR / f"Table_{save_prefix}_Overall_Metrics.docx",
        caption=f"{name} overall performance metrics.",
        description="Aggregated classification metrics across classes for the split."
    )

    perclass_df = pd.DataFrame(rows, columns=["Class","Precision","Recall","F1","Specificity","AUC"])
    save_df_to_docx(
        perclass_df,
        RESULTS_DIR / f"Table_{save_prefix}_PerClass_Metrics.docx",
        caption=f"{name} per-class performance metrics.",
        description="Precision, recall, F1, specificity, and AUC for each class computed one-vs-rest."
    )

    return metrics

# Evaluate the trained model on the held-out validation and test features;
# each call also writes its figures and tables to RESULTS_DIR.
print("\n[INFO] Evaluating …")
val_metrics  = evaluate_split("Validation", X_val, y_val, "Validation")
test_metrics = evaluate_split("Test",       X_test, y_test, "Test")

In [None]:


# ---------- [10] Experiment summary (Word) ----------
# Build a short Word report: bold title, pipeline description, device,
# split sizes, then the headline validation and test metrics.
summary = Document()
summary.add_paragraph().add_run("Experiment Summary").bold = True
summary.add_paragraph(
    "Leak-safe pipeline using CLIP (ViT-B/32) features + MLP with focal loss. "
    "Train/Val/Test split performed BEFORE any augmentation/resampling. "
    "SMOTETomek applied to training features only. "
)
summary.add_paragraph(f"Device: {device}")
# The train size reported here is the row count after SMOTETomek, not the number of original images.
summary.add_paragraph(f"Train size (post-resample): {len(X_train_res)}  |  Val: {len(X_val)}  |  Test: {len(X_test)}")
summary.add_paragraph(
    f"Validation — Accuracy {val_metrics['Accuracy']:.4f}, "
    f"Balanced Accuracy {val_metrics['Balanced Accuracy']:.4f}, "
    f"Macro F1 {val_metrics['Macro F1']:.4f}, Macro AUC {val_metrics['Macro AUC']:.4f}"
)
summary.add_paragraph(
    f"Test — Accuracy {test_metrics['Accuracy']:.4f}, "
    f"Balanced Accuracy {test_metrics['Balanced Accuracy']:.4f}, "
    f"Macro F1 {test_metrics['Macro F1']:.4f}, Macro AUC {test_metrics['Macro AUC']:.4f}"
)
summary.save(str(RESULTS_DIR / "Summary_Experiment.docx"))

print("\n[DONE] All JPGs and DOCX files are in:", RESULTS_DIR)
