# Reproduce DL
## Automated Pavement Crack Segmentation

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from torchsummary import summary

# Use pretrained model, extend with U-architecture &  SCSE
pre_orig_model = models.resnet34(pretrained=True)
pre_model = torch.nn.Sequential(*(list(pre_orig_model.children())[:-2]))


class CSEBlock(nn.Module):
    """
    Spatial Squeeze and Channel Excitation block
    ---
    Channel-wise focus

    Recalibrates the channels by incorporating global
    spatial information. It provides a receptive field
    of whole spatial extent at the fc's.

    Assign each channel (feature) of a convolutional
    block (feature map) a different weightage
    (excitation) based on how important each channel
    is (squeeze) instead of equally weighing each
    feature. This improves channel interdependencies.
    """

    def __init__(self, in_channels, reduction=2):
        super(CSEBlock, self).__init__()
        # Global pooling == AdaptiveAvgPool2d
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear(in_channels, in_channels // reduction)
        self.relu = nn.ReLU(inplace=True)
        self.fc2 = nn.Linear(in_channels // reduction, in_channels)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        batch_size, num_channels, _, _ = x.size()

        avg_pool_x = self.avg_pool(x).view(batch_size, num_channels)

        y = self.fc1(avg_pool_x)
        y = self.relu(y)
        y = self.fc2(y)
        y = self.sigmoid(y)

        return x * y.view(batch_size, num_channels, 1, 1)


class SSEBlock(nn.Module):
    """
    Channel Squeeze and Spatial Excitation block
    ---
    Spatial-wise focus

    It behaves like a spatial attention map indicating
    where the network should focus more to aid the
    segmentation.

    Assign importance to spatial locations sort of
    telling where features are better to focus
    instead of reweighing which features are more
    important.
    """

    def __init__(self, in_channels):
        super(SSEBlock, self).__init__()
        # Output channel = 1, 1x1 convolution
        self.conv = nn.Conv2d(in_channels, 1, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        batch_size, num_channels, H, W = x.size()

        y = self.conv(x)
        y = self.sigmoid(y)

        return x * y.view(batch_size, 1, H, W)


class SCSEBlock(nn.Module):
    """
    Spatial and Channel Squeeze and Excitation block
    ---
    Return the block with the most promising values.
    """

    def __init__(self, in_channels, reduction=2):
        super(SCSEBlock, self).__init__()
        self.CSE = CSEBlock(in_channels, reduction)
        self.SSE = SSEBlock(in_channels)

    def forward(self, x):
        return torch.max(self.CSE(x), self.SSE(x))


class UpsampBlock(nn.Module):
    """
    Upsampling block
    ---
    Includes SCSEBlock
    """

    def __init__(self, in_channels, out_channels):
        super(UpsampBlock, self).__init__()
        self.relu = nn.ReLU(inplace=True)
        self.bn = nn.BatchNorm2d(out_channels)
        self.scse = SCSEBlock(in_channels)

    def forward(self, x):
        y = self.relu(x)
        y = self.bn(y)

        return self.scse(y)


class NetBlock(nn.Module):

    def __init__(self):
        super(NetBlock, self).__init__()

        # 3 input image channels, 64 output channels, 7x7 convolution, stride 2
        # -- Blue --
        self.blueBlock = torch.nn.Sequential(*(list(pre_model.children())[:3]))

        # -- Green --
        # maxpooling stride default is kernel size
        self.resBlock1 = torch.nn.Sequential(*(list(pre_model.children())[3:5]))
        self.resBlock2 = torch.nn.Sequential(*(list(pre_model.children())[5:6]))
        self.resBlock3 = torch.nn.Sequential(*(list(pre_model.children())[6:7]))
        self.resBlock4 = torch.nn.Sequential(*(list(pre_model.children())[7:]))

        # -- Yellow --
        # 64/128/256 input image channels, 128 output channels, 1x1 convolution, stride 1
        # Green (residual) block to Yellow block (Up to down)
        self.conv_blue_to_yel_1 = nn.Conv2d(64, 128, 1, stride=1)
        self.conv_gr_to_yel_2 = nn.Conv2d(64, 128, 1, stride=1)
        self.conv_gr_to_yel_3 = nn.Conv2d(128, 128, 1, stride=1)
        self.conv_gr_to_yel_4 = nn.Conv2d(256, 128, 1, stride=1)

        # -- Purple --
        # 512 input image channels, 512 output channels, 1x1 convolution, stride 1
        # Green (residual) block to Purple block
        self.conv_gr_to_purp = nn.Conv2d(512, 512, 1, stride=1)
        # Magenta block to Purple block
        self.conv_mag_to_purp_1 = UpsampBlock(256, 256)
        self.conv_mag_to_purp_2 = UpsampBlock(256, 256)
        self.conv_mag_to_purp_3 = UpsampBlock(256, 256)
        self.conv_mag_to_purp_4 = UpsampBlock(256, 256)

        # -- Magenta --
        # 512/256 input image channels, 128 output channels, 2x2 convolution, stride 2
        # Purple (residual) block to Magenta block (Down to up)
        self.conv_purp_to_mag_1 = nn.ConvTranspose2d(512, 128, 2, stride=2)
        self.conv_purp_to_mag_2 = nn.ConvTranspose2d(256, 128, 2, stride=2)
        self.conv_purp_to_mag_3 = nn.ConvTranspose2d(256, 128, 2, stride=2)
        self.conv_purp_to_mag_4 = nn.ConvTranspose2d(256, 128, 2, stride=2)
        self.conv_purp_to_mag_5 = nn.ConvTranspose2d(256, 1, 2, stride=2)

    def forward(self, x):
        # Traverse down the architecture (ResNet34)
        y = self.blueBlock(x)
        y_y1 = self.conv_blue_to_yel_1(y)

        y = self.resBlock1(y)
        y_y2 = self.conv_gr_to_yel_2(y)

        y = self.resBlock2(y)
        y_y3 = self.conv_gr_to_yel_3(y)

        y = self.resBlock3(y)
        y_y4 = self.conv_gr_to_yel_4(y)

        y = self.resBlock4(y)
        y_y5 = self.conv_gr_to_purp(y)

        # Traverse up the U-based architecture
        y_y5 = self.conv_purp_to_mag_1(y_y5)

        y_y4 = torch.cat((y_y4, y_y5), dim=1)
        y_y4 = self.conv_mag_to_purp_1(y_y4)
        y_y4 = self.conv_purp_to_mag_2(y_y4)

        y_y3 = torch.cat((y_y3, y_y4), dim=1)
        y_y3 = self.conv_mag_to_purp_2(y_y3)
        y_y3 = self.conv_purp_to_mag_3(y_y3)

        y_y2 = torch.cat((y_y2, y_y3), dim=1)
        y_y2 = self.conv_mag_to_purp_3(y_y2)
        y_y2 = self.conv_purp_to_mag_4(y_y2)

        y_y1 = torch.cat((y_y1, y_y2), dim=1)
        y_y1 = self.conv_mag_to_purp_4(y_y1)
        
        return self.conv_purp_to_mag_5(y_y1)


net = NetBlock()
# print(net)
summary(net, input_size=(3, 320, 480))

Downloading: "https://download.pytorch.org/models/resnet34-333f7ec4.pth" to /root/.cache/torch/hub/checkpoints/resnet34-333f7ec4.pth


  0%|          | 0.00/83.3M [00:00<?, ?B/s]

  return torch._C._cuda_getDeviceCount() > 0
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 160, 240]           9,408
       BatchNorm2d-2         [-1, 64, 160, 240]             128
              ReLU-3         [-1, 64, 160, 240]               0
            Conv2d-4        [-1, 128, 160, 240]           8,320
         MaxPool2d-5          [-1, 64, 80, 120]               0
            Conv2d-6          [-1, 64, 80, 120]          36,864
       BatchNorm2d-7          [-1, 64, 80, 120]             128
              ReLU-8          [-1, 64, 80, 120]               0
            Conv2d-9          [-1, 64, 80, 120]          36,864
      BatchNorm2d-10          [-1, 64, 80, 120]             128
             ReLU-11          [-1, 64, 80, 120]               0
       BasicBlock-12          [-1, 64, 80, 120]               0
           Conv2d-13          [-1, 64, 80, 120]          3

In [7]:
# pre_orig_model
# summary(pre_orig_model, input_size=(3, 320, 480))

for i in pre_orig_model.state_dict():
    print(i)

conv1.weight
bn1.weight
bn1.bias
bn1.running_mean
bn1.running_var
bn1.num_batches_tracked
layer1.0.conv1.weight
layer1.0.bn1.weight
layer1.0.bn1.bias
layer1.0.bn1.running_mean
layer1.0.bn1.running_var
layer1.0.bn1.num_batches_tracked
layer1.0.conv2.weight
layer1.0.bn2.weight
layer1.0.bn2.bias
layer1.0.bn2.running_mean
layer1.0.bn2.running_var
layer1.0.bn2.num_batches_tracked
layer1.1.conv1.weight
layer1.1.bn1.weight
layer1.1.bn1.bias
layer1.1.bn1.running_mean
layer1.1.bn1.running_var
layer1.1.bn1.num_batches_tracked
layer1.1.conv2.weight
layer1.1.bn2.weight
layer1.1.bn2.bias
layer1.1.bn2.running_mean
layer1.1.bn2.running_var
layer1.1.bn2.num_batches_tracked
layer1.2.conv1.weight
layer1.2.bn1.weight
layer1.2.bn1.bias
layer1.2.bn1.running_mean
layer1.2.bn1.running_var
layer1.2.bn1.num_batches_tracked
layer1.2.conv2.weight
layer1.2.bn2.weight
layer1.2.bn2.bias
layer1.2.bn2.running_mean
layer1.2.bn2.running_var
layer1.2.bn2.num_batches_tracked
layer2.0.conv1.weight
layer2.0.bn1.weight
laye

In [None]:
# Sjoerd
import os, sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath("")), '..'))


