<a href="https://colab.research.google.com/github/unknownpgr/road-simulator/blob/master/src/training/02_Segmentation_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 02. Segmentation Model
The goal of this script is to implement pixel segmentation model for lane detection.

## Question
I used torch.nn.BCEWithLogitsLoss for loss function. What exactly it is?

## Import Libraries 

In [1]:
# Platform
from google.colab import drive
from IPython.display import clear_output

# Install required library
!pip install segmentation-models-pytorch

# Data preprocessing
import os
import cv2
import random
import numpy as np
import pandas as pd
import sklearn

# Training
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as T
import segmentation_models_pytorch as smp
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import KFold
from tqdm import tqdm

# Visualize
import matplotlib.pyplot as plt

clear_output()

## Prepare Dataset
Run this block after mounting Google Dirve.

This block will remove existing files, copy dataset zip file from google drive to collab, and extract dataset.

In [2]:
!rm -rf /content/dataset
!rm /content/dataset.zip
!cp "/content/drive/MyDrive/[2021]Computer Vision ML/data_segmentation.zip" /content/dataset.zip
!unzip /content/dataset.zip -d /content/dataset
clear_output()

## Construct Traning Model

In [3]:
class U_Net(nn.Module):
    def __init__(self):
        super(U_Net, self).__init__()

        # Define model structure at once by using nn.ModuleList.
        # Even though only one model is used, use nn.ModuleList for further extension.
        self.layers = nn.ModuleList([
                                    #  Use Unet as pixel classifier
                                     smp.Unet(
                                         encoder_name="resnet34",        
                                        #  encoder_weights="imagenet",     
                                         in_channels=1,                  
                                         classes=4
                                     )
                                     ])

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

# Test model. Not to affect global variables, wrap it with function.
def test():
    model_test = U_Net()
    print(model_test)
test()

Downloading: "https://download.pytorch.org/models/resnet34-333f7ec4.pth" to /root/.cache/torch/hub/checkpoints/resnet34-333f7ec4.pth


HBox(children=(FloatProgress(value=0.0, max=87306240.0), HTML(value='')))


