In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import torch.optim as optim
import os
import pandas as pd
import numpy as np
import albumentations as A
from PIL import Image
import matplotlib.pyplot as plt
import cv2, math

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Device: {device}")

# Architecture Definition

In [None]:
# Layer specification consumed by yolov3._create_layers, listed in execution order:
#   (out_channels, kernel_size, stride) -> ConvLayer
#   ["residual", n]                     -> ConvBlock with n repeats
#   ["residualYolo", n]                 -> ConvBlockYolo with n repeats; its feat_map is
#                                          collected by yolov3.forward for the detection heads
#   ["yolo", channels]                  -> DetectionBlock built with that channel count
yolo_architecture = [
    (32, 3, 1),
    (64, 3, 2),
    ["residual", 1],
    (128, 3, 2),
    ["residual", 2],
    (256, 3, 2),
    ["residualYolo", 8],
    # first yolo route
    (512, 3, 2),
    ["residualYolo", 8],
    # second yolo route
    (1024, 3, 2),
    ["residual", 4],
    # third yolo route
    ["yolo", 1024],
    ["yolo", 512],
    ["yolo", 256],
]

# Blocks Definition

In [None]:
# Convolutional Layer
class ConvLayer(nn.Module):
    """2D convolution, optionally followed by batch norm and LeakyReLU(0.1).

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        kernel_size: convolution kernel size; a size of 3 is padded by 1,
            any other size gets no padding.
        bn_act: when true, BatchNorm2d and LeakyReLU follow the convolution and
            the convolution has no bias; when false, only a biased convolution
            is applied.
        **kwargs: forwarded unchanged to nn.Conv2d (e.g. stride).
    """

    def __init__(self, in_channels, out_channels, kernel_size, bn_act=True, **kwargs):
        super().__init__()
        pad = 1 if kernel_size == 3 else 0
        # the bias is dropped whenever batch norm follows the convolution
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            padding=pad,
            bias=not bn_act,
            **kwargs,
        )
        self.use_bn_act = bn_act
        if bn_act:
            self.bn = nn.BatchNorm2d(out_channels)
            self.leaky = nn.LeakyReLU(0.1)

    def forward(self, x):
        """Apply the convolution and, if enabled, batch norm + LeakyReLU."""
        out = self.conv(x)
        if not self.use_bn_act:
            return out
        return self.leaky(self.bn(out))

