In [None]:
import sys
import os

# Resolve the parent of the current working directory; it is treated as the project root
project_root = os.path.abspath(os.path.join(os.getcwd(), os.pardir))

# Append it to the import path only once, so re-running this cell is harmless
if project_root not in sys.path:
    sys.path.append(project_root)

# Report where the notebook is running from
print(f"Current Working Directory: {os.getcwd()}")
print("Project root added to path. You can now import.")

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data.dataset import Dataset
from torch.utils.data import DataLoader
import torchvision.models as models

import albumentations as A
from albumentations.pytorch import ToTensorV2

import time
import os
import random
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import trange, tqdm
from PIL import Image, ImageOps
import copy
import pandas as pd

In [3]:
# Experiment settings - adjust the values based on your preference
config = dict(
    n_classes=21,       # number of classes for the model head and the IoU metric
    lr_rate=0.001,      # learning rate handed to Adam
    batch_size=16,      # forwarded to VOCDataEngine
    num_workers=2,      # forwarded to VOCDataEngine
    resolution=256,     # forwarded to VOCDataEngine
    sch_step_size=15,   # StepLR step_size
    sch_gamma=0.1,      # StepLR gamma
)

In [None]:
! pip install urllib3

In [None]:
import kagglehub

# Download latest version
# The value returned by dataset_download is kept in `path` and printed below as the
# location of the dataset files.
path = kagglehub.dataset_download("gopalbhattrai/pascal-voc-2012-dataset")

print("Path to dataset files:", path)

In [None]:
import os

# Define source (Read-only) and target (Writable wrapper)
# Note: We need to point to the actual VOCdevkit folder inside the input
input_root = '/kaggle/input/pascal-voc-2012-dataset'
target_root = '/content/VOCdevkit/VOC2012'

# Create the destination directory structure (but not the final folder)
os.makedirs('/content/VOCdevkit', exist_ok=True)

# Create the Symlink
# This creates a "ghost" folder at target_root that points to input_root
if not os.path.exists(target_root):
    print(f"Creating symlink: {target_root} -> {input_root}")
    os.symlink(input_root, target_root)
    print("Success! (No copying required)")
else:
    print("Symlink already exists.")

# Verify it works
print("\nVerifying structure...")
!ls -F /content/VOCdevkit/

In [None]:
import shutil
import os

# The path you got from the download
cached_path = '/root/.cache/kagglehub/datasets/gopalbhattrai/pascal-voc-2012-dataset/versions/1'
target_path = '/content/VOCdevkit'

# Check if we already moved it to avoid errors
if not os.path.exists(target_path):
    print(f"Moving data from {cached_path} to {target_path}...")
    # We use move because it's instant (metadata change only)
    shutil.move(cached_path, target_path)
    print("Move complete!")
else:
    print("Data is already in /content/VOCdevkit")

print("\nDirectory Structure:")
!find /content/VOCdevkit -maxdepth 2 -type d

If you encounter a `FileNotFoundError`, it is likely due to a symlink failure.

**Symptom**: There is only a single `VOC2012` file inside the VOCdevkit directory instead of a folder.

**Solution**: Delete the created `VOCdevkit` directory together with the `VOC2012` file inside it. Skip the symlink cell entirely and run the Kaggle Direct Download cell instead. Note that in this case the data root path has no `VOC2012` component before the `VOC2012_train_val` directories (i.e. `/content/VOCdevkit/VOC2012_train_val/VOC2012_train_val`).

In [None]:
# NOTE: This utility is consistent with V1 to V4 to maintain a controlled experimental environment
from core.data_engine import VOCDataEngine # Handles data loading and DataLoader creation automatically

# A single engine object prepares the entire data pipeline.
# Use "/content/VOCdevkit/VOC2012_train_val/VOC2012_train_val" as root_path instead
# if the solution against "FileNotFoundError" is implemented.
engine = VOCDataEngine(
    root_path='/content/VOCdevkit/VOC2012/VOC2012_train_val/VOC2012_train_val',
    batch_size=config["batch_size"],
    num_workers=config["num_workers"],
    resolution=config["resolution"],
)
train_loader, val_loader = engine.get_loaders()

