In [None]:
import math
import os
import re
import cv2
import numpy as np
import matplotlib.pyplot as plt
import random
import tensorflow as tf
import copy
from glob import glob
from random import sample
from PIL import Image

In [None]:
# Mount Google Drive at /content/drive; the dataset and output paths used
# further down all live under this mount point (Colab-only helper).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# !unzip /content/drive/MyDrive/cil-road-segmentation-2021.zip -d /content/drive/MyDrive/cil-road-segmentation-2021

In [None]:
# Mean and std of the training set, one value per colour channel.
# normalize() subtracts mean1 and divides by std1 after scaling pixels by 1/255.
mean1 = np.array([0.330, 0.327, 0.293])
std1 = np.array([0.183, 0.176, 0.175])

def horizontal_flip(train_img, label_img, prob=0.75):
    """Flip image and label horizontally (together) with probability ``prob``.

    Returns the pair ``(train_img, label_img)``: both mirrored with
    ``cv2.flip(..., 1)`` when the random draw is below ``prob``, otherwise the
    original objects untouched.
    """
    draw = np.random.random()
    if not draw < prob:
        return train_img, label_img
    mirrored_img = cv2.flip(train_img, 1)
    mirrored_label = cv2.flip(label_img, 1)
    return mirrored_img, mirrored_label


def vertical_flip(train_img, label_img, prob=0.75):
    """Flip image and label vertically (together) with probability ``prob``.

    Returns the pair ``(train_img, label_img)``: both flipped with
    ``cv2.flip(..., 0)`` when the random draw is below ``prob``, otherwise the
    original objects untouched.
    """
    draw = np.random.random()
    if not draw < prob:
        return train_img, label_img
    flipped_img = cv2.flip(train_img, 0)
    flipped_label = cv2.flip(label_img, 0)
    return flipped_img, flipped_label


def rotate_90s(train_img, label_img, prob=0.75):
    """Rotate image and label by the same multiple of 90 degrees with probability ``prob``.

    The number of quarter turns k is drawn uniformly from {1, 2, 3} and applied
    with ``np.rot90`` (which turns counter-clockwise). When the random draw is
    not below ``prob`` the inputs are returned unchanged.
    """
    draw = np.random.random()
    if not draw < prob:
        return train_img, label_img
    # high is exclusive, so k is one of 1, 2, 3
    quarter_turns = np.random.randint(low=1, high=4, size=1)[0]
    rotated_img = np.rot90(train_img, quarter_turns)
    rotated_label = np.rot90(label_img, quarter_turns)
    return rotated_img, rotated_label


def hue_image(train_img, label_img, min_hue_factor=-10, max_hue_factor=10, prob=0.75):
    """Shift the hue of an RGB image by a random integer offset with probability ``prob``.

    The offset comes from ``np.random.randint(min_hue_factor, max_hue_factor)``
    (upper bound exclusive). It is added to the H channel of the HSV
    representation, the result is clipped to [0, 180] and converted back to RGB.
    The label image is always returned unchanged.
    """
    draw = np.random.random()
    if not draw < prob:
        return train_img, label_img
    hue, sat, val = cv2.split(cv2.cvtColor(train_img, cv2.COLOR_RGB2HSV))
    offset = np.random.randint(low=min_hue_factor, high=max_hue_factor, size=1)[0]
    # NOTE(review): the [0, 180] clip presumably targets 8-bit HSV; hue is
    # cyclic, so clipping (rather than wrapping) saturates near red - confirm intent.
    shifted_hue = np.clip(hue + offset, 0, 180).astype(hue.dtype)
    recombined = cv2.merge((shifted_hue, sat, val))
    return cv2.cvtColor(recombined, cv2.COLOR_HSV2RGB), label_img


def saturation_image(train_img, label_img, min_saturation_factor=-20, max_saturation_factor=20, prob=0.75):
    """Shift the saturation of an RGB image by a random integer offset with probability ``prob``.

    The offset comes from
    ``np.random.randint(min_saturation_factor, max_saturation_factor)`` (upper
    bound exclusive). It is added to the S channel of the HSV representation,
    the result is clipped to [0, 255] and converted back to RGB. The label
    image is always returned unchanged.
    """
    draw = np.random.random()
    if not draw < prob:
        return train_img, label_img
    hue, sat, val = cv2.split(cv2.cvtColor(train_img, cv2.COLOR_RGB2HSV))
    offset = np.random.randint(low=min_saturation_factor, high=max_saturation_factor, size=1)[0]
    # cast back to the channel dtype produced by cv2.split
    shifted_sat = np.clip(sat + offset, 0, 255).astype(hue.dtype)
    recombined = cv2.merge((hue, shifted_sat, val))
    return cv2.cvtColor(recombined, cv2.COLOR_HSV2RGB), label_img


def brightness_image(train_img, label_img, min_brightness_factor=-30, max_brightness_factor=30, prob=0.75):
    """Add a random brightness offset to the image with probability ``prob``.

    The offset is drawn uniformly from
    ``[min_brightness_factor, max_brightness_factor)`` and applied through
    ``_contrast_and_brightness`` with a contrast factor of 1. The label image
    is always returned unchanged.
    """
    draw = np.random.random()
    if not draw < prob:
        return train_img, label_img
    offset = np.random.uniform(min_brightness_factor, max_brightness_factor, 1)[0]
    return _contrast_and_brightness(train_img, 1, offset), label_img


