Importing Libraries:
==

In [None]:
%matplotlib inline

import time
from tqdm.notebook import tqdm
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch import optim
from torch.utils import data
from torchvision.transforms import functional as F
import torchvision

from sklearn.metrics import jaccard_score as jsc

from PIL import Image
from matplotlib.path import Path
import os
import pandas as pd

# Pick the compute device once; later cells move the model and batches to it.
use_gpu = torch.cuda.is_available()
device = "cuda" if use_gpu else "cpu"
print('GPU is available!' if use_gpu else 'GPU is not available!')
if use_gpu:
    # Make newly created float tensors live on the GPU by default.
    # NOTE(review): torch.set_default_tensor_type is deprecated in recent
    # PyTorch releases; confirm whether it is still needed, since the model
    # and the batches are moved to `device` explicitly further down.
    torch.set_default_tensor_type('torch.cuda.FloatTensor') 

Hyperparams:
==

In [None]:
# The hand-written UNet below has three down-sampling stages (depth 3).

# --- optimisation ---
learning_rate = 1e-3
epoch_number = 5
batch_size = 1 # author's note: raise to 2-4 once bug fixing is over
gradient_clip = 50 # max gradient norm; author also tried 10 and 100
dice_alpha = 1e-5 # Dice smoothing term; author suggests also trying 1e-4 and 1.0

# --- data ---
train_size = 100 # number of training samples; author's note: split train:val roughly 70:30
test_size = 25 # number of tiles if the large test image is split into individual images
img_size = 768 # side length the 750x750 inputs are resized to; must be divisible by 2**depth so the skip connections line up

# --- model ---
max_channels = 512 # the first conv block outputs max_channels / 2^4 channels; author's note: dividing this by 2 or more gave an error
pretrained = True
freeze_layers = 0 # for the pretrained model: how many leading sub-modules of each top-level block to freeze

Loading Data:
==

In [None]:
def transform(x):
    """Convert ``x`` to a tensor using torchvision's ``to_tensor``."""
    to_tensor = torchvision.transforms.functional.to_tensor
    as_tensor = to_tensor(x)
    return as_tensor

class DataSet(torch.utils.data.Dataset):
    """Pairs each image chip under ``Data_ML/image_chips`` with a polygon mask.

    The polygon for a chip is read from the ``WKT`` column of the CSV with the
    same base name under ``Data_ML/target_feature_AOI`` and rasterised onto a
    750x750 grid.
    """
    def float_maker(self, a):
        """Return coordinate token ``a`` without a leading '(' / trailing ')'.

        The result is still a string; the numeric conversion happens later.
        One parenthesis is removed from each end, so a token wrapped on both
        sides is cleaned completely.
        """
        if a.startswith('('):
            a = a[1:]
        if a.endswith(')'):
            a = a[:-1]
        return a
    def __init__(self):
        super().__init__()
        # os.listdir returns names in arbitrary order; sorting makes the
        # index -> file mapping reproducible between runs.
        self.image_names= sorted(os.listdir('./Data_ML/image_chips/'))
        # self.image_names= os.listdir('/content/drive/MyDrive/Data_ML/image_chips/') for google.colab
    def __getitem__(self, index):
        """Return ``(image, mask)`` for the chip at position ``index``.

        ``image`` is the chip converted to a tensor and resized to
        ``(img_size, img_size)``; ``mask`` is an int tensor built from the
        750x750 boolean polygon raster (it is not resized here).
        """
        image_name = self.image_names[index]
        # splitext copes with any extension length, unlike slicing off 4 characters.
        csv_name = os.path.splitext(image_name)[0] + '.csv'
        image = plt.imread(os.path.join(os.getcwd(),'Data_ML','image_chips',image_name))
        mask = pd.read_csv(os.path.join(os.getcwd(),'Data_ML','target_feature_AOI',csv_name))
        # for google.colab
        # image = plt.imread(os.path.join('/content/drive/MyDrive','Data_ML','image_chips',self.image_names[index]))
        # mask = pd.read_csv(os.path.join('/content/drive/MyDrive','Data_ML','target_feature_AOI',self.image_names[index][:-4]+'.csv'))
        # Drop the first 16 and last 3 characters of the WKT string
        # (presumably the 'MULTIPOLYGON (((' prefix and ')))' suffix - TODO confirm)
        # and split the remainder into "x y" vertex strings.
        coordinates = mask.WKT.values[0][16:-3].split(',')
        points = ([tuple(self.float_maker(i)  for i in x.split()) for x in coordinates])
        h , w = 750, 750
        point_path = Path(points)
        # Every grid cell is tested as the point (first index, -second index).
        # NOTE(review): the sign flip suggests the polygon's second coordinate
        # is negative in the source data - confirm against the CSV files.
        x, y = np.mgrid[:h, :w]
        y = - y
        coors=np.hstack((x.reshape(-1, 1), y.reshape(-1,1)))
        masked_image = (point_path.contains_points(coors))
        first = torchvision.transforms.Resize((img_size,img_size))(transform(image))
        second = (torchvision.transforms.functional.to_tensor(masked_image.reshape(h,w)).int())
        return (first,second)
    def __len__(self):
        # Never advertise more samples than there are files on disk; otherwise
        # the sampler can hand __getitem__ an out-of-range index.
        return min(train_size, len(self.image_names))

