[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/johnpolsh/inf721-tpfinal/blob/main/colab/Object_detection_model.ipynb)
## Setup
### Download dependencies

In [None]:
# Install the packages imported later in this notebook (wget, fiftyone,
# torchinfo); fiftyone-db is pinned to version 0.4.3.
!pip install wget fiftyone fiftyone-db==0.4.3 torchinfo

### Default imports

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import torch
import wget
import fiftyone as fo

### Select back-end device

In [None]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Make `device` the default device for tensors created from here on.
torch.set_default_device(device)
print(f"Using {device} as default device")

## Dataset

### Setting up

In [None]:
# `F` builds the label-filter expression used below. It is imported here so
# that this cell does not depend on an import that only appears in a later cell.
from fiftyone import ViewField as F

# Object categories kept for this project. The position of a name in this list
# is used elsewhere in the notebook as its class index.
labels = [
    "book",
    "bottle",
    "cup",
    "bowl",
    "knife",
    "remote",
    "vase",
    "cell phone",
    "spoon",
    "laptop",
    "fork",
    "keyboard",
    "mouse",
]

# Load the COCO-2017 validation split from the FiftyOne zoo, requesting only
# detection labels and only samples that contain at least one wanted class.
dataset = fo.zoo.load_zoo_dataset(
    "coco-2017",
    split="validation",
    label_types=["detections"],
    classes=labels,
)
# Keep the dataset in the FiftyOne database across sessions.
dataset.persistent = True

print(dataset)

# Drop every detection whose label is not one of `labels`.
view = dataset.filter_labels("detections", F("label").is_in(labels))

# The dataset is persistent, so a saved view named "labels" may already exist
# from a previous run; overwrite it instead of failing when the cell is re-run.
dataset.save_view("labels", view, overwrite=True)

### Visualizing

In [None]:
from fiftyone import ViewField as F

# Open the FiftyOne App on the full dataset; the returned handle is kept in
# `session`.
session = fo.launch_app(dataset)

### Generating files

In [None]:
# Export the filtered view to disk in YOLOv5 format.
export_dir = "./dataset"
dataset_type = fo.types.YOLOv5Dataset
label_fields = "detections"

view.export(
    export_dir=export_dir,
    dataset_type=dataset_type,
    label_field=label_fields,
    # The torch dataset below reads from the `images/val` and `labels/val`
    # sub-folders, so name the split explicitly rather than rely on a default.
    split="val",
    # Pin the class list so the class id written to each label file is the
    # index of that class in `labels`; the torch dataset's `get_label` maps
    # ids back to names with exactly that list.
    classes=labels,
)

### Torch dataset

In [None]:
import re
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms
from torchvision.transforms import functional
from torchvision.transforms.v2 import functional