def contrast_image(train_img, label_img, min_contrast_factor=0.6, max_contrast_factor=2.5, prob=0.75):
    """Scale the image contrast by a random factor with probability ``prob``.

    The factor is drawn uniformly from
    ``[min_contrast_factor, max_contrast_factor)`` and applied through
    ``_contrast_and_brightness`` with a brightness offset of 0. The label image
    is always returned unchanged.
    """
    draw = np.random.random()
    if not draw < prob:
        return train_img, label_img
    factor = np.random.uniform(min_contrast_factor, max_contrast_factor, 1)[0]
    return _contrast_and_brightness(train_img, factor, 0), label_img


def _contrast_and_brightness(img, contrast_factor, brightness_factor):
    """Return ``img * contrast_factor + brightness_factor`` computed by ``cv2.addWeighted``.

    The second operand is an all-zero image of the same shape and dtype, so
    its weight ``1 - contrast_factor`` contributes nothing to the result.
    """
    zero_img = np.zeros(img.shape, img.dtype)
    return cv2.addWeighted(img, contrast_factor, zero_img, 1 - contrast_factor, brightness_factor)


def random_scale(train_img, label_img, pad_reflect=False, probs=[0.7, 0.9, 1]):
    """Apply exactly one of three geometric augmentations, chosen at random.

    A uniform draw in [0, 1) selects:
      * below ``probs[0]``  -> crop a window and scale it back up,
      * below ``probs[1]``  -> translate the image,
      * otherwise           -> shrink the image and pad it to the original size
                               (reflect padding when ``pad_reflect`` is true).
    Returns the transformed ``(train_img, label_img)`` pair.
    """
    draw = np.random.random()
    if not draw < probs[0]:
        if not draw < probs[1]:
            return _shrink_and_pad(train_img, label_img, pad_reflect)
        return _random_shift(train_img, label_img)
    return _crop_and_scale_up(train_img, label_img)


def _crop_and_scale_up(train_img, label_img, crop_size=[(200, 200), (250, 250), (300, 300), (350, 350)]):
    """Crop a random window from image and label, then resize it to the input size.

    Args:
        train_img: image array; its first two dimensions are (height, width).
        label_img: label array cropped and resized with the same window.
        crop_size: candidate (height, width) crop shapes; one is picked at random.

    Returns:
        ``(train_img, label_img)`` resized back to the original height and width
        with bilinear interpolation.
    """
    height, width = train_img.shape[:2]

    # cropping
    random_crop_shape = random.choice(crop_size)
    train_img, label_img = _random_crop(train_img, label_img, random_crop_shape)

    # scale up: cv2.resize takes dsize as (width, height), the reverse of the
    # numpy (rows, cols) order, so the two must be swapped for non-square images
    target_size = (width, height)
    train_img = cv2.resize(train_img, target_size, interpolation=cv2.INTER_LINEAR)
    label_img = cv2.resize(label_img, target_size, interpolation=cv2.INTER_LINEAR)

    return train_img, label_img


def _random_crop(train_img, label_img, crop_shape):
    original_shape = train_img.shape[:2]

    crop_h = original_shape[0]-crop_shape[0]
    crop_w = original_shape[1]-crop_shape[1]
    nh = random.randint(0, crop_h)
    nw = random.randint(0, crop_w)
    train_crop = train_img[nh:nh + crop_shape[0], nw:nw + crop_shape[1]]
    label_crop = label_img[nh:nh + crop_shape[0], nw:nw + crop_shape[1]]
    return train_crop, label_crop


def _random_shift(train_img, label_img):
    """Translate image and label by the same random offset, zero-filling the gap.

    Each offset is an integer drawn uniformly from +/- 15% of the matching
    image dimension (inclusive); the vertical offset is drawn first.
    """
    limit_h, limit_w = np.multiply(0.15, train_img.shape[:2]).astype(np.int64)

    delta_h = random.randint(-limit_h, limit_h)
    delta_w = random.randint(-limit_w, limit_w)

    def _translate(img):
        # rows first, then columns
        return _shift(_shift(img, delta_h, height=True), delta_w, height=False)

    return _translate(train_img), _translate(label_img)


def _shift(img, delta, height):
    if delta == 0:
        return img
    translated_img = np.empty_like(img)
    if height:
        if delta >= 0:
            translated_img[:delta] = 0
            translated_img[delta:] = img[:-delta]
        elif delta < 0:
            translated_img[:delta] = img[-delta:]
            translated_img[delta:] = 0
        return translated_img
    else:
        if delta >= 0:
            translated_img[:, :delta] = 0
            translated_img[:, delta:] = img[:, :-delta]
        elif delta < 0:
            translated_img[:, :delta] = img[:, -delta:]
            translated_img[:, delta:] = 0
        return translated_img


def _shrink_and_pad(train_img, label_img, pad_reflect, shrink_range=(0.6, 0.95)):
    """Shrink image and label by a random ratio, then pad back to the input size.

    The ratio is drawn uniformly from ``shrink_range``; padding is delegated
    to ``_random_pad`` (reflect border when ``pad_reflect`` is true, zeros
    otherwise).
    """
    target_shape = train_img.shape[:2]
    low, high = shrink_range[0], shrink_range[1]

    ratio = np.random.uniform(low, high)
    small_img, small_label = _random_shrink(train_img, label_img, ratio)
    padded_img, padded_label = _random_pad(small_img, small_label, target_shape, pad_reflect)

    return padded_img, padded_label


