In [4]:
import os
import cv2
import numpy as np
from scipy.optimize import linear_sum_assignment
from filterpy.kalman import KalmanFilter
from ultralytics import YOLO
from collections import defaultdict

In [6]:
# Kalman filter template (constant-velocity model)
# State vector (8 entries):  [x, y, w, h, Z, vx, vy, vZ]
# Measurement  (5 entries):  [x, y, w, h, Z]
dt = 1  # Time step between two predictions
kf_template = KalmanFilter(dim_x=8, dim_z=5)

# Transition matrix: identity plus "position += velocity * dt" for x, y and Z.
# w and h have no velocity component, so they are propagated unchanged.
kf_template.F = np.eye(8, dtype=type(dt))
kf_template.F[[0, 1, 4], [5, 6, 7]] = dt

# Measurement matrix: the five measured quantities are the first five state
# entries, i.e. the top five rows of an 8x8 identity.
kf_template.H = np.eye(5, 8, dtype=int)

# Process noise (per state entry) and measurement noise (per measured entry).
kf_template.Q = np.diag((1, 1, 10, 10, 1, 1, 1, 10))
kf_template.R = np.diag((10, 10, 100, 100, 10))

# Tracking parameters
min_confidence = 0.46        # Detections scoring below this are ignored
max_distance = 16            # Upper bound on the association cost for a valid match
max_frames_to_skip = 50      # A track is dropped once its missed-frame counter exceeds this
confirmation_threshold = 4   # Matches required before a track is displayed
tracked_objects = {}         # Track ID -> per-track state dictionary
next_object_id = 0           # ID handed to the next newly created track

In [12]:
# Object detector (weights file resolved by ultralytics)
model = YOLO('yolov8s.pt')

# Stereo sequence: one directory per camera, paths relative to the notebook.
path_to_left = '34759_final_project_rect/seq_03/image_02/data/'
path_to_right = '34759_final_project_rect/seq_03/image_03/data/'

# Directory listings are sorted by file name so that the entries at the same
# position in both lists are paired up by the main loop.
left_images = os.listdir(path_to_left)
left_images.sort()
right_images = os.listdir(path_to_right)
right_images.sort()

# Both cameras must contribute the same number of frames.
assert len(left_images) == len(right_images), "Left and right image sequences must have the same length."

FileNotFoundError: [Errno 2] No such file or directory: '34759_final_project_rect/seq_03/image_02/data/'

In [8]:
# Camera parameters: 3x3 matrices per camera. Entry [0, 0] of the left matrix
# is used as the focal length below; the remaining entries are presumably the
# usual pinhole intrinsics (fy and principal point) -- TODO confirm.
Camera_Left = np.array([
    [961.60052669, 0, 687.80277399],
    [0, 963.33029715, 235.14590179],
    [0, 0, 1],
])
Camera_Right = np.array([
    [906.92602542, 0, 683.78933836],
    [0, 909.61530614, 234.43994747],
    [0, 0, 1],
])

# Quantities for depth-from-disparity: Z = focal_length * baseline / disparity.
baseline = 0.54  # NOTE(review): presumably metres -- confirm against the calibration data
focal_length = Camera_Left[0][0]

# Semi-global block matcher (StereoSGBM, not StereoBM).
# NOTE(review): P1/P2 are computed for 3 channels and a 5-pixel window, while
# blockSize is 15 and the main loop feeds grayscale images -- confirm this
# tuning is intentional.
stereo = cv2.StereoSGBM_create(
    minDisparity=0,
    numDisparities=16 * 10,
    blockSize=15,
    P1=8 * 3 * 5 ** 2,
    P2=32 * 3 * 5 ** 2,
    disp12MaxDiff=1,
    preFilterCap=63,
    uniquenessRatio=15,
    speckleWindowSize=50,
    speckleRange=2,
    mode=cv2.STEREO_SGBM_MODE_SGBM_3WAY,
)

In [None]:
# Running metrics, both starting at zero
total_overlap_ratio = frame_count = 0

# Detector class ID -> target category name (IDs 0, 1, 2 in that order).
# NOTE(review): with COCO-pretrained weights ID 1 is presumably "bicycle"
# rather than a rider; the main loop merges it with an overlapping Pedestrian.
class_map = dict(enumerate(("Pedestrian", "Cyclist", "Car")))

In [None]:
# Main loop
# For every stereo pair: disparity map -> detections with depth ->
# cyclist/pedestrian merging -> Kalman prediction -> Mahalanobis + Hungarian
# association -> track bookkeeping -> visualisation.


def _corners(measurement):
    """Return [x_min, y_min, x_max, y_max] for a measurement [x_center, y_center, w, h, Z]."""
    x_c, y_c, w, h = measurement.flatten()[:4]
    return [x_c - w / 2, y_c - h / 2, x_c + w / 2, y_c + h / 2]