from work.CFD.data import CFD
from torch.utils.data import random_split
from torch.utils.data import DataLoader
from torchvision import transforms
import matplotlib.pyplot as plt
import torch


totensor = transforms.ToTensor()
dataset = CFD("../CFD/cfd_image", totensor, "../CFD/seg_gt", totensor)
gen = torch.Generator().manual_seed(42)
train_data, test_data = random_split(dataset, [71, 47], gen)

train_loader = DataLoader(train_data, batch_size=10)
test_loader = DataLoader(test_data, batch_size=47)


# fig, axs = plt.subplots(5, 5, figsize=(5, 5))
# for i in range(25):
#     _, gt = test_data[i]
#     ax = axs[i // 5][i % 5]
#     ax.imshow(gt.view(320, 480), cmap="gray")
#     ax.axis("off")
# plt.tight_layout()
# plt.show()



In [None]:
def batch_dice_loss(true_val, pred_val, epsilon = 1e-8):
    """
    Dice coefficient loss
    ---
    Equivalent to F1 score. Often used when there is
    imbalance in dataset (non-crack pixels outnumber 
    crack pixels by 65:1). 
    
    Args:
        true_val: a tensor of shape [N, 1, H, W]
        predicted_val: a tensor of shape [N, 1, H, W]
    Returns:
        dice_loss: the Dice loss.
    """
    # Sigmoid -> [0, 1], reflect probabilities
    pred_val = (torch.sigmoid(pred_val) >= 0.5).float()

    # Flattened from [N, 1, H, W] to [N, H*W]
    true_val = true_val.flatten(start_dim=1)
    pred_val = pred_val.flatten(start_dim=1)

    numerator = 2. * (pred_val * true_val).sum(dim=1)
    denominator = (pred_val).sum(dim=1) + (true_val).sum(dim=1)

    return torch.mean(1 - ((numerator + epsilon) / (denominator + epsilon)))


