In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Train the Hybrid CBM

In [2]:
# === dataset.py =============================================================
"""PyTorch Dataset for the AneRBC hybrid Concept‑Bottleneck pipeline."""
from __future__ import annotations
from pathlib import Path
from typing import List, Tuple, Union, Dict, Optional
import json
import pandas as pd
import torch
from PIL import Image
from torch.utils.data import Dataset

class AneRBC(Dataset):
    """Single split of the AneRBC manifest.

    Parameters
    ----------
    csv_path : str or Path
        Path to the manifest CSV.
    split : str
        'train', 'val', or 'test'.
    concept_names : list[str] | None
        Global ordered list of concept keys.  If `None`, will be inferred
        from the *first* row of this split (legacy behaviour).
    transform : callable | None
        Vision transform.
    """

    def __init__(
        self,
        csv_path: Union[str, Path],
        split: str = "train",
        transform=None,
        *,
        concept_names: Optional[List[str]] = None,
        val_ratio: float = 0.15,
        test_ratio: float = 0.15,
        random_state: int = 0,
    ) -> None:
        super().__init__()
        df = pd.read_csv(csv_path)
        if random_state is not None:
            df = df.sample(frac=1.0, random_state=random_state).reset_index(drop=True)
        n = len(df)
        val_start, test_start = int(n * (1 - val_ratio - test_ratio)), int(n * (1 - test_ratio))
        if split == "train":
            self.df = df.iloc[:val_start]
        elif split == "val":
            self.df = df.iloc[val_start:test_start]
        elif split == "test":
            self.df = df.iloc[test_start:]
        else:
            raise ValueError("split must be train/val/test")
        self.transform = transform

        # Shared, deterministic concept ordering ---------------------------------
        if concept_names is None:
            # Fall back to keys in the first row of *this* split (legacy)
            concept_names = list(json.loads(self.df.iloc[0].concepts).keys())
        self.concept_names: List[str] = concept_names
        self.loss_mask = torch.tensor(
            [0 if k.isupper() else 1 for k in self.concept_names], dtype=torch.float32
        )

    # ---------------------------------------------------------------------
    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx: int):
        row = self.df.iloc[idx]
        img = Image.open(row.image).convert("RGB")
        if self.transform:
            img = self.transform(img)
        c_dict = json.loads(row.concepts)
        vec = [c_dict.get(k, 0.0) for k in self.concept_names]
        cvec = torch.tensor(vec, dtype=torch.float32)
        label = torch.tensor(row.label, dtype=torch.long)
        return img, cvec, label

    # ---------------------------------------------------------------------
    def get_loss_mask(self):
        return self.loss_mask

    def num_concepts(self):
        return len(self.concept_names)

# =====================================================================

def make_datasets(csv_path: Union[str, Path], transform=None, **kw):
    """Return (train, val, test) datasets sharing **one** concept ordering."""
    df_all = pd.read_csv(csv_path)
    # union of all keys
    all_keys = set()
    for js in df_all.concepts:
        all_keys.update(json.loads(js).keys())
    concept_names = sorted(all_keys)  # deterministic order

    return (
        AneRBC(csv_path, "train", transform, concept_names=concept_names, **kw),
        AneRBC(csv_path, "val",   transform, concept_names=concept_names, **kw),
        AneRBC(csv_path, "test",  transform, concept_names=concept_names, **kw),
    )


In [3]:
# === models.py =============================================================
"""Model builders for AneRBC CBM."""
import torch
import torch.nn as nn
from typing import Optional
import torchvision.models as tv
from torchvision.models import resnet18, ResNet18_Weights, resnet34, ResNet34_Weights


