# ECSE 415 - Final Project

## 0 Setup

We start by importing the relevant libraries.

In [9]:
import numpy as np
import matplotlib.pyplot as plt
import cv2

In [10]:
PATH = "./"

np.set_printoptions(edgeitems=24)

def showImages(images, titles):
    '''This function takes a 2D array of images and titles and displays them in the notebook in color'''
    rows = len(images)
    cols = len(images[0])
    fig = plt.figure(figsize=(cols * 7 if cols > 1 else 12, rows * 5))

    # Add every picture in the plot
    for i in range(rows):
        for j in range(cols):
            if images[i][j] is None: continue
            fig.add_subplot(rows, cols, i * cols + j + 1)
            plt.imshow(images[i][j][:,:,::-1])
            plt.axis('off')
            plt.title(titles[i][j])
    
    plt.show()


def showGSImages(images, titles, cmap=[]):
    '''This function takes a 2D array of images and titles and displays them in the notebook in grayscale'''
    rows = len(images)
    cols = len(images[0])
    fig = plt.figure(figsize=(cols * 7 if cols > 1 else 12, rows * 5))

    # Add every picture in the plot
    for i in range(rows):
        for j in range(cols):
            if images[i][j] is None: continue
            fig.add_subplot(rows, cols, i * cols + j + 1)
            plt.imshow(images[i][j], cmap=cmap[i][j] if len(cmap) != 0 else 'gray')
            plt.axis('off')
            plt.title(titles[i][j])
    
    plt.show()


In [11]:
class VideoReader():
    
    def __init__(self, filename):
        self.cap = cv2.VideoCapture(filename)

        if(self.cap.isOpened() == False):
            raise SystemExit("Error opening video file!")
        
        self.frame_width = int(self.cap.get(3))
        self.frame_height = int(self.cap.get(4))
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
    
    
    def getInfo(self):
        return self.frame_width, self.frame_height, self.fps
    

    def frames(self, freq):
        i = 0
        while(self.cap.isOpened()):
            ret, frame = self.cap.read()
            if ret == False: break

            i += 1
            if i % freq != 0: continue

            yield frame
    
    
    def release(self):
        self.cap.release()

In [12]:
class BoundingBox():
    
    def __init__(self, x, y, w, h, clazz, confidence):
        self.x = x
        self.y = y
        self.w = w
        self.h = h
        self.clazz = clazz
        self.confidence = confidence

        self.center_x = x + w / 2
        self.center_y = y + h / 2
    

    def get_distance(self, other):
        return np.sqrt((self.center_x - other.center_x) ** 2 + (self.center_y - other.center_y) ** 2)


    def get_top_left_corner(self):
        return (round(self.x), round(self.y))
    

    def get_lower_right_corner(self):
        return (round(self.x + self.w), round(self.y + self.h))

In [13]:
class Object():

    def __init__(self, id, bounding_box):
        self.id = id
        self.bounding_box = bounding_box
        self.absent_frames = 0


