In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
import os, time
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import cv2
import random

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms, models
from sklearn.metrics import classification_report, confusion_matrix

# ---------------- USER CONFIG ----------------
DATA_DIR = "/kaggle/input/chest-xray-pneumonia/chest_xray"   # change as needed
OUT_DIR = Path("fast_outputs"); OUT_DIR.mkdir(exist_ok=True, parents=True)
QUICK = True               # set False to run full dataset
SUBSAMPLE_FRAC = 0.20      # when QUICK=True, use 20% of training data
IMG_SIZE = 160             # smaller -> faster; set 224 for final runs
BATCH_SIZE = 32
BASE_EPOCHS = 2            # quick baseline epochs
FINETUNE_EPOCHS = 3        # quick fine-tune epochs
LR_BASE = 1e-3
LR_FINETUNE = 2e-4
NUM_WORKERS = 2            # keep small for Kaggle/Colab stability
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
SEED = 42
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)
# ----------------------------------------------

print("Device:", DEVICE)
train_dir = os.path.join(DATA_DIR, "train")
val_dir = os.path.join(DATA_DIR, "val")
test_dir = os.path.join(DATA_DIR, "test")
for p in (train_dir, val_dir, test_dir):
    if not os.path.exists(p):
        raise FileNotFoundError(f"Missing folder: {p}")

# --------- transforms & datasets ----------
train_tf_baseline = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])
train_tf_finetune = transforms.Compose([
    transforms.RandomResizedCrop(IMG_SIZE, scale=(0.85,1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(0.08,0.08,0.08,0.02),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])
test_tf = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])

train_ds_full = datasets.ImageFolder(train_dir, transform=train_tf_baseline)
val_ds = datasets.ImageFolder(val_dir, transform=test_tf)
test_ds = datasets.ImageFolder(test_dir, transform=test_tf)

# optional quick subsample to speed up experiments
if QUICK:
    n_sub = max(200, int(len(train_ds_full) * SUBSAMPLE_FRAC))
    idxs = sorted(random.sample(range(len(train_ds_full)), n_sub))
    train_ds = Subset(train_ds_full, idxs)
    print(f"QUICK mode: using {n_sub} / {len(train_ds_full)} training samples")
else:
    train_ds = train_ds_full

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS, pin_memory=True)
val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=True)
test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=True)

class_names = train_ds_full.classes
print("Classes:", class_names)
print("Sizes (train/val/test):", len(train_ds), len(val_ds), len(test_ds))

# -------- helpers ----------
def train_epoch(model, loader, optimizer, criterion, device, scaler=None, use_amp=False):
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    for xb, yb in loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        if use_amp and scaler is not None:
            with torch.cuda.amp.autocast():
                out = model(xb)
                loss = criterion(out, yb)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            out = model(xb)
            loss = criterion(out, yb)
            loss.backward()
            optimizer.step()
        running_loss += float(loss.item()) * xb.size(0)
        preds = out.argmax(dim=1)
        correct += (preds == yb).sum().item()
        total += xb.size(0)
    return running_loss/total, correct/total

def eval_model(model, loader, criterion, device):
    model.eval()
    running_loss, correct, total = 0.0, 0, 0
    preds_all, labels_all = [], []
    with torch.no_grad():
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            out = model(xb)
            loss = criterion(out, yb)
            running_loss += float(loss.item()) * xb.size(0)
            preds = out.argmax(dim=1)
            preds_all.extend(preds.cpu().numpy())
            labels_all.extend(yb.cpu().numpy())
            correct += (preds == yb).sum().item()
            total += xb.size(0)
    return running_loss/total, correct/total, np.array(preds_all), np.array(labels_all)

# ---------------- BASELINE (ResNet18 frozen) ----------------
print("\n== Baseline (ResNet18 frozen backbone) ==")
model_base = models.resnet18(pretrained=True)
for p in model_base.parameters(): p.requires_grad = False
num_ftrs = model_base.fc.in_features
model_base.fc = nn.Linear(num_ftrs, len(class_names))
model_base = model_base.to(DEVICE)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_base.fc.parameters(), lr=LR_BASE)