In [19]:
class BaseSegmentationModel(nn.Module):
    """Base class adding metric bookkeeping and best-model checkpointing to a model.

    Attributes:
        history: dict with the per-epoch lists 'train_loss', 'val_loss',
            'train_iou' and 'val_iou' (plain Python floats).
        best_iou: highest IoU passed to `save_checkpoint` so far (starts at 0.0).
    """

    def __init__(self):
        super().__init__()
        self.history = {
            'train_loss': [], 'val_loss': [],
            'train_iou': [], 'val_iou': []
        }
        self.best_iou = 0.0

    def update_history(self, train_loss, val_loss, train_iou, val_iou):
        """Append one epoch of metrics to `self.history`.

        Each value may be a single-element tensor, a NumPy scalar or a plain number;
        float() handles all of them, whereas `.item()` fails on Python floats.
        """
        self.history['train_loss'].append(float(train_loss))
        self.history['val_loss'].append(float(val_loss))
        self.history['train_iou'].append(float(train_iou))
        self.history['val_iou'].append(float(val_iou))

    def save_checkpoint(self, path, epoch, iou):
        """Write a checkpoint to `path` only when `iou` beats the best value seen so far.

        Args:
            path: destination file handed to `torch.save`.
            epoch: epoch index stored in the checkpoint.
            iou: score to compare against `self.best_iou` (tensor or number).
        """
        # Store a plain float so best_iou never holds a (possibly GPU-resident) tensor
        # and the checkpoint stays free of stray tensors outside the state dict.
        iou = float(iou)
        if iou > self.best_iou:
            self.best_iou = iou
            torch.save({
                'epoch': epoch,
                'model_state_dict': self.state_dict(),
                'history': self.history,
                'best_iou': self.best_iou
            }, path)
            print(f"--- Best Model Saved (mIoU: {iou:.4f}) ---")

In [20]:
class UnetDown(nn.Module):
  """Down-sampling block: BN -> ELU -> 3x3 conv -> BN -> ELU -> 2x2 max-pool -> 3x3 conv.

  Maps `input_size` channels to `output_size` channels; the max-pool uses kernel_size=2.
  """

  def __init__(self, input_size, output_size):
    super().__init__()

    # Layers are registered in execution order under `self.model`
    self.model = nn.Sequential(
        nn.BatchNorm2d(input_size),
        nn.ELU(),
        nn.Conv2d(input_size, output_size, kernel_size=3, stride=1, padding=1),
        nn.BatchNorm2d(output_size),
        nn.ELU(),
        nn.MaxPool2d(kernel_size=2),
        nn.Conv2d(output_size, output_size, kernel_size=3, stride=1, padding=1),
    )

  def forward(self, x):
    """Run `x` through the sequential stack."""
    out = self.model(x)
    return out

class UnetUp(nn.Module):
  """Up-sampling block: BN -> ELU -> 3x3 conv -> BN -> ELU -> 2x nearest upsample -> 3x3 conv.

  Maps `input_size` channels to `output_size` channels; the upsample uses scale_factor=2.
  """

  def __init__(self, input_size, output_size):
    super().__init__()

    # Layers are registered in execution order under `self.model`
    self.model = nn.Sequential(
        nn.BatchNorm2d(input_size),
        nn.ELU(),
        nn.Conv2d(input_size, output_size, kernel_size=3, stride=1, padding=1),
        nn.BatchNorm2d(output_size),
        nn.ELU(),
        nn.Upsample(scale_factor=2, mode="nearest"),
        nn.Conv2d(output_size, output_size, kernel_size=3, stride=1, padding=1),
    )

  def forward(self, x):
    """Run `x` through the sequential stack."""
    out = self.model(x)
    return out

