In [None]:
!pip install segmentation-models-pytorch -q

[0m

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
from PIL import Image
import segmentation_models_pytorch as smp
from segmentation_models_pytorch.utils.metrics import IoU, Fscore, Accuracy
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
import matplotlib.pyplot as plt

# CONFIG
SEED = 42  # fixed seed so weight initialisation and batch shuffling repeat across runs
np.random.seed(SEED)
torch.manual_seed(SEED)

ENCODER = "vgg16"             # only consumed by `preprocessing_fn` below
ENCODER_WEIGHTS = "imagenet"  # only consumed by `preprocessing_fn` below
CLASSES = 1
ACTIVATION = "sigmoid"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
IMAGE_SIZE = 256              # images and masks are resized to IMAGE_SIZE x IMAGE_SIZE
BATCH_SIZE = 32
EPOCHS = 1000                 # upper bound; early stopping normally ends training sooner
CHECKPOINT_PATH = "/kaggle/working/base_unet_model_original_sigmoid.pth"
PATIENCE_LR = 20              # `patience` of the ReduceLROnPlateau scheduler
PATIENCE_ES = 100             # epochs without a better validation Dice before stopping
SAVE_PLOTS_PATH = "/kaggle/working/test_predictions_original"
os.makedirs(SAVE_PLOTS_PATH, exist_ok=True)

# Paths
train_img_dir = "/kaggle/input/polyp-dataset-v1/polyp_dataset/Original/images/train"
train_mask_dir = "/kaggle/input/polyp-dataset-v1/polyp_dataset/Original/masks/train"
test_img_dir = "/kaggle/input/polyp-dataset-v1/polyp_dataset/Original/images/test"
test_mask_dir = "/kaggle/input/polyp-dataset-v1/polyp_dataset/Original/masks/test"

# Albumentations transforms
# NOTE(review): `preprocessing_fn` is built but its A.Lambda step is commented out below,
# so no encoder-specific normalisation is applied to the images.
preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)

# Model input pipeline: resize, then convert the HWC array to a CHW tensor.
# ToTensorV2 does not rescale pixel values.
transform = A.Compose([
    A.Resize(IMAGE_SIZE, IMAGE_SIZE),
    #A.Lambda(image=preprocessing_fn),
    ToTensorV2(transpose_mask=True),
])
# Pipeline for the display copy of an image (currently identical to `transform`).
raw_transform = A.Compose([
    A.Resize(IMAGE_SIZE, IMAGE_SIZE),
    ToTensorV2(transpose_mask=True),
])

# Path to validation filenames file
val_filenames_path = "/kaggle/input/validation-data/val_images(1).txt"

# Read validation image names into a set, one name per line. Blank lines are skipped
# so they are not counted as a (non-existent) validation image.
with open(val_filenames_path, "r") as f:
    val_filenames = {line.strip() for line in f if line.strip()}

print(f"Number of validation images: {len(val_filenames)}")

# Dataset class
class PolypDataset(torch.utils.data.Dataset):
    """Image/mask dataset over every file in ``img_dir`` that also returns a display copy.

    Each item is ``(image, mask, raw)``:

    * ``image`` - the RGB image after ``transform`` and cast to float32. The cast
      does NOT rescale values; any normalisation must come from ``transform``.
    * ``mask``  - the binarised mask (pixel > 127 -> 1.0) after ``transform``, with a
      leading channel axis added.
    * ``raw``   - the untouched RGB image passed through ``raw_transform``; when no
      ``raw_transform`` is given it is the original H x W x 3 numpy array.

    Args:
        img_dir: Directory holding the input images.
        mask_dir: Directory holding the masks; a mask must have the same file name
            as its image.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            and returning a dict with ``"image"`` and ``"mask"`` tensors.
        raw_transform: Optional callable invoked as ``raw_transform(image=...)``
            and returning a dict with an ``"image"`` entry.
    """

    def __init__(self, img_dir, mask_dir, transform=None, raw_transform=None):
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.raw_transform = raw_transform
        # Sorted so that indices are stable across runs.
        self.img_names = sorted(os.listdir(img_dir))

    def __len__(self):
        return len(self.img_names)

    def __getitem__(self, idx):
        name = self.img_names[idx]
        img_path = os.path.join(self.img_dir, name)
        mask_path = os.path.join(self.mask_dir, name)  # Same name assumed

        image = np.array(Image.open(img_path).convert("RGB"))
        mask = np.array(Image.open(mask_path).convert("L"))
        raw = image  # pristine H x W x 3 array, before any model-side transform

        # Binarize mask
        mask = (mask > 127).astype("float32")

        # The display copy must be derived from the pristine array with its own
        # pipeline. Previously this branch re-ran `self.transform` on the already
        # converted tensor, so `raw_transform` was never applied.
        if self.raw_transform:
            raw = self.raw_transform(image=raw)["image"]

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented["image"].float()  # dtype cast only, values are not rescaled
            mask = augmented["mask"].unsqueeze(0).float()  # Ensure shape: [1, H, W]

        return image, mask, raw