# def dice_loss(true_val, pred_val, epsilon = 1e-8):
#     """
#     Dice coefficient loss
#     ---
#     Equivalent to F1 score. Often used when there is
#     imbalance in dataset (non-crack pixels outnumber 
#     crack pixels by 65:1). 
    
#     Args:
#         true_val: a tensor of shape [N, 1, H, W]
#         predicted_val: a tensor of shape [N, 1, H, W]
#     Returns:
#         dice_loss: the Dice loss.
#     """
#     # Sigmoid -> [0, 1], reflect probabilities
#     pred_val = (torch.sigmoid(pred_val) >= 0.5).float()

#     # Flattened from [H, W] to [H*W]
#     true_val = true_val.flatten()
#     pred_val = pred_val.flatten()

#     numerator = 2. * (pred_val * true_val).sum()
#     denominator = (pred_val).sum() + (true_val).sum()

#     return torch.mean(1 - ((numerator + epsilon) / (denominator + epsilon)))


# prediction = torch.randint(low=-255, high=256, size=(3, 3), dtype=torch.float)


# pred_is_truth = (prediction >= 0).float()
# pred_is_false = (prediction < 0).float()
# pred_is_half = torch.flip(pred_is_truth, [0, 1])

# print(pred_is_truth, "\n\n", pred_is_false, "\n\n", pred_is_half)

