# Computer Vision, Assignment 2
# Craioveanu Sergiu-Ionut, 407 AI

Link to DropBox: https://www.dropbox.com/sh/qifay2hqleoande/AABgOJYZ_1di_SKvSGZ2iPAVa?dl=0

In [2]:
# !pip install pandoc
# !pip install nbconvert

In [1]:
# !pip install tqdm  # 4.64.1
import cv2
import cv2 as cv
import numpy as np
import os
import collections
from tqdm import tqdm
from typing import List, Tuple, Union

In [2]:
mode = 'test'

In [3]:
results_dir = "results/"
if not os.path.exists(results_dir):
    os.makedirs(results_dir)
    
task1_dir_result = "results/Task1/"
if not os.path.exists(task1_dir_result):
    os.makedirs(task1_dir_result)
    
task2_dir_result = "results/Task2/"
if not os.path.exists(task2_dir_result):
    os.makedirs(task2_dir_result)
    
task3_dir_result = "results/Task3/"
if not os.path.exists(task3_dir_result):
    os.makedirs(task3_dir_result)

# Defined Functions

## Utility

In [4]:
def show_image(input_image, window_name='image', timeout=0):
    """
    Display an image in a window, resized to 40% of its original dimensions.

    Args:
        input_image (numpy array): The input image to display.
        window_name (str, optional): The name of the window displaying the image. Defaults to 'image'.
        timeout (int, optional): The number of milliseconds to wait before closing the window. Defaults to 0.

    Note:
        If the timeout is set to 0, the window will not close automatically.
    """
    resized_image = cv.resize(
        input_image,
        (int(input_image.shape[1] * 0.4), int(input_image.shape[0] * 0.4))
    )

    cv.imshow(window_name, resized_image)
    cv.waitKey(timeout)
    cv.destroyAllWindows()

In [5]:
def read_jpg_files_in_directory(directory_path: str) -> List[str]:
    """
    Reads all .jpg or .jpeg files in a given directory.

    Args:
        directory_path (str): The path of the directory where files should be read from.

    Returns:
        List[str]: A list containing the filenames of all .jpg or .jpeg files in the directory.
    """
    jpg_files = []
    for filename in os.listdir(directory_path):
        if filename.endswith('.jpg') or filename.endswith('.jpeg'):
            jpg_files.append(filename)
    return jpg_files

def read_mp4_files_in_directory(directory_path: str) -> List[str]:
    """
    Reads all .mp4 files in a given directory.

    Args:
        directory_path (str): The path of the directory where files should be read from.

    Returns:
        List[str]: A list containing the filenames of all .mp4 files in the directory.
    """
    mp4_files = []
    for filename in os.listdir(directory_path):
        if filename.endswith('.mp4'):
            mp4_files.append(filename)
    return mp4_files

def get_key(dictionary: dict, val: Union[int, str, float, list, dict]) -> Union[int, str, None]:
    """
    Get the key for a given value in a dictionary.

    Args:
        dictionary (dict): The dictionary to find the key in.
        val (Union[int, str, float, list, dict]): The value for which the key should be found.

    Returns:
        Union[int, str, None]: The key associated with the provided value in the dictionary. Returns None if not found.
    """
    for key, value in dictionary.items():
        if value == val:
            return key

## Image Processing

In [6]:
def draw_lines(img, lines, color=[255, 0, 0], thickness=2):
    """
    Draw lines on an image given a list of lines.

    Args:
        img (numpy array): The image on which lines should be drawn.
        lines (list): List of lines where each line is a list of four integers [x1, y1, x2, y2] representing the two endpoints of the line.
        color (list, optional): List of three integers representing RGB color of the line. Defaults to [255, 0, 0] (Red).
        thickness (int, optional): Thickness of the line. Defaults to 2.
    """
    for line in lines:
        for x1,y1,x2,y2 in line:
            cv2.line(img, (x1, y1), (x2, y2), color, thickness)

def select_points(event, x, y, flags, param):
    """
    If left button down event is detected, draw a circle at the mouse location and append its position to a list.

    Args:
        event (int): OpenCV event type.
        x (int): x-coordinate of the event.
        y (int): y-coordinate of the event.
        flags (int): Any relevant flags passed by OpenCV.
        param: Any extra parameters supplied by OpenCV.
    """
    if event == cv2.EVENT_LBUTTONDOWN:
        cv2.circle(image, (x,y), 5, (255,0,0), -1)
        points.append((x, y))
        print(f"Point selected: ({x}, {y})")

def get_coord_return_rect(x_min, y_min, x_max, y_max):
    """
    Convert coordinates to rectangle dimensions.

    Args:
        x_min, y_min, x_max, y_max (int): The minimum and maximum x and y coordinates.

    Returns:
        Tuple (int, int, int, int): A tuple representing the top-left corner and dimensions (width, height) of the rectangle.
    """
    w = np.abs(x_max - x_min)
    h = np.abs(y_max - y_min)
    return x_min, y_min, w, h

def get_frames_and_rect(file_data: list):
    """
    Extract the number of frames and rectangle from the file data.

    Args:
        file_data (list): The data extracted from the file.

    Returns:
        Tuple: The number of frames and the rectangle as a tuple (x, y, w, h).
    """
    no_frames = file_data[0][0]
    xmin, ymin, xmax, ymax = file_data[1][1:]
    x, y, w, h = get_coord_return_rect(xmin, ymin, xmax, ymax)

    return no_frames, (x, y, w, h)

