In [43]:
import os
from PIL import Image                # For image loading
import torch
import torch.nn as nn                      # Core PyTorch
from torch.utils.data import Dataset, DataLoader, random_split  # Dataset tools
from torchvision import transforms   # For preprocessing / augmentations
import torchvision.transforms.functional as F  # For paired transforms
import matplotlib.pyplot as plt       # Optional visualization
import os, json, random, re
from pathlib import Path
import numpy as np
from pprint import pprint

In [44]:
class KvasirSegDataset(Dataset):
    """Image/mask pairs read from two parallel folders.

    The i-th image is paired with the i-th mask after sorting both folder
    listings, so the pairing is validated when the dataset is built: both
    folders must contain the same number of visible files and every pair
    must share the same basename (extension ignored).

    Args:
        images_dir: Folder containing the input images.
        masks_dir: Folder containing the segmentation masks.
        transform: Optional callable applied to the image and then,
            in a separate call, to the mask. Because it is invoked twice,
            a transform with random components would draw different
            parameters for the image and for its mask.

    Raises:
        ValueError: If the two folders cannot be paired one-to-one.
    """

    def __init__(self, images_dir, masks_dir, transform=None):
        self.images_dir = images_dir
        self.masks_dir = masks_dir
        self.transform = transform

        self.images = self._visible_entries(images_dir)
        self.masks = self._visible_entries(masks_dir)

        if len(self.images) != len(self.masks):
            raise ValueError(
                f"Cannot pair images with masks: {len(self.images)} entries in "
                f"{images_dir!r} vs {len(self.masks)} in {masks_dir!r}"
            )
        # Positional pairing is only safe when the sorted names line up.
        for img_name, mask_name in zip(self.images, self.masks):
            if os.path.splitext(img_name)[0] != os.path.splitext(mask_name)[0]:
                raise ValueError(
                    f"Image/mask basename mismatch: {img_name!r} vs {mask_name!r}"
                )

    @staticmethod
    def _visible_entries(folder):
        """Return the sorted entry names of `folder`, skipping dot-files."""
        return sorted(name for name in os.listdir(folder) if not name.startswith("."))

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        """Load pair `idx` as (RGB image, single-channel mask), transformed if requested."""
        img_path = os.path.join(self.images_dir, self.images[idx])
        mask_path = os.path.join(self.masks_dir, self.masks[idx])

        image = Image.open(img_path).convert("RGB")
        mask = Image.open(mask_path).convert("L")

        if self.transform is not None:
            image = self.transform(image)
            mask = self.transform(mask)

        return image, mask


# --- dataset & dataloaders ---
# Every sample is resized to 256x256 and converted to a tensor.
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor()
])

# Images and masks live in separate "images" and "masks" sub-folders of the
# dataset root; passing the root for both would list the sub-folders
# themselves instead of the picture files.
dataset = KvasirSegDataset(
    r"C:\Users\olane\OneDrive - Kristiania\Git\Exam DL\Kvasir-SEG\images",
    r"C:\Users\olane\OneDrive - Kristiania\Git\Exam DL\Kvasir-SEG\masks",
    transform,
)

# 70/30 split. Integer arithmetic avoids float truncation
# (e.g. int(0.7 * 90) evaluates to 62, not 63).
train_size = (7 * len(dataset)) // 10
val_size = len(dataset) - train_size

# A seeded generator makes the random split identical across kernel restarts.
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(1337),
)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)


In [45]:
# --- Step 1: Deterministic 70/30 split for Kvasir-SEG ---

# 1) Dataset folders. Raw strings (r"") keep the Windows backslashes literal,
#    which avoids "\U" being read as an escape sequence.
IM_DIR = r"C:\Users\olane\OneDrive - Kristiania\Git\Exam DL\Kvasir-SEG\images"
MS_DIR = r"C:\Users\olane\OneDrive - Kristiania\Git\Exam DL\Kvasir-SEG\masks"

# 2) Fail fast if either folder is missing.
assert os.path.isdir(IM_DIR), f"Images folder not found: {IM_DIR}"
assert os.path.isdir(MS_DIR), f"Masks folder not found: {MS_DIR}"