# print(f"different tensors should output 1: {dice_loss(pred_is_false, prediction)}")
# print(f"the opposite tensors should output 0: {dice_loss(pred_is_truth, prediction)}")
# print(f"the avg tensors should output [0,1]: {dice_loss(pred_is_half, prediction)}")

In [None]:
# Some functions used during training

def try_gpu():
    """
    If GPU is available, return torch.device as cuda:0; else return torch.device
    as cpu.
    """
    if torch.cuda.is_available():
        device = torch.device('cuda:0')
    else:
        device = torch.device('cpu')
    return device


def evaluate_accuracy(data_loader, net, device=torch.device('cpu')):
    """Evaluate accuracy of a model on the given data set."""
    net.eval()  #make sure network is in evaluation mode

    #init
    acc_sum = torch.tensor([0], dtype=torch.float32, device=device)
    n = 0

    for X, y in data_loader:
        # Copy the data to device.
        X, y = X.to(device), y.to(device)
        with torch.no_grad():
            y = y.long()
            acc_sum += torch.sum((torch.argmax(net(X), dim=1) == y))
            n += y.shape[0] #increases with the number of samples in the batch
    return acc_sum.item()/n


In [None]:
# training parameters
lr = 1e-3
epochs = 1

#Initialize network
net = NetBlock()
optimizer = torch.optim.AdamW(net.parameters(), lr=lr, betas=(0.9, 0.999))
criterion = batch_dice_loss

# Define list to store losses and performances of each interation
train_losses = []
train_accs = []
test_accs = []

# Try using gpu instead of cpu
device = try_gpu()

for epoch in range(epochs):

    # Network in training mode and to device
    net.train()
    net.to(device)

    # Training loop
    for i, (x_batch, y_batch) in enumerate(train_loader):

         # Set to same device
         x_batch, y_batch = x_batch.to(device), y_batch.to(device)

         # Set the gradients to zero
         optimizer.zero_grad()

         # Perform forward pass
         y_pred = net(x_batch)

         # Compute the loss
         loss = criterion(y_pred, y_batch)
         train_losses.append(loss)

         # Backward computation and update
         loss.backward()
         optimizer.step()
    
    # Compute train and test error
    train_acc = 100*evaluate_accuracy(train_loader, net.to('cpu'))
    test_acc = 100*evaluate_accuracy(test_loader, net.to('cpu'))
    
    # Development of performance
    train_accs.append(train_acc)
    test_accs.append(test_acc)

    # Print performance
    print('Epoch: {:.0f}'.format(epoch+1))
    print('Accuracy of train set: {:.00f}%'.format(train_acc))
    print('Accuracy of test set: {:.00f}%'.format(test_acc))
    print('')


NameError: name 'NetBlock' is not defined

KernelInterrupted: Execution interrupted by the Jupyter kernel.

KernelInterrupted: Execution interrupted by the Jupyter kernel.

KernelInterrupted: Execution interrupted by the Jupyter kernel.

KernelInterrupted: Execution interrupted by the Jupyter kernel.

KernelInterrupted: Execution interrupted by the Jupyter kernel.

KernelInterrupted: Execution interrupted by the Jupyter kernel.

In [None]:
# from torchviz import make_dot

# x = torch.randn(1,3, 320, 480)
# y = net(x) 

# make_dot(y).view()

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=411d58e9-cb4b-4924-bef0-2f383eff0187' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>