# Construction of YOLO v3 Model

In [1]:
import torch
import torch.nn as nn

In [2]:
# Sanity check of how stride-2 "same"-padded 3x3 convolutions shrink a
# 416x416 input: each one halves the spatial size, so the last three of the
# five strided layers print the 52x52, 26x26 and 13x13 grids.
_x = torch.randn(1, 3, 416, 416)
_x = nn.Conv2d(3, 3, kernel_size=3, stride=1, padding=1)(_x)
for _depth in range(1, 6):
    _x = nn.Conv2d(3, 3, kernel_size=3, stride=2, padding=1)(_x)
    # Only the third, fourth and fifth down-samplings are reported.
    if _depth >= 3:
        print(_x.shape)


torch.Size([1, 3, 52, 52])
torch.Size([1, 3, 26, 26])
torch.Size([1, 3, 13, 13])


In [3]:
""" 
Information about architecture config:
Tuple is structured by (filters, kernel_size, stride) 
Every conv is a same convolution. 
List is structured by "B" indicating a residual block followed by the number of repeats
"S" is for scale prediction block and computing the yolo loss
"U" is for upsampling the feature map and concatenating with a previous layer
"""
# Layer-by-layer specification consumed by YOLOv3._create_conv_layers.
# The spatial sizes in the trailing comments assume a 416x416 input image.
darknet_config = [
    (32, 3, 1),     # 416x416: 3x3 conv, stride 1, padding 1 keeps the spatial size
    (64, 3, 2),     # 208x208: stride 2 halves the spatial size
    ["B", 1],       # 208x208
    (128, 3, 2),    # 104x104
    ["B", 2],       # 104x104
    (256, 3, 2),    # 52x52
    ["B", 8],       # 52x52, output is saved as a route connection (num_repeats == 8)
    (512, 3, 2),    # 26x26
    ["B", 8],       # 26x26, output is saved as a route connection (num_repeats == 8)
    (1024, 3, 2),   # 13x13, 1024 channels
    ["B", 4],       # 13x13; everything up to here is the Darknet-53 backbone
    (512, 1, 1),    # 13x13, 512 channels
    (1024, 3, 1),   # 13x13, 1024 channels
    "S",            # first prediction head (13x13 grid); leaves 512 channels in the chain
    (256, 1, 1),
    "U",            # upsample x2 and concat with the 26x26 route: 256 + 512 = 3 * 256 channels
    (256, 1, 1),
    (512, 3, 1),
    "S",            # second prediction head (26x26 grid); leaves 256 channels in the chain
    (128, 1, 1),
    "U",            # upsample x2 and concat with the 52x52 route: 128 + 256 = 3 * 128 channels
    (128, 1, 1),
    (256, 3, 1),
    "S",            # third prediction head (52x52 grid)
]

In [4]:
class CNNBlock(nn.Module):
    """2-D convolution optionally followed by BatchNorm2d and LeakyReLU(0.1).

    Args:
        in_channels: number of channels of the incoming feature map.
        out_channels: number of channels produced by the convolution.
        bn_act: when True the convolution has no bias and its output is passed
            through batch norm and the leaky ReLU; when False the convolution
            keeps its bias and its raw output is returned.
        **kwargs: forwarded unchanged to ``nn.Conv2d`` (kernel_size, stride,
            padding, ...).
    """

    def __init__(self, in_channels, out_channels, bn_act=True, **kwargs):
        super().__init__()
        self.use_bn_act = bn_act
        # A bias would be redundant right before batch norm, so it is dropped
        # exactly when the norm/activation pair is in use.
        self.conv = nn.Conv2d(in_channels, out_channels, bias=not bn_act, **kwargs)
        # Both sub-modules are always registered, even when bn_act is False.
        self.bn = nn.BatchNorm2d(out_channels)
        self.leaky = nn.LeakyReLU(0.1)

    def forward(self, x):
        """Apply the convolution and, if enabled, batch norm + leaky ReLU."""
        features = self.conv(x)
        if self.use_bn_act:
            return self.leaky(self.bn(features))
        return features

