In [1]:
# import relevant libraries

import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional, Union, Tuple, Callable, List, Dict, Any

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adadelta, Adagrad, Adam, AdamW, SparseAdam, Adamax, ASGD, SGD, RAdam, Rprop, RMSprop, NAdam, LBFGS
from torch.utils.data import Dataset, random_split, DataLoader, Subset

import torchvision.models as models
from torchvision import transforms
from torchvision.io import read_image

import torchmetrics

import pytorch_lightning as pl
from pytorch_lightning.utilities.types import TRAIN_DATALOADERS, EVAL_DATALOADERS, STEP_OUTPUT
from pytorch_lightning.strategies import Strategy, DDPStrategy
from pytorch_lightning.callbacks import Callback, EarlyStopping, StochasticWeightAveraging

In [2]:
#create a custom limbo dataset class
#see more here https://pytorch.org/tutorials/beginner/basics/data_tutorial.html
class LimboDataset(Dataset):
    """Image dataset backed by a directory and an optional annotations CSV.

    Each entry of ``labels_images`` is a ``(label, image_path)`` pair:

    * With ``annotations_file``: one pair per CSV row. Each row is unpacked as
      ``(label, image filename)`` and the filename is joined onto ``img_dir``.
    * Without ``annotations_file``: one pair per ``*.png`` file found directly
      in ``img_dir``; the file name itself is used as the "label" so callers
      can map a prediction back to its image.

    Args:
        img_dir: Directory containing the images (``~`` is expanded).
        annotations_file: Optional CSV whose rows are ``label, filename``.
        transform: Optional callable applied to the decoded image.
        target_transform: Optional callable applied to the label.
    """

    def __init__(self, img_dir: Path, annotations_file: Optional[Path]=None, transform=None, target_transform=None):
        img_dir = img_dir.expanduser()
        if annotations_file is None:
            # Path.glob yields entries in arbitrary (filesystem) order; sorting
            # keeps the index -> image mapping stable between runs and machines.
            self.labels_images = [
                (img_path.name, str(img_path))
                for img_path in sorted(img_dir.glob('*.png'))
            ]
        else:
            self.labels_images = [
                (label, str(img_dir.joinpath(img_filename)))
                for label, img_filename in pd.read_csv(annotations_file).values.tolist()
            ]
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of ``(label, image_path)`` pairs."""
        return len(self.labels_images)

    def __getitem__(self, idx):
        """Decode the image at ``idx`` and return ``(image, label)`` after the optional transforms."""
        label, img_path = self.labels_images[idx]

        image = read_image(img_path)

        if self.transform is not None:
            image = self.transform(image)
        if self.target_transform is not None:
            label = self.target_transform(label)

        return image, label

In [3]:
###### Create your model here ########
######################################

class LimboModel(pl.LightningModule):
    """Example small convolutional classifier.

    Pipeline: 7x7 stride-2 convolution (3 -> 64 channels) -> batch norm ->
    ReLU -> 3x3 stride-2 max pool -> global average pool -> linear layer
    producing ``num_classes`` logits.

    Args:
        num_classes: Number of output logits.
        device: Optional device on which the layer parameters are created.
        dtype: Optional dtype for the layer parameters.
        **kwargs: Accepted for forward compatibility; currently unused.
    """

    def __init__(
        self,
        # potential model inputs here
        num_classes: int = 2,
        device = None,
        dtype = None,
        **kwargs
    ):
        # Forwarded to every layer that owns parameters so that ``device`` and
        # ``dtype`` actually take effect (None keeps the PyTorch defaults).
        factory_kwargs = {'device': device, 'dtype': dtype}
        super(LimboModel, self).__init__()

        ##############################
        # Add any model initialization here
        ##############################

        ##############################
        # example small convolution neural network
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False, **factory_kwargs)
        self.bn1 = nn.BatchNorm2d(64, **factory_kwargs)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(64, num_classes, **factory_kwargs)
        ##############################

        self.save_hyperparameters()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Map a batch of 3-channel images to per-class logits."""
        # Add your code here that will run during the forward call
        # This is where you normally perform the matrix multiplication or other math

        ##############################
        # example small convolution neural network
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.avgpool(x)
        # Collapse the (64, 1, 1) pooled features to a flat 64-vector per sample.
        x = torch.flatten(x, 1)
        x = self.fc(x)
        ##############################

        return x