# Sorted listings of both folders with dot-files skipped.
img_files = sorted(name for name in os.listdir(IM_DIR) if name[:1] != ".")
msk_files = sorted(name for name in os.listdir(MS_DIR) if name[:1] != ".")

print(f"Found {len(img_files)} images and {len(msk_files)} masks")

# 3) (Optional but recommended) Check that file basenames match between folders
#    Kvasir-SEG usually keeps identical names between images/ and masks/.
def stem_no_ext(name: str) -> str:
    """Return `name` with its last extension removed."""
    return os.path.splitext(name)[0]


def natural_key(s):
    """Sort key that compares digit runs numerically (e.g., "2.png" before "10.png")."""
    return [int(t) if t.isdigit() else t.lower() for t in re.split(r'(\d+)', s)]


img_stems = [stem_no_ext(f) for f in img_files]
msk_stems = [stem_no_ext(f) for f in msk_files]

if img_stems != msk_stems:
    # Retry with a natural sort before declaring the folders inconsistent.
    # NOTE(review): after this branch the lists are in natural order; any
    # consumer that re-lists the folders with plain sorted() would see a
    # different order -- confirm before reusing positional indices.
    img_files = sorted(img_files, key=natural_key)
    msk_files = sorted(msk_files, key=natural_key)
    img_stems = [stem_no_ext(f) for f in img_files]
    msk_stems = [stem_no_ext(f) for f in msk_files]

    if img_stems != msk_stems:
        # last-resort: show a few mismatches to fix naming issues early.
        # zip() stops at the shorter list, so count differences and files
        # present in only one folder are reported explicitly as well.
        mismatches = [(i, a, b) for i, (a, b) in enumerate(zip(img_stems, msk_stems)) if a != b][:10]
        details = [f"{i}: {a}  vs  {b}" for i, a, b in mismatches]
        if len(img_stems) != len(msk_stems):
            details.append(f"count differs: {len(img_stems)} images vs {len(msk_stems)} masks")
        only_images = sorted(set(img_stems) - set(msk_stems))[:10]
        only_masks = sorted(set(msk_stems) - set(img_stems))[:10]
        if only_images:
            details.append("images without a mask: " + ", ".join(only_images))
        if only_masks:
            details.append("masks without an image: " + ", ".join(only_masks))
        raise RuntimeError(
            "Image/mask filename order mismatch. First mismatches:\n" +
            "\n".join(details)
        )

print("✅ Image/mask lists are aligned by basename.")

# 4) Create deterministic indices (shuffle once with a fixed seed)
N = len(img_files)
assert N == len(msk_files), "Different counts for images and masks."
assert N >= 1, "No files found."

SEED = 1337
rng = random.Random(SEED)  # private RNG: the global `random` state is left untouched
indices = list(range(N))
rng.shuffle(indices)

# 70% for training. Integer arithmetic avoids float truncation
# (e.g. int(0.7 * 90) evaluates to 62, not 63).
train_count = (7 * N) // 10
train_idx = indices[:train_count]
val_idx   = indices[train_count:]

print(f"Split sizes -> train: {len(train_idx)}, val: {len(val_idx)}")

# 5) Save split to disk (JSON) next to the dataset (you can choose another location).
#    The file name is derived from SEED so it cannot drift from the seed actually used.
split_out = Path(IM_DIR).parent / f"split_kvasirseg_seed{SEED}.json"
with open(split_out, "w", encoding="utf-8") as f:
    json.dump({"seed": SEED, "train_idx": train_idx, "val_idx": val_idx}, f, indent=2)

print(f"✅ Saved split file: {split_out}")


Found 1000 images and 1000 masks
✅ Image/mask lists are aligned by basename.
Split sizes -> train: 700, val: 300
✅ Saved split file: C:\Users\olane\OneDrive - Kristiania\Git\Exam DL\Kvasir-SEG\split_kvasirseg_seed1337.json


In [46]:
# Read the saved split back from disk so the indices come from the JSON file.
with open(r"C:\Users\olane\OneDrive - Kristiania\Git\Exam DL\Kvasir-SEG\split_kvasirseg_seed1337.json", "r", encoding="utf-8") as f:
    sp = json.loads(f.read())
