In [1]:
import os
from typing import Optional

import pandas as pd
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import cv2
import numpy as np

from matplotlib import pyplot as plt
from torchvision import transforms
import albumentations
from torch.utils.data import DataLoader
import time

In [2]:
# Colab-only setup: mount the user's Google Drive at /content/drive so the
# video and checkpoint paths configured below resolve. This cell requires an
# interactive Colab runtime and will fail elsewhere.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# --- Paths -------------------------------------------------------------------
# Everything lives under one project folder on the mounted Google Drive.
PATH = '/content/drive/MyDrive/Studies/2024/Image Processing/NN Project/MultipleBBox/'
VIDEO_PATH = PATH + "video.mp4"                # input video to annotate
RESULT_VIDEO_PATH = PATH + "result_video.mp4"  # annotated output video
CHECKPOINT_FILE = PATH + 'checkpoints/checkpoint_itzik_7.pt'

# --- Runtime -----------------------------------------------------------------
# Prefer the GPU when one is visible to PyTorch.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Spatial size every frame is resized to before it is fed to the network.
IMAGE_HEIGHT = 224
IMAGE_WIDTH = 224

# --- Labels ------------------------------------------------------------------
# Class index -> human-readable name. The position in the tuple is the index.
# NOTE(review): this looks like the 20-class Pascal VOC label set -- confirm
# the order matches the one used when the checkpoint was trained.
INT_TO_LABEL = dict(enumerate((
    "aeroplane",
    "bicycle",
    "bird",
    "boat",
    "bottle",
    "bus",
    "car",
    "cat",
    "chair",
    "cow",
    "dining table",
    "dog",
    "horse",
    "motorbike",
    "person",
    "potted plant",
    "sheep",
    "sofa",
    "train",
    "tv/monitor",
)))

In [4]:
def resnet18(split_size, num_boxes, num_classes):
    """Build a YOLO-style detector on top of torchvision's ResNet-18.

    The ResNet-18 convolutional trunk is kept, its own pooling and classifier
    are dropped, and a small detection head is attached.

    Args:
        split_size: S, the side length of the prediction grid.
        num_boxes: B, the number of boxes predicted per grid cell.
        num_classes: C, the number of object classes.

    Returns:
        An ``nn.Sequential`` whose output for a batch of images has shape
        ``(batch, S * S * (C + B * 5))``.
    """
    S, B, C = split_size, num_boxes, num_classes

    # NOTE(review): `pretrained=True` triggers a weight download (and is
    # deprecated in recent torchvision in favour of `weights=`). It is kept
    # as-is here to avoid depending on a specific torchvision version.
    model = torchvision.models.resnet18(pretrained=True)

    # Enable backprop for all layers
    for param in model.parameters():
        param.requires_grad = True

    # Drop the last two children of the stock model (its pooling and FC layer)
    # and keep only the convolutional trunk.
    modules = list(model.children())[:-2]
    model = nn.Sequential(*modules)

    # Assigning a module as an attribute of an nn.Sequential registers it as a
    # child, and nn.Sequential runs its children in registration order, so this
    # pool is applied right after the trunk. The pool must produce an S x S map
    # because the first Linear below expects 256 * S * S features (this used to
    # be hard-coded to 7 x 7, which only worked for split_size == 7).
    model.avgpool = nn.AdaptiveAvgPool2d((S, S))
    model = nn.Sequential(model,
                          nn.Conv2d(512, 256, kernel_size=(1, 1)))

    # Same mechanism: `fc` becomes the last child of the outer Sequential and
    # therefore runs after the 1x1 convolution. The attribute names `avgpool`
    # and `fc` are part of the state-dict keys, so they must not be renamed if
    # existing checkpoints are to keep loading.
    model.fc = nn.Sequential(
        nn.Flatten(),
        nn.Linear(256 * S * S, 1024),
        nn.LeakyReLU(0.1),
        nn.Dropout(0.5),
        nn.Linear(1024, S * S * (C + B * 5)), )

    return model