In [None]:
# The training chips are 750x750x3 and there are 100 of them.
# The test image is a 5x5 grid of such chips (3750x3750x3); the expected
# output is a mask of 0s and 1s.
input_channels = 3 
output_classes = 1

# TODO (from the original author): DataSet should provide both the training
# and the cross-validation split.
train_dataset = DataSet()
train_dataLoader = data.DataLoader(
    train_dataset,
    batch_size = batch_size,
    shuffle = True,
    num_workers= 0
)

Creating Module:
==

In [None]:
class UNetConv(nn.Module):
    """Two consecutive 3x3 convolution -> batch-norm -> ReLU stages.

    The convolutions use ``padding=1``, so the spatial size is unchanged; the
    first stage maps ``input_channels`` to ``output_channels`` and the second
    keeps ``output_channels``.
    """
    def __init__(self, input_channels, output_channels):
        super().__init__()
        stages = []
        for stage_in in (input_channels, output_channels):
            stages.append(nn.Conv2d(stage_in, output_channels, kernel_size=3, padding=1))
            stages.append(nn.BatchNorm2d(output_channels))
            stages.append(nn.ReLU(inplace=True))
        self.conv2 = nn.Sequential(*stages)
    def forward(self, x):
        """Run ``x`` through both conv stages."""
        features = self.conv2(x)
        return features
class DownConv(nn.Module):
    """Encoder step: 2x2 max-pool (stride 2) followed by a ``UNetConv`` block."""
    def __init__(self, input_channels, output_channels):
        super().__init__()
        pool = nn.MaxPool2d(2, 2)
        convs = UNetConv(input_channels, output_channels)
        self.conv_down = nn.Sequential(pool, convs)
    def forward(self, x):
        """Down-sample ``x`` and apply the conv block."""
        pooled_features = self.conv_down(x)
        return pooled_features