In [5]:
class ResidualBlock(nn.Module):
    """Sequence of ``num_repeats`` bottleneck units.

    Each unit is a 1x1 CNNBlock that halves the channel count followed by a
    3x3 CNNBlock (padding 1) that restores it, so both the channel count and
    the spatial size of the input are preserved.

    Args:
        channels: channel count of the input (and of the output).
        use_skip_connection: when True each unit's output is added to its
            input; when False the units are simply chained.
        num_repeats: how many units to stack.
    """

    def __init__(self, channels, use_skip_connection=True, num_repeats=1):
        super().__init__()
        self.use_skip_connection = use_skip_connection
        self.num_repeats = num_repeats

        half = channels // 2
        units = [
            nn.Sequential(
                CNNBlock(channels, half, kernel_size=1),
                CNNBlock(half, channels, kernel_size=3, padding=1),
            )
            for _ in range(num_repeats)
        ]
        self.layers = nn.ModuleList(units)

    def forward(self, x):
        """Run x through every unit, adding the skip connection if enabled."""
        for unit in self.layers:
            out = unit(x)
            x = out + x if self.use_skip_connection else out
        return x

In [6]:
# Scales mean 13x13, 26x26, 52x52
class ScalePrediction(nn.Module):
    """Prediction head for one grid scale.

    A 3x3 CNNBlock doubles the channels, then a plain 1x1 convolution (no
    batch norm / activation) maps them to ``3 * (num_classes + 5)`` channels,
    i.e. one ``num_classes + 5`` vector for each of 3 anchors per cell.
    The spatial size is preserved.

    Args:
        in_channels: channel count of the incoming feature map.
        num_classes: number of object classes.
    """

    def __init__(self, in_channels, num_classes):
        super().__init__()
        self.num_classes = num_classes
        hidden = 2 * in_channels
        self.pred = nn.Sequential(
            CNNBlock(in_channels, hidden, kernel_size=3, padding=1),
            CNNBlock(hidden, 3 * (num_classes + 5), bn_act=False, kernel_size=1),
        )

    def forward(self, x):
        """Return predictions laid out as
        ``[batch, anchor (3), grid rows, grid cols, num_classes + 5]``.
        """
        batch, rows, cols = x.shape[0], x.shape[2], x.shape[3]
        raw = self.pred(x)
        # Split the channel axis into (anchor, per-anchor vector) ...
        per_anchor = raw.reshape(batch, 3, self.num_classes + 5, rows, cols)
        # ... and move the per-anchor vector to the last axis.
        return per_anchor.permute(0, 1, 3, 4, 2)