train_idx, val_idx = sp["train_idx"], sp["val_idx"]

print(len(train_idx), len(val_idx))

700 300


# Experiment design

In [47]:
SEED = 55
# Seed every RNG used below: Python's `random`, NumPy and PyTorch.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

IM_DIR = r"C:\Users\olane\OneDrive - Kristiania\Git\Exam DL\Kvasir-SEG\images"
MS_DIR = r"C:\Users\olane\OneDrive - Kristiania\Git\Exam DL\Kvasir-SEG\masks"
assert os.path.isdir(IM_DIR) and os.path.isdir(MS_DIR), "Fix IM_DIR/MS_DIR paths."

# deterministic 70/30 split (filenames only, assuming 1-1 matching sort order)
all_imgs = sorted(os.listdir(IM_DIR))
all_msks = sorted(os.listdir(MS_DIR))
n_total = len(all_imgs)
assert n_total == len(all_msks) and n_total > 0, (
    f"Expected matching, non-empty folders; got {n_total} images and {len(all_msks)} masks."
)

# Option A: pure sort-based split (simple and reproducible).
# Sizes are derived from the listing instead of hard-coding 700/1000;
# integer arithmetic keeps the 70% boundary exact (700 for 1000 files).
n_train = (7 * n_total) // 10
train_idx = list(range(n_train))
val_idx   = list(range(n_train, n_total))

# Option B (recommended): one-time deterministic shuffle & save
# rnd = random.Random(SEED); idx = list(range(n_total)); rnd.shuffle(idx)
# train_idx, val_idx = idx[:n_train], idx[n_train:]
# with open(f"split_kvasirseg_seed{SEED}.json","w") as f: json.dump({"train_idx":train_idx,"val_idx":val_idx}, f)

# Repro & paths

In [48]:


class KvasirSegDataset(Dataset):
    """Image/mask pairs with optional augmentation applied identically to both.

    Files are selected by their position in the sorted listing of each
    folder, so ``indices`` refers to that sorted order.

    Augmentation policies are cumulative:
        "none"      -- no augmentation
        "flip"      -- random horizontal / vertical flips
        "flip+rot"  -- flips + small affine (rotation, translation, shear)
        "geo+scale" -- the above + random crop (85-100%) resized back
        "geo+color" -- the above + brightness/contrast/saturation jitter (image only)
        "strong"    -- the above + a second, larger affine
    Any other policy string leaves the pair unchanged.

    Args:
        im_dir: Folder containing the images.
        ms_dir: Folder containing the masks.
        indices: Positions (in sorted listing order) of the samples to keep.
        policy: One of the policy names above.
        size: Final (H, W) passed to the resize step.

    Raises:
        ValueError: If the selected images and masks cannot be paired
            one-to-one by basename.
    """

    _FLIP_POLICIES = ("flip", "flip+rot", "geo+scale", "geo+color", "strong")
    _AFFINE_POLICIES = ("flip+rot", "geo+scale", "geo+color", "strong")
    _CROP_POLICIES = ("geo+scale", "geo+color", "strong")
    _COLOR_POLICIES = ("geo+color", "strong")

    def __init__(self, im_dir, ms_dir, indices, policy="none", size=(256,256)):
        wanted = set(indices)  # O(1) membership instead of scanning a list per file
        self.im_paths = [os.path.join(im_dir, f)
                         for i, f in enumerate(sorted(os.listdir(im_dir))) if i in wanted]
        self.ms_paths = [os.path.join(ms_dir, f)
                         for i, f in enumerate(sorted(os.listdir(ms_dir))) if i in wanted]

        # Pairing is purely positional, so verify it instead of trusting it.
        if len(self.im_paths) != len(self.ms_paths):
            raise ValueError(
                f"Selected {len(self.im_paths)} images but {len(self.ms_paths)} masks"
            )
        for im_path, ms_path in zip(self.im_paths, self.ms_paths):
            im_stem = os.path.splitext(os.path.basename(im_path))[0]
            ms_stem = os.path.splitext(os.path.basename(ms_path))[0]
            if im_stem != ms_stem:
                raise ValueError(
                    f"Image/mask basename mismatch: {im_path!r} vs {ms_path!r}"
                )

        self.policy = policy
        self.size = size

    def __len__(self): return len(self.im_paths)

    def _geo_params(self):
        """Draw one set of geometric parameters to be shared by image and mask.

        Returns:
            (hflip, vflip, angle, translate, scale, shear); ``translate`` is a
            pair of fractions of the image width / height.
        """
        hflip = random.random() < 0.5
        vflip = random.random() < 0.5
        angle = random.uniform(-10, 10)
        translate = (random.uniform(-0.05,0.05), random.uniform(-0.05,0.05)) # as fraction
        scale = random.uniform(0.9, 1.1)
        shear = random.uniform(-5, 5)
        return hflip, vflip, angle, translate, scale, shear

    @staticmethod
    def _paired_affine(img, msk, angle, translate, scale, shear):
        """Apply one affine transform to both inputs.

        The image is resampled bilinearly; the mask uses nearest-neighbour so
        no intermediate label values are introduced.
        """
        img = F.affine(img, angle=angle, translate=translate, scale=scale,
                       shear=[shear, 0], interpolation=F.InterpolationMode.BILINEAR)
        msk = F.affine(msk, angle=angle, translate=translate, scale=scale,
                       shear=[shear, 0], interpolation=F.InterpolationMode.NEAREST)
        return img, msk

    def _apply_policy(self, img, msk):
        """Augment `img` and `msk` according to ``self.policy`` and return both."""
        w, h = img.size

        if self.policy not in self._FLIP_POLICIES:
            return img, msk

        # `_scale` is drawn but not consumed: the first affine keeps scale=1.0
        # and scale jitter is handled by the crop step below.
        hflip, vflip, angle, translate, _scale, shear = self._geo_params()

        # flips
        if hflip:
            img = F.hflip(img); msk = F.hflip(msk)
        if vflip:
            img = F.vflip(img); msk = F.vflip(msk)

        # rotations / affine
        if self.policy in self._AFFINE_POLICIES:
            tx = int(translate[0]*w); ty = int(translate[1]*h)
            img, msk = self._paired_affine(img, msk, angle=angle, translate=(tx, ty),
                                           scale=1.0, shear=shear)

        # random resize crop (scale jitter)
        if self.policy in self._CROP_POLICIES:
            # simulate scale by resized crop: pick a crop <= original then resize back
            scale_fac = random.uniform(0.85, 1.0)
            cw, ch = int(w*scale_fac), int(h*scale_fac)
            if cw>0 and ch>0:
                x = random.randint(0, max(0, w-cw))
                y = random.randint(0, max(0, h-ch))
                img = img.crop((x,y,x+cw,y+ch)).resize((w,h), Image.BILINEAR)
                msk = msk.crop((x,y,x+cw,y+ch)).resize((w,h), Image.NEAREST)

        # image-only color jitter
        if self.policy in self._COLOR_POLICIES:
            b = random.uniform(0.9,1.1)
            c = random.uniform(0.9,1.1)
            s = random.uniform(0.9,1.1)
            img = F.adjust_brightness(img, b)
            img = F.adjust_contrast(img,  c)
            img = F.adjust_saturation(img, s)

        # a bit stronger affine (acts like elastic-lite)
        if self.policy == "strong":
            angle2 = random.uniform(-15, 15)
            shear2 = random.uniform(-8, 8)
            # Draw the scale ONCE: sampling it separately for image and mask
            # would warp them differently and break their alignment.
            scale2 = random.uniform(0.95, 1.05)
            img, msk = self._paired_affine(img, msk, angle=angle2, translate=(0, 0),
                                           scale=scale2, shear=shear2)

        return img, msk

    def __getitem__(self, idx):
        """Return (image [3,H,W] in 0..1, mask [1,H,W] with values 0/1) for sample `idx`."""
        img = Image.open(self.im_paths[idx]).convert("RGB")
        msk = Image.open(self.ms_paths[idx]).convert("L")     # single-channel

        if self.policy != "none":
            img, msk = self._apply_policy(img, msk)

        # final resize -> tensor
        img = F.resize(img, self.size, interpolation=F.InterpolationMode.BILINEAR)
        msk = F.resize(msk, self.size, interpolation=F.InterpolationMode.NEAREST)
        img = F.to_tensor(img)                     # [3,H,W], 0..1
        msk = (F.to_tensor(msk) > 0.5).float()     # [1,H,W] binary
        return img, msk


