In [1]:
import torch
import torch.utils.data as data
import cv2
import os
from glob import glob

class TrainDataset(data.Dataset):
    """Dataset of (image, mask) pairs stored as PNG files.

    Every ``*.png`` found in ``<root>/image`` is paired with the file
    ``<root>/mask/<image name without extension>_mask.png``. Image paths are
    sorted, so a given index always refers to the same file.

    Args:
        root: directory containing the ``image`` and ``mask`` sub-directories.
    """

    def __init__(self, root=''):
        super(TrainDataset, self).__init__()
        self.img_files = sorted(glob(os.path.join(root, 'image', '*.png')))
        # Derive each mask path from its image name: foo.png -> foo_mask.png
        self.mask_files = [
            os.path.join(
                root,
                'mask',
                os.path.splitext(os.path.basename(img_path))[0] + '_mask.png',
            )
            for img_path in self.img_files
        ]

    @staticmethod
    def _read_as_float_tensor(file_path):
        """Read one image file unchanged and return it as a float tensor.

        Raises:
            FileNotFoundError: if OpenCV cannot read ``file_path``.
        """
        array = cv2.imread(file_path, cv2.IMREAD_UNCHANGED)
        # cv2.imread signals failure by returning None instead of raising.
        if array is None:
            raise FileNotFoundError(
                'Could not read image file: {}'.format(file_path))
        return torch.from_numpy(array).float()

    def __getitem__(self, index):
        """Return ``(image, mask, image_path)`` for the sample at ``index``.

        Both image and mask are returned as float tensors.
        """
        img_path = self.img_files[index]
        image = self._read_as_float_tensor(img_path)
        label = self._read_as_float_tensor(self.mask_files[index])
        return image, label, img_path

    def __len__(self):
        """Number of images found under ``<root>/image``."""
        return len(self.img_files)

class TestDataset(data.Dataset):
    """Dataset of unlabelled PNG images found in ``<root>/image``.

    Image paths are sorted, so a given index always refers to the same file.

    Args:
        root: directory containing the ``image`` sub-directory.
    """

    def __init__(self, root=''):
        super(TestDataset, self).__init__()
        self.img_files = sorted(glob(os.path.join(root, 'image', '*.png')))

    def __getitem__(self, index):
        """Return ``(image, image_path)`` with the image as a float tensor.

        Raises:
            FileNotFoundError: if OpenCV cannot read the image file.
        """
        img_path = self.img_files[index]
        image = cv2.imread(img_path, cv2.IMREAD_UNCHANGED)
        # cv2.imread signals failure by returning None instead of raising.
        if image is None:
            raise FileNotFoundError(
                'Could not read image file: {}'.format(img_path))
        return torch.from_numpy(image).float(), img_path

    def __len__(self):
        """Number of images found under ``<root>/image``."""
        return len(self.img_files)

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class DoubleConv(nn.Module):
    """Two 3x3 convolutions in sequence, each followed by BatchNorm and ReLU.

    Args:
        in_channels: channels of the input feature map.
        out_channels: channels produced by the second convolution.
        mid_channels: channels between the two convolutions; a falsy value
            (``None`` or ``0``) means "same as ``out_channels``".
    """

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()

        hidden_channels = mid_channels if mid_channels else out_channels

        # padding=1 with a 3x3 kernel keeps the spatial size unchanged.
        stages = [
            nn.Conv2d(in_channels, hidden_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(hidden_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ]
        self.double_conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both conv/BN/ReLU stages to ``x``."""
        features = self.double_conv(x)
        return features


class Down(nn.Module):
    """Downscaling stage: 2x2 max pooling followed by a ``DoubleConv``.

    Args:
        in_channels: channels of the input feature map.
        out_channels: channels produced by the ``DoubleConv``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        pooling = nn.MaxPool2d(2)
        convolutions = DoubleConv(in_channels, out_channels)
        self.maxpool_conv = nn.Sequential(pooling, convolutions)

    def forward(self, x):
        """Pool ``x`` and then convolve it."""
        downsampled = self.maxpool_conv(x)
        return downsampled


class Up(nn.Module):
    """Upscaling stage: bilinear 2x upsampling, skip concatenation, ``DoubleConv``.

    Args:
        in_channels: channels of the concatenated tensor (skip + upsampled).
        out_channels: channels produced by the ``DoubleConv``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.conv = DoubleConv(in_channels, out_channels, in_channels // 2)

    def forward(self, x1, x2):
        """Upsample ``x1``, align it with the skip tensor ``x2`` and fuse them.

        Args:
            x1: feature map from the deeper stage, laid out as (N, C, H, W).
            x2: skip-connection feature map, laid out as (N, C, H, W).
        """
        x1 = self.up(x1)

        # Max pooling floors odd sizes, so the upsampled map can be smaller
        # than the skip map. Pad x1 to x2's height/width so torch.cat works;
        # for already matching sizes the padding is skipped entirely.
        diff_y = x2.size(2) - x1.size(2)
        diff_x = x2.size(3) - x1.size(3)
        if diff_y != 0 or diff_x != 0:
            x1 = F.pad(x1, [diff_x // 2, diff_x - diff_x // 2,
                            diff_y // 2, diff_y - diff_y // 2])

        # Concatenate along the channel axis, skip connection first.
        x = torch.cat([x2, x1], dim=1)

        return self.conv(x)


class OutConv(nn.Module):
    """Final 1x1 convolution that maps feature channels to output channels.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels of the output.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=1,
        )

    def forward(self, x):
        """Project ``x`` with the 1x1 convolution."""
        projected = self.conv(x)
        return projected

In [3]:
import torch.nn.functional as F



class UNet(nn.Module):
    """U-Net with four downscaling and four upscaling stages.

    Args:
        n_channels: channels of the input image.
        n_classes: channels of the output logits (one per class).
    """

    def __init__(self, n_channels, n_classes):
        super().__init__()
        self.n_channels, self.n_classes = n_channels, n_classes

        # Encoder: the channel count doubles per stage up to 512.
        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, 512)

        # Decoder: each stage's input is the upsampled map concatenated with
        # the matching encoder output, hence the doubled input channels.
        self.up1 = Up(1024, 256)
        self.up2 = Up(512, 128)
        self.up3 = Up(256, 64)
        self.up4 = Up(128, 64)

        self.outc = OutConv(64, n_classes)

    def forward(self, x):
        """Return the logits produced for input ``x``."""
        # Encoder pass; keep every intermediate map for the skip connections.
        encoder_maps = [self.inc(x)]
        for down_stage in (self.down1, self.down2, self.down3, self.down4):
            encoder_maps.append(down_stage(encoder_maps[-1]))

        # The deepest map seeds the decoder; the remaining maps are consumed
        # deepest-first as skip connections.
        decoded = encoder_maps.pop()
        for up_stage in (self.up1, self.up2, self.up3, self.up4):
            decoded = up_stage(decoded, encoder_maps.pop())

        return self.outc(decoded)

