# Homework 5: Scene-Dependent Image Segmentation

The goal of this homework is to implement a model that seperates foreground and background objects for a specific scene.  
We will use the highway scene from the Change Detection dataset:  
http://jacarini.dinf.usherbrooke.ca/dataset2014#

![input image](highway/input/in001600.jpg "Title") ![gt image](highway/groundtruth/gt001600.png "Title")

The groundtruth images contain 5 labels namely
- 0 : Static
- 50 : Hard shadow
- 85 : Outside region of interest
- 170 : Unknown motion (usually around moving objects, due to semi-transparency and motion blur)
- 255 : Motion

In [8]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from PIL import Image
import os
import cv2

## Task 1: Create a custom (Pytorch) dataset


https://pytorch.org/tutorials/beginner/basics/data_tutorial.html
You need to create a class that inherets from **from torch.utils.data.Dataset** and implements two methods:
- **def \_\_len\_\_(self)**:  returns the length of the dataset
- **def \_\_getitem\_\_(self, idx)**: given an integer idx returns the data x,y
    - x is the image as a float tensor of shape: $(3,H,W)$ 
    - y is the label image as a mask of shape: $(H,W)$ each pixel should contain the label 0 (background) or 1 (foreground). It is recommended to use the type torch.long
    
**Tips**:
- The first 470 images are not labeled. Just ignore these images. 
- If possible load all images into memory or evene directly to GPU to increase speed.
- You can change the resolution to fit your model or your memory
- Add data augmentation to increase the data size

In [9]:
from torch.utils.data import Dataset

def binarize(img):
    img = img > 0
    bin_img = torch.tensor(img, dtype = torch.long)
    return bin_img

class CustomImageDataset(Dataset):
    def __init__(self, imgs_dir, targets_dir, transform=None, imgs_to_skip=469):
        self.imgs_dir = imgs_dir
        self.targets_dir = targets_dir
        self.transform = transform

        # get a list of the name of the images that are in the directory:
        #  - skip first 469 images, because they are labeled
        #  - sort it, so we are sure input image matches target
        self.img_file_names = list(sorted(os.listdir(imgs_dir)[imgs_to_skip:]))
        self.target_file_names = list(sorted(os.listdir(targets_dir)[imgs_to_skip:]))
        assert len(self.img_file_names) == len(self.target_file_names) # make sure we have the same number of images and targets
    
    def __len__(self):
        return len(self.target_file_names)
    
    def __getitem__(self, idx):
        # 1. read input image
        img_path = os.path.join(self.imgs_dir, self.img_file_names[idx])
        img = cv2.imread(img_path, cv2.IMREAD_UNCHANGED)
        img_tensor = torch.from_numpy(img)

        # 2. read target image
        target_path = os.path.join(self.targets_dir, self.target_file_names[idx])
        target_img = cv2.imread(target_path, cv2.IMREAD_GRAYSCALE)

        if self.transform:
            target_img = self.transform(target_img)
        target_tensor = target_img
        return img_tensor, target_tensor

In [10]:
# dataset = CustomImageDataset(imgs_dir='./highway/input', targets_dir='./highway/groundtruth', transform=binarize)
# print(f"Dataset size={len(dataset)}")
# org_img, mask_img = dataset[1220]
# plt.imshow(org_img)
# plt.show()
# plt.imshow(mask_img, cmap="gray")
# plt.show()

## Task 2: Create a custom Segmentation Model

- input: a batch of images $(B,3,H,W)$ 
- output: a batch of pixel-wise class predictions $(B,C,H,W)$, where $C=2$

Tips:
- It is recommended to use a Fully-Convolutional Neural Network, because it flexible to the input and output resolution.
- Use Residual Blocks with convolutional layers.
- Base your model on established segmentation models:
    - U-Net: https://arxiv.org/abs/1505.04597
    - Deeplab: https://arxiv.org/abs/1606.00915

In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        if not mid_channels:
            mid_channels = out_channels
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)

class Down(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels)
        )

    def forward(self, x):
        return self.maxpool_conv(x)

class OutConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(OutConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        return self.conv(x)


class Up(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()

        self.up = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2)
        self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        x1 = self.up(x1)
        # input is CHW
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]

        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                        diffY // 2, diffY - diffY // 2])

        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)

