# Philippines Audit Report Target Page Detection Model

___

## Setup & Imports

In [4]:
# !pip install -q --upgrade albumentations opencv-python optuna tensorboard torch torchvision pytorch_lightning torchmetrics

In [5]:
from typing import Union

import albumentations as album
import numpy as np
import optuna
import pytorch_lightning as pl
import torch
from albumentations import Compose
from albumentations.pytorch.transforms import ToTensorV2
from pytorch_lightning.callbacks import (EarlyStopping, ModelCheckpoint)
from pytorch_lightning.loggers import TensorBoardLogger
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset, random_split
from torchmetrics import Accuracy, Precision, Recall
from torchvision import datasets

In [6]:
# Set this for GPU with Tensor Core Training
# torch.set_float32_matmul_precision('medium')

___
#### Custom Transform 


In [7]:
# Custom transform to ensure that pages are all of the same orientation (shape)
# for the convolutions
class EnsureLandscape(album.ImageOnlyTransform):
    """Rotate portrait images by 90 degrees so every image is landscape.

    An image is treated as portrait when its height (``shape[0]``) is
    strictly greater than its width (``shape[1]``). Square and landscape
    images are returned unchanged.

    Args:
        always_apply (bool, optional): Forwarded to the albumentations base
            transform. Defaults to False.
        p (float, optional): Forwarded to the albumentations base transform.
            Defaults to 1.0.
    """

    def __init__(self, always_apply=False, p=1.0):
        # Pass by keyword so the call does not depend on the positional order
        # of the base-class parameters.
        super().__init__(always_apply=always_apply, p=p)

    def apply(self, img, **params):
        """Return ``img``, rotated 90 degrees counter-clockwise if portrait."""
        if img.shape[0] > img.shape[1]:
            # np.rot90 returns a view with negative strides. Downstream
            # consumers such as torch.from_numpy reject those, so hand back a
            # contiguous copy instead of the view.
            img = np.ascontiguousarray(np.rot90(img))
        return img

    def get_transform_init_args_names(self):
        """Report no constructor argument names (returns an empty tuple)."""
        return ()



___
## Dataset and Datamodule

In [8]:
# Implementing a custom dataset with a __getitem__ method allows us to apply 
# transforms upon image retrieval. Saves us from having to load the entire
# dataset to memory to apply the transforms in one go.
class PhilImageDataset(Dataset):
    """Dataset wrapper that applies albumentations transforms on access.

    Each item is fetched from the wrapped dataset, converted to a NumPy
    array, and (when transforms are configured) passed through them, so the
    transforms run per sample at retrieval time.

    Args:
        data: Indexable, sized dataset yielding ``(image, label)`` pairs,
            e.g. a ``torchvision.datasets.ImageFolder`` or a subset of one.
        transforms (Union[Compose, None]): Compose of albumentations
            transforms, or None to return the raw array.

    Returns:
        image, label
    """

    def __init__(self, data, transforms: Union[Compose, None]):
        self.transforms = transforms
        self.data = data

    def __len__(self):
        """Number of samples in the wrapped dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the (optionally transformed) image and its label."""
        raw_image, label = self.data[idx]
        pixels = np.asarray(raw_image)
        if not self.transforms:
            return pixels, label
        # albumentations takes and returns a dict keyed by target name.
        transformed = self.transforms(image=pixels)
        return transformed['image'], label