# Dataset with paired transforms (same geometry for image & mask)

In [49]:

def conv_block(cin, cout):
    """Two stacked Conv3x3 -> BatchNorm -> ReLU stages.

    The first stage maps ``cin`` to ``cout`` channels, the second keeps
    ``cout``. ``padding=1`` with a 3x3 kernel preserves the spatial size.
    """
    stages = []
    for stage_in in (cin, cout):
        stages.append(nn.Conv2d(stage_in, cout, 3, padding=1))
        stages.append(nn.BatchNorm2d(cout))
        stages.append(nn.ReLU(inplace=True))
    return nn.Sequential(*stages)

class UNet(nn.Module):
    """Three-level U-Net that returns per-pixel logits.

    The encoder halves the resolution three times with max-pooling; the
    decoder up-samples with 2x2 transposed convolutions and concatenates the
    matching encoder feature map at each level.

    Args:
        in_ch: Number of input channels.
        out_ch: Number of output (logit) channels.
        base: Channel width of the first encoder level; it doubles per level.
    """

    def __init__(self, in_ch=3, out_ch=1, base=32):
        super().__init__()
        self.enc1 = conv_block(in_ch, base)
        self.enc2 = conv_block(base, base*2)
        self.enc3 = conv_block(base*2, base*4)
        self.pool = nn.MaxPool2d(2)

        self.bott = conv_block(base*4, base*8)

        # Each decoder block takes the up-sampled map concatenated with the
        # skip connection, hence twice the channels of the up-conv output.
        self.up3 = nn.ConvTranspose2d(base*8, base*4, 2, 2)
        self.dec3 = conv_block(base*8, base*4)
        self.up2 = nn.ConvTranspose2d(base*4, base*2, 2, 2)
        self.dec2 = conv_block(base*4, base*2)
        self.up1 = nn.ConvTranspose2d(base*2, base, 2, 2)
        self.dec1 = conv_block(base*2, base)

        self.out  = nn.Conv2d(base, out_ch, 1)

    @staticmethod
    def _fit_to(x, ref):
        """Zero-pad `x` so its last two dims equal those of `ref`.

        Max-pooling floors odd sizes, so after up-sampling a decoder map can
        be one pixel smaller than its skip connection. Without this,
        ``torch.cat`` fails for inputs whose height/width is not a multiple
        of 8. For matching sizes `x` is returned untouched.
        """
        dh = ref.shape[-2] - x.shape[-2]
        dw = ref.shape[-1] - x.shape[-1]
        if dh == 0 and dw == 0:
            return x
        # pad order for the last two dims: (left, right, top, bottom)
        return nn.functional.pad(x, (dw // 2, dw - dw // 2, dh // 2, dh - dh // 2))

    def forward(self, x):
        """Map `x` of shape [B, in_ch, H, W] to logits of shape [B, out_ch, H, W]."""
        e1 = self.enc1(x); p1 = self.pool(e1)
        e2 = self.enc2(p1); p2 = self.pool(e2)
        e3 = self.enc3(p2); p3 = self.pool(e3)

        b  = self.bott(p3)

        d3 = self._fit_to(self.up3(b), e3);  d3 = torch.cat([d3, e3], 1); d3 = self.dec3(d3)
        d2 = self._fit_to(self.up2(d3), e2); d2 = torch.cat([d2, e2], 1); d2 = self.dec2(d2)
        d1 = self._fit_to(self.up1(d2), e1); d1 = torch.cat([d1, e1], 1); d1 = self.dec1(d1)
        return self.out(d1)                         # logits


# U-Net model


In [50]:
def dice_iou(pred_logits, target, thr=0.5, eps=1e-7):
    """Batch-averaged Dice and IoU of thresholded predictions.

    Args:
        pred_logits: Raw model outputs, reduced over dims (1, 2, 3).
        target: Ground-truth mask with the same shape as `pred_logits`.
        thr: Probability above which (strictly) a pixel counts as foreground.
        eps: Smoothing term; makes both scores 1 when prediction and target
            are both empty.

    Returns:
        (dice, iou) as Python floats, each the mean over the batch.
    """
    reduce_dims = (1, 2, 3)
    binary = (torch.sigmoid(pred_logits) > thr).float()

    overlap = (binary * target).sum(dim=reduce_dims)
    # |A| + |B|: the Dice denominator; subtracting the overlap gives |A ∪ B|.
    total = binary.sum(dim=reduce_dims) + target.sum(dim=reduce_dims)

    dice_per_sample = (2 * overlap + eps) / (total + eps)
    iou_per_sample = (overlap + eps) / (total - overlap + eps)

    return dice_per_sample.mean().item(), iou_per_sample.mean().item()

@torch.no_grad()
def evaluate(model, loader, device):
    """Compute dataset-level mean Dice and IoU of `model` over `loader`.

    `dice_iou` returns the mean over one batch, so each batch score is
    weighted by its batch size. Averaging the batch means directly would
    over-weight a smaller final batch.

    Args:
        model: Network returning logits; switched to eval mode here.
        loader: Iterable of (inputs, targets) batches, batch dimension first.
        device: Device the batches are moved to.

    Returns:
        (mean_dice, mean_iou) as floats; (nan, nan) if `loader` yields nothing.
    """
    model.eval()
    dice_total, iou_total, n_samples = 0.0, 0.0, 0
    for x,y in loader:
        x,y = x.to(device), y.to(device)
        logits = model(x)
        d,i = dice_iou(logits, y)
        batch_n = x.size(0)
        dice_total += d * batch_n
        iou_total += i * batch_n
        n_samples += batch_n
    if n_samples == 0:
        return float("nan"), float("nan")
    return float(dice_total / n_samples), float(iou_total / n_samples)

def train_one_policy(policy_name, epochs=15, batch=8, lr=1e-3, size=(256,256), device=None):
    """Train a fresh UNet with one augmentation policy and report its best epoch.

    Augmentation is applied to the training split only; the validation split
    always uses policy "none". The model is scored on the validation split
    after every epoch.

    Args:
        policy_name: Augmentation policy passed to the training dataset.
        epochs: Number of passes over the training split.
        batch: Batch size for both loaders.
        lr: Adam learning rate.
        size: Final image size passed to both datasets.
        device: Target device; picks "cuda" when available, else "cpu".

    Returns:
        Dict with keys "val_dice", "val_iou" and "epoch" for the epoch with
        the highest validation Dice.
    """
    if not device:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    train_ds = KvasirSegDataset(IM_DIR, MS_DIR, train_idx, policy=policy_name, size=size)
    val_ds = KvasirSegDataset(IM_DIR, MS_DIR, val_idx, policy="none", size=size)

    train_loader = DataLoader(train_ds, batch_size=batch, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_ds, batch_size=batch, shuffle=False, num_workers=0)

    net = UNet().to(device)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = torch.nn.BCEWithLogitsLoss()

    best = {"val_dice": -1, "val_iou": -1, "epoch": -1}
    for epoch in range(1, epochs + 1):
        net.train()  # evaluate() leaves the model in eval mode
        for images, masks in train_loader:
            images = images.to(device)
            masks = masks.to(device)
            optimizer.zero_grad()
            loss = criterion(net(images), masks)
            loss.backward()
            optimizer.step()

        val_dice, val_iou = evaluate(net, val_loader, device)
        # strict ">" keeps the earliest epoch on ties
        if val_dice > best["val_dice"]:
            best = {"val_dice": val_dice, "val_iou": val_iou, "epoch": epoch}
    return best


# Metrics (Dice, IoU) & training loop

In [51]:
def dice_iou(pred_logits, target, thr=0.5, eps=1e-7):
    """Batch-averaged Dice and IoU of thresholded predictions.

    Args:
        pred_logits: Raw model outputs, reduced over dims (1, 2, 3).
        target: Ground-truth mask with the same shape as `pred_logits`.
        thr: Probability above which (strictly) a pixel counts as foreground.
        eps: Smoothing term; makes both scores 1 when prediction and target
            are both empty.

    Returns:
        (dice, iou) as Python floats, each the mean over the batch.
    """
    reduce_dims = (1, 2, 3)
    binary = (torch.sigmoid(pred_logits) > thr).float()

    overlap = (binary * target).sum(dim=reduce_dims)
    # |A| + |B|: the Dice denominator; subtracting the overlap gives |A ∪ B|.
    total = binary.sum(dim=reduce_dims) + target.sum(dim=reduce_dims)

    dice_per_sample = (2 * overlap + eps) / (total + eps)
    iou_per_sample = (overlap + eps) / (total - overlap + eps)

    return dice_per_sample.mean().item(), iou_per_sample.mean().item()

@torch.no_grad()
def evaluate(model, loader, device):
    """Compute dataset-level mean Dice and IoU of `model` over `loader`.

    `dice_iou` returns the mean over one batch, so each batch score is
    weighted by its batch size. Averaging the batch means directly would
    over-weight a smaller final batch.

    Args:
        model: Network returning logits; switched to eval mode here.
        loader: Iterable of (inputs, targets) batches, batch dimension first.
        device: Device the batches are moved to.

    Returns:
        (mean_dice, mean_iou) as floats; (nan, nan) if `loader` yields nothing.
    """
    model.eval()
    dice_total, iou_total, n_samples = 0.0, 0.0, 0
    for x,y in loader:
        x,y = x.to(device), y.to(device)
        logits = model(x)
        d,i = dice_iou(logits, y)
        batch_n = x.size(0)
        dice_total += d * batch_n
        iou_total += i * batch_n
        n_samples += batch_n
    if n_samples == 0:
        return float("nan"), float("nan")
    return float(dice_total / n_samples), float(iou_total / n_samples)

def train_one_policy(policy_name, epochs=15, batch=8, lr=1e-3, size=(256,256), device=None):
    """Train a fresh UNet with one augmentation policy and report its best epoch.

    Augmentation is applied to the training split only; the validation split
    always uses policy "none". The model is scored on the validation split
    after every epoch.

    Args:
        policy_name: Augmentation policy passed to the training dataset.
        epochs: Number of passes over the training split.
        batch: Batch size for both loaders.
        lr: Adam learning rate.
        size: Final image size passed to both datasets.
        device: Target device; picks "cuda" when available, else "cpu".

    Returns:
        Dict with keys "val_dice", "val_iou" and "epoch" for the epoch with
        the highest validation Dice.
    """
    if not device:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    train_ds = KvasirSegDataset(IM_DIR, MS_DIR, train_idx, policy=policy_name, size=size)
    val_ds = KvasirSegDataset(IM_DIR, MS_DIR, val_idx, policy="none", size=size)

    train_loader = DataLoader(train_ds, batch_size=batch, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_ds, batch_size=batch, shuffle=False, num_workers=0)

    net = UNet().to(device)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = torch.nn.BCEWithLogitsLoss()

    best = {"val_dice": -1, "val_iou": -1, "epoch": -1}
    for epoch in range(1, epochs + 1):
        net.train()  # evaluate() leaves the model in eval mode
        for images, masks in train_loader:
            images = images.to(device)
            masks = masks.to(device)
            optimizer.zero_grad()
            loss = criterion(net(images), masks)
            loss.backward()
            optimizer.step()

        val_dice, val_iou = evaluate(net, val_loader, device)
        # strict ">" keeps the earliest epoch on ties
        if val_dice > best["val_dice"]:
            best = {"val_dice": val_dice, "val_iou": val_iou, "epoch": epoch}
    return best


# Run the ablation & log results

In [53]:
policies = ["none", "flip", "flip+rot", "geo+scale", "geo+color", "strong"]
results = []

for p in policies:
    # Re-seed before every run so each policy starts from the same RNG state.
    # Otherwise each run inherits whatever state the previous one left
    # behind, which confounds the augmentation comparison.
    random.seed(SEED)
    np.random.seed(SEED)
    torch.manual_seed(SEED)

    print(f"\n=== Training policy: {p} ===")
    best = train_one_policy(p, epochs=15, batch=8, lr=1e-3, size=(256,256))
    best["policy"] = p
    results.append(best)

# Pretty print table
pprint(results)



=== Training policy: none ===


RuntimeError: Numpy is not available