def get_rect_return_coord(x, y, w, h):
    """
    Convert rectangle dimensions to coordinates.

    Args:
        x, y, w, h (int): The top-left corner and dimensions (width, height) of the rectangle.

    Returns:
        Tuple (int, int, int, int): A tuple representing the minimum and maximum x and y coordinates.
    """
    return x, y, x+w, y+h


In [7]:
def get_roi(input_image, rectangle: tuple, display=False):
    """
    Extract the region of interest (ROI) from an image based on the provided rectangle.

    Args:
        input_image (numpy array): The input image to extract the ROI from.
        rectangle (tuple): A tuple containing the coordinates (x, y) and dimensions (w, h) of the ROI.
        display (bool): If True, display the extracted ROI using the show_image() function.

    Returns:
        numpy array: The extracted region of interest (ROI) from the input image.
    """
    (x_coord, y_coord, width, height) = rectangle
    drawn_image = input_image.copy()
    cv2.rectangle(drawn_image, (x_coord, y_coord), (x_coord + width, y_coord + height), (0, 0, 255), 2)  # Red

    roi_image = input_image[y_coord:y_coord + height, x_coord:x_coord + width]
    if display:
        show_image(roi_image)

    return roi_image

## Masking Image

In [8]:
def generate_mask_given_points(image, points) -> np.array:
    """Function used for masking entire image except the given polygon"""
    mask = np.zeros_like(image)
    cv2.fillPoly(mask, np.array([points]), (255,255,255))
    # Apply the mask to the image
    masked_image = cv2.bitwise_and(image, mask)
    return masked_image


In [9]:
def mask_rectangle_except_roi(image, x, y, w, h):
    # Create a black image with the same size as the original image
    mask = np.zeros_like(image)
    
    # Set the region of interest (ROI) in the mask to white
    mask[y:y+h, x:x+w] = 255

    # Perform a bitwise-and operation to mask the original image
    masked_image = cv2.bitwise_and(image, mask)
    
    return masked_image

# Compute IoU

In [10]:
def compute_iou(mask1, mask2):
    """
    Compute the Intersection over Union (IoU) score of two binary masks.
    
    Args:
        mask1 (numpy array): A binary mask.
        mask2 (numpy array): A binary mask.

    Returns:
        float: The IoU score of the two masks.
    """
    
    # Compute the intersection of the two masks
    intersection = np.logical_and(mask1, mask2)
    
    # Compute the union of the two masks
    union = np.logical_or(mask1, mask2)
    
    # Compute the IoU score
    iou_score = np.sum(intersection) / np.sum(union)
    
    return iou_score


def get_iou(bb1, bb2):
    """
    Compute the Intersection over Union (IoU) of two bounding boxes.

    Args:
        bb1 (tuple): A bounding box in the format (x, y, width, height).
        bb2 (tuple): A bounding box in the format (x, y, width, height).

    Returns:
        float: The IoU score of the two bounding boxes.
    """

    # Determine the coordinates of the intersection rectangle
    x_left = max(bb1[0], bb2[0])
    y_top = max(bb1[1], bb2[1])
    x_right = min(bb1[0]+bb1[2], bb2[0]+bb2[2])
    y_bottom = min(bb1[1]+bb1[3], bb2[1]+bb2[3])

    # If there is no overlap, return 0
    if x_right < x_left or y_bottom < y_top:
        return 0.0

    # Compute the area of the intersection rectangle
    intersection_area = (x_right - x_left) * (y_bottom - y_top)

    # Compute the area of both bounding boxes
    bb1_area = bb1[2] * bb1[3]
    bb2_area = bb2[2] * bb2[3]

    # Compute the IoU score
    iou = intersection_area / float(bb1_area + bb2_area - intersection_area)
    
    return iou


def get_max_iou_from_candidates(bounding_box: tuple, candidates: list):
    """
    Compute the maximum IoU score between a given bounding box and a list of candidate bounding boxes.

    Args:
        bounding_box (tuple): A bounding box in the format (x, y, width, height).
        candidates (list): A list of candidate bounding boxes in the format (x, y, width, height).

    Returns:
        Tuple (float, tuple): The maximum IoU score and the corresponding candidate bounding box.
    """
    
    # Initialize the maximum IoU score and the best candidate
    max_iou = 0.0
    best_candidate = None
    
    # Iterate over all candidate bounding boxes
    for candidate in candidates:
        # Compute the IoU score for the current candidate
        temp_iou = get_iou(candidate, bounding_box)
        
        # If the current candidate has a higher IoU score, update the maximum IoU score and the best candidate
        if temp_iou > max_iou:
            max_iou = temp_iou
            best_candidate = candidate
            
    return max_iou, best_candidate


## YoloV7 Setup + Functions 

It was pretty clear to me, starting with Task 1, that I would be in need of a model which can (somewhat) accurately detect vehicles in images. It was also relevant to me for the model to be good, whilst having little computational overhead. After some digging, I was very pleased to find that `OpenCV` has its own mechanism of importing Deep Learning models, to some extent. Luckily, `YOLO` was among them. `YOLO` is able to detect at least 80 classes of common objects, including cars, trucks, busses or motorbikes. 

At the beginning, I actually solved all 3 tasks using `YOLOv3`, as it was the best documented and seemingly the most popular model out there. The accuracy obtained was OK on Task1 on the train dataset (92%), but posed somewhat of a problem for task 2 and 3 in some cases where the lighting was poor (i.e. at night). As such, after having all the boilerplate ready, I wondered if there was a better model out there, that would require minimal changes. Apparently, there were newew, better, and faster versions of the YOLO model. At first I opted for `YOLOv4` which was better but considerably slower. I then found `YOLOv7` as one of the better models, moving as fast as `YOLOv3`, but with the accuracy of `YOLOv4` - best of both worlds, if you will.