In [None]:
# Block of convolutional layers
class ConvBlock(nn.Module):
    """Stack of (1x1 squeeze -> 3x3 expand) ConvLayer pairs, optionally residual.

    Args:
        channels: channel count of both the input and the output; the 1x1
            layer reduces it to channels // 2 and the 3x3 layer restores it.
        use_residual: when true, the input of every pair is added to its output.
        num_repeats: number of pairs applied one after another.
    """

    def __init__(self, channels, use_residual=True, num_repeats=1):
        super().__init__()
        self.layers = nn.ModuleList()
        for _ in range(num_repeats):
            self.layers.append(
                nn.Sequential(
                    ConvLayer(channels, channels // 2, kernel_size=1),
                    ConvLayer(channels // 2, channels, kernel_size=3),
                )
            )

        self.use_residual = use_residual

    def forward(self, x):
        """Run x through every pair, adding the skip connection if enabled."""
        for layer in self.layers:
            # Branch instead of multiplying x by a boolean: without residuals
            # no zero tensor is built and added on every repeat.
            if self.use_residual:
                x = layer(x) + x
            else:
                x = layer(x)

        return x

In [None]:
# Block of convolutional layers with feature map to be saved for detections
class ConvBlockYolo(nn.Module):
    """Residual block that also exposes an intermediate feature map.

    The block holds num_repeats (1x1 -> 3x3) ConvLayer pairs. Every pair is
    wrapped in an nn.Sequential except the third one, whose two layers are
    registered directly in self.layers. During forward, the output of that
    bare 1x1 layer (channels // 2 channels) is stored in self.feat_map.

    With fewer than three repeats no bare layers exist and feat_map stays None.

    Args:
        channels: channel count of both the input and the output of the block.
        num_repeats: number of (1x1 -> 3x3) pairs.
    """

    def __init__(self, channels, num_repeats=1):
        super().__init__()
        self.layers = nn.ModuleList()
        self.feat_map = None
        for repeat in range(num_repeats):
            squeeze = ConvLayer(channels, channels // 2, kernel_size=1)
            expand = ConvLayer(channels // 2, channels, kernel_size=3)
            if repeat == 2:
                # third pair: registered as two separate entries so forward
                # can capture the output of the 1x1 layer
                self.layers += [squeeze, expand]
            else:
                self.layers.append(nn.Sequential(squeeze, expand))

    def forward(self, x):
        """Apply all pairs with skip connections and refresh self.feat_map."""
        self.feat_map = None
        for layer in self.layers:
            if isinstance(layer, ConvLayer):
                # identity test: feat_map is either None or a tensor, and a
                # tensor must not be compared to None with ==
                if self.feat_map is None:
                    # bare 1x1 layer: keep its output for the detection heads
                    self.feat_map = layer(x)
                else:
                    # bare 3x3 layer: finish the pair and add the skip connection
                    x = layer(self.feat_map) + x
            else:
                x = layer(x) + x

        return x

In [None]:
# detection block which outputs bboxes for each grid cell
class DetectionBlock(nn.Module):
    """Detection head producing three box predictions for every grid cell.

    Args:
        channels: number of channels of the incoming feature map.
        num_classes: number of object classes; every predicted box carries
            4 + 1 + num_classes values.

    After forward, self.feat_map holds the output of the penultimate 1x1
    layer (channels // 2 channels), which yolov3.forward feeds to the next
    detection scale.
    """

    def __init__(self, channels, num_classes):
        super().__init__()
        # filled on every forward pass
        self.feat_map = None
        # two non-residual (1x1 -> 3x3) pairs
        self.convBlock = ConvBlock(channels, use_residual=False, num_repeats=2)
        # penultimate 1x1 layer, whose output is kept in feat_map
        self.penultConvBlock = ConvLayer(channels, channels // 2, kernel_size=1)
        self.ultConvBlock = ConvLayer(channels // 2, channels, kernel_size=3)
        # plain convolution (no batch norm / activation) emitting
        # 3 * (4 + 1 + num_classes) channels per grid cell
        values_per_box = 4+1+num_classes
        self.outputPredictions = ConvLayer(channels, 3 * values_per_box, kernel_size=1, bn_act=False)

    def forward(self, x):
        """Return predictions shaped (batch, grid_rows, grid_cols, 3, 4 + 1 + num_classes)."""
        self.feat_map = self.penultConvBlock(self.convBlock(x))
        raw = self.outputPredictions(self.ultConvBlock(self.feat_map))
        # move channels last, then split them into the three boxes of each cell
        channels_last = raw.permute(0, 2, 3, 1)
        batch, rows, cols = channels_last.shape[:3]
        return channels_last.reshape(batch, rows, cols, 3, -1)
        

# Network Definition

In [None]:
class yolov3(nn.Module):
    """Network assembled from an architecture list (see _create_layers).

    Args:
        architecture: sequence of layer descriptions.
        num_classes: number of classes handed to every DetectionBlock.
        in_channels: channel count of the input images.
    """

    def __init__(self, architecture, num_classes, in_channels=3):
        super().__init__()
        self.num_classes = num_classes
        self.in_channels = in_channels
        self.layers = self._create_layers(architecture)

    def forward(self, x):
        """Return the list of DetectionBlock outputs, in the order the blocks run."""
        # feature maps saved by the ConvBlockYolo layers, in execution order
        backbone_maps = []
        # one prediction tensor per DetectionBlock
        outputs = []
        for layer in self.layers:
            x = layer(x)
            if isinstance(layer, ConvBlockYolo):
                backbone_maps.append(layer.feat_map)
            elif isinstance(layer, DetectionBlock):
                outputs.append(x)
                # the next scale continues from the head's intermediate map
                x = layer.feat_map
            elif isinstance(layer, nn.Upsample):
                # The saved maps are consumed from the last one backwards:
                # after the k-th detection the k-th map from the end is
                # concatenated along the channel dimension.
                x = torch.cat((backbone_maps[-len(outputs)], x), dim=1)
        return outputs

    def _create_layers(self, architecture):
        """Translate the architecture list into an nn.ModuleList.

        Recognised entries:
            tuple (out_channels, kernel_size, stride[, ...]) -> ConvLayer;
                batch norm + activation are disabled when the last element
                is the string "linear".
            ["residual", n]     -> ConvBlock with n repeats.
            ["residualYolo", n] -> ConvBlockYolo with n repeats.
            ["yolo", channels]  -> DetectionBlock; every such entry except the
                third is followed by a 1x1 ConvLayer (channels // 2 ->
                channels // 4) and a 2x nn.Upsample.
        Any other entry is skipped.
        """
        layers = nn.ModuleList()
        in_channels = self.in_channels
        heads_built = 0
        for module in architecture:
            if isinstance(module, tuple):
                layers.append(
                    ConvLayer(
                        in_channels,
                        module[0],
                        kernel_size=module[1],
                        bn_act=not (module[-1] == "linear"),
                        stride=module[2],
                    )
                )
                in_channels = module[0]
            elif isinstance(module, list):
                kind = module[0]
                if kind == "residual":
                    layers.append(ConvBlock(in_channels, num_repeats=module[1]))
                elif kind == "residualYolo":
                    layers.append(ConvBlockYolo(in_channels, num_repeats=module[1]))
                elif kind == "yolo":
                    heads_built += 1
                    layers.append(DetectionBlock(module[1], self.num_classes))
                    if heads_built != 3:
                        # bridge towards the next detection scale
                        layers.append(ConvLayer(module[1] // 2, module[1] // 4, kernel_size=1))
                        layers.append(nn.Upsample(scale_factor=2))

        return layers

# Data Preprocessing

In [None]:
from nuimages import NuImages

In [None]:
# Handle on the nuImages 'v1.0-train' split; get_bboxes reads this module-level object.
# NOTE(review): dataroot is an absolute path on one specific machine — change it before running elsewhere.
nuim = NuImages(dataroot='/home/loreleva/Desktop/Big Data Project/data/sets/nuimages', version='v1.0-train', verbose=True, lazy=True)

In [None]:
# Training class names. The position of a name in this list is the integer class id:
# create_dataset writes NUIMAGES_LABELS.index(name) to the annotation files and
# plot_bboxes maps ids back to names by indexing this list.
NUIMAGES_LABELS = [
    "human",
    "barrier",
    "trafficcone",
    "bicycle",
    "bus",
    "car",
    "motorcycle",
    "truck",
    "construction_vehicles",
    "trailer",
]

In [None]:
# Maps a nuImages category name to the training label used in NUIMAGES_LABELS.
# A value of None means annotations of that category are dropped by get_bboxes.
# get_bboxes indexes this dict directly, so a category name missing here raises KeyError.
selection_category = {
 "animal" : None,
 "human.pedestrian.adult" : "human",
 "human.pedestrian.child" : "human",
 "human.pedestrian.construction_worker" : "human",
 "human.pedestrian.personal_mobility" : "human",
 "human.pedestrian.police_officer" : "human",
 "human.pedestrian.stroller" : "human",
 "human.pedestrian.wheelchair" : "human",
 "movable_object.barrier" : "barrier",
 "movable_object.debris" : None,
 "movable_object.pushable_pullable" : None,
 "movable_object.trafficcone" : "trafficcone",
 "static_object.bicycle_rack" : None,
 "vehicle.bicycle" : "bicycle",
 "vehicle.bus.bendy" : "bus",
 "vehicle.bus.rigid" : "bus",
 "vehicle.car" : "car",
 "vehicle.ego" : None,
 "vehicle.construction" : "construction_vehicles",
 "vehicle.emergency.ambulance" : "truck",
 "vehicle.emergency.police" : "car",
 "vehicle.motorcycle" : "motorcycle",
 "vehicle.trailer" : "trailer",
 "vehicle.truck" : "truck"
}

In [None]:
def get_bboxes(sample_token):
    """Collect the usable object annotations of one sample.

    Reads the module-level `nuim` handle and the `selection_category` mapping.

    Args:
        sample_token: token of the sample whose annotations are listed.

    Returns:
        A list with one entry per kept annotation: the annotation's "bbox"
        values followed by the mapped training label. Annotations whose
        category maps to None are left out.
    """
    ann_tokens, _ = nuim.list_anns(sample_token, verbose=False)
    kept = []
    for token in ann_tokens:
        object_ann = nuim.get("object_ann", token)
        raw_name = nuim.get("category", object_ann["category_token"])["name"]
        label = selection_category[raw_name]
        if label is not None:
            kept.append(object_ann["bbox"] + [label])
    return kept

In [None]:
def normalize_bboxes(img_h, img_w, bboxes, confidence=False):
    """Scale the four coordinates of every box by the image size.

    Elements 0 and 2 are divided by img_w, elements 1 and 3 by img_h.
    Element 4 is copied unchanged; with confidence=True element 5 is
    copied as well. Anything beyond that is dropped.

    Returns:
        A new list of new lists; the input is not modified.
    """
    tail_len = 2 if confidence else 1
    scaled = []
    for box in bboxes:
        row = [box[0] / img_w, box[1] / img_h, box[2] / img_w, box[3] / img_h]
        row.extend(box[4 + k] for k in range(tail_len))
        scaled.append(row)
    return scaled

def unnormalize_bboxes(img_h, img_w, bboxes, confidence=False):
    """Scale normalized box coordinates to integer pixel coordinates.

    Elements 0 and 2 are multiplied by img_w, elements 1 and 3 by img_h, and
    each product is truncated with int(). Element 4 is copied unchanged; with
    confidence=True element 5 is copied as well.

    Returns:
        A new list of new lists; the input is not modified.
    """
    tail_len = 2 if confidence else 1
    pixel_boxes = []
    for box in bboxes:
        row = [
            int(box[0] * img_w),
            int(box[1] * img_h),
            int(box[2] * img_w),
            int(box[3] * img_h),
        ]
        row.extend(box[4 + k] for k in range(tail_len))
        pixel_boxes.append(row)
    return pixel_boxes
    
def pascal_to_yolo(img_h, img_w, bboxes, confidence=False):
    """Convert pixel corner boxes to normalized center/size boxes.

    Input boxes are [xmin, ymin, xmax, ymax, category] (with confidence=True:
    [xmin, ymin, xmax, ymax, element 4, element 5]). Output boxes are
    [xcenter, ycenter, width, height, ...] with the four coordinates divided
    by the image size via normalize_bboxes.

    Every corner is clamped to the image rectangle [0, img_w] x [0, img_h].
    Boxes whose clamped width or height is not positive are dropped. The input
    boxes are never modified, so tuples are accepted as well.

    Args:
        img_h: image height in pixels.
        img_w: image width in pixels.
        bboxes: iterable of boxes as described above.
        confidence: when true, two trailing elements are carried over instead of one.

    Returns:
        A new list of normalized boxes.
    """
    tail_len = 2 if confidence else 1
    converted = []
    for bbox in bboxes:
        # clamp on local copies so the caller's boxes stay untouched
        xmin = min(max(bbox[0], 0), img_w)
        ymin = min(max(bbox[1], 0), img_h)
        xmax = min(max(bbox[2], 0), img_w)
        ymax = min(max(bbox[3], 0), img_h)
        bbox_w = xmax - xmin
        bbox_h = ymax - ymin
        # skip empty or inverted boxes, including boxes entirely outside the image
        if bbox_h <= 0 or bbox_w <= 0:
            continue
        row = [xmin + bbox_w/2, ymin + bbox_h/2, bbox_w, bbox_h]
        row.extend(bbox[4 + k] for k in range(tail_len))
        converted.append(row)
    return normalize_bboxes(img_h, img_w, converted, confidence=confidence)
    
def yolo_to_pascal(img_h, img_w, bboxes, confidence=False):
    """Convert normalized center/size boxes to integer pixel corner boxes.

    Input boxes are [xcenter, ycenter, width, height, element 4] (plus
    element 5 when confidence=True). Output boxes are
    [xmin, ymin, xmax, ymax, ...] scaled to pixels by unnormalize_bboxes.
    Boxes whose width or height equals 0 are dropped, so the result can be
    shorter than the input.
    """
    tail_len = 2 if confidence else 1
    corners = []
    for box in bboxes:
        if box[2] == 0 or box[3] == 0:
            continue
        corner = [
            box[0] - box[2]/2,
            box[1] - box[3]/2,
            box[0] + box[2]/2,
            box[1] + box[3]/2,
        ]
        corner.extend(box[4 + k] for k in range(tail_len))
        corners.append(corner)
    return unnormalize_bboxes(img_h, img_w, corners, confidence=confidence)

In [None]:
# Transform passed to create_dataset: resizes every image to 416x416.
# bbox_params declares the boxes handed to it as "yolo"-format boxes.
resize = A.Compose(
    [
        A.Resize(416, 416)

    ], bbox_params=A.BboxParams(format="yolo")
)

In [None]:
def create_dataset(nuim, path_nuimages, type_dataset, path_dataset, resize):
    """Write a resized copy of a nuImages split together with CSV annotations.

    Output layout under <path_dataset>/<type_dataset>/:
        samples/<camera>/...           resized images
        bbox_annotations/<camera>/...  one CSV per image that has boxes
        main.csv                       image path and annotation path per sample
                                       (annotation path empty when there are no boxes)

    Args:
        nuim: nuImages handle; its `sample` table is iterated and `get` is
            used to resolve the key camera image of every sample.
        path_nuimages: root directory of the original dataset; must contain
            a "samples" directory with one sub-directory per camera.
        type_dataset: name of the split directory to create (e.g. "train").
        path_dataset: root directory of the generated dataset.
        resize: callable invoked as resize(image=..., bboxes=...) that returns
            a mapping with "image" and "bboxes".

    NOTE(review): get_bboxes reads the module-level `nuim`, not the `nuim`
    argument of this function — both must refer to the same split.
    """
    path_nuimages_samples = os.path.join(path_nuimages, "samples")
    samples_cam_names = os.listdir(path_nuimages_samples)

    # makedirs(exist_ok=True) also creates missing parent directories and is
    # a no-op for directories that already exist
    path_type_dataset = os.path.join(path_dataset, type_dataset)
    path_samples = os.path.join(path_type_dataset, "samples")
    path_bbox_ann = os.path.join(path_type_dataset, "bbox_annotations")
    os.makedirs(path_samples, exist_ok=True)
    os.makedirs(path_bbox_ann, exist_ok=True)
    for cam_name in samples_cam_names:
        os.makedirs(os.path.join(path_samples, cam_name), exist_ok=True)
        os.makedirs(os.path.join(path_bbox_ann, cam_name), exist_ok=True)

    N = len(nuim.sample)
    # rows are collected in a list and joined once, instead of growing one
    # string per sample
    main_rows = ["filename_img; filename_annotations\n"]
    for i, sample in enumerate(nuim.sample, start=1):
        print(f"SAMPLE: {i}/{N}")
        bboxes = get_bboxes(sample["token"])
        filename_img = nuim.get("sample_data", sample["key_camera_token"])["filename"]
        # close the source image file as soon as it has been decoded
        with Image.open(os.path.join(path_nuimages, filename_img)) as pil_img:
            np_img = np.array(pil_img.convert("RGB"))
        bboxes = pascal_to_yolo(np_img.shape[0], np_img.shape[1], bboxes)
        resize_img = resize(image=np_img, bboxes=bboxes)
        new_img = resize_img["image"]
        bboxes = resize_img["bboxes"]

        plt.imsave(os.path.join(path_type_dataset, filename_img), new_img.astype(np.uint8))
        filename_bbox_ann = ""
        if len(bboxes) != 0:
            bbox_rows = ["xcenter; ycenter; w; h; class\n"]
            bbox_rows.extend(
                f"{bbox[0]}; {bbox[1]}; {bbox[2]}; {bbox[3]}; {NUIMAGES_LABELS.index(bbox[4])}\n"
                for bbox in bboxes
            )
            # splitext strips only the extension, even if a directory name
            # contains a dot
            ann_rel = os.path.splitext(filename_img)[0] + ".csv"
            # swap the leading "samples" path component for "bbox_annotations"
            filename_bbox_ann = "bbox_annotations" + ann_rel[len("samples"):]
            with open(os.path.join(path_type_dataset, filename_bbox_ann), "w") as f:
                f.write("".join(bbox_rows))
        main_rows.append(f"{filename_img}; {filename_bbox_ann}\n")
    with open(os.path.join(path_type_dataset, "main.csv"), "w") as f:
        f.write("".join(main_rows))

# Dataset

In [None]:
def IOU(bbox1, bbox2, img_h=416, img_w=416, confidence=True):
    """Intersection over union of two normalized center/size boxes.

    Both boxes are converted to integer pixel corners with yolo_to_pascal for
    an img_h x img_w image. Widths and heights are counted inclusively
    (max - min + 1).

    yolo_to_pascal drops boxes of zero width or height, so passing such a box
    raises IndexError here.

    Args:
        bbox1, bbox2: boxes starting with [xcenter, ycenter, width, height];
            they must also carry the trailing elements yolo_to_pascal reads
            for the given `confidence` flag.
        img_h, img_w: image size used for the pixel conversion.
        confidence: forwarded to yolo_to_pascal.

    Returns:
        The IoU as a float.
    """
    ax1, ay1, ax2, ay2 = yolo_to_pascal(img_h, img_w, [bbox1], confidence=confidence)[0][:4]
    bx1, by1, bx2, by2 = yolo_to_pascal(img_h, img_w, [bbox2], confidence=confidence)[0][:4]

    # inclusive extent of the overlap rectangle, clipped at zero
    overlap_w = max(0, min(ax2, bx2) - max(ax1, bx1) + 1)
    overlap_h = max(0, min(ay2, by2) - max(ay1, by1) + 1)
    inter_area = overlap_w * overlap_h

    area_a = (ax2 - ax1 + 1) * (ay2 - ay1 + 1)
    area_b = (bx2 - bx1 + 1) * (by2 - by1 + 1)

    return inter_area / float(area_a + area_b - inter_area)

In [None]:
def sigmoid(x):
    """Logistic function, evaluated as 1 / (1 + 1 / e**x)."""
    inv_exp = 1 / np.exp(x)
    return 1 / (1 + inv_exp)

def inverse_sigmoid(x):
    """Logit of x, i.e. log(x / (1 - x))."""
    odds = x/(1-x)
    return np.log(odds)

def get_tx(b_x, c_x):
    """Logit of the horizontal offset b_x - c_x."""
    offset = b_x - c_x
    return inverse_sigmoid(offset)

def get_ty(b_y, c_y):
    """Logit of the vertical offset b_y - c_y."""
    offset = b_y - c_y
    return inverse_sigmoid(offset)

def get_bx(t_x, c_x, cell_size):
    """Return sigmoid(t_x) * cell_size + c_x."""
    scaled = sigmoid(t_x)*cell_size
    return scaled + c_x

def get_by(t_y, c_y, cell_size):
    """Return sigmoid(t_y) * cell_size + c_y."""
    scaled = sigmoid(t_y)*cell_size
    return scaled + c_y

def get_bw(p_w, t_w):
    """Box width from prior width p_w and log-scale t_w: p_w * e**t_w."""
    scale = np.exp(t_w)
    return p_w * scale

def get_bh(p_h, t_h):
    """Box height from prior height p_h and log-scale t_h: p_h * e**t_h."""
    scale = np.exp(t_h)
    return p_h * scale

def get_tw(b_w, p_w):
    """Log of the ratio between box width b_w and prior width p_w."""
    ratio = b_w/p_w
    return np.log(ratio)

def get_th(b_h, p_h):
    """Log of the ratio between box height b_h and prior height p_h."""
    ratio = b_h/p_h
    return np.log(ratio)

In [None]:
class trainset(Dataset):
    """Dataset yielding (image, per-anchor target table) pairs.

    The directory `path_trainset` must contain a "main.csv" (";"-separated)
    with the columns "filename_img" and "filename_annotations"; both hold
    paths relative to `path_trainset`, and the annotation path is empty for
    images without boxes.

    Every item is a tuple (image, targets):
        image   -- float tensor, the PILToTensor output divided by 255.
        targets -- tensor with one row per anchor box and 13 columns:
                   [xcenter, ycenter, width, height, objectness, class,
                    cell_x, cell_y, cell_size, t_x, t_y, t_w, t_h].
                   Rows are ordered by grid (self.S order), then by cell in
                   row-major order, then by the three anchors of the cell.
                   objectness is 1 for an anchor assigned to a ground-truth
                   box, 0 for a free anchor and -1 for an anchor that was
                   selected and then replaced while its IoU was at least
                   thr_ignore.
    """
    def __init__(self, path_trainset, anchors):
        """Read main.csv and store the anchor configuration.

        Args:
            path_trainset: directory containing main.csv, the images and the
                annotation CSV files.
            anchors: indexed as anchors[s][j] -> (width, height) for grid s
                of self.S and anchor j in 0..2.
        """
        self.path_trainset = path_trainset
        self.main_df = pd.read_csv(os.path.join(self.path_trainset, "main.csv"),
                                   sep=";", 
                                   skipinitialspace=True, 
                                   na_filter=False)
        self.transform = transforms.PILToTensor()
        self.anchors = anchors
        # grid sizes (cells per side), in the order the anchor rows are generated
        self.S = [13, 26, 52]
        # IoU from which a replaced anchor is marked -1 instead of being reset to 0
        self.thr_ignore = 0.5
        
    def __len__(self):
        """Number of rows of main.csv."""
        return len(self.main_df)
    
    def __getitem__(self, idx):
        """Build the image tensor and the anchor target table for row `idx`."""
        # retrieve ground truth bboxes
        row = self.main_df.iloc[idx]
        bboxes = []
        if row["filename_annotations"] != "":
            # load bboxes
            bbox_df = pd.read_csv(os.path.join(self.path_trainset, row["filename_annotations"]), 
                              sep=";", 
                              skipinitialspace=True, 
                              na_filter=False)
            for i in range(len(bbox_df)):
                # insert an objectness of 1 at position 4 so the box has the same
                # leading layout as an anchor row: [xcenter, ycenter, w, h, objectness, class]
                bbox = bbox_df.iloc[i].tolist()
                bbox.insert(4, 1)
                bboxes.append(bbox)
                
        # create anchor bboxes
        # NOTE(review): idx_assigned_anchors_boxes is initialised here but never read afterwards
        idx_assigned_anchors_boxes = [[0, None] for bbox in bboxes]
        ground_truth_bboxes = []
        # init anchor boxes
        for s in range(len(self.S)):
            S_size = self.S[s]
            # divide the normalized image in S_size parts, then we take the size of each cell
            cell_size = 1 / S_size
            # NOTE(review): this loop variable shadows the `idx` parameter; harmless here
            # because `row` has already been read above
            for idx in range(S_size*S_size):
                # row-major cell numbering: x is the column, y is the row
                x = idx%S_size
                y = idx//S_size

                xcenter = x*cell_size + cell_size/2
                ycenter = y*cell_size + cell_size/2
        
                # add the 3 anchor bboxes to the cell
                anchor_boxes = []
                for j in range(3):
                    # create anchor box: [xcenter, ycenter, width, height, object score, class, 
                    # coord_x, coord_y, cell_size, t_x, t_y, t_w, t_h]
                    anchor_box = [xcenter, 
                                  ycenter, 
                                  self.anchors[s][j][0], 
                                  self.anchors[s][j][1], 
                                  0, 
                                  0,
                                  x,
                                  y,
                                  cell_size,
                                  0,
                                  0, 
                                  0,
                                  0
                                 ]
                    ground_truth_bboxes.append(anchor_box)
                    
        
        # Assign every ground-truth box to anchors. All anchors of all grids are scanned;
        # only anchors whose cell contains the box center are considered.
        for bbox in bboxes:
            temp_best_iou = 0
            idx_best_anchor = None
            idx_anchor_box = 0
            for anchor in ground_truth_bboxes:
                # check if the center of the box is inside the anchor's cell
                cell_size = anchor[8]
                # box center expressed in cell units of this anchor's grid
                box_cx = bbox[0] / cell_size
                box_cy = bbox[1] / cell_size
                # obtain anchor's cell coordinates
                cx = anchor[6]
                cy = anchor[7]
                # continue only if box center is in anchor's cell
                if box_cx >= cx and box_cx < cx+1 and box_cy >= cy and box_cy < cy+1:
                    # a center lying exactly on the cell origin gives an offset of 0, whose
                    # logit is log(0); nudge it so get_tx / get_ty stay finite
                    if box_cx == cx:
                        box_cx += 1e-8
                    if box_cy == cy:
                        box_cy += 1e-8
                    res_iou = IOU(bbox, anchor)
                    # check if iou is better than previous and if the anchor box is free
                    if res_iou > temp_best_iou and anchor[4] == 0:
                        # release the previously selected anchor, if any
                        if idx_best_anchor != None:
                            if temp_best_iou >= self.thr_ignore:
                                # good overlap but not the best: mark with objectness -1
                                ground_truth_bboxes[idx_best_anchor][4] = -1
                            else:
                                # reset objectness and class
                                ground_truth_bboxes[idx_best_anchor][4] = 0
                                ground_truth_bboxes[idx_best_anchor][5] = 0
                                # reset previous values for coordinates
                                ground_truth_bboxes[idx_best_anchor][9:] = [0 for x in range(4)]
                            
                        # update values with new anchor
                        temp_best_iou = res_iou
                        idx_best_anchor = idx_anchor_box
                        # set objectness and class
                        anchor[4] = 1
                        anchor[5] = bbox[5]
                        # regression targets: logit of the center offset inside the cell ...
                        anchor[9] = get_tx(box_cx, cx)
                        anchor[10] = get_ty(box_cy, cy)
                        # ... and log ratio between the box size and the anchor size
                        anchor[11] = get_tw(bbox[2], anchor[2])
                        anchor[12] = get_th(bbox[3], anchor[3])
                    
                idx_anchor_box += 1
  
        img = Image.open(os.path.join(self.path_trainset, row["filename_img"]))
        return self.transform(img).float()/255, torch.Tensor(ground_truth_bboxes)

# Train

## Load Darknet weights into yolo model

In [None]:
def yolo_loss(pred, y):
    """Weighted sum of objectness, box, class and no-object losses.

    Args:
        pred: predictions of shape (..., 5 + n_classes). Along the last axis:
            [:4] box regression values, [4] objectness logit, [5:] class logits.
        y: targets with the same leading shape as pred. Along the last axis:
            [4] objectness flag (1 = object, 0 = no object; rows with any other
            value contribute to no term), [5] class index, [-4:] box
            regression targets.

    Returns:
        A scalar tensor:
            1 * BCE(objectness | object rows)
          + 10 * MSE(box | object rows)
          + 1 * CrossEntropy(class | object rows)
          + 10 * BCE(objectness | no-object rows)
        A term whose row selection is empty contributes 0 instead of the NaN
        that a mean over zero elements would produce.
    """
    lambda_obj = 1
    lambda_box = 10
    lambda_class = 1
    lambda_noobj = 10

    bce_loss = nn.BCEWithLogitsLoss()
    mse_loss = nn.MSELoss()
    cross_entropy_loss = nn.CrossEntropyLoss()

    objectness = y[..., 4]
    obj_loss_idx = objectness == 1
    noobj_loss_idx = objectness == 0

    # default for every term whose mask selects no row
    zero = pred.new_zeros(())
    obj_loss = box_loss = class_loss = noobj_loss = zero

    if obj_loss_idx.any():
        obj_pred = pred[obj_loss_idx]
        obj_target = y[obj_loss_idx]
        # objectness loss
        obj_loss = bce_loss(obj_pred[:, 4], obj_target[:, 4])
        # coordinates loss
        box_loss = mse_loss(obj_pred[:, :4], obj_target[:, -4:])
        # class loss; .long() keeps the indices on the device of y
        class_loss = cross_entropy_loss(obj_pred[:, 5:], obj_target[:, 5].long())

    if noobj_loss_idx.any():
        # no object loss
        noobj_loss = bce_loss(pred[noobj_loss_idx][:, 4], y[noobj_loss_idx][:, 4])

    return lambda_obj * obj_loss + lambda_box * box_loss + lambda_class * class_loss + lambda_noobj * noobj_loss
    

In [None]:
# one output class per entry of NUIMAGES_LABELS
yolo_model = yolov3(yolo_architecture, len(NUIMAGES_LABELS)).to(device)
yolo_state_dict = yolo_model.state_dict()

# load darknet weights
# map_location places the stored tensors on the active device, so a checkpoint
# written on a GPU also loads when the notebook runs on the CPU.
# NOTE(review): torch.load unpickles the file — only load checkpoints you trust.
darknet_state_dict = torch.load("./backup_models/darknet_weights", map_location=device)
for layer in darknet_state_dict:
    # skip last layer of darknet: keys look like "<prefix>.<layer index>.<...>"
    # and entries whose layer index is "12" are not copied
    if layer.split(".")[1] != "12":
        yolo_state_dict[layer] = darknet_state_dict[layer]
yolo_model.load_state_dict(yolo_state_dict)

In [None]:
# Anchor sizes as (width, height), expressed as fractions of the image size.
# Row s is used by trainset together with grid trainset.S[s], i.e. in the order
# 13x13, 26x26, 52x52: the largest anchors go with the grid that has the fewest cells.
ANCHORS = [
    [(0.28, 0.22), (0.38, 0.48), (0.9, 0.78)],
    [(0.07, 0.15), (0.15, 0.11), (0.14, 0.29)],
    [(0.02, 0.03), (0.04, 0.07), (0.08, 0.06)],
]
# NOTE(review): absolute path on one specific machine — change it before running elsewhere.
nuim_train = trainset('/home/loreleva/Big Data Project/new_dataset/train', ANCHORS)

In [None]:
# data augmentation
# Each transform is applied independently with its own probability p; bbox_params
# declares the boxes passed along with the image as "yolo"-format boxes.
# NOTE(review): no other cell visible in this notebook applies `augmentation`
# (trainset does not use it) — confirm where it is meant to be plugged in.
# NOTE(review): hue=0.6 looks larger than the hue range ColorJitter usually accepts
# (0.5) — verify against the installed albumentations version.
augmentation = A.Compose(
    [
        A.ColorJitter(brightness=0.6, contrast=0.6, saturation=0.6, hue=0.6, p=0.4),
        A.Blur(p=0.1),
        A.CLAHE(p=0.1),
        A.Posterize(p=0.1),
        A.ToGray(p=0.1),
        A.ChannelShuffle(p=0.05),

    ], bbox_params=A.BboxParams(format="yolo")
)

In [None]:
# 28 EPOCHS, loss = 3.3306902824344022

# Trains epoch after epoch until the cell is interrupted by hand. After every epoch
# whose mean loss improves on the best one so far, the weights are written to
# ./backup_models/yolo_weights_loss_<mean loss> and the previous best file is removed.
i=0
batch_size = 8
lr = 1e-3
best_parameters = None
best_loss = math.inf
optimizer = optim.Adam(params=yolo_model.parameters(), lr=lr)
# One loader serves every epoch: with shuffle=True a new order is drawn each
# time the loader is iterated.
train_loader = DataLoader(nuim_train, batch_size=batch_size, shuffle=True)
total_batches = len(train_loader)
while(True):
    losses = []
    n_batch = 0
    for x, y in train_loader:
        x = x.to(device)
        y = y.to(device)
        detections = yolo_model(x)
        # flatten every scale to (batch, cells * anchors, values per anchor) and
        # stack the scales; the last dimension is taken from the model output
        # instead of being hard-coded
        detections = torch.cat(
            [torch.reshape(detection, (x.shape[0], -1, detection.shape[-1])) for detection in detections],
            1,
        )
        loss = yolo_loss(detections, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if n_batch % 1000 == 0:
            print(f"BATCH: {n_batch+1}/{total_batches}")
            print(f"loss: {loss.item()}\n")
        n_batch+=1
        losses.append(loss.item())

    mean_losses = sum(losses)/len(losses)
    if mean_losses < best_loss:
        # write the new checkpoint first and delete the old one afterwards, so a
        # failed save never leaves the run without a checkpoint
        torch.save(yolo_model.state_dict(), f"./backup_models/yolo_weights_loss_{mean_losses}")
        if best_loss != math.inf:
            os.remove(f"./backup_models/yolo_weights_loss_{best_loss}")
        best_loss = mean_losses
        # state_dict() shares storage with the live parameters; keep an
        # independent CPU copy so later optimizer steps cannot change it
        best_parameters = {
            name: tensor.detach().cpu().clone()
            for name, tensor in yolo_model.state_dict().items()
        }
    print(f"EPOCH: {i} MEAN LOSSES EPOCH: {mean_losses}")
    print(f"Loss: {loss.item()}")
    i+=1

KeyboardInterrupt: 

# Plot BBOXES

In [None]:
# Class names; plot_bboxes turns a class id into a name by indexing this list.
# NOTE(review): this repeats the NUIMAGES_LABELS list defined in the data
# preprocessing section — keep the two in sync or drop one of them.
NUIMAGES_LABELS = [
    "human",
    "barrier",
    "trafficcone",
    "bicycle",
    "bus",
    "car",
    "motorcycle",
    "truck",
    "construction_vehicles",
    "trailer",
]

# Drawing color of every class, as (B, G, R) tuples (OpenCV channel order).
label_to_color = {
    "human" : (102, 0, 0),
    "barrier" : (0, 0, 0),
    "trafficcone" : (0, 97, 243),
    "bicycle" : (0, 102, 0),
    "bus" : (0, 176, 159),
    "car" : (0, 0, 102),
    "motorcycle" : (84, 105, 0),
    "truck" : (99, 104, 81),
    "construction_vehicles" : (0, 76, 153),
    "trailer" : (76, 153, 0),
}

In [None]:
def plot_bboxes(img, bboxes, type_bbox="yolo", confidence=False, img_format="torch"):
    """Draw labelled bounding boxes on a copy of an image.

    Args:
        img: the image. With img_format="torch" a channels-first tensor, with
            "numpy" a channels-first array (both converted from RGB), with
            "cv2" a height x width x channels BGR array. Images that are not
            uint8 are multiplied by 255 and cast to uint8.
        bboxes: boxes to draw. With type_bbox="yolo" they are normalized
            [xcenter, ycenter, width, height, ...] boxes and are converted
            with yolo_to_pascal; otherwise they must already be integer pixel
            boxes [xmin, ymin, xmax, ymax, ...].
        type_bbox: "yolo" to convert the boxes, anything else to use them as given.
        confidence: when true a box ends with [confidence, class id] and the
            confidence is added to the label; otherwise it ends with [class id].
        img_format: "torch", "numpy" or "cv2" (see img).

    Returns:
        A uint8 RGB array (height x width x channels) with the boxes drawn.
        The image passed in is never modified.
    """
    fontScale = 1
    thickness = 2
    font = cv2.FONT_HERSHEY_COMPLEX

    # modify img array to make it compatible with cv2
    if img_format == "torch":
        img = torch.permute(img, (1, 2, 0)).numpy()
    elif img_format == "numpy":
        img = np.transpose(img, (1, 2, 0))
    if img_format != "cv2":
        # cvtColor returns a new array, so the caller's data stays untouched
        img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    else:
        # drawing happens in place: work on a private copy of the caller's array
        img = img.copy()
    if img.dtype != np.uint8:
        img = (img * 255).astype(np.uint8)

    img_h, img_w = img.shape[:2]
    if type_bbox == "yolo":
        # transforming yolo boxes in [xmin, ymin, xmax, ymax, class] type
        bboxes = yolo_to_pascal(img_h, img_w, bboxes, confidence=confidence)

    for bbox in bboxes:
        if confidence:
            # add confidence score to text
            label = NUIMAGES_LABELS[int(bbox[5])]
            text =  label + f" {bbox[4] * 100}%"
            color = label_to_color[label]
        else:
            text = NUIMAGES_LABELS[int(bbox[4])]
            color = label_to_color[text]

        img = cv2.rectangle(img,
                            (bbox[0], bbox[1]),
                            (bbox[2], bbox[3]),
                            color,
                            2)

        text_size, _ = cv2.getTextSize(text,
                                       font,
                                       fontScale=fontScale,
                                       thickness=thickness)
        text_w, text_h = text_size
        text_x, text_y = bbox[:2]

        # keep the label inside the image
        if text_x + text_w > img_w:
            text_x = img_w - text_w
        text_x = max(text_x, 0)
        if text_y - text_h < 0:
            # (text_x, text_y) is the bottom-left corner of the text, so the
            # baseline must sit one text height below the top edge
            text_y = text_h
        # filled background behind the label
        img = cv2.rectangle(img,
                            (text_x, text_y),
                            (text_x + text_w, text_y - text_h),
                            color,
                            -1)
        img = cv2.putText(img,
                          text,
                          (text_x, text_y),
                          font,
                          fontScale=fontScale,
                          color=(255, 255, 255),
                          thickness=thickness)
    return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)