In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import StepLR
import cv2
import os
from tqdm import tqdm
from PIL import Image

In [7]:
# ENet initial block
class InitialBlock(nn.Module):
    """ENet initial block: a strided convolution and a max-pool run in parallel.

    The input is fed to two branches whose outputs are concatenated along the
    channel axis, batch-normalised and passed through an activation:

    * main branch: 3x3 convolution, stride 2, padding 1, producing
      ``out_channels - in_channels`` feature maps;
    * extension branch: 2x2 max-pool, stride 2, which keeps the
      ``in_channels`` input channels.

    The concatenation therefore has exactly ``out_channels`` channels.

    Args:
        in_channels: number of channels of the input tensor.
        out_channels: number of channels of the output tensor.
        bias: whether the convolution has a bias term.
        relu: use ``nn.ReLU`` when True, ``nn.PReLU`` otherwise.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 bias=False,
                 relu=True):
        super().__init__()

        activation = nn.ReLU if relu else nn.PReLU

        # Main branch. The pooled branch contributes `in_channels` channels,
        # so the convolution supplies the remainder (this equals
        # `out_channels - 3` for RGB input).
        self.main_branch = nn.Conv2d(
            in_channels,
            out_channels - in_channels,
            kernel_size=3,
            stride=2,
            padding=1,
            bias=bias)

        # Extension branch. ceil_mode=True makes the pooled size match the
        # padded stride-2 convolution for odd heights/widths as well; for even
        # sizes the result is identical to the floor-mode pool.
        self.ext_branch = nn.MaxPool2d(
            kernel_size=2, stride=2, padding=0, ceil_mode=True)

        # Batch normalization applied after concatenation
        self.batch_norm = nn.BatchNorm2d(out_channels)

        # Activation (ReLU or PReLU) applied after batch normalization
        self.out_activation = activation()

    def forward(self, x):
        """Return the activated, normalised concatenation of both branches."""
        main = self.main_branch(x)
        ext = self.ext_branch(x)

        # Concatenate branches along the channel dimension
        out = torch.cat((main, ext), 1)

        # Apply batch normalization
        out = self.batch_norm(out)

        return self.out_activation(out)

In [8]:
# ENet regular bottleneck module.
# Three variants: [1] regular; [2] dilated (custom dilation rate); [3] asymmetric
class Bottleneck(nn.Module):
    """ENet bottleneck that keeps both the channel count and the spatial size.

    The main branch is the identity. The extension branch is:

    1. 1x1 projection from ``channels`` to ``channels // internal_ratio``;
    2. either a regular/dilated ``kernel_size`` x ``kernel_size`` convolution,
       or (``asymmetric=True``) a ``kernel_size`` x 1 convolution followed by
       a 1 x ``kernel_size`` convolution;
    3. 1x1 expansion back to ``channels``;
    4. spatial dropout (``nn.Dropout2d``).

    Every convolution is followed by batch normalization and an activation.
    The two branches are summed and passed through a final activation.

    The caller must choose ``padding`` so that the middle convolution keeps
    the spatial size, otherwise the element-wise sum in ``forward`` fails.

    Args:
        channels: number of input and output channels.
        internal_ratio: reduction factor applied to ``channels`` inside the
            extension branch.
        kernel_size: kernel size of the middle convolution(s).
        padding: zero padding of the middle convolution(s).
        dilation: dilation of the middle convolution(s).
        asymmetric: use the factorised k x 1 / 1 x k pair instead of k x k.
        dropout_prob: probability passed to ``nn.Dropout2d``.
        bias: whether the convolutions have a bias term.
        relu: use ``nn.ReLU`` when True, ``nn.PReLU`` otherwise.

    Raises:
        RuntimeError: if ``internal_ratio <= 1`` or
            ``internal_ratio > channels``.
    """

    def __init__(self,
                 channels,
                 internal_ratio=4,
                 kernel_size=3,
                 padding=0,
                 dilation=1,
                 asymmetric=False,
                 dropout_prob=0,
                 bias=False,
                 relu=True):
        super().__init__()

        # Check that the internal_ratio parameter is within the expected range
        if internal_ratio <= 1 or internal_ratio > channels:
            raise RuntimeError("Value out of range. Expected value in the "
                               "interval [1, {0}], got internal_scale={1}."
                               .format(channels, internal_ratio))

        internal_channels = channels // internal_ratio

        activation = nn.ReLU if relu else nn.PReLU

        # 1x1 projection convolution
        self.ext_conv1 = nn.Sequential(
            nn.Conv2d(
                channels,
                internal_channels,
                kernel_size=1,
                stride=1,
                bias=bias),
            nn.BatchNorm2d(internal_channels),
            activation())

        if asymmetric:
            # Asymmetric convolution: a (k x 1) convolution followed by a
            # (1 x k) convolution, each padded only along its own axis.
            self.ext_conv2 = nn.Sequential(
                nn.Conv2d(
                    internal_channels,
                    internal_channels,
                    kernel_size=(kernel_size, 1),
                    stride=1,
                    padding=(padding, 0),
                    dilation=dilation,
                    bias=bias),
                nn.BatchNorm2d(internal_channels),
                activation(),
                nn.Conv2d(
                    internal_channels,
                    internal_channels,
                    kernel_size=(1, kernel_size),
                    stride=1,
                    padding=(0, padding),
                    dilation=dilation,
                    bias=bias),
                nn.BatchNorm2d(internal_channels),
                activation())
        else:
            # Regular or dilated convolution. Note: nn.Conv2d has no
            # `dropout_prob` argument; dropout is handled by `ext_reg`.
            self.ext_conv2 = nn.Sequential(
                nn.Conv2d(
                    internal_channels,
                    internal_channels,
                    kernel_size=kernel_size,
                    stride=1,
                    padding=padding,
                    dilation=dilation,
                    bias=bias),
                nn.BatchNorm2d(internal_channels),
                activation())

        # 1x1 expansion convolution: back from internal_channels to channels
        # so the result can be added to the identity branch.
        self.ext_conv3 = nn.Sequential(
            nn.Conv2d(
                internal_channels,
                channels,
                kernel_size=1,
                stride=1,
                bias=bias),
            nn.BatchNorm2d(channels),
            activation())

        # Spatial dropout used as the regularizer
        self.ext_reg = nn.Dropout2d(p=dropout_prob)

        # Activation (ReLU or PReLU) applied after adding the branches
        self.out_activation = activation()

    def forward(self, x):
        """Return ``activation(x + extension(x))``."""
        # Main branch with no operations
        main = x

        # Extension branch
        ext = self.ext_conv1(x)
        ext = self.ext_conv2(ext)
        ext = self.ext_conv3(ext)
        ext = self.ext_reg(ext)

        # Add main and extension branches
        out = main + ext

        return self.out_activation(out)

In [9]:
# ENet downsampling bottleneck module
class Downsampling(nn.Module):
    """ENet downsampling bottleneck: halves the spatial size, widens channels.

    Main branch: 2x2 max-pool with stride 2, then zero-padded along the
    channel axis from ``in_channels`` up to ``out_channels``.

    Extension branch:

    1. 2x2 convolution with stride 2, ``in_channels`` to
       ``in_channels // internal_ratio``;
    2. 3x3 convolution with padding 1 (spatial size preserved);
    3. 1x1 expansion to ``out_channels``;
    4. spatial dropout (``nn.Dropout2d``).

    Every convolution is followed by batch normalization and an activation.
    The branches are summed and passed through a final activation.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        internal_ratio: reduction factor applied to ``in_channels`` inside
            the extension branch.
        return_indices: when True, the max-pool also returns the argmax
            indices, which ``forward`` hands back to the caller.
        dropout_prob: probability passed to ``nn.Dropout2d``.
        bias: whether the convolutions have a bias term.
        relu: use ``nn.ReLU`` when True, ``nn.PReLU`` otherwise.

    Raises:
        RuntimeError: if ``internal_ratio <= 1`` or
            ``internal_ratio > in_channels``.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 internal_ratio=4,
                 return_indices=False,
                 dropout_prob=0,
                 bias=False,
                 relu=True):
        super().__init__()

        # forward() needs to know whether the pool returns a (values, indices) pair
        self.return_indices = return_indices

        # Check that the internal_ratio parameter is within the expected range
        if internal_ratio <= 1 or internal_ratio > in_channels:
            raise RuntimeError("Value out of range. Expected value in the "
                               "interval [1, {0}], got internal_scale={1}."
                               .format(in_channels, internal_ratio))

        internal_channels = in_channels // internal_ratio

        activation = nn.ReLU if relu else nn.PReLU

        # Main branch: max pooling
        self.main_conv1 = nn.MaxPool2d(2, stride=2, return_indices=return_indices)

        # 2x2 projection convolution with stride 2 (performs the downsampling)
        self.ext_conv1 = nn.Sequential(
            nn.Conv2d(
                in_channels,
                internal_channels,
                kernel_size=2,
                stride=2,
                bias=bias),
            nn.BatchNorm2d(internal_channels),
            activation())

        # 3x3 convolution; padding=1 keeps the spatial size so the extension
        # branch lines up with the pooled main branch.
        self.ext_conv2 = nn.Sequential(
            nn.Conv2d(
                internal_channels,
                internal_channels,
                kernel_size=3,
                stride=1,
                padding=1,
                bias=bias),
            nn.BatchNorm2d(internal_channels),
            activation())

        # 1x1 expansion convolution; the batch norm must match out_channels
        self.ext_conv3 = nn.Sequential(
            nn.Conv2d(
                internal_channels,
                out_channels,
                kernel_size=1,
                stride=1,
                bias=bias),
            nn.BatchNorm2d(out_channels),
            activation())

        # Spatial dropout used as the regularizer
        self.ext_reg = nn.Dropout2d(p=dropout_prob)

        # Activation (ReLU or PReLU) applied after adding the branches
        self.out_activation = activation()

    def forward(self, x):
        """Downsample ``x``.

        Returns:
            A tuple ``(out, max_indices)``. ``max_indices`` holds the max-pool
            indices when the module was built with ``return_indices=True``
            and is ``None`` otherwise.
        """
        # Main branch with max pooling
        max_indices = None
        if self.return_indices:
            main, max_indices = self.main_conv1(x)
        else:
            main = self.main_conv1(x)

        # Extension branch
        ext = self.ext_conv1(x)
        ext = self.ext_conv2(ext)
        ext = self.ext_conv3(ext)
        ext = self.ext_reg(ext)

        # Main branch channel padding; new_zeros creates the padding on the
        # same device and with the same dtype as `main`.
        n, ch_ext, h, w = ext.size()
        ch_main = main.size(1)
        padding = main.new_zeros(n, ch_ext - ch_main, h, w)

        # Concatenate the pooled features and the zero padding
        main = torch.cat((main, padding), 1)

        # Add main and extension branches
        out = main + ext

        return self.out_activation(out), max_indices