class ASPP(nn.Module):
  """Multi-branch context module applied to a single feature map.

  Five views of the input are computed and concatenated along the channel axis:
  a 1x1 conv, three 3x3 convs with dilation 2, 4 and 8, and a globally average-pooled
  branch that is resized back to the input's spatial size. A 1x1 conv with BatchNorm,
  ReLU and Dropout(0.5) then fuses the 5 * `output_size` channels down to `output_size`.

  NOTE(review): in training mode the BatchNorm after the 1x1 global pool receives one
  value per channel per sample, which PyTorch rejects for a batch of size 1 - confirm
  the training loader never yields such a batch.
  """

  @staticmethod
  def _conv_bn_relu(in_ch, out_ch, kernel_size=1, dilation=1):
    """Bias-free conv -> BatchNorm -> ReLU; non-1x1 convs are padded by their dilation."""
    if kernel_size == 1:
      conv = nn.Conv2d(in_ch, out_ch, kernel_size=1, bias=False)
    else:
      conv = nn.Conv2d(in_ch, out_ch, kernel_size=kernel_size,
                       padding=dilation, dilation=dilation, bias=False)
    return nn.Sequential(conv, nn.BatchNorm2d(out_ch), nn.ReLU(inplace=False))

  def __init__(self, input_size, output_size):
    super().__init__()

    # Parallel branches (created in the same order as they are concatenated)
    self.branch1 = self._conv_bn_relu(input_size, output_size)
    self.branch2 = self._conv_bn_relu(input_size, output_size, kernel_size=3, dilation=2)
    self.branch3 = self._conv_bn_relu(input_size, output_size, kernel_size=3, dilation=4)
    self.branch4 = self._conv_bn_relu(input_size, output_size, kernel_size=3, dilation=8)

    # Global context: squash to 1x1, then process with a 1x1 conv block
    self.global_avg_pool = nn.Sequential(
        nn.AdaptiveAvgPool2d((1, 1)),
        *self._conv_bn_relu(input_size, output_size),
    )

    # Fusion of the five concatenated views; dropout guards against overfitting
    self.fuse = nn.Sequential(
        *self._conv_bn_relu(output_size * 5, output_size),
        nn.Dropout(0.5),
    )

  def forward(self, x):
    """Return the fused multi-branch features at the spatial size of `x`."""
    # Spatial size is needed to resize the pooled branch
    spatial = x.shape[2:]

    # Convolutional branches, in concatenation order
    views = [branch(x) for branch in (self.branch1, self.branch2, self.branch3, self.branch4)]

    # Pooled branch: 1x1 output resized back to the feature-map size
    pooled = self.global_avg_pool(x)
    views.append(F.interpolate(pooled, size=spatial, mode='bilinear', align_corners=False))

    # Channel-wise concatenation followed by the fusion block
    return self.fuse(torch.cat(views, dim=1))