The `YOLO` model requires 3 files:
- `yolov7.cfg` -> model configuration file
- `yolov7.weights` -> artefact of model weights, essential for our predictions
- `coco.names` -> YOLO was trained on the COCO dataset, based on the classes found within those images. It needs this file in order to correctly associate prediction with their class.

In the cell below, we'll notice that the setup is relatively quite easy.

In [11]:
# modelConfiguration = './yolov3.cfg' -> video 6 task 3 greseste 1, cam 12 min
# modelWeights = './yolov3.weights'
# modelConfiguration = './yolov4-p5.cfg' -> merge mai bine,  video 6 task 3 15 min
# modelWeights = './yolov4-p5.weights'
modelConfiguration = './yolov7.cfg' # -> merge cam ca v4, dar mai rapid, video 6 task 3 12 min
modelWeights = './yolov7.weights'
net = cv2.dnn.readNetFromDarknet(modelConfiguration, modelWeights)

net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)

# Load names of classes from coco
classes = None
with open('coco.names', 'r') as f:
    classes = [line.strip() for line in f.readlines()]

# Get the output layer names
layer_names = net.getLayerNames()
output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers().flatten()]

The function below performs "the magic" - it will receive an image and return the bounding boxes of all objects that match our classes of interest. 

In [12]:
def identify_vehicles_in_image(input_image: np.ndarray, display_results: bool = False) -> List[Tuple[int, int, int, int]]:
    """
    Identify and draw bounding boxes around vehicles in an image using YOLO object detection.

    Args:
        input_image (numpy array): A 3D numpy array representing an image where vehicles should be detected.
        display_results (bool): A boolean that, if True, displays the image with detected objects bounded.

    Returns:
        list of tuples: Each tuple represents the rectangle bounding box of a detected vehicle. Each tuple contains 4 integers, 
                        representing (x, y, width, height) of the bounding box.
    """

    # Copy the image to prevent in-place modifications
    image_copy = input_image.copy()
    image_height, image_width, image_channels = image_copy.shape

    # Create a 4D blob from image, with given parameters
    blob = cv2.dnn.blobFromImage(image_copy, 0.00392, (1920, 864), (0, 0, 0), True, crop=False)

    net.setInput(blob)

    # Execute a forward pass through the network to get output from the output layers
    output_layers_data = net.forward(output_layers)

    detected_classes = []
    detection_confidences = []
    bounding_boxes = []
    classes_of_interest = {'car', 'motorbike', 'bus', 'truck'}

    for output_layer in output_layers_data:
        for detected_object in output_layer:
            scores = detected_object[5:]
            class_index = np.argmax(scores)
            object_confidence = scores[class_index]

            # Filter for classes of interest
            if object_confidence > 0.5 and classes[class_index] in classes_of_interest:
                # Object has been detected
                center_x = int(detected_object[0] * image_width)
                center_y = int(detected_object[1] * image_height)
                box_width = int(detected_object[2] * image_width)
                box_height = int(detected_object[3] * image_height)

                # Rectangle coordinates
                x_coord = int(center_x - box_width / 2)
                y_coord = int(center_y - box_height / 2)

                bounding_boxes.append([x_coord, y_coord, box_width, box_height])
                detection_confidences.append(float(object_confidence))
                detected_classes.append(class_index)

    valid_indexes = cv2.dnn.NMSBoxes(bounding_boxes, detection_confidences, 0.5, 0.3)
    final_rectangles = []

    for i in range(len(bounding_boxes)):
        if i in valid_indexes:
            x, y, box_width, box_height = bounding_boxes[i]
            object_label = str(classes[detected_classes[i]])
            cv2.rectangle(image_copy, (x, y), (x + box_width, y + box_height), (0,255,0), 2)
            cv2.putText(image_copy, object_label, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)
            final_rectangles.append((x, y, box_width, box_height))
    
    if display_results:
        show_image(image_copy)

    return final_rectangles


## Task-specific functions

### Task 1

In [13]:
def read_numbers_task1(filename):
    with open(filename, 'r') as file:
        first_number = int(file.readline().strip())
        rest_of_numbers = [int(line.strip()) for line in file.readlines()]
        return first_number, rest_of_numbers

def read_numbers_gt(filename):
    with open(filename, 'r') as file:
        lines = file.readlines()
        numbers = [tuple(map(int, line.strip().split())) for line in lines]
        return numbers

### Task 2

In [14]:
def read_file_structure_task2(filename):
    with open(filename, 'r') as file:
        lines = file.readlines()
        data = []
        for line in lines:
            numbers = list(map(int, line.strip().split()))
            data.append(numbers)
        return data

### Task 3

In [15]:
def process_tracker_path(tracker_path: dict) -> dict:
    for k, v in tracker_path.items():
        tmp_list = [i for i in v if i is not None]
        tmp_list = list(dict.fromkeys(tmp_list))
        tracker_path[k] = tmp_list
    return tracker_path


def check_region_task3_bounding_box(image, rectangle):
    
    reg1 = [(232, 422), (893, 343), (226, 7), (21, 4)]
    reg2 = [(1029, 380), (1384, 556), (1902, 430), (1906, 277)]
    reg3 = [(438, 700), (1378, 559), (1919, 869), (582, 878)]

    # Generate masked image of non-masked car (in bounding box)
    unmasked_car = mask_rectangle_except_roi(image, *rectangle)
    
    max_iou = 0.0
    
    for reg in [reg1, reg2, reg3]:
    
        # Generate masked image of non-masked lane
        mask_for_reg = generate_mask_given_points(image, reg)

        # Compute IoU between non-masked rectangle and non-masked lane area
        iou = compute_iou(unmasked_car, mask_for_reg)

        if iou > max_iou:
            max_iou = iou
            reg_pred = reg
            
    if max_iou > 0.02:
        if reg_pred == reg1:
            return 1
        elif reg_pred == reg2:
            return 2
        elif reg_pred == reg3:
            return 3
    return None


