<a href="https://colab.research.google.com/github/karimassi/road-segmentation/blob/test/CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%matplotlib inline
import matplotlib.image as mpimg
import numpy as np
import matplotlib.pyplot as plt
import os,sys
from PIL import Image
from torch.utils.data import Dataset, random_split
import torchvision.io as io
import torch
import torchvision
from torch import nn

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [3]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True, use_metadata_server=False)

Mounted at /content/drive


In [10]:
def accuracy(prediction, label):
    """
    Compute the accuracy of the prediction
    
    @param prediction : the prediction of the model, int64 tensor of shape (batch_size), either 0 or 1
    @param label      : the labels of the data     , int64 tensor of shape (batch_size), either 0 or 1
    """
    
    batch_size = label.size(0)
    correct = torch.sum(prediction == label)
    return (correct / batch_size).cpu()

def F1_score(prediction, label):
    """
    Compute the F1-score of the prediction
    
    @param prediction : the prediction of the model, int64 tensor of shape (batch_size), either 0 or 1
    @param label      : the labels of the data     , int64 tensor of shape (batch_size), either 0 or 1
    """
    
    batch_size = label.size(0)
    
    precision = (torch.sum(prediction * label) / torch.sum(prediction))
    recall = (torch.sum(prediction * label) / torch.sum(label))
    
    F1 = 2 * precision * recall / (precision + recall)
    return F1.cpu().item()


In [5]:
def train(model, criterion, dataset_train, dataset_test, optimizer, num_epochs):
    """
    Train the given model
    
    @param model         : torch.nn.Module
    @param criterion     : torch.nn.modules.loss._Loss
    @param dataset_train : torch.utils.data.DataLoader
    @param dataset_test  : torch.utils.data.DataLoader
    @param optimizer     : torch.optim.Optimizer
    @param num_epochs    : int
    """
    print("Starting training")
    model.to(device)
    for epoch in range(num_epochs):
        # Train an epoch
        model.train()
        for batch_x, batch_y in dataset_train:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)

            # Evaluate the network (forward pass)
            batch_pred = model(batch_x)
            loss = criterion(batch_pred, batch_y)

            # Compute the gradient
            optimizer.zero_grad()
            loss.backward()

            # Update the parameters of the model with a gradient step
            optimizer.step()

        # Test the quality on the test set
        model.eval()
        accuracies_test = []
        f1_scores_test = []
        for batch_x, batch_y in dataset_test:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)

            # Evaluate the network (forward pass)
            prediction = model(batch_x)
            accuracies_test.append(accuracy(prediction, batch_y))
            f1_scores_test.append(F1_score(prediction, batch_y))

        print(f"Epoch {epoch + 1 : 2} | Test accuracy : {np.mean(accuracies_test):.5} | Test F1 : {np.mean(f1_scores_test):.5}")

In [6]:
treshold = 0.5

class PatchModel(nn.Module):
    """
    Model that tells if a 16 x 16 RGB (as a 3 x 16 x 16 tensor) correspond to a road (1) or not (0)
    """
    def __init__(self):
        super().__init__()
        
        # 3 channels 16 x 16
        self.conv1 = nn.Conv2d(
            in_channels=3,
            out_channels=10,
            kernel_size=5
        )
        # 10 channels 12 x 12 (12 = 16 - (kernel_size - 1))
        self.pool1 = nn.MaxPool2d(kernel_size=2)
        # 10 channels 6 x 6 (6 = 12 / kernel_size)
        self.conv2 = nn.Conv2d(
            in_channels=10,
            out_channels=20,
            kernel_size=3
        )
        # 20 channels 4 x 4 (4 = 6 - (kernel_size - 1))
        self.pool2 = nn.MaxPool2d(kernel_size=2)
        # 20 channels 2 x 2 (2 = 4 / kernel_size)
        
        self.lin1 = nn.Linear(
            in_features=20 * 2 * 2,
            out_features=10
        )
        self.lin2 = nn.Linear(
            in_features=10,
            out_features=1
        )
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        x = self.relu(self.pool1(self.conv1(x)))
        x = self.relu(self.pool2(self.conv2(x)))
        x = x.view(-1, 20 * 2 * 2)
        x = self.relu(self.lin1(x))
        x = self.sigmoid(self.lin2(x))
        # If we are in testing mode then the output should be either 0 or 1
        if not self.training:
            x = 1 * (x > treshold)
        return x.view(-1)