In [9]:
class PhilDataModule(pl.LightningDataModule):
    """PhilDataModule

    LightningDataModule for the PhilImages dataset. Builds the
    train/val/test splits from an ImageFolder and wraps each split in a
    PhilImageDataset so the transforms are applied per sample.

    Args:
        data_root (str): Path to the root directory of the dataset.
        num_workers (int, optional): Number of workers to use for the
            DataLoader. Defaults to 2.
        batch_size (int, optional): Batch size for the DataLoader. Defaults to
            16.
        val_split (float, optional): Fraction of the dataset (0-1) to use for
            validation. Defaults to 0.2.
        test_split (float, optional): Fraction of the dataset (0-1) to use for
            testing. Defaults to 0.1.
        seed (int, optional): Seed for the random split. Defaults to 42.
        transforms (album.Compose, optional): Transforms to apply to the
            images of every split. Defaults to None.

    Returns:
        PhilDataModule: DataModule for the PhilImages dataset.
    """

    def __init__(
        self, data_root: str,
        num_workers: int = 2,
        batch_size: int = 16,
        val_split: float = 0.2,
        test_split: float = 0.1,
        seed: int = 42,
        transforms: album.Compose = None,
    ):
        super().__init__()
        # Where the data lives and how it is preprocessed.
        self.data_root = data_root
        self.transforms = transforms
        # How the data is split.
        self.val_split = val_split
        self.test_split = test_split
        self.seed = seed
        # How the data is served.
        self.batch_size = batch_size
        self.num_workers = num_workers

    def setup(self, stage=None):
        """Create the three split datasets; ``stage`` is not consulted."""
        splits = self._get_dataset_splits()
        self.train_set, self.val_set, self.test_set = (
            PhilImageDataset(split, transforms=self.transforms)
            for split in splits
        )

    def _get_dataset_splits(self):
        """Randomly split the ImageFolder into train/val/test subsets.

        The validation and test sizes are truncated to whole samples; the
        training split receives whatever remains.
        """
        full_dataset = datasets.ImageFolder(root=self.data_root)
        total = len(full_dataset)

        n_val = int(self.val_split * total)
        n_test = int(self.test_split * total)
        n_train = total - n_val - n_test

        # Seeds torch's global RNG so the split is reproducible.
        torch.manual_seed(self.seed)

        return random_split(full_dataset, [n_train, n_val, n_test])

    def _make_loader(self, dataset, shuffle):
        """Build a DataLoader using the module's batch size and worker count."""
        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            shuffle=shuffle,
            num_workers=self.num_workers,
        )

    def train_dataloader(self):
        """Shuffled loader over the training split."""
        return self._make_loader(self.train_set, shuffle=True)

    def val_dataloader(self):
        """Unshuffled loader over the validation split."""
        return self._make_loader(self.val_set, shuffle=False)

    def test_dataloader(self):
        """Unshuffled loader over the test split."""
        return self._make_loader(self.test_set, shuffle=False)


___
## Model Architecture