def _random_shrink(train_img, label_img, ratio):
    """Resize image and label to ``ratio`` times their original height and width.

    Args:
        train_img: image array; its first two dimensions are (height, width).
        label_img: label array resized to the same target size.
        ratio: scale factor applied to both dimensions (truncated to int pixels).

    Returns:
        ``(train_img, label_img)`` resized with bilinear interpolation.
    """
    height, width = train_img.shape[:2]
    # cv2.resize takes dsize as (width, height), the reverse of the numpy
    # (rows, cols) order, so the two must be swapped for non-square images
    shrink_size = (int(width * ratio), int(height * ratio))
    # shrink
    train_img = cv2.resize(train_img, shrink_size, interpolation=cv2.INTER_LINEAR)
    label_img = cv2.resize(label_img, shrink_size, interpolation=cv2.INTER_LINEAR)
    return train_img, label_img


def _random_pad(train_img, label_img, target_shape, pad_reflect):
    """Pad image and label up to ``target_shape`` with a randomly jittered placement.

    The image starts centred in the target canvas and is then moved by a
    random offset per axis. The offset is bounded by 15% of the current image
    size and by half of the free margin, so all four pad widths stay
    non-negative. Padding uses a reflected border when ``pad_reflect`` is
    true and constant zeros otherwise.
    """
    current_shape = train_img.shape[:2]

    # free space per axis between the current image and the target canvas
    margin = np.subtract(target_shape, current_shape)

    # jitter limit: min(half the margin, 15% of the image size), as integers
    jitter_limit = np.minimum(margin // 2, np.multiply(0.15, current_shape)).astype(np.int64)

    # the vertical jitter is drawn before the horizontal one
    pad_top = margin[0] // 2 + random.randint(-jitter_limit[0], jitter_limit[0])
    pad_left = margin[1] // 2 + random.randint(-jitter_limit[1], jitter_limit[1])
    pad_bottom = margin[0] - pad_top
    pad_right = margin[1] - pad_left
    pads = (pad_top, pad_bottom, pad_left, pad_right)

    if pad_reflect:
        def _pad(img):
            return cv2.copyMakeBorder(img, *pads, cv2.BORDER_REFLECT)
    else:
        def _pad(img):
            return cv2.copyMakeBorder(img, *pads, cv2.BORDER_CONSTANT, value=0)

    return _pad(train_img), _pad(label_img)


def random_rotate(train_img, label_img, min_angle=-25, max_angle=25, prob=0.75):
    """Rotate image and label by the same small random angle with probability ``prob``.

    The angle (degrees) is drawn uniformly from ``[min_angle, max_angle)`` and
    applied to both arrays via ``_rotate_image``. When the random draw is not
    below ``prob`` the inputs are returned unchanged.
    """
    draw = np.random.random()
    if not draw < prob:
        return train_img, label_img
    angle = np.random.uniform(min_angle, max_angle, 1)[0]
    rotated_img = _rotate_image(train_img, angle)
    rotated_label = _rotate_image(label_img, angle)
    return rotated_img, rotated_label


def _rotate_image(img, angle):
    if -1 < angle < 1:
        return img
    shape_2d = (img.shape[1], img.shape[0])
    center_2d = (img.shape[1] / 2, img.shape[0] / 2)
    rotation_matrix = cv2.getRotationMatrix2D(center_2d, angle, 1.0)
    img = cv2.warpAffine(img, rotation_matrix, shape_2d, flags=cv2.INTER_LINEAR)
    return img


def normalize(img):
    """Scale ``img`` by 1/255 as float32, then standardise with ``mean1`` and ``std1``."""
    scaled = img.astype(np.float32) / 255.0
    centred = scaled - mean1
    return centred / std1


def discretize(gt, threshold=40):
    """Binarise ``gt`` in place: values below ``threshold`` become 0, the rest 1.

    The array is modified and also returned. The two masks are computed and
    applied one after the other, in this order, on purpose: the second mask is
    evaluated on the array after the first assignment.
    """
    below = gt < threshold
    gt[below] = 0
    at_or_above = gt >= threshold
    gt[at_or_above] = 1
    return gt


def get_edge_mask(image):
    """Return a 0/1 edge mask for ``image``; expects the image before binarisation.

    Edges come from ``cv2.Canny(image, 0, 255)``. Edge pixels where the input
    is below 40 are discarded, and every remaining non-zero entry is set to 1.
    """
    edges = cv2.Canny(image, 0, 255)
    dark_pixels = image < 40
    edges[dark_pixels] = 0
    edges[edges != 0] = 1
    return edges

In [None]:
# from google.colab.patches import cv2_imshow

# train_img = cv2.imread('/content/drive/MyDrive/cil-project/cil-road-segmentation-2021/training/images/satImage_001.png')
# label_img = cv2.imread('/content/drive/MyDrive/cil-project/cil-road-segmentation-2021/training/groundtruth/satImage_001.png')
# imgs = np.hstack([train_img,label_img])
# cv2_imshow(imgs)

In [None]:
# !unzip /content/drive/MyDrive/cil-project/cil-road-segmentation-2021.zip -d /content/drive/MyDrive/cil-project/cil-road-segmentation-2021

In [None]:
# Constants shared by the patch extraction, patch metric and submission code below.
PATCH_SIZE = 16  # pixels per side of square patches
VAL_SIZE = 10  # size of the validation set (number of images); only referenced by the commented-out split cell
CUTOFF = 0.25  # minimum average brightness for a mask patch to be classified as containing road

In [None]:
# # unzip the dataset, split it and organize it in folders
# if not os.path.isdir('validation'):  # make sure this has not been executed yet
#   try:
#           # !unzip /content/drive/MyDrive/cil-project/cil-road-segmentation-2021.zip -d cil-road-segmentation-2021
#           # !mv /content/drive/MyDrive/cil-project/cil-road-segmentation-2021/training/training/* /content/drive/MyDrive/cil-project/cil-road-segmentation-2021/training
#           # !rm -rf /content/drive/MyDrive/cil-project/cil-road-segmentation-2021/training/training
#           # !mkdir /content/drive/MyDrive/cil-project/cil-road-segmentation-2021/validation
#           # !mkdir /content/drive/MyDrive/cil-project/cil-road-segmentation-2021/validation/images
#           # !mkdir /content/drive/MyDrive/cil-project/cil-road-segmentation-2021/validation/groundtruth
#           for img in sample(glob("/content/drive/MyDrive/cil-project/cil-road-segmentation-2021/training/images/*.png"), VAL_SIZE):
#             os.rename(img, img.replace('/content/drive/MyDrive/cil-project/cil-road-segmentation-2021/training', '/content/drive/MyDrive/cil-project/cil-road-segmentation-2021/validation'))
#             mask = img.replace('images', 'groundtruth')
#             os.rename(mask, mask.replace('/content/drive/MyDrive/cil-project/cil-road-segmentation-2021/training', '/content/drive/MyDrive/cil-project/cil-road-segmentation-2021/validation'))
#   except:
#       print('Please upload a .zip file containing your datasets.')

In [None]:
def load_all_from_path_255(path):
    """Load every ``*.png`` directly inside ``path`` into one float32 array.

    Files are read in sorted filename order and stacked along a new first
    axis, so all images must share one shape. Pixel values are NOT rescaled:
    they keep the range stored in the file (presumably 0..255 for 8-bit PNGs),
    only the dtype changes to float32.

    Raises:
        ValueError: if ``path`` contains no ``.png`` file.
    """
    filenames = sorted(glob(path + '/*.png'))
    if not filenames:
        raise ValueError('No .png files found in ' + str(path))

    def _read(filename):
        # the context manager releases the file handle once the pixels are copied
        with Image.open(filename) as img:
            return np.array(img)

    return np.stack([_read(f) for f in filenames]).astype(np.float32)

def load_all_from_path(path):
    """Load every ``*.png`` directly inside ``path`` into one float32 array scaled by 1/255.

    Files are read in sorted filename order and stacked along a new first
    axis, so all images must share one shape. For 8-bit images the returned
    values lie in the interval [0., 1.].

    Raises:
        ValueError: if ``path`` contains no ``.png`` file.
    """
    filenames = sorted(glob(path + '/*.png'))
    if not filenames:
        raise ValueError('No .png files found in ' + str(path))

    def _read(filename):
        # the context manager releases the file handle once the pixels are copied
        with Image.open(filename) as img:
            return np.array(img)

    return np.stack([_read(f) for f in filenames]).astype(np.float32) / 255.

# paths to training and validation datasets (each is expected to contain
# 'images' and 'groundtruth' sub-folders of .png files)
train_path = '/content/drive/MyDrive/cil-project/cil-road-segmentation-2021/training'
val_path = '/content/drive/MyDrive/cil-project/cil-road-segmentation-2021/validation'

# Load with the unscaled loader: values are not divided by 255 here, and the
# arrays are later written back to .png files for the dataset class to re-read.
train_images = load_all_from_path_255(os.path.join(train_path, 'images'))
train_masks = load_all_from_path_255(os.path.join(train_path, 'groundtruth'))
val_images = load_all_from_path_255(os.path.join(val_path, 'images'))
val_masks = load_all_from_path_255(os.path.join(val_path, 'groundtruth'))

In [None]:
# Sanity check: (n_images, H, W, channels) of the loaded training images.
print(train_images.shape)

(90, 400, 400, 3)


In [None]:
import tensorflow.compat.v1 as tf
from google.colab.patches import cv2_imshow
def pre_process_and_save_images(train_images, label_images):
    """Return the inputs stacked with one randomly rotated copy of each image.

    Every (image, label) pair goes through ``random_rotate``; the results are
    collected in float32 arrays and appended below the originals with
    ``np.vstack``, so both outputs have twice as many entries as the inputs.
    Despite the name, nothing is written to disk here.
    """
    augmented_images = np.zeros(train_images.shape, dtype=np.float32)
    augmented_labels = np.zeros(label_images.shape, dtype=np.float32)
    for idx, (image, label) in enumerate(zip(train_images, label_images)):
        # only the small-angle rotation is active; the other augmentations
        # defined in this notebook could be chained at this point
        augmented_images[idx], augmented_labels[idx] = random_rotate(image, label)
    stacked_images = np.vstack((train_images, augmented_images))
    stacked_labels = np.vstack((label_images, augmented_labels))
    return stacked_images, stacked_labels

In [None]:
# Each call returns the originals plus one randomly rotated copy, doubling the set.
# NOTE(review): this cell rebinds train_images/val_images, so re-running it
# doubles the arrays again - run once after a fresh load.
train_images, train_masks = pre_process_and_save_images(train_images, train_masks)
# The validation set is augmented twice in a row, i.e. it grows to 4x its loaded size.
val_images_0, val_masks_0 = pre_process_and_save_images(val_images, val_masks)
val_images, val_masks = pre_process_and_save_images(val_images_0, val_masks_0)
print(val_images.shape)

(40, 400, 400, 3)


In [None]:
# Double the validation set once more (originals + one rotated copy of each).
# NOTE(review): not idempotent - every execution of this cell doubles val_images again.
val_images, val_masks = pre_process_and_save_images(val_images, val_masks)
print(val_images.shape)

(80, 400, 400, 3)


In [None]:
# Output folder for this experiment; folder_name and num_val are also read as
# globals by from_array_to_pictures(), train() and the checkpoint/submission cells.
folder_name = '/content/drive/MyDrive/cil-project/cil-road-segmentation-2021/original_random_rotate'
num_val = str(val_images.shape[0])
# !mkdir $folder_name
# np.save(folder_name+'/train_images.npy',train_images)
# np.save(folder_name+'/train_masks.npy',train_masks)
# Only the validation arrays are saved here; folder_name must already exist.
np.save(folder_name+'/val_images.npy',val_images)
np.save(folder_name+'/val_masks.npy',val_masks)
# !mkdir $folder_name/model
# !mkdir $folder_name/predict

In [None]:
# Full variant of the previous cell: create the experiment folder, save the
# training and validation arrays as .npy, and create the 'model' and 'predict'
# sub-folders that train() and the submission cell write into.
folder_name = '/content/drive/MyDrive/cil-project/cil-road-segmentation-2021/original_random_rotate'
num_val = str(val_images.shape[0])
# !rm -rf $folder_name
!mkdir $folder_name
np.save(folder_name+'/train_images.npy',train_images)
np.save(folder_name+'/train_masks.npy',train_masks)
np.save(folder_name+'/val_images.npy',val_images)
np.save(folder_name+'/val_masks.npy',val_masks)
!mkdir $folder_name/model
!mkdir $folder_name/predict

In [None]:
def from_array_to_pictures(pictures_array, path_suffix):
    """Write every array in ``pictures_array`` to ``<folder_name><path_suffix><index>.png``.

    Args:
        pictures_array: iterable of image arrays. Arrays with three channels
            are treated as RGB, which is the order the PIL-based loaders in
            this notebook produce.
        path_suffix: string appended to the global ``folder_name``; file names
            are the running index starting at 0.

    Raises:
        IOError: if OpenCV reports that a file could not be written (for
            example because the target directory does not exist).
    """
    out_prefix = folder_name + path_suffix
    for idx, picture in enumerate(pictures_array):
        # cv2.imwrite interprets 3-channel data as BGR. Reverse the channel axis
        # so that the file, when read back with PIL, is RGB again; otherwise red
        # and blue end up swapped relative to images loaded directly from disk.
        if picture.ndim == 3 and picture.shape[-1] == 3:
            picture = np.ascontiguousarray(picture[..., ::-1])
        target = out_prefix + str(idx) + '.png'
        if not cv2.imwrite(target, picture):
            raise IOError('Could not write image to ' + target)

In [None]:
# Recreate the on-disk validation folders (images + groundtruth) from scratch.
# The matching commands for the training folders are commented out.
# NOTE(review): the dataset cell further down reads folder_name + '/training',
# which this cell does not create - confirm it exists from an earlier run.
# !rm -rf $folder_name/training
# !rm -rf $folder_name/training/images
# !rm -rf $folder_name/training/groundtruth
!rm -rf $folder_name/validation
# !rm -rf $folder_name/validation/images
# !rm -rf $folder_name/validation/groundtruth
# !mkdir $folder_name/training
# !mkdir $folder_name/training/images
# !mkdir $folder_name/training/groundtruth
!mkdir $folder_name/validation
!mkdir $folder_name/validation/images
!mkdir $folder_name/validation/groundtruth

In [None]:
# Export the augmented validation arrays as numbered .png files; the training
# export is commented out.
# from_array_to_pictures(train_images, '/training/images/')
# from_array_to_pictures(train_masks, '/training/groundtruth/')
from_array_to_pictures(val_images, '/validation/images/')
from_array_to_pictures(val_masks, '/validation/groundtruth/')

In [None]:
# Install tensorboard, needed for the SummaryWriter used in train().
# NOTE(review): consider `%pip install` with a pinned version in a top-of-notebook cell.
pip install tensorboard

In [None]:
# import tensorflow.compat.v1 as tf
# from google.colab.patches import cv2_imshow
# def pre_process_images(train_images, label_images):
#   transformed_train_images = np.zeros(train_images.shape, dtype=np.float32)
#   transformed_label_images = np.zeros(label_images.shape, dtype=np.float32)
#   cnt = 0
#   with tf.Session() as sess:
#     for train_image, label_image in zip(train_images, label_images):
#       # train_image, label_image = horizontal_flip(train_image, label_image)
#       # train_image, label_image = vertical_flip(train_image, label_image)
#       # train_image, label_image = rotate_90s(train_image, label_image)
#       # train_image, label_image = random_rotate(train_image, label_image)
#       train_image, label_image = random_scale(train_image, label_image)
#       # train_image, label_image = hue_image(train_image, label_image)
#       # train_image, label_image = saturation_image(train_image, label_image)
#       # train_image, label_image = brightness_image(train_image, label_image)
#       # train_image, label_image = contrast_image(train_image, label_image)
#       transformed_train_images[cnt] = train_image
#       transformed_label_images[cnt] = label_image
#       cnt += 1
#   return transformed_train_images, transformed_label_images

In [None]:
def image_to_patches(images, masks=None, patch_size=None, cutoff=None):
    """Split images (and optionally masks) into square, non-overlapping patches.

    Args:
        images: array of shape (n_images, H, W, C).
        masks: optional array of segmentation masks with the same n_images, H
            and W (with or without a trailing channel axis).
        patch_size: side length of a patch in pixels; defaults to the global
            ``PATCH_SIZE``.
        cutoff: a patch is labelled 1.0 when its mean mask value is strictly
            above this; defaults to the global ``CUTOFF``.

    Returns:
        ``patches`` of shape (n_images * h_patches * w_patches, patch_size,
        patch_size, C), ordered image by image and row-major within an image.
        When ``masks`` is given, returns ``(patches, labels)`` where ``labels``
        is a flat float32 array with one 0./1. entry per patch.

    Raises:
        AssertionError: if H or W is not a multiple of ``patch_size``.
    """
    if patch_size is None:
        patch_size = PATCH_SIZE
    if cutoff is None:
        cutoff = CUTOFF

    n_images = images.shape[0]  # number of images
    h, w = images.shape[1:3]  # shape of images
    assert (h % patch_size) + (w % patch_size) == 0  # make sure images can be patched exactly

    h_patches = h // patch_size
    w_patches = w // patch_size
    # split H into (h_patches, patch_size) and W into (w_patches, patch_size);
    # the width axis must use w_patches so non-square images work
    patches = images.reshape((n_images, h_patches, patch_size, w_patches, patch_size, -1))
    patches = np.moveaxis(patches, 2, 3)  # -> (n, h_patches, w_patches, P, P, C)
    patches = patches.reshape(-1, patch_size, patch_size, patches.shape[-1])
    if masks is None:
        return patches

    masks = masks.reshape((n_images, h_patches, patch_size, w_patches, patch_size, -1))
    masks = np.moveaxis(masks, 2, 3)
    labels = np.mean(masks, (-1, -2, -3)) > cutoff  # compute labels
    labels = labels.reshape(-1).astype(np.float32)
    return patches, labels


In [None]:
import torch
from torch import nn
from torch.utils.tensorboard import SummaryWriter
from tqdm.notebook import tqdm

def np_to_tensor(x, device):
    """Wrap numpy array ``x`` as a torch tensor placed on ``device``.

    For ``'cpu'`` the tensor is returned as is; for any other device it is
    made contiguous, pinned and transferred with a non-blocking copy.
    """
    tensor = torch.from_numpy(x)
    if not device == 'cpu':
        return tensor.contiguous().pin_memory().to(device=device, non_blocking=True)
    return tensor.cpu()

def accuracy_fn(y_hat, y):
    """Return the fraction of entries where rounded prediction and rounded target agree."""
    matches = y_hat.round() == y.round()
    return matches.float().mean()

class ImageDataset(torch.utils.data.Dataset):
    # dataset class that deals with loading the data and making it available by index.

    def __init__(self, path, device, use_patches=True, resize_to=(400, 400)):
        self.path = path
        self.device = device
        self.use_patches = use_patches
        self.resize_to=resize_to
        self.x, self.y, self.n_samples = None, None, None
        self._load_data()

    def _load_data(self):  # not very scalable, but good enough for now
        self.x = load_all_from_path(os.path.join(self.path, 'images'))
        self.y = load_all_from_path(os.path.join(self.path, 'groundtruth'))
        # self.x, self.y = pre_process_images(self.x, self.y)
        if self.use_patches:  # split each image into patches
            self.x, self.y = image_to_patches(self.x, self.y)
        elif self.resize_to != (self.x.shape[1], self.x.shape[2]):  # resize images
            self.x = np.stack([cv2.resize(img, dsize=self.resize_to) for img in self.x], 0)
            self.y = np.stack([cv2.resize(mask, dsize=self.resize_to) for mask in self.y], 0)
        self.x = np.moveaxis(self.x, -1, 1)  # pytorch works with CHW format instead of HWC
        self.n_samples = len(self.x)

    def _preprocess(self, x, y):
        # to keep things simple we will not apply transformations to each sample,
        # but it would be a very good idea to look into preprocessing
        return x, y

    def __getitem__(self, item):
        return self._preprocess(np_to_tensor(self.x[item], self.device), np_to_tensor(self.y[[item]], self.device))
    
    def __len__(self):
        return self.n_samples

In [None]:
class Block(nn.Module):
    """Two 3x3 same-padding convolutions, each followed by ReLU, with batch norm in between.

    Layer order: Conv -> ReLU -> BatchNorm -> Conv -> ReLU. The spatial size
    is preserved; channels go from ``in_ch`` to ``out_ch``.
    """

    def __init__(self, in_ch, out_ch):
        super().__init__()
        layers = [
            nn.Conv2d(in_channels=in_ch, out_channels=out_ch, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(out_ch),
            nn.Conv2d(in_channels=out_ch, out_channels=out_ch, kernel_size=3, padding=1),
            nn.ReLU(),
        ]
        # kept under the attribute name 'block' so checkpoint keys stay the same
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        return self.block(x)

        
class UNet(nn.Module):
    """UNet-like encoder/decoder for single-class semantic segmentation.

    ``chs`` lists the channel counts along the encoder, starting with the
    input channels. The decoder mirrors it (without the input entry) and the
    head maps the last decoder width to one sigmoid-activated channel.
    """

    def __init__(self, chs=(3,64,128,256,512,1024)):
        super().__init__()
        enc_pairs = list(zip(chs[:-1], chs[1:]))  # (in, out) per encoder block
        dec_chs = chs[::-1][:-1]  # decoder widths: reversed, input channels dropped
        dec_pairs = list(zip(dec_chs[:-1], dec_chs[1:]))  # (in, out) per decoder stage
        self.enc_blocks = nn.ModuleList([Block(c_in, c_out) for c_in, c_out in enc_pairs])
        self.pool = nn.MaxPool2d(2)  # parameter-free, so one instance is shared
        self.upconvs = nn.ModuleList([nn.ConvTranspose2d(c_in, c_out, 2, 2) for c_in, c_out in dec_pairs])
        self.dec_blocks = nn.ModuleList([Block(c_in, c_out) for c_in, c_out in dec_pairs])
        self.head = nn.Sequential(nn.Conv2d(dec_chs[-1], 1, 1), nn.Sigmoid())  # 1x1 conv to one channel

    def forward(self, x):
        # encoder: keep the pre-pooling activations for the skip connections
        skips = []
        for enc in self.enc_blocks[:-1]:
            x = enc(x)
            skips.append(x)
            x = self.pool(x)
        # bottleneck: last encoder block, no pooling afterwards
        x = self.enc_blocks[-1](x)
        # decoder: upsample, concatenate the matching skip (deepest first), refine
        for dec, upconv, skip in zip(self.dec_blocks, self.upconvs, skips[::-1]):
            upsampled = upconv(x)
            x = dec(torch.cat([upsampled, skip], dim=1))
        return self.head(x)


def patch_accuracy_fn(y_hat, y):
    """Accuracy computed per patch (the metric used on Kaggle for evaluation).

    Prediction and target are cut into PATCH_SIZE x PATCH_SIZE patches; a patch
    counts as road when its mean exceeds CUTOFF. Returns the fraction of
    patches on which prediction and target agree.
    """
    rows = y.shape[-2] // PATCH_SIZE
    cols = y.shape[-1] // PATCH_SIZE
    grid = (-1, 1, rows, PATCH_SIZE, cols, PATCH_SIZE)

    def _patch_labels(t):
        # average over the two within-patch axes, then threshold
        return t.reshape(*grid).mean((-1, -3)) > CUTOFF

    predicted = _patch_labels(y_hat)
    expected = _patch_labels(y)
    return (expected == predicted).float().mean()

In [None]:
def train(train_dataloader, eval_dataloader, model, loss_fn, metric_fns, optimizer, n_epochs):
    """Train ``model``, validate after every epoch and keep the best weights.

    Args:
        train_dataloader: iterable of (x, y) training batches.
        eval_dataloader: iterable of (x, y) validation batches (must be non-empty).
        model: the network to optimise; modified in place.
        loss_fn: callable ``loss_fn(y_hat, y)`` returning a scalar tensor.
        metric_fns: dict name -> callable ``fn(y_hat, y)``. It must contain the
            key ``'acc'``, because the best checkpoint is selected on ``'val_acc'``.
        optimizer: optimiser bound to ``model``'s parameters.
        n_epochs: number of passes over the training data.

    Returns:
        ``model`` with the weights of the epoch that reached the highest
        validation accuracy loaded.

    Side effects:
        Logs epoch metrics to TensorBoard, prints them, writes two checkpoints
        (last epoch and best validation accuracy) under the global
        ``folder_name`` using the global ``num_val`` in the file name, and
        shows a loss plot.
    """
    # training loop
    logdir = '/content/drive/MyDrive/cil-project/tensorboard/net'
    writer = SummaryWriter(logdir)  # tensorboard writer (can also log images)

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    history = {}  # collects metrics at the end of each epoch

    try:
        for epoch in range(n_epochs):  # loop over the dataset multiple times

            # initialize metric list
            metrics = {'loss': [], 'val_loss': []}
            for k, _ in metric_fns.items():
                metrics[k] = []
                metrics['val_'+k] = []

            pbar = tqdm(train_dataloader, desc=f'Epoch {epoch+1}/{n_epochs}')
            # training
            model.train()
            for (x, y) in pbar:
                optimizer.zero_grad()  # zero out gradients
                y_hat = model(x)  # forward pass
                loss = loss_fn(y_hat, y)
                loss.backward()  # backward pass
                optimizer.step()  # optimize weights

                # log partial metrics
                metrics['loss'].append(loss.item())
                for k, fn in metric_fns.items():
                    metrics[k].append(fn(y_hat, y).item())
                pbar.set_postfix({k: sum(v)/len(v) for k, v in metrics.items() if len(v) > 0})

            # validation
            model.eval()
            with torch.no_grad():  # do not keep track of gradients
                for (x, y) in eval_dataloader:
                    y_hat = model(x)  # forward pass
                    loss = loss_fn(y_hat, y)

                    # log partial metrics
                    metrics['val_loss'].append(loss.item())
                    for k, fn in metric_fns.items():
                        metrics['val_'+k].append(fn(y_hat, y).item())

            # summarize metrics, log to tensorboard and display
            history[epoch] = {k: sum(v) / len(v) for k, v in metrics.items()}
            for k, v in history[epoch].items():
                writer.add_scalar(k, v, epoch)
            print(' '.join(['\t- '+str(k)+' = '+str(v)+'\n ' for (k, v) in history[epoch].items()]))

            # remember the weights of the best epoch so far (by validation accuracy)
            if history[epoch]['val_acc'] > best_acc:
                best_acc = history[epoch]['val_acc']
                best_model_wts = copy.deepcopy(model.state_dict())
    finally:
        # flush pending events and release the log file even if training is interrupted
        writer.close()

    print('Finished Training')
    print(best_acc)
    # save the last-epoch weights first, then switch to and save the best ones
    torch.save(model.state_dict(),folder_name+'/model/model_e'+str(n_epochs)+'_val'+num_val+'.pt')
    model.load_state_dict(best_model_wts)
    torch.save(model.state_dict(),folder_name+'/model/best_val_acc_model_e'+str(n_epochs)+'_val'+num_val+'.pt')
    # plot loss curves
    plt.plot([v['loss'] for k, v in history.items()], label='Training Loss')
    plt.plot([v['val_loss'] for k, v in history.items()], label='Validation Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epochs')
    plt.legend()
    plt.show()
    return model

In [None]:
# Use the GPU when one is available.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# reshape the image to simplify the handling of skip connections and maxpooling
# NOTE(review): folder_name + '/training' must already exist on disk; the cells
# that create and fill it are commented out earlier - confirm before a fresh run.
train_dataset = ImageDataset(folder_name+'/training', device, use_patches=False, resize_to=(384, 384))
val_dataset = ImageDataset(folder_name+'/validation', device, use_patches=False, resize_to=(384, 384))
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=8, shuffle=True)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=8, shuffle=True)
model = UNet().to(device)
# BCELoss matches the sigmoid output of the UNet head.
loss_fn = nn.BCELoss()
# 'acc' is required: train() selects the best checkpoint on 'val_acc'.
metric_fns = {'acc': accuracy_fn, 'patch_acc': patch_accuracy_fn}
optimizer = torch.optim.Adam(model.parameters())
n_epochs = 60

