In [None]:
import os

import hydra
import numpy as np
import pandas as pd
import pytorch_lightning as pl
import timm
import torch
import torch.optim
import torchvision.transforms as T
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision.io import read_image

from petfinder.lr_schedulers.lr_warmup import create_warmup_lr

# Per-channel normalisation statistics (RGB order), passed to T.Normalize in
# create_transform. Presumably the usual ImageNet values, as the names suggest.
IMAGENET_MEAN = [
    0.485,  # R
    0.456,  # G
    0.406,  # B
]
IMAGENET_STD = [
    0.229,  # R
    0.224,  # G
    0.225,  # B
]


def create_transform(image_size=224, training=True):
    """Build the torchvision preprocessing pipeline for an image.

    Args:
        image_size: Side length of the square the image is resized to.
        training: When true, random flip / affine / colour-jitter
            augmentations are inserted after the resize step.

    Returns:
        A ``T.Compose`` that resizes, optionally augments, converts the image
        to ``torch.float`` and normalises it with ``IMAGENET_MEAN`` /
        ``IMAGENET_STD``.
    """
    augmentations = (
        [
            T.RandomHorizontalFlip(),
            T.RandomVerticalFlip(),
            T.RandomAffine(15, translate=(0.1, 0.1), scale=(0.9, 1.1)),
            T.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
        ]
        if training
        else []
    )
    return T.Compose(
        [
            T.Resize((image_size, image_size)),
            *augmentations,
            # Dtype conversion and normalisation always run last, for both
            # training and inference.
            T.ConvertImageDtype(torch.float),
            T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
        ]
    )