def _overlaps(box_a, box_b):
    """Return True when two corner boxes intersect on both axes (touching edges do not count)."""
    return (box_a[0] < box_b[2] and box_a[2] > box_b[0]
            and box_a[1] < box_b[3] and box_a[3] > box_b[1])


def _merge_cyclists_and_pedestrians(detections):
    """Fuse each Cyclist with one overlapping Pedestrian and drop redundant Pedestrians.

    Args:
        detections: list of detection dicts with a (5, 1) 'measurement'
            [x_center, y_center, w, h, Z] and a 'class_name'.

    Returns:
        A new list in which
        * every Cyclist that overlaps a Pedestrian of comparable width
          (between half and twice the Cyclist's width) has been grown in place
          to the union of both boxes, and that Pedestrian has been removed;
        * every remaining Pedestrian that still overlaps any Cyclist has been
          removed as well.
        The Cyclist keeps its own depth Z.
    """
    merged_indices = set()

    for cyclist in detections:
        if cyclist['class_name'] != "Cyclist":
            continue
        cyclist_box = _corners(cyclist['measurement'])
        w_c = cyclist['measurement'][2, 0]

        for j, pedestrian in enumerate(detections):
            if pedestrian['class_name'] != "Pedestrian" or j in merged_indices:
                continue
            pedestrian_box = _corners(pedestrian['measurement'])
            w_p = pedestrian['measurement'][2, 0]
            size_match = (w_c / 2) < w_p < (w_c * 2)
            if not (_overlaps(cyclist_box, pedestrian_box) and size_match):
                continue

            # The measurement stores centre and size, so the union has to be
            # formed on the corner representation and converted back.
            x_min = min(cyclist_box[0], pedestrian_box[0])
            y_min = min(cyclist_box[1], pedestrian_box[1])
            x_max = max(cyclist_box[2], pedestrian_box[2])
            y_max = max(cyclist_box[3], pedestrian_box[3])
            cyclist['measurement'][:4, 0] = [
                (x_min + x_max) / 2,
                (y_min + y_max) / 2,
                x_max - x_min,
                y_max - y_min,
            ]

            merged_indices.add(j)  # Each Pedestrian is merged at most once
            break                  # Each Cyclist absorbs at most one Pedestrian

    remaining = [det for k, det in enumerate(detections) if k not in merged_indices]
    cyclist_boxes = [_corners(det['measurement'])
                     for det in remaining if det['class_name'] == "Cyclist"]

    # Keep every non-Pedestrian; keep a Pedestrian only if it overlaps no Cyclist.
    return [
        det for det in remaining
        if det['class_name'] != "Pedestrian"
        or not any(_overlaps(box, _corners(det['measurement'])) for box in cyclist_boxes)
    ]


# Drawing colours in OpenCV's BGR channel order.
_track_colors = {
    "Pedestrian": (0, 255, 0),  # Green
    "Cyclist": (0, 0, 255),     # Red
    "Car": (255, 0, 0),         # Blue
}
_default_color = (255, 255, 255)  # White for any other class name

# Start every run of this cell with an empty tracker so that re-running the
# cell does not continue with tracks and IDs left over from a previous run.
tracked_objects = {}
next_object_id = 0