def compute_score_tracker_path(tracker_path: dict, display = True) -> str:
    a1_2 = a1_3 = a2_1 = a2_3 = a3_1 = a3_2 = 0
    
    for k, v in tracker_path.items():
        if len(v) == 2:
            start = v[0]
            end = v[1]
            
            if start == 1:
                if end == 2:
                    a1_2 += 1
                elif end == 3:
                    a1_3 += 1
            elif start == 2:
                if end == 1:
                    a2_1 += 1
                elif end == 3:
                    a2_3 += 1
            elif start == 3:
                if end == 1:
                    a3_1 += 1
                elif end == 2:
                    a3_2 += 1
                    
    output_str = f"1-2 {a1_2}\n1-3 {a1_3}\n2-1 {a2_1}\n2-3 {a2_3}\n3-1 {a3_1}\n3-2 {a3_2}"
    if display:
        print(output_str)
    return output_str

# Task 1

### Generate custom poligons for each lane
This cell is used for creating a polygon containing each lane from the image, and masking everything that is not within that polygon (lane). As such, we can generate 9 polygons belonging to the 9 lanes in the camera field of view. We can then predict cars using bounding boxes and perform logical operations using only the masks. Using this method, the car detection operation is performed once per frame.

In [16]:
# # # List to store points
# points = []

# # Load the image
# image = task1_imgs[8].copy()
# # image = cv2.imread('image.jpg')
# cv2.namedWindow('image')
# cv2.setMouseCallback('image', select_points)

# while(1):
#     cv2.imshow('image', image)
#     if cv2.waitKey(20) & 0xFF == 27 or len(points) == 4:
#         break

# cv2.destroyAllWindows()

# # Check if 4 points were selected
# if len(points) == 4:
#     # Create a mask and fill in the polygon
#     mask = np.zeros_like(image)
#     cv2.fillPoly(mask, np.array([points]), (255,255,255))

#     # Apply the mask to the image
#     masked_image = cv2.bitwise_and(image, mask)

#     # Show the image
#     cv2.imshow('Image', masked_image)
#     cv2.waitKey(0)
#     cv2.destroyAllWindows()
# else:
#     print("Four points were not selected")
# print(points)

In [17]:
# Lanes "bounding coordinates" within an image, in order to compute if a car's bounding box is on a specific lane or not
lanes = [
    [(227, 418), (405, 378), (135, 119), (99, 202)],     # 1
    [(405, 395), (555, 379), (108, 39), (65, 52)],       # 2
    [(530, 379), (690, 367), (114, 2), (45, 4)],        # 3
    [(1114, 350), (1212, 409), (1914, 334), (1914, 276)], # 4
    [(1178, 387), (1273, 433), (1913, 358), (1913, 315)], # 5
    [(1233, 414), (1321, 467), (1914, 395), (1914, 338)], # 6
    [(1189, 621), (1610, 877), (1838, 770), (1443, 580)], # 7
    [(1028, 643), (1368, 877), (1734, 873), (1276, 610)], # 8
    [(894, 663), (1123, 876), (1474, 875), (1121, 628)],  # 9  
]

## Task 1 Approach

The approach towards task 1 is somewhat intuitive, in my opinion. We will take all images of interest from Task 1, and for each image, we will predict all the vehicles in that image. Each prediction will output some 'rectangles' coresponding to our bounding boxes of objects. 

As mentioned earlier, I have created polygons matching the shape of a lane within a given frame. What I then do: for any given image, mask everything (black it out) except a given lane. As such, I am able to single out that lane, check with the bounding boxes generated by `YOLO`, and see how many of my objects overlap with any given lane. The overlap is computed using a modified IoU. I then do this for all lanes in that frame.

This cell required some tweaking in the values of IoU, as well as playing with `YOLO`'s confidence paramteres, but on the train set, it outputs an accuracy of 92%.

### Generate Predictions for Task 1
This cell should take at most 2-3 minutes

In [19]:
%%time

# Set data path
path = f'{mode}/Task1/'


# Store image names
image_names = read_jpg_files_in_directory(path)
print(image_names[:5])

queries = [f"{img_name.split('.')[0]}_query.txt" for img_name in image_names]
print(queries[:5])

gt_files = [f"{img_name.split('.')[0]}_gt.txt" for img_name in image_names]
print(gt_files[:5])

# Store images
task1_imgs = [np.array(cv2.imread(os.path.join(path, file))).astype(np.uint8) for file in image_names]

for i, img in enumerate(task1_imgs):
    
    # List of lanes to search
    first_number, lanes_to_search = read_numbers_task1(path + queries[i])
    
    # Generate output
    output_str = f"{first_number}\n"
    
    # Load the image
    image = img.copy()
    
    # Generate car predictions (bounding boxes) over non-masked image
    rectangles = identify_vehicles_in_image(image)
    
    # Iterate through lanes idx (which is offset by 1)
    for lane in lanes_to_search:
        
        # Generate masked image of non-masked lane
        mask_for_lane = generate_mask_given_points(image, lanes[lane - 1])
        
        car_count_lane = 0
        # Iterate through car prediction bounding boxes
        for rectangle in rectangles:

            # Generate masked image of non-masked car (in bounding box)
            unmasked_car = mask_rectangle_except_roi(image, *rectangle)

            # Compute IoU between non-masked rectangle and non-masked lane area
            iou = compute_iou(unmasked_car, mask_for_lane)
            
            if iou > 0.05:
                car_count_lane += 1
