## Library Imports

In [1]:
from time import time
# Wall-clock timestamp taken at the top of the notebook; the final cell
# subtracts it from the current time to report the total run time.
notebook_start_time = time()

In [2]:
import os
import re
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
from torch import nn, optim
from torch.utils.data import Dataset
from torch.utils.data import DataLoader as DL
from torch.nn.utils import weight_norm as WN
from torchvision import models, transforms

from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler

In [3]:
# Seed handed to the K-fold splitter and to model construction
SEED = 0

# Prefer the GPU whenever torch can see one, otherwise run on the CPU
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Root directory of the competition data (CSV files and image folders)
PATH = "../input/petfinder-pawpularity-score"

# Pretrained Data Transforms: tensor conversion followed by per-channel normalisation
TRANSFORM_PRE = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Non-pretrained Data Transforms: tensor conversion only
TRANSFORM_NOPRE = transforms.Compose(
    [
        transforms.ToTensor(),
    ]
)

# Target scaler; fitted by setup_transform() and used afterwards to map
# model outputs back to the original target scale
sc_y = StandardScaler()

In [4]:
def breaker(num=50, char="*") -> None:
    """Print a separator made of `num` copies of `char`, framed by blank lines."""
    separator = char * num
    # Leading newline + separator, then two newlines: same bytes as printing
    # "\n<separator>\n" with print's default line ending.
    print("\n" + separator, end="\n\n")


def head(x, no_of_ele=5) -> None:
    """Print the first `no_of_ele` items of any sliceable object `x`."""
    leading_items = x[:no_of_ele]
    print(leading_items)


def get_filenames_and_targets(path: str) -> tuple:
    """Read `<path>/train.csv` and return its first and last columns.

    Returns:
        A `(filenames, targets)` tuple: the values of the first column and
        the values of the last column, each taken from a copy of the column.
    """
    csv_path = os.path.join(path, "train.csv")
    train_df = pd.read_csv(csv_path, engine="python")

    first_column = train_df.iloc[:, 0]
    last_column = train_df.iloc[:, -1]
    return first_column.copy().values, last_column.copy().values


def get_filenames(path: str) -> np.ndarray:
    """Read `<path>/test.csv` and return the values of its "Id" column."""
    csv_path = os.path.join(path, "test.csv")
    test_df = pd.read_csv(csv_path, engine="python")
    return test_df["Id"].copy().values


def get_image(path: str, name: str, size: int) -> np.ndarray:
    """Load `<path>/<name>.jpg` as an RGB image resized to `size` x `size`.

    Args:
        path: Directory containing the image.
        name: File name without the ".jpg" extension.
        size: Edge length (pixels) of the square output image.

    Returns:
        The image after BGR -> RGB conversion and INTER_AREA resizing.

    Raises:
        FileNotFoundError: if the image file does not exist.
        ValueError: if the file exists but OpenCV cannot decode it.
    """
    image_path = os.path.join(path, name + ".jpg")

    # cv2.imread signals failure by returning None rather than raising, which
    # would otherwise surface later as an opaque cvtColor error.
    if not os.path.isfile(image_path):
        raise FileNotFoundError("Image file not found: {}".format(image_path))

    image = cv2.imread(image_path, cv2.IMREAD_COLOR)
    if image is None:
        raise ValueError("Could not decode image file: {}".format(image_path))

    image = cv2.cvtColor(src=image, code=cv2.COLOR_BGR2RGB)
    image = cv2.resize(src=image, dsize=(size, size), interpolation=cv2.INTER_AREA)
    return image


def setup_transform(filenames: np.ndarray, targets: np.ndarray, seed: int) -> None:
    """Fit the module-level target scaler `sc_y` on the first fold's training targets.

    A shuffled 5-fold split of `filenames` is created with `seed`; only the
    training indices of the first split are used. The selected targets are
    reshaped to a single column before fitting.
    """
    breaker()
    print("Setting up Data Transform ...")

    splitter = KFold(n_splits=5, shuffle=True, random_state=seed)
    # Only the first split is needed; its validation indices are not used here.
    first_train_idx, _ = next(splitter.split(filenames))

    fold_targets = targets[first_train_idx].reshape(-1, 1)
    sc_y.fit(fold_targets)


