In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import sklearn
from skimage.io import imread 
from PIL import Image
import csv
import torch
import copy
import torchvision
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split
from torch.utils import data
import torchvision.transforms as transf
import torch.nn.functional as F
from torchvision.transforms.transforms import RandomAdjustSharpness
import torch.nn as nn
import random
import cv2
import math
import os
from torch import optim
from torch.nn.modules.conv import ConvTranspose2d
from torch.autograd import Function
import glob

In [None]:
model = torchvision.models.resnet18(pretrained=True)

# Your dataset's path and preprocessing

Structure of dataset should be as following:

--- /input ------/image1.png ------/image2.png ------/image3.png
...

--- /target ------/mask1.png ------/mask2.png ------/mask3.png
...

Dataset pipeline --- ordered list of corresponding images and mask paths ---

img_list --> ['image1.png', 'image2.png', 'image3.png',...]

msk_list --> ['mask1.png', 'mask2.png', 'mask3.png',...]


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Dataset pipeline

In [None]:
class SegDataset(data.Dataset):
  
  def __init__(self, img_list, msk_list, transform=None):
    self.img_list = img_list
    self.msk_list = msk_list
    self.transforms = transform

  def __len__(self):
    return len(self.img_list)

  @classmethod
  def preprocess(self,pil_img, size):
    # w,h = pil_img.size
    # W,H = int(scale*w), int(scale*h)
    pil_img = pil_img.resize((size,size))

    img_nd = np.array(pil_img)

    if len(img_nd.shape) == 2:
      img_nd = np.expand_dims(img_nd, axis=2)

    '''if tranforms are present paste them here'''

    #HWC to CHW
    img_trans = img_nd.transpose((2,0,1))
    if img_trans.max() > 1:
      img_trans = img_trans/225

    return img_trans

  def __getitem__(self, index):
    input_ID = self.img_list[index]
    target_ID = self.msk_list[index]

    img = Image.open(input_ID)
    msk = Image.open(target_ID)

    ''' 
    assert img.size == msk.size, \
    f'Image and mask for {index} should be the same, but are {img.size} and {msk.size}'
    '''
    # preprocessing
    img = self.preprocess(img, 224)
    msk = self.preprocess(msk, 216)
    
    return {
        'image': torch.from_numpy(img).type(torch.FloatTensor),
        'mask': torch.from_numpy(msk).type(torch.FloatTensor)
    }

In [None]:
input_tr = glob.glob('/content/drive/MyDrive/PH2/train/input/*')
input_val = glob.glob('/content/drive/MyDrive/PH2/val/input/*')

target_tr = [element[:-16]+'target'+element[-11:-4]+'_lesion'+element[-4:] for element in input_tr]
target_val = [element[:-16]+'target'+element[-11:-4]+'_lesion'+element[-4:] for element in input_val]

In [None]:
input_tr

# Hyperparameter Tuning and Dataloader

In [None]:
# Hyperparameter Tuning
batch_size_tr = 20
batch_size_val = 15
epoch_n = 200
n_channels = 3
n_classes = 1
l_rate = 0.01

In [None]:
train_ds = SegDataset(input_tr, target_tr)#,transform=transforms_tr)
val_ds = SegDataset(input_val, target_val)#,transform=transforms_val)
# test_ds = ImageFolder('/content/drive/MyDrive/Lung_Carcinoma/data_folder_2/test/', transform=val_transform)
train_load = DataLoader(dataset=train_ds, batch_size=batch_size_tr, shuffle=False, drop_last=False)
val_load = DataLoader(dataset=val_ds, batch_size=batch_size_val, shuffle=False, drop_last=False)
# test_load = DataLoader(dataset=test_ds, batch_size=batch_size_val, shuffle=True, drop_last=False)
if torch.cuda.is_available():
  device='cuda'
else:
  device='cpu'

# Structure of U-Net

In [None]:
# Segmentation architecture
# U-Net

class DoubleConv(nn.Module):
  def __init__(self, in_channels, out_channels, mid_channels=None):
    super().__init__()
    if mid_channels is None:
      mid_channels = out_channels
    self.double_conv = nn.Sequential(
        nn.Conv2d(in_channels, mid_channels, 3, 1),
        nn.BatchNorm2d(mid_channels),
        nn.ReLU(inplace=True),
        nn.Conv2d(mid_channels, out_channels, 3, 1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True)
    )

  def forward(self, x):
    return self.double_conv(x)