In [None]:
# Run training; checkpoints are written under folder_name + '/model'.
train(train_dataloader, val_dataloader, model, loss_fn, metric_fns, optimizer, n_epochs)

In [None]:
# model.load_state_dict(torch.load(folder_name+'/model/model_e'+str(n_epochs)+'_val'+num_val+'.pt'))

In [None]:
# Restore the best-validation-accuracy checkpoint that train() saved under this file name.
model.load_state_dict(torch.load(folder_name+'/model/best_val_acc_model_e'+str(n_epochs)+'_val'+num_val+'.pt'))

<All keys matched successfully>

In [None]:
# Folder holding the test .png images used for the submission.
test_path = '/content/drive/MyDrive/cil-project/cil-road-segmentation-2021/test_images/test_images'
def create_submission(labels, test_filenames, submission_filename, patch_size=None):
    """Write per-patch predictions to a CSV file with the header ``id,prediction``.

    Args:
        labels: sequence of 2-D arrays (patch rows x patch columns), one per
            image, in the order of ``sorted(test_filenames)``.
        test_filenames: file names of the test images; the last run of digits
            in each name is used as the image number.
        submission_filename: path of the CSV file to create (overwritten).
        patch_size: pixel size of one patch, used to turn patch indices into
            pixel offsets; defaults to the global ``PATCH_SIZE``.

    Each row has the id ``<img_number:03d>_<col*patch_size>_<row*patch_size>``
    and the integer label of that patch.
    """
    if patch_size is None:
        patch_size = PATCH_SIZE
    with open(submission_filename, 'w') as f:
        f.write('id,prediction\n')
        # iterate the labels passed in, not a notebook-level global
        for fn, patch_array in zip(sorted(test_filenames), labels):
            img_number = int(re.findall(r"\d+", fn)[-1])
            # i walks the columns (axis 1) and j the rows (axis 0), matching the
            # patch_array[j, i] lookup, so non-square patch grids work too
            for i in range(patch_array.shape[1]):
                for j in range(patch_array.shape[0]):
                    f.write("{:03d}_{}_{},{}\n".format(img_number, i*patch_size, j*patch_size, int(patch_array[j, i])))