class ConceptPredictor(nn.Module):
    def __init__(self, num_concepts: int, backbone: str = 'resnet18', pretrained: bool = True,
                 freeze_until: Optional[str] = 'layer3'):
        super().__init__()
        if backbone == 'resnet18':
            net = resnet18(weights=ResNet18_Weights.DEFAULT)
            in_feats = net.fc.in_features
            net.fc = nn.Identity()
        elif backbone == 'resnet34':
            net = resnet34(weights=ResNet34_Weights.DEFAULT)
            in_feats = net.fc.in_features
            net.fc = nn.Identity()
        else:
            raise ValueError(f"Unsupported backbone {backbone}")
        self.backbone = net
        if freeze_until:
            freeze = True
            for name, p in self.backbone.named_parameters():
                if name.startswith(freeze_until):
                    freeze = False
                p.requires_grad = not freeze
        self.head = nn.Linear(in_feats, num_concepts)
        self.activation = nn.Sigmoid()
    def forward(self, x):
        feats = self.backbone(x)
        return self.activation(self.head(feats))

class LabelHead(nn.Module):
    # def __init__(self, num_concepts: int, hidden: int = 64):
    #     super().__init__()
    #     self.net = nn.Sequential(
    #         nn.Linear(num_concepts, 128),
    #         nn.ReLU(),
    #         nn.Linear(128, hidden),
    #         nn.ReLU(),
    #         nn.Linear(hidden, 2)
    #     )
    def __init__(self, num_concepts: int, hidden=(256, 128, 64), p_drop=0.2):
        super().__init__()
        layers = []
        dims = (num_concepts,) + hidden
        for i in range(len(hidden)):
            layers += [
                nn.Linear(dims[i], dims[i+1]),
                nn.BatchNorm1d(dims[i+1]),     # helps convergence
                nn.ReLU(inplace=True),
                nn.Dropout(p_drop)
            ]
        layers.append(nn.Linear(hidden[-1], 2))  # logits
        self.net = nn.Sequential(*layers)

    def forward(self, c):
        return self.net(c)

class End2EndCBM(nn.Module):
    def __init__(self, g: ConceptPredictor, h: LabelHead, loss_mask: torch.Tensor):
        super().__init__()
        self.g, self.h = g, h
        self.register_buffer('loss_mask', loss_mask)
    def forward(self, x, c_cbc):
        c_pred = self.g(x)
        y_logit = self.h(torch.cat([c_pred, c_cbc], dim=1))
        return c_pred, y_logit


In [4]:
# === train.py =============================================================
"""Training script for AneRBC Concept‑Bottleneck (independent / sequential / joint)."""
import argparse, torch, torch.nn as nn
from torch.utils.data import DataLoader
import torchvision.transforms as T
from sklearn.metrics import accuracy_score
from pathlib import Path
from torch.utils.data import WeightedRandomSampler
from sklearn.metrics import classification_report, accuracy_score, f1_score, recall_score, precision_score
import numpy as np
import torch.nn as nn
import torch

def parse_args():
    p = argparse.ArgumentParser()
    p.add_argument('--csv', required=True)
    p.add_argument('--batch', type=int, default=32)
    p.add_argument('--epochs', type=int, default=20)
    p.add_argument('--lr', type=float, default=1e-3)
    p.add_argument('--mode', choices=['joint'], default='joint')
    return p.parse_args()

# -------------------------------------------------------------------------

def make_loaders(csv_path, batch_size):
    ''' OLD: Make data loaders without balanced sampling'''
    tf = T.Compose([T.Resize((224, 224)), T.ToTensor()])
    ds_train, ds_val, ds_test = make_datasets(csv_path, transform=tf)
    kw = dict(batch_size=batch_size, num_workers=2, shuffle=True)
    print("concept_names:", ds_train.concept_names)  # Order of concepts
    return (
        DataLoader(ds_train, **kw),
        DataLoader(ds_val, batch_size=batch_size, num_workers=2),
        DataLoader(ds_test, batch_size=batch_size, num_workers=2),
        ds_train.get_loss_mask(),
        ds_train.num_concepts(),
    )

