In [1]:
import numpy as np
from pathlib import Path
from glob import glob
import matplotlib.pyplot as plt

import torch
from torch.utils.data import DataLoader, Dataset 
import os
from torchvision.transforms import *
import torch
import torchvision
import torchvision.transforms as T
from PIL import Image

from torchvision.transforms import ToTensor
import logging
import torch.optim as optim
from tqdm import tqdm
from torchmetrics.functional import dice as pt_dice_score
import random
from PIL.ImageFilter import GaussianBlur
import math
from numpy import load
import segmentation_models_pytorch as smp

import segmentation_models_pytorch.utils.losses as smpLoss

In [2]:
# Folder containing the text files that list special image subsets
# (get_gan_images reads "cycle_gan_files.txt" from here).
# NOTE(review): machine-specific absolute path - adjust before running elsewhere.
txt_paths = r"C:/Users/tala1/Skrivebord/deeplearning/deeplearning-final-project/txts/"

def get_data_paths(files, data_target):
    """Select the entries of ``files`` that match an entry of ``data_target``.

    Two names match when they are equal after dropping their last four
    characters (the code assumes a four-character suffix such as ".npy").
    A file is emitted once per matching target, in ``files`` order.

    Args:
        files: iterable of file names to filter.
        data_target: iterable of reference file names.

    Returns:
        list of the matching entries of ``files``.
    """
    return [
        candidate
        for candidate in files
        for wanted in data_target
        if candidate[:-4] == wanted[:-4]
    ]


def get_data_without_folder(files, folder):
    """Return the entries of ``files`` that do not occur in ``folder``.

    Args:
        files: iterable of file names.
        folder: container of names to exclude (membership tested with ``in``).

    Returns:
        list of the remaining names, in their original order.
    """
    return [name for name in files if name not in folder]


def get_augmented(data_no_test, test_data):
    """Find the augmented variants of the images listed in ``test_data``.

    A candidate ``f`` counts as an augmentation of ``t`` when ``t`` without its
    last four characters, followed by "-aug", equals ``f`` without its last
    six characters.

    Args:
        data_no_test: iterable of candidate file names.
        test_data: iterable of original file names.

    Returns:
        list of matching candidates (one entry per matching original).
    """
    matches = []
    for candidate in data_no_test:
        stem = candidate[:-6]
        matches.extend(
            candidate for original in test_data if original[:-4] + "-aug" == stem
        )
    return matches


def get_specific_data(dataset, data_type):
    """Return the names in ``dataset`` that contain the substring ``data_type``.

    Args:
        dataset: iterable of file names.
        data_type: substring to look for (e.g. 'OPEL', 'DOOR', '-aug').

    Returns:
        list of matching names, in their original order.
    """
    return [name for name in dataset if data_type in name]


def get_gan_images(dataset):
    """Return the entries of ``dataset`` that are listed as CycleGAN images.

    The reference names are read, one per line, from "cycle_gan_files.txt" in
    the ``txt_paths`` folder and matched with ``get_data_paths``.

    Args:
        dataset: iterable of file names to filter.

    Returns:
        list of the entries of ``dataset`` that match a listed GAN file.

    Raises:
        OSError: if the list file cannot be opened.
    """
    # The context manager closes the handle; the original left it open.
    with open(txt_paths + "cycle_gan_files.txt", "r") as gan_file:
        gan_data_jpg = gan_file.read().split("\n")
    return get_data_paths(dataset, gan_data_jpg)


