In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms.v2 as T
from torchvision.io import read_image
import timm
from timm import create_model

import pytorch_lightning as pl
from pytorch_lightning import callbacks
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning import LightningDataModule

from sklearn.model_selection import StratifiedKFold, train_test_split

import glob

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
DATA_DIR = "data"
TRAIN_DIR = "data/train"
TEST_DIR = "data/test"
OUTPUT_DIR = "output"
MODEL_CHECKPOINT_PATH = "output/model_checkpoints/best_loss.ckpt"

In [3]:
# Dataset
columns = ['Subject Focus', 'Eyes', 'Face', 'Near', 'Action', 'Accessory', 'Group', 'Collage', 'Human', 'Occlusion', 'Info', 'Blur']

class PetFinderDataset(Dataset):
    def __init__(self, df, image_dir, image_size=224):
        self.image_ids = df["Id"].values
        self.features = df[columns].values
        self.labels = None

        if "Pawpularity" in df.keys():
            self.labels = df["Pawpularity"].values

        self.image_dir = image_dir
        self.transform = T.Resize([image_size, image_size], antialias=True)
        
    def __len__(self):
        return len(self.image_ids)

    def __getitem__(self, idx):
        features = self.features[idx]

        image_id = self.image_ids[idx]
        image = read_image(os.path.join(self.image_dir, image_id + '.jpg'))
        image = self.transform(image)
        
        if self.labels is not None:
            label = self.labels[idx]
            return image_id, features, image, label
        
        return image_id, features, image

# Data Module
class PetFinderDataModule(LightningDataModule):
    def __init__(self, df_train=None, df_val=None, df_test=None, train_dir=None, val_dir=None, test_dir=None, batch_size=64, image_size=224):
        super().__init__()
        self.df_train = df_train
        self.df_val = df_val
        self.df_test = df_test

        self.train_dir = train_dir
        self.val_dir = val_dir
        self.test_dir = test_dir

        self.batch_size = batch_size
        self.image_size = image_size

    def train_dataloader(self):
        return DataLoader(PetFinderDataset(self.df_train, self.train_dir, self.image_size), batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        return DataLoader(PetFinderDataset(self.df_val, self.val_dir, self.image_size), batch_size=self.batch_size, shuffle=False)
    
    def test_dataloader(self):
        return DataLoader(PetFinderDataset(self.df_test, self.test_dir, self.image_size), batch_size=self.batch_size, shuffle=False) 


In [4]:
# augmentations

IMAGENET_MEAN = [0.485, 0.456, 0.406]  # RGB
IMAGENET_STD = [0.229, 0.224, 0.225]  # RGB

train_transforms = T.Compose(
    [
        T.RandomHorizontalFlip(),
        T.RandomVerticalFlip(),
        T.RandomAffine(15, translate=(0.1, 0.1), scale=(0.9, 1.1)),
        T.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
        T.ToDtype(torch.float32),
        T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ]
)

test_transforms = T.Compose(
    [
        T.ConvertImageDtype(torch.float),
        T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ]
)

# https://arxiv.org/abs/1710.09412v2
def mixup(x: torch.Tensor, y: torch.Tensor, alpha: float = 1.0):
    assert alpha > 0, "alpha should be larger than 0"
    assert x.size(0) > 1, "Mixup cannot be applied to a single instance."

    lam = np.random.beta(alpha, alpha)
    rand_index = torch.randperm(x.size()[0])
    mixed_x = lam * x + (1 - lam) * x[rand_index, :]
    target_a, target_b = y, y[rand_index]
    return mixed_x, target_a, target_b, lam

class PawpularityModel(pl.LightningModule):
    def __init__(self, model_name="swin_large_patch4_window7_224", pretrained=True):
        super().__init__()
        self.validation_step_outputs = []
        self.training_step_outputs = []

        self.backbone = create_model(model_name, pretrained=pretrained, num_classes=0, in_chans=3).to('cuda')
        self.dropout = nn.Dropout(0.2)
        num_features = self.backbone.num_features

        self.fc = nn.Sequential(
            nn.Linear(num_features + len(columns), int(num_features / 2)),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(int(num_features / 2), int(num_features / 4)),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(int(num_features / 4), 1)
        )
        
        self.criterion = nn.BCEWithLogitsLoss()
        self.train_transforms = train_transforms
        self.test_transforms = test_transforms
        
    def forward(self, input, features):
        x = self.backbone(input)
        x = self.dropout(x)
        x = torch.cat([x, features], dim=1)
        x = self.fc(x)

        return x

    def step(self, batch, mode):
        image_ids, features, images, labels = batch
        labels = labels.float() / 100.0

        images = self.train_transforms(images) if mode == "train" else self.test_transforms(images)
        logits = self.forward(images, features).squeeze(1)
        loss = self.criterion(logits, labels)

        predictions = logits.sigmoid().detach().cpu() * 100
        labels = labels.detach().cpu() * 100

        self.log(f'{mode}_loss', loss)
        
        return loss, predictions, labels

    def training_step(self, batch, batch_indexes):
        loss, predictions, labels = self.step(batch, 'train')
        self.training_step_outputs.append(loss)
        return { 'loss': loss, 'predictions': predictions, 'labels': labels }

    def validation_step(self, batch, batch_indexes):
        loss, predictions, labels = self.step(batch, 'val')
        self.validation_step_outputs.append(loss)
        return { 'loss': loss, 'predictions': predictions, 'labels': labels }
    
    def on_train_epoch_end(self):
        print(f"Training loss: {torch.stack(self.training_step_outputs).mean()}")
        self.training_step_outputs.clear()

    def on_validation_epoch_end(self):
        print(f"Validation loss: {torch.stack(self.validation_step_outputs).mean()}")
        self.validation_step_outputs.clear()

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.parameters(), lr=1e-4)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0 = 20, eta_min=1e-4)

        return [optimizer], [scheduler]

In [9]:
model = PawpularityModel()
model.to("cuda")
checkpoint = torch.load(MODEL_CHECKPOINT_PATH)
model.load_state_dict(checkpoint['state_dict'])

df_test = pd.read_csv(os.path.join(DATA_DIR, "test.csv"))
df_test = pd.concat([df_test] * 100)
df_test = df_test[:100]
test_dataloader = PetFinderDataModule(df_test=df_test, test_dir=TEST_DIR, batch_size=8).test_dataloader()

final_image_ids = []
final_predictions = []
for batch, (image_ids, features, images) in enumerate(test_dataloader):
    with torch.no_grad():
        predictions =  model(torch.as_tensor(images, dtype=torch.float32).cuda(), features.cuda())
        predictions = predictions.sigmoid() * 100
        predictions = predictions.cpu().data.numpy().reshape(-1)
    
    final_image_ids += list(image_ids)
    final_predictions += list(predictions)

df_submission = pd.DataFrame({ "Id": final_image_ids, "Pawpularity": final_predictions })
df_submission.to_csv(os.path.join(OUTPUT_DIR, 'submission.csv'), index=False)

In [10]:
df_submission.head()

Unnamed: 0,Id,Pawpularity
0,4128bae22183829d2b5fea10effdb0c3,38.945671
1,43a2262d7738e3d420d453815151079e,38.983631
2,4e429cead1848a298432a0acad014c9d,36.041359
3,80bc3ccafcc51b66303c2c263aa38486,37.130951
4,8f49844c382931444e68dffbe20228f4,34.928699