class PetDataset(Dataset):
    """Dataset that reads images from disk and optionally pairs them with labels.

    Args:
        image_path: Sized collection of image file paths that supports
            ``.iloc`` positional indexing (e.g. a pandas Series).
        labels: Optional collection of targets, indexed with ``.iloc`` and of
            the same length as ``image_path``. Leave as ``None`` for
            inference, in which case items are bare images.
        transform: Optional callable applied to each decoded image.

    Raises:
        ValueError: If ``labels`` is given and its length differs from
            ``image_path``.
    """

    def __init__(self, image_path, labels=None, transform=None):
        # Only compare lengths when labels exist; len(None) would raise a
        # TypeError and make the label-free (inference) mode unusable.
        if labels is not None and len(image_path) != len(labels):
            raise ValueError(
                f'image_path and labels must have the same length, '
                f'got {len(image_path)} and {len(labels)}'
            )
        self.image_path = image_path
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of images."""
        return len(self.image_path)

    def __getitem__(self, index):
        """Return ``(image, label)`` when labels exist, otherwise ``image``."""
        image = read_image(self.image_path.iloc[index])
        if self.transform is not None:
            image = self.transform(image)
        if self.labels is not None:
            return image, self.labels.iloc[index]
        return image


def create_dataloader(X, y=None, dataloader_cfg=None, transform_cfg=None, training=True):
    """Wrap image paths (and optional labels) in a ``DataLoader``.

    Args:
        X: Image paths, forwarded to ``PetDataset``.
        y: Optional labels, forwarded to ``PetDataset``.
        dataloader_cfg: Optional mapping of extra ``DataLoader`` keyword
            arguments. Its entries take precedence over the defaults below,
            so ``shuffle`` and ``num_workers`` may be overridden.
        transform_cfg: Optional mapping of keyword arguments for
            ``create_transform`` (e.g. ``image_size``).
        training: Forwarded to ``create_transform`` to toggle augmentation.

    Returns:
        A ``DataLoader`` over a ``PetDataset``. Defaults: ``shuffle=False``
        and one worker per CPU reported by ``os.cpu_count()``.
    """
    transform_kwargs = {} if transform_cfg is None else transform_cfg

    # os.cpu_count() may return None; fall back to in-process loading (0).
    loader_kwargs = {'shuffle': False, 'num_workers': os.cpu_count() or 0}
    if dataloader_cfg is not None:
        # Merge rather than splat next to the defaults, so a config that sets
        # shuffle/num_workers overrides them instead of raising a
        # duplicate-keyword TypeError.
        loader_kwargs.update(dataloader_cfg)

    dataset = PetDataset(
        X, y, transform=create_transform(training=training, **transform_kwargs),
    )
    return DataLoader(dataset, **loader_kwargs)

class Model(pl.LightningModule):
    """LightningModule made of a timm backbone and a dropout + linear head.

    The configuration object ``cfg`` is read for: ``backbone`` (keyword
    arguments for ``timm.create_model``), ``fc_dropout``, ``output_dim``,
    ``loss`` and ``optimizer`` (both built with ``hydra.utils.instantiate``),
    and the optional ``lr_scheduler`` / ``lr_warmup`` entries.

    Labels are divided by 100 before the loss is computed, and reported
    predictions are ``100 * sigmoid(logits)``, so metrics are on the original
    label scale (presumably 0-100 -- confirm against the data).
    """

    def __init__(self, cfg):
        super().__init__()
        self.save_hyperparameters()
        self.cfg = cfg
        self.backbone = timm.create_model(**self.cfg.backbone)
        head_layers = [
            nn.Dropout(self.cfg.fc_dropout),
            nn.Linear(self.backbone.num_features, self.cfg.output_dim),
        ]
        self.fc = nn.Sequential(*head_layers)
        self.criterion = hydra.utils.instantiate(cfg.loss)

    def forward(self, x):
        """Return the head's raw output (no sigmoid) for a batch of images."""
        return self.fc(self.backbone(x))

    def shared_step(self, batch, prefix=''):
        """Compute and log the loss for one batch.

        Returns a dict with the loss plus detached predictions and labels,
        both rescaled by 100 for the epoch-level RMSE.
        """
        images, raw_target = batch
        logits = self(images)
        # Bring the label onto the scaled range the criterion is fed with.
        target = raw_target.unsqueeze(1).float() / 100
        loss = self.criterion(logits, target)
        self.log(f'{prefix}loss', loss)
        return dict(
            loss=loss,
            pred=100 * logits.sigmoid().detach(),
            label=100 * target.detach(),
        )

    def training_step(self, batch, batch_idx):
        """Run ``shared_step`` with the ``train_`` logging prefix."""
        return self.shared_step(batch, 'train_')

    def validation_step(self, batch, batch_idx):
        """Run ``shared_step`` with the ``val_`` logging prefix."""
        return self.shared_step(batch, 'val_')

    def predict_step(self, batch, batch_idx, dataloader_idx=None):
        """Return predictions for a label-free batch as a NumPy array."""
        probabilities = self(batch).sigmoid().detach().cpu().numpy()
        return 100 * probabilities

    def shared_epoch_end(self, outputs, prefix=''):
        """Log the RMSE over all step outputs collected during the epoch."""

        def gather(key):
            return torch.cat([step_out[key] for step_out in outputs])

        error = gather('label') - gather('pred')
        self.log(f'{prefix}rmse', error.pow(2).mean().sqrt())

    def training_epoch_end(self, outputs):
        """Log ``train_rmse`` for the finished training epoch."""
        self.shared_epoch_end(outputs, 'train_')

    def validation_epoch_end(self, outputs):
        """Log ``val_rmse`` for the finished validation epoch."""
        self.shared_epoch_end(outputs, 'val_')

    def configure_optimizers(self):
        """Build the optimizer and, when configured, its LR scheduler.

        Returns the bare optimizer if ``cfg`` has no ``lr_scheduler`` entry;
        otherwise ``([optimizer], [scheduler])``, with the scheduler passed
        through ``create_warmup_lr`` when ``lr_warmup`` is configured.
        """
        cfg = self.cfg
        optimizer = hydra.utils.instantiate(
            cfg.optimizer, params=self.parameters()
        )
        if 'lr_scheduler' in cfg:
            scheduler = hydra.utils.instantiate(
                cfg.lr_scheduler, optimizer=optimizer
            )
            if 'lr_warmup' in cfg:
                scheduler = create_warmup_lr(
                    optimizer, scheduler, **cfg.lr_warmup
                )
            return [optimizer], [scheduler]
        return optimizer


In [None]:
# Inference cell: load a trained checkpoint, predict on the test set and
# write the submission file.
test_path = '../input/petfinder-pawpularity-score/test.csv'
# NOTE(review): this points at a directory; confirm the actual checkpoint file.
model_path = '../'
# load_from_checkpoint is a classmethod; calling it on Model() would first
# fail because the constructor requires `cfg`.
model = Model.load_from_checkpoint(model_path).eval()
test_df = pd.read_csv(test_path)
dataloader = create_dataloader(test_df['path'], training=False)
# The Trainer keyword is `gpus` (Lightning 1.x, matching the *_epoch_end hooks
# used by Model); `gpu` is not an accepted argument.
trainer = pl.Trainer(gpus=1)
# predict() yields one array per batch; stitch them into a single array.
pred = trainer.predict(model, dataloader)
test_df['Pawpularity'] = np.concatenate(pred)
test_df[['Id', 'Pawpularity']].to_csv('submission.csv', index=False)