# **YOLO (version 1) Implementation (from scratch):**

***author:** Mohamed Behery (with special thanks to <a href="https://www.youtube.com/@AladdinPersson">Aladdin Persson</a>)*<br/>
***email:** m.behery@live.com*<br/>
***Phone/Skype Username:** +201062989114*

## **Implementation Methodology:**

This code was developed after following the details and explanations provided by <a href="https://www.youtube.com/@AladdinPersson">Aladdin Persson</a> in the following youtube video:

**Title:** YOLOv1 from Scratch<br/>
**URL:**   *https://www.youtube.com/watch?v=n9_XyCGr-MI*

<font color = 'green'>**As a contribution**</font> to Aladdin's job that was <font color = 'red'>**amazingly**</font> done, several aspects of improvement were applied:
>- It implements the entire algorithm including fetching datasets, preprocessing and training the model in this file alone. Where the dataset subsetting and preparation phase was redeveloped to have a collective summarization algorithm implemented for inspecting the datapoints available for each class and for each dataset, so that other versions of the VOC datasets are to be downloaded for the algorithm to combine them accordingly.

>- It doesn't update, in place, the objects used in calculating the cost function because it leads to broken links in the graph when performing gradient descent. Although this doesn't show in Aladdin's video, where I might have missed something.

>- It uses a much more concise form that allows changing the number of classes, cells and bounding boxes per cell which is reflected in calculating the model's cost function making it easy to overhaul the entire architecture to the next level.

## **Conclusion:**

Implementing the algorithm requires a large number of CUDA cores for faster training times which is not available with my current local GPU (RTX 2060) and therefore training, validation and most definitely testing cannot be seen through. However, the code should run well. If you find something to be tweaked, kindly DM me over Whatsapp/Skype.

In [1]:
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam
from functools import reduce
from torchvision.io import read_image
from torchvision.transforms import v2
from xml.etree import ElementTree as ET
from os.path import isfile, dirname, join
from os import listdir, makedirs, getcwd
import pandas as pd
from nvidia_smi import nvmlInit, nvmlShutdown, nvmlDeviceGetHandleByIndex as nvmlGetHandle, nvmlDeviceGetMemoryInfo as nvmlGetMemory
from psutil import virtual_memory

class YOLOv1(nn.Module):

    __LAYER_INFO = [
        (64,7,2),
        (None,2,2),
        (192,3,1),
        (None,2,2),
        (128,1,1),
        (256,3,1),
        (256,1,1),
        (512,3,1),
        (None,2,2),
        *[(256,1,1), (512,3,1)] * 4,
        (512,1,1),
        (1024,3,1),
        (None,2,2),
        *[(512,1,1), (1024,3,1)] * 2,
        (1024,3,1),
        (1024,3,2),
        (1024,3,1),
        (1024,3,1)
    ]

    def __init__(self, chw = (3, 448, 448), classes = 20, bboxes = 2):
        
        super().__init__()
        self.__DEVICE = 'cpu' if torch.cuda.is_available() else 'cpu'
        
        self.__C, self.__H, self.__W = chw
        self.__S = reduce(lambda a, b: a * b, [x[-1] for x in self.__LAYER_INFO])
        self.__CLASSES = classes
        self.__BBOXES = bboxes
        
        assert self.__H % self.__S == 0 and self.__W % self.__S == 0, f'Image dimensions ({self.__H}, {self.__W}) must be divisible by the CNN stride ({self__S}).'
        
        self.__CELLS = self.__H * self.__W // self.__S ** 2

        if len(self.__LAYER_INFO[0]) < 4:
            self.__prepare_layer_info()
        
        self.darknet = nn.Sequential(*[self.cnn_unit(*x) for x in self.__LAYER_INFO])
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(1024 * self.__CELLS, 4096),
            nn.Dropout(0.0),
            nn.LeakyReLU(0.1),
            nn.Linear(4096, self.__CELLS * (self.__CLASSES + 5 * self.__BBOXES))
        )
        
        super().to(self.__DEVICE)

    def forward(self, x):
        return self.fc(self.darknet(x))
    
    def __prepare_layer_info(self):
        prev_out_channels = self.__C
        for i, x in enumerate(self.__LAYER_INFO):
            self.__LAYER_INFO[i] = tuple([prev_out_channels if x[0] else None] + list(x))
            if x[0]:
                prev_out_channels = x[0]
            
    @staticmethod
    def cnn_unit(*layer_args):
        in_channels, out_channels, kernel, stride = layer_args
        if in_channels:
            padding = kernel // 2
            return nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel, stride, padding, bias = False),
                nn.BatchNorm2d(out_channels),
                nn.LeakyReLU(0.1)
            )
        return nn.MaxPool2d(kernel, stride)
        
    def chw(self):
        return self.__C, self.__H, self.__W

    def stride(self):
        return self.__S

    def cells(self):
        return self.__CELLS

    def layer_info(self):
        return self.__LAYER_INFO

    def device(self):
        return self.__DEVICE

    def classes(self):
        return self.__CLASSES

    def bboxes(self):
        return self.__BBOXES