class CarDataset(Dataset):
    """Segmentation samples stored as one ``.npy`` array per image.

    For each file, planes ``0:3`` of the array are used as the image and the
    last plane as the integer class mask, which is one-hot encoded into 9
    channels and returned channel-first.

    Args:
        imgs_dir: folder containing the ``.npy`` files.
        seed: seed for the shuffle and for every sub-sampling step.
        num_opel: how many files containing 'OPEL' to use (-1 = all).
        num_door: how many files containing 'DOOR' to use (-1 = all).
        num_deloitte_aug: how many files containing '-aug' to use (-1 = all).
        num_gan: how many CycleGAN files (per ``get_gan_images``) to use (-1 = all).
        num_primary_multiple: how many times the remaining ("primary") files
            are repeated in the index.
        augmentation: optional callable applied to both image and mask under
            the same random seed.
        test: when true, every file in ``imgs_dir`` is used as-is.
        predictor: optional model handed to ``bg_manager`` for background
            removal; background removal is skipped when it is None.
        bg_manager: object providing ``get_image``, ``add_image`` and
            ``get_img_no_bg``; required when ``predictor`` is given.
        grayscale: when true, the image is converted with ``T.Grayscale()``.
    """

    def __init__(self, imgs_dir, seed=42, num_opel=-1, num_door=-1,
                 num_deloitte_aug=-1, num_gan=-1, num_primary_multiple=1, augmentation=None,
                 test=False, predictor=None, bg_manager=None, grayscale=False):
        self.imgs_dir = imgs_dir
        self.augmentation = augmentation
        self.predictor = predictor
        self.bg_manager = bg_manager
        self.grayscale = grayscale

        # os.listdir returns entries in arbitrary order; sorting first makes
        # the seeded shuffle (and every sample drawn below) reproducible.
        raw_ids = sorted(os.listdir(imgs_dir))
        random.seed(seed)
        random.shuffle(raw_ids)

        if not test:
            opel = get_specific_data(raw_ids, 'OPEL')
            door = get_specific_data(raw_ids, 'DOOR')
            aug = get_specific_data(raw_ids, '-aug')
            gan = get_gan_images(raw_ids)

            ids = []
            for pool, count in ((opel, num_opel), (door, num_door),
                                (aug, num_deloitte_aug), (gan, num_gan)):
                ids = ids + self._pick(pool, count, seed)

            # Everything that belongs to none of the special categories is "primary".
            special = set(opel).union(door, aug, gan)
            primary_images = [f for f in raw_ids if f not in special]
            self.ids = ids + primary_images * num_primary_multiple
        else:
            self.ids = raw_ids

    @staticmethod
    def _pick(pool, count, seed):
        """Return all of ``pool`` when ``count`` is -1, else a seeded random sample of ``count`` items."""
        if count == -1:
            return pool
        assert len(pool) >= count, \
            "requested {} samples but only {} are available".format(count, len(pool))
        random.seed(seed)
        return random.sample(pool, count)

    @staticmethod
    def _is_empty(value):
        """True when ``value`` is the 'empty' sentinel string used by the background manager.

        A plain ``value == 'empty'`` is unsafe here because ``value`` may be a
        numpy array, for which the comparison is element-wise.
        """
        return isinstance(value, str) and value == 'empty'

    def __len__(self):
        return len(self.ids)

    def __getitem__(self, i):
        """Return ``(img, mask)`` tensors for the ``i``-th indexed file."""
        idx = self.ids[i]

        # Join instead of glob: file names are literal, not patterns, and
        # imgs_dir no longer needs a trailing separator.
        path = os.path.join(self.imgs_dir, idx)
        data = np.load(path)

        img = data[0:3]
        # We try to remove the background
        if self.predictor is not None:
            img_no_bg = self.bg_manager.get_image(path)
            if self._is_empty(img_no_bg):
                # Not cached (or a previous attempt failed): run the predictor.
                # It takes H,W,C, hence the dstack of the C,H,W planes.
                img_no_bg = self.bg_manager.get_img_no_bg(self.predictor, np.dstack(img))
                # Stores either the result or the 'empty' sentinel.
                self.bg_manager.add_image(path, img_no_bg)
            if not self._is_empty(img_no_bg):
                img = torch.from_numpy(img_no_bg).type(torch.FloatTensor)  # numpy -> torch
                # The predictor takes H,W,C so we make it C,H,W again
                img = img.permute(2, 0, 1)
            else:
                # Background could not be removed: fall back to the raw image.
                img = torch.from_numpy(img).type(torch.FloatTensor)  # numpy -> torch
        else:
            img = torch.from_numpy(img).type(torch.FloatTensor)  # numpy -> torch

        mask = torch.from_numpy(data[-1]).type(torch.FloatTensor)  # numpy -> torch
        mask = torch.nn.functional.one_hot(
            mask.to(torch.int64), 9)  # We one-hot-encode the mask
        mask = mask.permute(2, 0, 1)  # (H,W,9) -> (9,H,W)

        if self.augmentation is not None:
            # Re-seed before each call so image and mask receive the same
            # random transform parameters.
            seed = np.random.randint(2147483647)
            np.random.seed(seed)
            torch.manual_seed(seed)
            img = self.augmentation(img)
            np.random.seed(seed)
            torch.manual_seed(seed)
            mask = self.augmentation(mask)

        if self.grayscale:
            img = T.Grayscale()(img)

        return img, mask