# def make_loaders(csv_path, batch_size):
#     ''' Make data loaders with balanced sampling '''
#     tf = T.Compose([T.Resize((224, 224)), T.ToTensor()])
#     # tf = T.Compose([
#     #     T.Resize((224,224)), # Resize before converting to Tensor
#     #     T.RandomApply([T.ColorJitter(.2,.2,.2,.05)], p=0.7),
#     #     T.RandomRotation(360, interpolation=T.InterpolationMode.BILINEAR),
#     #     T.RandomHorizontalFlip(),
#     #     T.RandomVerticalFlip(),
#     #     T.RandomErasing(p=0.3, scale=(0.01, 0.03)),  # Cutout-ish
#     #     T.ToTensor() # Convert to Tensor after applying other image augmentations
#     # ])
#     ds_train, ds_val, ds_test = make_datasets(csv_path, transform=tf)

#     # -------- get labels array for the subset ----------------------------
#     if isinstance(ds_train, torch.utils.data.Subset):
#         orig = ds_train.dataset                     # AneRBC instance
#         idxs = ds_train.indices
#         labels = orig.df.iloc[idxs]["label"].to_numpy()
#     else:
#         labels = ds_train.df["label"].to_numpy()

#     # -------- build sampler ----------------------------------------------
#     class_freq = np.bincount(labels) / len(labels)       # e.g. [0.55, 0.45]
#     weights = 1.0 / class_freq[labels]                   # len == len(ds_train)
#     sampler = WeightedRandomSampler(weights, len(weights), replacement=True)

#     train_loader = DataLoader(ds_train, batch_size=batch_size,
#                               sampler=sampler, num_workers=2)
#     val_loader   = DataLoader(ds_val,   batch_size=batch_size, num_workers=2)
#     test_loader  = DataLoader(ds_test,  batch_size=batch_size, num_workers=2)

#     print("concept_names:", ds_train.dataset.concept_names if isinstance(ds_train, torch.utils.data.Subset) else ds_train.concept_names)
#     return train_loader, val_loader, test_loader, \
#            ds_train.dataset.get_loss_mask() if isinstance(ds_train, torch.utils.data.Subset) else ds_train.get_loss_mask(), \
#            ds_train.dataset.num_concepts() if isinstance(ds_train, torch.utils.data.Subset) else ds_train.num_concepts()

# -------------------------------------------------------------------------

def train_loop(model, loader, optimiser, loss_c, loss_y, device):
    model.train()
    tot_c = tot_y = 0
    for x, c_all, y in loader:
        x, c_all, y = x.to(device), c_all.to(device), y.to(device)
        c_cbc = c_all[:, model.loss_mask == 0]
        c_gt = c_all[:, model.loss_mask == 1]
        optimiser.zero_grad()
        c_pred, y_logit = model(x, c_cbc)
        # lc = loss_c(c_pred, c_gt).mean()
        lc = weighted_concept_loss(c_pred, c_gt)  # weighted concept loss
        ly = loss_y(y_logit, y)
        (lc + ly).backward()
        optimiser.step()
        tot_c += lc.item() * len(x)
        tot_y += ly.item() * len(x)
    return tot_c / len(loader.dataset), tot_y / len(loader.dataset)


@torch.inference_mode()
def eval_loop(model, loader, loss_c, loss_y, device):
    ''' Model evaluation '''

    model.eval()
    # Validation loss
    tot_c = tot_y = 0
    preds, labels = [], []

    for x, c_all, y in loader:
        x, c_all, y = x.to(device), c_all.to(device), y.to(device)
        c_cbc = c_all[:, model.loss_mask == 0]
        c_gt = c_all[:, model.loss_mask == 1]
        c_pred, y_logit = model(x, c_cbc)

        # lc = loss_c(c_pred, c_gt).mean()
        lc = weighted_concept_loss(c_pred, c_gt)  # weighted concept loss
        ly = loss_y(y_logit, y)
        tot_c += lc.item() * len(x)
        tot_y += ly.item() * len(x)

        # Get predictions and ground truth labels
        preds.append(y_logit.argmax(1).cpu())
        labels.append(y.cpu())

    # Calculate metrics
    y_true = torch.cat(labels)
    y_pred = torch.cat(preds)
    acc    = accuracy_score(y_true, y_pred)
    rec    = recall_score(y_true, y_pred)
    prec   = precision_score(y_true, y_pred)
    f1     = f1_score(y_true, y_pred)

    acc = accuracy_score(torch.cat(labels), torch.cat(preds))
    return tot_c / len(loader.dataset), tot_y / len(loader.dataset), acc, rec, prec, f1