best_val_acc = 0.0
best_base_path = OUT_DIR/"best_resnet18.pt"

for epoch in range(BASE_EPOCHS):
    t0 = time.time()
    train_loss, train_acc = train_epoch(model_base, train_loader, optimizer, criterion, DEVICE)
    val_loss, val_acc, _, _ = eval_model(model_base, val_loader, criterion, DEVICE)
    print(f"Baseline Epoch {epoch+1}/{BASE_EPOCHS} train_acc:{train_acc:.4f} val_acc:{val_acc:.4f} time:{time.time()-t0:.1f}s")
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model_base.state_dict(), best_base_path)
        print("Saved baseline model:", best_base_path)

# baseline test evaluation
model_base.load_state_dict(torch.load(best_base_path, map_location=DEVICE))
_, test_acc_base, preds_b, labels_b = eval_model(model_base, test_loader, criterion, DEVICE)
print("Baseline test acc:", test_acc_base)
print(classification_report(labels_b, preds_b, target_names=class_names))
cm = confusion_matrix(labels_b, preds_b)
plt.figure(figsize=(4,3)); sns.heatmap(cm, annot=True, fmt='d', xticklabels=class_names, yticklabels=class_names); plt.title("Baseline Confusion"); plt.savefig(OUT_DIR/"confusion_baseline.png"); plt.close()

# -------- Quick Grad-CAM for baseline ----------
def gradcam_resnet_simple(model, tensor_img, target_class, layer_name="layer4"):
    model.eval()
    features, gradients = [], []
    def forward_hook(m, i, o): features.append(o)
    def backward_hook(m, gi, go): gradients.append(go[0])
    module = getattr(model, layer_name)
    fh = module.register_forward_hook(forward_hook)
    bh = module.register_backward_hook(backward_hook)
    out = model(tensor_img)
    score = out[0, target_class]
    model.zero_grad()
    score.backward(retain_graph=True)
    if not features or not gradients:
        fh.remove(); bh.remove(); raise RuntimeError("Failed hooks")
    act = features[-1][0].detach(); grad = gradients[-1][0].detach()
    weights = grad.mean(dim=(1,2))
    cam = (weights.view(-1,1,1) * act).sum(dim=0).cpu().numpy()
    cam = np.maximum(cam, 0); cam = cv2.resize(cam, (IMG_SIZE, IMG_SIZE))
    if cam.max()>0: cam = (cam - cam.min())/(cam.max()+1e-8)
    fh.remove(); bh.remove()
    return cam

print("Generating 4 baseline Grad-CAM images...")
for i in range(4):
    img, lbl = test_ds[i]
    inp = img.unsqueeze(0).to(DEVICE)
    try:
        cam = gradcam_resnet_simple(model_base, inp, target_class=lbl)
    except Exception as e:
        print("Grad-CAM err:", e); break
    img_np = img.permute(1,2,0).cpu().numpy(); img_np = (img_np * np.array([0.229,0.224,0.225])) + np.array([0.485,0.456,0.406])
    fname = OUT_DIR/f"gradcam_base_{i}_pred{lbl}.png"
    plt.figure(figsize=(4,4)); plt.imshow(img_np); plt.imshow(cam, cmap='jet', alpha=0.4); plt.axis('off'); plt.savefig(fname); plt.close()
print("Saved baseline Grad-CAMs to", OUT_DIR)

# ---------------- Fine-tune (small, mixed precision) ----------------
print("\n== Fine-tune (unfreeze last block) ==")
# Use same model architecture (ResNet18) for speed; unfreeze layer4 + fc
model_ft = models.resnet18(pretrained=True)
num_ftrs = model_ft.fc.in_features
model_ft.fc = nn.Linear(num_ftrs, len(class_names))
# unfreeze last block and fc
for name, param in model_ft.named_parameters():
    if "layer4" in name or "fc" in name:
        param.requires_grad = True
    else:
        param.requires_grad = False