In [14]:
class Yolo:
    
    def __init__(self):
        # Import Darknet Yolov3 CNN from file as well as weights from COCO pretraining
        self.yolov3 = cv2.dnn.readNet(PATH + "/data/yolov3.weights", PATH + "/data/yolov3.cfg")

        # Import COCO classes from file
        self.classes = []
        classes_file = open(PATH + "/data/yolov3.classes", "r")
        for l in classes_file.readlines():
            self.classes.append(l.strip())

        # Generate random colors for each class to draw bounding boxes
        self.colors = np.random.uniform(0, 255, size=(len(self.classes), 3))
    
    
    def process_frame(self, frame):
        frame_blob = cv2.dnn.blobFromImage(frame, 0.00392, (416,416), (0,0,0), True, crop=False)

        # Execute neural network with image as input
        self.yolov3.setInput(frame_blob)
        output = self.yolov3.forward()

        class_ids = []
        confidences = []
        boxes = []

        CONF_THRESH = 0.5
        # Extract data from output matrix
        for o in output:
            # Get predicted class and confidence for bounding box
            scores = o[5:]
            class_id = np.argmax(scores)
            confidence = scores[class_id]

            # Consider bounding box only if confidence is above threshold value
            if confidence > CONF_THRESH:
                center_x = int(o[0] * frame.shape[1])
                center_y = int(o[1] * frame.shape[0])
                w = int(o[2] * frame.shape[1])
                h = int(o[3] * frame.shape[0])
                x = center_x - w / 2
                y = center_y - h / 2
                class_ids.append(class_id)
                confidences.append(float(confidence))
                boxes.append([x, y, w, h])

        # Apply non-max suppression
        NMS_THRESH = 0.4
        indices = cv2.dnn.NMSBoxes(boxes, confidences, CONF_THRESH, NMS_THRESH)

        bounding_boxes = []
        for idx in indices:
            bounding_boxes.append(BoundingBox(boxes[idx][0], boxes[idx][1], boxes[idx][2], boxes[idx][3], class_ids[idx], confidences[idx]))

        return bounding_boxes


    # function to draw bounding box on the detected object with class name
    def draw_bounding_box(self, img, obj):
        label = "{} {} (#{})".format(self.classes[obj.bounding_box.clazz], round(obj.bounding_box.confidence, 2), obj.id)
        color = self.colors[obj.bounding_box.clazz]
        cv2.rectangle(img, obj.bounding_box.get_top_left_corner(), obj.bounding_box.get_lower_right_corner(), color, 2)
        cv2.putText(img, label, (round(obj.bounding_box.x - 10), round(obj.bounding_box.y - 10)), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

In [15]:
DISTANCE_THRESHOLD = 75
MAX_ABSENT_FRAMES = 5

class Objects():

    def __init__(self):
        self.idx = -1
        self.objects = dict({})


    def add_frame(self, bbs: list[BoundingBox]):
        for c in self.objects.keys():
            bbs_c = list(filter(lambda bb: bb.clazz == c, bbs))
            distances = self.compute_distances(c, bbs_c)
            matches, used_objs, used_bbs = self.match_bounding_boxes(distances)

            for match in matches:
                if distances[match] > (match[0].absent_frames + 1) * DISTANCE_THRESHOLD:
                    used_objs.remove(match[0])
                    used_bbs.remove(match[1])
                    matches.remove(match)

            for obj in self.objects[c]:
                if not obj in used_objs:
                    obj.absent_frames += 1

                    if obj.absent_frames > MAX_ABSENT_FRAMES:
                        self.objects[c].remove(obj)
            
            for bb in bbs_c:
                if not bb in used_bbs:
                    self.objects[c].append(Object(self.next_id(), bb))

            for match in matches:
                match[0].bounding_box = match[1]
                match[0].absent_frames = 0
        
        for bb in list(filter(lambda bb: (not bb.clazz in self.objects), bbs)):
            if bb.clazz not in self.objects: self.objects[bb.clazz] = []
            self.objects[bb.clazz].append(Object(self.next_id(), bb))


    def compute_distances(self, clazz, bbs):
        distances = dict({})
        for obj in self.objects[clazz]:
            for bb in bbs:
                distances[(obj, bb)] = obj.bounding_box.get_distance(bb)
        return distances


    def match_bounding_boxes(self, distances):
        sorted_keys = list(sorted(distances, key=distances.get))
        mutex_keys = []

        used_objs = set([])
        used_bbs = set([])
        for k in sorted_keys:
            if k[0] in used_objs or k[1] in used_bbs: continue

            used_objs.add(k[0])
            used_bbs.add(k[1])

            mutex_keys.append(k)

        return mutex_keys, used_objs, used_bbs


    def next_id(self):
        self.idx += 1
        return self.idx
    

    def get_all_objects(self):
        return  [value for sublist in self.objects.values() for value in sublist]

In [16]:
yolo = Yolo()

video_in = VideoReader(PATH + "/data/mcgill_drive.mp4")
frame_width, frame_height, fps = video_in.getInfo()

print("Frame dimensions: ({}, {})".format(frame_width, frame_height))
print("FPS: {}".format(fps))

out = cv2.VideoWriter(PATH + "/out/mcgill_drive.mp4", cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width,frame_height))

objects = Objects()

for frame in video_in.frames(1):

    bounding_boxes = yolo.process_frame(frame)
    objects.add_frame(bounding_boxes)
    
    # For each bounding box selectedby NMS
    for obj in objects.get_all_objects():
        # Draw the bounding box in the prediction picture
        yolo.draw_bounding_box(frame, obj)
    
    # showImages([[frame]], [[""]])
    out.write(frame)


video_in.release()
out.release()


Frame dimensions: (2562, 1440)
FPS: 29.97002997002997