In [5]:
def output_result(out_path, frame_size, frames, fps):
    """Write a sequence of frames to a video file.

    Args:
        out_path: Destination path of the video file.
        frame_size: ``(width, height)`` passed to ``cv2.VideoWriter``.
        frames: Iterable of frames handed one by one to ``VideoWriter.write``.
        fps: Frame rate of the output video.

    Raises:
        IOError: If the video writer could not be opened for ``out_path``.
    """
    # NOTE(review): the 'DIVX' FourCC is combined with whatever container the
    # extension of `out_path` implies; confirm the codec/container pair is
    # supported by the OpenCV build in use.
    out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'DIVX'), fps, frame_size)

    # A writer that failed to open silently drops every frame; fail loudly
    # instead so a long processing run does not end with no output file.
    if not out.isOpened():
        out.release()
        raise IOError("Could not open video writer for: " + str(out_path))

    try:
        for frame in frames:
            out.write(frame)
    finally:
        # Always finalize the file, even if writing a frame raised.
        out.release()

def convert_cellboxes(predictions, S=7):
    """Convert raw per-cell predictions into image-relative boxes.

    The input is viewed as ``(batch, S, S, 30)``. Along the last axis the code
    reads indices 0-19 as class scores, 20 as the confidence of box 1, 21-24
    as box 1, 25 as the confidence of box 2 and 26-29 as box 2. For every cell
    the box with the higher confidence is kept and its x/y are shifted by the
    cell's column/row index and divided by ``S``; its last two components are
    divided by ``S`` as well.

    Args:
        predictions: Tensor that can be reshaped to ``(batch, S, S, 30)``.
        S: Grid side length. Used for the reshape, the cell offsets and the
            scaling (previously the reshape and offsets were hard-coded to 7).

    Returns:
        Tensor of shape ``(batch, S, S, 6)`` laid out as
        ``[class index, confidence, x, y, w, h]``.
    """

    predictions = predictions.to("cpu")
    batch_size = predictions.shape[0]
    predictions = predictions.reshape(batch_size, S, S, 30)
    bboxes1 = predictions[..., 21:25]
    bboxes2 = predictions[..., 26:30]
    scores = torch.cat(
        (predictions[..., 20].unsqueeze(0), predictions[..., 25].unsqueeze(0)), dim=0
    )
    # 0 where box 1 has the higher confidence, 1 where box 2 does.
    best_box = scores.argmax(0).unsqueeze(-1)
    best_boxes = bboxes1 * (1 - best_box) + best_box * bboxes2
    # cell_indices[b, row, col, 0] == col; the permuted view yields the row.
    cell_indices = torch.arange(S).repeat(batch_size, S, 1).unsqueeze(-1)
    x = 1 / S * (best_boxes[..., :1] + cell_indices)
    y = 1 / S * (best_boxes[..., 1:2] + cell_indices.permute(0, 2, 1, 3))
    w_h = 1 / S * best_boxes[..., 2:4]
    converted_bboxes = torch.cat((x, y, w_h), dim=-1)
    predicted_class = predictions[..., :20].argmax(-1).unsqueeze(-1)
    best_confidence = torch.max(predictions[..., 20], predictions[..., 25]).unsqueeze(
        -1
    )
    converted_preds = torch.cat(
        (predicted_class, best_confidence, converted_bboxes), dim=-1
    )

    return converted_preds

def cellboxes_to_boxes(out, S=7):
    """Turn raw model output into plain Python box lists, one list per image.

    Args:
        out: Raw prediction tensor; ``out.shape[0]`` is the batch size.
        S: Grid side length, forwarded to ``convert_cellboxes``.

    Returns:
        A list with one entry per image. Each entry is a list of ``S * S``
        boxes, and each box is the list of values produced by
        ``convert_cellboxes`` for that cell, with the first value (the class
        index) converted to ``int``.
    """
    converted_pred = convert_cellboxes(out, S=S).reshape(out.shape[0], S * S, -1)

    # tolist() converts the whole tensor in one call instead of one .item()
    # per element. The class index is cast to int explicitly: assigning
    # `.long()` back into a float tensor (as was done before) leaves it float.
    return [
        [[int(box[0])] + box[1:] for box in image_boxes]
        for image_boxes in converted_pred.tolist()
    ]

