In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import torch
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F

from transformers import ViTFeatureExtractor, ViTModel, ViTConfig, AutoConfig

from PIL import Image
from torchsummary import summary
import GPUtil
%matplotlib inline

In [4]:
# Read the training table and derive the on-disk location of every image
df = pd.read_csv("train.csv")
df["file_path"] = ["train/" + image_id + ".jpg" for image_id in df["Id"]]
# Store the regression target as float32
df["Pawpularity"] = df["Pawpularity"].to_numpy().astype(np.float32)

In [2]:
# Define helpers that move data and models onto the GPU (CUDA) when one is available
def get_default_device():
    """Return the CUDA device when CUDA is available, otherwise the CPU device."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)
def to_device(data, device):
    """Move `data` to `device`.

    A list or tuple is processed element by element (recursively) and the
    result is returned as a list; anything else is moved with a non-blocking
    `.to(device)` call.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)

    moved = []
    for item in data:
        moved.append(to_device(item, device))
    return moved

class DeviceDataLoader():
    """Wrap a data loader so that every batch it yields is moved to `device`."""

    def __init__(self, dl, device) -> None:
        self.device = device
        self.dl = dl

    def __len__(self):
        # Same number of batches as the wrapped loader
        return len(self.dl)

    def __iter__(self):
        # Generator: a batch is transferred only when the consumer asks for it
        for batch in self.dl:
            yield to_device(batch, self.device)

In [5]:
# Define training dataset
class petDataset(Dataset):
    """Training dataset yielding `(image_trans, image_res, label)` triples.

    Args:
        dataframe: table with a "file_path" column (image locations) and a
            "Pawpularity" column (regression targets).
        trans_transform: callable invoked as
            `trans_transform(np.array(image), return_tensors='pt')`; the
            result must support `['pixel_values']`.
        res_transform: callable invoked on the PIL image.

    Both transforms default to None for signature compatibility, but
    `__getitem__` calls them unconditionally, so they must be supplied.
    """

    def __init__(self, dataframe, trans_transform=None, res_transform=None):
        # Drop the frame's own index: samplers hand out positions 0..len-1,
        # which would not resolve on a filtered / shuffled / split frame.
        self.labels = dataframe["Pawpularity"].reset_index(drop=True)
        self.images = dataframe["file_path"].reset_index(drop=True)
        self.trans_transform = trans_transform
        self.res_transform = res_transform

    def __len__ (self):
        return len(self.labels)

    def __getitem__(self, idx):
        img_path = self.images.iloc[idx]
        # Close the file promptly and force 3 channels so grayscale / RGBA /
        # palette images do not break the 3-channel normalisation downstream.
        with Image.open(img_path) as img:
            image = img.convert("RGB")

        image_trans = self.trans_transform(np.array(image), return_tensors='pt')
        # Remove only the leading axis (assumed to be a batch axis of size 1
        # added by the extractor -- TODO confirm); a bare squeeze() would also
        # drop any other size-1 axis.
        image_trans = image_trans['pixel_values'].squeeze(0)

        image_res = self.res_transform(image)

        label = self.labels.iloc[idx]

        return image_trans, image_res, label

# ViT-branch preprocessing. Load it from the same checkpoint as the ViT
# backbone and the test-time extractor used further down in this notebook
# ('google/vit-base-patch16-224-in21k') so training and inference inputs are
# prepared identically.
trans_transform = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224-in21k')

# ResNet-branch preprocessing: resize, centre-crop to 224, convert to a tensor,
# then normalise each channel with the given mean / std.
res_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


# Training dataset and a shuffled loader over it
train_ds = petDataset(df, trans_transform=trans_transform, res_transform=res_transform)
train_dl = DataLoader(train_ds, batch_size=32, shuffle=True)


In [6]:
# Build the ResNet branch from the pretrained torchvision ResNet-50
model_Res = torch.hub.load('pytorch/vision:v0.10.0', 'resnet50', pretrained=True)

# Keep every child module except the last one
layers_Res = list(model_Res.children())
model_Res = nn.Sequential(*layers_Res[:-1])

# Freeze the first seven child modules; later children stay trainable
for count, child in enumerate(model_Res.children(), start=1):
    if count >= 8:
        continue
    for param in child.parameters():
        param.requires_grad = False

Using cache found in C:\Users\yscaa/.cache\torch\hub\pytorch_vision_v0.10.0


In [8]:
# Build the ViT branch from the pretrained checkpoint
model_trans = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')

# Freeze the first three child modules
for count, child in enumerate(model_trans.children(), start=1):
    if count >= 4:
        continue
    for param in child.parameters():
        param.requires_grad = False

# Split the children: everything except the last two (the normalization layer
# and the pooler layer) forms the trunk; the normalization layer (child
# index 2) is kept separately.
layers_trans = list(model_trans.children())
model_trans_top = nn.Sequential(*layers_trans[:-2])
trans_layer_norm = layers_trans[2]