In [None]:
# predict on test set
test_filenames = (glob(test_path + '/*.png'))
test_images = load_all_from_path(test_path)
batch_size = test_images.shape[0]
size = test_images.shape[1:3]  # original (height, width) of the test images
# we also need to resize the test images. This might not be the best ideas depending on their spatial resolution.
test_images = np.stack([cv2.resize(img, dsize=(384, 384)) for img in test_images], 0)
test_images = np_to_tensor(np.moveaxis(test_images, -1, 1), device)
# inference mode: batch-norm must use its running statistics (images are fed
# one at a time) and no autograd graph is needed
model.eval()
with torch.no_grad():
    test_pred = [model(t).detach().cpu().numpy() for t in test_images.unsqueeze(1)]
test_pred = np.concatenate(test_pred, 0)
test_pred= np.moveaxis(test_pred, 1, -1)  # CHW to HWC
# resize to original shape; cv2.resize expects dsize as (width, height)
test_pred = np.stack([cv2.resize(img, dsize=(size[1], size[0])) for img in test_pred], 0)
# now compute labels: split height and width into patches separately
test_pred = test_pred.reshape((-1, size[0] // PATCH_SIZE, PATCH_SIZE, size[1] // PATCH_SIZE, PATCH_SIZE))
test_pred = np.moveaxis(test_pred, 2, 3)
test_pred = np.round(np.mean(test_pred, (-1, -2)) > CUTOFF)

In [None]:
# Write the Kaggle submission CSV into folder_name + '/predict'.
create_submission(test_pred, test_filenames, submission_filename=folder_name+'/predict/unet_submission_e'+str(n_epochs)+'_val'+num_val+'_best.csv')

In [None]:
# Scratch example: nn.CrossEntropyLoss on random logits for 3 samples and 5 classes.
# The logits tensor is named `logits` so it does not shadow the builtin input().
loss = nn.CrossEntropyLoss()
logits = torch.randn(3, 5, requires_grad=True)
print(logits, logits.shape)
# one integer class index in [0, 5) per sample
target = torch.empty(3, dtype=torch.long).random_(5)
print(target, target.shape)
output = loss(logits, target)
print(output)
output.backward()

tensor([[-0.1612, -1.1411,  0.0977,  1.4339,  0.3903],
        [-0.1963, -0.8168,  0.4892,  0.2308, -0.6857],
        [ 0.7305, -0.3553,  0.5998, -1.6141,  0.8353]], requires_grad=True) torch.Size([3, 5])
tensor([3, 1, 3]) torch.Size([3])
tensor(2.1896, grad_fn=<NllLossBackward>)
