In [1]:
!pip install albumentations==1.2.1
import os
from collections import Counter
import random
import torch
import torchvision
import torch.nn as nn
import torchvision.transforms.functional as TF
import torchvision.transforms as T
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
import torch.optim as optim
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
import cv2
from PIL import Image
import numpy as np
import zipfile
import warnings
import math
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from google.colab import drive
from google.colab.patches import cv2_imshow
import gc

drive.mount('/content/drive')
%matplotlib inline

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Mounted at /content/drive


# Config

In [49]:
# Global hyper-parameters and runtime settings shared by the cells below.
# DATASET = 'PASCAL_VOC'
# Use the GPU when torch can see one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# seed_everything()  # If you want deterministic behavior
# DataLoader settings.
NUM_WORKERS = 2
BATCH_SIZE = 2
# Side length (pixels) every image is resized to by the transform pipelines.
IMAGE_SIZE = 416
NUM_CLASSES = 2
# Adam optimizer settings.
LEARNING_RATE = 3e-4
WEIGHT_DECAY = 1e-4
NUM_EPOCHS = 100
# Evaluation thresholds: minimum confidence for a box to be kept, IoU needed
# for a detection to count as a match in mAP, and IoU above which NMS
# suppresses a same-class box.
CONF_THRESHOLD = 0.5
MAP_IOU_THRESH = 0.5
NMS_IOU_THRESH = 0.45
# Grid sizes of the three prediction scales (strides 32, 16 and 8).
S = [IMAGE_SIZE // 32, IMAGE_SIZE // 16, IMAGE_SIZE // 8]
PIN_MEMORY = True
# NOTE(review): LOAD_MODEL, SAVE_MODEL and CHECKPOINT_FILE are not referenced
# by any code visible in this notebook -- confirm whether checkpointing was
# meant to be wired in.
LOAD_MODEL = True
SAVE_MODEL = True
CHECKPOINT_FILE = "checkpoint.pth.tar"
# Three (width, height) anchor pairs per prediction scale, listed in the same
# order as S (coarsest grid first). They are multiplied by the grid size
# before use, so they are presumably expressed as fractions of the image
# size -- TODO confirm against how the anchors were derived.
ANCHORS = [
    [(0.28, 0.22), (0.38, 0.48), (0.9, 0.78)],
    [(0.07, 0.15), (0.15, 0.11), (0.14, 0.29)],
    [(0.02, 0.03), (0.04, 0.07), (0.08, 0.06)],
] 
# IMG_DIR = DATASET + "/images/"
# LABEL_DIR = DATASET + "/labels/"

# Utils

In [35]:
def iou_width_height(boxes1, boxes2):
    """IoU of boxes given only by (width, height), as if they shared a corner.

    Args:
        boxes1: tensor whose last dimension starts with (width, height).
        boxes2: tensor whose last dimension starts with (width, height); it
            must broadcast against ``boxes1``.

    Returns:
        Tensor of IoU values, one per broadcast pair of boxes.
    """
    w1, h1 = boxes1[..., 0], boxes1[..., 1]
    w2, h2 = boxes2[..., 0], boxes2[..., 1]

    # With a shared corner the overlap is the smaller width times the
    # smaller height.
    overlap = torch.min(w1, w2) * torch.min(h1, h2)
    combined = w1 * h1 + w2 * h2 - overlap
    return overlap / combined

def intersection_over_union(boxes_preds, boxes_labels, box_format="midpoint"):
    """Compute the IoU between two sets of boxes.

    Args:
        boxes_preds: tensor of shape (..., 4) with the predicted boxes.
        boxes_labels: tensor of shape (..., 4) with the reference boxes.
        box_format: "midpoint" when boxes are (x_center, y_center, w, h) or
            "corners" when boxes are (x1, y1, x2, y2).

    Returns:
        Tensor of shape (..., 1) with the IoU of each pair of boxes.

    Raises:
        ValueError: if ``box_format`` is neither "midpoint" nor "corners".
    """
    if box_format == "midpoint":
        # Convert centre/size boxes to corner coordinates.
        box1_x1 = boxes_preds[..., 0:1] - boxes_preds[..., 2:3] / 2
        box1_y1 = boxes_preds[..., 1:2] - boxes_preds[..., 3:4] / 2
        box1_x2 = boxes_preds[..., 0:1] + boxes_preds[..., 2:3] / 2
        box1_y2 = boxes_preds[..., 1:2] + boxes_preds[..., 3:4] / 2
        box2_x1 = boxes_labels[..., 0:1] - boxes_labels[..., 2:3] / 2
        box2_y1 = boxes_labels[..., 1:2] - boxes_labels[..., 3:4] / 2
        box2_x2 = boxes_labels[..., 0:1] + boxes_labels[..., 2:3] / 2
        box2_y2 = boxes_labels[..., 1:2] + boxes_labels[..., 3:4] / 2
    elif box_format == "corners":
        box1_x1 = boxes_preds[..., 0:1]
        box1_y1 = boxes_preds[..., 1:2]
        box1_x2 = boxes_preds[..., 2:3]
        box1_y2 = boxes_preds[..., 3:4]
        box2_x1 = boxes_labels[..., 0:1]
        box2_y1 = boxes_labels[..., 1:2]
        box2_x2 = boxes_labels[..., 2:3]
        box2_y2 = boxes_labels[..., 3:4]
    else:
        # Previously an unknown format fell through both branches and failed
        # later with an UnboundLocalError; fail early with a clear message.
        raise ValueError(
            f"box_format must be 'midpoint' or 'corners', got {box_format!r}"
        )

    x1 = torch.max(box1_x1, box2_x1)
    y1 = torch.max(box1_y1, box2_y1)
    x2 = torch.min(box1_x2, box2_x2)
    y2 = torch.min(box1_y2, box2_y2)

    # clamp(0) makes the overlap zero when the boxes do not intersect.
    intersection = (x2 - x1).clamp(0) * (y2 - y1).clamp(0)
    box1_area = abs((box1_x2 - box1_x1) * (box1_y2 - box1_y1))
    box2_area = abs((box2_x2 - box2_x1) * (box2_y2 - box2_y1))

    # The small epsilon avoids division by zero for degenerate boxes.
    return intersection / (box1_area + box2_area - intersection + 1e-6)

def non_max_suppression(bboxes, iou_threshold, threshold, box_format="midpoint"):
    """Greedy per-class non-maximum suppression.

    Args:
        bboxes: list of boxes, each ``[class, score, b1, b2, b3, b4]`` where
            the last four values are coordinates in ``box_format``.
        iou_threshold: a lower-scoring box of the same class is dropped
            unless its IoU with the chosen box is below this value.
        threshold: boxes whose score is not above this value are discarded
            up front.
        box_format: "midpoint" or "corners", forwarded to
            ``intersection_over_union``.

    Returns:
        The surviving boxes, ordered by descending score.
    """
    assert type(bboxes) == list

    candidates = [candidate for candidate in bboxes if candidate[1] > threshold]
    candidates.sort(key=lambda candidate: candidate[1], reverse=True)
    kept = []

    while candidates:
        best, remaining = candidates[0], candidates[1:]
        survivors = []
        for other in remaining:
            if other[0] == best[0]:
                # Same class: keep only if it overlaps the winner too little.
                overlap = intersection_over_union(
                    torch.tensor(best[2:]),
                    torch.tensor(other[2:]),
                    box_format=box_format,
                )
                if not (overlap < iou_threshold):
                    continue
            survivors.append(other)

        kept.append(best)
        candidates = survivors

    return kept
    
def mean_average_precision(
    pred_boxes, true_boxes, iou_threshold=0.5, box_format="midpoint", num_classes=2
):
    """Mean average precision over all classes that have ground truth.

    Args:
        pred_boxes: list of detections, each
            ``[image_idx, class, score, b1, b2, b3, b4]``.
        true_boxes: list of ground-truth boxes in the same layout.
        iou_threshold: a detection counts as a true positive only when its
            best IoU with a not-yet-matched ground truth exceeds this value.
        box_format: "midpoint" or "corners", forwarded to
            ``intersection_over_union``.
        num_classes: number of classes; class ids ``0 .. num_classes - 1``
            are evaluated.

    Returns:
        0-dim tensor with the mean of the per-class average precisions.
        Classes without any ground truth are skipped; when no class has
        ground truth the result is ``tensor(0.)`` instead of a
        ZeroDivisionError.
    """
    # list storing all AP for respective classes
    average_precisions = []

    # used for numerical stability later on
    epsilon = 1e-6

    for c in range(num_classes):
        # Only keep the predictions and targets of the current class.
        detections = [detection for detection in pred_boxes if detection[1] == c]
        ground_truths = [true_box for true_box in true_boxes if true_box[1] == c]
        total_true_bboxes = len(ground_truths)

        # If none exists for this class then we can safely skip
        if total_true_bboxes == 0:
            continue

        # Group ground truths by image once, so each detection only looks at
        # the boxes of its own image instead of re-scanning the full list.
        gts_by_image = {}
        for gt in ground_truths:
            gts_by_image.setdefault(gt[0], []).append(gt)

        # One "already matched" flag per ground-truth box, per image.
        matched = {
            image_idx: torch.zeros(len(image_gts))
            for image_idx, image_gts in gts_by_image.items()
        }

        # sort by box probabilities which is index 2
        detections.sort(key=lambda x: x[2], reverse=True)
        TP = torch.zeros((len(detections)))
        FP = torch.zeros((len(detections)))

        for detection_idx, detection in enumerate(detections):
            ground_truth_img = gts_by_image.get(detection[0], [])

            best_iou = 0
            # Reset for every detection so a stale index from a previous
            # detection can never be reused.
            best_gt_idx = None

            for idx, gt in enumerate(ground_truth_img):
                iou = intersection_over_union(
                    torch.tensor(detection[3:]),
                    torch.tensor(gt[3:]),
                    box_format=box_format,
                )

                if iou > best_iou:
                    best_iou = iou
                    best_gt_idx = idx

            is_match = (
                best_gt_idx is not None
                and best_iou > iou_threshold
                # only detect ground truth detection once
                and matched[detection[0]][best_gt_idx] == 0
            )
            if is_match:
                TP[detection_idx] = 1
                matched[detection[0]][best_gt_idx] = 1
            else:
                # Too little overlap, or the ground truth was already claimed
                # by a higher-scoring detection.
                FP[detection_idx] = 1

        TP_cumsum = torch.cumsum(TP, dim=0)
        FP_cumsum = torch.cumsum(FP, dim=0)
        recalls = TP_cumsum / (total_true_bboxes + epsilon)
        precisions = TP_cumsum / (TP_cumsum + FP_cumsum + epsilon)
        # Prepend the (recall=0, precision=1) starting point of the curve.
        precisions = torch.cat((torch.tensor([1.0]), precisions))
        recalls = torch.cat((torch.tensor([0.0]), recalls))
        # torch.trapz for numerical integration
        average_precisions.append(torch.trapz(precisions, recalls))

    if not average_precisions:
        # No ground truth for any class: nothing to average.
        return torch.tensor(0.0)

    return sum(average_precisions) / len(average_precisions)

def cells_to_bboxes(predictions, anchors, S, is_preds=True):
    """Convert cell-relative boxes of one scale to image-relative boxes.

    Args:
        predictions: tensor of shape (batch, num_anchors, S, S, features).
            Index 0 of the last dimension is the objectness, 1:5 the box and
            5: the class scores (for predictions) or the class id (targets).
        anchors: the anchors of this scale, shape (num_anchors, 2). Only used
            to rescale width/height when ``is_preds`` is True.
        S: grid size of this scale.
        is_preds: True when ``predictions`` are raw model outputs that still
            need sigmoid/exp decoding, False for target tensors.

    Returns:
        Nested list of shape (batch, num_anchors * S * S, 6); every box is
        ``[class, score, x, y, w, h]`` with coordinates relative to the
        whole image.

    The input tensor is left untouched: decoding happens on new tensors
    instead of writing through a view of ``predictions``.
    """
    batch_size = predictions.shape[0]
    num_anchors = len(anchors)
    box_predictions = predictions[..., 1:5]
    if is_preds:
        anchors = anchors.reshape(1, num_anchors, 1, 1, 2)
        # Build a new tensor rather than assigning into the slice, which
        # would modify the caller's ``predictions`` in place.
        box_predictions = torch.cat(
            (
                torch.sigmoid(box_predictions[..., 0:2]),
                torch.exp(box_predictions[..., 2:]) * anchors,
            ),
            dim=-1,
        )
        scores = torch.sigmoid(predictions[..., 0:1])
        best_class = torch.argmax(predictions[..., 5:], dim=-1).unsqueeze(-1)
    else:
        scores = predictions[..., 0:1]
        best_class = predictions[..., 5:6]

    # cell_indices[b, a, i, j, 0] == j (column index); permuting the two grid
    # axes below yields the row index i for the y coordinate.
    cell_indices = (
        torch.arange(S)
        .repeat(batch_size, num_anchors, S, 1)
        .unsqueeze(-1)
        .to(predictions.device)
    )
    x = 1 / S * (box_predictions[..., 0:1] + cell_indices)
    y = 1 / S * (box_predictions[..., 1:2] + cell_indices.permute(0, 1, 3, 2, 4))
    w_h = 1 / S * box_predictions[..., 2:4]
    converted_bboxes = torch.cat((best_class, scores, x, y, w_h), dim=-1).reshape(
        batch_size, num_anchors * S * S, 6
    )
    return converted_bboxes.tolist()

def get_evaluation_bboxes(
    loader,
    model,
    iou_threshold,
    anchors,
    threshold,
    box_format="midpoint",
    device="cuda",
):
    """Collect NMS-filtered predictions and ground-truth boxes for a loader.

    Args:
        loader: iterable yielding ``(images, labels)`` batches, where
            ``labels`` holds one target tensor per prediction scale.
        model: network returning one prediction tensor per scale; it is put
            in eval mode for the pass and switched back to train mode at
            the end.
        iou_threshold: IoU threshold passed to ``non_max_suppression``.
        anchors: per-scale anchors; each set is multiplied by that scale's
            grid size before decoding.
        threshold: minimum score for a predicted box (inside NMS) and for a
            target box to be kept.
        box_format: box layout forwarded to ``non_max_suppression``.
        device: device the images and anchors are moved to.

    Returns:
        ``(all_pred_boxes, all_true_boxes)``; every box is
        ``[sample_idx, class, score, x, y, w, h]`` where ``sample_idx`` is a
        running index over all samples seen.
    """
    # make sure model is in eval before get bboxes
    model.eval()
    train_idx = 0
    all_pred_boxes = []
    all_true_boxes = []
    for batch_idx, (x, labels) in enumerate(tqdm(loader)):
        x = x.to(device)

        with torch.no_grad():
            predictions = model(x)

        batch_size = x.shape[0]
        bboxes = [[] for _ in range(batch_size)]
        # Decode every scale and pool the boxes of all scales per image.
        for i in range(3):
            S = predictions[i].shape[2]
            anchor = torch.tensor([*anchors[i]]).to(device) * S
            boxes_scale_i = cells_to_bboxes(
                predictions[i], anchor, S=S, is_preds=True
            )
            for idx, (box) in enumerate(boxes_scale_i):
                bboxes[idx] += box

        # we just want one bbox for each label, not one for each scale
        # NOTE: `anchor` and `S` still hold the values of the last scale
        # (i == 2) from the loop above, which is the scale labels[2] uses.
        true_bboxes = cells_to_bboxes(
            labels[2], anchor, S=S, is_preds=False
        )

        for idx in range(batch_size):
            nms_boxes = non_max_suppression(
                bboxes[idx],
                iou_threshold=iou_threshold,
                threshold=threshold,
                box_format=box_format,
            )

            for nms_box in nms_boxes:
                all_pred_boxes.append([train_idx] + nms_box)

            # box[1] is the objectness of the target cell, so this keeps only
            # cells that actually contain an object.
            for box in true_bboxes[idx]:
                if box[1] > threshold:
                    all_true_boxes.append([train_idx] + box)

            train_idx += 1

    model.train()
    return all_pred_boxes, all_true_boxes

def plot_image(image, boxes):
    """Show an image with its bounding boxes and class names drawn on top.

    Args:
        image: anything ``np.array`` turns into an (H, W, C) array.
        boxes: iterable of ``[class, confidence, x_mid, y_mid, width, height]``
            with coordinates expressed as fractions of the image size.
    """
    class_labels = ["fine", "not fine"]
    palette = plt.get_cmap("tab20b")
    colors = [palette(i) for i in np.linspace(0, 1, len(class_labels))]

    im = np.array(image)
    height, width, _ = im.shape

    _, ax = plt.subplots(1)
    ax.imshow(im)

    for box in boxes:
        assert len(box) == 6, "box should contain class pred, confidence, x, y, width, height"
        class_idx = int(box[0])
        x_mid, y_mid, box_w, box_h = box[2:]

        # Convert the midpoint box to its upper-left corner in pixels.
        left_px = (x_mid - box_w / 2) * width
        top_px = (y_mid - box_h / 2) * height

        outline = patches.Rectangle(
            (left_px, top_px),
            box_w * width,
            box_h * height,
            linewidth=2,
            edgecolor=colors[class_idx],
            facecolor="none",
        )
        ax.add_patch(outline)

        # Class name on a filled background in the box's colour.
        plt.text(
            left_px,
            top_px,
            s=class_labels[class_idx],
            color="white",
            verticalalignment="top",
            bbox={"color": colors[class_idx], "pad": 0},
        )

    plt.show()


def check_class_accuracy(model, loader, threshold):
    """Print class, no-object and object accuracy of ``model`` over ``loader``.

    Args:
        model: network returning one prediction tensor per scale (three
            scales are read).
        loader: iterable yielding ``(images, targets)`` batches with one
            target tensor per scale.
        threshold: sigmoid objectness above this value counts as "object".

    The model is put in eval mode for the pass and switched back to train
    mode before returning. Nothing is returned; the three accuracies are
    printed as percentages with two decimals.
    """
    model.eval()
    tot_class_preds, correct_class = 0, 0
    tot_noobj, correct_noobj = 0, 0
    tot_obj, correct_obj = 0, 0

    for idx, (x, y) in enumerate(tqdm(loader)):
        x = x.to(DEVICE)
        with torch.no_grad():
            out = model(x)

        for i in range(3):
            # Use a local instead of writing back into the batch container.
            target = y[i].to(DEVICE)
            obj = target[..., 0] == 1  # in paper this is Iobj_i
            noobj = target[..., 0] == 0  # in paper this is Inoobj_i

            correct_class += torch.sum(
                torch.argmax(out[i][..., 5:][obj], dim=-1) == target[..., 5][obj]
            )
            tot_class_preds += torch.sum(obj)

            obj_preds = torch.sigmoid(out[i][..., 0]) > threshold
            correct_obj += torch.sum(obj_preds[obj] == target[..., 0][obj])
            tot_obj += torch.sum(obj)
            correct_noobj += torch.sum(obj_preds[noobj] == target[..., 0][noobj])
            tot_noobj += torch.sum(noobj)

    # ".2f" is a precision of two decimals; the former ":2f" only set a
    # minimum field width of 2 and printed six decimals.
    print(f"Class accuracy is: {(correct_class/(tot_class_preds+1e-16))*100:.2f}%")
    print(f"No obj accuracy is: {(correct_noobj/(tot_noobj+1e-16))*100:.2f}%")
    print(f"Obj accuracy is: {(correct_obj/(tot_obj+1e-16))*100:.2f}%")
    model.train()


def seed_everything(seed=0):
    """Seed Python, NumPy and torch (CPU and CUDA) and make cuDNN deterministic.

    Args:
        seed: integer seed applied to every random number generator.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Trade cuDNN autotuning for reproducible kernel selection.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def make_divisible(x, divisor):
    """Round ``x`` up to the next multiple of ``divisor``.

    Args:
        x: value to round.
        divisor: step size; when a tensor is given its largest element
            (truncated to int) is used.

    Returns:
        The smallest multiple of the step that is >= ``x``.
    """
    step = int(divisor.max()) if isinstance(divisor, torch.Tensor) else divisor
    return math.ceil(x / step) * step

# Model

In [36]:
# Layer-by-layer description of the network, consumed in order by
# YOLOv5._create_conv_layers:
#   ["Conv", [out_channels, kernel, stride(, padding)]] -> CNNBlock
#   ["C3", out_channels, num_repeats]                   -> C3
#   ["SPPF", out_channels]                              -> SPPF
#   ["Concat", f]  -> concatenate with the stored output of layer number f
#   "U"            -> nn.Upsample(scale_factor=2)
#   "S"            -> ScalePrediction head on the current feature map
# The trailing "#n" comments are the layer numbers that "Concat" refers to;
# "S" entries are not numbered because their outputs are not stored as
# features. Channel counts are scaled by GROWTH_WIDTH and C3 repeat counts
# by GROWTH_DEPTH when the model is built.
config = [
  ["Conv", [64, 6, 2, 2]], #0
  ["Conv", [128, 3, 2]], #1
  ["C3", 128, 3], #2
  ["Conv", [256, 3, 2]], #3
  ["C3", 256, 6], #4
  ["Conv", [512, 3, 2]], #5
  ["C3", 512, 9], #6
  ["Conv", [1024, 3, 2]], #7
  ["C3", 1024, 3], #8
  ["SPPF", 1024], #9
  # To this point is backbone

  ["Conv", [512, 1, 1]], #10
  "U", #11
  ["Concat", 6], #12
  ["C3", 512, 3], #13

  ["Conv", [256, 1, 1]], #14
  "U", #15
  ["Concat", 4], #16
  ["C3", 256, 3], #17
  "S",

  ["Conv", [256, 3, 2]], #18
  ["Concat", 14], #19
  ["C3", 512, 3], #20
  "S",

  ["Conv", [512, 3, 2]], #21
  ["Concat", 10], #22
  ["C3", 1024, 3], #23
  "S",
]
# Multipliers applied to C3 repeat counts and to channel widths respectively.
GROWTH_DEPTH = 0.33
GROWTH_WIDTH = 0.5

def autopad(k, p=None, d=1):
    """Return the padding for a convolution, defaulting to half the kernel size.

    Args:
        k: kernel size, an int or a sequence of ints.
        p: explicit padding; returned unchanged when not None.
        d: dilation; for d > 1 the effective kernel size ``d * (k - 1) + 1``
            is used to derive the default padding.

    Returns:
        ``p`` when given, otherwise the (effective) kernel size floor-divided
        by two -- an int for an int kernel, a list otherwise.
    """
    is_scalar = isinstance(k, int)
    if d > 1:
        # Dilation widens the receptive field of the kernel.
        k = d * (k - 1) + 1 if is_scalar else [d * (x - 1) + 1 for x in k]
    if p is not None:
        return p
    return k // 2 if is_scalar else [x // 2 for x in k]

class CNNBlock(nn.Module):
  """Bias-free Conv2d followed by BatchNorm2d and an activation.

  ``act=True`` uses the shared ``default_act`` module, an ``nn.Module`` is
  used as given, and any other value disables the activation.
  """
  default_act = nn.ReLU()

  def __init__(self, c1, c2, k=1, s=1, p=None, g=1, d=1, act=True):
    """c1/c2: in/out channels; k, s, p, g, d: kernel, stride, padding, groups, dilation."""
    super().__init__()
    padding = autopad(k, p, d)
    self.conv = nn.Conv2d(c1, c2, k, s, padding, groups=g, dilation=d, bias=False)
    self.bn = nn.BatchNorm2d(c2)
    if act is True:
      self.act = self.default_act
    elif isinstance(act, nn.Module):
      self.act = act
    else:
      self.act = nn.Identity()

  def forward(self, x):
    """Apply convolution, batch norm and activation."""
    normalized = self.bn(self.conv(x))
    return self.act(normalized)

  def forward_fuse(self, x):
    """Apply convolution and activation only, skipping the batch norm."""
    convolved = self.conv(x)
    return self.act(convolved)

class ResidualBlock(nn.Module):
  """Stack of 1x1 -> 3x3 bottleneck pairs, optionally with skip connections."""

  def __init__(self, channels, use_residual=True, num_repeats=1):
    """Build ``num_repeats`` bottlenecks that keep ``channels`` unchanged."""
    super().__init__()
    self.layers = nn.ModuleList(
      nn.Sequential(
        CNNBlock(channels, channels // 2, k=1),
        CNNBlock(channels // 2, channels, k=3, p=1),
      )
      for _ in range(num_repeats)
    )
    self.use_residual = use_residual
    self.num_repeats = num_repeats

  def forward(self, x):
    """Run every bottleneck in turn, adding its input back when residual."""
    for bottleneck in self.layers:
      transformed = bottleneck(x)
      x = x + transformed if self.use_residual else transformed
    return x

class C3(nn.Module):
  """CSP bottleneck with three convolutions.

  The input goes through two parallel 1x1 convolutions; one branch also
  passes through a ResidualBlock. Both branches are concatenated along the
  channel dimension and fused by a final 1x1 convolution.
  """

  def __init__(self, in_channels, out_channels, num_repeats=1, use_residual=True, g=1, e=0.5):
    """``e`` sets the hidden width as a fraction of ``out_channels``; ``g`` is unused."""
    super().__init__()
    hidden_dim = int(out_channels * e)
    self.conv1 = CNNBlock(in_channels, hidden_dim, 1, 1)
    self.conv2 = CNNBlock(in_channels, hidden_dim, 1, 1)
    self.FinalConv = CNNBlock(2 * hidden_dim, out_channels, 1)
    self.res = ResidualBlock(hidden_dim, use_residual, num_repeats)

  def forward(self, input):
    """Return the fused output of the residual branch and the shortcut branch."""
    deep_branch = self.conv1(input)
    shortcut_branch = self.conv2(input)
    deep_branch = self.res(deep_branch)
    merged = torch.cat([deep_branch, shortcut_branch], 1)
    return self.FinalConv(merged)




class ScalePrediction(nn.Module):
  """Detection head producing ``num_classes + 5`` values for 3 anchors per cell."""

  def __init__(self, in_channels, num_classes):
    """Build the head for a feature map with ``in_channels`` channels."""
    super().__init__()
    self.res = ResidualBlock(in_channels, use_residual=False, num_repeats=1)
    self.conv = CNNBlock(in_channels, in_channels // 2, k=1)
    self.pred = nn.Sequential(
      CNNBlock(in_channels // 2, in_channels, k=3, p=1),
      CNNBlock(in_channels, (num_classes + 5) * 3, act=False, k=1),
    )
    self.num_classes = num_classes

  def forward(self, x):
    """Return predictions shaped (batch, 3, height, width, num_classes + 5)."""
    features = self.conv(self.res(x))
    batch, rows, cols = features.shape[0], features.shape[2], features.shape[3]
    raw = self.pred(features)
    # Split the channel axis into (anchor, value) and move the values last.
    per_anchor = raw.reshape(batch, 3, self.num_classes + 5, rows, cols)
    return per_anchor.permute(0, 1, 3, 4, 2)

class Concat(nn.Module):
  """Concatenate the running tensor with a stored feature map.

  ``f`` is the index of the stored feature to concatenate with (read by the
  model's forward pass); ``dimension`` is the axis to concatenate along.
  """

  def __init__(self, f:int, dimension=1):
    super().__init__()
    self.f = f
    self.d = dimension

  def forward(self, x1, x2):
    """Concatenate ``x1`` and ``x2``, resizing ``x1`` first if shapes differ."""
    shapes_differ = x1.shape != x2.shape
    if shapes_differ:
      # Bring x1 to the spatial size of x2 so the tensors can be joined.
      x1 = TF.resize(x1, size=x2.shape[2:])
    return torch.cat((x1, x2), self.d)

class SPPF(nn.Module):
  """Spatial Pyramid Pooling - Fast (SPPF) layer for YOLOv5 by Glenn Jocher.

  A 1x1 convolution halves the channels, the result is max-pooled three
  times in sequence, and the four tensors are concatenated and fused by a
  second 1x1 convolution.
  """

  def __init__(self, in_channels, out_channels, k=5):  # equivalent to SPP(k=(5, 9, 13))
    super().__init__()
    hidden_dim = in_channels // 2  # hidden channels
    self.cv1 = CNNBlock(in_channels, hidden_dim, k=1, s=1)
    self.cv2 = CNNBlock(hidden_dim * 4, out_channels, k=1, s=1)
    self.m = nn.MaxPool2d(kernel_size=k, stride=1, padding=k // 2)

  def forward(self, x):
    """Return the fused multi-scale pooled features."""
    reduced = self.cv1(x)
    with warnings.catch_warnings():
      warnings.simplefilter('ignore')  # suppress torch 1.9.0 max_pool2d() warning
      pooled_once = self.m(reduced)
      pooled_twice = self.m(pooled_once)
      pooled_thrice = self.m(pooled_twice)
      stacked = torch.cat((reduced, pooled_once, pooled_twice, pooled_thrice), 1)
      return self.cv2(stacked)

class YOLOv5(nn.Module):
  """YOLOv5-style detector assembled from the module-level ``config`` list.

  Args:
      in_channels: number of channels of the input image.
      num_classes: number of object classes predicted by every head.

  ``forward`` returns one tensor per "S" entry of ``config``, in reverse
  order of their appearance in ``config``.
  """

  def __init__(self, in_channels=3, num_classes=2):
    super().__init__()
    self.num_classes = num_classes
    self.in_channels = in_channels
    # Channel count of the network input followed by that of every stored
    # feature, so the output of layer number f sits at index f + 1.
    self._out_channels_list = [in_channels]
    self.layers = self._create_conv_layers()

  def forward(self, x):
    """Run the network and return the per-scale predictions."""
    outputs = []  # for each scale
    features = []  # output of every non-head layer, indexed by layer number
    for layer in self.layers:
      if isinstance(layer, ScalePrediction):
        # Heads branch off the current feature map without changing it.
        outputs.append(layer(x))
        continue

      if isinstance(layer, Concat):
        x = layer(x, features[layer.f])
      else:
        x = layer(x)
      features.append(x)

    # Heads are reversed so the output order is the opposite of the order in
    # which the "S" entries appear in config.
    return outputs[::-1]

  def _create_conv_layers(self):
    """Instantiate the layers described by ``config``.

    ``config`` is only read here. Scaling the Conv widths used to be written
    back into the shared list, so every additional model instance was built
    from already-scaled (and therefore shrinking) channel counts.
    """
    layers = nn.ModuleList()
    in_channels = self.in_channels

    for module in config:
      if isinstance(module, list):
        kind = module[0]
        if kind == "Conv":
          # Unpack into locals instead of mutating the shared config entry.
          base_channels, *conv_args = module[1]
          out_channels = make_divisible(base_channels * GROWTH_WIDTH, 8)
          layers.append(CNNBlock(in_channels, out_channels, *conv_args))
          self._out_channels_list.append(out_channels)
          in_channels = out_channels

        elif kind == "C3":
          num_repeats = module[2]
          num_repeats = max(round(num_repeats * GROWTH_DEPTH), 1) if num_repeats > 1 else num_repeats
          out_channels = make_divisible(module[1] * GROWTH_WIDTH, 8)
          layers.append(C3(in_channels, out_channels=out_channels, num_repeats=num_repeats))
          self._out_channels_list.append(out_channels)
          in_channels = out_channels

        elif kind == "Concat":
          f = module[1]
          layers.append(Concat(f, dimension=1))
          # Channels of the current tensor plus those of stored feature f.
          out_channels = self._out_channels_list[-1] + self._out_channels_list[f + 1]
          self._out_channels_list.append(out_channels)
          in_channels = out_channels

        elif kind == "SPPF":
          out_channels = make_divisible(module[1] * GROWTH_WIDTH, 8)
          layers.append(SPPF(in_channels, out_channels))
          self._out_channels_list.append(out_channels)
          in_channels = out_channels

      elif isinstance(module, str):
        if module == "S":
          layers.append(ScalePrediction(in_channels, num_classes=self.num_classes))

        elif module == "U":
          layers.append(nn.Upsample(scale_factor=2))
          # Upsampling keeps the channel count.
          self._out_channels_list.append(in_channels)

    return layers

In [37]:
# model = YOLOv5(num_classes=2)
# x = torch.randn((1, 3, 416, 416))
# out = model(x)
# print(out[0].shape)
# print(out[1].shape)
# print(out[2].shape)

# Dataset

In [38]:
# Unpack the dataset archive from the mounted Drive into ./dataset.
# The context manager closes the archive even if extraction fails.
with zipfile.ZipFile('/content/drive/MyDrive/Datasets/detection_db_new.zip', 'r') as zipFile:
    zipFile.extractall('dataset')

In [39]:
# Albumentations pipelines. Every pipeline resizes to IMAGE_SIZE x IMAGE_SIZE,
# applies Normalize with mean 0 / std 1 / max_pixel_value 255 and converts the
# image to a tensor. Boxes are passed in YOLO format; label_fields is empty
# because YOLODataset keeps the class id as the fifth element of each box.
# NOTE: IMAGE_SIZE repeats the value already set in the config cell.
IMAGE_SIZE = 416
# Training pipeline 1: blur, CLAHE and channel-shuffle augmentations.
train_transforms1 = A.Compose(
    [
        A.Resize(height=IMAGE_SIZE, width=IMAGE_SIZE),
        A.Blur(p=0.3),
        A.CLAHE(p=0.4),
        A.ChannelShuffle(p=0.5),
        A.Normalize(mean=[0, 0, 0], std=[1, 1, 1], max_pixel_value=255,),
        ToTensorV2(),
    ],
    bbox_params=A.BboxParams(format="yolo", min_visibility=0.4, label_fields=[],),
)

# Training pipeline 2: posterize and grayscale augmentations.
train_transforms2 = A.Compose(
    [
        A.Resize(height=IMAGE_SIZE, width=IMAGE_SIZE),
        A.Posterize(p=0.1),
        A.ToGray(p=0.1),
        A.Normalize(mean=[0, 0, 0], std=[1, 1, 1], max_pixel_value=255,),
        ToTensorV2(),
    ],
    bbox_params=A.BboxParams(format="yolo", min_visibility=0.4, label_fields=[],),
)
# Evaluation pipeline: resize and normalize only, no augmentation.
test_transforms = A.Compose(
    [
        A.Resize(height=IMAGE_SIZE, width=IMAGE_SIZE),
        A.Normalize(mean=[0, 0, 0], std=[1, 1, 1], max_pixel_value=255,),
        ToTensorV2(),
    ],
    bbox_params=A.BboxParams(format="yolo", min_visibility=0.4, label_fields=[]),
)

In [40]:
class YOLODataset(Dataset):
  """Dataset pairing ``<name>.jpeg`` images with ``<name>.txt`` YOLO labels.

  Args:
      img_dir: directory containing the ``.jpeg`` images.
      label_dir: directory containing the ``.txt`` label files; each row is
          ``class x y w h`` separated by single spaces.
      anchors: three lists (one per scale) of (width, height) anchor pairs.
      image_size: stored on the instance; not otherwise used here.
      S: grid size of every prediction scale.
      C: number of classes (stored on the instance).
      transform: optional albumentations-style callable taking
          ``image=`` and ``bboxes=`` and returning a dict with both keys.

  Each item is ``(image, targets)`` where ``targets`` is a tuple with one
  tensor per scale of shape (anchors_per_scale, S, S, 6) laid out as
  ``[objectness, x_cell, y_cell, width_cells, height_cells, class]``.
  Objectness is 1 for the assigned anchor, -1 for anchors to be ignored and
  0 elsewhere.
  """

  def __init__(
    self,
    img_dir,
    label_dir,
    anchors,
    image_size=416,
    S=[13, 26, 52],
    C=2,
    transform=None,
  ):

    self.img_dir = img_dir
    self.label_dir = label_dir
    all_images = sorted(os.listdir(self.img_dir))
    all_labels = sorted(os.listdir(self.label_dir))

    # The first sorted entry of each directory is dropped, as before.
    # NOTE(review): presumably this skips a hidden/system entry that sorts
    # first -- confirm, because otherwise a real sample is discarded.
    all_images.pop(0)
    all_labels.pop(0)

    image_stems = [name.replace('.jpeg', '') for name in all_images]
    # A set makes the image/label pairing linear instead of quadratic.
    label_stems = {name.replace('.txt', '') for name in all_labels}

    # Keep only images that have a label file, in sorted image order.
    self.images = [stem for stem in image_stems if stem in label_stems]

    self.image_size = image_size
    self.transform = transform
    self.S = S
    self.anchors = torch.tensor(anchors[0] + anchors[1] + anchors[2])  # for all 3 scales
    self.num_anchors = self.anchors.shape[0]
    self.num_anchors_per_scale = self.num_anchors // 3
    self.C = C
    self.ignore_iou_thresh = 0.5

  def __len__(self):
    return len(self.images)

  def __getitem__(self, index):
    annotation = self.images[index]+".txt"
    img_name = self.images[index]+".jpeg"
    label_path = os.path.join(self.label_dir, annotation)
    img_path = os.path.join(self.img_dir, img_name)

    # Rows are [class, x, y, w, h]; rolling by 4 gives [x, y, w, h, class].
    bboxes = np.roll(np.loadtxt(fname=label_path, delimiter=" ", ndmin=2), 4, axis=1)
    # Clip only the coordinates to [0, 1]. Clipping the whole row also forced
    # the class id into [0, 1], collapsing every class id above 1 into 1.
    bboxes[:, :4] = np.clip(bboxes[:, :4], 0, 1)
    image = np.array(Image.open(img_path).convert("RGB"))

    if self.transform:
      augmentations = self.transform(image=image, bboxes=bboxes)
      image = augmentations["image"]
      bboxes = augmentations["bboxes"]

    # Below assumes 3 scale predictions (as paper) and same num of anchors per scale
    targets = [torch.zeros((self.num_anchors // 3, grid, grid, 6)) for grid in self.S]
    for box in bboxes:
      iou_anchors = iou_width_height(torch.tensor(box[2:4]), self.anchors)
      # Plain ints avoid tensor floor-division and tensor-valued indices.
      anchor_indices = iou_anchors.argsort(descending=True, dim=0).tolist()
      x, y, width, height, class_label = box
      has_anchor = [False] * 3  # each scale should have one anchor
      for anchor_idx in anchor_indices:
        scale_idx = anchor_idx // self.num_anchors_per_scale
        anchor_on_scale = anchor_idx % self.num_anchors_per_scale
        S = self.S[scale_idx]
        i, j = int(S * y), int(S * x)  # which cell
        anchor_taken = targets[scale_idx][anchor_on_scale, i, j, 0]
        if not anchor_taken and not has_anchor[scale_idx]:
          # Best free anchor on this scale: make it responsible for the box.
          targets[scale_idx][anchor_on_scale, i, j, 0] = 1
          x_cell, y_cell = S * x - j, S * y - i  # both between [0,1]
          width_cell, height_cell = (
              width * S,
              height * S,
          )  # can be greater than 1 since it's relative to cell
          box_coordinates = torch.tensor(
              [x_cell, y_cell, width_cell, height_cell]
          )
          targets[scale_idx][anchor_on_scale, i, j, 1:5] = box_coordinates
          targets[scale_idx][anchor_on_scale, i, j, 5] = int(class_label)
          has_anchor[scale_idx] = True

        elif not anchor_taken and iou_anchors[anchor_idx] > self.ignore_iou_thresh:
          # A free anchor that also fits well but was not chosen.
          targets[scale_idx][anchor_on_scale, i, j, 0] = -1  # ignore prediction

    return image, tuple(targets)

In [41]:
# Two views of the training split, one per augmentation pipeline; they are
# concatenated below, so each epoch sees every training image twice.
train_dataset1 = YOLODataset("/content/dataset/detection_db/images/train",
                            "/content/dataset/detection_db/labels/train",
                            anchors=ANCHORS,
                            S=[IMAGE_SIZE // 32, IMAGE_SIZE // 16, IMAGE_SIZE // 8],
                            transform=train_transforms1)

train_dataset2 = YOLODataset("/content/dataset/detection_db/images/train",
                            "/content/dataset/detection_db/labels/train",
                            anchors=ANCHORS,
                            S=[IMAGE_SIZE // 32, IMAGE_SIZE // 16, IMAGE_SIZE // 8],
                            transform=train_transforms2)
# NOTE(review): train_dataset3 (un-augmented training split) is built but not
# included in train_dataset below -- confirm whether that is intended.
train_dataset3 = YOLODataset("/content/dataset/detection_db/images/train",
                            "/content/dataset/detection_db/labels/train",
                            anchors=ANCHORS,
                            S=[IMAGE_SIZE // 32, IMAGE_SIZE // 16, IMAGE_SIZE // 8],
                            transform=test_transforms)

val_dataset = YOLODataset("/content/dataset/detection_db/images/val",
                            "/content/dataset/detection_db/labels/val",
                            anchors=ANCHORS,
                            S=[IMAGE_SIZE // 32, IMAGE_SIZE // 16, IMAGE_SIZE // 8],
                            transform=test_transforms)

# NOTE(review): test_dataset reads the same "val" directories as val_dataset,
# so results reported on it are validation results, not held-out test results.
test_dataset = YOLODataset("/content/dataset/detection_db/images/val",
                            "/content/dataset/detection_db/labels/val",
                            anchors=ANCHORS,
                            S=[IMAGE_SIZE // 32, IMAGE_SIZE // 16, IMAGE_SIZE // 8],
                            transform=test_transforms)

train_dataset = torch.utils.data.ConcatDataset([train_dataset1, train_dataset2])

train_loader = DataLoader(
        dataset=train_dataset,
        batch_size=BATCH_SIZE,
        num_workers=NUM_WORKERS,
        pin_memory=PIN_MEMORY,
        shuffle=True,
        drop_last=False,
    )
# NOTE(review): the evaluation loaders below also shuffle; sample order does
# not change the metrics computed in this notebook, but shuffle=False would
# make per-sample results repeatable.
val_loader = DataLoader(
        dataset=val_dataset,
        batch_size=BATCH_SIZE,
        num_workers=NUM_WORKERS,
        pin_memory=PIN_MEMORY,
        shuffle=True,
        drop_last=False,
    )
test_loader = DataLoader(
        dataset=test_dataset,
        batch_size=BATCH_SIZE,
        num_workers=NUM_WORKERS,
        pin_memory=PIN_MEMORY,
        shuffle=True,
        drop_last=False,
    )


# Loss Function

In [42]:
class YoloLoss(nn.Module):
    """YOLO loss for a single prediction scale.

    The loss is the weighted sum of four terms: box regression (MSE),
    objectness for cells with an object (MSE against the IoU), objectness
    for cells without an object (BCE with logits) and classification
    (cross entropy).
    """

    def __init__(self):
        super().__init__()
        self.mse = nn.MSELoss()
        self.bce = nn.BCEWithLogitsLoss()
        self.entropy = nn.CrossEntropyLoss()
        self.sigmoid = nn.Sigmoid()

        # Constants signifying how much to pay for each respective part of the loss
        self.lambda_class = 1
        self.lambda_noobj = 1
        self.lambda_obj = 1
        self.lambda_box = 1

    def forward(self, predictions, target, anchors):
        """Compute the loss of one scale.

        Args:
            predictions: raw model output of shape
                (batch, 3, S, S, 5 + num_classes).
            target: target tensor of shape (batch, 3, S, S, 6) laid out as
                ``[objectness, x, y, w, h, class]``.
            anchors: the three anchors of this scale, shape (3, 2), in the
                same units as the target width/height.

        Returns:
            Scalar loss tensor. Neither ``predictions`` nor ``target`` is
            modified. When the batch has no positive cell at this scale only
            the no-object term is returned, because the other terms would be
            means over empty tensors (NaN).
        """
        # Check where obj and noobj (we ignore if target == -1)
        obj = target[..., 0] == 1  # in paper this is Iobj_i
        noobj = target[..., 0] == 0  # in paper this is Inoobj_i

        # ======================= #
        #   FOR NO OBJECT LOSS    #
        # ======================= #

        no_object_loss = self.bce(
            (predictions[..., 0:1][noobj]), (target[..., 0:1][noobj]),
        )

        if not obj.any():
            # No positives: skip the terms that average over object cells.
            return self.lambda_noobj * no_object_loss

        # ==================== #
        #   FOR OBJECT LOSS    #
        # ==================== #

        anchors = anchors.reshape(1, 3, 1, 1, 2)
        xy_preds = self.sigmoid(predictions[..., 1:3])
        box_preds = torch.cat([xy_preds, torch.exp(predictions[..., 3:5]) * anchors], dim=-1)
        ious = intersection_over_union(box_preds[obj], target[..., 1:5][obj]).detach()
        object_loss = self.mse(self.sigmoid(predictions[..., 0:1][obj]), ious * target[..., 0:1][obj])

        # ======================== #
        #   FOR BOX COORDINATES    #
        # ======================== #

        # x, y are compared after the sigmoid; width and height are compared
        # in log space relative to the anchors. New tensors are built here
        # instead of overwriting slices of the inputs.
        box_pred_terms = torch.cat([xy_preds, predictions[..., 3:5]], dim=-1)
        box_target_terms = torch.cat(
            [
                target[..., 1:3],
                torch.log(1e-16 + target[..., 3:5] / anchors),
            ],
            dim=-1,
        )
        box_loss = self.mse(box_pred_terms[obj], box_target_terms[obj])

        # ================== #
        #   FOR CLASS LOSS   #
        # ================== #

        class_loss = self.entropy(
            (predictions[..., 5:][obj]), (target[..., 5][obj].long()),
        )

        return (
            self.lambda_box * box_loss
            + self.lambda_obj * object_loss
            + self.lambda_noobj * no_object_loss
            + self.lambda_class * class_loss
        )

# Train

In [43]:
def train_fn(train_loader, model, optimizer, loss_fn, scaler, scaled_anchors):
    """Train ``model`` for one pass over ``train_loader`` with mixed precision.

    Args:
        train_loader: iterable yielding ``(images, targets)`` batches with
            one target tensor per scale.
        model: network returning three per-scale prediction tensors.
        optimizer: optimizer stepped once per batch through ``scaler``.
        loss_fn: callable ``(prediction, target, anchors) -> loss`` applied
            to each scale.
        scaler: gradient scaler used for the backward pass and the step.
        scaled_anchors: per-scale anchors, indexed by scale.

    The running mean loss is shown on the progress bar; nothing is returned.
    """
    progress = tqdm(train_loader, leave=True)
    batch_losses = []
    # The anchors do not change between batches, so move them once.
    scaled_anchors = scaled_anchors.to(DEVICE)

    for batch_idx, (x, y) in enumerate(progress):
        x = x.to(DEVICE)
        targets = (y[0].to(DEVICE), y[1].to(DEVICE), y[2].to(DEVICE))

        with torch.cuda.amp.autocast():
            out = model(x)
            per_scale = [
                loss_fn(prediction, scale_target, anchors)
                for prediction, scale_target, anchors in zip(out, targets, scaled_anchors)
            ]
            loss = per_scale[0] + per_scale[1] + per_scale[2]

        batch_losses.append(loss.item())
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # update progress bar
        progress.set_postfix(loss=sum(batch_losses) / len(batch_losses))

In [50]:
# Run a garbage-collection pass and release torch's cached CUDA memory
# before the model is (re)built in the next cell.
gc.collect()
torch.cuda.empty_cache()

In [51]:
# Build the model, train it for NUM_EPOCHS and report mAP once at the end.
# seed_everything()
model = YOLOv5(num_classes=NUM_CLASSES).to(DEVICE)

optimizer = optim.Adam(
    model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY
)
loss_fn = YoloLoss()
scaler = torch.cuda.amp.GradScaler()

# Grid sizes derived from IMAGE_SIZE (the same expression the datasets use)
# instead of the hard-coded [13, 26, 52], so they cannot drift apart if
# IMAGE_SIZE is changed.
S = [IMAGE_SIZE // 32, IMAGE_SIZE // 16, IMAGE_SIZE // 8]
# Express every anchor in grid-cell units of its own scale (anchor * S).
scaled_anchors = torch.tensor(ANCHORS) / (
    1 / torch.tensor(S).unsqueeze(1).unsqueeze(1).repeat(1, 3, 2)
)


# writer = SummaryWriter("/content/boards/board")
# step = 0
for epoch in range(NUM_EPOCHS):
    model.train()
    #plot_couple_examples(model, test_loader, 0.6, 0.5, scaled_anchors)
    train_fn(train_loader, model, optimizer, loss_fn, scaler, scaled_anchors)

    #check_class_accuracy(model, test_loader, threshold=CONF_THRESHOLD)

# Evaluation runs once, after the last epoch.
model.eval()
pred_boxes, true_boxes = get_evaluation_bboxes(
    test_loader,
    model,
    iou_threshold=NMS_IOU_THRESH,
    anchors=ANCHORS,
    threshold=CONF_THRESHOLD,
)
mapval = mean_average_precision(
    pred_boxes,
    true_boxes,
    iou_threshold=MAP_IOU_THRESH,
    box_format="midpoint",
    num_classes=NUM_CLASSES,
)
print("mAP = ", mapval.item())
# writer.add_scalar("mAP", mapval.item(), global_step=step)
# step += 1
# writer.close()

100%|██████████| 100/100 [00:23<00:00,  4.31it/s, loss=7.94]
100%|██████████| 100/100 [00:26<00:00,  3.77it/s, loss=4.28]
100%|██████████| 100/100 [00:22<00:00,  4.36it/s, loss=3.67]
100%|██████████| 100/100 [00:23<00:00,  4.32it/s, loss=3.15]
100%|██████████| 100/100 [00:23<00:00,  4.29it/s, loss=2.97]
100%|██████████| 100/100 [00:23<00:00,  4.29it/s, loss=2.56]
100%|██████████| 100/100 [00:23<00:00,  4.33it/s, loss=2.5]
100%|██████████| 100/100 [00:23<00:00,  4.30it/s, loss=2.29]
100%|██████████| 100/100 [00:23<00:00,  4.34it/s, loss=2.24]
100%|██████████| 100/100 [00:23<00:00,  4.34it/s, loss=2.22]
100%|██████████| 100/100 [00:23<00:00,  4.28it/s, loss=2.19]
100%|██████████| 100/100 [00:22<00:00,  4.35it/s, loss=2.04]
100%|██████████| 100/100 [00:22<00:00,  4.35it/s, loss=1.99]
100%|██████████| 100/100 [00:23<00:00,  4.35it/s, loss=1.93]
100%|██████████| 100/100 [00:23<00:00,  4.33it/s, loss=1.87]
100%|██████████| 100/100 [00:26<00:00,  3.79it/s, loss=1.81]
100%|██████████| 100/100 

mAP =  0.9667037129402161