In [2]:
class helper:

    @staticmethod
    def iou(bboxes_a, bboxes_b, fmt = 'midpoint'):
    
        assert fmt in ['midpoint', 'corners', 'hybrid']
        
        def reformat(bboxes):
            if fmt == 'midpoint':
                bboxes[..., :2] = bboxes[..., :2] - bboxes[..., 2:] / 2
                bboxes[..., 2:] = bboxes[..., :2] + bboxes[..., 2:] / 2
            elif fmt == 'hybrid':
                bboxes[..., 2:] = bboxes[..., :2] + bboxes[..., 2:]
            return bboxes
        
        def area(bboxes):
            return ((bboxes[..., 0] - bboxes[..., 2]) * (bboxes[..., 1] - bboxes[..., 3])).abs()
        
        def intersection(bboxes_a, bboxes_b):
            bboxes_ab = torch.cat([bboxes_a.unsqueeze(0), bboxes_b.unsqueeze(0)], dim = 0)
            bboxes = torch.empty_like(bboxes_a)
            bboxes[..., :2] = bboxes_ab[..., :2].max(dim = 0).values
            bboxes[..., 2:] = bboxes_ab[..., 2:].min(dim = 0).values
            return bboxes
    
        bboxes_a, bboxes_b = map(reformat, [bboxes_a, bboxes_b])
        area_a, area_b = map(area, [bboxes_a, bboxes_b])
        
        inter_area_ab = area(intersection(bboxes_a, bboxes_b))
        union_area_ab = (area_a + area_b) - inter_area_ab
        
        return inter_area_ab / (union_area_ab + 1e-6)