# -------------------------------------------------------------------------

def weighted_concept_loss(pred, target):
    """
    pred, target: tensors [B, K_total]
    Only the 12 morph positions (mask==1) are weighted; CBC columns remain masked out.
    """
    # element‑wise BCE
    raw = bce(pred, target)                          # B × K_total
    # broadcast pos_weight across batch, using mask to select morph cols
    w   = torch.zeros_like(raw, device=raw.device)
    relevant_pos_weight = pos_weight[mask == 1]  # Length 15, for 15 morphology flags
    w[:, morph_mask.bool()] = relevant_pos_weight.type(torch.float32) # CBC cols stay 0 (will be masked)
    return (morph_mask * w * raw).mean()                  # scalar


In [5]:
import types

args = types.SimpleNamespace()
args.csv = '/content/drive/MyDrive/Trustworthy_AI_Final_Project/CBM_w_AneRBC/anerbc_manifest_filtered_out_bad_data.csv'  # <--- Set your required CSV path here
args.batch = 32
args.epochs = 30
args.lr = 1e-3
args.mode = 'joint' # Although choices were defined, here you just set the value

device = 'cuda' if torch.cuda.is_available() else 'cpu'

bce = nn.BCELoss(reduction='none')
prevalence = torch.tensor([0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.300, 0.418, 0.526, 0.486, 0.055, 0.070, 0.471, 0.500,
        0.072, 0.140, 0.234, 0.057, 0.205, 0.067, 0.072],
       device='cuda:0', dtype=torch.float64)

pos_weight = (1.0 - prevalence) / prevalence          # inverse odds, shape (12,)

trL, vaL, teL, mask, num_c = make_loaders(args.csv, args.batch)
print("mask= ", mask)
n_morph = int(mask.sum().item())  # count of supervised morphology flags
g = ConceptPredictor(n_morph, backbone="resnet34")  # predicts only morph flags
h = LabelHead(num_c)
morph_mask = mask[mask == 1].to(device)
model = End2EndCBM(g, h, mask).to(device)
# opt = torch.optim.AdamW([{'params':g.backbone.parameters(), 'lr':1e-4}, {'params':g.head.parameters()}, {'params':h.parameters()}], lr=args.lr)
opt = torch.optim.AdamW(model.parameters(), lr=args.lr)
loss_c = nn.BCELoss(reduction='none')
loss_y = nn.CrossEntropyLoss()
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(opt, factor=0.3, patience=3)

test_acc = 0.
run = 1
while test_acc < 0.1:
  print(f"Run #{run} ========================================")
  best_val_acc = 0.
  best_f1 = 0.
  for epoch in range(1, args.epochs+1):
      lc, ly = train_loop(model, trL, opt, loss_c, loss_y, device)
      vlc, vly, vacc, vrecall, vprec, vf1 = eval_loop(model, vaL, loss_c, loss_y, device)
      print(f"E{epoch:02d} trainC {lc:.3f} valC {vlc:.3f}  trainY {ly:.3f} valY {vly:.3f}  acc {vacc:.3f} recall {vrecall:.3f} prec {vprec:.3f} f1 {vf1:.3f}")

      if vacc>best_val_acc:
          best_val_acc = vacc
          torch.save(model.state_dict(), 'best.ckpt')
      elif vf1>best_f1:
          best_f1 = vf1
          torch.save(model.state_dict(), 'best.ckpt')

      scheduler.step(vly)

  # final test
  model.load_state_dict(torch.load('best.ckpt'))
  _, _, test_acc, test_recall, test_prec, test_f1 = eval_loop(model, teL, loss_c, loss_y, device)
  torch.save(model.state_dict(), f'/content/drive/MyDrive/Trustworthy_AI_Final_Project/CBM_w_AneRBC/checkpoints/acc_0{int(test_acc * 1000)}_f1_0{int(test_f1 * 1000)}.ckpt')
  print(f"TEST accuracy: {test_acc:.3f}")
  print(f"TEST f1: {test_f1:.3f}")
  run += 1