In [7]:
class YOLOv3(nn.Module):
    """YOLOv3 network assembled from the module-level ``darknet_config``.

    Args:
        in_channels: channels of the input image.
        num_classes: number of object classes predicted by every head.
    """

    def __init__(self, in_channels=3, num_classes=20):
        super().__init__()
        self.num_classes = num_classes
        self.in_channels = in_channels
        self.layers = self._create_conv_layers()

    def forward(self, x):
        """Return the list of ScalePrediction outputs, in network order."""
        predictions = []
        skips = []  # stack of feature maps awaiting concatenation

        for block in self.layers:
            # Prediction heads branch off the main chain: their output is
            # collected and x continues unchanged.
            if isinstance(block, ScalePrediction):
                predictions.append(block(x))
                continue

            x = block(x)

            # The 8-repeat residual stages are the ones routed to later
            # upsampling steps.
            if isinstance(block, ResidualBlock) and block.num_repeats == 8:
                skips.append(x)
            elif isinstance(block, nn.Upsample):
                # Concatenate the most recently saved route along channels
                # and consume it.
                x = torch.cat([x, skips.pop()], dim=1)

        return predictions

    def _create_conv_layers(self):
        """Translate ``darknet_config`` into an ``nn.ModuleList``.

        Tuples become CNNBlocks, ``["B", n]`` lists become ResidualBlocks,
        ``"S"`` expands to a prediction branch and ``"U"`` to an upsampling
        layer. The running channel count is tracked along the way.
        """
        modules = nn.ModuleList()
        channels = self.in_channels

        for spec in darknet_config:
            if isinstance(spec, tuple):
                # (filters, kernel_size, stride) -> plain convolution block
                out_channels, kernel_size, stride = spec
                same_padding = 1 if kernel_size == 3 else 0
                modules.append(
                    CNNBlock(
                        channels,
                        out_channels,
                        kernel_size=kernel_size,
                        stride=stride,
                        padding=same_padding,
                    )
                )
                channels = out_channels
                continue

            if isinstance(spec, list):
                # ["B", num_repeats] -> residual stage, channels unchanged
                modules.append(ResidualBlock(channels, num_repeats=spec[1]))
                continue

            if not isinstance(spec, str):
                continue

            if spec == "S":
                halved = channels // 2
                modules.extend(
                    [
                        ResidualBlock(
                            channels,
                            use_skip_connection=False,
                            num_repeats=1,
                        ),
                        CNNBlock(channels, halved, kernel_size=1),
                        ScalePrediction(halved, num_classes=self.num_classes),
                    ]
                )
                # The head is skipped by forward(), so the chain carries on
                # with the channel count of the 1x1 CNNBlock.
                channels = halved
            elif spec == "U":
                modules.append(nn.Upsample(scale_factor=2))
                # The upsampled map is concatenated with a routed map that
                # has twice as many channels, giving three times the count.
                channels = channels * 3

        return modules

In [8]:
def test():
    """Smoke-test the output shapes of YOLOv3 on a random 416x416 batch.

    Builds a 20-class model, feeds it a batch of two random images and checks
    that the three outputs have shape
    ``(2, 3, IMAGE_SIZE // stride, IMAGE_SIZE // stride, num_classes + 5)``
    for strides 32, 16 and 8 in that order. Prints "success" when all
    assertions hold.

    Raises:
        AssertionError: if the number of outputs or any shape is wrong.
    """
    num_classes = 20
    IMAGE_SIZE = 416
    batch_size = 2

    model = YOLOv3(num_classes=num_classes)
    x = torch.randn((batch_size, 3, IMAGE_SIZE, IMAGE_SIZE))

    # One forward pass produces all three scale outputs; gradients are not
    # needed for a shape check.
    with torch.no_grad():
        outputs = model(x)

    assert len(outputs) == 3
    for out, stride in zip(outputs, (32, 16, 8)):
        cells = IMAGE_SIZE // stride
        assert out.shape == (batch_size, 3, cells, cells, num_classes + 5)
    print("success")

In [9]:
# Run the YOLOv3 shape smoke test; it prints "success" when every assertion holds.
test()

success


In [10]:
# Show the notebook's current working directory.
# NOTE(review): `os` is only imported in a later cell (In [20]); that import
# must have been executed before this cell.
os.getcwd()

'c:\\Users\\jameslcc\\repo\\Python\\yolov3-from-scratch\\Yolov3'

# Dataset

In [17]:
# Put the working directory on the import path so that the `utils` imports
# in the following cells can resolve (presumably `utils/` lives there; TODO confirm).
# NOTE(review): relies on `os` and `sys`, which are imported in the next cell.
sys.path.append(os.getcwd())

In [20]:
import numpy as np
import os
import pandas as pd
import torch
import sys


from PIL import Image, ImageFile
from torch.utils.data import Dataset, DataLoader

from utils.utils import (
    iou_width_height as iou,
    non_max_suppression as nms
)


