In [None]:
import cv2
import matplotlib.pyplot as plt
import numpy as np
from glob import glob
import os
import torch
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor, Lambda
from torchvision import transforms
import math
from torchinfo import summary
from torch.nn.parameter import Parameter
from torch import nn
import torch.nn.functional as F
from PIL import Image
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")

In [None]:
# Category names. This list is later assigned to `categories` and passed to
# consolidate_data / split_train_test, which match each name against the
# directory part of every image path.
cats = ['Clean_ins',
        'DATI',
        'OffTrkWrite',
        'Scratch_radial',
        'Ser_deg_ins',
        'Ser_instability',
        'Spacing_MD_PW_SerDeg',
        'Wr_related_1sct']

In [None]:
def get_model_size(model):
    """Return the in-memory size of ``model`` in MiB.

    The size is the total number of bytes held by all parameters plus all
    registered buffers (element count times bytes per element), divided by
    1024**2.
    """
    def _total_bytes(tensors):
        # Bytes occupied by an iterable of tensors.
        return sum(t.nelement() * t.element_size() for t in tensors)

    all_bytes = _total_bytes(model.parameters()) + _total_bytes(model.buffers())
    return all_bytes / 1024**2

# Split the data into training (80%) and validation (20%) sets
*Each category is split with the same ratio into the **training** and **validation** folders.*

In [None]:
import shutil

def consolidate_data(datapaths:list,categories:list):
    """Group image paths by category across one or more glob patterns.

    Args:
        datapaths: glob patterns, processed in the given order.
        categories: category names. An image is filed under every category
            whose name occurs as a substring of the image's directory path.

    Returns:
        A tuple ``(img_dict, img_all, dupe_all)``:
        * ``img_dict`` maps each category to the list of accepted image paths.
        * ``img_all`` lists the base names of all accepted images (including
          images that matched no category).
        * ``dupe_all`` lists the full paths of images that were skipped
          because an image with the same base name was already accepted from
          an earlier pattern.
    """
    img_dict = {cat: [] for cat in categories}

    dupe_all = []
    img_all = []
    # Set mirror of img_all: keeps the duplicate check O(1) per image instead
    # of scanning the whole list every time.
    seen_names = set()
    for datapath in datapaths:
        img_name = []
        for imgpath in glob(datapath):
            name = os.path.basename(imgpath)
            if name in seen_names:
                dupe_all.append(imgpath)
                continue
            img_dir = os.path.dirname(imgpath)
            for cat in categories:
                if cat in img_dir:
                    img_dict[cat].append(imgpath)
            img_name.append(name)

        # Names only count as "seen" once the whole pattern has been processed,
        # so duplicates are detected across patterns, not within one pattern.
        img_all += img_name
        seen_names.update(img_name)

    return img_dict,img_all,dupe_all

def _copy_images(imgpaths, dest_dir):
    """Copy every path in ``imgpaths`` into ``dest_dir``, keeping its file name."""
    for imgpath in imgpaths:
        shutil.copy(imgpath, os.path.join(dest_dir, os.path.basename(imgpath)))


def split_train_test(datapaths:list,categories:list,train_ratio:float,dataset:str='both',
                     train_dir:str='./data/training',val_dir:str='./data/validation'):
    """Copy images into per-category training / validation folders.

    Args:
        datapaths: glob patterns handed to ``consolidate_data``.
        categories: category names; one sub-folder per category is created
            under both ``train_dir`` and ``val_dir``.
        train_ratio: fraction of each category that goes to training when
            ``dataset == 'both'`` (must be < 1). Must be exactly 1 when a
            single target folder is selected.
        dataset: ``'both'`` to split every category, ``'train'`` /
            ``'training'`` to copy everything to the training folder, or
            ``'validate'`` / ``'validation'`` to copy everything to the
            validation folder.
        train_dir: root folder for training images.
        val_dir: root folder for validation images.

    Returns:
        ``(img_all, dupe_all)`` exactly as returned by ``consolidate_data``.

    Raises:
        ValueError: if ``train_ratio`` and ``dataset`` do not form one of the
            combinations above. Raised before anything is written to disk.
    """
    # The original accepted 'training' in the outer test but compared against
    # 'train' inside, so 'training' ended up in the validation folder and
    # 'train' was rejected. Both spellings now select the training folder.
    train_only = dataset in ('train', 'training')
    val_only = dataset in ('validate', 'validation')
    split_both = dataset == 'both' and train_ratio < 1
    single_target = train_ratio == 1 and (train_only or val_only)
    if not (split_both or single_target):
        raise ValueError("train ratio and dataset not tally")

    for cat in categories:
        os.makedirs(os.path.join(train_dir,cat),exist_ok=True)
        os.makedirs(os.path.join(val_dir,cat),exist_ok=True)

    img_dict,img_all,dupe_all = consolidate_data(datapaths,categories)

    for cat in categories:
        cat_img = img_dict[cat]
        # Shuffle in place so the train/validation cut is random per category.
        np.random.shuffle(cat_img)
        if split_both:
            train_len = int(len(cat_img) * train_ratio)
            _copy_images(cat_img[:train_len], os.path.join(train_dir,cat))
            _copy_images(cat_img[train_len:], os.path.join(val_dir,cat))
        elif train_only:
            _copy_images(cat_img, os.path.join(train_dir,cat))
        else:
            _copy_images(cat_img, os.path.join(val_dir,cat))

    return img_all,dupe_all

