In [126]:
!pip install Lightning loguru torch torchvision torchmetrics



# Mount Drive

In [127]:
from google.colab import drive
drive.mount('/content/drive', force_remount=False)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Import libraries



In [128]:
import os
import os.path as osp
from functools import partial
from pathlib import Path
from typing import Callable, List, Optional, Union
import lightning as L
from loguru import logger
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
from torchvision import datasets, transforms
from lightning.pytorch.callbacks import LearningRateMonitor, ModelCheckpoint
from torchvision import models
import torchvision.transforms as transforms
import torchmetrics
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, accuracy_score

# Mon Reader

We need here to define :

*   A ligthning Data Module - a wrapper around Pytorch usual Dataset
*   A Ligthning model Module - a wrapper around the Pytorch Model
*   A function to create our custom CNN architecture or load torchvision pre-built model
*   Our  Custom CNN model



In [129]:
class MRDataModule(L.LightningDataModule):
    """Data module for the Mon Reader model."""

    def __init__(
        self,
        data_dir: Path | str,
        transform: Optional[Callable] = transforms.Compose(
            [
                transforms.Resize((128,128)),
                transforms.ToTensor()
            ]
        ),
        batch_size: int = 32,
    ):
        """Initialization.

        Args:
            data_dir (str): The root directory of the dataset.
        """
        super().__init__()
        self.save_hyperparameters()

    def setup(self, stage=None) -> None:
        """Setup.
        Args:
          stage: Optional[str], default=None. The stage of the setup.
        """
        # Loading the training dataset. We need to split it into a training and validation part
        train_set = datasets.ImageFolder(
            root=f"{self.hparams['data_dir']}/training",
            transform=self.hparams.transform,
        )

        print(train_set.class_to_idx)
        targets = train_set.targets
        # Stratified split train_set
        train_indices, val_indices = train_test_split(
            range(len(train_set)), test_size=0.2, stratify=targets
        )
        self.train_set = data.Subset(train_set, train_indices)
        self.val_set = data.Subset(train_set, val_indices)

         # Loading the test set
        self.test_set = datasets.ImageFolder(
            root=f"{self.hparams['data_dir']}/testing",
            transform=self.hparams.transform,
        )

    def train_dataloader(self):
        """Return the training dataloader.

        Returns:
            DataLoader: The training dataloader.
        """
        return data.DataLoader(
            self.train_set, batch_size=self.hparams["batch_size"], shuffle=True
        )

    def val_dataloader(self):
        """Return the validation dataloader.

        Returns:
            DataLoader: The validation dataloader.
        """
        return data.DataLoader(
            self.val_set, batch_size=self.hparams["batch_size"], shuffle=False
        )

    def test_dataloader(self):
        """Return the test dataloader.

        Returns:
            DataLoader: The test dataloader.
        """
        return data.DataLoader(
            self.test_set, batch_size=self.hparams["batch_size"], shuffle=False
        )


class MRNet(L.LightningModule):
    def __init__(self, optimizer_hparams):
        """Mon Reader Net."""
        super().__init__()
        # Exports the hyperparameters to a YAML file, and create "self.hparams" namespace
        self.save_hyperparameters()
        # Create model
        self.model = CustomCNN()
        # Create loss module
        self.loss_module = nn.BCELoss()

    def forward(self, imgs):
        return self.model(imgs)

    def configure_optimizers(self):
        return optim.AdamW(self.parameters(), **self.hparams.optimizer_hparams)

    def training_step(self, batch, batch_idx):
        # "batch" is the output of the training data loader.
        imgs, labels = batch
        preds = self.model(imgs)
        loss = self.loss_module(preds, labels.float())
        self.log("train_loss", loss, on_epoch=True,prog_bar=True, on_step = False)
        return loss

    def validation_step(self, batch, batch_idx):
        imgs, labels = batch
        preds = self.model(imgs)
        loss = self.loss_module(preds, labels.float())
        self.log("val_loss", loss, on_epoch=True,prog_bar=True, on_step = False)

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        imgs, labels = batch
        return self(imgs)

def compute_metrics(labels, preds):
    preds = (torch.cat(preds, dim=0).squeeze() > 0.5).long().numpy()
    f1 = f1_score(labels, preds)
    accuracy = accuracy_score(labels, preds)

    return {'F1': f1, "ACC" : accuracy}