In [3]:
class Loss(nn.Module):
    
    def __init__(self, model, coefs):
        super().__init__()
        _, self.__H, self.__W = model.chw()
        self.__S = model.stride()
        self.__BBOXES = model.bboxes()
        self.__CLASSES = model.classes()
        self.__DEVICE = model.device()
        self.__COEF_NOOBJ, self.__COEF_COORD = coefs
        self.mse = nn.MSELoss(reduction = 'sum')
        super().to(self.__DEVICE)

    def bboxes(y, h, w, d):
        return y.reshape(-1, h, w, d)
        y_pred.reshape(-1, self.__H // self.__S, self.__W // self.__S, self.__CLASSES + 5 * self.__BBOXES)
        
    def forward(self, y_pred, y_true):
        
        y_pred, y_true = map(lambda y: y.reshape(-1, self.__H // self.__S, self.__W // self.__S, self.__CLASSES + 5 * self.__BBOXES), [y_pred, y_true])
        
        bbox_slice = lambda idx: slice(self.__CLASSES + 5 * idx + 1, self.__CLASSES + 5 * (idx + 1))
        
        _, best_idxs = torch.cat([helper.iou(y_pred[..., bbox_slice(i)], y_true[..., bbox_slice(0)]).unsqueeze(0) for i in range(self.__BBOXES)], dim = 0).max(0)
        best_idxs = F.one_hot(best_idxs, self.__BBOXES)
        
        bboxes_pred = torch.cat([y_pred[..., bbox_slice(i)].unsqueeze(0) for i in range(self.__BBOXES)], dim = 0).permute(1,2,3,4,0)
        best_bboxes_pred = (best_idxs.unsqueeze(-2) * bboxes_pred).sum(-1)
        
        bboxes_exist_true = y_true[..., self.__CLASSES].unsqueeze(-1)
        
        best_bboxes_pred = bboxes_exist_true * best_bboxes_pred
        bboxes_true = bboxes_exist_true * y_true[..., bbox_slice(0)]

        best_bboxes_pred__lw = torch.sign(best_bboxes_pred[..., 2:]) * torch.sqrt(best_bboxes_pred[..., 2:].abs() + 1e-6)
        bboxes_true__lw = torch.sign(bboxes_true[..., 2:]) * torch.sqrt(bboxes_true[..., 2:].abs() + 1e-6)

        best_bboxes_pred = torch.cat([best_bboxes_pred[..., :2], best_bboxes_pred__lw], dim = -1)
        bboxes_true = torch.cat([bboxes_true[..., :2], bboxes_true__lw], dim = -1)
        
        bbox_loss = ((best_bboxes_pred - bboxes_true) ** 2).sum()

        bboxes_exist_pred = y_pred[..., self.__CLASSES::5]
        best_bboxes_exist_pred = (best_idxs * bboxes_exist_pred).sum(-1)

        bboxes_exist_true = bboxes_exist_true.squeeze(-1)
        obj_loss = ((bboxes_exist_true * best_bboxes_exist_pred - bboxes_exist_true) ** 2).sum()

        bboxes_not_exist_true = (1 - bboxes_exist_true).unsqueeze(-1)
        noobj_loss = ((bboxes_not_exist_true * bboxes_exist_pred - bboxes_not_exist_true) ** 2).sum()

        bboxes_exist_true = bboxes_exist_true.unsqueeze(-1)
        class_loss = ((bboxes_exist_true * y_pred[..., :self.__CLASSES] - bboxes_exist_true * y_true[..., :self.__CLASSES]) ** 2).sum()

        return (self.__COEF_COORD * bbox_loss + obj_loss + self.__COEF_NOOBJ * noobj_loss + class_loss) / y_pred.shape[0]

In [5]:
class VOCSubsets:

    def __init__(self, root_dirpath, difficult_allowed):
        self.__ROOT = root_dirpath
        self.__DIFF = difficult_allowed
        self.__SUBS, self.__CLSS = self.build(self.__ROOT, self.__DIFF)
    
    @staticmethod
    def parse_label(xml_path, difficult_allowed = True):        
        with open(xml_path) as f:
            root = ET.parse(f).getroot()
        node = root.find('size')
        W, H = map(lambda x: float(node.find(x).text), ['width', 'height'])
        objs = []
        for node in root.iter('object'):
            is_difficult = int(node.find('difficult').text)
            if not difficult_allowed and is_difficult:
                continue
            name = node.find('name').text
            node = node.find('bndbox')
            x1, x2, y1, y2 = [float(node.find(x).text) / (W if i < 2 else H) for i, x in enumerate(['xmin', 'xmax', 'ymin', 'ymax'])]
            x, y = (x1 + x2) / 2, (y1 + y2) / 2
            w, h = x2 - x1, y2 - y1
            x, y, w, h = map(lambda v: round(v, 4), [x, y, w, h])
            objs.append((name, x, y, w, h))
        return objs

    @staticmethod
    def build(root_dirpath, difficult_allowed):
        if root_dirpath.startswith('./'):
            root_dirpath = getcwd() + '/' + root_dirpath[2:]
        subset_keys = ('train', 'val', 'test')
        subset_dirpaths = [join(root_dirpath, x) + '/ImageSets/Main' for x in listdir(root_dirpath) if x.startswith('VOC')]
        subset_filepaths = { y : [f'{x}/{y}.txt' for x in subset_dirpaths if isfile(f'{x}/{y}.txt')] for y in subset_keys }
        subsets, classes = dict(), set()
        for key in subset_keys:
            subsets[key] = []
            for path in subset_filepaths[key]:
                image_dirpath, label_dirpath = [f'{dirname(dirname(dirname(path)))}/{x}' for x in ('JPEGImages', 'Annotations')]
                with open(path) as f:
                    lines = filter(lambda x: x.strip(), f.readlines())
                for line in lines:
                    image_path = f'{image_dirpath}/{line.strip()}.jpg'
                    objs = VOCSubsets.parse_label(f'{label_dirpath}/{line.strip()}.xml', difficult_allowed)
                    classes.update([x[0] for x in objs])
                    label = '|'.join([','.join([str(x) for x in obj]) for obj in objs])
                    subsets[key].append(f'{image_path} {label}')
            dst_dirpath = join(root_dirpath, 'Subsets')
            makedirs(dst_dirpath, exist_ok = True)
            dst = f'{dst_dirpath}/{key}.txt'
            with open(dst, 'w') as f:
                f.write('\n'.join(subsets[key]) + '\n')
            subsets[key] = dst
        return subsets, sorted(classes)

    def __getitem__(self, item):
        return self.__SUBS[item]

    def difficult_allowed(self):
        return self.__DIFF

    def root_dirpath(self):
        return self.__ROOT

    def subsets(self):
        return self.__SUBS

    def classes(self):
        return self.__CLSS

In [6]:
class VOCDataset(Dataset):

    def __init__(self, src_filepath, chw, classes, divs, bboxes):
        with open(src_filepath) as f:
            self.__IMAGES, self.__LABELS = zip(*[x.split() for x in f.readlines() if x.strip()])
        self.__CLASSES = classes
        self.__DIVS = divs
        self.__BBOXES = bboxes
        self.__COUNTS = ['\n'.join(self.__LABELS).count(x) for x in self.__CLASSES]
        self.__CHW = chw
        self.__PREPROCESS = v2.Compose([
            v2.Resize(size = self.__CHW[1:], antialias = True),
            v2.Normalize(mean = [0.0] * 3, std = [1.0] * 3)
        ])
    
    def formulate(self, label):        
        objs = [[float(y) if j > 0 else y for j, y in enumerate(x.split(','))] for x in label.split('|')]
        cell_index = lambda pos: int(pos * self.__DIVS) - 1
        n_classes = len(self.__CLASSES)
        label = torch.zeros(self.__DIVS, self.__DIVS, n_classes + 5 * self.__BBOXES)
        for obj in objs:
            name, x, y, w, h = obj
            class_id = self.__CLASSES.index(name)
            i, j = map(cell_index, [x, y])
            label[i, j, class_id] = 1
            label[i, j, n_classes : n_classes + 5] = torch.tensor([1, x, y, w, h])
        return label.flatten()

    def __len__(self):
        return len(self.__IMAGES)

    def __getitem__(self, item):
        image = self.__PREPROCESS(read_image(self.__IMAGES[item]) / 255)
        label = self.formulate(self.__LABELS[item])
        return image, label
    
    def summary(self):
        series = pd.Series(self.__COUNTS, index = self.__CLASSES, name = 'count')
        series[''] = series.sum()
        df = series.reset_index()
        df.rename(columns = {'index':'class'}, index = {len(self.__CLASSES):''}, inplace = True)
        return df
    
    @staticmethod
    def collective_summary(**datasets):
        summary = pd.concat([x.summary().set_index('class') for x in datasets.values()], axis = 1)
        summary.columns = [x.upper() for x in datasets.keys()]
        summary[''] = summary.sum(axis = 1)
        return summary.reset_index().rename(index = {summary.shape[0]-1:''})

In [7]:
def get_ram_used(device):
    assert device in {'cpu', 'cuda'}
    if device == 'cpu':
        return virtual_memory()[2]/100
    else:
        info = nvmlGetMemory(nvmlGetHandle(0))
        return info.used / info.total
    
def train(model, criterion, optimizer, dataloaders, epochs, best_weights):
    
    torch.autograd.set_detect_anomaly(True)
    
    def load_best():
        if isfile(best_weights):
            model.load_state_dict(torch.load(best_weights))
        return model
    
    model = load_best()
    device = model.device()
    
    if device == 'cuda':
        nvmlInit()
        
    train_batches, val_batches = map(len, dataloaders)

    flag = val_batches > train_batches
    factor = val_batches // train_batches if flag else train_batches // val_batches

    train_cost, val_cost = 0.0, 0.0
    train_loader, val_loader = dataloaders

    min_cost = 0.0
    for epoch_idx in range(epochs):
        
        val_idx = 0
        for train_idx, (x_train, y_train) in enumerate(train_loader):

            try:
                
                model.train()
                y_train = y_train.to(device)
                batch_cost = criterion(model(x_train), y_train)
                train_cost += batch_cost.item()
                
                val_flag = flag or (not flag and ((train_idx + 1) % factor == 0 or (train_idx + 1) == train_batches))
                if val_flag:
                    with torch.no_grad():
                        for _ in range(factor if flag else 1):
                            x_val, y_val = next(val_loader)
                            val_idx += 1
                            y_val = y_val.to(device)
                            val_cost += criterion(model(x_val), y_val).item()
                            print(f'\rMemory [{get_ram_used(device):.1%}] - Epoch [{epoch_idx + 1}/{epochs}] - Train Batch [{train_idx + 1}/{train_batches}] - Valid Batch [{val_idx + 1}/{val_batches}]:\tTrain Cost: {train_cost:.05f} | Valid Cost: {val_cost:.05f}', end = '')
                print(f'\rMemory [{get_ram_used(device):.1%}] - Epoch [{epoch_idx + 1}/{epochs}] - Train Batch [{train_idx + 1}/{train_batches}] - Valid Batch [{val_idx + 1}/{val_batches}]:\tTrain Cost: {train_cost:.05f} | Valid Cost: {val_cost:.05f}', end = '')
                
                optimizer.zero_grad()
                batch_cost.backward()
                optimizer.step()

            except KeyboardInterrupt:
                if device == 'cuda':
                    nvmlShutdown()
                    
                return load_best()
                

        if val_cost < min_cost:
            torch.save(model.state_dict(), best_weights)
            min_cost = val_cost
    
    if device == 'cuda':
        nvmlShutdown()
        
    return load_best() 

In [None]:
%mkdir ./data

!wget http://host.robots.ox.ac.uk/pascal/VOC/voc2007/VOCtrainval_06-Nov-2007.tar -P ./data
!tar xf ./data/VOCtrainval_06-Nov-2007.tar -C ./data

!wget http://host.robots.ox.ac.uk/pascal/VOC/voc2007/VOCtest_06-Nov-2007.tar -P ./data
!tar xf ./data/VOCtest_06-Nov-2007.tar -C ./data

!wget http://host.robots.ox.ac.uk/pascal/VOC/voc2012/VOCtrainval_11-May-2012.tar -P ./data
!tar xf ./data/VOCtrainval_11-May-2012.tar -C ./data

In [8]:
subsets = VOCSubsets('./data/VOCdevkit/', difficult_allowed = True)
datasets = { x : VOCDataset(subsets[x], (3, 448, 448), subsets.classes(), 7, 2) for x in ['train', 'val', 'test'] }
display(VOCDataset.collective_summary(**datasets))

Unnamed: 0,class,TRAIN,VAL,TEST,Unnamed: 5
0.0,aeroplane,626,659,311,1596
1.0,bicycle,612,596,389,1597
2.0,bird,886,934,576,2396
3.0,boat,716,681,393,1790
4.0,bottle,1087,1029,657,2773
5.0,bus,448,461,254,1163
6.0,car,2017,1991,1541,5549
7.0,cat,800,816,370,1986
8.0,chair,2183,2155,1374,5712
9.0,cow,540,518,329,1387


In [None]:
dataloaders = {x : iter(DataLoader(dataset = datasets[x], batch_size = 1, shuffle = True)) for x in datasets}
model = YOLOv1((3, 448, 448), 20, 2)
loss = Loss(model, coefs = (0.5, 5))
optimizer = Adam(params = model.parameters(), lr = 2e-5, weight_decay = 0.0)
train(model, loss, optimizer, list(dataloaders.values())[:-1], 100, './best_weights.pth')

Memory [46.1%] - Epoch [1/100] - Train Batch [1369/8218] - Valid Batch [1370/8333]:	Train Cost: 16293.56092 | Valid Cost: 15663.66891