## Filter data 

In [3]:


save_path = r"C:/Users/tala1/Skrivebord/deeplearning/deeplearning-final-project/filtered_data/"
data_path = r"C:/Users/tala1/Downloads/carseg_data/carseg_data/clean_data_test/"
files = os.listdir(data_path)

# One (mean, std) pair per colour plane, used by the rescaling below.
# NOTE(review): the formula is not the plain inverse (x * std + mean) of a
# mean/std normalisation - confirm it is what was intended.
channel_stats = ((0.485, 0.229), (0.456, 0.224), (0.406, 0.225))

for file in files:
    img_array = np.load(data_path + file)
    # View on the first three planes: the assignments below write into img_array.
    rgb_dims = img_array[0:3]
    for plane, (mean, std) in enumerate(channel_stats):
        rgb_dims[plane, :, :] = (rgb_dims[plane, :, :] + mean / std - mean) / std
    # Plane 3 is kept as the mask; any further planes are dropped.
    mask = img_array[3]
    combined_data = np.concatenate((rgb_dims, [mask]), axis=0)
    np.save(save_path + file, combined_data)

## Split data into test/validation/train sets

In [4]:
def get_data_paths(files, data_target):
    """Select the entries of ``files`` that match an entry of ``data_target``.

    Names match when they are equal after dropping their last four characters.
    A file is emitted once per matching target, in ``files`` order.

    NOTE(review): this cell redefines a helper that an earlier cell already
    defines with the same body.

    Args:
        files: iterable of file names to filter.
        data_target: iterable of reference file names.

    Returns:
        list of the matching entries of ``files``.
    """
    return [
        candidate
        for candidate in files
        for wanted in data_target
        if candidate[:-4] == wanted[:-4]
    ]


def get_data_without_folder(files, folder):
    """Return the entries of ``files`` that do not occur in ``folder``.

    NOTE(review): this cell redefines a helper that an earlier cell already
    defines with the same body.

    Args:
        files: iterable of file names.
        folder: container of names to exclude (membership tested with ``in``).

    Returns:
        list of the remaining names, in their original order.
    """
    return [name for name in files if name not in folder]


def get_augmented(data_no_test, test_data):
    """Find the augmented variants of the images listed in ``test_data``.

    A candidate ``f`` matches ``t`` when ``t[:-4] + "-aug"`` equals ``f[:-6]``.

    NOTE(review): this cell redefines a helper that an earlier cell already
    defines with the same body.

    Args:
        data_no_test: iterable of candidate file names.
        test_data: iterable of original file names.

    Returns:
        list of matching candidates (one entry per matching original).
    """
    matches = []
    for candidate in data_no_test:
        stem = candidate[:-6]
        matches.extend(
            candidate for original in test_data if original[:-4] + "-aug" == stem
        )
    return matches


def get_specific_data(dataset, data_type):
    """Return the names in ``dataset`` that contain the substring ``data_type``.

    NOTE(review): this cell redefines a helper that an earlier cell already
    defines with the same body.

    Args:
        dataset: iterable of file names.
        data_type: substring to look for.

    Returns:
        list of matching names, in their original order.
    """
    return [name for name in dataset if data_type in name]


def save_files(data_path, save_path, arr):
    """Copy the listed ``.npy`` files from ``data_path`` to ``save_path``.

    Each file is loaded with ``np.load`` and written again with ``np.save``
    under the same name.

    Args:
        data_path: source folder.
        save_path: destination folder; created if it does not exist yet.
        arr: iterable of file names (relative to ``data_path``).
    """
    # np.save does not create missing folders, so make sure the target exists.
    if save_path:
        os.makedirs(save_path, exist_ok=True)
    for name in arr:
        # os.path.join works with or without a trailing separator on the folders.
        data = np.load(os.path.join(data_path, name))
        np.save(os.path.join(save_path, name), data)