def make_submission(path: str, y_pred: np.ndarray, output_path: str = "./submission.csv") -> None:
    """Write predictions into the "Pawpularity" column of the sample submission.

    Args:
        path: Directory containing `sample_submission.csv`.
        y_pred: Predictions, one per submission row. Arrays of any shape are
            flattened (so both `(N,)` and `(N, 1)` work); a scalar is
            broadcast to every row.
        output_path: Destination CSV file. Defaults to "./submission.csv".

    Raises:
        ValueError: if the number of predictions differs from the number of
            rows in the sample submission.
    """
    submission = pd.read_csv(os.path.join(path, "sample_submission.csv"), engine="python")

    predictions = np.asarray(y_pred)
    if predictions.ndim == 0:
        # Scalar input: let pandas broadcast it, as plain column assignment does.
        column = y_pred
    else:
        # Flatten explicitly instead of relying on pandas accepting a 2-D
        # single-column array as a column value.
        column = predictions.reshape(-1)
        if column.shape[0] != len(submission):
            raise ValueError(
                "Got {} predictions for {} submission rows".format(column.shape[0], len(submission))
            )

    submission["Pawpularity"] = column
    submission.to_csv(output_path, index=False)

## Dataset Template

In [5]:
class DS(Dataset):
    """Image-only dataset: loads `<base_path>/<filename>.jpg` on demand.

    Args:
        base_path: Directory containing the images.
        filenames: Sequence of file names without extension (NumPy array,
            list, or any other sized, indexable container).
        image_size: Edge length passed to `get_image` for resizing.
        transform: Optional callable applied to each loaded image. When it is
            None the image returned by `get_image` is yielded unchanged.
    """

    def __init__(self, base_path=None, filenames=None, image_size=None, transform=None):
        self.base_path = base_path
        self.filenames = filenames
        self.image_size = image_size
        self.transform = transform

    def __len__(self):
        # len() works for NumPy arrays and plain Python sequences alike,
        # whereas `.shape[0]` would reject lists and tuples.
        return len(self.filenames)

    def __getitem__(self, idx):
        image = get_image(self.base_path, self.filenames[idx], self.image_size)
        if self.transform is None:
            # The constructor's default is None; avoid calling a non-callable.
            return image
        return self.transform(image)

## Build Model