#                 print(f"Car detected on lane {j + 1} with IoU {iou}")
        
        # Update output string
        if car_count_lane > 0:
            output_str += f"{lane} 1\n"
        else:
            output_str += f"{lane} 0\n"
#         print(f"Detected {car_count_lane} cars on lane {j + 1}\n")

    # Save output string to file
    with open(task1_dir_result + image_names[i].split(".")[0] + "_predicted.txt", 'w') as f:
            f.write(output_str)

['01_1.jpg', '01_2.jpg', '01_3.jpg', '02_1.jpg', '02_2.jpg']
['01_1_query.txt', '01_2_query.txt', '01_3_query.txt', '02_1_query.txt', '02_2_query.txt']
['01_1_gt.txt', '01_2_gt.txt', '01_3_gt.txt', '02_1_gt.txt', '02_2_gt.txt']
Wall time: 1min 9s


**Train**: Mistakes: 12; Total preds: 154; Score: 0.922077922077922

### CHECK: Compare gt with preds

In [29]:
gt_path = path + "ground-truth/"
print(task1_dir_result)
print(gt_path)

total_preds = 0
mistakes = 0
for i, name in enumerate(image_names):
    
    
    print(f"IMG NAME: {name}\n")
    # get image name
    img_name = name.split(".")[0]
    
    # reconstruct preds file name
    pred_file = img_name + "_predicted.txt"
    
    # get the ground truths
    gt_list = read_numbers_gt(gt_path + gt_files[i])
    
    total_preds += gt_list[0][0]
    
    # get the predictions
    pred_list = read_numbers_gt(task1_dir_result + pred_file)
    
    # count mistakes
    for gt, pred in zip(gt_list, pred_list):
        if gt != pred:
            print(f"GT: {gt}; PRED: {pred}")
            mistakes += 1
    
    print("--------------------------")

print(f"Mistakes: {mistakes}; Total preds: {total_preds}; Score: {(total_preds - mistakes) / total_preds}")

results/Task1/
test/Task1/ground-truth/
IMG NAME: 01_1.jpg

--------------------------
IMG NAME: 01_2.jpg

--------------------------
IMG NAME: 01_3.jpg

GT: (4, 0); PRED: (4, 1)
--------------------------
IMG NAME: 02_1.jpg

--------------------------
IMG NAME: 02_2.jpg

--------------------------
IMG NAME: 02_3.jpg

--------------------------
IMG NAME: 03_1.jpg

--------------------------
IMG NAME: 03_2.jpg

--------------------------
IMG NAME: 03_3.jpg

--------------------------
IMG NAME: 04_1.jpg

--------------------------
IMG NAME: 04_2.jpg

GT: (7, 0); PRED: (7, 1)
--------------------------
IMG NAME: 04_3.jpg

--------------------------
IMG NAME: 05_1.jpg

--------------------------
IMG NAME: 05_2.jpg

--------------------------
IMG NAME: 05_3.jpg

--------------------------
IMG NAME: 06_1.jpg

GT: (2, 1); PRED: (2, 0)
--------------------------
IMG NAME: 06_2.jpg

--------------------------
IMG NAME: 06_3.jpg

--------------------------
IMG NAME: 07_1.jpg