concept_names: ['HCT', 'HGB', 'MCH', 'MCV', 'RBC', 'anisocytosis', 'elliptocytes', 'hypochromic', 'microcytic', 'monocytosis', 'neutrophilia', 'normochromic', 'normocytic', 'platelets_decreased', 'platelets_increased', 'polychromasia', 'reactive_lymphocytes', 'target_cells', 'tear_drop_cells', 'thalassemia']
mask=  tensor([0., 0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
        1., 1.])


Downloading: "https://download.pytorch.org/models/resnet34-b627a593.pth" to /root/.cache/torch/hub/checkpoints/resnet34-b627a593.pth
100%|██████████| 83.3M/83.3M [00:00<00:00, 218MB/s]




KeyboardInterrupt: 

# Load-and-Predict

In [9]:
# ------------------------------------------------------------
#   helper functions
# ------------------------------------------------------------
import re

CBC_CANON = {
    # canonical : list of regex patterns covering spelling variants
    "HGB": [r"\bHGB\b", r"\bH\.?emoglobin\b", r"\bHemoglobin\b"],
    "HCT": [r"\bHCT\b", r"\bH\.?ematocrit\b", r"\bHematocrit\b"],
    "RBC": [r"\bRBC\b", r"\bRed\s+Blood\s+Cells?\b"],
    "MCV": [r"\bMCV\b", r"\bMean\s+Corpuscular\s+Volume\b"],
    "MCH": [r"\bMCH\b", r"\bMean\s+Corpuscular\s+Hemoglobin\b"],
}

CBC_RE = {k: re.compile('|'.join(pat), re.I) for k, pat in CBC_CANON.items()}
VALUE_RE = re.compile(r"([0-9]*\.?[0-9]+)")

def parse_cbc(text: str) -> dict:
    out = {k: None for k in CBC_CANON}
    for line in text.splitlines():
        for key, rx in CBC_RE.items():
            if rx.search(line):
                m = VALUE_RE.search(line)
                if m:
                    out[key] = float(m.group(1))
    # fallback: set missing values to 0.0 or NaN
    return {k: (v if v is not None else 0.0) for k, v in out.items()}


# ------------------------------------------------------------
#   Load-and-Predict
# ------------------------------------------------------------
from pathlib import Path

# 0. imports & helpers
import json, torch
from PIL import Image
import torchvision.transforms as T

# 1. recreate the concept list & loss-mask (same code as training)
csv_path = "/content/drive/MyDrive/Trustworthy_AI_Final_Project/CBM_w_AneRBC/anerbc_manifest_filtered_out_bad_data.csv"
_, _, ds_test = make_datasets(csv_path)      # we only need one split
concept_names = ds_test.concept_names
loss_mask     = ds_test.get_loss_mask()
n_morph       = int(loss_mask.sum().item())  # 14

# 2. rebuild the architecture
g = ConceptPredictor(n_morph, backbone="resnet34", pretrained=False)   # weights will be overwritten
h = LabelHead(len(concept_names))
model = End2EndCBM(g, h, loss_mask)
model.load_state_dict(torch.load("/content/drive/MyDrive/Trustworthy_AI_Final_Project/CBM_w_AneRBC/checkpoints/resnet32_mlp_256_128_64/acc_0909_f1_0917.ckpt", map_location="cpu"))
model.load_state_dict(torch.load("/content/drive/MyDrive/Trustworthy_AI_Final_Project/CBM_w_AneRBC/checkpoints/resnet32_mlp_256_128_64/acc_0916_f1_0919_wbce.ckpt", map_location="cpu"))
model.eval()