class CustomDetectionDataset(Dataset):
    """Image/class-id dataset built from a folder of images and label files.

    Every line of every label file that contains a number becomes one dataset
    item. The item is the whole image the line belongs to (augmented, resized
    to 224x224 and normalized) paired with the first integer found on that
    line, which is used as the class id.

    Image and label files are paired by position after both directory
    listings are sorted by the digits contained in each file name.

    Args:
        images_folder: directory containing the image files.
        labels_folder: directory containing the label text files; it must hold
            as many files as ``images_folder``.
        labels: sequence of class names, indexed by class id.

    Attributes set during construction:
        dataset_size: number of items (object lines) found.
        i2label: maps item index -> class id.
        li2file: maps item index -> position of its image/label file.
        norm_mean / norm_std: per-channel mean and standard deviation, each
            averaged over the per-image statistics of all images.
    """

    def __init__(self, images_folder, labels_folder, labels):
        self.images_folder = images_folder
        self.labels_folder = labels_folder
        self.labels = labels

        self.images_files = sorted(os.listdir(self.images_folder), key=self._numeric_key)
        self.labels_files = sorted(os.listdir(self.labels_folder), key=self._numeric_key)

        assert len(self.images_files) == len(self.labels_files), (
            "images_folder and labels_folder must contain the same number of files"
        )

        self.__gen_labels_dict()
        self.__gen_dataset_norm()

    @staticmethod
    def _numeric_key(file_name):
        """Return the integer formed by all digits in ``file_name`` (sort key)."""
        return int(''.join(filter(str.isdigit, file_name)))

    def __len__(self):
        """Return the number of items (object lines), not the number of images."""
        return self.dataset_size

    def __getitem__(self, index):
        """Return ``(image_tensor, class_id)`` for item ``index``.

        The image is loaded from disk on every call and randomly augmented, so
        two calls with the same index generally return different tensors.
        """
        label = self.i2label[index]
        image_name = self.images_files[self.li2file[index]]
        image_file_path = os.path.join(self.images_folder, image_name)
        with Image.open(image_file_path) as raw:
            # Force three channels: grayscale, palette or RGBA files would
            # otherwise not match the 3-channel normalization statistics.
            image = raw.convert("RGB")

        return (self.__apply_transforms(image), label)

    def __apply_transforms(self, img):
        """Augment a PIL image and return it as a normalized 224x224 tensor."""
        jitter = transforms.ColorJitter(brightness=.3, contrast=.3, hue=.1)
        blur = transforms.GaussianBlur(kernel_size=(3, 13), sigma=(0.2, 6))
        # Each flip is applied independently with probability 0.5.
        if np.random.rand() < 0.5:
            img = functional.hflip(img)
        if np.random.rand() < 0.5:
            img = functional.vflip(img)
        img = jitter(img)
        img = blur(img)
        img = functional.to_tensor(img)
        img = functional.resize(img, (224, 224))
        img = functional.normalize(img, self.norm_mean, self.norm_std)
        return img

    def __gen_labels_dict(self):
        """Scan the label files and build ``i2label``, ``li2file`` and ``dataset_size``."""
        self.dataset_size = 0
        self.i2label = {}
        self.li2file = {}
        for file_idx, label_file in enumerate(self.get_labels_paths()):
            with open(label_file, 'r') as file:
                for line in file:
                    # The first run of digits on the line is taken as the class
                    # id; lines without any digit (e.g. blank lines) are skipped.
                    cls = re.search(r'\d+', line.strip())
                    if cls is None:
                        continue
                    self.i2label[self.dataset_size] = int(cls.group())
                    self.li2file[self.dataset_size] = file_idx
                    self.dataset_size += 1

    def __gen_dataset_norm(self):
        """Compute ``norm_mean`` and ``norm_std`` over every image in the folder.

        Each image is converted to RGB and scaled to [0, 1]; its per-channel
        mean and standard deviation are accumulated and then averaged over the
        number of images.
        """
        images = self.get_images_paths()
        channel_mean = np.zeros(3)
        channel_std = np.zeros(3)
        for img_file in images:
            # The context manager closes the file handle once the pixel data
            # has been copied out.
            with Image.open(img_file) as img:
                pixels = np.asarray(img.convert("RGB"), dtype=np.float32) / 255.0
            pixels = pixels.reshape(-1, 3)
            channel_mean += pixels.mean(0)
            channel_std += pixels.std(0)
        # Guard the division so an empty folder yields zeros instead of raising.
        count = max(len(images), 1)
        self.norm_mean = channel_mean / count
        self.norm_std = channel_std / count

    def get_label(self, index):
        """Return the class name stored at position ``index`` of ``labels``."""
        return self.labels[index]

    def get_images_paths(self):
        """Return the image file paths in sorted order."""
        return [os.path.join(self.images_folder, name) for name in self.images_files]

    def get_labels_paths(self):
        """Return the label file paths in sorted order."""
        return [os.path.join(self.labels_folder, name) for name in self.labels_files]

# Build the torch dataset from the `val` image and label folders under
# /content/dataset, using `labels` to map class ids to names.
our_dataset = CustomDetectionDataset(
    images_folder="/content/dataset/images/val",
    labels_folder="/content/dataset/labels/val",
    labels=labels,
)

### Sanity checks

In [None]:
from random import randint

def denorm(img):
    """Undo the dataset normalization on a channels-first image array.

    The array is reordered to channels-last, multiplied by the dataset's
    per-channel std, shifted by its per-channel mean and clipped to [0, 1].
    """
    channels_last = img.transpose((1, 2, 0))
    std = np.array(our_dataset.norm_std)
    mean = np.array(our_dataset.norm_mean)
    restored = std * channels_last + mean
    return np.clip(restored, 0, 1)

def matplotlib_imshow(img):
    """Display an image with ``plt.imshow``.

    NumPy arrays are shown as given. Torch tensors are treated as
    channels-first: they are detached, moved to the CPU, reordered to
    channels-last and converted to NumPy first. Any other object is reordered
    with its own ``transpose((1, 2, 0))`` as before.
    """
    if isinstance(img, torch.Tensor):
        # Tensor.transpose() only swaps two dimensions and rejects a tuple of
        # axes, so a full reorder has to go through permute().
        img = img.detach().cpu().permute(1, 2, 0).numpy()
    elif not isinstance(img, np.ndarray):
        img = img.transpose((1, 2, 0))
    plt.imshow(img)

