# FOMO - Faster Objects, More Objects

## Common

#### Import modules

In [23]:
from pathlib import Path

import cv2
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn

from torch.optim.lr_scheduler import MultiStepLR
from torch.utils.data import Dataset, DataLoader
from torchvision.io import read_image
from torchvision.transforms import Resize
from torchvision.models import mobilenet_v2, MobileNet_V2_Weights
from torchmetrics import MetricCollection, MeanAbsoluteError, MeanSquaredError
from torchmetrics.classification import BinaryF1Score

import pytorch_lightning as pl
from pytorch_lightning.loggers import MLFlowLogger

from monai.losses.dice import DiceLoss

#### Constants

In [24]:
DATASET_DIR = Path("../vehicle_dataset_split")

IMAGE_RESOLUTION = 448  # model input image resolution (assume input is squared shape)
GRID_SCALE = 8          # number of pixels per cell on prediction grid (vertically and horizontally)
NUMBER_OF_CLASSES = 1
TEST_DATASET = False

NUMBER_OF_EPOCHS = 1
BATCH_SIZE = 8
LEARNING_RATE = 1e-3

DETECTION_THRESHOLD = 0.1

#### BoundingBoxToMask

In [25]:
class BoundingBoxToMask:
    """
    Convert YOLO bounding boxes to FOMO centroids. Used in label processing.
    Return mask of centroids and number of objects in mask.
    """
    def __init__(self, image_resolution: int, grid_scale: int, number_of_classes: int):
        self._image_resolution = image_resolution
        self._grid_scale = grid_scale
        self._number_of_classes = number_of_classes

    def __call__(self, label):
        grid_resolution = int(self._image_resolution / self._grid_scale)

        masks = []
        count = []
        for i in range(self._number_of_classes):
            class_mask = torch.zeros((grid_resolution, grid_resolution), dtype=float)
            bounding_boxes = label.readlines()
            for bounding_box in bounding_boxes:
                [c, x, y, _, _] = [
                    float(coords) for coords in bounding_box.split()
                ]  # get bounding box class, position and dimension
                if c == i:
                    x = int(x * grid_resolution)
                    y = int(y * grid_resolution)
                    class_mask[y, x] = 1.0
            masks.append(class_mask)
            count.append(len(bounding_boxes))

        return torch.stack(masks), count

#### Dataset

In [26]:
class VehicleDataset(Dataset):
    def __init__(
        self,
        dataset_dir: Path,
        transform=None,
        target_transform=None,
    ):
        self._images = sorted([
            element for element in (dataset_dir / Path("images")).iterdir()
        ])
        self._labels = sorted([
            element for element in (dataset_dir / Path("bounding_boxes")).iterdir()
        ])
        self._transform = transform
        self._target_transform = target_transform

    def __getitem__(self, idx: int):
        image_path = self._images[idx]
        image = read_image(str(image_path))
        image = self._transform(image.float())

        label_path = self._labels[idx]
        label_file = open(label_path)
        centroid_mask, count = self._target_transform(label_file)

        return image, centroid_mask, count

    def __len__(self):
        return len(self._images)

#### DataModule

In [27]:
class VehicleDataModule(pl.LightningDataModule):
    def __init__(
        self,
        dataset_dir: Path,
        input_resolution: int,
        grid_scale: int,
        batch_size: int,
    ):
        super().__init__()
        self._dataset_dir = dataset_dir
        self._input_resolution = input_resolution  
        self._grid_scale = grid_scale
        self._batch_size = batch_size
 
        self._transform = Resize((input_resolution, input_resolution), antialias=True)
        self._target_transform = BoundingBoxToMask(input_resolution, grid_scale, NUMBER_OF_CLASSES)
        self._train_dataset = None
        self._val_dataset = None
        self._test_dataset = None
        self._test_dataset = None

    def setup(self, stage: str):
        self._train_dataset = VehicleDataset(
            self._dataset_dir / Path("training"),
            self._transform,
            self._target_transform,
        )
        self._val_dataset = VehicleDataset(
            self._dataset_dir / Path("validation"),
            self._transform,
            self._target_transform,
        )
        self._test_dataset = VehicleDataset(
            self._dataset_dir / Path("test" if TEST_DATASET else "validation"),
            self._transform,
            self._target_transform,
        )

    def train_dataloader(self):
        return DataLoader(self._train_dataset, batch_size=self._batch_size, num_workers=7)
    
    def val_dataloader(self):
        return DataLoader(self._val_dataset, batch_size=self._batch_size, num_workers=7)

    def test_dataloader(self):
        return DataLoader(self._test_dataset, batch_size=self._batch_size, num_workers=7)