In [9]:
# Merge the two models
class model_final(nn.Module):
    """Two-branch regressor that merges a transformer branch and a ResNet branch.

    Args:
        model_trans_top: module called on the transformer input; its result
            must expose `.last_hidden_state`.
        trans_layer_norm: module applied to the patch states (token 0 removed).
        model_Res: module called on the ResNet input; its output is reshaped
            to `(batch, channels)`.
        dp_rate: dropout probability shared by all dropout applications.
        trans_in_features: size of the flattened, normalised patch states.
            The default 150528 equals 196 * 768 (presumably 196 patches of
            width 768 for the ViT checkpoint used in this notebook -- TODO
            confirm when swapping backbones).
        trans_out_features: width of the projected transformer features.
        res_features: number of channels produced by `model_Res`.
        hidden_features: width of the hidden layer of the regression head.

    The new keyword arguments default to the previously hard-coded sizes, and
    the sub-module attribute names are unchanged.
    """

    def __init__(self, model_trans_top, trans_layer_norm, model_Res, dp_rate = 0.3,
                 trans_in_features=150528, trans_out_features=2048,
                 res_features=2048, hidden_features=500):
        super().__init__()
        # Transformer branch
        self.model_trans_top = model_trans_top
        self.trans_layer_norm = trans_layer_norm
        self.trans_flatten = nn.Flatten()
        self.trans_linear = nn.Linear(trans_in_features, trans_out_features)

        # ResNet branch
        self.model_Res = model_Res

        # Regression head applied to the concatenated branch features
        self.dropout = nn.Dropout(dp_rate)
        self.linear1 = nn.Linear(trans_out_features + res_features, hidden_features)
        self.linear2 = nn.Linear(hidden_features, 1)

    def forward(self, trans_b, res_b):
        """Return one prediction per sample, shape `(batch, 1)`."""
        # Transformer branch: drop token 0 (the classification token) and keep
        # the last hidden state of all patches.
        result_trans = self.model_trans_top(trans_b)
        patch_state = result_trans.last_hidden_state[:,1:,:]
        result_trans = self.trans_layer_norm(patch_state)
        # Flatten the *normalised* states. The previous code flattened
        # `patch_state` here, silently discarding the layer-norm output.
        result_trans = self.trans_flatten(result_trans)
        result_trans = self.dropout(result_trans)
        result_trans = self.trans_linear(result_trans)

        # ResNet branch: reshape to (batch, channels). An explicit reshape is
        # used instead of squeeze() so a batch of size 1 keeps its batch axis.
        result_res = self.model_Res(res_b)
        result_res = torch.reshape(result_res, (result_res.shape[0], result_res.shape[1]))

        # Merge both feature vectors and run the regression head
        result_merge = torch.cat((result_trans, result_res),1)
        result_merge = self.dropout(result_merge)
        result_merge = self.linear1(result_merge)
        result_merge = self.dropout(result_merge)
        result_merge = self.linear2(result_merge)

        return result_merge

# Assemble the merged network from the two prepared branches
model = model_final(
    model_trans_top=model_trans_top,
    trans_layer_norm=trans_layer_norm,
    model_Res=model_Res,
    dp_rate=0.3,
)
# Optional: resume from previously saved weights
# model.load_state_dict(torch.load('model_weights_1228'))

<All keys matched successfully>

In [10]:
# Pick the device, then move the training loader and the model onto it
device = get_default_device()
# Wrap only once so that re-running this cell does not stack
# DeviceDataLoader wrappers around the same loader.
if not isinstance(train_dl, DeviceDataLoader):
    train_dl = DeviceDataLoader(train_dl, device)
model = to_device(model, device)


In [11]:
# Optimise only the parameters that are still trainable
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(params, lr=1e-7, momentum=0.2)

# Multiply the learning rate by 0.1 when the monitored value stops decreasing
# (patience of 2 scheduler steps)
lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.1, patience=2, verbose=True)

In [12]:
# Fit function doing grad steps
def fit(epochs, model, train_dl, opt=None, sched=None):
    """Train `model` for `epochs` passes over `train_dl` using an MSE loss.

    Args:
        epochs: number of passes over `train_dl`.
        model: module called as `model(x_trans, x_res)`.
        train_dl: iterable yielding `(x_trans, x_res, yb)` batches.
        opt: optimizer to step; falls back to the notebook-level `optimizer`
            when omitted.
        sched: scheduler whose `step(metric)` is called once per epoch with
            the summed batch losses; falls back to the notebook-level
            `lr_scheduler` when omitted.
    """
    if opt is None:
        opt = optimizer
    if sched is None:
        sched = lr_scheduler
    loss_func = nn.MSELoss()
    for epoch in range(epochs):
        model.train()
        # Accumulate over the whole epoch. Resetting this inside the batch
        # loop made the scheduler and the log see only the last batch's loss.
        total_loss = 0.0
        for batch_num, (x_trans, x_res, yb) in enumerate(train_dl, start=1):
            preds = model(x_trans, x_res)
            # Match the target's shape explicitly; a bare squeeze() would also
            # drop the batch axis when the batch holds a single sample.
            loss = loss_func(preds.reshape(yb.shape), yb)
            loss.backward()
            opt.step()
            opt.zero_grad()
            batch_loss = loss.item()
            total_loss += batch_loss
            print('\r', f'batch #{batch_num}: {batch_loss}', end='')
        sched.step(total_loss)
        print('\n', f'Epoch: ({epoch+1}/{epochs}) Loss = {total_loss}')