try:
    for idx, (left_img_name, right_img_name) in enumerate(zip(left_images, right_images)):
        left_img_path = os.path.join(path_to_left, left_img_name)
        right_img_path = os.path.join(path_to_right, right_img_name)

        img_left = cv2.imread(left_img_path)
        img_right = cv2.imread(right_img_path)

        if img_left is None or img_right is None:
            print(f"Skipping invalid image pair: {left_img_path}, {right_img_path}")
            continue

        # Compute disparity map (the matcher returns fixed-point values scaled by 16)
        gray_left = cv2.cvtColor(img_left, cv2.COLOR_BGR2GRAY)
        gray_right = cv2.cvtColor(img_right, cv2.COLOR_BGR2GRAY)
        disparity = stereo.compute(gray_left, gray_right).astype(np.float32) / 16.0
        disparity[disparity <= 0] = np.nan  # Non-positive disparities carry no usable depth
        max_row, max_col = disparity.shape[0] - 1, disparity.shape[1] - 1

        # Object detection
        results = model(img_left)
        boxes = results[0].boxes.xyxy.cpu().numpy()        # Bounding box coordinates
        confidences = results[0].boxes.conf.cpu().numpy()  # Confidence scores
        classes = results[0].boxes.cls.cpu().numpy()       # Class IDs

        detections_list = []
        for box, confidence, class_value in zip(boxes, confidences, classes):
            class_id = int(class_value)
            # Only confident detections of a mapped class are tracked.
            if not (confidence >= min_confidence and class_id in class_map):
                continue

            x1, y1, x2, y2 = box
            width, height = x2 - x1, y2 - y1
            x_center, y_center = x1 + width / 2, y1 + height / 2

            # Depth is sampled at the box centre; the index is clamped so a box
            # touching the image border cannot index outside the disparity map.
            row = min(max(int(y_center), 0), max_row)
            col = min(max(int(x_center), 0), max_col)
            disp = disparity[row, col]
            if np.isnan(disp):
                continue
            Z = (focal_length * baseline) / disp

            detections_list.append({
                'measurement': np.array([[x_center], [y_center], [width], [height], [Z]]),
                'class_id': class_id,
                'class_name': class_map.get(class_id, "Unknown")
            })

        # Cyclist-Pedestrian merging and overlap removal
        detections_list = _merge_cyclists_and_pedestrians(detections_list)

        # Kalman filter prediction step
        for obj in tracked_objects.values():
            obj['kf'].predict()
            obj['x_pred'], obj['P_pred'] = obj['kf'].x.copy(), obj['kf'].P.copy()

        # Association via Linear Sum Assignment on Mahalanobis distances
        track_ids = list(tracked_objects.keys())
        detection_ids = list(range(len(detections_list)))
        cost_matrix = np.zeros((len(track_ids), len(detections_list)))

        for i, obj_id in enumerate(track_ids):
            kf = tracked_objects[obj_id]['kf']
            # Predicted measurement and innovation covariance depend only on
            # the track, so they are computed once per track.
            predicted_measurement = kf.H @ kf.x
            S_inv = np.linalg.inv(kf.H @ kf.P @ kf.H.T + kf.R)
            for j, detection in enumerate(detections_list):
                innovation = (detection['measurement'] - predicted_measurement).reshape(-1, 1)
                cost_matrix[i, j] = np.sqrt((innovation.T @ S_inv @ innovation).item())

        if cost_matrix.size:
            row_ind, col_ind = linear_sum_assignment(cost_matrix)
        else:
            # No tracks or no detections: nothing to assign.
            row_ind, col_ind = [], []

        # Update matched tracks (assignments above the gate are treated as unmatched)
        matched_pairs = []
        for i, j in zip(row_ind, col_ind):
            if cost_matrix[i, j] < max_distance:
                obj_id, detection = track_ids[i], detections_list[j]
                track = tracked_objects[obj_id]
                track['kf'].update(detection['measurement'])
                track['hit_streak'] += 1
                # A match ends the run of missed frames, so 'age' counts
                # consecutive misses rather than misses over the whole lifetime.
                track['age'] = 0
                track['class_id'] = detection['class_id']
                track['class_name'] = detection['class_name']
                matched_pairs.append((obj_id, j))

        matched_track_ids = {pair[0] for pair in matched_pairs}
        matched_detection_ids = {pair[1] for pair in matched_pairs}

        # Update unmatched tracks; drop those missing for too many frames in a row
        for obj_id in set(track_ids) - matched_track_ids:
            tracked_objects[obj_id]['age'] += 1
            if tracked_objects[obj_id]['age'] > max_frames_to_skip:
                del tracked_objects[obj_id]

        # Add new tracks for unmatched detections (sorted for reproducible IDs)
        for j in sorted(set(detection_ids) - matched_detection_ids):
            detection = detections_list[j]
            kf = KalmanFilter(dim_x=8, dim_z=5)
            # Each filter gets its own float copies so no track shares matrices
            # with the template or with another track.
            kf.F, kf.H, kf.Q, kf.R = (
                np.array(matrix, dtype=float)
                for matrix in (kf_template.F, kf_template.H, kf_template.Q, kf_template.R)
            )
            # Start at the measurement with zero velocities and a large uncertainty.
            kf.x = np.vstack((detection['measurement'], np.zeros((3, 1))))
            kf.P = np.eye(8) * 1000.0
            tracked_objects[next_object_id] = {
                'kf': kf,
                'hit_streak': 1,
                'age': 0,
                'class_id': detection['class_id'],
                'class_name': detection['class_name']
            }
            next_object_id += 1

        # Visualization: 'hit_streak' is never reset on a miss, so a confirmed
        # track keeps being drawn from its predicted state while unmatched.
        for obj_id, obj in tracked_objects.items():
            if obj['hit_streak'] < confirmation_threshold:
                continue
            x, y, w, h = obj['kf'].x[:4, 0]
            x1, y1, x2, y2 = int(x - w / 2), int(y - h / 2), int(x + w / 2), int(y + h / 2)
            color = _track_colors.get(obj['class_name'], _default_color)

            cv2.rectangle(img_left, (x1, y1), (x2, y2), color, 2)

            label = f"ID {obj_id} | {obj['class_name']} | Z: {obj['kf'].x[4, 0]:.2f}"
            cv2.putText(img_left, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        cv2.imshow('Tracking', img_left)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Close the preview window even when the loop stops because of an error.
    cv2.destroyAllWindows()

NameError: name 'left_images' is not defined