# Custom Dataset class updated to accept a list of filenames to use
class PolypDatasetSubset(torch.utils.data.Dataset):
    """Image/mask dataset restricted to an explicit list of file names.

    Items are ``(image, mask)``. The mask is binarised (pixel > 127 -> 1.0) before
    ``transform`` runs. With a transform, the image is cast to float32 (values are
    not rescaled by the cast) and the mask gets a leading channel axis; without
    one, the plain numpy arrays are returned.

    Args:
        img_dir: Directory holding the input images.
        mask_dir: Directory holding the masks, named exactly like their images.
        filenames: Iterable of file names to expose; stored in sorted order.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            and returning a dict with ``"image"`` and ``"mask"`` tensors.
    """

    def __init__(self, img_dir, mask_dir, filenames, transform=None):
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.img_names = sorted(filenames)

    def __len__(self):
        return len(self.img_names)

    def __getitem__(self, idx):
        fname = self.img_names[idx]

        rgb = Image.open(os.path.join(self.img_dir, fname)).convert("RGB")
        image = np.array(rgb)
        gray = Image.open(os.path.join(self.mask_dir, fname)).convert("L")
        mask = (np.array(gray) > 127).astype("float32")

        if not self.transform:
            return image, mask

        out = self.transform(image=image, mask=mask)
        # `.float()` is a dtype cast only; `.unsqueeze(0)` adds the channel axis.
        return out["image"].float(), out["mask"].unsqueeze(0).float()


# Get all train image filenames
all_train_filenames = sorted(os.listdir(train_img_dir))

# Split filenames by validation file
val_files = [f for f in all_train_filenames if f in val_filenames]
train_files = [f for f in all_train_filenames if f not in val_filenames]

print(f"Training images: {len(train_files)}")
print(f"Validation images: {len(val_files)}")

# Names listed in the validation file that do not exist in the train folder would
# otherwise be dropped silently and shrink the validation set without notice.
missing_val_files = sorted(
    name for name in val_filenames if name and name not in set(all_train_filenames)
)
if missing_val_files:
    print(f"WARNING: {len(missing_val_files)} validation names not found in {train_img_dir}")

# Create datasets using the subsets of filenames
train_dataset = PolypDatasetSubset(train_img_dir, train_mask_dir, train_files, transform)
val_dataset = PolypDatasetSubset(train_img_dir, train_mask_dir, val_files, transform)

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, num_workers=4, pin_memory=True)

# The test set yields (image, mask, raw). `raw_transform` is passed so that the raw
# copy is resized and converted to a CHW tensor, which lets it be batched and plotted.
test_dataset = PolypDataset(test_img_dir, test_mask_dir, transform, raw_transform=raw_transform)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, num_workers=4, pin_memory=True)


def print_first_batch_stats(loader):
    """Print shape and min/max/mean of the image tensor of the first batch of ``loader``.

    Only the first element of the batch is inspected, so this works both for
    loaders yielding ``(image, mask)`` and ``(image, mask, raw)``. An empty
    loader prints nothing.
    """
    for batch in loader:
        imgs = batch[0]
        print(f"Batch image tensor shape: {imgs.shape}")  # (B, 3, H, W)
        print(f"Batch image tensor min: {imgs.min().item()} max: {imgs.max().item()} mean: {imgs.mean().item()}")
        break  # check only first batch


for loader in (train_loader, val_loader, test_loader):
    print_first_batch_stats(loader)