In [6]:
def build_model(model_name: str, pretrained: bool, HL: list, seed: int):
    """Build an image regressor: a torchvision backbone plus a fully-connected head.

    Args:
        model_name: Backbone selector, matched case-insensitively:
            "resnet" (resnet50), "vgg" (vgg16_bn), "mobilenet" (mobilenet_v2)
            or "densenet" (densenet169).
        pretrained: Forwarded to the torchvision constructor. When true, all
            backbone parameters are frozen so only the head is trainable.
        HL: Hidden-layer widths of the head, in order. Any length is accepted;
            an empty list (or None) gives a single BatchNorm + Linear layer.
        seed: Passed to `torch.manual_seed` right before the model is built.

    Returns:
        The constructed `nn.Module`; its forward pass yields one value per sample.

    Raises:
        ValueError: if `model_name` is not one of the supported backbones.
    """
    # Fail early with a clear message instead of hitting an unbound
    # `in_features` deep inside the constructor.
    if not isinstance(model_name, str) or not re.match(
        r"^(resnet|vgg|mobilenet|densenet)$", model_name, re.IGNORECASE
    ):
        raise ValueError(
            "Unsupported model_name {!r}; expected one of: "
            "resnet, vgg, mobilenet, densenet".format(model_name)
        )

    class ImageModel(nn.Module):
        """Backbone feature extractor (`features`) followed by a regression head (`predictor`)."""

        def __init__(self, model_name=None, pretrained=False, HL=None):
            super(ImageModel, self).__init__()

            HL = [] if HL is None else list(HL)

            # Each branch: build the backbone, optionally freeze it, drop its
            # trailing child module(s), append pooling/flatten, and read the
            # width of the resulting feature vector from the backbone itself.
            if re.match(r"^resnet$", model_name, re.IGNORECASE):
                self.features = models.resnet50(pretrained=pretrained, progress=True)
                if pretrained:
                    self.freeze()
                self.features = nn.Sequential(*[*self.features.children()][:-1])
                self.features.add_module("Flatten", nn.Flatten())

                # NOTE(review): presumably the last norm layer of the final
                # stage - verify against the torchvision version in use.
                in_features = self.features[-3][2].bn3.num_features

            elif re.match(r"^vgg$", model_name, re.IGNORECASE):
                self.features = models.vgg16_bn(pretrained=pretrained, progress=True)
                if pretrained:
                    self.freeze()
                self.features = nn.Sequential(*[*self.features.children()][:-2])
                self.features.add_module("Adaptive Average Pool", nn.AdaptiveAvgPool2d(output_size=(2, 2)))
                self.features.add_module("Flatten", nn.Flatten())

                # Channel count times the 2x2 pooled spatial size.
                in_features = self.features[-3][41].num_features * 2 * 2

            elif re.match(r"^mobilenet$", model_name, re.IGNORECASE):
                self.features = models.mobilenet_v2(pretrained=pretrained, progress=True)
                if pretrained:
                    self.freeze()
                self.features = nn.Sequential(*[*self.features.children()][:-1])
                self.features.add_module("Adaptive Average Pool", nn.AdaptiveAvgPool2d(output_size=(1, 1)))
                self.features.add_module("Flatten", nn.Flatten())

                in_features = self.features[-3][-1][1].num_features

            elif re.match(r"^densenet$", model_name, re.IGNORECASE):
                self.features = models.densenet169(pretrained=pretrained, progress=True)
                if pretrained:
                    self.freeze()
                self.features = nn.Sequential(*[*self.features.children()][:-1])
                self.features.add_module("Adaptive Average Pool", nn.AdaptiveAvgPool2d(output_size=(1, 1)))
                self.features.add_module("Flatten", nn.Flatten())

                in_features = self.features[0].norm5.num_features

            else:
                raise ValueError("Unsupported model_name {!r}".format(model_name))

            # Head: (BN -> weight-normed Linear -> ReLU) per hidden layer, then
            # a final BN -> weight-normed Linear producing one output.
            # Module names BN<i>/FC<i>/AN<i> are kept so state-dict keys stay
            # the same for heads with zero to three hidden layers.
            self.predictor = nn.Sequential()
            width = in_features
            for position, hidden_width in enumerate(HL, start=1):
                self.predictor.add_module("BN{}".format(position), nn.BatchNorm1d(num_features=width, eps=1e-5))
                self.predictor.add_module(
                    "FC{}".format(position), WN(nn.Linear(in_features=width, out_features=hidden_width))
                )
                self.predictor.add_module("AN{}".format(position), nn.ReLU())
                width = hidden_width

            last = len(HL) + 1
            self.predictor.add_module("BN{}".format(last), nn.BatchNorm1d(num_features=width, eps=1e-5))
            self.predictor.add_module("FC{}".format(last), WN(nn.Linear(in_features=width, out_features=1)))

        def freeze(self):
            """Disable gradients for every parameter currently registered on the module."""
            for params in self.parameters():
                params.requires_grad = False

        def get_optimizer(self, lr=1e-3, wd=0):
            """Return an Adam optimizer over the parameters that still require gradients."""
            params = [p for p in self.parameters() if p.requires_grad]
            return optim.Adam(params, lr=lr, weight_decay=wd)

        def get_plateau_scheduler(self, optimizer=None, patience=5, eps=1e-8):
            """Return a ReduceLROnPlateau scheduler wrapping `optimizer`."""
            return optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer, patience=patience, eps=eps, verbose=True)

        def forward(self, x):
            return self.predictor(self.features(x))

    breaker()
    print("Building Model ...")
    print("\n{} features -> {} -> 1".format(model_name, HL))
    # Seed immediately before construction so weight initialisation is reproducible.
    torch.manual_seed(seed)
    model = ImageModel(model_name=model_name, pretrained=pretrained, HL=HL)

    return model