def intersection_over_union(boxes_preds, boxes_labels):
    """Compute IoU between boxes given as (x_center, y_center, width, height).

    Args:
        boxes_preds: Tensor whose last axis holds at least four values, read
            as center x, center y, width and height.
        boxes_labels: Tensor in the same layout as ``boxes_preds``.

    Returns:
        Tensor of IoU values with a trailing axis of size 1.
    """

    def _to_corners(boxes):
        # Midpoint layout -> (left, top, right, bottom), keeping the last axis.
        center_x, center_y = boxes[..., 0:1], boxes[..., 1:2]
        width, height = boxes[..., 2:3], boxes[..., 3:4]
        return (
            center_x - width / 2,
            center_y - height / 2,
            center_x + width / 2,
            center_y + height / 2,
        )

    pred_left, pred_top, pred_right, pred_bottom = _to_corners(boxes_preds)
    label_left, label_top, label_right, label_bottom = _to_corners(boxes_labels)

    # Extent of the overlap along each axis; clamped to zero when the boxes
    # do not intersect.
    overlap_w = (torch.min(pred_right, label_right) - torch.max(pred_left, label_left)).clamp(0)
    overlap_h = (torch.min(pred_bottom, label_bottom) - torch.max(pred_top, label_top)).clamp(0)
    intersection = overlap_w * overlap_h

    pred_area = abs((pred_right - pred_left) * (pred_bottom - pred_top))
    label_area = abs((label_right - label_left) * (label_bottom - label_top))

    # The small epsilon keeps the division finite for degenerate boxes.
    union = pred_area + label_area - intersection + 1e-6
    return intersection / union

def non_max_suppression(bboxes, iou_threshold, threshold):
    """Greedy per-class non-maximum suppression.

    Args:
        bboxes: List of boxes, each indexable as
            ``[class, confidence, x, y, w, h]``.
        iou_threshold: A lower-confidence box of the same class is dropped
            unless its IoU with the chosen box is below this value.
        threshold: Boxes whose confidence is not strictly greater than this
            value are discarded up front.

    Returns:
        The surviving boxes, ordered by descending confidence.
    """
    candidates = sorted(
        (candidate for candidate in bboxes if candidate[1] > threshold),
        key=lambda candidate: candidate[1],
        reverse=True,
    )
    kept = []

    while candidates:
        best, remaining = candidates[0], candidates[1:]
        survivors = []

        for other in remaining:
            # Boxes of a different class never suppress each other.
            if other[0] != best[0]:
                survivors.append(other)
                continue

            overlap = intersection_over_union(
                torch.tensor(best[2:]),
                torch.tensor(other[2:]))
            if overlap < iou_threshold:
                survivors.append(other)

        candidates = survivors
        kept.append(best)

    return kept

def get_batch_bboxes(
        x,
        model=None,
        iou_threshold=0.5,  # IoU above which same-class boxes are suppressed
        threshold=0.4,  # minimum confidence for a box to be kept
        box_format="midpoint",
        device="cuda"
):
    """Run the detector on a batch and return the boxes of its FIRST image.

    Args:
        x: Input tensor; ``x.shape[0]`` is the batch size.
        model: The detection model. Required despite the ``None`` default.
        iou_threshold: Forwarded to ``non_max_suppression``.
        threshold: Confidence threshold forwarded to ``non_max_suppression``.
        box_format: Unused; kept for interface compatibility.
        device: Device to run inference on. Falls back to the CPU when a CUDA
            device is requested but none is available. The model is moved to
            this device in place.

    Returns:
        ``(boxes, x)`` where ``boxes`` is the list of boxes that survived NMS
        for the first image of the batch (empty for an empty batch) and ``x``
        is the input exactly as it was passed in.

    Raises:
        ValueError: If ``model`` is ``None``.
    """
    if model is None:
        raise ValueError("get_batch_bboxes requires a model")

    if x.shape[0] == 0:
        return [], x

    # Honour the `device` argument (it used to be ignored, so inference always
    # ran wherever the model happened to live).
    target = torch.device(device)
    if target.type == "cuda" and not torch.cuda.is_available():
        target = torch.device("cpu")

    model = model.float().to(target)

    with torch.no_grad():
        predictions = model(x.float().to(target))

    # Only the first image of the batch is post-processed.
    bboxes = cellboxes_to_boxes(predictions)[0]

    nms_boxes = non_max_suppression(
        bboxes,
        iou_threshold=iou_threshold,
        threshold=threshold,
    )

    return list(nms_boxes), x

