In [None]:
# UNet++ library
!pip install segmentation-models-pytorch



Collecting segmentation-models-pytorch
  Downloading segmentation_models_pytorch-0.5.0-py3-none-any.whl.metadata (17 kB)
Downloading segmentation_models_pytorch-0.5.0-py3-none-any.whl (154 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.8/154.8 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: segmentation-models-pytorch
Successfully installed segmentation-models-pytorch-0.5.0


In [14]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
from torchvision.transforms import functional as F
from PIL import Image
from pathlib import Path
import glob
import torch.nn as nn
import segmentation_models_pytorch as smp
import json

In [15]:
from pathlib import Path
import glob

# Local path to your dataset
data_dir = Path(r"C:\Users\wesal\OneDrive\Desktop\Junior-fall\Machine Learning\Is It This One\data")
images_dir = data_dir / "images"
masks_dir = data_dir / "masks"

# Grab all images and masks
all_images = sorted(glob.glob(str(images_dir / "*.[jJ][pP][gG]")))
all_masks = sorted(glob.glob(str(masks_dir / "*.[pP][nN][gG]")))

print(f"Found {len(all_images)} images and {len(all_masks)} masks")

# Select only images that have masks
mask_indices = list(range(1, 22)) + list(range(69, 90))  # 1–21 and 69–89

train_images = [str(images_dir / f"{i:04d}.jpg") for i in mask_indices]
train_masks = [str(masks_dir / f"{i:04d}_mask.png") for i in mask_indices]

print(f"Selected {len(train_images)} images for training")


Found 128 images and 42 masks
Selected 42 images for training


In [None]:
class UterusDataset(Dataset):
    def __init__(self, image_paths, mask_paths, transform=None):
        self.images = image_paths
        self.masks = mask_paths
        self.transform = transform  # store transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img = Image.open(self.images[idx]).convert("RGB")
        mask = Image.open(self.masks[idx]).convert("L")

        if self.transform:
            # Apply the transform to the image
            img = self.transform(img)

        # Resize mask to match image size
        mask = F.resize(mask, (256, 256))

        # Random flips for augmentation
        if torch.rand(1) > 0.5:
            img = F.hflip(img)
            mask = F.hflip(mask)
        if torch.rand(1) > 0.5:
            img = F.vflip(img)
            mask = F.vflip(mask)

        # Convert to tensor
        mask = transforms.ToTensor()(mask)

        return img, mask

    
transform = transforms.Compose([
    transforms.Resize((256, 256)),  
    transforms.ColorJitter(brightness=0.1, contrast=0.1),
    transforms.ToTensor()
])


dataset = UterusDataset(train_images, train_masks, transform=transform)
val_split = 0.2
val_size = int(len(dataset) * val_split)
train_size = len(dataset) - val_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False)

print("Dataset and DataLoader ready!")
print(f"Total training samples: {len(dataset)}")


Dataset and DataLoader ready!
Total training samples: 42


In [9]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)

model = smp.UnetPlusPlus(
    encoder_name="resnet34",
    encoder_weights="imagenet",
    in_channels=3,
    classes=1,  # binary mask
)
model.to(device)


Using device: cpu


UnetPlusPlus(
  (encoder): ResNetEncoder(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=Tru

In [10]:
bce_loss = nn.BCEWithLogitsLoss()

def dice_loss(pred, target, eps=1e-6):
    pred = torch.sigmoid(pred)
    intersect = (pred * target).sum()
    union = pred.sum() + target.sum() + eps
    return 1 - (2 * intersect / union)


In [None]:
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

num_epochs = 20
patience = 5  # early stopping patience
best_loss = float("inf")
counter = 0

# folders
ckpt_dir = Path("checkpoints")
ckpt_dir.mkdir(exist_ok=True)

# load history if exists
history_file = ckpt_dir / "history.json"
if history_file.exists():
    with open(history_file, "r") as f:
        history = json.load(f)
else:
    history = {"loss": []}

print("Training starting...")

for epoch in range(len(history["loss"]), num_epochs):

    model.train()
    epoch_loss = 0

    for imgs, masks in train_loader:
        imgs, masks = imgs.to(device), masks.to(device)
        masks = (masks > 0.5).float()

        optimizer.zero_grad()
        outputs = model(imgs)

        loss = dice_loss(outputs, masks) + bce_loss(outputs, masks)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    avg_loss = epoch_loss / len(train_loader)
    history["loss"].append(avg_loss)

    print(f"Epoch {epoch+1} / {num_epochs} - Loss: {avg_loss:.4f}")

    # save every epoch
    torch.save(model.state_dict(), ckpt_dir / f"epoch_{epoch+1}.pth")

    # save best model
    if avg_loss < best_loss:
        best_loss = avg_loss
        torch.save(model.state_dict(), ckpt_dir / "best_model.pth")
        counter = 0
    else:
        counter += 1

    # save history
    with open(history_file, "w") as f:
        json.dump(history, f)

    # early stopping
    if counter >= patience:
        print("Early stopping triggered.")
        break

print("Training finished!")


Training starting...


KeyboardInterrupt: 

In [None]:
import json
import matplotlib.pyplot as plt

with open("checkpoints/history.json", "r") as f:
    history = json.load(f)

plt.plot(history["loss"], marker="o")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training Loss")
plt.grid()
plt.show()


NameError: name 'history' is not defined

In [None]:
# Make sure all 128 images are covered
all_images = sorted(glob.glob(str(images_dir / "*.jpg")))

model.eval()
with torch.no_grad():
    for img_path in all_images:
        idx = int(Path(img_path).stem)  # 0001 -> 1
        mask_out_path = Path("data/mask") / f"{idx:04d}_pred.png"

        # Skip if mask already exists
        if mask_out_path.exists():
            continue

        img = Image.open(img_path).convert("RGB")
        img_tensor = transform(img).unsqueeze(0).to(device)
        pred = torch.sigmoid(model(img_tensor))[0,0]  # [H,W]
        pred_np = (pred.cpu().numpy() > 0.5).astype(np.uint8) * 255

        Image.fromarray(pred_np).save(mask_out_path)