In [None]:

# Glob patterns for the source images: <root>/<category folder>/<image>.png
datapaths = [
    '../train_8c_crop_4comp/*/*.png'
]

categories = cats

# This first call only builds the per-category grouping for inspection;
# split_train_test calls consolidate_data again internally before copying.
img_dict,img_all,dupe_all = consolidate_data(datapaths,categories)
# Copy 80% of each category to ./data/training/<category> and the remainder
# to ./data/validation/<category>.
img_all,dupe_all = split_train_test(datapaths,categories,0.8,dataset='both')

# Train Model

In [None]:
import sklearn

# Class names in label order: a name's position in this tuple is its integer label.
_CLASS_NAMES = (
    'Clean_ins',
    'DATI',
    'OffTrkWrite',
    'Scratch_radial',
    'Ser_deg_ins',
    'Ser_instability',
    'Spacing_MD_PW_SerDeg',
    'Wr_related_1sct',
)

# Forward lookup: class name -> integer label.
class_to_label_dict = {name: idx for idx, name in enumerate(_CLASS_NAMES)}

# Reverse lookup: integer label -> class name.
label_to_class_dict = {idx: name for name, idx in class_to_label_dict.items()}

## Define a CNN architecture with 4 convolutional layers + 4 fully connected layers

In [None]:
class BitflipModel(nn.Module):
    """Compact CNN classifier: four 3x3 conv blocks, then four linear layers.

    Each conv block is Conv2d -> BatchNorm2d -> ReLU (conv3 additionally ends
    with Dropout2d(p=0.2) and is the only block using 'valid' padding). A 2x2
    max-pool follows conv2, conv3 and conv4; the result is globally
    average-pooled to one value per channel and passed through the linear
    stack, giving 8 output values per sample.

    NOTE: no activation is applied between the four linear layers, so
    together they compose to a single affine map.

    Args:
        chnum_in: number of channels of the input images.
    """

    def __init__(self, chnum_in):
        super().__init__()
        self.chnum_in = chnum_in

        # Attribute names and creation order are kept so that state-dict keys
        # (conv1.0.weight, ..., linear4.bias) stay the same.
        self.conv1 = self._conv_block(self.chnum_in, 8, 'same')
        self.conv2 = self._conv_block(8, 32, 'same')
        self.conv3 = self._conv_block(32, 16, 'valid', dropout=0.2)
        self.conv4 = self._conv_block(16, 8, 'same')

        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(8,32)
        self.linear2 = nn.Linear(32,64)
        self.linear3 = nn.Linear(64,16)
        self.linear4 = nn.Linear(16,8)

    @staticmethod
    def _conv_block(in_ch, out_ch, padding, dropout=None):
        """Build a 3x3 Conv2d -> BatchNorm2d -> ReLU block, optionally ending in Dropout2d."""
        layers = [
            nn.Conv2d(in_ch, out_ch, (3,3), stride=1, padding=padding),
            nn.BatchNorm2d(num_features=out_ch),
            nn.ReLU(inplace=True),
        ]
        if dropout is not None:
            layers.append(nn.Dropout2d(p=dropout))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the 8 raw output scores for each image in the batch ``x``."""
        # Convolutional feature extractor with three down-sampling steps.
        feats = self.maxpool(self.conv2(self.conv1(x)))
        feats = self.maxpool(self.conv3(feats))
        feats = self.maxpool(self.conv4(feats))

        # Global average pool -> one value per channel -> flat vector.
        out = self.flatten(self.avgpool(feats))

        # Linear head, applied in order.
        for head in (self.linear, self.linear2, self.linear3, self.linear4):
            out = head(out)
        return out

In [91]:
# Instantiate the classifier for 3-channel input and move it to the selected device.
model = BitflipModel(3)
model.to(device)
# Layer-by-layer summary for a single 3-channel 113x224 input.
print(summary(model,input_size=(1,3,113,224),device=device))
# get_model_size returns bytes / 1024**2 (and includes buffers), so scaling by
# 1000**2 does not recover a byte count -- hence the "Wrong" label.
print(f"Wrong model size: {get_model_size(model)*1000**2}")
# 12048 equals the number of trainable parameters of BitflipModel(3);
# the factor 4 is presumably bytes per float32 parameter -- TODO confirm.
print(f"Actual model size: {12048*4}")
# NOTE(review): 0.8972 and 534.72 are hard-coded; presumably an accuracy and a
# constant from the competition's scoring formula -- confirm their source.
print(f"Actual competition score: {0.8972*100 + 534.72/(12048*4)}")

Layer (type:depth-idx)                   Output Shape              Param #
BitflipModel                             [1, 8]                    --
├─Sequential: 1-1                        [1, 8, 113, 224]          --
│    └─Conv2d: 2-1                       [1, 8, 113, 224]          224
│    └─BatchNorm2d: 2-2                  [1, 8, 113, 224]          16
│    └─ReLU: 2-3                         [1, 8, 113, 224]          --
├─Sequential: 1-2                        [1, 32, 113, 224]         --
│    └─Conv2d: 2-4                       [1, 32, 113, 224]         2,336
│    └─BatchNorm2d: 2-5                  [1, 32, 113, 224]         64
│    └─ReLU: 2-6                         [1, 32, 113, 224]         --
├─MaxPool2d: 1-3                         [1, 32, 56, 112]          --
├─Sequential: 1-4                        [1, 16, 54, 110]          --
│    └─Conv2d: 2-7                       [1, 16, 54, 110]          4,624
│    └─BatchNorm2d: 2-8                  [1, 16, 54, 110]          32
│    └─R

## Define dataset and dataloader

In [None]:
class CustomDataset(Dataset):
    """Image dataset whose labels are derived from the image paths.

    Args:
        image_path: sequence of image file paths.
        transform: callable applied to the RGB PIL image of each item.
    """

    def __init__(self, image_path, transform):
        self.image_path = image_path
        self.transform = transform

    def __len__(self):
        """Number of image paths in the dataset."""
        return len(self.image_path)

    @staticmethod
    def _label_from_path(path, label_map=None):
        """Return the integer label for ``path``.

        The name of the image's parent directory is looked up first, so a
        class name that merely appears in the file name cannot override the
        folder the image lives in. If the parent directory is not a known
        class, fall back to searching the whole path for a class name (the
        last matching class in ``label_map`` order wins).

        Args:
            path: image file path.
            label_map: mapping of class name to label; defaults to the
                module-level ``class_to_label_dict``.

        Raises:
            ValueError: if no class name can be found in ``path``.
        """
        if label_map is None:
            label_map = class_to_label_dict

        parent = os.path.basename(os.path.dirname(path))
        if parent in label_map:
            return label_map[parent]

        label = None  # 0 is a valid label, so None marks "nothing found"
        for name, idx in label_map.items():
            if name in path:
                label = idx
        if label is None:
            raise ValueError(f"no known class name found in path: {path}")
        return label

    def __getitem__(self, idx):
        """Return ``(transformed_image, label)`` for the image at ``idx``."""
        path = self.image_path[idx]
        # Context manager closes the file handle as soon as the pixels are converted.
        with Image.open(path) as raw:
            image = raw.convert('RGB')
        image = self.transform(image)

        return (image, self._label_from_path(path))

In [None]:
# Pre-processing pipelines, keyed by phase ('train' adds random augmentation,
# 'test' is deterministic). Both end with the same Resize -> CenterCrop -> ToTensor.
transform_dict = {
    'train': transforms.Compose(
        # RandomChoice picks exactly one of the three flip options per image:
        # both flips, vertical only, or horizontal only (each flip with p=0.5).
        [transforms.RandomChoice([
             transforms.Compose([
                 transforms.RandomVerticalFlip(p=0.5),
                 transforms.RandomHorizontalFlip(p=0.5)
             ]),
             transforms.RandomVerticalFlip(p=0.5),
             transforms.RandomHorizontalFlip(p=0.5)
         ]),
         # degrees=0 -> no rotation; translate=(0,0.1) -> no horizontal shift,
         # vertical shift of up to 10% of the image height.
         transforms.RandomAffine(degrees=0,translate=(0,0.1)),
         # Sizes are (height, width): resize to 113x224, then crop the width to 210.
         transforms.Resize((113,224)), #145,435, #128,336
         transforms.CenterCrop((113,210)),
         transforms.ToTensor()
         ]),
    'test': transforms.Compose(
        [transforms.Resize((113,224)),
         transforms.CenterCrop((113,210)),
         transforms.ToTensor()
         ])}

def load_data(data_folder, batch_size, phase='train', train_val_split=True, train_ratio=.8, num_workers=4):
    """Build DataLoader(s) over a list of image paths.

    Args:
        data_folder: sequence of image paths handed to ``CustomDataset``.
        batch_size: batch size of every returned loader.
        phase: key into ``transform_dict``. ``'train'`` yields shuffled
            training loader(s); any other key yields one unshuffled loader.
        train_val_split: only used when ``phase == 'train'``; if true the data
            is randomly split into a training and a validation part.
        train_ratio: fraction of the data used for training when splitting.
        num_workers: worker processes per DataLoader.

    Returns:
        ``[train_loader, val_loader]`` when ``phase == 'train'`` and
        ``train_val_split`` is true, otherwise a single DataLoader.
    """
    def _make_loader(dataset, shuffle):
        # Every loader shares the same settings apart from shuffling.
        return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle,
                                           drop_last=False, num_workers=num_workers)

    data = CustomDataset(data_folder,transform_dict[phase])

    if phase != 'train':
        return _make_loader(data, shuffle=False)
    if not train_val_split:
        return _make_loader(data, shuffle=True)

    train_size = int(train_ratio * len(data))
    val_size = len(data) - train_size
    data_train, data_val = torch.utils.data.random_split(data, [train_size, val_size])
    # random_split hands back views of `data`, which carries the augmenting
    # 'train' transform. Validation must be deterministic, so view the same
    # indices through a dataset that uses the 'test' transform instead.
    data_val = torch.utils.data.Subset(CustomDataset(data_folder,transform_dict['test']),
                                       data_val.indices)
    return [_make_loader(data_train, shuffle=True), _make_loader(data_val, shuffle=False)]

# Batch size used for both loaders.
num_batch = 8
# Image paths produced by the train/validation folder split: <root>/<category>/<image>.png
directory_train = glob('./data/training/*/*.png')
directory_test = glob('./data/validation/*/*.png') 
# train_val_split=False, so load_data returns a single loader per call and
# the train_ratio argument is not used.
loader_train = load_data(directory_train, batch_size=num_batch, phase='train', train_val_split=False, train_ratio=0.5)
loader_test = load_data(directory_test, batch_size=num_batch, phase='test', train_val_split=False, train_ratio=0.5)

## Calculate the class weights of the training dataset

In [None]:
# Imported explicitly so the lookup does not rely on `import sklearn` having
# loaded the sklearn.utils.class_weight submodule as a side effect.
from sklearn.utils.class_weight import compute_class_weight

# Integer label of every training image, in directory_train order.
yw = []
for d in directory_train:
    # Reset per image: otherwise a path that matches no class name would
    # silently reuse the label of the previous image.
    label = None
    for k in class_to_label_dict.keys():
        if k in d:
            label = class_to_label_dict[k]
    if label is None:
        raise ValueError(f"no known class name found in path: {d}")
    yw.append(label)

# The weight vector gets one entry per class that is present in yw. The loss
# needs one entry per model output, so fail early with a clear message if a
# class has no training images.
missing_labels = sorted(set(class_to_label_dict.values()) - set(yw))
if missing_labels:
    raise ValueError(f"no training images found for labels: {missing_labels}")

# 'balanced' weights are inversely proportional to the class frequencies.
class_weight = compute_class_weight(class_weight='balanced', classes=np.unique(yw),y=yw)
class_weight = torch.tensor(class_weight).float().to(device)
class_weight

## Define optimizer, loss function, epochs and learning rate

In [None]:
# Training hyper-parameters.
learning_rate = 0.001
epochs = 1

# Cross-entropy weighted per class with the balanced class weights computed above.
loss_fn = nn.CrossEntropyLoss(weight=class_weight)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# Multiplies the learning rate by 0.9 on every step() call.
# NOTE(review): no cell shown here ever calls scheduler.step() -- confirm whether it is still needed.
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
# Multiplies the learning rate by 0.9 once the monitored value ('min' mode)
# has not improved for more than 5 consecutive step() calls.
schedulerplat = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min',factor=0.9,patience=5)

In [None]:
def get_accuracy(y_true, y_prob):
    """Return the fraction of rows of ``y_prob`` whose arg-max along dim 1 equals ``y_true``.

    The result is a Python float.
    """
    predicted = torch.max(y_prob, dim=1).indices
    hits = predicted.eq(y_true)
    return hits.float().mean().item()

def train_loop(dataloader, model, loss_fn, optimizer, device=None):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        dataloader: yields ``(X, y)`` batches; ``y`` holds integer class labels.
        model: network to train; switched to training mode.
        loss_fn: callable ``loss_fn(pred, y)`` returning a scalar loss tensor.
        optimizer: optimizer that updates ``model``'s parameters.
        device: device the batches are moved to. Defaults to the device of the
            model's first parameter (CPU if the model has no parameters).

    Returns:
        The loss averaged over the batches of this pass, as a float.

    Prints the dataset size, a progress line every 50 batches, and a summary
    with the accuracy over all processed samples and the average loss.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    size = len(dataloader.dataset)
    print(size)
    loss_sum = 0.0
    correct_total = 0
    seen = 0
    n_batches = 0
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        X, y = X.to(device), y.long().to(device)
        pred = model(X)
        loss = loss_fn(pred, y)
        batch_loss = loss.item()
        batch_correct = (pred.argmax(dim=1) == y).sum().item()

        loss_sum += batch_loss
        correct_total += batch_correct
        seen += len(X)
        n_batches += 1

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 50 == 0:
            current = batch * len(X)
            print(f"loss: {batch_loss:>7f}  correct: {batch_correct / len(X):>7f} [{current:>5d}/{size:>5d}]")

    # Average over what was actually processed instead of re-deriving the batch
    # count from a global batch size; accuracy is counted per sample so a short
    # last batch is not over-weighted.
    loss_acc = loss_sum / max(n_batches, 1)
    correct_acc = correct_total / max(seen, 1)

    print(f"Train Error: \n Accuracy: {(100*correct_acc):>0.1f}%, Avg loss: {loss_acc:>8f} \n")

    return loss_acc
            
def test_loop(dataloader, model, loss_fn):
    model.eval()
    size = len(dataloader.dataset)
    print(size)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.type(torch.LongTensor).to(device)
            X, y = X.to(device), y.to(device)
            # y = y.float()
            pred = model(X)
            # pred = pred.squeeze(1)
            loss = loss_fn(pred, y).item()
            correct += get_accuracy(y,pred)
            test_loss += loss
            # print(correct)

            
    test_loss /= (np.ceil(size/num_batch))
    correct /= (np.ceil(size/num_batch))
    # print(correct)
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    
    return test_loss
    

## Run model training and save the model weights to the **model_save** folder

In [None]:
# Folder that receives one checkpoint per epoch.
MODEL_SAVE_DIR = './model_save'
# 0-based epoch index from which the plateau scheduler starts being stepped.
PLATEAU_START_EPOCH = 170

# Create the output folder once instead of on every epoch.
os.makedirs(MODEL_SAVE_DIR,exist_ok=True)

# Per-epoch average losses, used for the loss plot.
train_loss_acc = []
test_loss_acc = []
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loss = train_loop(loader_train, model, loss_fn, optimizer)
    test_loss = test_loop(loader_test, model, loss_fn)
    train_loss_acc.append(train_loss)
    test_loss_acc.append(test_loss)
    if t >= PLATEAU_START_EPOCH:
        # Lower the learning rate when the validation loss stops improving.
        schedulerplat.step(test_loss)
    torch.save(model.state_dict(), f'{MODEL_SAVE_DIR}/epoch_{t}.pth')
print("Done!")

In [None]:
# Training vs. validation loss per epoch; labelled so the figure is readable on its own.
fig, ax = plt.subplots()
ax.plot(train_loss_acc, label='train')
ax.plot(test_loss_acc, label='validation')
ax.set(xlabel='epoch', ylabel='average loss', title='Loss per epoch')
ax.legend()
plt.show()

# Load and test model

In [None]:
# Restore trained weights into the existing `model` and switch it to inference mode.
# NOTE(review): the training cell writes checkpoints to ./model_save/epoch_<t>.pth,
# while this reads from ./model/ -- presumably a hand-picked checkpoint copied
# there; confirm the path.
# torch.load unpickles the file, so only load checkpoints from a trusted source.
state_dict = torch.load('./model/epoch_407.pth',map_location=device) #335,355 382
model.load_state_dict(state_dict)
model.eval()
model.to(device)

@torch.no_grad()
def predModel(x,verbose=False):
    """Return the output of the module-level ``model`` for ``x``, computed without gradient tracking.

    ``verbose`` is accepted but has no effect.
    """
    return model(x)

def arr2tensor(img_arr):
    """Convert an image array into a model-ready batch of one.

    The array is wrapped as a PIL image, run through the 'test' pipeline of
    ``transform_dict``, given a leading batch dimension and moved to ``device``.
    """
    pil_img = Image.fromarray(img_arr)
    tensor = transform_dict['test'](pil_img)
    batch = tensor.unsqueeze(0)
    return batch.to(device)
    
def tensor2arr(img_tensor):
    """Convert the first image of a batch tensor into a uint8 array.

    Takes index 0 along the first dimension, moves the leading (channel)
    dimension of that image to the end, scales the values by 255 and casts
    them to ``np.uint8``.
    """
    first_image = img_tensor[0, ...]
    channels_last = first_image.cpu().detach().permute(1, 2, 0).numpy()
    scaled = channels_last * 255
    return scaled.astype(np.uint8)

In [None]:
import random

imgpaths = glob(f'./data/test_resized/**.png') #VL8_Far641_sd_sp
if not imgpaths:
    # Fail with a clear message instead of an IndexError below.
    raise FileNotFoundError("no PNG images found in ./data/test_resized")
# Pick one test image at random.
np.random.shuffle(imgpaths)
imgpath = imgpaths[0]
print(imgpath)

img = cv2.imread(imgpath)
if img is None:
    # cv2.imread returns None instead of raising when a file cannot be read.
    raise OSError(f"could not read image: {imgpath}")
img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
plt.imshow(cv2.resize(img,(224,113)))
plt.show()

# Same pre-processing as the 'test' pipeline, via the shared helper.
img = arr2tensor(img)

pred = predModel(img)
# Softmax does not change which score is largest, so argmax is taken on the raw scores.
pred = torch.argmax(pred).item()
class_type = label_to_class_dict[pred]
print(pred,class_type)

# Generate test result CSV file

In [None]:
import pandas as pd

# Table for the submission rows: image file name and predicted label.
df = pd.DataFrame(columns=['imgpath','pred'])
df

In [None]:

imgpaths = glob(f'./data/test_resized/*.png')

# Collect the rows in a plain list and build the frame once, instead of
# growing the DataFrame one row at a time.
records = []
for imgpath in imgpaths:
    # Context manager closes the file handle once the pixels are converted.
    with Image.open(imgpath) as raw:
        # Shared 'test' pipeline: Resize -> CenterCrop -> ToTensor.
        img = transform_dict['test'](raw.convert('RGB'))
    img = img.unsqueeze(0).to(device)

    # Softmax does not change which score is largest, so argmax is taken on the raw scores.
    pred = torch.argmax(predModel(img)).item()
    records.append({'imgpath': os.path.basename(imgpath), 'pred': pred})

# One row per test image, indexed 0..n-1 in glob order.
df = pd.DataFrame(records, columns=['imgpath','pred'])

In [None]:
# Order the rows by index, then write "<file name>,<label>" lines with no
# header row and no index column.
df = df.sort_index()
df.to_csv('./submission.csv',index=False,header=False)