In [10]:
class PhilTableDetection(pl.LightningModule):
    """PhilTableDetection

    PyTorch Lightning Module for the PhilImages dataset. Handles the model
    architecture (two conv/pool stages followed by two fully connected
    layers), training, validation, and testing. Class has been created
    to be flexible to allow for hyperparameter tuning with Optuna.

    The loss (``BCEWithLogitsLoss``) and the metrics (``task="binary"``)
    are binary, and the logits are flattened to one value per sample, so the
    module is intended to be used with ``num_classes=1``.

    Args:
        num_classes (int, optional): Number of output logits. Defaults to 1.
        learning_rate (float, optional): Learning rate for the optimizer.
            Defaults to 1e-4.
        weight_decay (float, optional): Weight decay for the optimizer.
            Defaults to 0.
        dropout_rate (float, optional): Dropout rate for the dropout layers.
            Defaults to 0.
        num_filters1 (int, optional): Number of filters for the first
            convolutional layer. Defaults to 16.
        num_filters2 (int, optional): Number of filters for the second
            convolutional layer. Defaults to 32.
        padding (int, optional): Accepted and stored in the hyperparameters,
            but NOT used: the effective padding is always
            ``(filter_size - 1) // 2``. Kept so existing callers and
            checkpoints keep working. Defaults to 1.
        stride (int, optional): Stride for the convolutional layers.
            Defaults to 1.
        filter_size (int, optional): Filter size for the convolutional layers.
            Defaults to 3.
        num_fc_nodes (int, optional): Number of nodes for the fully connected
            layer. Defaults to 64.
        image_size (tuple, optional): (height, width) of the input images.
            Defaults to (442, 572).

    Returns:
        PhilTableDetection: PyTorch Lightning Module for the PhilImages dataset.
    """

    def __init__(self, num_classes=1, learning_rate=1e-4, weight_decay=0,
                 dropout_rate=0, num_filters1=16, num_filters2=32,
                 padding=1, stride=1, filter_size=3, num_fc_nodes=64,
                 image_size=(442, 572)):
        super().__init__()
        self.save_hyperparameters()
        self.num_classes = num_classes
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.dropout_rate = dropout_rate
        self.num_filters1 = num_filters1
        self.num_filters2 = num_filters2
        self.num_fc_nodes = num_fc_nodes
        self.stride = stride
        self.filter_size = filter_size
        self.image_size = image_size

        # The `padding` argument is deliberately overridden with
        # (filter_size - 1) / 2, i.e. 'same' padding for odd filter sizes.
        # Honouring the argument would change the layer shapes of existing
        # checkpoints, so the override is preserved.
        self.padding = (self.filter_size - 1) // 2

        self.loss_function = nn.BCEWithLogitsLoss()
        # Metrics are modules registered on the LightningModule, so they
        # follow it across devices; no explicit .to(...) is needed here.
        self.acc = Accuracy(task="binary")
        self.rec = Recall(task="binary")
        self.prec = Precision(task="binary")

        self.conv1 = nn.Conv2d(3, self.num_filters1, self.filter_size,
                               stride=self.stride, padding=self.padding)
        self.rel1 = nn.ReLU()
        self.bn1 = nn.BatchNorm2d(self.num_filters1)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.dropout1 = nn.Dropout(self.dropout_rate)

        self.conv2 = nn.Conv2d(self.num_filters1, self.num_filters2,
                               self.filter_size, stride=self.stride,
                               padding=self.padding)
        self.rel2 = nn.ReLU()
        self.bn2 = nn.BatchNorm2d(self.num_filters2)
        self.pool2 = nn.MaxPool2d(2, 2)
        self.dropout2 = nn.Dropout(self.dropout_rate)

        self.fc1 = nn.Linear(self._calculate_linear_input_size(), self.num_fc_nodes)
        self.rel3 = nn.ReLU()
        self.dropout3 = nn.Dropout(self.dropout_rate)

        self.fc2 = nn.Linear(self.num_fc_nodes, self.num_classes)

    def _calculate_linear_input_size(self):
        """Return the flattened feature count that feeds the first FC layer.

        A dummy image is passed through the shape-changing layers only
        (convolutions and poolings). ReLU, BatchNorm and Dropout preserve the
        shape, and skipping them avoids updating the BatchNorm running
        statistics with an all-zero batch during construction.
        """
        with torch.no_grad():
            x = torch.zeros(1, 3, *self.image_size)
            x = self.pool1(self.conv1(x))
            x = self.pool2(self.conv2(x))
        return x.numel()

    def forward(self, x):
        """Return raw logits of shape (batch, num_classes)."""
        x = self.dropout1(self.pool1(self.bn1(self.rel1(self.conv1(x)))))
        x = self.dropout2(self.pool2(self.bn2(self.rel2(self.conv2(x)))))
        x = torch.flatten(x, 1)  # Flatten everything except the batch dim
        x = self.dropout3(self.rel3(self.fc1(x)))
        x = self.fc2(x)
        return x

    def training_step(self, batch, batch_idx):
        """Compute the BCE-with-logits loss for one training batch."""
        # Lightning already places the batch on the module's device.
        inputs, targets = batch
        logits = self(inputs).view(-1)
        loss = self.loss_function(logits, targets.to(logits.dtype))
        return loss

    def _shared_eval_step(self, batch, prefix):
        """Compute and log loss/accuracy/precision/recall under ``prefix``."""
        inputs, targets = batch
        logits = self(inputs).view(-1)
        loss = self.loss_function(logits, targets.to(logits.dtype))
        # .long() keeps the tensor on its current device, unlike
        # .type(torch.FloatTensor), which always produces a CPU tensor.
        preds = (torch.sigmoid(logits) > 0.5).long()

        # Update the metric states, then log the metric objects themselves so
        # Lightning reports epoch-level values instead of an average of
        # per-batch precision/recall.
        self.acc(preds, targets)
        self.rec(preds, targets)
        self.prec(preds, targets)

        self.log_dict(
            {
                f"{prefix}_loss": loss,
                f"{prefix}_acc": self.acc,
                f"{prefix}_prec": self.prec,
                f"{prefix}_recall": self.rec,
            },
            prog_bar=True,
        )

    def validation_step(self, batch, batch_idx):
        """Log val_loss, val_acc, val_prec and val_recall for one batch."""
        self._shared_eval_step(batch, "val")

    def test_step(self, batch, batch_idx):
        """Log test_loss, test_acc, test_prec and test_recall for one batch."""
        # Uses the "test" prefix so test results are not reported as
        # validation metrics.
        self._shared_eval_step(batch, "test")

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters."""
        return optim.Adam(
            self.parameters(),
            lr=self.learning_rate,
            weight_decay=self.weight_decay,
        )



___
## Optuna Hyperparameter Tuning

In [11]:
def objective(trial, data_root="", logdir="", checkpointdir=""):
    """Optuna objective: train one model and return its validation loss.

    Args:
        trial (optuna.trial.Trial): Trial used to suggest hyperparameters.
        data_root (str, optional): Root directory of the image dataset.
            Defaults to "" (placeholder; set it before running).
        logdir (str, optional): TensorBoard log directory. Defaults to "".
        checkpointdir (str, optional): Directory for model checkpoints.
            Defaults to "".

    Returns:
        float: The most recently logged ``val_loss``.
    """
    resized_image_size = (442, 572)
    # Rotate portrait pages *before* resizing. Resize always produces a
    # 442x572 (height x width) image, so an orientation check placed after it
    # would never see a portrait image and would never rotate anything.
    transforms = album.Compose(transforms=[
        EnsureLandscape(always_apply=True),
        album.Resize(*resized_image_size, always_apply=True),
        album.Normalize(),
        ToTensorV2()
    ])

    datamodule = PhilDataModule(data_root=data_root, num_workers=12,
                                batch_size=32, transforms=transforms)

    # Suggest hyperparameters
    learning_rate = trial.suggest_float("learning_rate", 1e-6, 1e-4, log=True)
    dropout_rate = trial.suggest_float("dropout_rate", 0, 0.01)
    filter_size = trial.suggest_categorical("filter_size", [5])
    stride = trial.suggest_categorical("stride", [1])

    # Create the model with the suggested hyperparameters
    model = PhilTableDetection(
        num_classes=1,
        learning_rate=learning_rate,
        dropout_rate=dropout_rate,
        num_filters1=64,
        num_filters2=64,
        num_fc_nodes=156,
        filter_size=filter_size,
        stride=stride,
        image_size=resized_image_size,
    )
    f_string = '{epoch}-{val_loss:.2f}-{val_prec:.2f}-{val_acc:.2f}-{val_recall:.2f}'
    callbacks = [
        # Precision is "higher is better"; the default mode ('min') would
        # stop training while precision is still improving.
        EarlyStopping(monitor='val_prec', min_delta=0.005, patience=3,
                      mode='max'),
        EarlyStopping(monitor='val_loss', min_delta=0.005, patience=3,
                      mode='min'),
        ModelCheckpoint(dirpath=checkpointdir,
                        monitor='val_prec',
                        save_top_k=1,
                        filename=f_string,
                        mode='max',
                        every_n_epochs=2,
                        save_last=True,
                        save_on_train_epoch_end=True
                        ),
        ]

    logger = TensorBoardLogger(save_dir=logdir, name="final")

    # accelerator=None is not an accepted value in Lightning 2.x; fall back
    # to the CPU explicitly, and only request fp16 mixed precision on a GPU.
    use_gpu = torch.cuda.is_available()

    # Train the model
    trainer = pl.Trainer(
        max_epochs=7,  # Adjust this to your needs
        accelerator='gpu' if use_gpu else 'cpu',
        precision='16-mixed' if use_gpu else '32-true',
        devices=1,
        logger=logger,
        enable_model_summary=True,
        enable_progress_bar=True,
        callbacks=callbacks,
    )
    trainer.fit(model, datamodule=datamodule)

    # Optuna expects a plain float, not a tensor.
    return float(trainer.logged_metrics["val_loss"])


In [None]:
# Search for the hyperparameters that minimize the value returned by
# `objective`. Adjust the number of trials as needed.
study = optuna.create_study(direction="minimize")
study.optimize(func=objective, n_trials=10)

___
## Continuation Training

In [None]:
def continuation_training(checkpoint_path, data_root, logdir, checkpointdir):
    """Continue training a PhilTableDetection model from a checkpoint.

    Args:
        checkpoint_path (str): Path of the checkpoint to load the model from.
        data_root (str): Root directory of the image dataset.
        logdir (str): TensorBoard log directory.
        checkpointdir (str): Directory for new model checkpoints.

    Returns:
        The most recently logged ``val_loss`` (as stored in
        ``trainer.logged_metrics``).
    """
    resized_image_size = (442, 572)
    # Rotate portrait pages *before* resizing. Resize always produces a
    # 442x572 (height x width) image, so an orientation check placed after it
    # would never see a portrait image and would never rotate anything.
    transforms = album.Compose(transforms=[
        EnsureLandscape(always_apply=True),
        album.Resize(*resized_image_size, always_apply=True),
        album.Normalize(),
        ToTensorV2()
    ])

    datamodule = PhilDataModule(data_root=data_root, num_workers=12,
                                batch_size=128, transforms=transforms)

    model = PhilTableDetection.load_from_checkpoint(
        checkpoint_path=checkpoint_path
    )

    callbacks = [
        # Precision is "higher is better"; the default mode ('min') would
        # stop training while precision is still improving.
        EarlyStopping(monitor='val_prec', min_delta=0.01, patience=3,
                      mode='max'),
        EarlyStopping(monitor='val_loss', min_delta=0.01, patience=3,
                      mode='min'),
        ModelCheckpoint(dirpath=checkpointdir, monitor='val_loss', save_top_k=1,
                        filename='{epoch}-{val_loss:.2f}-{val_prec:.2f}', mode='min',
                        every_n_epochs=3),
    ]

    # Log under the model's class name.
    experiment_name = f"{model.__class__.__name__}"
    logger = TensorBoardLogger(save_dir=logdir, name=experiment_name)

    # accelerator=None is not an accepted value in Lightning 2.x; fall back
    # to the CPU explicitly, and only request fp16 mixed precision on a GPU.
    use_gpu = torch.cuda.is_available()

    # Train the model
    trainer = pl.Trainer(
        max_epochs=100,  # Adjust this to your needs
        accelerator='gpu' if use_gpu else 'cpu',
        precision='16-mixed' if use_gpu else '32-true',
        devices=1,
        logger=logger,
        enable_model_summary=True,
        enable_progress_bar=True,
        callbacks=callbacks,
    )
    trainer.fit(model, datamodule=datamodule)

    # Return the most recently logged validation loss
    return trainer.logged_metrics["val_loss"]

In [None]:
# continuation_training has four required arguments, so a bare call raises
# TypeError. Fill in the placeholder paths below before running this cell.
continuation_training(
    checkpoint_path="",  # checkpoint to resume from
    data_root="",        # root directory of the image dataset
    logdir="",           # TensorBoard log directory
    checkpointdir="",    # where new checkpoints are written
)