In [7]:
root_dir = "/content/drive/Shareddrives/road-segmentation/data/"
img_path = root_dir + "training/images/"
gt_path = root_dir + "training/groundtruth/"
test_path = "test_set_images/"

In [18]:
class PatchedSatImagesDataset(Dataset):
    img_size = (400, 400)
    patch_size = (16, 16)
    
    def __init__(self, training_img_path, training_gt_path, foreground_threshold = None, transform = None):
        """
        Dataset for the traing data, this dataset is already patched
        
        @param training_img_path    : (string)             path to the training sat images
        @param training_gt_path     : (string)             path to the groundtruth images
        @param foreground_threshold : (float, optional)     if a value is provided then the label is 1 if the mean of the patch is greater than this value. 
                                                           if no value is provided, the mean is returned as label
        @param transform            : (callable, optional) a transformation to apply to each patch before returning it
        """
        super().__init__()
        
        # self.files = [{"sat" : io.read_image(training_img_path + f), "gt" : torch.tensor(mpimg.imread(training_gt_path + f))} for f in [f"satImage_{i + 1:03}.png" for i in range(10)]]
        self.files = [{"sat" : io.read_image(training_img_path + f), "gt" : torch.tensor(mpimg.imread(training_gt_path + f))} for f in sorted(os.listdir(training_img_path))]
        self.foreground_threshold = foreground_threshold
        self.transform = transform
    
    def patch_per_img(self):
        return (self.img_size[0] // self.patch_size[0]) * (self.img_size[1] // self.patch_size[1])
    
    def __len__(self): 
        return len(self.files) * self.patch_per_img()
        
    def __getitem__(self, idx):
        files_number = idx // self.patch_per_img()
        patch_number = idx % self.patch_per_img()
        files = self.files[files_number]
        sat_img = files["sat"]
        gt_img = files["gt"]
        row_number = patch_number // (self.img_size[0] // self.patch_size[0])
        col_number = patch_number % (self.img_size[0] // self.patch_size[0])
        
        X = sat_img[:, row_number : row_number + self.patch_size[0], col_number : col_number + self.patch_size[1]] / 255
        Y = torch.mean(gt_img[row_number : row_number + self.patch_size[0], col_number : col_number + self.patch_size[1]])
        
        if self.transform is not None:
            X = self.transform(X)
        
        if self.foreground_threshold is not None:
            if Y > self.foreground_threshold :
                Y = 1
            else :
                Y = 0
        
        return X, Y

In [19]:
num_epochs = 10
learning_rate = 1e-3
batch_size = 100

dataset = PatchedSatImagesDataset(img_path, gt_path, treshold)

data_len = len(dataset)
train_len = int(data_len * 0.7)
test_len = int(data_len * 0.3)

dataset_train, dataset_test = random_split(dataset, [train_len, test_len])

print(len(dataset_train), len(dataset_test))

dataloader_train = torch.utils.data.DataLoader(
    PatchedSatImagesDataset(img_path, gt_path),
    batch_size=batch_size,
    shuffle=True
)

dataloader_test = torch.utils.data.DataLoader(
    PatchedSatImagesDataset(img_path, gt_path),
    batch_size=batch_size,
    shuffle=True
)

# Train the logistic regression model with the Adam optimizer
criterion = torch.nn.MSELoss()
model = PatchModel().to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
train(model, criterion, dataloader_train, dataloader_test, optimizer, num_epochs)


43750 18750
Starting training
Epoch  1 | Test accuracy : 0.71158 | Test F1 : nan
Epoch  2 | Test accuracy : 0.71158 | Test F1 : nan
Epoch  3 | Test accuracy : 0.71158 | Test F1 : nan
Epoch  4 | Test accuracy : 0.70525 | Test F1 : nan
Epoch  5 | Test accuracy : 0.69912 | Test F1 : 0.55745
Epoch  6 | Test accuracy : 0.6956 | Test F1 : 0.64395
Epoch  7 | Test accuracy : 0.69643 | Test F1 : 0.64751
Epoch  8 | Test accuracy : 0.70686 | Test F1 : 0.5644
Epoch  9 | Test accuracy : 0.69454 | Test F1 : 0.68143
Epoch  10 | Test accuracy : 0.70152 | Test F1 : 0.69207