In [4]:
#Create a class for the LimboNetwork
class LimboNetwork(pl.LightningModule):
    """Lightning module that trains, validates, tests and predicts with ``LimboModel``.

    Args:
        optimizer: Name of the ``torch.optim`` optimizer class to use.
        lr: Learning rate handed to the optimizer.
        classes: Optional list of class names.
        class_to_idx: Optional mapping from class name to integer class index.
        num_classes: Number of classes; defaults to ``len(classes)`` when omitted.

    Raises:
        RuntimeError: If both ``num_classes`` and ``classes`` are ``None``.
        KeyError: If ``optimizer`` is not one of the supported names.
    """

    def __init__(
        self,
        optimizer: str = 'Adam',
        lr: float = 1e-3,
        classes: Optional[List[str]] = None,
        class_to_idx: Optional[Dict[str, int]] = None,
        num_classes: Optional[int] = None,
    ):
        super().__init__()

        self.lr = lr
        self.classes = classes
        self.class_to_idx = class_to_idx
        self.num_classes = num_classes

        if self.num_classes is None and self.classes is None:
            raise RuntimeError('Must specify either num_classes or classes. Both cannot be None.')
        elif self.num_classes is None:
            self.num_classes = len(self.classes)

        optimizers = {'Adadelta': Adadelta, 'Adagrad': Adagrad, 'Adam': Adam, 'AdamW': AdamW, 'SparseAdam': SparseAdam, 'Adamax': Adamax, 'ASGD': ASGD, 'SGD': SGD, 'RAdam': RAdam, 'Rprop': Rprop, 'RMSprop': RMSprop, 'NAdam': NAdam, 'LBFGS': LBFGS}
        self.optimizer = optimizers[optimizer]
        # Use the *resolved* class count: the raw ``num_classes`` argument is
        # None whenever the caller only passed ``classes``.
        self.criterion = (
            nn.BCEWithLogitsLoss() if self.num_classes == 2 else nn.CrossEntropyLoss()
        )

        #########################################
        #########################################
        #### Where your model is initialized ####
        # The classifier head must emit one logit per class.
        self.classification_model = LimboModel(num_classes=self.num_classes)
        #########################################
        #########################################

        metrics = torchmetrics.MetricCollection({
            'accuracy': torchmetrics.Accuracy(num_classes=self.num_classes),
            'precision': torchmetrics.Precision(num_classes=self.num_classes),
            'recall': torchmetrics.Recall(num_classes=self.num_classes),
            'f1_score': torchmetrics.F1Score(num_classes=self.num_classes),
        })
        self.train_metrics = metrics.clone(prefix='train_')
        self.val_metrics = metrics.clone(prefix='val_')
        self.test_metrics = metrics.clone(prefix='test_')

        self.val_confusion_matrix = torchmetrics.ConfusionMatrix(num_classes=self.num_classes)
        self.test_confusion_matrix = torchmetrics.ConfusionMatrix(num_classes=self.num_classes)

        self.save_hyperparameters()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the classification model's logits for ``x``."""

        ############################################################
        ############################################################
        #### Where your model is called during the forward pass ####
        x = self.classification_model(x)
        ############################################################
        ############################################################

        return x

    def configure_optimizers(self):
        """Instantiate the selected optimizer over all parameters with ``self.lr``."""
        return self.optimizer(self.parameters(), lr=self.lr)

    def _forward_with_loss(self, batch):
        """Run the model on ``batch`` and compute the loss.

        Returns ``(x, y, logits, loss)`` where ``y`` is left exactly as it came
        from the batch (integer class indices are assumed - ``F.one_hot``
        requires them), so metrics can consume it directly.
        """
        x, y = batch
        logits = self(x)
        if self.num_classes == 2:
            # BCEWithLogitsLoss needs a float target shaped like the logits.
            loss_target = F.one_hot(y, num_classes=2).float()
        else:
            # CrossEntropyLoss takes integer class indices; casting them to
            # float (as was done before) is rejected by PyTorch.
            loss_target = y
        loss = self.criterion(logits, loss_target)
        return x, y, logits, loss

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss and metrics; return the loss."""
        _, y, preds, loss = self._forward_with_loss(batch)

        # Metrics get class-index targets so they are scored per sample.
        self.train_metrics(preds, y)

        self.log("train_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
        self.log_dict(self.train_metrics, prog_bar=True, logger=True, on_step=False, on_epoch=True)

        return loss

    def validation_step(self, batch, batch_idx, dataloader_idx=0) -> Optional[STEP_OUTPUT]:
        """Log validation loss/metrics and accumulate the validation confusion matrix."""
        _, y, preds, loss = self._forward_with_loss(batch)

        self.val_metrics(preds, y)
        self.val_confusion_matrix.update(preds, y)

        self.log("val_loss", loss, on_step=False, on_epoch=True, prog_bar=True, logger=True)
        self.log_dict(self.val_metrics, prog_bar=True, logger=True, on_step=False, on_epoch=True)

    def _axis_labels(self):
        """Return class names ordered by class index for labelling matrix axes.

        Prefers ``class_to_idx`` (sorted by index) when it covers every class,
        otherwise falls back to ``classes`` in the order given.
        """
        if self.class_to_idx and len(self.class_to_idx) == self.num_classes:
            return sorted(self.class_to_idx, key=self.class_to_idx.get)
        return self.classes

    def plot_confusion_matrix(self, confusion_matrix):
        """Draw ``confusion_matrix`` as an annotated heatmap and return the figure."""
        labels = self._axis_labels()
        cm_df = pd.DataFrame(confusion_matrix, index=labels, columns=labels)
        fig = plt.figure(figsize=(10,7))
        sns.heatmap(cm_df, annot=True, cmap='Spectral')
        return fig

    def _log_confusion_matrix(self, metric, tag: str) -> None:
        """Compute ``metric``, log it as a figure named ``tag``, then reset it.

        NOTE(review): relies on ``self.logger.experiment`` exposing
        ``add_figure`` (as a TensorBoard ``SummaryWriter`` does) - confirm if a
        different logger is configured.
        """
        confusion_matrix = metric.compute().cpu().numpy()

        fig = self.plot_confusion_matrix(confusion_matrix)

        pl_logger = self.logger.experiment
        pl_logger.add_figure(tag, fig, global_step=self.current_epoch, close=True)

        metric.reset()

    def validation_epoch_end(self, outputs: Union[List[STEP_OUTPUT], List[List[STEP_OUTPUT]]]) -> None:
        """Log the epoch's validation confusion matrix and reset its state."""
        self._log_confusion_matrix(self.val_confusion_matrix, "val_confusion_matrix")

    def test_step(self, batch, batch_idx, dataloader_idx=0) -> Optional[STEP_OUTPUT]:
        """Log test loss/metrics and return labels, predictions, loss and inputs.

        ``true_label`` holds the targets as delivered by the batch and
        ``pred_label`` the arg-max class of the logits.
        """
        x, y, preds, loss = self._forward_with_loss(batch)

        self.test_metrics(preds, y)
        self.test_confusion_matrix.update(preds, y)

        self.log("test_loss", loss, on_step=False, on_epoch=True, prog_bar=True, logger=True)
        self.log_dict(self.test_metrics, prog_bar=True, logger=True, on_step=False, on_epoch=True)
        return {'true_label': y, 'pred_label': torch.argmax(preds, 1), 'loss': loss, 'image': x}

    def test_epoch_end(self, outputs: Union[List[STEP_OUTPUT], List[List[STEP_OUTPUT]]]) -> None:
        """Log the test confusion matrix and reset its state."""
        self._log_confusion_matrix(self.test_confusion_matrix, "test_confusion_matrix")
        # Kept for backward compatibility with the previous behaviour.
        return outputs

    def predict_step(self, batch: Any, batch_idx: int, dataloader_idx: int = 0) -> Any:
        """Return ``(predicted class index, first label entry)`` for a batch.

        Only works with a batch size of 1: converting the prediction tensor to
        ``int`` raises for more than one element, and only ``y[0]`` is returned.
        """
        x, y = batch
        preds = self(x)
        prediction = torch.argmax(preds, 1)
        # {'Label': imageLabel, 'ImageID': imageID}
        return int(prediction.cpu()), y[0]