class Down(nn.Module):
  def __init__(self, in_channels, out_channels):
    super().__init__()
    self.maxpool_conv = nn.Sequential(
        nn.MaxPool2d(2),
        DoubleConv(in_channels, out_channels)
    )

  def forward(self, x):
    return self.maxpool_conv(x)


class Up(nn.Module):
  def __init__(self, in_channels, out_channels):
    super().__init__()
    self.up = nn.ConvTranspose2d(in_channels, in_channels//2, 2, 2)
    self.conv = DoubleConv(in_channels, out_channels)

  def forward(self, x1, x2):
    x1=self.up(x1)
    delta_W = x2.size()[3]-x1.size()[3]
    delta_H = x2.size()[2]-x1.size()[2]
    x1 = F.pad(x1,[delta_W//2,delta_W-delta_W//2,delta_H//2,delta_H-delta_H//2])
    x = torch.cat([x2,x1],dim=1)
    return self.conv(x)


class FinalConv(nn.Module):
  def __init__(self, in_channels, out_channels):
    super().__init__()
    self.conv_final = nn.Conv2d(in_channels, out_channels,1)

  def forward(self,x):
    return self.conv_final(x)


class U_Net(nn.Module):
  def __init__(self, in_channels, out_classes):
    super(U_Net,self).__init__()
    self.in_channels = in_channels
    self.out_classes = out_classes

    self.initial_conv = DoubleConv(in_channels, 64)#(N*N*64)
    self.down1 = Down(64, 128)#(N/2*N/2*128)
    self.down2 = Down(128, 256)#(N/4*N/4*256)
    self.down3 = Down(256, 512)#(N/8*N/8*512)
    self.down4 = Down(512, 1024)#(N/16*N/16*1024)
    self.up1 = Up(1024,512)
    # i_up = N/8*N/8*512 & i(after concatenation) --> (N/8*N/8*1024) i_final --> (N/8*N/8*512)
    self.up2 = Up(512,256)
    # i_up = N/4*N/4*256 & i(after concatenation) --> N/4*N/4*256*2
    self.up3 = Up(256,128)
    self.up4 = Up(128,64)
    self.final = FinalConv(64, out_classes)
    
  
  def forward(self,i):
    i1 = self.initial_conv(i)
    i2 = self.down1(i1)
    i3 = self.down2(i2)
    i4 = self.down3(i3)
    i5 = self.down4(i4)
    i = self.up1(i5,i4)
    i = self.up2(i,i3)
    i = self.up3(i,i2)
    i = self.up4(i,i1)
    i = self.final(i)
    return torch.sigmoid(i)

# Loss Functions (IoU Score and IoU Loss)

In [None]:
''' IoU Score '''

def IoU_score(mask_gen, mask):
  s = torch.FloatTensor(1).cuda().zero_()
  for i,t in enumerate(zip(mask_gen, mask)):
    s += get_IoU(t[0], t[1])
  return s/(i+1)

  # loss functions

'''IoU-Loss'''

def get_IoU(src, tar):
  inter = (src*tar).sum(dim=(1,2))
  union_inter = (src+tar).sum(dim=(1,2))
  #print(inter, union_inter)
  score = (inter.float()+1)/((union_inter-inter).float()+1)
  return score

def loss_iou(mask_gen, mask):
    s=torch.FloatTensor(1).cuda().zero_()
    for i,t in enumerate(zip(mask_gen, mask)):
      # print(t[0].shape, t[1].shape)
      s += get_IoU(t[0], t[1])
    return 1-(s/(i+1))

# Loss Functions (Dice Loss)

In [None]:
from torch.autograd import Function, Variable

class DiceCoeff(Function):
    """Dice coeff for individual examples"""

    def forward(self, input, target):
        # target = _make_one_hot(target, 2)
        self.save_for_backward(input, target)
        eps = 0.0001
        self.inter = torch.dot(input.view(-1), target.view(-1))
        self.union = torch.sum(input) + torch.sum(target) + eps
        # print("inter,uniun:",self.inter,self.union)

        t = (2 * self.inter.float() + eps) / self.union.float()
        return t

def dice_coeff(input, target):
    """Dice coeff for batches"""
    if input.is_cuda:
        s = torch.FloatTensor(1).cuda().zero_()
    else:
        s = torch.FloatTensor(1).zero_()

    # print("size of input, target:", input.shape, target.shape)

    for i, c in enumerate(zip(input, target)):
        s = s + DiceCoeff().forward(c[0], c[1])

    return s / (i + 1)


def dice_coeff_loss(input, target):
    return 1 - dice_coeff(input, target)

# Model specifications and training

In [None]:
model = U_Net(n_channels, n_classes)
#state=torch.load('/content/drive/MyDrive/PH2/model_wts/U_Net_weights_1_IoU.pth') # ekhane previous weight ta src korbi 
#model.load_state_dict(state['model_state'])
model = model.to(device)
# criterion = nn.MSELoss()
optim = torch.optim.Adam(model.parameters(), l_rate)
#optim.load_state_dict(state['optimizer_state'])

In [None]:
# Training

def train(model, epoch_n, optim): #,criterion):
  best_loss=0.0058 # ekhane previous best acc ta dibi in the format 0.XX
  train_loss_list = []
  val_loss_list = []
  for epoch in range(epoch_n):
    model.train()
    train_loss=iou_tr=total=dice_tr=0.0
    for _,data in enumerate(train_load):
      image = data['image'].to(device, dtype=torch.float32)
      mask_type = torch.float32 if n_classes == 1 else torch.long
      mask = data['mask'].to(device, dtype=mask_type)
      with torch.set_grad_enabled(True):
        mask_gen = model(image)
        loss = dice_coeff_loss(mask_gen,mask)
        loss.backward()
        optim.step()
      optim.zero_grad()
      train_loss += loss.item()
      total += mask.size(0)
      dice_tr += dice_coeff(mask_gen, mask).item()
      iou_tr += IoU_score(mask_gen,mask).item()
    val_loss, iou_val, dice_val = eval(model)#, criterion)
    epoch_train_loss = train_loss/len(train_ds)
    print("Epoch: {}".format(epoch+1))
    print('-'*10)
    print('Train Loss: {:.9f}'.format(epoch_train_loss))
    epoch_val_loss = val_loss/len(val_ds)
    print('Val Loss: {:.9f}'.format(epoch_val_loss))
    print('\n')
    print('Dice score train: {:.9f}'.format(dice_tr/len(train_load)))
    print('\n')
    print('Dice score val: {:.9f}'.format(dice_val/len(val_load)))
    print('\n')
    print('IoU score train: {:.9f}'.format(iou_tr/len(train_load)))
    print('\n')
    print('IoU score val: {:.9f}'.format(iou_val/len(val_load)))
    print('\n')
    plt.imshow(mask_gen.cpu().detach().numpy()[-9][0])
    _, axarr = plt.subplots(1,3)
    axarr[0] = plt.imshow(image.cpu().detach().numpy()[-9].transpose((1,2,0)))
    if best_loss > val_loss:
      state={
        "model_state":model.state_dict(),
        "optimizer_state":optim.state_dict(),
          }
      torch.save(state,'/content/drive/MyDrive/ISBI-2016_pre-processed/models/U_Net_weights_ConvNext.pth') # ekhane notun weight er src ta korbi 
      best_loss = val_loss
    train_loss_list = train_loss_list + [epoch_train_loss]
    val_loss_list = val_loss_list + [epoch_val_loss]
  #print("The model with the best performance has a score of :{:.4f}".format(best_loss))
  return model, train_loss_list, val_loss_list  


def eval(model):#, criterion):
  model.eval()
  with torch.no_grad():
    val_loss=dice_val=iou_val=0.0
    for _,data in enumerate(val_load):
      image = data['image'].to(device, dtype=torch.float32)
      mask_type = torch.float32 if n_classes == 1 else torch.long
      mask = data['mask'].to(device, dtype=mask_type)
      mask_gen = model(image)
      loss = dice_coeff_loss(mask_gen, mask)
      val_loss += loss.item()
      iou_val += IoU_score(mask_gen,mask).item()
      dice_val += dice_coeff(mask_gen,mask).item()
  return val_loss, iou_val, dice_val


# Mask generation of entire dataset

In [None]:

model, train_loss_list, val_loss_list = train(model, epoch_n, optim)#, criterion)