vehicle_data_module = VehicleDataModule(
    dataset_dir=DATASET_DIR,
    input_resolution=IMAGE_RESOLUTION,
    grid_scale=GRID_SCALE,
    batch_size=BATCH_SIZE,
)

#### FOMO network architecture

In [28]:
def fomo_net(weights: None, number_of_classes: int) -> nn.Module:
    """
    Constructs a FOMO net model with a specified number of classes.
    Input resolution is decreased by a factor of 8.
    """
    model = mobilenet_v2(weights=weights)

    # Remove layers after 1/8 size reduction
    cut_inverted_residual = 7
    features = [
        nn.Identity() if i > cut_inverted_residual else model.features[i]
        for i, _ in enumerate(model.features)
    ]
    features[cut_inverted_residual].conv[1] = nn.Identity()
    features[cut_inverted_residual].conv[2] = nn.Identity()
    features[cut_inverted_residual].conv[3] = nn.Identity()
    model = nn.Sequential(*features)
    
    # Add classification layers
    model.classifier = nn.Sequential(
        nn.Conv2d(
            in_channels=32*6, 
            out_channels=32, 
            kernel_size=1, 
            stride=1),
        nn.ReLU(),
        nn.Conv2d(
            in_channels=32,
            out_channels=number_of_classes,
            kernel_size=1,
            stride=1,
        ),
    )    
    return model

#### Funtion to postprocess the mask generated by FOMO

In [29]:
def postprocess_prediction(
    mask_pred: torch.Tensor, threshold: float, binary: bool=True, connected: bool=True
) -> torch.Tensor:
    """
    Postprocesses FOMO mask prediction to squish output between [0:1] and remove multiple detections of the same object
    """
    mask_pred = torch.sigmoid(mask_pred)
    obj_cnt = []
    for single_mask_pred in mask_pred.unbind(0):
        for i, channel in enumerate(single_mask_pred.unbind(0)):
            channel = channel.numpy()
            if binary:
                binary_channel = np.where(channel > threshold, 1.0, 0.0).astype(np.int8)
            else:
                binary_channel = channel
            if connected:
                number_of_blobs, blobs = cv2.connectedComponentsWithAlgorithm(
                    binary_channel,
                    connectivity=8,
                    ltype=cv2.CV_32S,
                    ccltype=cv2.CCL_WU,
                )
                binary_channel.fill(0)
                for blob in range(1, number_of_blobs):
                    blob_mask = (blobs == blob)
                    max_val = np.max(channel[blob_mask])
                    channel_max = np.where((channel == max_val), 1, 0)
                    binary_channel += np.logical_and(channel_max, blob_mask)
                obj_cnt.append(number_of_blobs)

            single_mask_pred[i] = torch.from_numpy(binary_channel)
    return mask_pred, torch.IntTensor(obj_cnt)

#### LightningModule