In [21]:
class YOLODataset(Dataset):
    """Dataset producing an image plus one target tensor per prediction scale.

    Args:
        csv_file: CSV whose first column is the image file name and whose
            second column is the label file name.
        img_dir: directory containing the images.
        label_dir: directory containing the label text files. Each row holds
            five space-separated values, ``class x y w h``.
        anchors: three lists (one per scale) of ``(width, height)`` anchors.
        image_size: stored on the instance; not used by this class itself.
        strides: grid size (cells per side) of each scale, in the same order
            as ``anchors``. Despite the name these are grid sizes, not
            pixel strides.
        num_classes: stored on the instance; not used by this class itself.
        transform: optional callable invoked as
            ``transform(image=..., bboxes=...)`` and returning a mapping with
            ``"image"`` and ``"bboxes"`` keys.
    """

    def __init__(
        self,
        csv_file,
        img_dir,
        label_dir,
        anchors,
        image_size=416,
        strides=(13, 26, 52),
        num_classes=20,
        transform=None,
    ):
        super().__init__()
        self.annotations = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.transform = transform
        # All anchors of all scales stacked into one (num_anchors, 2) tensor.
        self.anchors = torch.tensor(anchors[0] + anchors[1] + anchors[2])
        self.num_anchors = self.anchors.shape[0]
        self.num_anchors_per_scale = self.num_anchors // 3
        self.num_classes = num_classes
        self.image_size = image_size
        # Copy so that the caller's sequence is never shared with the instance.
        self.strides = list(strides)
        self.ignore_iou_threshold = 0.5

    def __len__(self):
        """Number of rows in the annotation CSV."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Load sample ``index``.

        Returns:
            ``(image, targets)`` where ``targets`` is a tuple with one tensor
            per scale of shape ``(anchors_per_scale, S, S, 6)``. The last axis
            holds ``(objectness, x, y, w, h, class)``; objectness is 1 for the
            anchor assigned to a box, -1 for an anchor to be ignored and 0
            otherwise.
        """
        label_path = os.path.join(
            self.label_dir,
            self.annotations.iloc[index, 1],
        )
        # ndmin=2 keeps a single-row file two-dimensional. Rolling by 4 moves
        # the leading class column to the end: (class, x, y, w, h) ->
        # (x, y, w, h, class).
        bboxes = np.roll(
            np.loadtxt(fname=label_path, delimiter=" ", ndmin=2), 4, axis=1
        ).tolist()
        img_path = os.path.join(
            self.img_dir,
            self.annotations.iloc[index, 0],
        )
        image = np.array(Image.open(img_path).convert("RGB"))

        if self.transform:
            augmentations = self.transform(image=image, bboxes=bboxes)
            image = augmentations["image"]
            bboxes = augmentations["bboxes"]

        # One (anchors_per_scale, S, S, 6) tensor per scale:
        # (objectness, x, y, w, h, class)
        targets = [
            torch.zeros((self.num_anchors_per_scale, s, s, 6))
            for s in self.strides
        ]

        for box in bboxes:
            # Rank every anchor by how well its (w, h) matches the box's.
            iou_anchors = iou(torch.tensor(box[2:4]), self.anchors)
            anchor_indices = iou_anchors.argsort(descending=True, dim=0)
            x, y, width, height, class_label = box
            has_anchor = [False, False, False]

            for anchor_idx in anchor_indices.tolist():
                scale_idx = anchor_idx // self.num_anchors_per_scale
                anchor_on_scale = anchor_idx % self.num_anchors_per_scale
                s = self.strides[scale_idx]
                # Cell containing the box centre (assumes x, y are
                # normalised to [0, 1) -- TODO confirm against label files).
                i, j = int(s * y), int(s * x)
                anchor_taken = targets[scale_idx][anchor_on_scale, i, j, 0]

                if not anchor_taken and not has_anchor[scale_idx]:
                    targets[scale_idx][anchor_on_scale, i, j, 0] = 1
                    # Centre relative to the cell, size in units of cells.
                    x_cell, y_cell = s * x - j, s * y - i
                    width_cell, height_cell = (width * s, height * s)
                    box_coordinates = torch.tensor(
                        [x_cell, y_cell, width_cell, height_cell]
                    )
                    targets[scale_idx][anchor_on_scale, i, j, 1:5] = box_coordinates
                    targets[scale_idx][anchor_on_scale, i, j, 5] = int(class_label)
                    # Only the best-matching anchor of each scale is
                    # responsible for this box.
                    has_anchor[scale_idx] = True

                elif not anchor_taken and iou_anchors[anchor_idx] > self.ignore_iou_threshold:
                    # A good but not best match: exclude it from the loss.
                    targets[scale_idx][anchor_on_scale, i, j, 0] = -1

        return image, tuple(targets)