U_Net(
  (layers): ModuleList(
    (0): Unet(
      (encoder): ResNetEncoder(
        (conv1): Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
        (layer1): Sequential(
          (0): BasicBlock(
            (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            (relu): ReLU(inplace=True)
            (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          )
          (1): BasicBlock(
            (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), p

## Configure Dataset Loader
`labels` parameter of `DatasetCustom` constructor is a dataframe of tuples `(file name, position, angle)`.

In [4]:
IMAGE_INPUT_SIZE = 128
IMAGE_LABEL_SIZE = 128

class ToTensor():
    def __call__(self,sample):
        image, label = sample
        # Swap color axis because axis order is:
        # Numpy image: H, W, C
        # Torch image: C, H, W
        image = image.transpose((2, 0, 1))
        label = label.transpose((2, 0, 1))
        return (torch.FloatTensor(image), torch.FloatTensor(label))

class DatasetCustom(Dataset):

    def __init__(self, metadata, transforms=[ToTensor()]):
        '''
        There was an error when metadata columns are not wrapped with `list`.
        By converting metadata type to list, problem solved.
        Why?
        '''
        self.path_input = list(metadata['input'])
        self.path_label = list(metadata['label'])
        self.transforms = T.Compose(transforms)
        self.length = len(metadata)

    def __len__(self):
        return self.length
  
    def __getitem__(self, index):
        path_input = self.path_input[index].replace('\\','/')
        path_label = self.path_label[index].replace('\\','/')

        # Because the input is an grayscale image, only take the first channel.
        input = cv2.imread(path_input)[:,:,0].astype('float')/255
        input = cv2.resize(input,(IMAGE_INPUT_SIZE,IMAGE_INPUT_SIZE))
        input = input.reshape([IMAGE_INPUT_SIZE,IMAGE_INPUT_SIZE,1])
        
        # Convert ecah pixel in the label to one-hot vector.
        label_raw = cv2.imread(path_label).astype('float')[:,:,1]
        label_raw = cv2.resize(label_raw,(IMAGE_LABEL_SIZE, IMAGE_LABEL_SIZE))

        c1 = label_raw > 200                                            # Lane
        c2 = np.logical_and(133 > label_raw, label_raw > 123)           # Obstacle
        c3 = label_raw < 20                                             # Ground
        c4 = np.logical_not(np.logical_or(np.logical_or(c1, c2), c3))   # Else

        label = np.zeros([IMAGE_LABEL_SIZE, IMAGE_LABEL_SIZE, 4], dtype=np.float)
        label[:,:,0][c1] = 1
        label[:,:,1][c2] = 1
        label[:,:,2][c3] = 1
        label[:,:,3][c4] = 1
        
        sample = (input, label)
        sample = self.transforms(sample)

        return sample

## Define Dataset and Data Loader
Make traning dataset and validation dataset by splitting whole label into two parts.

`label.csv` is a list of tuples `(file name, position, angle)`.

In [5]:
# Define dataset
ROOT = 'dataset'
VALIDATION_RATIO = 0.2

labels = pd.read_csv(os.path.join(ROOT, "meta.csv"))
labels = sklearn.utils.shuffle(labels)
labels = ROOT+'\\'+labels

valid_count = int(len(labels)*VALIDATION_RATIO)

train_dataset = DatasetCustom(labels[valid_count:])
valid_dataset = DatasetCustom(labels[:valid_count])

print('Train dataset:',len(train_dataset))
print('Valid dataset:',len(valid_dataset))

# Define data loaders.
train_data_loader = DataLoader(
    train_dataset,
    batch_size = 256,
    shuffle = True,
    num_workers = 2
)

valid_data_loader = DataLoader(
    valid_dataset,
    batch_size = 32,
    shuffle = True,
    num_workers = 2
)

Train dataset: 2923
Valid dataset: 730


## Set Traning Device
Use GPU if possible. Else, use CPU instead.

In [6]:
is_cuda_available = torch.cuda.is_available()
device = torch.device("cuda:0" if is_cuda_available else "cpu")

if is_cuda_available:
    print('CUDA is available and the device was set to GPU.')
else:
    print('CUDA is not available and the device was set to CPU.')

CUDA is available and the device was set to GPU.


## Configure Traning Environment
Traning environment configuration part and traning part are spearated so that traning can be done multiple times without reinitializing model.

In [7]:
# Define  model and move it to traning device.
model = U_Net()
model.to(device)

U_Net(
  (layers): ModuleList(
    (0): Unet(
      (encoder): ResNetEncoder(
        (conv1): Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
        (layer1): Sequential(
          (0): BasicBlock(
            (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            (relu): ReLU(inplace=True)
            (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          )
          (1): BasicBlock(
            (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), pa

In [8]:
# Define optimzer.
optimizer = torch.optim.Adam(model.parameters(), lr = 0.001)    

# Define learning rate scheduler. It will automatically adjust learning rate.
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                               step_size = 5,
                                               gamma = 0.75)

# Define loss function.
criterion = torch.nn.BCEWithLogitsLoss()

## Train Model
valid_loss_min should be sustained so that it is in separated block. The training part below can be run multiple times.

In [9]:
min_loss = float('inf')

In [None]:
try:
    for epoch in range(1, 501):
    
        with tqdm(train_data_loader, unit="batch") as train_bar:
            train_bar.set_description(f"Train Epoch {epoch}")

            # Train one epoch
            train_loss_list = []
            for sample in train_bar:

                # For each sample(batch), initialize gradients.
                optimizer.zero_grad()

                # Split images and labels, and move it to device.
                images, labels = sample

                images = images.to(device)
                labels = labels.to(device)

                # Set model to training mode.
                model.train()

                # Enable gradients.
                with torch.set_grad_enabled(True):
                    # Predict results.
                    predicts  = model(images)

                    # Calculate loss.
                    loss = criterion(predicts, labels)

                    # Update delta with back-propagation.
                    loss.backward()

                    # Training model with optimzer.
                    optimizer.step()

                # Add loss(which is just single number) to train loss list.
                train_loss_list.append(loss.item())

                # Get average of loss and display it on progress bar.
                train_loss = np.mean(train_loss_list)
                train_bar.set_postfix(train_loss = train_loss)
                
        # Adjust learning rate after training one epoch.
        lr_scheduler.step()

        # Calculate validation score after training one epoch.
        with tqdm(valid_data_loader, unit="batch") as valid_bar:
            valid_bar.set_description(f"Valid Epoch {epoch}")
    
            valid_loss_list = []
            for sample in valid_bar:
                optimizer.zero_grad()

                images, labels = sample
                images = images.to(device)
                labels = labels.to(device)

                # Unlike traning, set model to evaluation mode and disable gradients.
                model.eval()
                with torch.no_grad():
                    predicts  = model(images)
                    loss = criterion(predicts, labels)
                    valid_loss_list.append(loss.item())

                valid_loss = np.mean(valid_loss_list)
                valid_bar.set_postfix(valid_loss = valid_loss)

        valid_loss = np.mean(valid_loss_list)
        if valid_loss < min_loss:
            min_loss = valid_loss
            model_name = "unet"
            path = "/content/drive/MyDrive/[2021]Computer Vision ML/"
            torch.save(model, f'{path}[{model_name}].pth')

except KeyboardInterrupt:
    clear_output()
    print('Learning finished by keyboard inturrupt.')

Train Epoch 1: 100%|██████████| 12/12 [00:12<00:00,  1.03s/batch, train_loss=0.334]
Valid Epoch 1: 100%|██████████| 23/23 [00:01<00:00, 16.69batch/s, valid_loss=0.257]
Train Epoch 2: 100%|██████████| 12/12 [00:12<00:00,  1.05s/batch, train_loss=0.207]
Valid Epoch 2: 100%|██████████| 23/23 [00:01<00:00, 16.22batch/s, valid_loss=0.16]
Train Epoch 3: 100%|██████████| 12/12 [00:12<00:00,  1.06s/batch, train_loss=0.133]
Valid Epoch 3: 100%|██████████| 23/23 [00:01<00:00, 16.14batch/s, valid_loss=0.111]
Train Epoch 4: 100%|██████████| 12/12 [00:12<00:00,  1.08s/batch, train_loss=0.0929]
Valid Epoch 4: 100%|██████████| 23/23 [00:01<00:00, 16.26batch/s, valid_loss=0.087]
Train Epoch 5: 100%|██████████| 12/12 [00:13<00:00,  1.10s/batch, train_loss=0.073]
Valid Epoch 5: 100%|██████████| 23/23 [00:01<00:00, 16.22batch/s, valid_loss=0.0709]
Train Epoch 6: 100%|██████████| 12/12 [00:13<00:00,  1.11s/batch, train_loss=0.0622]
Valid Epoch 6: 100%|██████████| 23/23 [00:01<00:00, 16.27batch/s, valid_lo

## Test Trained Model

In [None]:
def to_rgb(array):
    array = np.swapaxes(array,0,1)
    array = np.swapaxes(array,1,2)
    array = array[:,:,0:3]
    array[array<0]=0
    array[array>1]=1
    return array

def test_model():
    # Get sample data
    index, sample = next(enumerate(valid_data_loader))
    images, labels = sample

    # Move it to device
    images = images.to(device)

    # Predict
    optimizer.zero_grad()
    model.eval()
    with torch.no_grad():
        predicts = model(images)

    # Get images, labels and prediction results to cpu, and convert them to numpy array.
    images = images.cpu().detach().numpy()
    predicts = predicts.cpu().detach().numpy()

    # Check value range
    print(np.max(images),np.max(predicts),np.max(labels.numpy()))
    print(np.min(images),np.min(predicts),np.min(labels.numpy()))

    # For each cases,
    for i in range(min(len(labels),20)):
        
        f, axarr = plt.subplots(1,4) 

        # Display image
        axarr[0].imshow(images[i][0])
        axarr[0].title.set_text(f"Input {i+1}")
        axarr[0].axis('off')

        axarr[1].imshow(to_rgb(predicts[i]))
        axarr[1].title.set_text(f"Prediction {i+1}")
        axarr[1].axis('off')
        
        axarr[2].imshow(to_rgb(labels[i]))
        axarr[2].title.set_text(f"Label {i+1}")
        axarr[2].axis('off')

        axarr[3].imshow(to_rgb(np.abs(predicts[i]-labels[i].numpy())))
        axarr[3].title.set_text(f"Error {i+1}")
        axarr[3].axis('off')

test_model()