GT: (9, 0); PRED: (

# --- END OF TASK 1 ---

# Task 2

## First approach - just tracker
In my first attempt at task 2, I leveraged a `CSRT` object tracker within OpenCV in order to track a car throughout the frames of a video. Given the initial coordinates, I would convert these coordinates to a bounding box, and use it to initialize my tracker. It worked OK in general (say, 10 videos out of 15), but had major issues if a car moved too fast, or if it suddenly overlapped with multiple other cars or objects. I then thought of how I could make my solution more robust, which was actually inspired by task 3. In the next approach, I'll describe the changes brought.

I also tried `KCF, GOTURN, MOSSE, TLD` trackers. All failed quite miserably, unfortunately.

In [20]:
# %%time
# path = f'{mode}/Task2/'
# gt_path = path + "ground-truth/"

# # Store Video names
# video_names = read_mp4_files_in_directory(path)
# print(video_names[:5])

# task2_txt = [f"{video_name.split('.')[0]}.txt" for video_name in video_names]
# print(task2_txt[:5])

# test_names = ['01.mp4', '02.mp4', '05.mp4', '08.mp4']

# tracker_types = [ 'MOSSE', 'TLD']

# for tracker_type in tracker_types:
#     # replace test_names with video_names
#     print(f"================{tracker_type}===============\n")
#     for j, video_name in enumerate(test_names):
        
#         print(f'-------{video_name}------\n')

#         txt_file = video_name.split(".")[0] + ".txt"

#         # Path to text file
#         file_path = path + txt_file

#         # Read in the 2 lists
#         file_data = read_file_structure_task2(file_path)

#         # Get number of frames and bounding box
#         no_frames, rect_init = get_frames_and_rect(file_data)
#         print(no_frames)

#         # Create Output String
#         output_str = f"{no_frames} -1 -1 -1 -1\n" \
#                      f"0 {rect_init[0]} {rect_init[1]} {rect_init[2]} {rect_init[3]}\n"

#         # Path to your video file
#         video_path = path + video_name

#         # Create a VideoCapture object
#         cap = cv2.VideoCapture(video_path)

#         # Check if video opened successfully
#         if (cap.isOpened()== False): 
#             print("Error opening video stream or file")

#         # Create a tracker
#         if tracker_type == 'KCF':
#             tracker = cv2.TrackerKCF_create()
#         if tracker_type == 'GOTURN':
#             tracker = cv2.TrackerGOTURN_create()
#         if tracker_type == 'MOSSE':
#             tracker = cv2.TrackerMOSSE_create()
#         if tracker_type == 'TLD':
#             tracker = cv2.TrackerTLD_create()
# #         tracker = cv2.TrackerCSRT_create()

#         # Get first frame
#         ret, frame_init = cap.read()

#         cv2.rectangle(
#             frame_init, 
#             (rect_init[0], rect_init[1]), 
#             (rect_init[0] + rect_init[2], rect_init[1] + rect_init[3]), 
#             (0, 255, 0), 
#             2
#         )

#         # DEBUG: check frame
# #         show_image(frame_init)

#         # Initialize tracker with first frame and bounding box
#         ok = tracker.init(frame_init, rect_init)

#         # Read until video is completed
#         for i in tqdm(range(1, no_frames)):

#             # Read a new frame
#             ok, frame = cap.read()
#             if not ok:
#                 break

#             # Update tracker
#             ok, bbox = tracker.update(frame)

#             # Tracking success
# #             p1 = (int(bbox[0]), int(bbox[1]))
# #             p2 = (int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3]))
# #             cv2.rectangle(frame, p1, p2, (0,255, 0), 2, 1)

#             # Create coordinates from rectangle and append to output str
#             xmin, ymin, xmax, ymax = get_rect_return_coord(*bbox)

#             # Append output_str
#             output_str += f"{i} {xmin} {ymin} {xmax} {ymax}\n"

# #             if i % 20 == 0:
# #                 show_image(frame)
        
#         print(output_str)

#         # When everything done, release the video capture object
#         cap.release()

#         # Close all the frames
#         cv2.destroyAllWindows()

        
#     #     # Reconstruct pred file name
#     #     pred_file = video_names[j].split(".")[0] + "_predicted.txt"

#     #     # Save output string to file
#     #     with open(task2_dir_result + pred_file, 'w') as f:
#     #             f.write(output_str)

## Second approach - Using tracker & YOLOv7 to recalibrate tracker
The next approach is a bit more computationally expensive, but also more robust. It's still not perfect, as I'll mention in a second. The main idea here was to still use the tracker to track the car/vehicle, but once every few frames, make a YOLO prediction on all cars and make sure our bounding box is still covering our desired car. We do this by re-initializing our tracker with a given YOLO prediction, every few frames. 

This covers the problem of cars going too fast or too slow for our tracker, whilst also providing a more robust bounding box. By robust, I mean a bounding box that really covers the entire car. The tracker would ocasionally have too small or large bounding boxes for the car it was trying to track.

On the other hand, it still fails to account for obstacles in front of the camera. If a pillar or a sempahore gets in the way of the camera, then even YOLO fails at predicting that car in the frames where view is obstructed. As such, the tracker can be only as good as YOLO will allow it. YOLO is still a good algorithm, but not perfect. As such, it will still fail on a some videos after a certain point.

### Actual processing of video
Processing each video takes around 5 minutes per video, so entire cell should take around 1h.

In [21]:
%%time

path = f'{mode}/Task2/'
# gt_path = path + "ground-truth/"

# Store Video names
video_names = read_mp4_files_in_directory(path)
print(video_names[:5])

# Get names of text files containing starting information
task2_txt = [f"{video_name.split('.')[0]}.txt" for video_name in video_names]
print(task2_txt[:5])

# Iterate through all the video names
for j, video_name in enumerate(video_names):
    
    # Get text file name given video name
    txt_file = video_name.split(".")[0] + ".txt"
    
    # Path to text file
    file_path = path + txt_file

    # Read in the 2 lists
    file_data = read_file_structure_task2(file_path)
    
    # Get number of frames and bounding box
    no_frames, rect_init = get_frames_and_rect(file_data)
    print(no_frames)

    # Create Output String
    output_str = f"{no_frames} -1 -1 -1 -1\n" \
                 f"0 {rect_init[0]} {rect_init[1]} {rect_init[2]} {rect_init[3]}\n"
   
    # Path to video file
    video_path = path + video_name
    print(f"{video_name}\n")

    # Create a VideoCapture object
    cap = cv2.VideoCapture(video_path)
    
    # Check if video opened successfully
    if not cap.isOpened(): 
        print("Error opening video stream or file")
        exit()
        
    # Create a tracker
    tracker = cv2.TrackerCSRT_create()

    # Get first frame
    ret, frame_init = cap.read()

    cv2.rectangle(
        frame_init, 
        (rect_init[0], rect_init[1]), 
        (rect_init[0] + rect_init[2], rect_init[1] + rect_init[3]), 
        (0, 255, 0), 
        2
    )

    # DEBUG: check frame
#     show_image(frame_init)

    # Initialize tracker with first frame and bounding box
    ok = tracker.init(frame_init, rect_init)
        
    # Frame counter
    frame_cnt = 0
    
    # Iterate through all the frames in the video
    for _ in tqdm(range(no_frames)):
        
        # Read current frame
        ret, frame = cap.read()
        if not ret:
            break
        
        # Update tracker every every frame
        ok, bbox = tracker.update(frame)
        
        # Every 3 frames, run YOLO and update currently tracked objects
        if frame_cnt % 3 == 0:
            
            # Use YOLO to detect cars in the current frame
            objects_bbox = identify_vehicles_in_image(frame)
            
            # Try to see if any of the yolo predicted boxes is close to an existing tracked box
            iou, candidate = get_max_iou_from_candidates(bbox, objects_bbox)
            
            # If we have a match, update current tracker
            if iou > 0.1 and candidate:
            
                # You want to re-init the trackers with YOLO predictions
                tracker.init(frame, candidate)
                
                # Update bbox value with candidate
                bbox = candidate
                 
        # Create coordinates from rectangle and append to output str
        xmin, ymin, xmax, ymax = get_rect_return_coord(*bbox)
        frame_cnt += 1
        
        # Append output_str
        output_str += f"{frame_cnt} {xmin} {ymin} {xmax} {ymax}\n"
       
        # DEBUG
#         if frame_cnt % 20 == 0:
#             p1 = (int(bbox[0]), int(bbox[1]))
#             p2 = (int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3]))
#             cv2.rectangle(frame, p1, p2, (0,255, 0), 2, 1)
#             show_image(frame)     
    
    # Reconstruct pred file name
    pred_file = video_names[j].split(".")[0] + "_predicted.txt"

    # Save output string to file
    with open(task2_dir_result + pred_file, 'w') as f:
            f.write(output_str)

['01.mp4', '02.mp4', '03.mp4', '04.mp4', '05.mp4']
['01.txt', '02.txt', '03.txt', '04.txt', '05.txt']
884
01.mp4



100%|█████████▉| 883/884 [05:51<00:00,  2.51it/s]


653
02.mp4



100%|█████████▉| 652/653 [04:19<00:00,  2.51it/s]


885
03.mp4



100%|█████████▉| 884/885 [05:56<00:00,  2.48it/s]


935
04.mp4



100%|█████████▉| 934/935 [06:09<00:00,  2.53it/s]


1056
05.mp4



100%|█████████▉| 1055/1056 [06:53<00:00,  2.55it/s]


1446
06.mp4



100%|█████████▉| 1445/1446 [09:36<00:00,  2.51it/s]


589
07.mp4



100%|█████████▉| 588/589 [03:54<00:00,  2.51it/s]


348
08.mp4



100%|█████████▉| 347/348 [02:26<00:00,  2.38it/s]


1513
09.mp4



100%|█████████▉| 1512/1513 [09:56<00:00,  2.53it/s]


1520
10.mp4



100%|█████████▉| 1519/1520 [10:05<00:00,  2.51it/s]


678
11.mp4



100%|█████████▉| 677/678 [04:29<00:00,  2.51it/s]


461
12.mp4



100%|█████████▉| 460/461 [03:04<00:00,  2.50it/s]


511
13.mp4



100%|█████████▉| 510/511 [03:23<00:00,  2.50it/s]


409
14.mp4



100%|█████████▉| 408/409 [02:42<00:00,  2.52it/s]


492
15.mp4



100%|█████████▉| 491/492 [03:17<00:00,  2.49it/s]

Wall time: 1h 22min 7s





# --- END OF TASK 2 ---

#  TASK 3 Approach

So actually, the idea of task 2 is simply expanded to task 3. I'll admit, I actually came with the idea of "synchronising" tracker and `YOLO` predictions in task 3, and then thought of trying it for task 2 as well. It's essential for us to know we can (for the most part) reliably track one car. Given we know how to track a single car, tracking more cars should just imply using more trackers.

As such, each vehicle that appears in our video will have its own tracker. We will sync our tracker predcictions with what YOLO is able to detect. We will 'interlink' the predictions, to insure we have the highest rate of success. There are still edge cases where our code fails, which are similar to the ones present in Task2 (i.e. car being obstructed).

But one question still remains - How do we know that a car traversed from, say, region 1 to region 2?

We'll actually borrow part of the approach from Task 1. Instead of masking just one lane, we'll just mask regions (1, 2, 3). Like in Task 1, we'll predict, given a frame, where the cars are. We'll perform an overlap computation (IoU), to see which cars are in which regions, for a given frame. So for each frame, we know where there are cars, and if they're on a certain region.

In order to put it all together, I actually instantiate some dictionaries (hashmaps) that track the path of a tracker (vehicle):
- A dictionary for storing the tracker and its current location (bounding box) in the video
- A dictionary where we store where we have first detected a tracker (source location)
- A dictionary where we store the tracker and its path throughout the video (i.e. 2, None, 3) -> we know it was first seen in region 2, it passed the intersection (marked None), and then went in region 3

At the end of the video, we'll check our paths, our sources, and increment the counts of cars that passed from one region to another.

### Actual processing of video

Processing of a video depends on the GPU, but may take around 10-15 minutes per video. This cell will finish running in 3h, on average.

In [22]:
%%time

path = f'{mode}/Task3/'
# gt_path = path + "ground-truth/"

# Store Video names
video_names = read_mp4_files_in_directory(path)
print(video_names[:5])

# task3_txt = [f"{video_name.split('.')[0]}.txt" for video_name in video_names]
# print(task3_txt[:5])

for j, video_name in enumerate(video_names):
     # Path to your video file
    video_path = path + video_name
    print(f"{video_name}\n")

    # Create a VideoCapture object
    cap = cv2.VideoCapture(video_path)
    
    # Check if video opened successfully
    if not cap.isOpened(): 
        print("Error opening video stream or file")
        exit()
    
    # Count number of frames
    no_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
    trackers = {}  # {tracker: bounding box}
    tracker_source = {}  # {tracker: first_seen_in_region (int)}
    tracker_path = {} # {tracker: list (regions)}
    
    # Init frame count
    frame_cnt = 0
    
    for _ in tqdm(range(no_frames)):
        
        # Read current frame
        ret, frame = cap.read()
        if not ret:
            break
        
        # Update trackers every 2 frames for efficiency
        if frame_cnt % 2 == 0:
            for tracker in trackers:
                tracker.update(frame)
        
        # Every 4 frames, run YOLO and update currently tracked objects
        if frame_cnt % 4 == 0:
            
            # Use YOLO to detect cars in the current frame every few frames
            new_boxes = identify_vehicles_in_image(frame)
            
            new_trackers = {}
            
            for new_box in new_boxes:
                
                # Try to see if predicted box is close to an existing tracked box
                iou, candidate = get_max_iou_from_candidates(new_box, trackers.values())
                
                # If we have a match, update current tracker
                if iou > 0.02 and candidate:
                    
                    # Update the tracker bounding box as YOLO's preds are more robust
                    tracker = get_key(trackers, candidate)
                    
                    # You want to re-init the trackers with YOLO predictions
                    tracker.init(frame, new_box)
                    
                    # Store new-Trackers to replace trackers at end of prediction
                    new_trackers[tracker] = new_box
                    
                    # Check location of current bounding box (as source)
                    current_region = check_region_task3_bounding_box(frame, new_box)
                    
                    # Check if object has changed region and add to path if so
                    if tracker_path[tracker][-1] != current_region:
                        tracker_path[tracker].append(current_region)
                    
                else:
                    # The object is new, added to tracked objects
                    tracker = cv2.TrackerCSRT_create()
                    
                    # Initialize tracker with first frame and bounding box
                    ok = tracker.init(frame, new_box)
                    
                    # Store the tracker in our trackers dict
                    new_trackers[tracker] = new_box
                    
                    # Check location of current bounding box (as source)
                    source_region = check_region_task3_bounding_box(frame, new_box)
                    
                    # Update tracker_source
                    tracker_source[tracker] = source_region
                    
                    # Update tracker_current
                    tracker_path[tracker] = [source_region]
                
        # Rremove trackers that did not match predicted YOLO boxes (by replacing old trackers with new one)
        trackers = new_trackers    
        
        # DEBUG
#         if frame_cnt % 20 == 0:
#             # Used for debugging the trackers
#             for bbox in trackers.values():
#                 p1 = (int(bbox[0]), int(bbox[1]))
#                 p2 = (int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3]))
#                 cv2.rectangle(frame, p1, p2, (0,255,0), 2, 1)
#             show_image(frame)
        
        frame_cnt += 1
        
    tracker_path = process_tracker_path(tracker_path)
#     print(tracker_path)
    output_str = compute_score_tracker_path(tracker_path)
    
    # Reconstruct pred file name
    pred_file = video_names[j].split(".")[0] + "_predicted.txt"

    # Save output string to file
    with open(task3_dir_result + pred_file, 'w') as f:
            f.write(output_str)

['01.mp4', '02.mp4', '03.mp4', '04.mp4', '05.mp4']
01.mp4



100%|██████████| 1689/1689 [17:08<00:00,  1.64it/s]


1-2 0
1-3 0
2-1 0
2-3 3
3-1 0
3-2 0
02.mp4



100%|██████████| 2123/2123 [14:54<00:00,  2.37it/s]


1-2 0
1-3 3
2-1 2
2-3 1
3-1 0
3-2 2
03.mp4



100%|██████████| 1748/1748 [12:34<00:00,  2.32it/s]


1-2 0
1-3 1
2-1 0
2-3 1
3-1 5
3-2 2
04.mp4



100%|██████████| 1987/1987 [12:39<00:00,  2.62it/s]


1-2 0
1-3 6
2-1 0
2-3 0
3-1 7
3-2 0
05.mp4



100%|██████████| 2058/2058 [12:19<00:00,  2.78it/s]


1-2 0
1-3 3
2-1 0
2-3 0
3-1 1
3-2 1
06.mp4



100%|██████████| 2063/2063 [11:36<00:00,  2.96it/s]


1-2 0
1-3 1
2-1 1
2-3 1
3-1 1
3-2 1
07.mp4



100%|██████████| 2351/2351 [13:04<00:00,  3.00it/s]


1-2 0
1-3 0
2-1 0
2-3 0
3-1 0
3-2 1
08.mp4



100%|██████████| 1832/1832 [10:04<00:00,  3.03it/s]


1-2 0
1-3 0
2-1 0
2-3 0
3-1 0
3-2 0
09.mp4



100%|██████████| 1324/1324 [13:58<00:00,  1.58it/s]


1-2 1
1-3 3
2-1 0
2-3 0
3-1 3
3-2 2
10.mp4



100%|██████████| 1653/1653 [16:22<00:00,  1.68it/s]


1-2 0
1-3 0
2-1 0
2-3 5
3-1 0
3-2 0
11.mp4



100%|██████████| 1693/1693 [11:04<00:00,  2.55it/s]


1-2 1
1-3 0
2-1 0
2-3 0
3-1 4
3-2 0
12.mp4



100%|██████████| 2006/2006 [11:09<00:00,  3.00it/s]


1-2 0
1-3 2
2-1 0
2-3 0
3-1 4
3-2 1
13.mp4



100%|██████████| 1782/1782 [15:17<00:00,  1.94it/s]


1-2 0
1-3 4
2-1 0
2-3 0
3-1 3
3-2 2
14.mp4



100%|██████████| 2100/2100 [12:11<00:00,  2.87it/s]


1-2 0
1-3 4
2-1 0
2-3 0
3-1 3
3-2 1
15.mp4



100%|██████████| 1269/1269 [08:42<00:00,  2.43it/s]

1-2 0
1-3 1
2-1 1
2-3 0
3-1 0
3-2 0
Wall time: 3h 13min 8s