class ResNetUNet(BaseSegmentationModel):
  """U-Net style segmentation network: ResNet-34 encoder, ASPP bottleneck, UnetUp decoder.

  The encoder's intermediate activations are concatenated onto the decoder features as
  skip connections. The logits are produced at half the input resolution and bilinearly
  upsampled by 2 to return to the input size.

  Input height and width should be multiples of 32 (e.g. 256): the encoder halves the
  resolution five times and the decoder doubles it back, so other sizes make the skip
  concatenations mismatch.

  Note: the backbone's `avgpool` and `fc` layers are never called in `forward`, but they
  remain registered through `base_model`, so they stay part of the state dict.
  """

  def __init__(self, n_classes=21):
    super().__init__()
    # The Encoder (Pre-trained ResNet-34)
    # Load weights trained on ImageNet
    self.base_model = models.resnet34(weights=models.ResNet34_Weights.DEFAULT)

    # Plain list of the backbone's children, used for positional access below
    self.base_layers = list(self.base_model.children())

    # Layer 0: Initial Conv + BN + ReLU (the MaxPool, base_layers[3], is applied
    # separately in forward so the pre-pool activation can serve as a skip connection)
    # Output shape: [Batch, 64, H/2, W/2]
    self.layer0 = nn.Sequential(*self.base_layers[:3])

    # Layer 1: First ResBlock (fed with the max-pooled stem output)
    # Output shape: [Batch, 64, H/4, W/4]
    self.layer1 = nn.Sequential(*self.base_layers[4])

    # Layer 2: Second ResBlock
    # Output shape: [Batch, 128, H/8, W/8]
    self.layer2 = self.base_layers[5]

    # Layer 3: Third ResBlock
    # Output shape: [Batch, 256, H/16, W/16]
    self.layer3 = self.base_layers[6]

    # Layer 4: Fourth ResBlock
    # Output shape: [Batch, 512, H/32, W/32]
    self.layer4 = self.base_layers[7]

    self.bottleneck = ASPP(input_size=512, output_size=512)

    # Decoder: every input width after up1 is (previous output + skip channels)
    self.up1 = UnetUp(512, 256)   # 512 from the bottleneck
    self.up2 = UnetUp(512, 128)   # 256 (up1) + 256 (layer3 skip)
    self.up3 = UnetUp(256, 64)    # 128 (up2) + 128 (layer2 skip)
    self.up4 = UnetUp(128, 64)    # 64  (up3) + 64  (layer1 skip)

    # 64 (up4) + 64 (stem skip) -> class logits
    self.final_conv = nn.Conv2d(128, n_classes, kernel_size=1)

  def forward(self, x):
    """Return per-pixel class logits with the same spatial size as `x`."""
    # ---- ENCODER PATH ----
    # layer0 wraps the same conv1/bn1/relu modules as base_layers[0:3]
    x0 = self.layer0(x)               # [B, 64, H/2, W/2], highest-resolution skip

    # No clone of x0 is needed for the skip: nothing downstream modifies it in place,
    # and an unused copy would only cost memory on every forward pass.
    x_pool = self.base_layers[3](x0)  # MaxPool

    x1 = self.layer1(x_pool) # ResBlock 1
    x2 = self.layer2(x1) # ResBlock 2
    x3 = self.layer3(x2) # ResBlock 3
    x4 = self.layer4(x3) # ResBlock 4

    x_neck = self.bottleneck(x4)

    # --- DECODER PATH ---
    d1 = self.up1(x_neck)
    d1_ = torch.cat((d1, x3), 1)

    d2 = self.up2(d1_)
    d2_ = torch.cat((d2, x2), 1)

    d3 = self.up3(d2_)
    d3_ = torch.cat((d3, x1), 1)

    d4 = self.up4(d3_)
    d4_ = torch.cat((d4, x0), 1)

    # Logits are at H/2; upsample once more to reach the input resolution
    out = self.final_conv(d4_)
    out = F.interpolate(out, scale_factor=2, mode='bilinear', align_corners=False)

    return out

In [21]:
# Choose the GPU when one is available, then build the model and move it there
device = "cuda" if torch.cuda.is_available() else "cpu"
unetv4 = ResNetUNet(n_classes=config["n_classes"]).to(device)

In [22]:
from core.losses import MulticlassDiceLoss, FocalLoss2d, LossMixer
from core.accuracy import MulticlassIOU

# Loss: a LossMixer built from a Dice term and a Focal term
loss_dice = MulticlassDiceLoss()
loss_focal = FocalLoss2d()
loss_fn = LossMixer(loss_dice, loss_focal)

# Optimizer: Adam with one parameter group covering every model parameter
optimizer = torch.optim.Adam(
    [{"params": unetv4.parameters(), "lr": config["lr_rate"]}]
)

# Scheduler: StepLR configured from the config dict
scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=config["sch_step_size"],
    gamma=config["sch_gamma"],
)

def iou_fn(y_true, y_pred):
    """Score predictions with MulticlassIOU.

    Arguments arrive as (y_true, y_pred) but are handed to the metric as
    (y_pred, y_true). A new MulticlassIOU object is created on every call.
    """
    metric = MulticlassIOU(num_classes=config["n_classes"])
    return metric.forward(y_pred, y_true)

In [23]:
# UTILITY: Re-imports local modules to reflect code changes without a kernel restart.

import sys
import importlib
import diagnostics.model_inspector
import core.training

# 1. Force a reload of the local modules.
#    core.training must be reloaded too: a plain `from core.training import ...` on an
#    already-imported module returns the cached (stale) classes.
importlib.reload(diagnostics.model_inspector)
importlib.reload(core.training)

# 2. Re-import the classes to the global namespace
from core.training import Training, EarlyStopping