In [None]:
import torch
import torch.nn as nn
def double_convolution(in_channels, out_channels):
    """Return two stacked Conv3x3 -> BatchNorm -> ReLU stages as one ``nn.Sequential``.

    The first stage maps ``in_channels`` to ``out_channels``; the second keeps
    ``out_channels``. Unlike the original U-Net paper, the convolutions use
    ``padding=1`` so the spatial size of the output equals that of the input.
    """
    stages = []
    # Stage 1 consumes `in_channels`, stage 2 consumes the `out_channels` it produced.
    for stage_in in (in_channels, out_channels):
        stages.extend([
            nn.Conv2d(stage_in, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ])
    return nn.Sequential(*stages)



class UNet(nn.Module):
    """Plain U-Net for 3-channel input with four down/up-sampling levels.

    The encoder widens 64 -> 128 -> 256 -> 512 -> 1024 channels with a 2x2 max-pool
    between levels. The decoder mirrors it: each level upsamples with a stride-2
    transposed convolution, concatenates the matching encoder feature map along
    the channel axis and fuses the result with a double convolution.

    The final 1x1 convolution returns ``num_classes`` channels of raw scores; no
    activation is applied inside the model.

    Because the input is halved four times and doubled four times, height and
    width need to be multiples of 16 for the skip concatenations to line up.

    Args:
        num_classes: Number of output channels.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)

        # Contracting path: one double convolution per resolution level.
        self.down_convolution_1 = double_convolution(3, 64)
        self.down_convolution_2 = double_convolution(64, 128)
        self.down_convolution_3 = double_convolution(128, 256)
        self.down_convolution_4 = double_convolution(256, 512)
        self.down_convolution_5 = double_convolution(512, 1024)

        # Expanding path. Every fusing convolution takes twice the upsampled
        # channel count because the skip connection is concatenated first.
        self.up_transpose_1 = self._upsampler(1024, 512)
        self.up_convolution_1 = double_convolution(1024, 512)
        self.up_transpose_2 = self._upsampler(512, 256)
        self.up_convolution_2 = double_convolution(512, 256)
        self.up_transpose_3 = self._upsampler(256, 128)
        self.up_convolution_3 = double_convolution(256, 128)
        self.up_transpose_4 = self._upsampler(128, 64)
        self.up_convolution_4 = double_convolution(128, 64)

        # 1x1 projection to one score map per class.
        self.out = nn.Conv2d(in_channels=64, out_channels=num_classes, kernel_size=1)

    @staticmethod
    def _upsampler(in_channels, out_channels):
        """Transposed convolution that doubles the spatial resolution."""
        return nn.ConvTranspose2d(
            in_channels=in_channels, out_channels=out_channels,
            kernel_size=2,
            stride=2)

    def forward(self, x):
        """Map a ``(B, 3, H, W)`` batch to ``(B, num_classes, H, W)`` raw scores."""
        encoder_levels = (
            self.down_convolution_1,
            self.down_convolution_2,
            self.down_convolution_3,
            self.down_convolution_4,
        )
        decoder_levels = (
            (self.up_transpose_1, self.up_convolution_1),
            (self.up_transpose_2, self.up_convolution_2),
            (self.up_transpose_3, self.up_convolution_3),
            (self.up_transpose_4, self.up_convolution_4),
        )

        # Encoder: keep each pre-pooling feature map for its skip connection.
        skips = []
        features = x
        for encode in encoder_levels:
            features = encode(features)
            skips.append(features)
            features = self.max_pool2d(features)

        # Bottleneck.
        features = self.down_convolution_5(features)

        # Decoder: deepest skip first; the skip goes before the upsampled map.
        for (upsample, fuse), skip in zip(decoder_levels, reversed(skips)):
            features = fuse(torch.cat([skip, upsample(features)], 1))

        return self.out(features)


# Smoke test: a random 128x128 RGB batch must come back with one output channel
# and the same spatial size.
model_test = UNet(num_classes=1)
rand_inp = torch.rand(1, 3, 128, 128)
with torch.no_grad():  # shape check only, so no autograd graph is needed
    rand_out = model_test(rand_inp)
assert rand_out.shape == (1, 1, 128, 128), f"unexpected output shape {tuple(rand_out.shape)}"
rand_out.shape

In [None]:
# Model
# UNet.forward returns the output of its last convolution directly, i.e. raw logits.
model = UNet(num_classes=1).to(DEVICE)

# Loss and optimizer
# SoftBCEWithLogitsLoss works on logits; DiceLoss(mode='binary') is used with its
# default settings (it treats its input as logits unless `from_logits=False`).
dice_loss_fn = smp.losses.DiceLoss(mode='binary')
bce_loss_fn = smp.losses.SoftBCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# Metrics
# The model emits logits, so the metrics must apply the activation themselves before
# thresholding at 0.5; without it the threshold was applied to raw logits.
# `iou_metric` was previously misspelled, leaving the name used by the
# training/validation functions undefined.
dice_metric = Fscore(threshold=0.5, activation=ACTIVATION)
iou_metric = IoU(threshold=0.5, activation=ACTIVATION)
accuracy_metric = Accuracy(threshold=0.5, activation=ACTIVATION)

# Additional Imports
import torch.nn as nn
import torchvision.utils as vutils
import matplotlib.pyplot as plt
import os
from tqdm import tqdm


# Learning rate scheduler
# mode='max' because it is stepped with the validation Dice (higher is better);
# the learning rate is halved after PATIENCE_LR epochs without improvement.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=PATIENCE_LR, factor=0.5, verbose=True)

# Early-stopping / checkpoint bookkeeping, updated by the training loop.
best_val_dice = 0
epochs_no_improve = 0


def train_epoch(loader, model, dice_loss_fn, bce_loss_fn, optimizer):
    """Run one optimisation pass over ``loader`` and return batch-averaged statistics.

    The loss of a batch is ``dice_loss_fn(preds, masks) + bce_loss_fn(preds, masks)``.
    Batches are moved to the module-level ``DEVICE``; Dice, IoU and accuracy come
    from the module-level ``dice_metric``, ``iou_metric`` and ``accuracy_metric``.

    Args:
        loader: Iterable yielding ``(images, masks)`` batches.
        model: Network to train; switched to train mode.
        dice_loss_fn: Callable ``(preds, masks) -> scalar tensor``.
        bce_loss_fn: Callable ``(preds, masks) -> scalar tensor``.
        optimizer: Optimizer over the parameters of ``model``.

    Returns:
        Tuple ``(loss, dice, iou, accuracy)``, each the mean of the per-batch values
        (every batch has the same weight, whatever its size).

    Raises:
        ZeroDivisionError: If ``loader`` yields no batches.
    """
    model.train()
    epoch_loss = 0
    dice_score = 0
    iou_score = 0
    acc_score = 0
    n_batches = 0

    for imgs, masks in tqdm(loader):
        imgs, masks = imgs.to(DEVICE), masks.to(DEVICE)
        preds = model(imgs)

        loss = dice_loss_fn(preds, masks) + bce_loss_fn(preds, masks)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        n_batches += 1

        # Metrics are for monitoring only: detach so that they are computed outside
        # the autograd graph instead of extending it after the optimizer step.
        preds = preds.detach()
        dice_score += dice_metric(preds, masks).item()
        iou_score += iou_metric(preds, masks).item()
        acc_score += accuracy_metric(preds, masks).item()

    return epoch_loss / n_batches, dice_score / n_batches, iou_score / n_batches, acc_score / n_batches

def validate(loader, model, dice_loss_fn, bce_loss_fn):
    """Evaluate ``model`` on ``loader`` without gradient tracking.

    Mirrors the training pass minus the optimisation step: the per-batch loss is
    ``dice_loss_fn + bce_loss_fn``, batches are moved to the module-level
    ``DEVICE``, and Dice / IoU / accuracy come from the module-level
    ``dice_metric``, ``iou_metric`` and ``accuracy_metric``.

    Args:
        loader: Iterable yielding ``(images, masks)`` batches.
        model: Network to evaluate; switched to eval mode.
        dice_loss_fn: Callable ``(preds, masks) -> scalar tensor``.
        bce_loss_fn: Callable ``(preds, masks) -> scalar tensor``.

    Returns:
        Tuple ``(loss, dice, iou, accuracy)`` of per-batch means.

    Raises:
        ZeroDivisionError: If ``loader`` yields no batches.
    """
    model.eval()

    totals = [0, 0, 0, 0]  # running sums: loss, dice, iou, accuracy
    batch_count = 0

    with torch.no_grad():
        for imgs, masks in loader:
            imgs = imgs.to(DEVICE)
            masks = masks.to(DEVICE)
            outputs = model(imgs)

            batch_stats = (
                (dice_loss_fn(outputs, masks) + bce_loss_fn(outputs, masks)).item(),
                dice_metric(outputs, masks).item(),
                iou_metric(outputs, masks).item(),
                accuracy_metric(outputs, masks).item(),
            )
            totals = [running + value for running, value in zip(totals, batch_stats)]
            batch_count += 1

    return tuple(total / batch_count for total in totals)
# Per-epoch loss history, later used for the loss-curve plot.
train_losses, val_losses = [], []

for epoch in range(EPOCHS):
    train_loss, train_dice, train_iou, train_acc = train_epoch(
        train_loader, model, dice_loss_fn, bce_loss_fn, optimizer
    )
    val_loss, val_dice, val_iou, val_acc = validate(
        val_loader, model, dice_loss_fn, bce_loss_fn
    )

    train_losses.append(train_loss)
    val_losses.append(val_loss)

    report_lines = [
        f"Epoch {epoch+1}/{EPOCHS}",
        f"Train Loss: {train_loss:.4f} - Dice: {train_dice:.4f} - IoU: {train_iou:.4f} - Acc: {train_acc:.4f}",
        f"Val   Loss: {val_loss:.4f} - Dice: {val_dice:.4f} - IoU: {val_iou:.4f} - Acc: {val_acc:.4f}",
        "-" * 60,
    ]
    print(*report_lines, sep="\n")

    # The scheduler watches the validation Dice (higher is better).
    scheduler.step(val_dice)

    # Checkpoint only on a strict improvement; otherwise count towards early stopping.
    improved = val_dice > best_val_dice
    if not improved:
        epochs_no_improve += 1
    else:
        best_val_dice = val_dice
        torch.save(model.state_dict(), CHECKPOINT_PATH)
        print("\n>> Saved new best model.\n")
        epochs_no_improve = 0

    if epochs_no_improve >= PATIENCE_ES:
        print(f"\n>> Early stopping: no improvement for {PATIENCE_ES} epochs.")
        break

# Load best model for inference
# `map_location` lets a checkpoint written on GPU be restored on a CPU-only session.
model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=DEVICE))
model.eval()

# ImageNet channel statistics. Not used below; only relevant if input
# normalisation is switched on in the transforms.
mean = torch.tensor([0.485,0.456,0.406], device=DEVICE).view(1,3,1,1)
std  = torch.tensor([0.229,0.224,0.225], device=DEVICE).view(1,3,1,1)

test_dice = 0
test_iou = 0
n_batches = 0

with torch.no_grad():
    for i, (imgs, masks, raws) in enumerate(test_loader):
        # `raws` is only plotted, so it stays on the CPU.
        imgs, masks = imgs.to(DEVICE), masks.to(DEVICE)
        preds = model(imgs)  # raw logits; the model has no output activation

        # The metric objects receive the logits unchanged, as during training.
        test_dice += dice_metric(preds, masks).item()
        test_iou += iou_metric(preds, masks).item()
        n_batches += 1

        # Convert logits to probabilities before thresholding at 0.5; comparing the
        # logits themselves with 0.5 would correspond to a probability cut-off of ~0.62.
        preds_bin = (torch.sigmoid(preds) > 0.5).float()

        for b in range(imgs.shape[0]):
            fig, axs = plt.subplots(1, 3, figsize=(12, 4))
            axs[0].imshow(raws[b].cpu().permute(1, 2, 0))  # CHW -> HWC for matplotlib
            axs[0].set_title("Input Image")
            axs[1].imshow(preds_bin[b, 0].cpu(), cmap='gray')
            axs[1].set_title("Prediction")
            axs[2].imshow(masks[b, 0].cpu(), cmap='gray')
            axs[2].set_title("Ground Truth")
            for ax in axs:
                ax.axis('off')
            plt.tight_layout()
            # The loader is not shuffled, so this index is the position in the test set.
            plt.savefig(f"{SAVE_PLOTS_PATH}/test_{i*BATCH_SIZE + b}.png")
            plt.close(fig)  # close this figure explicitly to keep memory flat

if n_batches:
    print(f"Test Dice: {test_dice / n_batches:.4f} | Test IoU: {test_iou / n_batches:.4f}")
else:
    print("Test loader yielded no batches; no test metrics to report.")


In [None]:
import matplotlib.pyplot as plt

# Loss curves: one point per completed epoch for the training and validation loss.
fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(train_losses, label='Training Loss')
ax.plot(val_losses, label='Validation Loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.set_title('Training and Validation Loss over Epochs')
ax.legend()
ax.grid(True)

# Write the figure to disk before showing it (change path/name as needed).
fig.savefig('/kaggle/working/train_val_loss_plot.png', dpi=300, bbox_inches='tight')

plt.show()

In [None]:
# Recursively pack the saved prediction plots into a zip archive created in the
# current working directory.
!zip -r test_predictions_original_base_unet.zip /kaggle/working/test_predictions_original

In [None]:
from IPython.display import FileLink
# Last expression of the cell: shows a link to the archive (path is relative to the
# current working directory).
FileLink(r'test_predictions_original_base_unet.zip')