<a href="https://colab.research.google.com/github/kimdanny/COMP0189-practical/blob/main/Week-06/deepglobe_land_cover_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# COMP0189: Applied Artificial Intelligence
## Week 5 (Deep Learning - image segmentation)

In this notebook we use [U-Net](https://arxiv.org/abs/1505.04597) for land cover classification from satellite imagery, using the [DeepGlobe Land Cover Classification Dataset](https://www.kaggle.com/datasets/balraj98/deepglobe-land-cover-classification-dataset).



**Land cover classification** involves analysing satellite images and segmenting them into regions based on their land cover type. These types include urban areas, forests, water bodies, and more. Segmentation **assigns a class label to each pixel**, creating detailed, colour-coded maps of land use.

For example:
- Pixels representing urban areas are assigned the RGB value (0, 255, 255).
- Pixels representing water are assigned the RGB value (0, 0, 255).


After this week you will be able to ...

- Train U-Net models in PyTorch.
- Implement Dice loss and BCE-Dice loss.
- Visualize the prediction output on some of the test images using the trained U-Net.
- Learn how data augmentation affects model training.
- Compute the area of one class in the test-set ground truth and in the predicted masks, then use the difference between the two to quantify the error of your predictions.

### Libraries 📚⬇

In [None]:
!pip install segmentation-models-pytorch==0.2.0
!pip install opencv-python
!pip install albumentations

In [2]:
import os, cv2
import numpy as np
import pandas as pd
import random, tqdm
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

import warnings
warnings.filterwarnings("ignore")

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import albumentations as album

In [3]:
import segmentation_models_pytorch as smp

In [None]:
# Getting dataset's metadata
! wget https://raw.githubusercontent.com/kimdanny/COMP0189-practical/main/data/class_dict.csv
! wget https://raw.githubusercontent.com/kimdanny/COMP0189-practical/main/data/metadata.csv

In [None]:
# Downloading dataset
! wget https://competitions.codalab.org/my/datasets/download/b6def20d-34c5-4871-8d9d-d97075179ea0 -O land-train.zip
! wget https://competitions.codalab.org/my/datasets/download/dfb325b3-4e9c-43c0-93b3-036eec5fa773 -O land_valid_sat.zip
! wget https://competitions.codalab.org/my/datasets/download/61ac1b46-9bd3-4694-810f-ffd08a7f832a -O land_test_sat.zip

In [None]:
! unzip -qq land-train.zip
! mv land-train train
! unzip -qq land_valid_sat.zip -d valid
! unzip -qq land_test_sat.zip -d test

### Read Data & Create train / valid splits 📁

We previously downloaded the dataset's metadata files.

`metadata.csv` reports the image IDs, image paths, which split they belong to, and the path to the segmentation mask (label).  
`class_dict.csv` reports the RGB colour code for each of the 7 possible classes in the segmentation masks.

In [None]:
DATA_DIR = '/content/'

# Load metadata into df
metadata_df = pd.read_csv(os.path.join(DATA_DIR, 'metadata.csv'))

metadata_df.head(5)

In [None]:
# Load class info into df
class_dict_df = pd.read_csv(os.path.join(DATA_DIR, 'class_dict.csv'))

class_dict_df.head(5)

Use the metadata to create your data splits.

In [None]:
# Get train set & select relevant columns
metadata_df = metadata_df[metadata_df['split']=='train']
metadata_df = metadata_df[['image_id', 'sat_image_path', 'mask_path']]

# Update paths
metadata_df['sat_image_path'] = metadata_df['sat_image_path'].apply(lambda img_pth: os.path.join(DATA_DIR, img_pth))
metadata_df['mask_path'] = metadata_df['mask_path'].apply(lambda img_pth: os.path.join(DATA_DIR, img_pth))

# Shuffle dataframe
metadata_df = metadata_df.sample(frac=1).reset_index(drop=True)

# Perform 80/20 split for train / test
test_df = metadata_df.sample(frac=0.2, random_state=42)
train_df = metadata_df.drop(test_df.index)

# Perform 90/10 split for train / val
valid_df = train_df.sample(frac=0.1, random_state=42)
train_df = train_df.drop(valid_df.index)

# Check number of samples
print('Train / test / val samples:')
print(len(train_df), '/', len(test_df), '/', len(valid_df))
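
The two-stage split above leaves roughly 72% of the labelled images for training, 8% for validation and 20% for testing. A minimal sketch of that arithmetic follows. It uses a hypothetical dataset size `n_images = 1000` (not the real DeepGlobe count) and plain Python lists instead of the DataFrame:

```python
import random

n_images = 1000  # hypothetical size, for illustration only
ids = list(range(n_images))

rng = random.Random(42)  # fixed seed so the split is reproducible
rng.shuffle(ids)

# Stage 1: hold out 20% of all images for testing
n_test = round(0.2 * n_images)
test_ids, remaining = ids[:n_test], ids[n_test:]

# Stage 2: hold out 10% of the remaining images for validation
n_valid = round(0.1 * len(remaining))
valid_ids, train_ids = remaining[:n_valid], remaining[n_valid:]

print(len(train_ids), '/', len(test_ids), '/', len(valid_ids))
```

The three lists are disjoint by construction, which is the property that matters: no image should appear in more than one split.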

Extract label information.

In [None]:
# Get class names
class_names = class_dict_df['name'].tolist()
# Get class RGB values
class_rgb_values = class_dict_df[['r','g','b']].values.tolist()

print('All dataset classes and their corresponding RGB values in labels:')
print('Class Names: ', class_names)
print('Class RGB values: ', class_rgb_values)

#### Shortlist specific classes to segment 

In [None]:
# Useful to shortlist specific classes in datasets with large number of classes
select_classes = ['urban_land', 'agriculture_land', 'rangeland', 'forest_land', 'water', 'barren_land', 'unknown']

# Get RGB values of required classes
select_class_indices = [class_names.index(cls.lower()) for cls in select_classes]
select_class_rgb_values =  np.array(class_rgb_values)[select_class_indices]

print('Selected classes and their corresponding RGB values in labels:')
print('Class Names: ', select_classes)
print('Class RGB values: ', select_class_rgb_values)

### Helper functions for viz. & one-hot encoding/decoding

In [10]:
# helper function for data visualization
def visualize(**images):
    """
    Plot images in one row
    """
    n_images = len(images)
    plt.figure(figsize=(20,8))
    for idx, (name, image) in enumerate(images.items()):
        plt.subplot(1, n_images, idx + 1)
        plt.xticks([]);
        plt.yticks([])
        # get title from the parameter names
        plt.title(name.replace('_',' ').title(), fontsize=20)
        plt.imshow(image)
    plt.show()

# Perform one hot encoding on label
def one_hot_encode(label, label_values):
    """
    Convert a segmentation image label array to one-hot format
    by replacing each pixel value with a vector of length num_classes
    # Arguments
        label: The segmentation label image, an array of shape (H, W, 3)
        label_values: List of RGB colours, one per class

    # Returns
        An array with the same width and height as the input, but
        with a depth size of num_classes
    """
    semantic_map = []
    for colour in label_values:
        equality = np.equal(label, colour)
        class_map = np.all(equality, axis = -1)
        semantic_map.append(class_map)
    semantic_map = np.stack(semantic_map, axis=-1)

    return semantic_map

# Perform reverse one-hot-encoding on labels / preds
def reverse_one_hot(image):
    """
    Transform an array in one-hot format (depth is num_classes)
    to a 2D array with only 1 channel, where each pixel value is
    the classified class key.
    # Arguments
        image: The one-hot format image

    # Returns
        A 2D array with the same width and height as the input, but
        with a depth size of 1, where each pixel value is the classified
        class key.
    """
    x = np.argmax(image, axis = -1)
    return x

# Perform colour coding on the reverse-one-hot outputs
def colour_code_segmentation(image, label_values):
    """
    Given a 1-channel array of class keys, colour code the segmentation results.
    # Arguments
        image: single channel array where each value represents the class key.
        label_values: List of RGB colours, one per class

    # Returns
        Colour coded image for segmentation visualization
    """
    colour_codes = np.array(label_values)
    x = colour_codes[image.astype(int)]

    return x
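
The three helpers above form a round trip: RGB mask → one-hot channels → class indices → RGB mask. The sketch below repeats the same NumPy operations inline on a toy 2×2 mask so the shapes are easy to follow. The two-colour `palette` is an illustrative subset taken from the introduction, not the full class list:

```python
import numpy as np

# Toy palette: the two colours mentioned in the introduction
palette = [(0, 255, 255), (0, 0, 255)]  # urban land, water

# A 2x2 RGB mask: top row urban land, bottom row water
mask = np.array([
    [[0, 255, 255], [0, 255, 255]],
    [[0, 0, 255], [0, 0, 255]],
], dtype=np.uint8)

# One-hot encode: one boolean channel per palette colour -> shape (H, W, num_classes)
one_hot = np.stack([np.all(mask == colour, axis=-1) for colour in palette], axis=-1)

# Reverse one-hot: index of the active channel at each pixel -> shape (H, W)
class_ids = np.argmax(one_hot, axis=-1)

# Colour code: look each class index up in the palette -> shape (H, W, 3)
recovered = np.array(palette, dtype=np.uint8)[class_ids]

print(one_hot.shape, class_ids.shape, recovered.shape)
```

One caveat worth keeping in mind: a pixel whose colour matches none of the palette entries gets an all-zero one-hot vector, and `argmax` then silently maps it to class 0.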

## Custom Dataset class
PyTorch has utilities that help us create well-structured datasets, which is important for computer vision tasks.

This class inherits from `torch.utils.data.Dataset` and allows us to put together a dataset class that handles:
- reading the satellite images and their corresponding segmentation masks
- preprocessing such as one-hot encoding the segmentation masks
- any desired operations on the data, such as augmentation

In [11]:
class LandCoverDataset(torch.utils.data.Dataset):

    """DeepGlobe Land Cover Classification Challenge Dataset. Read images, apply augmentation and preprocessing transformations.

    Args:
        df (pd.DataFrame): DataFrame containing image / label paths
        class_rgb_values (list): RGB values of select classes to extract from segmentation mask
        augmentation (albumentations.Compose): data transformation pipeline
            (e.g. flip, scale, etc.)
        preprocessing (albumentations.Compose): data preprocessing
            (e.g. normalization, shape manipulation, etc.)

    """
    def __init__(
            self,
            df,
            class_rgb_values=None,
            augmentation=None,
            preprocessing=None,
    ):
        self.image_paths = df['sat_image_path'].tolist()
        self.mask_paths = df['mask_path'].tolist()

        self.class_rgb_values = class_rgb_values
        self.augmentation = augmentation
        self.preprocessing = preprocessing

    def __getitem__(self, i):

        # read images and masks
        image = cv2.cvtColor(cv2.imread(self.image_paths[i]), cv2.COLOR_BGR2RGB)
        mask = cv2.cvtColor(cv2.imread(self.mask_paths[i]), cv2.COLOR_BGR2RGB)

        # one-hot-encode the mask
        mask = one_hot_encode(mask, self.class_rgb_values).astype('float')

        # apply augmentations
        if self.augmentation:
            sample = self.augmentation(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        # apply preprocessing
        if self.preprocessing:
            sample = self.preprocessing(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        return image, mask

    def __len__(self):
        # return the number of samples in the dataset
        return len(self.image_paths)

#### Visualize Sample Image and Mask 📈

In [None]:
# Instantiate the dataset
dataset = LandCoverDataset(train_df, class_rgb_values=select_class_rgb_values)

# Sample a random image to visualise
random_idx = random.randint(0, len(dataset)-1)
image, mask = dataset[random_idx]

# Use helper function defined earlier to inspect image
visualize(
    original_image = image,
    ground_truth_mask = colour_code_segmentation(reverse_one_hot(mask), select_class_rgb_values),
    one_hot_encoded_mask = reverse_one_hot(mask)
)

### Defining Augmentation and preprocessing pipeline for data loader

In [13]:
def get_training_augmentation():
    train_transform = [
        album.RandomCrop(height=1024, width=1024, always_apply=True),
        album.HorizontalFlip(p=0.5),
        album.VerticalFlip(p=0.5),
    ]
    return album.Compose(train_transform)


def get_validation_augmentation():
    train_transform = [
        album.CenterCrop(height=1024, width=1024, always_apply=True),
    ]
    return album.Compose(train_transform)


def get_training_no_augmentation():
    train_transform = [
        album.CenterCrop(height=1024, width=1024, always_apply=True),
    ]
    return album.Compose(train_transform)


def to_tensor(x, **kwargs):
    return x.transpose(2, 0, 1).astype('float32')


def get_preprocessing(preprocessing_fn=None):
    """Construct preprocessing transform
    Args:
        preprocessing_fn (callable): data normalization function
            (can be specific for each pretrained neural network)
    Return:
        transform: albumentations.Compose
    """
    _transform = []
    if preprocessing_fn:
        _transform.append(album.Lambda(image=preprocessing_fn))
    _transform.append(album.Lambda(image=to_tensor, mask=to_tensor))

    return album.Compose(_transform)
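
The last preprocessing step moves the channel axis to the front, because PyTorch convolution layers expect `(C, H, W)` while OpenCV and albumentations work with `(H, W, C)`. A small sketch of that transposition follows. The helper name `to_chw` and the tiny 4×6 arrays are stand-ins chosen for illustration:

```python
import numpy as np

def to_chw(x):
    # Same operation as `to_tensor` above: HWC -> CHW, cast to float32
    return x.transpose(2, 0, 1).astype('float32')

image = np.zeros((4, 6, 3), dtype=np.uint8)    # small stand-in for an RGB crop
mask = np.zeros((4, 6, 7), dtype=np.float64)   # one-hot mask with 7 class channels

image_chw = to_chw(image)
mask_chw = to_chw(mask)
print(image_chw.shape, mask_chw.shape)
```

Despite its name, `to_tensor` returns a NumPy array; the conversion to `torch.Tensor` happens later, when the `DataLoader` collates samples into a batch.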

#### Visualize Augmented Images & Masks

In [None]:
augmented_dataset = LandCoverDataset(
    train_df,
    augmentation=get_training_augmentation(),
    class_rgb_values=select_class_rgb_values,
)

image, mask = augmented_dataset[random_idx]
visualize(
        original_image = image,
        ground_truth_mask = colour_code_segmentation(reverse_one_hot(mask), select_class_rgb_values),
        one_hot_encoded_mask = reverse_one_hot(mask))

## Training Unet

### Model Definition

In [15]:
ENCODER = 'resnet50'
ENCODER_WEIGHTS = 'imagenet'
CLASSES = select_classes
ACTIVATION = 'sigmoid' # could be None for logits or 'softmax2d' for multiclass segmentation



model = smp.Unet(
    encoder_name=ENCODER,             # choose encoder, e.g. mobilenet_v2 or efficientnet-b7
    encoder_weights=ENCODER_WEIGHTS,  # use `imagenet` pre-trained weights for encoder initialization
    classes=len(CLASSES),             # model output channels (number of classes in your dataset)
    activation=ACTIVATION,            # activation applied to the model output
)

preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)

# Task 1: Dice loss


Implement the Dice loss (1 - Dice coefficient)
(https://www.kaggle.com/code/bigironsphere/loss-function-library-keras-pytorch)
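
For a prediction $P$ and ground truth $G$ with values in $[0, 1]$, the Dice coefficient is $2\,|P \cap G| / (|P| + |G|)$, and the F-beta score generalises it to $(1+\beta^2)\,TP / ((1+\beta^2)\,TP + \beta^2\,FN + FP)$; with $\beta = 1$ the two coincide. The sketch below checks this on a toy example in NumPy. It is not the PyTorch solution to the task, and the function name `f_beta_numpy` is made up for this illustration:

```python
import numpy as np

def f_beta_numpy(pred, target, beta=1.0, eps=1e-7):
    """F-beta score between two arrays with values in [0, 1]; beta=1 gives Dice."""
    tp = np.sum(pred * target)   # (soft) true positives
    fp = np.sum(pred) - tp       # predicted but not in ground truth
    fn = np.sum(target) - tp     # in ground truth but not predicted
    numerator = (1 + beta ** 2) * tp + eps
    denominator = (1 + beta ** 2) * tp + beta ** 2 * fn + fp + eps
    return numerator / denominator

pred = np.array([[1, 1], [0, 0]], dtype=float)
target = np.array([[1, 0], [0, 0]], dtype=float)

dice = f_beta_numpy(pred, target)   # TP=1, FP=1, FN=0 -> 2 / 3
dice_loss = 1.0 - dice
print(round(dice, 4), round(dice_loss, 4))
```

The `eps` term keeps the ratio defined when both masks are empty, in which case the score evaluates to 1 (a perfect match).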

In [None]:
import torch
import torch.nn as nn

def f_score(pr, gt, beta=1, eps=1e-7, threshold=None, activation='sigmoid'):
    """
    Args:
        pr (torch.Tensor): Predicted tensor (logits or probabilities)
        gt (torch.Tensor): Ground truth tensor (binary mask)
        beta (float): Weight for precision-recall balance
        eps (float): Small epsilon for numerical stability
        threshold (float or None): Threshold for binarization (if needed)
        activation (str): Activation function ('sigmoid' or 'softmax2d')
    Returns:
        float: Dice coefficient (F-score)
    """

    if activation is None or activation == "none":
        activation_fn = lambda x: x  # No activation applied
    elif activation == "sigmoid":
        activation_fn = torch.nn.Sigmoid()
    elif activation == "softmax2d":
        activation_fn = torch.nn.Softmax2d()
    else:
        raise NotImplementedError("Activation must be 'sigmoid' or 'softmax2d'.")

    # Your code here...


class DiceLoss(nn.Module):
    """Dice Loss for segmentation tasks."""
    __name__ = 'dice_loss'

    def __init__(self, eps=1e-7, activation='sigmoid'):
        super().__init__()
        self.activation = activation
        self.eps = eps

    def forward(self, y_pr, y_gt):
        # Your code here...


class BCEDiceLoss(DiceLoss):
    """Combination of BCE Loss and Dice Loss."""
    __name__ = 'bce_dice_loss'

    def __init__(self, eps=1e-7, activation='sigmoid', lambda_dice=1.0, lambda_bce=1.0):
        super().__init__(eps, activation)
        self.lambda_dice = lambda_dice
        self.lambda_bce = lambda_bce

        if activation is None:
            self.bce = nn.BCELoss(reduction='mean')
        else:
            self.bce = nn.BCEWithLogitsLoss(reduction='mean')

    def forward(self, y_pr, y_gt):
        # Your code here...


# Task 2: To see the effect of data augmentation, we will do ablation. Let's train the model without the data augmentation.

If you do not pass the `augmentation` parameter when initialising `LandCoverDataset`, the data loader returns the images without any augmentation. In this notebook, however, we still need to crop each image to 1024×1024, so we define a separate function, `get_training_no_augmentation()`, that applies only a centre crop. Passing full-size images to the model would exceed the memory of the Colab GPU.
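
To get a feel for why image size matters, the sketch below compares the memory of a single float32 input tensor at full size and at the 1024×1024 crop. The full tile side of 2448 pixels is an assumption (the size commonly reported for DeepGlobe); check your own files. The helper `float32_megabytes` is hypothetical:

```python
def float32_megabytes(height, width, channels):
    # 4 bytes per float32 value, reported in megabytes (10^6 bytes)
    return height * width * channels * 4 / 1e6

full_side = 2448   # assumed DeepGlobe tile side, not stated in this notebook
crop_side = 1024   # crop size used by the augmentation functions

full_image_mb = float32_megabytes(full_side, full_side, 3)
crop_image_mb = float32_megabytes(crop_side, crop_side, 3)
pixel_ratio = (full_side / crop_side) ** 2

print(f"full: {full_image_mb:.1f} MB, crop: {crop_image_mb:.1f} MB, ratio: {pixel_ratio:.2f}x")
```

The input tensor itself is only a small part of the total: the intermediate feature maps of a convolutional network also scale with the number of pixels, and they dominate GPU memory during training.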


## 2.1 Dataset and dataloaders

In [17]:
# Get train and val dataset instances
train_dataset_without_aug = LandCoverDataset(train_df,augmentation=get_training_no_augmentation(),
                                                  preprocessing=get_preprocessing(preprocessing_fn),
                                                   class_rgb_values=select_class_rgb_values)

valid_dataset = LandCoverDataset(
    valid_df,
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    class_rgb_values=select_class_rgb_values,
)

# Create test dataset instance
test_dataset = LandCoverDataset(
    valid_df,
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    class_rgb_values=select_class_rgb_values,
)

# Create dataloaders for train, val and test datasets
train_loader = DataLoader(train_dataset_without_aug, batch_size=4, shuffle=True, num_workers=2)
valid_loader = DataLoader(valid_dataset, batch_size=1, shuffle=False, num_workers=4)
test_dataloader = DataLoader(test_dataset)

Visualise test dataset

In [None]:
# test dataset for visualization (without preprocessing augmentations & transformations)
test_dataset_vis = LandCoverDataset(
    valid_df,
    augmentation=get_validation_augmentation(),
    class_rgb_values=select_class_rgb_values,
)

# get a random test image/mask index
random_idx = random.randint(0, len(test_dataset_vis)-1)
image, mask = test_dataset_vis[random_idx]

visualize(
    original_image = image,
    ground_truth_mask = colour_code_segmentation(reverse_one_hot(mask), select_class_rgb_values),
    one_hot_encoded_mask = reverse_one_hot(mask)
)


## 2.2 Training setup

In [None]:
# Set flag to train the model or not. If set to 'False', only prediction is performed.
TRAINING = True

# Set num of epochs
EPOCHS = 1

# Set device: `cuda` for GPU or `cpu` for CPU
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define loss function used during training
# Set your loss function here... 
loss = None

# Define evaluation metrics
metrics = [
    smp.utils.metrics.IoU(threshold=0.5),
]

# Define optimizer
optimizer = torch.optim.Adam([
    dict(params=model.parameters(), lr=0.00008),
])

# Define learning rate scheduler (not used in this NB)
lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer, T_0=1, T_mult=2, eta_min=5e-5,
)

# # load best saved model checkpoint from previous commit (if present)
# if os.path.exists('../input/deepglobe-land-cover-classification-deeplabv3/best_model.pth'):
#     model = torch.load('../input/deepglobe-land-cover-classification-deeplabv3/best_model.pth', map_location=DEVICE)
#     print('Loaded pre-trained DeepLabV3+ model!')

Set up the training and validation loops for one epoch using the `segmentation_models_pytorch` helper classes `smp.utils.train.TrainEpoch` and `smp.utils.train.ValidEpoch`.

This helps handle the forward pass, loss calculation, backpropagation, and weight updates for each batch of images in the training dataset.

In [20]:
train_epoch = smp.utils.train.TrainEpoch(
    model,
    loss=loss,
    metrics=metrics,
    optimizer=optimizer,
    device=DEVICE,
    verbose=True,
)

valid_epoch = smp.utils.train.ValidEpoch(
    model,
    loss=loss,
    metrics=metrics,
    device=DEVICE,
    verbose=True,
)

## 2.3 Train your model

In [21]:
torch.cuda.empty_cache()

In [22]:
print(DEVICE)

cuda


> **Note**: you can train your own model, but this will take some time. Instead, you can use one of the pre-trained models provided which have been trained over 30 epochs. 

> Skip to section 2.4 to load a pre-trained model

In [None]:
%%time

if TRAINING:

    best_iou_score = 0.0
    train_logs_list, valid_logs_list = [], []

    for i in range(0, EPOCHS):

        # Perform training & validation
        print('\nEpoch: {}'.format(i))
        train_logs = train_epoch.run(train_loader)
        valid_logs = valid_epoch.run(valid_loader)
        train_logs_list.append(train_logs)
        valid_logs_list.append(valid_logs)

        # Save model if a better val IoU score is obtained
        if best_iou_score < valid_logs['iou_score']:
            best_iou_score = valid_logs['iou_score']
            torch.save(model, './no_aug_30_epochs.pth')
            print('Model saved!')

## 2.4 Evaluate your model

> **Hint**: load a pre-trained model for better results. 

In [None]:
# load best model (or pretrained model)
if os.path.exists('/content/no_aug_30_epochs.pth'):
    best_model = torch.load('/content/no_aug_30_epochs.pth', weights_only=False, map_location=DEVICE)
    print('Loaded Unet model from this run.')

In [None]:
test_epoch = smp.utils.train.ValidEpoch(
    best_model,
    loss=loss,
    metrics=metrics,
    device=DEVICE,
    verbose=True,
)

valid_logs = test_epoch.run(test_dataloader)
print("Evaluation on Test Data: ")
print(f"Mean IoU Score: {valid_logs['iou_score']:.4f}")
print(f"Mean Dice Loss: {valid_logs['dice_loss']:.4f}")

In [26]:
sample_preds_folder = 'sample_predictions/'
if not os.path.exists(sample_preds_folder):
    os.makedirs(sample_preds_folder)

Visualise the prediction output on some of the test images

In [None]:
for idx in range(7):

    image, gt_mask = test_dataset[idx]
    image_vis = test_dataset_vis[idx][0].astype('uint8')
    x_tensor = torch.from_numpy(image).to(DEVICE).unsqueeze(0)

    # Predict test image
    pred_mask = best_model(x_tensor)
    pred_mask = pred_mask.detach().squeeze().cpu().numpy()

    # Convert pred_mask from `CHW` format to `HWC` format
    pred_mask = np.transpose(pred_mask,(1,2,0))

    # Get prediction channel corresponding to the `urban_land` class
    pred_urban_land_heatmap = pred_mask[:,:,select_classes.index('urban_land')]
    pred_mask = colour_code_segmentation(reverse_one_hot(pred_mask), select_class_rgb_values)

    # Convert gt_mask from `CHW` format to `HWC` format
    gt_mask = np.transpose(gt_mask,(1,2,0))
    gt_mask = colour_code_segmentation(reverse_one_hot(gt_mask), select_class_rgb_values)
    cv2.imwrite(os.path.join(sample_preds_folder, f"sample_pred_{idx}.png"), np.hstack([image_vis, gt_mask, pred_mask])[:,:,::-1])

    visualize(
        original_image = image_vis,
        ground_truth_mask = gt_mask,
        predicted_mask = pred_mask,
        pred_urban_land_heatmap = pred_urban_land_heatmap
    )

**Task**: write functions to compute evaluation metrics (IoU, Dice) for each class. 
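
For one class, IoU is the number of pixels where prediction and ground truth are both positive, divided by the number where at least one is positive. The NumPy sketch below illustrates this on a toy binary mask. It is not the PyTorch solution to the task, and `iou_binary` is a name made up for this example:

```python
import numpy as np

def iou_binary(pred, target, eps=1e-7):
    """Intersection over union of two binary masks of the same shape."""
    pred = pred.astype(bool)
    target = target.astype(bool)
    intersection = np.logical_and(pred, target).sum()
    union = np.logical_or(pred, target).sum()
    return (intersection + eps) / (union + eps)

pred = np.array([[1, 1], [0, 0]])
target = np.array([[1, 0], [0, 0]])

iou = iou_binary(pred, target)        # 1 shared pixel / 2 pixels in the union
dice_from_iou = 2 * iou / (1 + iou)   # Dice and IoU are monotonically related
print(round(iou, 4), round(dice_from_iou, 4))
```

Because Dice equals `2 * IoU / (1 + IoU)` for a single pair of binary masks, the two metrics always rank a pair of predictions the same way, although Dice is never smaller than IoU.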

In [None]:
# per class metrics
     

def evaluate_per_class_metrics(model, dataloader, device, class_names):
    """
    Compute the average Dice Score and IoU per class for the best model.

    Args:
        model (torch.nn.Module): The trained segmentation model.
        dataloader (torch.utils.data.DataLoader): Dataloader for test data.
        device (torch.device): Device (CPU or CUDA).
        class_names (list): List of class names.

    Returns:
        pd.DataFrame: DataFrame containing per-class Dice Score and IoU.
    """
    model.eval()

    # dict to store per-class dice scores 
    dice_scores = {cls: [] for cls in class_names}

    # dict to store per-class iou
    iou_scores = {cls: [] for cls in class_names}

    with torch.no_grad():
        for images, masks in dataloader:
            images, masks = images.to(device), masks.to(device)

            # Forward pass
            # The model was built with ACTIVATION = 'sigmoid', so its outputs are
            # already probabilities in [0, 1]; do not apply a second sigmoid.
            preds = model(images)
            preds = (preds > 0.3).float()  # Binarize predictions
            # Compute Dice Score & IoU per class
            for i, cls in enumerate(class_names):
                # Inputs are already binary, so skip the activation inside f_score
                dice = f_score(preds[:, i], masks[:, i], beta=1, activation=None).item()
                iou = None # Write a function to implement this..

                dice_scores[cls].append(dice)
                iou_scores[cls].append(iou)

    # Compute mean Dice Score & IoU per class
    avg_dice = {cls: np.mean(scores) for cls, scores in dice_scores.items()}
    avg_iou = {cls: np.mean(scores) for cls, scores in iou_scores.items()}

    # Create DataFrame
    df_metrics = pd.DataFrame({
        "Class": class_names,
        "Avg Dice Score": [avg_dice[cls] for cls in class_names],
        "Avg IoU Score": [avg_iou[cls] for cls in class_names]
    })

    return df_metrics


# Run the evaluation
# Your code here...

# Task 3: Train the model with the data augmentation

## 3.1 Dataset and dataloaders

In [36]:
# Get train and val dataset instances with augmented data
train_dataset = LandCoverDataset(
    train_df,
    augmentation=get_training_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    class_rgb_values=select_class_rgb_values,
)

valid_dataset = LandCoverDataset(
    valid_df,
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    class_rgb_values=select_class_rgb_values,
)

test_dataset = LandCoverDataset(
    valid_df,
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    class_rgb_values=select_class_rgb_values,
)


# Get train and val data loaders
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=2)
valid_loader = DataLoader(valid_dataset, batch_size=1, shuffle=False, num_workers=4)
test_dataloader = DataLoader(test_dataset)

## 3.2 Training setup

In [None]:
# Set flag to train the model or not. If set to 'False', only prediction is performed (using an older model checkpoint)
TRAINING = True

# Set num of epochs
EPOCHS = 1

# Set device: `cuda` or `cpu`
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

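# define loss function
# The model already applies a sigmoid (ACTIVATION = 'sigmoid'), so the loss
# should not apply a second activation.
# loss = smp.utils.losses.DiceLoss()
loss = DiceLoss(activation=None)

# loss = BCEDiceLoss(activation=None)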

# define metrics
metrics = [
    smp.utils.metrics.IoU(threshold=0.5),
]

# define optimizer
optimizer = torch.optim.Adam([
    dict(params=model.parameters(), lr=0.00008),
])

# define learning rate scheduler (not used in this NB)
lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer, T_0=1, T_mult=2, eta_min=5e-5,
)



In [38]:
train_epoch = smp.utils.train.TrainEpoch(
    model,
    loss=loss,
    metrics=metrics,
    optimizer=optimizer,
    device=DEVICE,
    verbose=True,
)

valid_epoch = smp.utils.train.ValidEpoch(
    model,
    loss=loss,
    metrics=metrics,
    device=DEVICE,
    verbose=True,
)

## 3.3 Train your model

In [39]:
torch.cuda.empty_cache()

> **Note**: once again, the training setup is shown, but you can use one of the pre-trained models. 

In [None]:
%%time

if TRAINING:

    best_iou_score = 0.0
    train_logs_list, valid_logs_list = [], []

    for i in range(0, EPOCHS):

        # Perform training & validation
        print('\nEpoch: {}'.format(i))
        train_logs = train_epoch.run(train_loader)
        valid_logs = valid_epoch.run(valid_loader)
        train_logs_list.append(train_logs)
        valid_logs_list.append(valid_logs)

        # Save model if a better val IoU score is obtained
        if best_iou_score < valid_logs['iou_score']:
            best_iou_score = valid_logs['iou_score']
            torch.save(model, '/content/aug_30_epochs.pth')
            print('Model saved!')

## 3.4 Evaluate your model

In [None]:
# load best saved model checkpoint from the current run
if os.path.exists('/content/aug_30_epochs.pth'):
    best_model = torch.load('/content/aug_30_epochs.pth', weights_only=False, map_location=DEVICE)
    print('Loaded Unet model from this run.')


Loaded Unet model from this run.


Visualise the prediction output on some of the test images

In [None]:
for idx in range(7):

    image, gt_mask = test_dataset[idx]
    image_vis = test_dataset_vis[idx][0].astype('uint8')
    x_tensor = torch.from_numpy(image).to(DEVICE).unsqueeze(0)

    # Predict test image
    pred_mask = best_model(x_tensor)
    pred_mask = pred_mask.detach().squeeze().cpu().numpy()

    # Convert pred_mask from `CHW` format to `HWC` format
    pred_mask = np.transpose(pred_mask,(1,2,0))

    # Get prediction channel corresponding to the `urban_land` class
    pred_urban_land_heatmap = pred_mask[:,:,select_classes.index('urban_land')]
    pred_mask = colour_code_segmentation(reverse_one_hot(pred_mask), select_class_rgb_values)
    
    # Convert gt_mask from `CHW` format to `HWC` format
    gt_mask = np.transpose(gt_mask,(1,2,0))
    gt_mask = colour_code_segmentation(reverse_one_hot(gt_mask), select_class_rgb_values)
    cv2.imwrite(os.path.join(sample_preds_folder, f"sample_pred_{idx}.png"), np.hstack([image_vis, gt_mask, pred_mask])[:,:,::-1])

    visualize(
        original_image = image_vis,
        ground_truth_mask = gt_mask,
        predicted_mask = pred_mask,
        pred_urban_land_heatmap = pred_urban_land_heatmap
    )

In [None]:
test_epoch = smp.utils.train.ValidEpoch(
    best_model,
    loss=loss,
    metrics=metrics,
    device=DEVICE,
    verbose=True,
)

valid_logs = test_epoch.run(test_dataloader)
print("Evaluation on Test Data: ")
print(f"Mean IoU Score: {valid_logs['iou_score']:.4f}")
print(f"Mean Dice Loss: {valid_logs['dice_loss']:.4f}")

Inspect per-class metrics

In [None]:
# Your code here...

Now, compare results with and without augmentation

In [None]:
# Compare results with and without augmentation
# Merge results into a single DataFrame for better comparison
df_comparison = pd.merge(df_results_no_aug, df_results_aug, on="Class", suffixes=("_No_Aug", "_Aug"))

# Rename columns for clarity
df_comparison.rename(columns={
    "Avg Dice Score_No_Aug": "Dice Score (No Aug)",
    "Avg Dice Score_Aug": "Dice Score (Aug)",
    "Avg IoU Score_No_Aug": "IoU Score (No Aug)",
    "Avg IoU Score_Aug": "IoU Score (Aug)"
}, inplace=True)

df_comparison

# Task 4: Loss comparison

### Now, let's try a different loss function: Dice loss vs BCE-Dice loss.

You can implement your own loss functions or import them from the PyTorch library.  
Also, here's a survey of loss functions used for image segmentation: https://arxiv.org/pdf/2006.14822.pdf

In [None]:
# Get train and val dataset instances with augmented data
train_dataset = LandCoverDataset(
    train_df,
    augmentation=get_training_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    class_rgb_values=select_class_rgb_values,
)

valid_dataset = LandCoverDataset(
    valid_df,
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    class_rgb_values=select_class_rgb_values,
)

test_dataset = LandCoverDataset(
    valid_df,
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    class_rgb_values=select_class_rgb_values,
)


# Get train and val data loaders
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=2)
valid_loader = DataLoader(valid_dataset, batch_size=1, shuffle=False, num_workers=4)
test_dataloader = DataLoader(test_dataset)

In [None]:
# Set flag to train the model or not. If set to 'False', only prediction is performed (using an older model checkpoint)
TRAINING = True

# Set num of epochs
EPOCHS = 1

# Set device: `cuda` or `cpu`
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# define loss function
loss = None

# define metrics
metrics = [
    smp.utils.metrics.IoU(threshold=0.5),
]

# define optimizer
optimizer = torch.optim.Adam([
    dict(params=model.parameters(), lr=0.00008),
])

# define learning rate scheduler (not used in this NB)
lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer, T_0=1, T_mult=2, eta_min=5e-5,
)


In [None]:
train_epoch = smp.utils.train.TrainEpoch(
    model,
    loss=loss,
    metrics=metrics,
    optimizer=optimizer,
    device=DEVICE,
    verbose=True,
)

valid_epoch = smp.utils.train.ValidEpoch(
    model,
    loss=loss,
    metrics=metrics,
    device=DEVICE,
    verbose=True,
)

In [None]:
torch.cuda.empty_cache()

In [None]:
%%time

if TRAINING:

    best_iou_score = 0.0
    train_logs_list, valid_logs_list = [], []

    for i in range(0, EPOCHS):

        # Perform training & validation
        print('\nEpoch: {}'.format(i))
        train_logs = train_epoch.run(train_loader)
        valid_logs = valid_epoch.run(valid_loader)
        train_logs_list.append(train_logs)
        valid_logs_list.append(valid_logs)

        # Save model if a better val IoU score is obtained
        if best_iou_score < valid_logs['iou_score']:
            best_iou_score = valid_logs['iou_score']
            torch.save(model, './bceloss_30_epochs.pth')
            print('Model saved!')

> **Hint**: use a pre-trained model. 

In [None]:
if os.path.exists('./bceloss_30_epochs.pth'):
    best_model = torch.load('./bceloss_30_epochs.pth', weights_only=False, map_location=DEVICE)
    print('Loaded Unet model from this run.')

In [None]:
test_epoch = smp.utils.train.ValidEpoch(
    best_model,
    loss=loss,
    metrics=metrics,
    device=DEVICE,
    verbose=True,
)

valid_logs = test_epoch.run(test_dataloader)
print("Evaluation on Test Data: ")
print(f"Mean IoU Score: {valid_logs['iou_score']:.4f}")
print(f"Mean BCE-Dice Loss: {valid_logs['bce_dice_loss']:.4f}")

Same as before, evaluate per-class metrics...

In [None]:
# Your code here...

Now, compare the results from the two different loss functions

In [None]:
# Compare Dice loss with BCE-Dice loss
# Merge results into a single DataFrame for better comparison
df_comparison = pd.merge(df_results_aug, df_results_bce, on="Class", suffixes=("_Dice", "_BCE"))

# Rename columns for clarity
df_comparison.rename(columns={
    "Avg Dice Score_Dice": "Dice Score (Dice)",
    "Avg Dice Score_BCE": "Dice Score (BCE)",
    "Avg IoU Score_Dice": "IoU Score (Dice)",
    "Avg IoU Score_BCE": "IoU Score (BCE)"
}, inplace=True)

df_comparison

**Discussion**  
We have trained the same model with two different losses: Dice loss and BCE-Dice loss.

What difference do you see in the IoU scores?   

# Task 5: compute the average area difference between actual and predicted classes


Hint: measure each area as a pixel count.
https://stackoverflow.com/questions/58068315/calculate-the-area-of-the-masks-in-pixels-in-grey-scale-images-with-python

**Question**: What is the error of your predictions in terms of water area/surface?
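
For a binary class mask, the area in pixels is just the number of positive pixels, so the area error for one image is the absolute difference between two sums. A toy NumPy sketch of that computation follows. The 3×3 masks are invented for illustration, and this is not the full solution to the task:

```python
import numpy as np

# Toy binary masks for a single class (1 = pixel belongs to the class)
pred_mask = np.array([[1, 1, 0],
                      [1, 0, 0],
                      [0, 0, 0]], dtype=np.uint8)
gt_mask = np.array([[1, 1, 1],
                    [1, 1, 0],
                    [0, 0, 0]], dtype=np.uint8)

pred_area = int(pred_mask.sum())      # predicted area in pixels
gt_area = int(gt_mask.sum())          # ground-truth area in pixels
abs_diff = abs(pred_area - gt_area)   # absolute area error in pixels
rel_error = abs_diff / gt_area        # error relative to the true area

print(pred_area, gt_area, abs_diff, rel_error)
```

Note that an area difference of zero does not imply a good segmentation: two masks can cover the same number of pixels in completely different places, which is why this measure complements IoU and Dice rather than replacing them. Converting pixels to a physical area additionally requires the ground resolution of the imagery, which is not given in this notebook.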

In [None]:
def compute_average_area_difference(model, dataloader, device, class_names):
    """
    Compute the average area difference (per class) between predicted and ground truth masks.

    Args:
        model (torch.nn.Module): Trained segmentation model.
        dataloader (torch.utils.data.DataLoader): Test dataloader.
        device (torch.device): CPU or GPU.
        class_names (list): List of class names.

    Returns:
        pd.DataFrame: Table containing average area difference per class.
    """
    model.eval()

    # Initialize dictionary to store sum of differences
    total_area_diffs = {cls: 0 for cls in class_names}
    num_test_images = len(dataloader.dataset)  # Get total test images

    with torch.no_grad():
        for images, masks in dataloader:
            images = images.to(device)
            masks = masks.cpu().numpy()  # Move ground truth masks to CPU
            masks = (masks > 0).astype(np.uint8)  # Ensure binary ground truth masks

            # Get predictions
            # The model was built with ACTIVATION = 'sigmoid', so its outputs are
            # already probabilities; a second sigmoid would push every value to >= 0.5.
            preds = model(images)
            preds = (preds > 0.5).float().cpu().numpy()  # Convert to binary masks

            # Loop over batch
            batch_size = images.shape[0]
            for b in range(batch_size):
                # Iterate through each class
                for i, cls in enumerate(class_names):
                    # Extract single-channel binary masks for the class
                    pred_mask = preds[b, i].astype(np.uint8)  # Predicted class mask
                    gt_mask = masks[b, i].astype(np.uint8)  # GT class mask

                    # Compute absolute difference in area
                    
                    # Your code here... 

    # Compute mean area difference per class (normalize by number of images)
    avg_area_diff = {cls: total_area_diffs[cls] / num_test_images for cls in class_names}

    # Create a DataFrame for visualization
    df_area_difference = pd.DataFrame({
        "Class": class_names,
        "Avg Area Difference (Pixels)": [avg_area_diff[cls] for cls in class_names]
    })

    return df_area_difference

# Run the function to compute area differences
df_area_diff = compute_average_area_difference(best_model, test_dataloader, DEVICE, select_classes)

df_area_diff