# Input/output locations for the split.
# NOTE(review): machine-specific absolute paths - adjust before running elsewhere.
txt_paths = r"C:/Users/tala1/Skrivebord/deeplearning/deeplearning-final-project/txts/"
data_path = r"C:/Users/tala1/Skrivebord/deeplearning/deeplearning-final-project/filtered_data/"
save_path =r"C:/Users/tala1/Skrivebord/deeplearning/deeplearning-final-project/splitted_data/"
files = os.listdir(data_path)

# First we get the test images listed in test_data.txt and remove the
# augmented versions of them from the remaining data.
# (`with` closes the file handle; the original left it open.)
with open(txt_paths+"test_data.txt", "r") as test_file:
    test_data_jpg = test_file.read().split("\n")

test = get_data_paths(files, test_data_jpg)
data_no_test = get_data_without_folder(files, test)
test_augmented = get_augmented(data_no_test, test)
training_data = get_data_without_folder(data_no_test, test_augmented)

# We then separate out all the gan images.
# Note that `gan` is matched against all files, not only the training data.
with open(txt_paths+"cycle_gan_files.txt", "r") as gan_file:
    gan_data_jpg = gan_file.read().split("\n")
gan = get_data_paths(files, gan_data_jpg)
training_no_gan = get_data_without_folder(training_data, gan)

# We separate out all the opel images
opel = get_specific_data(training_no_gan, 'OPEL')
training_no_opel = get_data_without_folder(training_no_gan, opel)

# We separate out all the door images
door = get_specific_data(training_no_opel, 'DOOR')
training_no_door = get_data_without_folder(training_no_opel, door)

# We separate out all the augmented images; what is left is the primary data
aug = get_specific_data(training_no_door, "-aug")
training_primary = get_data_without_folder(training_no_door, aug)

##################################################################################
# Now we split each of the different data kinds up in 80/20 train and validation #
seed_int = 32  # set seed to get same splits everytime

random.seed(seed_int)
train_primary = random.sample(
    training_primary, math.ceil(len(training_primary)*0.80))
val_primary = [i for i in training_primary if i not in train_primary]
print("train prim len:", len(train_primary),
      "- val prim len:", len(val_primary))

# Augmented images follow their primary image, so that an image and its
# augmentations never end up on different sides of the split.
train_aug = get_augmented(aug, train_primary)
val_aug = get_augmented(aug, val_primary)
print("train aug len:", len(train_aug), "- val aug len:", len(val_aug))

random.seed(seed_int)
train_opel = random.sample(opel, math.ceil(len(opel)*0.80))
val_opel = [i for i in opel if i not in train_opel]

random.seed(seed_int)
train_door = random.sample(door, math.ceil(len(door)*0.80))
val_door = [i for i in door if i not in train_door]

random.seed(seed_int)
train_gan = random.sample(gan, math.ceil(len(gan)*0.80))
val_gan = [i for i in gan if i not in train_gan]

# Each kind of data is now split 80/20 on its own
validation = val_gan+val_door+val_opel+val_aug+val_primary
train = train_gan+train_door+train_opel+train_aug+train_primary

# Sanity checks to ensure that the 3 sets are completely disjoint
for i in train:
    assert(i not in validation)
    assert(i not in test)

for i in validation:
    assert(i not in test)

# Save images
save_files(data_path, save_path+"test/", test)
save_files(data_path, save_path+"validation/", validation)
save_files(data_path, save_path+"train/", train)

train prim len: 8 - val prim len: 1
train aug len: 0 - val aug len: 0


## Background manager