print(f"dataset size: {len(our_dataset)}")
print(f"dataset normalization mean: {our_dataset.norm_mean}")
print(f"dataset normalization std: {our_dataset.norm_std}")

# random.randint includes both end points, so the upper bound must be the last
# valid index; using len(our_dataset) could select a non-existent item.
image, label = our_dataset[randint(0, len(our_dataset) - 1)]

plt.figure(figsize=(16, 9))
# Move to the CPU before converting, in case the tensor lives on another device.
img = image.cpu().numpy()
img = denorm(img)
print(our_dataset.get_label(label))
matplotlib_imshow(img)

### Torch dataloader

In [None]:
from torch.utils.data import DataLoader

batch_size = 32

# Shuffled loader over the dataset; the generator passed for shuffling is
# created on `device`.
train_dataloader = DataLoader(
    our_dataset,
    batch_size=batch_size,
    shuffle=True,
    generator=torch.Generator(device=device),
)

## Model
### Our architecture definition

In [None]:
from torch import nn

def _make_divisible(v, divisor, min_value=None):
    """
    This function is taken from the original tf repo.
    It ensures that all layers have a channel number that is divisible by 8
    It can be seen here:
    https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
    :param v:
    :param divisor:
    :param min_value:
    :return:
    """
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v


# Depthwise (dw) convolution block
class DepthWiseConvolution(nn.Sequential):
    """3x3 depthwise convolution followed by BatchNorm2d and ReLU6.

    Args:
        in_fts: number of input channels; the convolution uses one group per
            channel and produces the same number of output channels.
        stride: stride of the convolution (default 1).
    """

    def __init__(self, in_fts, stride = 1):
        depthwise = nn.Conv2d(
            in_fts,
            in_fts,
            kernel_size=(3, 3),
            stride=stride,
            padding=(1, 1),
            groups=in_fts,
            bias=False,
        )
        super().__init__(
            depthwise,
            nn.BatchNorm2d(in_fts),
            nn.ReLU6(inplace=True),
        )


# Pointwise (pw) convolution block
class PointWiseConvolution(nn.Sequential):
    """1x1 convolution followed by BatchNorm2d and ReLU6.

    Args:
        in_fts: number of input channels.
        out_fts: number of output channels.
    """

    def __init__(self,in_fts,out_fts):
        pointwise = nn.Conv2d(in_fts, out_fts, kernel_size=(1, 1), bias=False)
        super().__init__(
            pointwise,
            nn.BatchNorm2d(out_fts),
            nn.ReLU6(inplace=True),
        )


class ConvBNReLU(nn.Sequential):
    """Convolution followed by a normalization layer and ReLU6.

    Args:
        in_planes: number of input channels.
        out_planes: number of output channels.
        kernel_size: square kernel size; padding is ``(kernel_size - 1) // 2``.
        stride: stride of the convolution.
        groups: number of convolution groups.
        norm_layer: callable that builds the normalization layer from the
            number of output channels; ``nn.BatchNorm2d`` when ``None``.
    """

    def __init__(self, in_planes, out_planes, kernel_size=3, stride=1, groups=1, norm_layer=None):
        make_norm = nn.BatchNorm2d if norm_layer is None else norm_layer
        pad = (kernel_size - 1) // 2
        conv = nn.Conv2d(
            in_planes,
            out_planes,
            kernel_size,
            stride,
            pad,
            groups=groups,
            bias=False,
        )
        super().__init__(conv, make_norm(out_planes), nn.ReLU6(inplace=True))