def draw_bbox_on_image(image, boxes_pred):
    """Draw every predicted box and its class name onto ``image``.

    Args:
        image: Array with a three-element shape ``(height, width, channels)``;
            it is handed to the OpenCV drawing calls.
        boxes_pred: Iterable of boxes indexable as
            ``[class, confidence, x, y, w, h]`` with the box given by its
            center and size as fractions of the image size.

    Returns:
        None.
    """
    img_height, img_width, _ = image.shape

    box_color = (0, 255, 0)
    text_color = (0, 0, 255)

    for detection in boxes_pred:
        class_id = detection[0]
        coords = detection[2:]  # x, y, w, h

        label_text = INT_TO_LABEL[class_id]

        # Center/size fractions -> pixel corners.
        left = (coords[0] - coords[2] / 2) * img_width
        top = (coords[1] - coords[3] / 2) * img_height
        right = left + (coords[2] * img_width)
        bottom = top + (coords[3] * img_height)

        top_left = (int(left), int(top))
        bottom_right = (int(right), int(bottom))

        cv2.rectangle(image, top_left, bottom_right, box_color, 2)
        image = cv2.putText(image, label_text, top_left, cv2.FONT_HERSHEY_SIMPLEX, 2, text_color, 1, cv2.LINE_AA)

In [6]:
# Load the trained weights onto the CPU regardless of where they were saved.
# NOTE(review): torch.load unpickles the file, which can execute arbitrary
# code -- only load checkpoints from a trusted source.
checkpoint = torch.load(CHECKPOINT_FILE, map_location=torch.device('cpu'))
# Rebuild the architecture (7x7 grid, 2 boxes per cell, 20 classes) and
# restore the weights stored under the 'model_state_dict' key.
model = resnet18(split_size=7, num_boxes=2, num_classes=20)
model.load_state_dict(checkpoint['model_state_dict'])
# Switch to inference mode; as the last expression of the cell this also
# displays the model summary.
model.eval()

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 127MB/s]


Sequential(
  (0): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (4): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  

In [7]:
# Per-frame preprocessing applied before the model: convert to a tensor,
# resize to the network input size, then normalize each channel.
# NOTE(review): the mean/std values look like the usual ImageNet statistics
# used with torchvision's pretrained models -- confirm they match training.
preprocess = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((IMAGE_HEIGHT, IMAGE_WIDTH)),
    transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
])

In [8]:
# Open the input video and fail early if it cannot be read, instead of
# silently producing an empty result.
video = cv2.VideoCapture(VIDEO_PATH)
if not video.isOpened():
    raise IOError("Could not open video: " + VIDEO_PATH)

frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
# Keep the frame rate as reported (it may be fractional, e.g. 29.97);
# truncating it to an int would change the playback speed of the result.
fps = video.get(cv2.CAP_PROP_FPS)

width  = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))

# NOTE: every annotated frame is kept in memory until the video is written.
frames = []

try:
    for frame_num in range(frame_count):
        # read() already advances to the next frame, so no explicit seek is
        # needed; seeking before every read is redundant and very slow.
        ret, frame = video.read()

        # The reported frame count can exceed what can actually be decoded.
        if not ret:
            break

        # The frame is converted from BGR to RGB and scaled by 1/255 before
        # preprocessing; a leading batch axis is added for the model.
        image_for_model = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) / 255.0
        image_for_model = preprocess(image_for_model)
        image_for_model = image_for_model[None, :, :, :]

        pred_bboxes, _ = get_batch_bboxes(
            x=image_for_model,
            model=model,
            iou_threshold=0.5,
            threshold=0.4,
            device=DEVICE
        )

        # Boxes are drawn in place on the original (BGR) frame.
        draw_bbox_on_image(frame, pred_bboxes)

        frames.append(frame)
finally:
    # Release the capture handle even if processing is interrupted.
    video.release()

output_result(RESULT_VIDEO_PATH, (width, height), frames, fps)

KeyboardInterrupt: 