In [5]:
class BgManager():
    """Caches background-free images and produces them with a predictor.

    The cache maps an image name to its background-free array, or to the
    string 'empty' when no instance mask was found for that image.
    """

    def __init__(self):
        # Per-instance cache; a class-level dict would be shared by every manager.
        self.detectron_map = {}

    def add_image(self, image_name, nobg_image):
        """Store ``nobg_image`` (an array or 'empty') under ``image_name``."""
        self.detectron_map[image_name] = nobg_image

    def get_image(self, image_name):
        """Return the cached entry for ``image_name``, or 'empty' when absent."""
        return self.detectron_map.get(image_name, 'empty')

    def get_img_no_bg(self, predictor, image):
        """Blank out everything outside the first predicted instance mask.

        Args:
            predictor: callable returning a dict whose ``"instances"`` entry
                has a ``pred_masks`` sequence of boolean H,W masks.
            image: H,W,C numpy array; it is modified in place.

        Returns:
            ``image`` with every pixel outside the first mask set to 2, or the
            string 'empty' when the predictor found no instance.
        """
        pred = predictor(image)
        masks = pred["instances"].pred_masks
        if (len(masks) == 0):
            return 'empty'

        first_mask = masks[0]
        if hasattr(first_mask, "detach"):
            # Torch tensor (possibly on the GPU): bring it to numpy.
            first_mask = first_mask.detach().cpu().numpy()
        keep = np.asarray(first_mask, dtype=bool)

        # Only the area covered by the mask is touched, as in a per-pixel loop
        # over the mask's rows and columns.
        rows, cols = keep.shape
        region = image[:rows, :cols]  # view: writes go through to `image`
        region[~keep] = 2
        # The original ended with a bare `return`, handing None to the caller.
        return image

In [6]:
# Folders holding the validation / train / test splits, one .npy file per sample.
# The trailing "/" matters: CarDataset builds file paths as imgs_dir + file name.
# NOTE(review): machine-specific absolute paths - adjust before running elsewhere.
validation_path =  r"C:/Users/tala1/Skrivebord/deeplearning/deeplearning-final-project/splitted_data/validation/"
train_path = r"C:/Users/tala1/Skrivebord/deeplearning/deeplearning-final-project/splitted_data/train/"
test_path =r"C:/Users/tala1/Skrivebord/deeplearning/deeplearning-final-project/splitted_data/test/"

In [7]:
# Random augmentation pipeline used for training.
# Every transform is referenced through the explicit `T` alias
# (torchvision.transforms); the bare names only resolve through
# `from torchvision.transforms import *`.
# NOTE(review): CarDataset applies this same pipeline to the one-hot mask
# under the same seed, so RandomErasing presumably blanks the mask in the
# erased region as well - confirm this is intended.
transform = T.Compose([
    T.RandomHorizontalFlip(p=0.5),
    T.RandomPerspective(distortion_scale=0.3, p=0.4),
    T.RandomApply(transforms=[
        T.RandomResizedCrop(size=(256, 256), scale=(0.40, 1.0)),
    ], p=0.4),
    # Gaussian blur is disabled. To re-enable it use T.GaussianBlur: the bare
    # name `GaussianBlur` is rebound to PIL.ImageFilter.GaussianBlur by a
    # later import in the first cell.
    #   T.RandomApply(transforms=[
    #       T.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 5)),
    #   ], p=0.2),
    T.RandomErasing(p=0.2),
    T.RandomRotation(degrees=(-10, 10)),
])

In [8]:
# Trained with this data:
#
# For reference, the CarDataset constructor signature is:
#   def __init__(self, imgs_dir, seed=42, num_opel=-1, num_door=-1,
#                num_deloitte_aug=-1, num_gan=-1, num_primary_multiple=1, augmentation=None,
#                test=False, predictor=None, bg_manager=None, grayscale=False):

# Training set: every image category (all num_* left at -1), with random augmentation.
train_dataset = CarDataset(train_path, augmentation=transform)
# Validation set: only the primary images (every other category sampled with count 0), no augmentation.
validation_dataset = CarDataset(validation_path, num_gan=0, num_deloitte_aug=0, num_opel=0, num_door=0, num_primary_multiple=1)

# drop_last=True discards a final incomplete training batch.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=0, drop_last=True)
valid_loader = DataLoader(validation_dataset, batch_size=1, shuffle=False, num_workers=0)

# We define the model: 
model = smp.Unet(
    encoder_name='timm-resnest200e', # We use the ResNeSt 200 backbone
    encoder_weights='imagenet', # The backbone is trained on imagenet
    classes=9, # We have 9 classes
    activation='softmax2d', # The last activation is a softmax
    in_channels=3
)