print("Memory Purged. StaticMethods should be active now.")

Memory Purged. StaticMethods should be active now.


In [25]:
# MODEL INSPECTOR LOGIC IS DELIBERATELY SKIPPED BECAUSE OF THE INABILITY OF THE MODEL TO OVERWRITE
# RELU INPLACE OPERATIONS OF RESNET BACKEND
from timeit import default_timer as timer

def print_train_time(start: float,
                     end: float,
                     device: torch.device = None):
  """Print and return the elapsed time between two timer readings.

  Args:
    start: timer value taken before training.
    end: timer value taken after training.
    device: label of the device that was used; only interpolated into the message.

  Returns:
    `end - start`, in the same unit as the inputs.
  """
  total_time = end - start
  message = f"Train time on {device}: {total_time:.3f} seconds"
  print(message)
  return total_time

In [26]:
# Rebuild the model right before training.
# The class count comes from config so it cannot drift from the value used by iou_fn.
unetv4 = ResNetUNet(n_classes=config["n_classes"])
device = "cuda" if torch.cuda.is_available() else "cpu"
unetv4 = unetv4.to(device)

In [27]:
# Loss: a LossMixer built from a Dice term and a Focal term
loss_dice = MulticlassDiceLoss()
loss_focal = FocalLoss2d()
loss_fn = LossMixer(loss_dice, loss_focal)

# Optimizer: Adam with one parameter group covering every parameter of the current model
optimizer = torch.optim.Adam(
    [{"params": unetv4.parameters(), "lr": config["lr_rate"]}]
)

# Scheduler: StepLR configured from the config dict
scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=config["sch_step_size"],
    gamma=config["sch_gamma"],
)

def iou_fn(y_true, y_pred):
    """Score predictions with MulticlassIOU.

    Arguments arrive as (y_true, y_pred) but are handed to the metric as
    (y_pred, y_true). A new MulticlassIOU object is created on every call.
    """
    metric = MulticlassIOU(num_classes=config["n_classes"])
    return metric.forward(y_pred, y_true)

In [None]:
torch.manual_seed(42)

from core.training import Training, EarlyStopping

training = Training()

# Measure time
from timeit import default_timer as timer
train_time_start_on_gpu = timer()

# Initialize early stopping
early_stopper = EarlyStopping(patience=7, min_delta=0.01)

# Set epochs
epochs = 50

# Create an optimization and evaluation loop using train_step() and valid_step()
for epoch in tqdm(range(epochs)):
  # 1. One pass over the training data, then one over the validation data
  t_loss, t_iou = training.train_step(model=unetv4,
                                      data_loader=train_loader,
                                      loss_fn=loss_fn,
                                      optimizer=optimizer,
                                      accuracy_fn=iou_fn,
                                      device=device)
  v_loss, v_iou = training.valid_step(model=unetv4,
                                      data_loader=val_loader,
                                      loss_fn=loss_fn,
                                      optimizer=optimizer,
                                      accuracy_fn=iou_fn,
                                      device=device)

  unetv4.update_history(t_loss, v_loss, t_iou, v_iou)

  # 2. Checkpoint every epoch. save_checkpoint only writes when v_iou beats the best
  #    value so far, so the file always holds the best weights - and it exists even
  #    when early stopping never triggers.
  unetv4.save_checkpoint("best_unet_v4.pth", epoch, v_iou)

  # Advance the StepLR schedule once per epoch; without this call the learning rate
  # never decays and sch_step_size / sch_gamma have no effect.
  scheduler.step()

  # 3. Check Early Stopping
  early_stopper(v_loss)
  if early_stopper.early_stop:
      print(f"Early stopping at epoch {epoch}. Model is no longer improving.")
      break

train_time_end_on_gpu = timer()
total_train_time_model_1 = print_train_time(start=train_time_start_on_gpu,
                                            end=train_time_end_on_gpu,
                                            device=device)

In [None]:
# map_location lets a checkpoint written on a GPU be restored on whatever `device` is
# available now (e.g. a CPU-only runtime).
checkpoint = torch.load('best_unet_v4.pth', map_location=device, weights_only=True)
unetv4.load_state_dict(checkpoint['model_state_dict'])
# Trailing semicolon keeps the notebook from printing the whole module tree
unetv4.eval();