class UpConv(nn.Module): 
    """Decoder step: up-sample, concatenate with a skip tensor, then convolve.

    The transposed convolution (kernel 2, stride 2) halves the channel count
    of its input; concatenating the result with the skip tensor along dim 1
    must give ``input_channels`` channels for the following ``UNetConv``.
    """
    def __init__(self, input_channels, output_channels, padding=0):
        super().__init__()
        upsample = nn.ConvTranspose2d(input_channels , input_channels // 2, kernel_size=2, stride=2, padding = padding)
        self.conv_up = nn.Sequential(upsample)
        merge = UNetConv(input_channels,output_channels)
        self.conv_level = nn.Sequential(merge)
    def forward(self, x1, x2):
        """Up-sample ``x1``, join it behind skip tensor ``x2``, and convolve."""
        upsampled = self.conv_up(x1)
        # Skip connection first, up-sampled features second, along the channel axis.
        stacked = torch.cat([x2, upsampled], dim = 1)
        return self.conv_level(stacked)
class LastConv(nn.Module):
    """Final 1x1 convolution mapping feature channels to output channels."""
    def __init__(self, input_channels, output_channels):
        super().__init__()
        projection = nn.Conv2d(input_channels, output_channels, kernel_size=1)
        self.conv_final = nn.Sequential(projection)
    def forward(self, x):
        """Apply the 1x1 projection to ``x``."""
        projected = self.conv_final(x)
        return projected
class UNet(nn.Module):
    """Depth-3 U-Net: three down-sampling and three up-sampling stages.

    Args:
        input_channels: number of channels of the input image.
        max_channels: width control; the first block outputs
            ``max_channels // 2**4`` channels and every down stage doubles it.
        num_classes: number of output channels. When omitted, the notebook
            global ``output_classes`` is used, as the original code did.

    A fourth down/up pair (``down4`` / ``up1``) existed in an earlier version
    and is disabled, so the bottleneck has ``8 * (max_channels // 2**4)``
    channels.
    """
    def __init__(self, input_channels, max_channels, num_classes=None):
        super(UNet, self).__init__()
        if num_classes is None:
            # Backward-compatible default: fall back to the notebook-level setting.
            num_classes = output_classes
        base = max_channels // 2**4
        # Encoder: base -> 2*base -> 4*base -> 8*base channels.
        self.start = UNetConv(input_channels, base)
        self.down1 = DownConv(base, base * 2)
        self.down2 = DownConv(base * 2, base * 4)
        self.down3 = DownConv(base * 4, base * 8)
        # Decoder mirrors the encoder; names keep the original numbering.
        self.up2 = UpConv(base * 8, base * 4)
        self.up3 = UpConv(base * 4, base * 2)
        self.up4 = UpConv(base * 2, base)
        # Kept for compatibility: the original left this attribute at the
        # channel count entering the final layer.
        self.current_channels = base
        self.final = LastConv(base, num_classes)
    def forward(self, x):
        """Return per-pixel sigmoid outputs for ``x``.

        The input height and width must be divisible by 8: each of the three
        2x2 poolings halves them, and the concatenation in every up stage
        needs the up-sampled tensor to match the stored skip tensor.
        """
        x1 = self.start(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        y2 = self.up2(x4,x3)
        y3 = self.up3(y2,x2) 
        x = self.up4(y3,x1)
        return torch.sigmoid(self.final(x))

# Build the network: either the hand-written UNet or a pretrained U-Net
# fetched through torch.hub.
if not pretrained:
    Model = UNet(input_channels, max_channels)
    Model = Model.to(device)
else:
    Model = torch.hub.load('mateuszbuda/brain-segmentation-pytorch', 'unet', in_channels=3, out_channels=1, init_features=32, pretrained=True)
    Model = Model.train(True).to(device)    
    # Within every top-level child, freeze the sub-modules that come before
    # position `freeze_layers` (none when it is 0).
    for top_block in Model.children():
        for position, sub_module in enumerate(top_block.children()):
            if position == freeze_layers:
                break
            for weight in sub_module.parameters():
                weight.requires_grad=False

Training Model:
==

In [None]:
# Number of passes over the training loader, taken from the hyperparameter cell.
epochs = epoch_number

class DiceLoss(nn.Module):
    """Soft Dice loss computed on channel 0 over the whole batch.

    Args:
        smooth: additive smoothing term used in both numerator and
            denominator. When omitted, the notebook global ``dice_alpha`` is
            used, which is what the original class always did.
    """
    def __init__(self, smooth=None):
        super(DiceLoss, self).__init__()
        self.smooth = dice_alpha if smooth is None else smooth
    def forward(self, y_pred, y_true):
        """Return ``1 - Dice(y_pred, y_true)``.

        Both tensors must have identical sizes (an ``AssertionError`` is
        raised otherwise); only index 0 of dimension 1 is used.
        """
        assert y_pred.size() == y_true.size()
        # Flatten channel 0 of every sample into one long vector.
        y_pred = y_pred[:, 0].contiguous().view(-1)
        y_true = y_true[:, 0].contiguous().view(-1)
        intersection = (y_pred * y_true).sum()
        dsc = (2. * intersection + self.smooth) / (y_pred.sum() + y_true.sum() + self.smooth)
        return 1. - dsc

loss_func = DiceLoss()
optimizer = optim.Adam(Model.parameters(),lr=learning_rate)
acc_sanity = jsc
trainLoss = []
sanityAccc = []
# Debugging cap carried over from the original loop: only this many batches
# are processed per epoch. Set to None to train on the full loader.
max_batches_per_epoch = 3
for epoch in range(epochs):  
    epochStart = time.time()
    runningLoss = 0.0
    sanityAcc = 0.0
    batches_seen = 0
    for inputs, labels in tqdm(train_dataLoader): 
        if max_batches_per_epoch is not None and batches_seen == max_batches_per_epoch:
            break
        batches_seen += 1
        inputs = inputs.to(device)
        labels = labels.to(device)
        if(pretrained):
            # The pretrained network is fed 256x256 inputs.
            inputs = torchvision.transforms.Resize((256,256))(inputs)
        optimizer.zero_grad()  
        outputs = Model(inputs) 
        # Bring the prediction back to the 750x750 mask resolution before scoring.
        outputs = torchvision.transforms.Resize((750,750))(outputs)
        loss = loss_func(outputs, labels)
        loss.backward() 
        # Sanity metric: Jaccard score of the thresholded prediction against
        # the mask (the binary score does not depend on argument order).
        predicted_mask = torch.round(outputs).cpu().detach().numpy().reshape(-1)
        true_mask = labels.cpu().detach().numpy().reshape(-1)
        acc_san = acc_sanity(true_mask, predicted_mask)
        nn.utils.clip_grad_norm_(Model.parameters(), gradient_clip)
        optimizer.step()
        runningLoss += loss.item()
        sanityAcc += acc_san.item()
    # Average over the batches actually processed. The original scaled by
    # batch_size/train_size, which under-reports both numbers whenever the
    # loop stops early.
    batches_done = max(batches_seen, 1)
    runningLoss /= batches_done
    sanityAcc /= batches_done
    sanityAccc.append(sanityAcc)
    trainLoss.append(runningLoss)
    epochEnd = time.time()-epochStart
    print('Iteration: {:.0f} /{:.0f}  ;  Training Loss: {:.6f} ; Jaccard Score: {:.6f} ; Time consumed: {:.0f}m {:.0f}s '\
        .format(epoch + 1, epochs, runningLoss, sanityAcc, epochEnd//60, epochEnd%60))   
print('Finished Training')

Testing Model:
==

In [None]:
# Mean Dice loss over the test loader.
# NOTE(review): `test_dataLoader` is not defined in any cell shown above; it
# must be created (yielding (image, mask) batches like the training loader)
# before this cell can run.
test_loss = 0.0
num_test_batches = 0
was_training = Model.training
Model.eval()  # use running batch-norm statistics while evaluating
with torch.no_grad():  # no gradients are needed for evaluation
    for inputs, labels in tqdm(test_dataLoader):
        inputs = inputs.to(device)
        labels = labels.to(device)
        if(pretrained):
            # Same input size the pretrained network is given during training.
            inputs = torchvision.transforms.Resize((256,256))(inputs)
        outputs = Model(inputs)
        # The loss asserts equal sizes, so match the prediction to the mask.
        outputs = torchvision.transforms.Resize(tuple(labels.shape[-2:]))(outputs)
        loss = loss_func(outputs, labels)
        test_loss += loss.item()
        num_test_batches += 1
Model.train(was_training)  # restore the mode the model was in before this cell
# Divide by the number of batches, not by the size of the last batch.
test_loss /= max(num_test_batches, 1)
print(test_loss)