In [None]:
# ENet upsampling bottleneck module
class Upsampling(nn.Module):
    """ENet upsampling bottleneck: doubles the spatial size, narrows channels.

    Main branch: 1x1 convolution + batch normalization from ``in_channels``
    to ``out_channels``, followed by max-unpooling with externally supplied
    pooling indices.

    Extension branch:

    1. 1x1 projection to ``in_channels // internal_ratio``;
    2. 2x2 transposed convolution with stride 2, then batch normalization
       and activation;
    3. 1x1 expansion to ``out_channels`` with batch normalization;
    4. spatial dropout (``nn.Dropout2d``).

    The branches are summed and passed through a final activation.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        internal_ratio: reduction factor applied to ``in_channels`` inside
            the extension branch.
        dropout_prob: probability passed to ``nn.Dropout2d``.
        bias: whether the convolutions have a bias term.
        relu: use ``nn.ReLU`` when True, ``nn.PReLU`` otherwise.

    Raises:
        RuntimeError: if ``internal_ratio <= 1`` or
            ``internal_ratio > in_channels``.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 internal_ratio=4,
                 dropout_prob=0,
                 bias=False,
                 relu=True):
        super().__init__()

        # Check that the internal_ratio parameter is within the expected range
        if internal_ratio <= 1 or internal_ratio > in_channels:
            raise RuntimeError("Value out of range. Expected value in the "
                               "interval [1, {0}], got internal_scale={1}."
                               .format(in_channels, internal_ratio))

        internal_channels = in_channels // internal_ratio

        activation = nn.ReLU if relu else nn.PReLU

        # Main branch: 1x1 convolution to match the output channel count,
        # followed by max unpooling.
        self.main_conv1 = nn.Sequential(
            nn.Conv2d(in_channels,
                      out_channels,
                      kernel_size=1,
                      bias=bias),
            nn.BatchNorm2d(out_channels))

        self.main_unpool1 = nn.MaxUnpool2d(kernel_size=2)

        # 1x1 projection convolution
        self.ext_conv1 = nn.Sequential(
            nn.Conv2d(in_channels,
                      internal_channels,
                      kernel_size=1,
                      stride=1,
                      bias=bias),
            nn.BatchNorm2d(internal_channels),
            activation())

        # Transposed convolution. It is kept outside nn.Sequential because
        # forward() has to pass `output_size` to it. (No trailing comma after
        # the call: that would store a tuple instead of a module.)
        self.ext_trans1 = nn.ConvTranspose2d(internal_channels,
                                             internal_channels,
                                             kernel_size=2,
                                             stride=2,
                                             bias=bias)

        self.ext_trans1_bnorm = nn.BatchNorm2d(internal_channels)
        self.ext_trans1_activation = activation()

        # 1x1 expansion convolution
        self.ext_conv2 = nn.Sequential(
            nn.Conv2d(internal_channels,
                      out_channels,
                      kernel_size=1,
                      stride=1,
                      bias=bias),
            nn.BatchNorm2d(out_channels))

        # Spatial dropout used as the regularizer
        self.ext_regul = nn.Dropout2d(p=dropout_prob)

        # Activation (ReLU or PReLU) applied after adding the branches
        self.out_activation = activation()

    def forward(self, x, max_indices, output_size):
        """Upsample ``x``.

        Args:
            x: input tensor.
            max_indices: indices forwarded to ``nn.MaxUnpool2d``.
            output_size: target size forwarded to both ``nn.MaxUnpool2d`` and
                ``nn.ConvTranspose2d``.

        Returns:
            The activated sum of the main and extension branches.
        """
        # Main branch shortcut
        main = self.main_conv1(x)
        main = self.main_unpool1(main, max_indices, output_size=output_size)

        # Extension branch
        ext = self.ext_conv1(x)
        ext = self.ext_trans1(ext, output_size=output_size)
        ext = self.ext_trans1_bnorm(ext)
        ext = self.ext_trans1_activation(ext)
        ext = self.ext_conv2(ext)
        ext = self.ext_regul(ext)

        # Add main and extension branches
        out = main + ext

        return self.out_activation(out)

In [None]:
class ENet(nn.Module):
    """ENet encoder-decoder assembled from the blocks defined in this notebook.

    Layout (channel counts in parentheses):

    * initial block (3 -> 16);
    * stage 1: downsampling (16 -> 64) + 4 regular bottlenecks;
    * stage 2: downsampling (64 -> 128) + 8 regular / dilated / asymmetric
      bottlenecks;
    * stage 3: the same 8-bottleneck sequence as stage 2, without the
      downsampling;
    * stage 4: upsampling (128 -> 64) + 2 regular bottlenecks;
    * stage 5: upsampling (64 -> 16) + 1 regular bottleneck;
    * transposed convolution (16 -> ``num_classes``) back to the input size.

    Args:
        num_classes: number of output channels of the final layer.
        encoder_relu: encoder blocks use ``nn.ReLU`` when True, ``nn.PReLU``
            otherwise.
        decoder_relu: decoder blocks use ``nn.ReLU`` when True, ``nn.PReLU``
            otherwise.
    """

    def __init__(self, num_classes, encoder_relu=False, decoder_relu=True):
        super().__init__()

        self.initial_block = InitialBlock(3, 16, relu=encoder_relu)

        # Stage 1 - Encoder
        self.downsample1_0 = Downsampling(16, 64, return_indices=True, dropout_prob=0.01, relu=encoder_relu)
        self.regular1_1 = Bottleneck(64, padding=1, dropout_prob=0.01, relu=encoder_relu)
        self.regular1_2 = Bottleneck(64, padding=1, dropout_prob=0.01, relu=encoder_relu)
        self.regular1_3 = Bottleneck(64, padding=1, dropout_prob=0.01, relu=encoder_relu)
        self.regular1_4 = Bottleneck(64, padding=1, dropout_prob=0.01, relu=encoder_relu)

        # Stage 2 - Encoder
        self.downsample2_0 = Downsampling(64, 128, return_indices=True, dropout_prob=0.1, relu=encoder_relu)
        self.regular2_1 = Bottleneck(128, padding=1, dropout_prob=0.1, relu=encoder_relu)
        self.dilated2_2 = Bottleneck(128, dilation=2, padding=2, dropout_prob=0.1, relu=encoder_relu)
        self.asymmetric2_3 = Bottleneck(128, kernel_size=5, padding=2, asymmetric=True, dropout_prob=0.1, relu=encoder_relu)
        self.dilated2_4 = Bottleneck(128, dilation=4, padding=4, dropout_prob=0.1, relu=encoder_relu)
        self.regular2_5 = Bottleneck(128, padding=1, dropout_prob=0.1, relu=encoder_relu)
        self.dilated2_6 = Bottleneck(128, dilation=8, padding=8, dropout_prob=0.1, relu=encoder_relu)
        self.asymmetric2_7 = Bottleneck(128, kernel_size=5, padding=2, asymmetric=True, dropout_prob=0.1, relu=encoder_relu)
        self.dilated2_8 = Bottleneck(128, dilation=16, padding=16, dropout_prob=0.1, relu=encoder_relu)

        # Stage 3 - Encoder
        self.regular3_0 = Bottleneck(128, padding=1, dropout_prob=0.1, relu=encoder_relu)
        self.dilated3_1 = Bottleneck(128, dilation=2, padding=2, dropout_prob=0.1, relu=encoder_relu)
        self.asymmetric3_2 = Bottleneck(128, kernel_size=5, padding=2, asymmetric=True, dropout_prob=0.1, relu=encoder_relu)
        self.dilated3_3 = Bottleneck(128, dilation=4, padding=4, dropout_prob=0.1, relu=encoder_relu)
        self.regular3_4 = Bottleneck(128, padding=1, dropout_prob=0.1, relu=encoder_relu)
        self.dilated3_5 = Bottleneck(128, dilation=8, padding=8, dropout_prob=0.1, relu=encoder_relu)
        self.asymmetric3_6 = Bottleneck(128, kernel_size=5, padding=2, asymmetric=True, dropout_prob=0.1, relu=encoder_relu)
        self.dilated3_7 = Bottleneck(128, dilation=16, padding=16, dropout_prob=0.1, relu=encoder_relu)

        # Stage 4 - Decoder. upsample4_0 outputs 64 channels, so the following
        # bottlenecks must be 64 wide; being decoder blocks they follow
        # decoder_relu.
        self.upsample4_0 = Upsampling(128, 64, dropout_prob=0.1, relu=decoder_relu)
        self.regular4_1 = Bottleneck(64, padding=1, dropout_prob=0.1, relu=decoder_relu)
        self.regular4_2 = Bottleneck(64, padding=1, dropout_prob=0.1, relu=decoder_relu)

        # Stage 5 - Decoder. upsample5_0 outputs 16 channels.
        self.upsample5_0 = Upsampling(64, 16, dropout_prob=0.1, relu=decoder_relu)
        self.regular5_1 = Bottleneck(16, padding=1, dropout_prob=0.1, relu=decoder_relu)

        self.transposed_conv = nn.ConvTranspose2d(16, num_classes, kernel_size=3, stride=2, padding=1, bias=False)

    def forward(self, x):
        """Run the full encoder-decoder on ``x``.

        The tensor sizes before the initial block and before each downsampling
        step are recorded and passed as ``output_size`` to the matching
        upsampling layers; the pooling indices returned by each downsampling
        block are passed to the upsampling block that undoes it.
        """
        # Initial block
        input_size = x.size()
        x = self.initial_block(x)

        # Stage 1 - Encoder
        stage1_input_size = x.size()
        x, max_indices1_0 = self.downsample1_0(x)
        x = self.regular1_1(x)
        x = self.regular1_2(x)
        x = self.regular1_3(x)
        x = self.regular1_4(x)

        # Stage 2 - Encoder
        stage2_input_size = x.size()
        x, max_indices2_0 = self.downsample2_0(x)
        x = self.regular2_1(x)
        x = self.dilated2_2(x)
        x = self.asymmetric2_3(x)
        x = self.dilated2_4(x)
        x = self.regular2_5(x)
        x = self.dilated2_6(x)
        x = self.asymmetric2_7(x)
        x = self.dilated2_8(x)

        # Stage 3 - Encoder
        x = self.regular3_0(x)
        x = self.dilated3_1(x)
        x = self.asymmetric3_2(x)
        x = self.dilated3_3(x)
        x = self.regular3_4(x)
        x = self.dilated3_5(x)
        x = self.asymmetric3_6(x)
        x = self.dilated3_7(x)

        # Stage 4 - Decoder
        x = self.upsample4_0(x, max_indices2_0, output_size=stage2_input_size)
        x = self.regular4_1(x)
        x = self.regular4_2(x)

        # Stage 5 - Decoder
        x = self.upsample5_0(x, max_indices1_0, output_size=stage1_input_size)
        x = self.regular5_1(x)
        x = self.transposed_conv(x, output_size=input_size)

        return x