# 3. image transform (same as train)
transform = T.Compose([T.Resize((224,224)), T.ToTensor()])

# 4. helper to run one smear + CBC vector
def predict(image_path: str, cbc_dict: dict[str, float]):
    """Return (p_anemic, concept_dict_pred). `cbc_dict` keys must match the manifest order."""
    # --- prepare tensors -------------------------------------------------
    img  = transform(Image.open(image_path).convert("RGB")).unsqueeze(0)  # 1×3×224×224
    c_cbc = torch.tensor([[cbc_dict.get(k, 0.0) for k in concept_names if k.isupper()]],
                         dtype=torch.float32)                             # 1×9

    # --- forward ---------------------------------------------------------
    with torch.inference_mode():
        c_pred, y_logit = model(img, c_cbc)
        p_anemic = torch.softmax(y_logit, dim=1)[0,1].item()

        # unpack the 14 predicted morphology bits
        morph_keys = [k for k in concept_names if not k.isupper()]
        c_vec = c_pred.squeeze().tolist()
        concept_pred = dict(zip(morph_keys, c_vec))

    return p_anemic, concept_pred

# 5. example call ---------------------------------------------------------
ROOT = Path("/content/drive/MyDrive/Trustworthy_AI_Final_Project/CBM_w_AneRBC/AneRBC_full/AneRBC/AneRBC_dataset/AneRBC-I/")
anemia_labels = ["Healthy_individuals", "Anemic_individuals"]
label = anemia_labels[1]
stem = "054_a"  # _a for anemia, _h for healthy

img_path = ROOT / f"{label}/Original_images" / f"{stem}.png"
cbc_path = ROOT / f"{label}/CBC_reports" / f"{stem}.txt"

cbc_sample = parse_cbc(Path(cbc_path).read_text())
p, concepts = predict(img_path, cbc_sample)

print(f"Anemia probability: {p:.3f}")
for k,v in concepts.items():
    print(f"{k:20s}{v:.2f}")


Anemia probability: 0.979
anisocytosis        0.36
elliptocytes        0.98
hypochromic         1.00
microcytic          1.00
monocytosis         0.00
neutrophilia        0.03
normochromic        0.01
normocytic          0.00
platelets_decreased 0.00
platelets_increased 0.55
polychromasia       0.38
reactive_lymphocytes0.00
target_cells        0.78
tear_drop_cells     0.00
thalassemia         0.02


# AUROC

In [None]:
# ------------------------------------------------------------
#  AUROC for the supervised morphology flags
# ------------------------------------------------------------
from sklearn.metrics import roc_auc_score
import numpy as np, torch

@torch.no_grad()
def concept_auroc(model, loader, morph_mask, device='cpu'):
    """
    Returns list of AUROCs (len = # supervised morph flags)
    """
    model.eval()
    y_true, y_pred = [], []

    for x, c_all, _ in loader:
        x, c_all = x.to(device), c_all.to(device)
        c_gt   = c_all[:, morph_mask.bool()]              # B × K_morph
        c_cbc  = c_all[:, ~morph_mask.bool()]             # B × 5 CBC
        c_pred, _ = model(x, c_cbc)

        y_true.append(c_gt.cpu())
        y_pred.append(c_pred.cpu())

    y_true = torch.cat(y_true).numpy()
    y_pred = torch.cat(y_pred).numpy()

    aurocs = []
    for k in range(y_true.shape[1]):
        # skip if concept is all-0 or all-1 in this split
        if y_true[:, k].sum() in (0, len(y_true)):
            aurocs.append(np.nan)
        else:
            aurocs.append(roc_auc_score(y_true[:, k], y_pred[:, k]))
    return aurocs

# ------------------------------------------------------------
# 0. helpers already defined: ConceptPredictor, LabelHead, End2EndCBM, concept_auroc
# ------------------------------------------------------------
csv_path = "/content/drive/MyDrive/Trustworthy_AI_Final_Project/CBM_w_AneRBC/anerbc_manifest_filtered_out_bad_data.csv"

