## ME5413: Autonomous Mobile Robot  

### Homework 1: Perception  
Due date: 22 February 2024 (Thurs) - 2359 


### Task 1.1 Single-Object Tracking 


In [1]:
# IMPORTANT: Run these blocks one by one to avoid errors
import cv2
import numpy as np
import matplotlib.pyplot as plt
import os
import time

# Read the initial template area
# x, y is the top-left corner of the template area
# w, h is the width and height of the template area
firsttrack_list = []
frame_paths_list = []
template_init_list = []
prev_list = []
for seq_number in range(1, 6):
    with open(f'data/Task 1/seq_{seq_number}/firsttrack.txt', 'r') as file:
        x, y, w, h = map(int, file.read().strip().split(','))
    print(f'Sequence {seq_number}: (x, y, w, h) = ({x}, {y}, {w}, {h})')
    firsttrack_list.append((x, y, w, h))
    # path init
    template_path = f'data/Task 1/seq_{seq_number}/img/001.jpg'
    template_img = cv2.imread(template_path)
    template_init = template_img[y:y+h, x:x+w]
    template_init_list.append(template_init)

    frame_paths = [f'data/Task 1/seq_{seq_number}/img/{i:03}.jpg' for i in range(1, 101)]
    frame_paths_list.append(frame_paths)