In [None]:
def save_logs(train_log, valid_log):
    """Save the per-epoch training and validation losses under ``./models``.

    Args:
        train_log: sequence of training losses, written to ``train_log.npy``.
        valid_log: sequence of validation losses, written to ``valid_log.npy``.
    """
    # np.save does not create folders; without this a missing ./models
    # directory raises only after the whole training run has finished.
    os.makedirs("./models", exist_ok=True)
    np.save("./models/train_log.npy", train_log)
    np.save("./models/valid_log.npy", valid_log)
    
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

criterion = smpLoss.DiceLoss() # The SMP library also contains various loss functions


optimizer = torch.optim.Adam([ 
    dict(params=model.parameters(), lr=0.0001),
])

model.to(DEVICE)

# Best (lowest) validation loss seen so far; a checkpoint is written only
# when an epoch scores below it.
min_score = 1 

train_logs = []
valid_logs = []

EPOCHS = 2
for i in range(0, EPOCHS):
    print('\nEpoch: {}'.format(i))

    # ---- training pass ----
    train_log = []
    model.train()
    for image, mask in train_loader:
        image = image.to(DEVICE)
        mask = mask.to(DEVICE)
        
        optimizer.zero_grad()

        pred = model(image)

        loss = criterion(pred, mask)
        loss.backward()

        optimizer.step()

        train_log.append(loss.item())
    
    train_mean = np.mean(train_log)
    print("Training loss: ",train_mean)
    train_logs.append(train_mean)

    # ---- validation pass ----
    valid_log = []
    model.eval()
    # No gradients are needed for validation; without no_grad every forward
    # pass builds an autograd graph and wastes memory.
    with torch.no_grad():
        for image, mask in valid_loader:
            image = image.to(DEVICE)
            mask = mask.to(DEVICE)   

            pred = model(image)

            loss = criterion(pred,mask)
            valid_log.append(loss.item())

    valid_mean = np.mean(valid_log)
    print("Validation loss: ",valid_mean)
    valid_logs.append(valid_mean)

    if (min_score > valid_mean):
        min_score = valid_mean
        torch.save(model.state_dict(), 'best_model_dict.pth')
        print("Model saved!")
    # Integer division: `i == EPOCHS/2` compares against a float and never
    # matches when EPOCHS is odd, so the learning rate was never lowered.
    if i == EPOCHS // 2:
        optimizer.param_groups[0]['lr'] = 1e-5
        print('---- Decreased Learning Rate to 1e-5! ----')

save_logs(train_logs, valid_logs)


Epoch: 0


In [None]:
# Reload the per-epoch losses written by save_logs and plot both curves.
# NOTE(review): save_logs writes to "./models/" relative to the working
# directory, while these are absolute paths - they only agree when the
# notebook runs from the project folder.
train_log = np.load(r'C:/Users/tala1/Skrivebord/deeplearning/deeplearning-final-project/models/train_log.npy')
valid_log = np.load(r'C:/Users/tala1/Skrivebord/deeplearning/deeplearning-final-project/models/valid_log.npy')

# Clear the current figure before drawing.
plt.clf()
plt.plot(train_log, label="Training")
plt.plot(valid_log, label="Validation")
plt.xlabel('Epochs')
plt.ylabel('Micro Dice Loss')
plt.legend()
plt.show()


In [None]:
from torchmetrics.functional import dice_score, accuracy