class AiaUNet(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(AiaUNet, self).__init__()

        self.inc = (DoubleConv(in_channels, 64))
        self.down1 = (Down(64, 128))
        self.down2 = (Down(128, 256))
        self.up3 = (Up(256, 128))
        self.up4 = (Up(128, 64))
        self.outc = (OutConv(64, out_channels))

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x = self.up3(x3, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        return logits

## Task 3: Create a training loop
- split data into training and test data, e.g. 80% training data and 20% test data using your custom dataset.
- Create a Dataloader for your custom datasets 
- Define a training loop for a single epoch:
    - forward pass
    - Loss function, e.g. cross entropy
    - optimizer 
    - backward pass
    - logging
- Define validation loop:
    - forward pass
    - extract binary labels, e.g. threshold or argmax for each pixel.
    - compute evaluation metrics: Accuracy, Precision, Recall and Intersection over Union for each image

## Split data test/train and create dataloader

In [12]:
from torch.utils.data import DataLoader
from torch.utils.data import random_split

dataset = CustomImageDataset(imgs_dir='./highway/input', targets_dir='./highway/groundtruth', transform=binarize)
dataset_size = len(dataset)

train_size_percentage = 0.8
train_size = int(dataset_size * train_size_percentage)
test_size = dataset_size - train_size

train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

batch_size = 4 # with 8 or more I get CUDA out of memory :/
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

print(f"Train size = {len(train_dataset)}, number of batches in train = {len(train_loader)}")
print(f"Test size = {len(test_dataset)}, number of batches in test = {len(test_loader)}")

Train size = 984, number of batches in train = 246
Test size = 247, number of batches in test = 62


## Define a training loop

In [13]:
from torch.optim import Adam

def reshape_input_tensor(input_tensor):
    B, H, W, C = input_tensor.shape
    return input_tensor.reshape((B, C, H, W)).float()

def train_single_epoch(model, dataloader, epoch, optimizer, criterion,  device):
    model.train()
    losses = []
    for i, (input_tensor, target_tensor) in enumerate(dataloader):
        iteration = (train_size // batch_size) * epoch + i

        input_tensor = reshape_input_tensor(input_tensor.to(device))
        target_tensor = target_tensor.to(device)

        prediction_tensor = model(input_tensor)
        prediction_tensor = torch.squeeze(prediction_tensor)

        loss = criterion(prediction_tensor, target_tensor.float())
        if (iteration + 1) % 10 == 0:
            print(f"\tIteration nr {iteration + 1}: loss={loss}")

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        losses.append(loss.detach())
    return losses

def evaluate_on_test_set(model, dataloader, criterion,  device):
    model.eval()
    with torch.no_grad():
        test_loss = 0
        for i, (input_tensor, target_tensor) in enumerate(dataloader):
            input_tensor = reshape_input_tensor(input_tensor.to(device))
            target_tensor = target_tensor.to(device)

            prediction_tensor = model(input_tensor)
            prediction_tensor = torch.squeeze(prediction_tensor)

            loss = criterion(prediction_tensor, target_tensor.float())
            # TODO: accuracy etc. as well maybe? or just after whole training?

            test_loss += loss
    return test_loss
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Working on {device} device")
model = AiaUNet(in_channels=3, out_channels=1).to(device)
# model.load_state_dict(torch.load("PATH")).to(device) # TODO: uncomment when you want to load model
optimizer = Adam(model.parameters(), lr=0.001, betas=(0.5, 0.999))
criterion = nn.CrossEntropyLoss()

train_epoch_losses = []
test_epoch_losses = []
for epoch in range(0, 5):
    print(f"Epoch={epoch+1}")

    new_train_losses = train_single_epoch(model, train_loader, epoch, optimizer, criterion, device)
    train_epoch_losses.append(new_train_losses)

    new_test_loss = evaluate_on_test_set(model, test_loader, criterion, device)
    print(f"Test loss after {epoch+1} epoch = {new_test_loss}")
    test_epoch_losses.append(new_test_loss)

    torch.save(model.state_dict(), f"model-{epoch+1}epoch.pt")

# TODO: tune learning_rate maybe?

Working on cuda device
Epoch=1
	Iteration nr 10: loss=115.27424621582031
	Iteration nr 20: loss=147.3333740234375
	Iteration nr 30: loss=136.964111328125
	Iteration nr 40: loss=139.22616577148438
	Iteration nr 50: loss=109.6189956665039
	Iteration nr 60: loss=110.09159088134766
	Iteration nr 70: loss=92.34272766113281
	Iteration nr 80: loss=117.950439453125
	Iteration nr 90: loss=111.44789123535156
	Iteration nr 100: loss=49.79337692260742
	Iteration nr 110: loss=130.32078552246094
	Iteration nr 120: loss=124.33794403076172
	Iteration nr 130: loss=148.1920166015625
	Iteration nr 140: loss=117.60888671875
	Iteration nr 150: loss=113.54073333740234
	Iteration nr 160: loss=31.070484161376953
	Iteration nr 170: loss=86.25753021240234
	Iteration nr 180: loss=127.16291809082031
	Iteration nr 190: loss=101.72335052490234
	Iteration nr 200: loss=83.99574279785156
	Iteration nr 210: loss=218.7068328857422
	Iteration nr 220: loss=90.38179016113281
	Iteration nr 230: loss=110.47145080566406
	Iter

## TODO: evaluation

In [29]:
print("\nTest losses")
for l in test_epoch_losses:
    print(l.item())
# TODO: code for evaluation...


Test losses
6533.568359375
6214.29296875
6102.3525390625
6095.54052734375
6081.53125


## Task 4: Small Report of your model and training
- visualize training and test error over each epoch
- report the evaluation metrics of the final model