# One input channel, four output classes.
model = UNet(n_channels=1, n_classes=4)

In [5]:
learning_rate = 0.001
Loss = nn.CrossEntropyLoss()
# SGD with Nesterov momentum; Adam with the same learning rate is the alternative:
# optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=learning_rate,
    momentum=0.175,
    nesterov=True,
)


In [6]:
from torch.utils.data import DataLoader
import time

# --- Training configuration ---
data_path = './data/train'
path = './data/unet_sgd_0.175N.pt'  # checkpoint file, overwritten after every epoch
num_workers = 0
batch_size = 4
num_epochs = 20

train_set = TrainDataset(data_path)
training_data_loader = DataLoader(dataset=train_set, num_workers=num_workers, batch_size=batch_size, shuffle=True)

start = time.time()
for epoch in range(num_epochs):
    print(epoch)
    model.train()
    for img, mask, _ in training_data_loader:
        # Insert a channel axis at dim 1 for the convolutional model
        # (assumes the PNGs are single-channel, so a batch is (N, H, W) -- TODO confirm).
        img1 = img.unsqueeze(1)

        optimizer.zero_grad()

        # Forward pass. Call the module itself rather than .forward() so that
        # nn.Module.__call__ runs any registered hooks.
        y_predict = model(img1)

        # The dataset returns masks as float; CrossEntropyLoss with
        # class-index targets needs integer (long) labels.
        mask = mask.long()
        loss = Loss(y_predict, mask)

        # Backward pass and parameter update.
        loss.backward()
        optimizer.step()

    # Save after each epoch so an interrupted run keeps its latest weights.
    torch.save(model.state_dict(), path)
finish = time.time()
train_time = finish - start
print(train_time)