In [30]:
import os
import matplotlib.pyplot as plt

def save_evolution_gallery(data, target, predictions, titles, save_dir="inference_results", filename="sample_1.png", num_classes=21, index=0, mean=None, std=None):
    """
    Creates the directory and saves the side-by-side evolution plot.

    The figure has one row: the input image, the ground-truth mask, then one panel per
    entry of `predictions`.

    Args:
        data: image batch, indexed as data[index] and permuted from [C, H, W] to [H, W, C].
        target: ground-truth batch, indexed as target[index].
        predictions: list of batches, each either raw logits (B, C, H, W) or already
            argmaxed class maps (B, H, W).
        titles: panel titles, one per entry of `predictions`.
        save_dir: output directory (created when missing).
        filename: name of the image file inside `save_dir`.
        num_classes: number of colours sampled from the 'tab20' colormap.
        index: which sample of the batch to plot (default 0, the previous behaviour).
        mean, std: optional per-channel normalisation statistics. When both are given,
            the image is denormalised as img * std + mean and clamped to [0, 1].

    Returns:
        Path of the saved image.
    """
    # Create the directory if it doesn't exist (exist_ok guards against a concurrent create)
    if not os.path.isdir(save_dir):
        os.makedirs(save_dir, exist_ok=True)
        print(f"Created directory: {save_dir}")

    num_plots = 2 + len(predictions)
    fig, axes = plt.subplots(1, num_plots, figsize=(5 * num_plots, 5))
    try:
        cmap = plt.get_cmap('tab20', num_classes)

        # Original image
        img = data[index].detach().cpu()
        if mean is not None and std is not None:
            # Undo Normalize(mean, std); statistics are broadcast over H and W
            mean_t = torch.as_tensor(mean, dtype=img.dtype).view(-1, 1, 1)
            std_t = torch.as_tensor(std, dtype=img.dtype).view(-1, 1, 1)
            img = (img * std_t + mean_t).clamp(0, 1)
        axes[0].imshow(img.permute(1, 2, 0).numpy())
        axes[0].set_title("Original Image")

        # Ground Truth
        axes[1].imshow(target[index].cpu().numpy(), cmap=cmap, vmin=0, vmax=num_classes-1)
        axes[1].set_title("Ground Truth")

        # Model Predictions
        for i, pred in enumerate(predictions):
            # Handle if pred is raw logits (B, C, H, W) or already argmaxed (B, H, W)
            if pred.ndim == 4:
                mask = pred[index].argmax(0).cpu().numpy()
            else:
                mask = pred[index].cpu().numpy()

            axes[i+2].imshow(mask, cmap=cmap, vmin=0, vmax=num_classes-1)
            axes[i+2].set_title(titles[i])

        for ax in axes:
            ax.axis('off')

        plt.tight_layout()

        # Save the figure
        save_path = os.path.join(save_dir, filename)
        plt.savefig(save_path, bbox_inches='tight', dpi=150)
    finally:
        # Always release the figure, even when plotting or saving fails
        plt.close(fig)

    return save_path

In [31]:
# Keep a single iterator over the validation loader so that repeated next(valid_iter)
# calls advance through it instead of restarting.
valid_iter = iter(val_loader)

In [None]:
# Export predictions for the first `n_images` validation samples.
# Each sample is passed as a one-element slice, so every saved file shows a different
# image instead of the first image of one batch repeated under different names.
n_images = 50

unetv4.eval()
n_saved = 0
with torch.no_grad():  # inference only: no autograd graph is needed
  for data, target in val_loader:
    output = unetv4(data.to(device))
    pred = output.argmax(dim=1) # Convert logits to class labels

    for i in range(data.shape[0]):
      if n_saved >= n_images:
        break
      save_evolution_gallery(data[i:i+1], target[i:i+1], [pred[i:i+1]], ["UnetV4"],
                             save_dir="unetv4_predictions",
                             filename=f"unetv4_pred_{n_saved}.png")
      n_saved += 1

    if n_saved >= n_images:
      break