In [None]:
# Run 10 training epochs, then persist the learned weights to disk
fit(epochs=10, model=model, train_dl=train_dl)
torch.save(model.state_dict(), "model_weights_1228")


In [13]:
# Read the test table and derive the on-disk location of every image
df_prd = pd.read_csv("test.csv")
df_prd["file_path"] = ["test/" + image_id + ".jpg" for image_id in df_prd["Id"]]
class petDataset_prd(Dataset):
    """Inference dataset yielding `(image_trans, image_res)` pairs.

    Args:
        dataframe: table with a "file_path" column (image locations).
        trans_transform: callable invoked as
            `trans_transform(np.array(image), return_tensors='pt')`; the
            result must support `['pixel_values']`.
        res_transform: callable invoked on the PIL image.

    Both transforms default to None for signature compatibility, but
    `__getitem__` calls them unconditionally, so they must be supplied.
    """

    def __init__(self, dataframe, trans_transform=None, res_transform=None):
        # Drop the frame's own index: samplers hand out positions 0..len-1,
        # which would not resolve on a filtered or re-ordered frame.
        self.images = dataframe["file_path"].reset_index(drop=True)
        self.trans_transform = trans_transform
        self.res_transform = res_transform

    def __len__ (self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path = self.images.iloc[idx]
        # Close the file promptly and force 3 channels so grayscale / RGBA /
        # palette images do not break the 3-channel normalisation downstream.
        with Image.open(img_path) as img:
            image = img.convert("RGB")

        image_trans = self.trans_transform(np.array(image), return_tensors='pt')
        # Remove only the leading axis (assumed to be a batch axis of size 1
        # added by the extractor -- TODO confirm).
        image_trans = image_trans['pixel_values'].squeeze(0)

        image_res = self.res_transform(image)

        return image_trans, image_res

# ViT-branch preprocessing, loaded from the same checkpoint name as the backbone
trans_transform = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224-in21k')

# ResNet-branch preprocessing: resize, centre-crop to 224, convert to a tensor,
# then normalise each channel with the given mean / std.
res_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

test_ds = petDataset_prd(df_prd, trans_transform=trans_transform, res_transform=res_transform)
# Unshuffled batches of 8, each moved to `device` as it is yielded
test_dl = DeviceDataLoader(DataLoader(test_ds, batch_size=8, shuffle=False), device)

In [46]:
def predict(model_trans, model_Res, model_linear, test_dl):
    """Run inference over every batch of `test_dl` and return all predictions.

    Args:
        model_trans: module whose output exposes `.pooler_output`.
        model_Res: module producing per-sample features; its output is
            flattened to `(batch, features)` before concatenation.
        model_linear: module mapping the concatenated features to predictions.
        test_dl: iterable yielding `(image_trans, image_res)` batches.

    Returns:
        The per-batch outputs of `model_linear` concatenated along dim 0, in
        iteration order (an empty tensor when `test_dl` yields nothing).

    Note:
        The three modules are switched to eval mode and left there.
    """
    for module in (model_trans, model_Res, model_linear):
        module.eval()

    batch_preds = []
    # No gradients are needed for inference
    with torch.no_grad():
        for image_trans, image_res in test_dl:
            trans_output = model_trans(image_trans)
            cls_tokens = trans_output.pooler_output
            # Flatten trailing axes so e.g. (B, C, 1, 1) can be concatenated
            # with the 2-D pooled features; a (B, C) output is unchanged.
            res_output = model_Res(image_res).flatten(1)
            output = torch.cat((cls_tokens, res_output), 1)
            # Keep every batch. The previous code overwrote `preds` on each
            # iteration and returned only the last batch.
            batch_preds.append(model_linear(output))

    if not batch_preds:
        return torch.empty(0)
    return torch.cat(batch_preds)

In [49]:
# Predict with the trained merged network `model`. The previous call passed
# `model_linear`, which is not defined in any visible cell of this notebook
# (it fails on a fresh kernel), and it never used the network that was trained.
model.eval()
with torch.no_grad():
    # One output per batch, in the unshuffled order of test_dl
    batch_preds = [model(image_trans, image_res) for image_trans, image_res in test_dl]
result = torch.cat(batch_preds)
# Flatten to one value per row before storing it as a column
df_prd["Pawpularity"] = result.cpu().detach().numpy().reshape(-1)
df_prd[["Id", "Pawpularity"]].to_csv("test1.csv", index=False)