In [5]:
# Per-channel statistics shared by both pipelines.
_CHANNEL_MEAN = [0.485, 0.456, 0.406]
_CHANNEL_STD = [0.229, 0.224, 0.225]

# Training pipeline: random crop + flip, then uint8 -> float and normalisation.
_train_steps = [
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    # read_image yields uint8 tensors; Normalize needs floating point input
    transforms.ConvertImageDtype(torch.float),
    transforms.Normalize(_CHANNEL_MEAN, _CHANNEL_STD),
]
data_transforms = transforms.Compose(_train_steps)

# Deterministic pipeline: resize + centre crop, then the same dtype/normalisation.
_eval_steps = [
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ConvertImageDtype(torch.float),
    transforms.Normalize(_CHANNEL_MEAN, _CHANNEL_STD),
]
data_transformsV = transforms.Compose(_eval_steps)

In [6]:
# Set some default parameters used for the problem
def _default_callbacks() -> Tuple[Callback, ...]:
    """Build a fresh set of default trainer callbacks."""
    return (
        StochasticWeightAveraging(),
        EarlyStopping(monitor="val_accuracy", min_delta=0.00, patience=5, mode="max"),
        EarlyStopping(monitor="val_loss", min_delta=0.00, patience=5, mode="min"),
    )


@dataclass
class LimboParameters:
    """Default run configuration for data loading, optimisation and the trainer."""

    batch_size: int = 32
    devices: Union[Tuple[int, ...], int] = 1
    accelerator: str = 'auto'
    num_epochs: int = 5
    train_pct: float = 0.8  # fraction of the labelled data used for training
    precision: int = 32
    # Resolved when the instance is created rather than once at class definition.
    default_root_dir: Union[str, Path] = field(default_factory=os.getcwd)
    optimizer: str = 'Adam' # ['Adadelta', 'Adagrad', 'Adam', 'AdamW', 'SparseAdam', 'Adamax', 'ASGD', 'SGD', 'RAdam', 'Rprop', 'RMSprop', 'NAdam', 'LBFGS']
    lr: float = 1e-3
    num_workers: int = 2
    pin_memory: bool = False
    # Callbacks are stateful objects, so every instance gets its own fresh set
    # instead of sharing objects created once at class-definition time.
    callbacks: Optional[Union[Callback, Tuple[Callback, ...]]] = field(default_factory=_default_callbacks)