model_ft = model_ft.to(DEVICE)

params_to_opt = [p for p in model_ft.parameters() if p.requires_grad]
optimizer_ft = optim.AdamW(params_to_opt, lr=LR_FINETUNE, weight_decay=1e-4)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer_ft, T_max=FINETUNE_EPOCHS)
scaler = torch.cuda.amp.GradScaler(enabled=(DEVICE.type=="cuda"))

best_val_acc = 0.0
best_ft_path = OUT_DIR/"best_finetuned_resnet18.pt"

# use a DataLoader with stronger augment (train_tf_finetune) but reuse dataset indices
train_ds_finetune_full = datasets.ImageFolder(train_dir, transform=train_tf_finetune)
if QUICK:
    train_ds_finetune = Subset(train_ds_finetune_full, idxs)  # same idxs as before
else:
    train_ds_finetune = train_ds_finetune_full
train_loader_ft = DataLoader(train_ds_finetune, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS, pin_memory=True)

for epoch in range(FINETUNE_EPOCHS):
    t0 = time.time()
    # training with mixed precision
    model_ft.train()
    running_loss, correct, total = 0.0, 0, 0
    for xb, yb in train_loader_ft:
        xb, yb = xb.to(DEVICE), yb.to(DEVICE)
        optimizer_ft.zero_grad()
        with torch.cuda.amp.autocast(enabled=(DEVICE.type=="cuda")):
            out = model_ft(xb)
            loss = nn.CrossEntropyLoss()(out, yb)
        scaler.scale(loss).backward()
        scaler.step(optimizer_ft)
        scaler.update()
        running_loss += float(loss.item())*xb.size(0)
        preds = out.argmax(dim=1)
        correct += (preds==yb).sum().item()
        total += xb.size(0)
    scheduler.step()
    val_loss, val_acc, _, _ = eval_model(model_ft, val_loader, nn.CrossEntropyLoss(), DEVICE)
    print(f"Fine-tune Epoch {epoch+1}/{FINETUNE_EPOCHS} train_acc:{(correct/total if total else 0):.4f} val_acc:{val_acc:.4f} time:{time.time()-t0:.1f}s")
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model_ft.state_dict(), best_ft_path)
        print("Saved fine-tuned model:", best_ft_path)

# final eval
model_ft.load_state_dict(torch.load(best_ft_path, map_location=DEVICE))
_, test_acc_ft, preds_ft, labels_ft = eval_model(model_ft, test_loader, nn.CrossEntropyLoss(), DEVICE)
print("Fine-tuned test acc:", test_acc_ft)
print(classification_report(labels_ft, preds_ft, target_names=class_names))
cm = confusion_matrix(labels_ft, preds_ft)
plt.figure(figsize=(4,3)); sns.heatmap(cm, annot=True, fmt='d', xticklabels=class_names, yticklabels=class_names); plt.title("Finetuned Confusion"); plt.savefig(OUT_DIR/"confusion_finetuned.png"); plt.close()

# Grad-CAM on fine-tuned model (4 samples)
print("Generating Grad-CAMs for fine-tuned model...")
for i in range(4):
    img, lbl = test_ds[i]
    inp = img.unsqueeze(0).to(DEVICE)
    try:
        cam = gradcam_resnet_simple(model_ft, inp, target_class=lbl)
    except Exception as e:
        print("Grad-CAM fine err:", e); break
    img_np = img.permute(1,2,0).cpu().numpy(); img_np = (img_np * np.array([0.229,0.224,0.225])) + np.array([0.485,0.456,0.406])
    fname = OUT_DIR/f"gradcam_finetune_{i}_pred{lbl}.png"
    plt.figure(figsize=(4,4)); plt.imshow(img_np); plt.imshow(cam, cmap='jet', alpha=0.4); plt.axis('off'); plt.savefig(fname); plt.close()