In [30]:
class FomoModule(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.save_hyperparameters()
        # Network architecture
        self._network = fomo_net(
            weights=MobileNet_V2_Weights.DEFAULT,
            number_of_classes=NUMBER_OF_CLASSES,
        )
        # Loss function
        self._loss_function = DiceLoss(sigmoid=True)
        # Metrics
        metrics = MetricCollection([
            BinaryF1Score(),
        ])
        cnt_metrics = MetricCollection([
            MeanAbsoluteError(),
            MeanSquaredError()
        ])
        self._metrics = {
            "classification":{
                "train": metrics.clone(prefix='train_'),
                "valid": metrics.clone(prefix='val_'),
                "test": metrics.clone(prefix='test_')
            },
            "counting":{
                "train": cnt_metrics.clone(prefix='train_'),
                "valid": cnt_metrics.clone(prefix='val_'),
                "test": cnt_metrics.clone(prefix='test_')
            }
        }

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        return self._network(inputs)

    def step(self, batch, type: str):
        image, mask, cnt = batch
        mask_pred = self(image)
        loss = self._loss_function(mask_pred, mask)
        mask_pred, cnt_pred = postprocess_prediction(
            mask_pred.detach().cpu(), threshold=DETECTION_THRESHOLD
        )
        metric = self._metrics["classification"][type](mask_pred, mask.cpu())
        cnt_metric = self._metrics["counting"][type](cnt_pred, cnt[0].cpu())
        return loss, metric, cnt_metric

    def log_metrics(self, loss, metric, cnt_metric, type: str):
        self.log(type + "_loss", loss, prog_bar=True)
        self.log_dict(metric)
        self.log_dict(cnt_metric)

    def training_step(self, batch, batch_idx):
        loss, metric, cnt_metric = self.step(batch, "train")
        self.log_metrics(loss, metric, cnt_metric, "train")
        return loss

    def validation_step(self, batch, batch_idx):
        loss, metric, cnt_metric = self.step(batch, "valid")
        self.log_metrics(loss, metric, cnt_metric, "valid")
        return loss

    def test_step(self, batch, batch_idx):
        loss, metric, cnt_metric = self.step(batch, "test")
        self.log_metrics(loss, metric, cnt_metric, "test")

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=LEARNING_RATE)
        scheduler = MultiStepLR(optimizer, milestones=[100, 125], gamma=0.1)
        return [optimizer], [scheduler]

fomo_module = FomoModule()

## Training

### Logger

In [31]:
mlflow_logger = MLFlowLogger(
    experiment_name="FOMO_Crossroad",
    run_name= str(IMAGE_RESOLUTION) + '/' + str(GRID_SCALE) + '/' + str(NUMBER_OF_EPOCHS),
    tracking_uri="http://127.0.0.1:8080",
    save_dir="MLFlow/mlruns",
    log_model=True
)

### Training

In [33]:
trainer = pl.Trainer(
    max_epochs=NUMBER_OF_EPOCHS,
    logger=mlflow_logger,
    log_every_n_steps=1,
    default_root_dir="Checkpoints"
)

trainer.fit(model=fomo_module, datamodule=vehicle_data_module)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
Experiment with name FOMO_Crossroad not found. Creating it.


LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name           | Type       | Params
----------------------------------------------
0 | _network       | Sequential | 68.2 K
1 | _loss_function | DiceLoss   | 0     
----------------------------------------------
68.2 K    Trainable params
0         Non-trainable params
68.2 K    Total params
0.273     Total estimated model params size (MB)


Epoch 0: 100%|██████████| 13/13 [00:10<00:00,  1.19it/s, v_num=8e1b, train_loss=0.987, valid_loss=0.988]

`Trainer.fit` stopped: `max_epochs=1` reached.


Epoch 0: 100%|██████████| 13/13 [00:10<00:00,  1.19it/s, v_num=8e1b, train_loss=0.987, valid_loss=0.988]


## Testing

### Inspect dataset images and masks