In [7]:
params = LimboParameters()

# ``callbacks`` may be None, a single Callback, or a tuple of them; the Trainer
# is always given a list.
if params.callbacks is None:
    trainer_callbacks = []
elif isinstance(params.callbacks, Callback):
    trainer_callbacks = [params.callbacks]
else:
    trainer_callbacks = list(params.callbacks)

trainer = pl.Trainer(
    max_epochs = params.num_epochs,
    accelerator = params.accelerator,
    devices = params.devices,
    callbacks = trainer_callbacks,
    default_root_dir = params.default_root_dir,
    precision = params.precision,
    num_sanity_val_steps = 2,
    # profiler = 'simple'
)

In [8]:
TRAIN_IMG_DIR = Path('/kaggle/input/containerid/train/train')
TRAIN_ANNOTATIONS = Path('/kaggle/input/containerid/train_annotations.csv')
SPLIT_SEED = 42  # fixes the train/validation partition across re-runs

# Two views over the same annotated files: one with the random training
# augmentations, one with the deterministic evaluation transforms.
train_val_dataset = LimboDataset(
    img_dir = TRAIN_IMG_DIR,
    annotations_file = TRAIN_ANNOTATIONS,
    transform=data_transforms
)
val_view_dataset = LimboDataset(
    img_dir = TRAIN_IMG_DIR,
    annotations_file = TRAIN_ANNOTATIONS,
    transform=data_transformsV
)
assert len(val_view_dataset) == len(train_val_dataset)