## Predict Helper

In [7]:
def predict_batch(model=None, dataloader=None, mode="test", path=None):
    """Load a checkpoint into `model` and predict over every batch of `dataloader`.

    Args:
        model: Module whose weights are replaced by the checkpoint's
            "model_state_dict" entry.
        dataloader: Iterable of batches. In "valid" mode each batch is a
            sequence whose first element is the input; in "test" mode each
            batch is the input itself.
        mode: String starting with "valid" or "test" (case-insensitive).
        path: Checkpoint file readable by `torch.load`.

    Returns:
        NumPy array of shape `(N, 1)` holding the predictions in loader order
        (`(0, 1)` when the loader yields no batches).

    Raises:
        ValueError: if `mode` starts with neither "valid" nor "test".
    """
    is_valid = re.match(r"valid", mode, re.IGNORECASE) is not None
    is_test = re.match(r"test", mode, re.IGNORECASE) is not None
    if not (is_valid or is_test):
        # An unrecognised mode would otherwise skip both loops and silently
        # return an empty prediction array.
        raise ValueError("mode must start with 'valid' or 'test', got {!r}".format(mode))

    model.load_state_dict(torch.load(path, map_location=DEVICE)["model_state_dict"])
    model.to(DEVICE)
    model.eval()

    # Collect per-batch outputs and concatenate once at the end instead of
    # re-allocating the growing result on every batch. The zero-row float32
    # seed keeps the result shape (0, 1) when the loader is empty.
    outputs = [torch.zeros(0, 1).to(DEVICE)]
    with torch.no_grad():
        for batch in dataloader:
            X = batch[0] if is_valid else batch
            X = X.to(DEVICE)
            outputs.append(model(X).view(-1, 1))

    return torch.cat(outputs, dim=0).detach().cpu().numpy()

## Generate Submission

In [8]:
def submit():
    """Run inference on the test images and write the submission file.

    Steps, in order: read the test and train file lists, build the test
    DataLoader, build the densenet model, predict from the saved checkpoint,
    fit the target scaler on the training targets, invert the scaling on the
    predictions, and hand the result to `make_submission`.
    """
    breaker()
    print("Reading Filenames ...")

    test_names = get_filenames(PATH)
    train_names, train_targets = get_filenames_and_targets(PATH)

    breaker()
    print("Building Test DataLoader ...")

    test_dataset = DS(
        base_path=os.path.join(PATH, "test"),
        filenames=test_names,
        image_size=224,
        transform=TRANSFORM_PRE,
    )
    test_loader = DL(test_dataset, batch_size=64, shuffle=False)

    # pretrained=False: the weights come from the checkpoint loaded in predict_batch
    model = build_model(model_name="densenet", pretrained=False, HL=[], seed=SEED)

    breaker()
    print("Making Predictions ...")

    checkpoint_path = "../input/petfinder-images-baseline-train/densenet_state.pt"
    scaled_pred = predict_batch(
        model=model,
        dataloader=test_loader,
        mode="test",
        path=checkpoint_path,
    )

    # Fit sc_y so the scaled predictions can be mapped back below
    setup_transform(train_names, train_targets, SEED)

    breaker()
    print("Transforming Final Predictions ...")
    final_pred = sc_y.inverse_transform(scaled_pred)

    breaker()
    print("Generating Submission File ...")
    make_submission(PATH, final_pred)
    breaker()

# Run the end-to-end inference pipeline; writes ./submission.csv
submit()


**************************************************

Reading Filenames ...

**************************************************

Building Test DataLoader ...

**************************************************

Building Model ...

densenet features -> [] -> 1

**************************************************

Making Predictions ...

**************************************************

Setting up Data Transform ...

**************************************************

Transforming Final Predictions ...

**************************************************

Generating Submission File ...

**************************************************



In [9]:
# Report the total wall-clock time since the first cell, in minutes
breaker()
elapsed_minutes = (time() - notebook_start_time) / 60
print("Notebook Run Time : {:.2f} minutes".format(elapsed_minutes))
breaker()


**************************************************

Notebook Run Time : 0.10 minutes

**************************************************