0


  Variable._execution_engine.run_backward(


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1516.0692520141602


In [7]:
import numpy as np
from torch.autograd import Variable
from torch.utils.data import DataLoader

from torchvision.utils import save_image

path = './data/unet_sgd_0.175N.pt'

num_workers = 0
batch_size = 2

val_path = './data/val/'
val_set = TrainDataset(val_path)
val_data_loader = DataLoader(dataset=val_set, num_workers=num_workers,batch_size=batch_size, shuffle=False)

# cv2.imwrite returns False instead of raising when it cannot write, e.g.
# when the target directory is missing, so create the directory up front.
val_output_dir = './data/val/mask2'
os.makedirs(val_output_dir, exist_ok=True)

# Predictions are numbered consecutively starting at 101.
# NOTE(review): this assumes the sorted validation images are cmr101, cmr102, ...
# -- confirm, or derive the output name from `name` instead.
i= 101

model.load_state_dict(torch.load(path))
model.eval()
# Inference only: no_grad avoids building the autograd graph.
with torch.no_grad():
    for img, mask, name in val_data_loader:
        img2 = img.unsqueeze(1)

        x = model(img2)

        # Most likely class per pixel. Cast to uint8 explicitly so the PNG
        # holds the raw label values in an 8-bit image.
        predictions = torch.argmax(x, dim=1).to(torch.uint8).cpu().numpy()

        # Iterate over however many samples the batch holds; the last batch
        # may be smaller than batch_size.
        for prediction in predictions:
            out_file = '{}/cmr{}_mask.png'.format(val_output_dir, i)
            if not cv2.imwrite(out_file, prediction):
                raise IOError('Could not write mask image: {}'.format(out_file))
            i += 1

In [116]:
# In this block you are expected to write code to load saved model and deploy it to all data in test set to 
# produce segmentation masks in png images valued 0,1,2,3, which will be used for the submission to Kaggle.
from torchvision.utils import save_image

# Load the checkpoint written by the training cell. The previous value,
# './data/unet_sgd_0.175.pt', is not produced anywhere in this notebook, so a
# fresh Restart & Run All could not find it.
path = './data/unet_sgd_0.175N.pt'

test_path = './data/test'
num_workers = 0
batch_size = 2

test_set = TestDataset(test_path)
test_data_loader = DataLoader(dataset=test_set, num_workers=num_workers, batch_size = batch_size, shuffle=False)

# cv2.imwrite returns False instead of raising when it cannot write, e.g.
# when the target directory is missing, so create the directory up front.
test_output_dir = './data/test/mask'
os.makedirs(test_output_dir, exist_ok=True)

# Predictions are numbered consecutively starting at 121.
# NOTE(review): this assumes the sorted test images are cmr121, cmr122, ...
# -- confirm, or derive the output name from `name` instead.
i= 121

model.load_state_dict(torch.load(path))
model.eval()
# Inference only: no_grad avoids building the autograd graph.
with torch.no_grad():
    for sample, name in test_data_loader:

        img = sample.unsqueeze(1)

        x = model(img)

        # Most likely class per pixel, stored as 8-bit label values 0..3.
        predictions = torch.argmax(x, dim=1).to(torch.uint8).cpu().numpy()

        # Iterate over however many samples the batch holds; the last batch
        # may be smaller than batch_size.
        for prediction in predictions:
            out_file = '{}/cmr{}_mask.png'.format(test_output_dir, i)
            if not cv2.imwrite(out_file, prediction):
                raise IOError('Could not write mask image: {}'.format(out_file))
            i += 1

In [8]:
def categorical_dice(mask1, mask2, label_class=1):
    """
    Dice score of a specified class between two volumes of label masks.
    (classes are encoded by label class number, not one-hot)
    Note: stacks of 2D slices are considered volumes.

    Args:
        mask1: N label masks, array-like shaped (H, W, N)
        mask2: N label masks, array-like shaped (H, W, N)
        label_class: the class over which to calculate dice scores

    Returns:
        volume_dice: 2 * |A & B| / (|A| + |B|) as a float, where A and B are
        the voxels equal to ``label_class`` in each mask. When the class is
        absent from both masks the score is defined as 1.0 (the masks agree)
        instead of the ``nan`` that 0/0 would produce.
    """
    mask1_pos = np.asarray(mask1) == label_class
    mask2_pos = np.asarray(mask2) == label_class

    denominator = np.count_nonzero(mask1_pos) + np.count_nonzero(mask2_pos)
    if denominator == 0:
        # Class missing from both masks: avoid 0/0.
        return 1.0

    intersection = np.count_nonzero(mask1_pos & mask2_pos)
    return 2.0 * intersection / denominator

In [9]:

data_dir = './data/val'

# Ground-truth masks live in 'mask', predicted masks in 'mask2'.
# Evaluate the first five validation samples (cmr101 .. cmr105).
true_mask = []
predict_mask = []
for sample_id in range(101, 106):
    file_name = 'cmr{}_mask.png'.format(sample_id)
    true_mask.append(cv2.imread(os.path.join(data_dir, 'mask', file_name), cv2.IMREAD_UNCHANGED))
    predict_mask.append(cv2.imread(os.path.join(data_dir, 'mask2', file_name), cv2.IMREAD_UNCHANGED))

# NOTE(review): the score is averaged over classes 0, 1 and 2 only; class 3 is
# not included even though the model predicts four classes -- confirm intent.
eval_classes = (0, 1, 2)

sum5 = 0
for truth, prediction in zip(true_mask, predict_mask):
    # Mean Dice over the evaluated classes for this sample.
    summary = 0
    for eval_class in eval_classes:
        summary += categorical_dice(truth, prediction, eval_class)
    average = summary/3
    sum5 += average

# Mean of the per-sample averages.
print(sum5/5)

0.7738146544147702


UPDATED USING 20 VALIDATION SAMPLES INSTEAD OF 5

In [18]:
data_dir = './data/val'

# Ground-truth masks live in 'mask', predicted masks in 'mask2';
# samples cmr101 .. cmr120 are evaluated.
sample_ids = range(101, 121)
true_mask = [
    cv2.imread(os.path.join(data_dir,'mask','cmr{}_mask.png'.format(sample_id)), cv2.IMREAD_UNCHANGED)
    for sample_id in sample_ids
]
predict_mask = [
    cv2.imread(os.path.join(data_dir,'mask2','cmr{}_mask.png'.format(sample_id)), cv2.IMREAD_UNCHANGED)
    for sample_id in sample_ids
]

# Per sample: mean Dice over classes 0, 1 and 2; then the mean over all samples.
# NOTE(review): class 3 is not part of the average -- confirm this is intended.
sum20 = 0
for truth, prediction in zip(true_mask, predict_mask):
    class_total = 0
    for eval_class in (0, 1, 2):
        class_total += categorical_dice(truth, prediction, eval_class)
    sum20 += class_total/3

print(sum20/20)


0.7295323859322067


In [27]:
import numpy as np
import os
import cv2

def rle_encoding(x):
    '''
    *** Credit to https://www.kaggle.com/rakhlin/fast-run-length-encoding-python ***
    Run-length encode a binary mask, scanning it in column-major order.

    x: numpy array of shape (height, width), 1 - mask, 0 - background
    Returns a flat list [start, length, start, length, ...] where each start
    is the 1-based position of a run of mask pixels in the transposed,
    flattened array.
    '''
    mask_positions = np.where(x.T.flatten() == 1)[0]

    runs = []  # one [start, length] pair per run
    previous = None
    for position in mask_positions:
        if previous is not None and position == previous + 1:
            # Pixel directly follows the previous one: extend the current run.
            runs[-1][1] += 1
        else:
            # Gap (or very first pixel): open a new run at a 1-based start.
            runs.append([position + 1, 1])
        previous = position

    return [value for run in runs for value in run]


def submission_converter(mask_directory, path_to_save):
    """Write ``submission.csv`` with run-length encodings of every PNG mask.

    For each ``.png`` file in ``mask_directory`` three rows are written, one
    per class 1, 2 and 3. The row id is the file name without its extension
    followed by the class number; the encoding is the space-separated output
    of ``rle_encoding`` for the pixels equal to that class.

    Args:
        mask_directory: directory containing the mask PNG files.
        path_to_save: existing directory in which ``submission.csv`` is created.

    Raises:
        ValueError: if a PNG file in ``mask_directory`` cannot be read.
    """
    # Sort for a reproducible row order (os.listdir order is arbitrary) and
    # ignore anything that is not a PNG file.
    files = sorted(
        entry for entry in os.listdir(mask_directory)
        if entry.lower().endswith('.png')
    )

    # The context manager closes the file even if encoding a mask fails.
    with open(os.path.join(path_to_save, "submission.csv"), 'w') as writer:
        writer.write('id,encoding\n')

        for file in files:
            name = file[:-4]
            mask_path = os.path.join(mask_directory, file)
            mask = cv2.imread(mask_path, cv2.IMREAD_UNCHANGED)
            # cv2.imread signals failure by returning None instead of raising.
            if mask is None:
                raise ValueError('Could not read mask image: {}'.format(mask_path))

            for label in (1, 2, 3):
                encoded_mask = ' '.join(str(e) for e in rle_encoding(mask == label))
                writer.write(name + str(label) + ',' + encoded_mask + "\n")

In [119]:
# Use the same relative './data/test/...' location the test-inference cell
# writes its masks to, instead of an absolute path tied to one user's machine.
# NOTE(review): assumes the notebook is run from the project directory that
# contains 'data' -- confirm.
test_mask_dir = './data/test/mask'
submission_dir = './data/test/submission_directory'
# submission_converter opens the CSV inside this directory, so it must exist.
os.makedirs(submission_dir, exist_ok=True)
submission_converter(test_mask_dir, submission_dir)