print("Saved fine-tune Grad-CAMs to", OUT_DIR)

# Save inference helper
inference_py = f'''
# inference_helper.py
import torch, torchvision.transforms as transforms
from PIL import Image
from torchvision import models
import numpy as np

def load_model(path):
    m = models.resnet18(pretrained=False)
    m.fc = torch.nn.Linear(m.fc.in_features, {len(class_names)})
    m.load_state_dict(torch.load(path, map_location='cpu'))
    m.eval()
    return m

def predict(img_path, model_path):
    tf = transforms.Compose([transforms.Resize(({IMG_SIZE},{IMG_SIZE})), transforms.ToTensor(),
                              transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])])
    img = Image.open(img_path).convert('RGB')
    x = tf(img).unsqueeze(0)
    m = load_model(model_path)
    with torch.no_grad():
        out = m(x)
        p = torch.softmax(out, dim=1).numpy()[0]
    return p

if __name__=='__main__':
    import sys
    print(predict(sys.argv[2], sys.argv[1]))
'''
(OUT_DIR/"inference_helper.py").write_text(inference_py)

# final summary
print("\n=== DONE ===")
print("Artifacts in:", OUT_DIR)
print("Baseline model:", best_base_path)
print("Fine-tuned model:", best_ft_path)


Device: cpu
QUICK mode: using 1043 / 5216 training samples
Classes: ['NORMAL', 'PNEUMONIA']
Sizes (train/val/test): 1043 16 624

== Baseline (ResNet18 frozen backbone) ==
Baseline Epoch 1/2 train_acc:0.8360 val_acc:0.5000 time:35.8s
Saved baseline model: fast_outputs/best_resnet18.pt
Baseline Epoch 2/2 train_acc:0.8993 val_acc:0.6875 time:37.6s
Saved baseline model: fast_outputs/best_resnet18.pt
Baseline test acc: 0.7419871794871795
              precision    recall  f1-score   support

      NORMAL       0.95      0.33      0.49       234
   PNEUMONIA       0.71      0.99      0.83       390

    accuracy                           0.74       624
   macro avg       0.83      0.66      0.66       624
weighted avg       0.80      0.74      0.70       624

Generating 4 baseline Grad-CAM images...
Grad-CAM err: Failed hooks
Saved baseline Grad-CAMs to fast_outputs

== Fine-tune (unfreeze last block) ==


  scaler = torch.cuda.amp.GradScaler(enabled=(DEVICE.type=="cuda"))
  with torch.cuda.amp.autocast(enabled=(DEVICE.type=="cuda")):


Fine-tune Epoch 1/3 train_acc:0.9051 val_acc:0.5000 time:49.1s
Saved fine-tuned model: fast_outputs/best_finetuned_resnet18.pt
Fine-tune Epoch 2/3 train_acc:0.9674 val_acc:0.7500 time:49.1s
Saved fine-tuned model: fast_outputs/best_finetuned_resnet18.pt
Fine-tune Epoch 3/3 train_acc:0.9847 val_acc:0.8125 time:49.8s
Saved fine-tuned model: fast_outputs/best_finetuned_resnet18.pt
Fine-tuned test acc: 0.8830128205128205
              precision    recall  f1-score   support

      NORMAL       0.98      0.71      0.82       234
   PNEUMONIA       0.85      0.99      0.91       390

    accuracy                           0.88       624
   macro avg       0.91      0.85      0.87       624
weighted avg       0.90      0.88      0.88       624

Generating Grad-CAMs for fine-tuned model...


  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)


Saved fine-tune Grad-CAMs to fast_outputs

=== DONE ===
Artifacts in: fast_outputs
Baseline model: fast_outputs/best_resnet18.pt
Fine-tuned model: fast_outputs/best_finetuned_resnet18.pt