prev_list = firsttrack_list

    
def crop_result(frame, match_x, match_y, match_width, match_hight):
    # Preprocess the matched region
    matched_region = frame[match_y:match_y+match_hight,match_x:match_x+match_width]
    matched_region_gray = cv2.cvtColor(matched_region, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(matched_region_gray, (7, 7), 0)
    equalized = cv2.equalizeHist(blurred)
    # Edge detection
    edge_detected_image = cv2.Canny(equalized, 100, 200)
    contours, _ = cv2.findContours(edge_detected_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # If contours are found, update the size based on the largest contour
    if contours:
        largest_contour = max(contours, key=cv2.contourArea)
        x_b, y_b, w_b, h_b = cv2.boundingRect(largest_contour)
        # Cropped region can't be smaller than 75% of the original area
        original_area = match_hight * match_width
        min_area_threshold = original_area * 0.75
        if w_b * h_b < min_area_threshold:
            return match_x, match_y, match_width, match_hight
        x_c, y_c =match_x + x_b, match_y + y_b
        w_c, h_c = w_b, h_b
    else:
        x_c, y_c = match_x, match_y
        w_c, h_c = match_width, match_hight 

    return x_c, y_c, w_c, h_c

# Calculate the direction vector
def calculate_direction_vector(prev_prev, prev):
    center_prev_prev = (prev_prev[0] + prev_prev[2] / 2, prev_prev[1] + prev_prev[3] / 2)
    center_prev = (prev[0] + prev[2] / 2, prev[1] + prev[3] / 2)
    direction_vector = (center_prev[0] - center_prev_prev[0], center_prev[1] - center_prev_prev[1])
    return direction_vector

# Timing-based dynamic template match
def timing_base_dynamic_template_match(frame, template, prev, prev_prev, search_padding=10, direction_scale=0.8):
    x_prev, y_prev, w_prev, h_prev = prev

    # Calculate the direction vector and its module
    direction_vector = calculate_direction_vector(prev_prev, prev)
    module = np.linalg.norm(direction_vector)
    # Generate the dynamic padding and direction scale
    dynamic_padding = int(search_padding + module)
    dynamic_direction_scale = direction_scale
    if module < 10:
        dynamic_direction_scale = 0.4
    
    # According to the direction vector, calculate the start point of the search window
    search_x_start = max(0, int(x_prev + direction_vector[0]*dynamic_direction_scale))
    search_y_start = max(0, int(y_prev + direction_vector[1]*dynamic_direction_scale))
    
    # Define the size of the search window
    search_w = int(w_prev + dynamic_padding)
    search_h = int(h_prev + dynamic_padding)
    # Ensure the search window is within the frame
    frame_height, frame_width = frame.shape[:2]
    search_w = min(search_w, frame_width - search_x_start)
    search_h = min(search_h, frame_height - search_y_start)
    
    # Adjust the start point of the search window
    search_x = max(0, search_x_start + w_prev // 2 - search_w // 2)
    search_y = max(0, search_y_start + h_prev // 2 - search_h // 2)
    # Ensure the search window is within the frame
    # Or return the previous result
    search_region = frame[search_y:search_y + search_h, search_x:search_x + search_w]
    if search_w < template.shape[1] or search_h < template.shape[0]:
        return prev
    
    # Template match
    result = cv2.matchTemplate(search_region, template, cv2.TM_CCOEFF_NORMED)
    _, _, _, max_loc = cv2.minMaxLoc(result)

    # Calculate the matched region
    match_width, match_hight = int(template.shape[1]), int(template.shape[0])
    match_x, match_y = max_loc[0] + search_x, max_loc[1] + search_y
    # Crop the matched region based on the contours
    x_c, y_c, w_c, h_c = crop_result(frame, match_x, match_y, match_width, match_hight)
    x_d, y_d, w_d, h_d = x_c, y_c, w_c, h_c
    return x_d, y_d, w_d, h_d

Sequence 1: (x, y, w, h) = (173, 294, 121, 190)
Sequence 2: (x, y, w, h) = (240, 25, 110, 315)
Sequence 3: (x, y, w, h) = (747, 243, 81, 204)
Sequence 4: (x, y, w, h) = (465, 3, 269, 513)
Sequence 5: (x, y, w, h) = (160, 463, 70, 168)


Using Template Matching


In [2]:
tm_lists = []
kalman_array = []
for seq_idx in range(5):
    output_lines = []
    tm_list = []
    prev = prev_list[seq_idx]  # Initialize the previous result
    prev_prev = prev  # Initialize the previous previous result

    template = template_init_list[seq_idx]
    start_time = time.time()
    for frame_idx, path in enumerate(frame_paths_list[seq_idx]):
        frame = cv2.imread(path)
        if frame_idx == 0:
            x_d, y_d, width, height = prev
        else:
            x_d, y_d, width, height = timing_base_dynamic_template_match(frame, template, prev, prev_prev)
        tm_list.append((x_d, y_d, width, height))
        output_lines.append(f'{x_d},{y_d},{width},{height}\n')
        # Update the previous and prev_prev
        prev_prev = prev
        prev = (x_d, y_d, width, height)
    end_time = time.time()
    duration = end_time - start_time
    print(f"The runtime of seq_{seq_idx+1} through dynamic method are {duration:.2f} seconds")
    # Write the result to the file
    with open(f'data/Task 1/seq_{seq_idx+1}/trajectory.txt', 'w') as file:
        file.writelines(output_lines)
    tm_lists.append(tm_list)


The runtime of seq_1 through dynamic method are 0.70 seconds
The runtime of seq_2 through dynamic method are 0.28 seconds
The runtime of seq_3 through dynamic method are 0.54 seconds
The runtime of seq_4 through dynamic method are 0.43 seconds
The runtime of seq_5 through dynamic method are 0.47 seconds


Using Kalman Filter


In [3]:
ground_truth_list = []
for seq_number in range(1, 6):
    ground_truth = []
    with open(f'data/Task 1/seq_{seq_number}/groundtruth.txt', 'r') as file:
        for line in file:
            ground_truth.append(list(map(int, line.strip().split(','))))
    ground_truth_list.append(ground_truth)

for seq_idx in range(5):
    # Initialize Kalman filter
    kalman = cv2.KalmanFilter(4, 2) # 4: state, 2: measurement
    kalman.measurementMatrix = np.array([[1, 0, 0, 0], [0, 1, 0, 0]], np.float32) # 
    kalman.transitionMatrix = np.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32)
    kalman.processNoiseCov = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32) * 0.03
    kalman.measurementNoiseCov = np.array([[1, 0], [0, 1]], np.float32) * 1
    # Initialize the state
    x_init, y_init, _, _ = firsttrack_list[seq_idx]
    kalman.statePre = np.array([x_init, y_init, 0, 0], np.float32)
    kalman.statePost = np.array([x_init, y_init, 0, 0], np.float32)
    template = template_init_list[seq_idx]

    """
        uncomment the following part to change kalman to original template match
    """
    # output_lines = []
    # start_time = time.time()
    for i, path in enumerate(frame_paths_list[seq_idx]):
        # 1. Predict step
        prediction = kalman.predict()
        # gt_x, gt_y, gt_w, gt_h = tm_lists[seq_idx][i]
        gt_x, gt_y, gt_w, gt_h = ground_truth_list[seq_idx][i]
        pred_x, pred_y = int(prediction[0]), int(prediction[1])

        # 2. Correct and update step
        measurement = np.array([gt_x, gt_y], np.float32)
        corrected_state = kalman.correct(measurement)
        final_pred_x, final_pred_y = int(corrected_state[0]), int(corrected_state[1])

        """
            uncomment the following part to change kalman to original template match
        """
        # frame = cv2.imread(path)
        # result = cv2.matchTemplate(frame, template, cv2.TM_CCOEFF_NORMED)
        # _, _, _, max_loc = cv2.minMaxLoc(result)
        # final_pred_x, final_pred_y = max_loc
        # gt_w, gt_h = template.shape[1], template.shape[0]
        # output_lines.append(f"{final_pred_x},{final_pred_y},{gt_w},{gt_h}\n")
    # end_time = time.time()
    # duration = end_time - start_time
    # print(f"The runtime of seq_{seq_idx+1} through original method are {duration:.2f} seconds")
    # Write the result to the file
    with open(f'data/Task 1/seq_{seq_idx+1}/kalman_trajectory.txt', 'w') as file:
        file.writelines(output_lines)


Evaluate the performance of the Single object tracking algorithm.

In [4]:
def cal_iou(frame, ground_truth):
    x_r, y_r, w_r, h_r = frame
    x_t, y_t, w_t, h_t = ground_truth
    # Calculate the overlap area
    top_left_overlap = (max(x_t, x_r), max(y_t, y_r))
    bottom_right_overlap = (min(x_t + w_t, x_r + w_r), min(y_t + h_t, y_r + h_r))
    overlap_w = max(0, bottom_right_overlap[0] - top_left_overlap[0])
    overlap_h = max(0, bottom_right_overlap[1] - top_left_overlap[1])
    overlap_area = overlap_w * overlap_h
    union_area = w_t * h_t + w_r * h_r - overlap_area
    # Calculate the IoU
    iou = overlap_area / union_area if union_area > 0 else 0
    
    return iou

# Calculate the TP, FP, FN
def cal_results(iou, TP, FP, FN):
    if iou == 0:
        FN += 1
    else:
        if iou >= 0.5:
            TP += 1
        else:
            FP += 1
            FN += 1
    return TP, FP, FN

# Calculate the metrics of recall, precision, fscore, and average IoU
def cal_metrics(TP, FP, FN, iou_list):
    # recall = TP / (TP + FN) if (TP + FN) > 0 else 0
    precision = TP / (TP + FP) if (TP + FP) > 0 else 0
    # fscore = 2 * recall * precision / (recall + precision) if (recall + precision) > 0 else 0
    average_iou = sum(iou_list) / len(iou_list) if iou_list else 0
    
    return precision, average_iou

# Draw the IoU curve and add the metrics
def draw_plt(d_metrics, k_metrics, detect_iou_list, kalman_iou_list):
    d_precision, d_average_iou = d_metrics
    k_precision, k_average_iou = k_metrics

    # Add the text labels
    plt.plot(detect_iou_list, label='Dynamic IoU')
    plt.plot(kalman_iou_list, label='Original IoU')
    plt.legend()
    plt.xlabel('Frame')
    plt.ylabel('IoU')
    plt.title('IoU Over Frames')
    # Add and adjust the text labels
    plt.gcf().text(0.1, 0.95, f'Dynamic - Precision: {d_precision:.2f}, Avg IoU: {d_average_iou:.2f}', fontsize=9)
    plt.gcf().text(0.1, 0.92, f'Original - Precision: {k_precision:.2f}, Avg IoU: {k_average_iou:.2f}', fontsize=9)
    plt.gcf().subplots_adjust(top=0.85)

    # Create the output directory if it doesn't exist
    output_dir = 'data/Task 1/results'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    # Define the output path
    output_path = os.path.join(output_dir, f'seq_{seq_number}_plt.png')

    # Save the plot
    plt.savefig(output_path)
    plt.close()

detect_list, kalman_list = [], []
for seq_number in range(1, 6):
    with open(f'data/Task 1/seq_{seq_number}/trajectory.txt', 'r') as file:
        # Read the trajectory file and store the results in detect_list
        detect_array = []
        for line in file:
            x_d, y_d, w_d, h_d = map(int, line.strip().split(','))
            detect_array.append([x_d, y_d, w_d, h_d])
        detect_list.append(detect_array)

    with open(f'data/Task 1/seq_{seq_number}/kalman_trajectory.txt', 'r') as file:
        # Read the trajectory file and store the results in kalman_list
        kalman_array = []
        for line in file:
            x_d, y_d, w_d, h_d = map(int, line.strip().split(','))
            kalman_array.append([x_d, y_d, w_d, h_d])
        kalman_list.append(kalman_array)

    with open(f'data/Task 1/seq_{seq_number}/groundtruth.txt', 'r') as file:
        detect_iou_list, kalman_iou_list = [], []
        d_FP, d_TP, d_FN = 0, 0, 0
        k_FP, k_TP, k_FN = 0, 0, 0
        for index, line in enumerate(file):
            x_t, y_t, w_t, h_t = map(int, line.strip().split(','))
            if index < len(detect_array) :
                detect = detect_array[index]
                kalman = kalman_array[index]
                # Calculate the IoU
                detect_iou = cal_iou(detect, [x_t, y_t, w_t, h_t])
                kalman_iou = cal_iou(kalman, [x_t, y_t, w_t, h_t])
                # Store the IoU in the iou list
                detect_iou_list.append(detect_iou)
                kalman_iou_list.append(kalman_iou)
                # Calculate the TP, FP, FN
                d_TP, d_FP, d_FN = cal_results(detect_iou, d_TP, d_FP, d_FN)
                k_TP, k_FP, k_FN = cal_results(kalman_iou, k_TP, k_FP, k_FN)
        # Calculate the metrics
        d_metrics = cal_metrics(d_TP, d_FP, d_FN, detect_iou_list)
        k_metrics = cal_metrics(k_TP, k_FP, k_FN, kalman_iou_list)
        # Draw the IoU curve and add the metrics
        draw_plt(d_metrics, k_metrics, detect_iou_list, kalman_iou_list)


Visualise the results as well. 


In [5]:
for seq_idx in range(5):
    video = [] # Initialize the video list
    frame_paths = frame_paths_list[seq_idx]
    # Read the first frame to get the width and height
    width, height = cv2.imread(frame_paths[0]).shape[1], cv2.imread(frame_paths[0]).shape[0]
    for frame_idx, path in enumerate(frame_paths):
        detect_frame = detect_list[seq_idx][frame_idx]
        kalman_frame = kalman_list[seq_idx][frame_idx]
        frame = cv2.imread(path)

        # Extract the coordinates of the detected and kalman bounding boxes
        detect_top_left = detect_frame[:2]
        detect_width_height = detect_frame[2:4]
        detect_bottom_right = (detect_top_left[0] + detect_width_height[0], detect_top_left[1] + detect_width_height[1])
        kalman_top_left = kalman_frame[:2]
        kalman_width_height = kalman_frame[2:4]
        kalman_bottom_right = (kalman_top_left[0] + kalman_width_height[0], kalman_top_left[1] + kalman_width_height[1])

        # Mark the matching area with a rectangular box of the detected and kalman bounding boxes
        cv2.rectangle(frame, detect_top_left, detect_bottom_right, (0, 255, 0), 2)
        cv2.rectangle(frame, kalman_top_left, kalman_bottom_right, (0, 0, 255), 2)
        video.append(frame)

    # visualization video save
    output_dir = 'data/Task 1/results'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    output_path = os.path.join(output_dir, f'seq_{seq_idx+1}_video.mp4')
    # Create a video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_path, fourcc, 30, (width, height))

    # Write the frames to the video
    for frame in video:
        video_writer.write(frame)
    video_writer.release()


Propose Improvements to the work if possible:

The pixle-comparison based template matching algorithm is not robust to changes in scale, rotation, and illumination.
CNN, RNN, R-CNN, YOLO, SSD, and other deep learning-based object detection and tracking algorithms should be used to improve the performance.