In [22]:
import torch 
import torch.nn as nn

from utils.utils import intersection_over_union

class YoloLoss(nn.Module):
    """YOLOv3 loss for a single prediction scale.

    The loss is a weighted sum of four terms: box regression (MSE),
    objectness for cells with an object (BCE against the IoU of the decoded
    box), objectness for cells without an object (BCE) and classification
    (cross entropy). The weights are the ``lambda_*`` attributes.
    """

    def __init__(self):
        super().__init__()
        self.mse = nn.MSELoss()
        self.bce = nn.BCEWithLogitsLoss()
        self.entropy = nn.CrossEntropyLoss()
        self.sigmoid = nn.Sigmoid()

        self.lambda_class = 1
        self.lambda_noobj = 10
        self.lambda_obj = 1
        self.lambda_box = 10

    def forward(self, predictions, target, anchors):
        """Compute the loss for one scale.

        Args:
            predictions: raw network output; the last axis is
                ``(objectness logit, tx, ty, tw, th, class logits...)``.
            target: ground truth with last axis
                ``(objectness, x, y, w, h, class index)``. Entries whose
                objectness is neither 0 nor 1 contribute to no term.
            anchors: the 3 ``(width, height)`` anchors of this scale; reshaped
                to ``(1, 3, 1, 1, 2)`` to broadcast over the predictions.

        Returns:
            Scalar tensor with the weighted total loss. Neither
            ``predictions`` nor ``target`` is modified.
        """
        obj = target[..., 0] == 1
        noobj = target[..., 0] == 0

        # no object loss
        no_object_loss = self.bce(
            predictions[..., 0:1][noobj],
            target[..., 0:1][noobj],
        )

        # object loss
        anchors = anchors.reshape(1, 3, 1, 1, 2)
        pred_xy = self.sigmoid(predictions[..., 1:3])
        # Decoded size is anchor * exp(t): the anchor scales the exponential,
        # it does not go inside it.
        box_preds = torch.cat(
            [pred_xy, torch.exp(predictions[..., 3:5]) * anchors],
            dim=-1,
        )
        # The IoU only serves as a soft objectness target, so it is detached.
        ious = intersection_over_union(box_preds[obj], target[..., 1:5][obj]).detach()
        object_loss = self.bce(
            predictions[..., 0:1][obj], ious * target[..., 0:1][obj]
        )

        # box coordinate loss: compare sigmoid(tx, ty) with the target offsets
        # and the raw (tw, th) with log(target size / anchor). New tensors are
        # built instead of writing into the arguments.
        pred_boxes = torch.cat([pred_xy, predictions[..., 3:5]], dim=-1)
        target_boxes = torch.cat(
            [
                target[..., 1:3],
                torch.log(1e-16 + target[..., 3:5] / anchors),
            ],
            dim=-1,
        )
        box_loss = self.mse(pred_boxes[obj], target_boxes[obj])

        # class loss
        class_loss = self.entropy(
            predictions[..., 5:][obj], target[..., 5][obj].long()
        )

        return (
            self.lambda_box * box_loss
            + self.lambda_obj * object_loss
            + self.lambda_noobj * no_object_loss
            + self.lambda_class * class_loss
        )