def calc_test_metrics(model, test_dataloader):
    """Compute the mean macro Dice score and macro accuracy over a dataloader.

    Only the first sample of every batch is evaluated (the loader is expected
    to use a batch size of 1).

    Args:
        model: segmentation model exposing ``predict`` and ``parameters``.
        test_dataloader: iterable of ``(img, mask)`` batches with one-hot masks.

    Returns:
        tuple ``(mean_dice, mean_accuracy)`` of floats.
    """
    # The model may live on the GPU while the loader yields CPU tensors;
    # predicting on the model's device avoids a device-mismatch error.
    device = next(model.parameters()).device

    dice_scores_macro = []
    accuracy_macro = []

    for img, mask in test_dataloader:
        pr_mask = model.predict(img.to(device)).cpu()  # Predict the mask according to the image
        pred = pr_mask[0]
        truth = mask[0]

        # We go from [9,H,W] -> [H,W] - i.e. one-hot encoding to integer encoding
        pred_label = torch.argmax(pred, dim=0)
        truth_label = torch.argmax(truth, dim=0)

        truth_flat = truth_label.view(-1)  # go from [H,W] -> [H*W]
        pred_flat = torch.flatten(pred, start_dim=1)  # go from [9,H,W] -> [9,H*W]
        pred_flat = pred_flat.permute(1, 0)  # go from [9,H*W] -> [H*W,9]

        # calculate dice score macro with only present channels:
        # classes without foreground are scored -1 and filtered out again.
        data_dicescore = dice_score(pred_flat, truth_flat, reduction='none', no_fg_score=-1)
        masked_dices = torch.masked_select(data_dicescore, data_dicescore.not_equal(-1))
        # Store plain floats so np.mean below does not operate on tensors.
        dice_scores_macro.append(float(masked_dices.mean()))

        # calculate accuracy
        acc = accuracy(pred_label, truth_label, average='macro', num_classes=9)
        accuracy_macro.append(float(acc))

    return np.mean(dice_scores_macro), np.mean(accuracy_macro)

In [None]:
# NOTE(review): models_base_path is not used by any cell visible in this notebook.
models_base_path = r'C:/Users/tala1/Skrivebord/deeplearning/deeplearning-final-project/models'
# test=True: use every file in the test folder, without category sampling.
test_dataset = CarDataset(test_path, test=True)
# Default batch size (1), fixed order.
test_dataloader = DataLoader(test_dataset, shuffle=False) 

In [None]:
# We use the model to measure performance on the test data: 
model.eval()
# Results are bound to test_* names: assigning to `accuracy` would overwrite
# the torchmetrics `accuracy` function that calc_test_metrics calls, so the
# cell could not be run a second time.
test_dice, test_accuracy = calc_test_metrics(model, test_dataloader)
print("Dice Score: ", test_dice)
print("Accuracy: ", test_accuracy)

## Plots

In [None]:
def visualize(car_img=None, mask=None, predicted=None):
    """Show the image, its true mask and the predicted mask side by side.

    Args:
        car_img: sequence of image planes; it is passed through ``np.dstack``
            before display.
        mask: 2-D array-like of true class labels.
        predicted: 2-D array-like of predicted class labels.
    """
    plt.figure(figsize=(16, 5))
    panels = (
        (np.dstack(car_img), "Actual image"),
        (mask, "True mask"),
        (predicted, "Model prediction"),
    )
    for position, (picture, caption) in enumerate(panels, start=1):
        plt.subplot(1, len(panels), position)
        plt.imshow(picture)
        plt.title(caption)
    plt.show()
    
def prep_and_viz(data, model):
    """Predict the mask for one dataset sample and plot it next to the truth.

    Args:
        data: ``(img, mask)`` pair as returned by the dataset; ``img`` is
            channel-first and ``mask`` is one-hot with the class axis first.
        model: segmentation model exposing ``predict`` and ``parameters``.
    """
    img, mask = data

    # One-hot [C,H,W] -> integer labels [H,W]
    mask = torch.argmax(mask, dim=0)

    # Predict on whatever device the model lives on, then come back to the CPU.
    device = next(model.parameters()).device
    pred = model.predict(img.unsqueeze(0).to(device))
    pred = torch.argmax(pred.squeeze(0).cpu(), dim=0)

    # visualize() stacks the planes itself (np.dstack), so it must receive the
    # channel-first image; a pre-permuted H,W,C image ends up with an invalid
    # shape for imshow.
    visualize(img.cpu().numpy(), mask, pred)

In [None]:
# Plot image, true mask and prediction for the test sample at index 4.
prep_and_viz(test_dataset[4], model)


In [None]:
# A dataset item is an (img, mask) tuple, which imshow cannot draw; show the
# image part, converted from C,H,W to H,W,C.
sample_img, _ = test_dataset[1]
plt.imshow(sample_img.permute(1, 2, 0).numpy())