# Model
class CustomCNN(nn.Module):
    def __init__(self, img_width=128, img_height=128, img_channels=3):
        super(CustomCNN, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(in_channels=img_channels, out_channels=16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Flatten(),
            nn.Linear(16 * (img_width // 2) * (img_height // 2), 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.model(x).squeeze()

## Quick summary of the dataset

In [130]:
def count_files(main_folder: str):
    main_folder_path = Path(main_folder)
    subfolder_counts = {}
    for subfolder in main_folder_path.iterdir():
        if subfolder.is_dir():
            for s in subfolder.iterdir():
                file_count = len(list(s.glob("*.jpg")))
                subfolder_counts[f"{subfolder.name}/{s.name}"] = file_count
    return subfolder_counts

In [131]:
data_path = "/content/drive/MyDrive/images"
res = count_files(data_path)

for subfolder, file_count in res.items():
    print(f"  Subfolder: {subfolder}")
    print(f"    Number of files: {file_count}")

  Subfolder: testing/pos
    Number of files: 290
  Subfolder: testing/neg
    Number of files: 307
  Subfolder: training/pos
    Number of files: 1162
  Subfolder: training/neg
    Number of files: 1230


# Main Program

In [134]:
def train_model(
    data_params,
    optimizer_hparams,
    checkpoint_path: Optional[str] = None,
    num_epochs: int = 1,
    model_name : str = "custom",
    **kwargs,
):
    """Train model."""
    L.seed_everything(42)  # To be reproducible

    data = MRDataModule(**data_params)
    data.setup()
    print(f"Train ---> {len(data.train_set)}")
    print(f"Val ---> {len(data.val_set)}")
    print(f"Test ---> {len(data.test_set)}")
    # Create a PyTorch Lightning trainer with the generation callback
    trainer = L.Trainer(
        default_root_dir=os.path.join(
            checkpoint_path, model_name
        ),
        accelerator="auto",
        devices=1,
        max_epochs=num_epochs,
        # limit_train_batches= 1, # FOR DEBUG
        # limit_val_batches = 1, # FOR DEBUG
        # limit_test_batches= 1, #FOR DEBUG
        callbacks=[
            ModelCheckpoint(
                save_weights_only=True, mode="min", monitor="val_loss"
            ),  # Save the best checkpoint based on the maximum val_acc recorded. Saves only weights and not optimizer
            LearningRateMonitor("epoch"),
        ],  # Log learning rate every epoch
    )
    trainer.logger._default_hp_metric = (
        None  # Optional logging argument that we don't need
    )

    # Check whether pretrained model exists. If yes, load it and skip training
    pretrained_filename = os.path.join(checkpoint_path, f"{model_name}.ckpt")
    if os.path.isfile(pretrained_filename):
        print(f"Found pretrained model at {pretrained_filename}, loading...")
        # Automatically loads the model with the saved hyperparameters
        model = MRNet.load_from_checkpoint(pretrained_filename)
    else:
        model = MRNet(optimizer_hparams)
        trainer.fit(model, data)

    # Test best model
    preds = trainer.predict(model, data.test_dataloader(), return_predictions=True)
    y_true = data.test_set.targets

    return compute_metrics(y_true, preds)

In [135]:
data_params = {"data_dir": data_path, "batch_size": 32}
checkpoint_path = "../.checkpoints/"
optimizer_hparams = {"lr": 1e-4, "weight_decay": 1e-4}
baseline = train_model(
    data_params,
    optimizer_hparams,
    checkpoint_path,
    num_epochs=20,
)
baseline

INFO: Seed set to 42
INFO:lightning.fabric.utilities.seed:Seed set to 42
INFO: GPU available: False, used: False
INFO:lightning.pytorch.utilities.rank_zero:GPU available: False, used: False
INFO: TPU available: False, using: 0 TPU cores
INFO:lightning.pytorch.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO: IPU available: False, using: 0 IPUs
INFO:lightning.pytorch.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO: HPU available: False, using: 0 HPUs
INFO:lightning.pytorch.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO: 
  | Name        | Type      | Params
------------------------------------------
0 | model       | CustomCNN | 2.1 M 
1 | loss_module | BCELoss   | 0     
------------------------------------------
2.1 M     Trainable params
0         Non-trainable params
2.1 M     Total params
8.391     Total estimated model params size (MB)
INFO:lightning.pytorch.callbacks.model_summary:
  | Name        | Type      | Params
------------------

{'neg': 0, 'pos': 1}
Train ---> 1913
Val ---> 479
Test ---> 597
{'neg': 0, 'pos': 1}


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

INFO: `Trainer.fit` stopped: `max_epochs=20` reached.
INFO:lightning.pytorch.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=20` reached.


{'neg': 0, 'pos': 1}


Predicting: |          | 0/? [00:00<?, ?it/s]

{'F1': 0.9726027397260274, 'ACC': 0.9731993299832495}

In [136]:
baseline

{'F1': 0.9726027397260274, 'ACC': 0.9731993299832495}