# transform identical to training
import torchvision.transforms as T
tf = T.Compose([T.Resize((224,224)), T.ToTensor()])

# 1. rebuild test dataset WITH transform
ds_test = AneRBC(csv_path, split="test", transform=tf)   # <- single creation
concept_names = ds_test.concept_names
mask          = ds_test.get_loss_mask()
morph_mask    = mask == 1                                # boolean of length 22

# 2. DataLoader
from torch.utils.data import DataLoader
teL = DataLoader(ds_test, batch_size=64, shuffle=False)

# 3. rebuild model skeleton
n_morph = int(morph_mask.sum().item())                   # 12 supervised flags
g = ConceptPredictor(n_morph, backbone="resnet34", pretrained=False)
h = LabelHead(len(concept_names), hidden=(256,128,64), p_drop=0.2)
model = End2EndCBM(g, h, mask)
state = torch.load("/content/drive/MyDrive/Trustworthy_AI_Final_Project/CBM_w_AneRBC/checkpoints/resnet32_mlp_256_128_64/acc_0909_f1_0917.ckpt",
                   map_location="cpu")
model.load_state_dict(state, strict=True)
model.eval()

# 4. AUROC
aurocs = concept_auroc(model, teL, morph_mask, device='cpu')
morph_keys = [k for k in concept_names if not k.isupper()]

print("\nConcept AUROC:")
for k, auc in zip(morph_keys, aurocs):
    print(f"{k:25s} {auc:.3f}")
print(f"Macro‑average AUROC (ignoring NaNs): {np.nanmean(aurocs):.3f}")



Concept AUROC:
microcytic                0.895
normocytic                0.146
normochromic              0.121
hypochromic               0.944
elliptocytes              0.729
target_cells              0.488
tear_drop_cells           0.224
anisocytosis              0.223
polychromasia             0.536
neutrophilia              0.605
monocytosis               0.706
reactive_lymphocytes      0.920
platelets_increased       0.644
platelets_decreased       0.670
thalassemia               0.900
Macro‑average AUROC (ignoring NaNs): 0.583


# Prevalence for Weighted BCE Loss

In [7]:
import pandas as pd
import json
import torch

# Load your manifest CSV
csv_path = "/content/drive/MyDrive/Trustworthy_AI_Final_Project/CBM_w_AneRBC/anerbc_manifest_filtered_out_bad_data.csv"
df = pd.read_csv(csv_path)

# Get all concept keys
all_keys = set()
for js in df.concepts:
    all_keys.update(json.loads(js).keys())

# Sort concept keys to ensure consistent order
concept_names = sorted(list(all_keys))

# Calculate prevalence for each concept (except HGB)
prevalence_dict = {}
for concept in concept_names:
    if concept != "HGB":  # Exclude HGB
        total_count = len(df)
        positive_count = df['concepts'].apply(lambda x: json.loads(x).get(concept, 0) == 1).sum()
        prevalence_dict[concept] = positive_count / total_count

# Create prevalence tensor with HGB prevalence as 0
prevalence_list = [0.0]  # Start with HGB prevalence as 0
for concept in concept_names:
    if concept != "HGB":
        prevalence_list.append(prevalence_dict[concept])

device = 'cuda' if torch.cuda.is_available() else 'cpu'

prevalence = torch.tensor(prevalence_list, device=device)

# Print concept names and their prevalence values
for i, concept in enumerate(concept_names):
    print(f"{concept:20s} {prevalence[i].item():.3f}")

HCT                  0.000
HGB                  0.000
MCH                  0.000
MCV                  0.000
RBC                  0.000
anisocytosis         0.300
elliptocytes         0.418
hypochromic          0.526
microcytic           0.486
monocytosis          0.055
neutrophilia         0.070
normochromic         0.471
normocytic           0.500
platelets_decreased  0.072
platelets_increased  0.140
polychromasia        0.234
reactive_lymphocytes 0.057
target_cells         0.205
tear_drop_cells      0.067
thalassemia          0.072