train_counts = int(params.train_pct * len(train_val_dataset))
val_counts = len(train_val_dataset) - train_counts

limbo_train, val_split = random_split(
    train_val_dataset,
    (train_counts, val_counts),
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)
# Re-use the held-out indices on the evaluation view so validation images are
# not randomly cropped or flipped.
limbo_val = Subset(val_view_dataset, val_split.indices)

print(f'Training dataset size: {len(limbo_train)}')
print(f'Validation dataset size: {len(limbo_val)}')

Training dataset size: 2480
Validation dataset size: 620


In [9]:
# No annotations file here, so each item's label is the image file name.
predict_img_dir = Path('/kaggle/input/containerid/test/test')
limbo_predict = LimboDataset(img_dir=predict_img_dir, transform=data_transformsV)

predict_size = len(limbo_predict)
print(f'Predict dataset size: {predict_size}')

Predict dataset size: 1000


In [10]:
# Worker / memory settings are identical for all three loaders.
_shared_loader_kwargs = {
    'num_workers': params.num_workers,
    'pin_memory': params.pin_memory,
}

# Training: shuffled mini-batches of the configured size.
limbo_train_loader = DataLoader(
    limbo_train, batch_size=params.batch_size, shuffle=True, **_shared_loader_kwargs
)

# Validation: one sample at a time, in order.
limbo_val_loader = DataLoader(
    limbo_val, batch_size=1, shuffle=False, **_shared_loader_kwargs
)

# Prediction: one sample at a time, in order.
limbo_predict_loader = DataLoader(
    limbo_predict, batch_size=1, shuffle=False, **_shared_loader_kwargs
)

In [11]:
class_to_idx = {'cylinder': 1, 'not_cylinder': 0}
# Derive the name list from the mapping so that class_names[i] is always the
# class whose index is i (previously the two were in opposite order).
class_names = sorted(class_to_idx, key=class_to_idx.get)

limbo_model = LimboNetwork(
    classes = class_names,
    class_to_idx = class_to_idx,
    optimizer = params.optimizer,
    lr = params.lr
)

In [None]:
# Train the network, validating on the held-out loader.
trainer.fit(
    limbo_model,
    train_dataloaders=limbo_train_loader,
    val_dataloaders=limbo_val_loader,
)

Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

In [None]:
# One entry per predict batch, as returned by LimboNetwork.predict_step.
final_results = trainer.predict(limbo_model, dataloaders=limbo_predict_loader)
# Preview only the first few entries rather than dumping every prediction.
final_results[:5]

In [None]:
# Assemble the submission table: one row per prediction entry.
submission_columns = ['Label', 'ImageID']
my_submission = pd.DataFrame(final_results, columns=submission_columns)

# Any file name would do; 'submission.csv' is the one chosen here.
my_submission.to_csv('submission.csv', index=False)