In [None]:
def inspect_dataset(data_idx: int, class_idx: int):
    dataset = VehicleDataset(
                DATASET_DIR / Path("training"),
                vehicle_data_module._transform,
                vehicle_data_module._target_transform,
    )
    image, mask, count = dataset[data_idx]

    image = image.permute(1, 2, 0).numpy().astype(int)
    class_mask = mask[class_idx]
    mask_res = cv2.resize(
        class_mask.numpy(),
        dsize=(IMAGE_RESOLUTION, IMAGE_RESOLUTION),
        interpolation=cv2.INTER_NEAREST,
    )
    image[mask_res == 1] = [0, 0, 255]

    plt.figure(figsize = (10,10))
    plt.axis('off')
    plt.imshow(image)
    print(str(count[0]), "object(s) in the picture")

inspect_dataset(data_idx=6, class_idx=0)

### Test trained model

In [None]:
MODEL_PATH = 'Checkpoints/373381907947535324/3c2a8ac78ee341d2b6634d4574e9db5d/checkpoints/epoch=0-step=13.ckpt'

FOMO_model = FomoModule.load_from_checkpoint(MODEL_PATH)

result = trainer.test(FOMO_model, datamodule=vehicle_data_module)

In [None]:
def mask_to_display(mask: torch.Tensor):
    mask_res = cv2.resize(
        mask.numpy().squeeze(),
        dsize=(IMAGE_RESOLUTION, IMAGE_RESOLUTION),
        interpolation=cv2.INTER_NEAREST,
    )
    return mask_res

def test_model(model_path: str, data_idx: int, mask_idx: int, threshold: float):
    dataset = VehicleDataset(
                DATASET_DIR / Path("training"),
                vehicle_data_module._transform,
                vehicle_data_module._target_transform,
    )
    image, mask, count = dataset[data_idx]
    model = FomoModule.load_from_checkpoint(model_path)
    with torch.no_grad():
        model.eval()
        mask_pred = model(image.unsqueeze(0).cuda()).cpu()
        mask_post_sig, cnt = postprocess_prediction(mask_pred, threshold=threshold, binary=False, connected=False)
        mask_post_bin, cnt = postprocess_prediction(mask_pred, threshold=threshold, binary=True, connected=False)
        mask_post_con, cnt= postprocess_prediction(mask_pred, threshold=threshold, binary=True, connected=True)
    
    print('cnt:', cnt)
    image = image.permute(1, 2, 0).numpy().astype(int)
    mask_result = mask_to_display(mask_post_con)
    mask_ref = mask_to_display(mask[mask_idx])
    mask_check = np.logical_and(mask_result==1, mask_ref==1)
    image[mask_result==1] = [0, 0, 255]     # blue
    # image[mask_ref==1]    = [255, 0, 0]     # red
    # image[mask_check==1]  = [255, 0, 255]   # purple

    _, axs = plt.subplots(2, 2, figsize=(20, 20))
    for ax in axs.ravel():
        ax.set_axis_off()
    axs[0][0].imshow(image)
    image[mask_result==1] = [0, 0, 255]
    axs[0][1].imshow(mask_to_display(mask_post_bin))
    axs[1][0].imshow(mask_to_display(mask_post_bin))
    axs[1][1].imshow(mask_to_display(mask_post_con))

    count_pred = np.count_nonzero(mask_result)//(pow(GRID_SCALE, 2))
    print(str(count_pred), " from " + str(count[0]) + " objects in the picture")

test_model(model_path="219408738261620929/9ea9c07e92c34661a62a0dda2712c21a/checkpoints/epoch=49-step=500.ckpt",
           data_idx=5, mask_idx=0, threshold=DETECTION_THRESHOLD)

## Export

In [None]:
def export_fomo_model():
    model = FomoModule.load_from_checkpoint(
        "399010269064744334/56634d5296d1445793f845250bb3c4e0/checkpoints/epoch=9-step=540.ckpt"
    )

    model.to_onnx(
        "fomo.onnx",
        export_params=True,
        input_names=["image"],
        output_names=["masks"],
        input_sample=torch.randn(1, 3, IMAGE_RESOLUTION, IMAGE_RESOLUTION),
    )

export_fomo_model()