# Training

In [23]:
from utils import config

def get_loaders(train_csv_path, test_csv_path):
    """Build the train, test and train-eval data loaders.

    All three datasets share the image/label directories, anchors and grid
    sizes taken from ``config``. The train loader uses
    ``config.train_transforms`` and shuffles; the test loader and the
    train-eval loader (training CSV read with ``config.test_transforms``)
    do not shuffle.

    Returns:
        ``(train_loader, test_loader, train_eval_loader)``.
    """
    image_size = config.IMAGE_SIZE

    def build_dataset(csv_path, transform):
        # Grid sizes for the three scales: strides 32, 16 and 8.
        return YOLODataset(
            csv_path,
            transform=transform,
            strides=[image_size // 32, image_size // 16, image_size // 8],
            img_dir=config.IMG_DIR,
            label_dir=config.LABEL_DIR,
            anchors=config.ANCHORS,
        )

    def build_loader(dataset, shuffle):
        return DataLoader(
            dataset=dataset,
            batch_size=config.BATCH_SIZE,
            num_workers=config.NUM_WORKERS,
            pin_memory=config.PIN_MEMORY,
            shuffle=shuffle,
            drop_last=False,
        )

    train_dataset = build_dataset(train_csv_path, config.train_transforms)
    test_dataset = build_dataset(test_csv_path, config.test_transforms)
    train_loader = build_loader(train_dataset, shuffle=True)
    test_loader = build_loader(test_dataset, shuffle=False)

    train_eval_dataset = build_dataset(train_csv_path, config.test_transforms)
    train_eval_loader = build_loader(train_eval_dataset, shuffle=False)

    return train_loader, test_loader, train_eval_loader


In [24]:
import torch
import torch.optim as optim

from tqdm import tqdm
from utils.utils import (
    mean_average_precision,
    cells_to_bboxes,
    get_evaluation_bboxes,
    save_checkpoint,
    load_checkpoint,
    check_class_accuracy,
    plot_couple_examples
)

import warnings
# Suppress every Python warning for the rest of the session.
# NOTE(review): this also hides deprecation warnings from torch; consider narrowing the filter.
warnings.filterwarnings("ignore")

# Enable the cuDNN benchmark flag (presumably to let cuDNN pick the fastest
# convolution algorithms for the fixed input size; verify against the PyTorch docs).
torch.backends.cudnn.benchmark = True

In [25]:
def train_fn(train_loader, model, optimizer, loss_fn, scaler, scaled_anchors):
    """Run one pass over ``train_loader`` with mixed-precision training.

    For every batch the three scale outputs of ``model`` are scored by
    ``loss_fn`` against the matching target and anchors, the summed loss is
    back-propagated through ``scaler`` and the optimizer is stepped. The
    progress bar shows the running mean of the batch losses.

    Args:
        train_loader: iterable yielding ``(x, y)`` where ``y[0..2]`` are the
            per-scale targets.
        model: network returning one output per scale.
        optimizer: optimizer updating ``model``.
        loss_fn: callable ``loss_fn(prediction, target, anchors)``.
        scaler: gradient scaler used for the backward pass and the step.
        scaled_anchors: indexable holding the anchors of each scale.
    """
    progress = tqdm(train_loader, leave=True)
    history = []

    for x, y in progress:
        x = x.to(config.DEVICE)
        targets = [y[k].to(config.DEVICE) for k in range(3)]

        with torch.cuda.amp.autocast():
            out = model(x)
            # Sum the three per-scale losses, coarsest scale first.
            loss = loss_fn(out[0], targets[0], scaled_anchors[0])
            for k in range(1, 3):
                loss = loss + loss_fn(out[k], targets[k], scaled_anchors[k])

        history.append(loss.item())
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # update progress bar with the mean loss so far
        progress.set_postfix(loss=sum(history) / len(history))

def main():
    """Train YOLOv3 and periodically evaluate it.

    Builds the model, Adam optimizer, loss and gradient scaler from
    ``config``, optionally restores a checkpoint, then for every epoch runs
    ``train_fn`` on the training loader and optionally saves a checkpoint.
    Every third epoch (except epoch 0) class accuracy and mean average
    precision are computed on the test loader.

    Both the train and test CSV currently point at ``8examples.csv``
    (presumably a tiny over-fitting sanity check; verify before real runs).
    """
    model = YOLOv3(num_classes=config.NUM_CLASSES).to(config.DEVICE)
    optimizer = optim.Adam(
        model.parameters(), lr=config.LEARNING_RATE, weight_decay=config.WEIGHT_DECAY
    )

    loss_fn = YoloLoss()
    scaler = torch.cuda.amp.GradScaler()

    train_loader, test_loader, train_eval_loader = get_loaders(
        train_csv_path=config.DATASET + "/8examples.csv",
        test_csv_path=config.DATASET + "/8examples.csv",
    )

    if config.LOAD_MODEL:
        load_checkpoint(
            config.CHECKPOINT_FILE, model, optimizer, config.LEARNING_RATE
        )

    # Multiply each scale's anchors by that scale's grid size (config.S) so
    # they are expressed in units of grid cells.
    scaled_anchors = (
        torch.tensor(config.ANCHORS)
        * torch.tensor(config.S).unsqueeze(1).unsqueeze(2).repeat(1, 3, 2)
    ).to(config.DEVICE)

    for epoch in range(config.NUM_EPOCHS):
        # Train on the training loader (shuffled, training transforms); the
        # test loader is reserved for the evaluation below.
        train_fn(train_loader, model, optimizer, loss_fn, scaler, scaled_anchors)

        if config.SAVE_MODEL:
            save_checkpoint(model, optimizer)

        if epoch > 0 and epoch % 3 == 0:
            check_class_accuracy(
                model,
                test_loader,
                threshold=config.CONF_THRESHOLD,
            )
            pred_boxes, true_boxes = get_evaluation_bboxes(
                test_loader,
                model,
                iou_threshold=config.NMS_IOU_THRESH,
                anchors=config.ANCHORS,
                threshold=config.CONF_THRESHOLD,
            )
            mapval = mean_average_precision(
                pred_boxes,
                true_boxes,
                iou_threshold=config.MAP_IOU_THRESH,
                box_format="midpoint",
                num_classes=config.NUM_CLASSES,
            )
            print(f"MAP: {mapval.item()}")
            # Switch back to training mode after evaluation (presumably the
            # evaluation helpers put the model in eval mode; TODO confirm).
            model.train()

In [26]:
# Guard the entry point: with spawn-based multiprocessing (e.g. on Windows)
# DataLoader worker processes re-import the main module, and an unguarded
# call would start training again inside every worker.
if __name__ == "__main__":
    main()

  0%|          | 0/1 [00:05<?, ?it/s]


RuntimeError: DataLoader worker (pid(s) 15696, 22796, 25692, 21460) exited unexpectedly

In [None]:
# pandas experiment
# mydict = [
# {'a': 1, 'b': 2, 'c': 3, 'd': 4},
# {'a': 100, 'b': 200, 'c': 300, 'd': 400},
# {'a': 1000, 'b': 2000, 'c': 3000, 'd': 4000 }
# ]
# df = pd.DataFrame(mydict)
# np.array(df.iloc[1,1:4])
# result: array([200, 300, 400], dtype=int64)

In [None]:
# np.loadtxt experiment
# ndmin is the minimum number of dimensions the returned array will have
# delimiter is the string used to split each line into fields
# np.roll(x, shift, axis)
# shifts the elements of x to the right (a negative shift means left) cyclically along the given axis

# row = np.loadtxt(fname="./dataset/labels/000001.txt", delimiter=" ", ndmin=2)
# print(row)