class Bottleneck(nn.Module):
    """Expand / depthwise / project block with an optional residual connection.

    The block applies, in order: a pointwise expansion to
    ``round(inp * expand_ratio)`` channels (skipped when ``expand_ratio`` is
    1), a depthwise 3x3 convolution with the given stride, and a 1x1
    convolution plus BatchNorm2d with no activation that projects to ``oup``
    channels. The input is added to the output when ``stride == 1`` and
    ``inp == oup``.

    Args:
        inp: number of input channels.
        oup: number of output channels.
        stride: stride of the depthwise convolution.
        expand_ratio: channel expansion factor of the hidden layer.
        norm_layer: accepted for interface compatibility; not used by the
            current implementation, which always builds ``nn.BatchNorm2d``.
    """

    def __init__(self,inp, oup, stride, expand_ratio, norm_layer=nn.BatchNorm2d):
        super().__init__()
        self.stride = stride
        self.use_res_connect = stride == 1 and inp == oup

        hidden_dim = int(round(inp * expand_ratio))

        stages = []
        # pw: expansion, only needed when the hidden width differs from inp
        if expand_ratio != 1:
            stages.append(PointWiseConvolution(inp, hidden_dim))
        # dw
        stages.append(DepthWiseConvolution(hidden_dim, stride))
        # pw-linear: projection without activation
        stages.append(nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False))
        stages.append(nn.BatchNorm2d(oup))

        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Run the block, adding ``x`` to the result when the residual is enabled."""
        out = self.conv(x)
        return x + out if self.use_res_connect else out


class OurObjectDetectionNet(nn.Module):
    """Stack of bottleneck blocks followed by a pooled linear classifier.

    Args:
        bottleneckLayerDetail: iterable of ``(t, c, n, s)`` rows. Each row adds
            ``n`` Bottleneck blocks with expand ratio ``t`` and ``c`` output
            channels (before width scaling); the first block of the row uses
            stride ``s`` and the remaining ones stride 1.
        inp: number of input image channels.
        num_classes: size of the final linear layer's output.
        width_mult: multiplier applied to every channel count.
        round_nearest: channel counts are rounded with ``_make_divisible``
            using this divisor.
    """

    def __init__(self,bottleneckLayerDetail,inp = 3,num_classes = 50,width_mult = 1.0,round_nearest=8):
        super().__init__()

        self.out = None

        # Scaled widths of the first and the last convolution.
        stem_channels = _make_divisible(32 * width_mult, round_nearest)
        self.last_channel = _make_divisible(1280 * width_mult, round_nearest)

        # first layer
        layers = [ConvBNReLU(inp, stem_channels, stride=2)]

        # bottleneck stages
        in_channels = stem_channels
        for t, c, n, s in bottleneckLayerDetail:
            out_channels = _make_divisible(c * width_mult, round_nearest)
            for repeat in range(n):
                # Only the first block of a stage applies the stage's stride.
                block_stride = s if repeat == 0 else 1
                layers.append(
                    Bottleneck(in_channels, out_channels, stride=block_stride, expand_ratio=t)
                )
                in_channels = out_channels

        # last layer
        layers.append(ConvBNReLU(in_channels, self.last_channel, kernel_size=1))

        self.features = nn.Sequential(*layers)

        # classifier
        self.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(self.last_channel, num_classes),
        )

    def __forward_impl(self, x):
        """Extract features, average-pool them to 1x1, flatten and classify."""
        feats = self.features(x)
        pooled = nn.functional.adaptive_avg_pool2d(feats, 1)
        flat = pooled.reshape(feats.shape[0], -1)
        return self.classifier(flat)

    def forward(self, x):
        """Return the classifier output for the input batch ``x``."""
        return self.__forward_impl(x)


### Model declaration

In [None]:
from torchinfo import summary

# One row per stage of bottleneck blocks:
#   t = expand ratio, c = output channels, n = number of blocks,
#   s = stride of the first block in the stage
bottleneckLayerDetail = [
    # t, c, n, s
    [1, 16, 1, 1],
    [6, 24, 2, 2],
    [6, 32, 3, 2],
    [6, 64, 4, 2],
    [6, 96, 3, 1],
    [6, 160, 3, 2],
    [6, 320, 1, 1],
]

# Size the classifier head from the project's own `labels` list instead of
# relying on the constructor's default of 50 outputs, so the number of outputs
# matches the number of classes the dataset maps ids to.
our_model = OurObjectDetectionNet(bottleneckLayerDetail, num_classes=len(labels))
summary(our_model, (1, 3, 224, 224), col_names=("input_size", "output_size",
                                                      "num_params", "kernel_size",
                                                      "mult_adds"))

### MobileNet V2

In [None]:
from torchvision.models import mobilenet_v2, MobileNet_V2_Weights

# torchvision's MobileNetV2 loaded with the IMAGENET1K_V2 weights.
mobilenet_model = mobilenet_v2(
    weights=MobileNet_V2_Weights.IMAGENET1K_V2,
)

### Convert model for mobile

In [None]:
# wget is also installed by the dependency cell at the top of the notebook;
# presumably repeated here so this section can be run without the earlier cells.
!pip install wget

In [None]:
import wget
import os.path

# Download the conversion helper only when it is not already present in the
# working directory.
# NOTE(review): the script comes from the repository's `main` branch and is
# imported in the next cell, so its content is not pinned to a revision --
# consider a commit-specific URL.
if not os.path.isfile("convert.py"):
    wget.download(
        "https://raw.githubusercontent.com/johnpolsh/inf721-tpfinal/main/colab/convert.py",
        "convert.py")

In [None]:
from convert import convert_for_mobile

# NOTE(review): this converts the torchvision MobileNetV2 model, not
# `our_model` defined above -- confirm that this is the intended model.
convert_for_mobile(mobilenet_model, "object_detection")