In [None]:
import cv2
from matplotlib import pyplot as plt
from ultralytics import YOLO
from dataclasses import dataclass
import numpy as np
from IPython.display import Video, display, Image
import imageio

# Load the YOLO weights once; this `model` object is reused by the predict calls in later cells.
model = YOLO("models/yolo26s.pt")

In [None]:
# Sanity-check the OpenCV build: version and whether a top-level CSRT factory is exposed.
print(cv2.__version__)
print(hasattr(cv2, "TrackerCSRT_create"))

# Use CV2 to open images (other sample inputs kept for quick switching)
# im1 = cv2.imread("Inputs/bridge_cars.jpg")
# im1 = cv2.imread("Inputs/car_behind_rails.jpg")
im1 = cv2.imread("Inputs/car_stop.jpg")
# cv2.imread reports an unreadable/missing file by returning None instead of raising,
# so fail here with a clear message rather than deep inside model.predict.
if im1 is None:
    raise FileNotFoundError("Could not read image: Inputs/car_stop.jpg")
results = model.predict(source=im1, save=True, show=False)  # save plotted images

In [None]:
# Print the im1 dimensions:
print(f"Image 1 dimensions: {im1.shape}")


# Inspect what model.predict returned: its type, how many entries it holds,
# the attributes available on the first entry, and that entry's printed form.
print(type(results))
print(len(results))
print(dir(results[0]))
print(results[0])

In [None]:
# Dump the detections of the first result between two separator lines.
print("-" * 38)
print("boxes")
print(results[0].boxes)
print(results[0].boxes.cls)  # per-box class entries
print(results[0].names)  # lookup used below to turn a class index into a label

# Human-readable label for every detected box
print([results[0].names[int(_)] for _ in results[0].boxes.cls])
print("-" * 38)

In [None]:
print(im1.size)
# Plot only image and boxes using matplotlib.
# cv2.imread yields BGR channel order while matplotlib expects RGB, so convert
# before showing; otherwise red and blue are swapped in the figure.
fig, ax = plt.subplots()
ax.imshow(cv2.cvtColor(im1, cv2.COLOR_BGR2RGB))
for box in results[0].boxes:
    # .tolist() gives plain Python floats for matplotlib, independent of where
    # the underlying box data is stored.
    x1, y1, x2, y2 = box.xyxy[0].tolist()
    width, height = x2 - x1, y2 - y1
    rect = plt.Rectangle((x1, y1), width, height, fill=False, color='red', linewidth=2)
    ax.add_patch(rect)
    # Plot class label
    cls = int(box.cls[0])
    ax.text(x1, y1, results[0].names[cls], color='white', fontsize=12, backgroundcolor='red')
ax.axis('off')  # Hide axis
plt.show()

In [None]:
# Show the lookup that maps a class index to its label for the first result.
results[0].names

In [None]:
# Walk through the whole video once to count the frames that can actually be decoded.
cap = cv2.VideoCapture("Inputs/video.mp4")

frame_iter = 0
while cap.isOpened():
    ret, frame = cap.read()
    # Check the read result BEFORE counting: the final, failed read must not be
    # included in the total (nor trigger a progress line).
    if not ret:
        break
    frame_iter += 1
    if frame_iter % 30 == 0:
        print(f"Frame {frame_iter}")

# Print total number of frames
print(f"Total number of frames: {frame_iter}")
# Print the rate of frames per second
fps = cap.get(cv2.CAP_PROP_FPS)
print(f"Frames per second: {fps}")

# Free the file handle/decoder; later cells open their own capture.
cap.release()


In [None]:
def create_tracker():
    """
    Create a fresh CSRT tracker instance.

    The factory is taken from ``cv2.legacy`` when that namespace exists and
    from the top level of ``cv2`` otherwise (presumably older builds that
    predate the ``legacy`` namespace — verify against the installed version).

    Raises:
        AttributeError: If the installed OpenCV build exposes no CSRT factory.
    """
    legacy_ns = getattr(cv2, "legacy", None)
    factory = getattr(legacy_ns, "TrackerCSRT_create", None)
    if factory is None:
        factory = getattr(cv2, "TrackerCSRT_create", None)
    if factory is None:
        # Same exception type as the bare attribute access raised, but actionable.
        raise AttributeError(
            "This OpenCV build has no TrackerCSRT_create; "
            "install opencv-contrib-python instead of opencv-python."
        )
    return factory()

def create_tracker_specified(tracker_type: str):
    """
    Create an OpenCV tracker by its short name.

    Args:
        tracker_type: One of "BOOSTING", "MIL", "KCF", "TLD", "MEDIANFLOW",
            "GOTURN", "MOSSE", "CSRT".

    Returns:
        A new tracker instance produced by the matching ``cv2`` factory.

    Raises:
        ValueError: If ``tracker_type`` is not one of the names above.
        AttributeError: If the installed OpenCV build does not provide the
            requested tracker.
    """
    # Factories are stored by NAME and resolved lazily. Referencing all of them
    # eagerly makes the whole function fail with AttributeError as soon as a
    # single factory is absent from the installed build, even for valid types.
    factory_names = {
        "BOOSTING": "TrackerBoosting_create",
        "MIL": "TrackerMIL_create",
        "KCF": "TrackerKCF_create",
        "TLD": "TrackerTLD_create",
        "MEDIANFLOW": "TrackerMedianFlow_create",
        "GOTURN": "TrackerGOTURN_create",
        "MOSSE": "TrackerMOSSE_create",
        "CSRT": "TrackerCSRT_create",
    }
    if tracker_type not in factory_names:
        raise ValueError(f"Unknown tracker type: {tracker_type}")

    factory_name = factory_names[tracker_type]
    # Top-level cv2 first (what this function always used), then cv2.legacy
    # for trackers that are only exposed there.
    for namespace in (cv2, getattr(cv2, "legacy", None)):
        factory = getattr(namespace, factory_name, None)
        if factory is not None:
            return factory()

    raise AttributeError(
        f"Tracker '{tracker_type}' ({factory_name}) is not available in this OpenCV build; "
        "install opencv-contrib-python instead of opencv-python."
    )

@dataclass
class Detection:
    """Result of one object detection on a frame: box, score and class."""

    # Bounding box on a frame, as [x1, y1, x2, y2]
    bbox: np.ndarray
    # Confidence score
    conf: float
    # Class
    cls: int

class TrackedObject:
    """
    One tracked object: a visual tracker plus the bookkeeping used to associate
    it with new detections and to record its position history.

    Each instance receives a unique, increasing ``id`` from the class-level
    counter ``_next_id``; ``reset_ids`` restarts that counter.
    """

    _next_id = 0

    # The annotation is quoted so the class does not require ``Detection`` to be
    # defined at class-creation time.
    def __init__(self, detection: "Detection", frame=None):
        """
        Start a new track from a detection.

        Args:
            detection: Object providing ``bbox`` ([x1, y1, x2, y2]), ``conf`` and ``cls``.
            frame: Image on which the visual tracker is initialised. When omitted,
                the module-level variable ``frame`` is used, which keeps existing
                ``TrackedObject(detection)`` call sites working. Passing it
                explicitly is preferred.

        Raises:
            ValueError: If no frame is passed and no module-level ``frame`` is set.
        """
        if frame is None:
            # Backward-compatible fallback for callers relying on the global.
            frame = globals().get("frame")
        if frame is None:
            raise ValueError(
                "TrackedObject needs the current frame: pass frame=... "
                "or define a module-level `frame`."
            )

        self.id = TrackedObject._next_id
        TrackedObject._next_id += 1

        x1, y1, x2, y2 = detection.bbox
        self.tracker = create_tracker()
        # The tracker takes (x, y, width, height); hand it plain Python floats
        # rather than numpy scalars.
        self.tracker.init(
            frame, (float(x1), float(y1), float(x2 - x1), float(y2 - y1))
        )

        self.bbox = detection.bbox
        self.cls = detection.cls
        self.conf = detection.conf

        # Variables to store history (For velocity estimation, etc.)
        self.frame_ids = []  # List of indices of frames where this object was recorded
        self.fame_ids = self.frame_ids  # Misspelled historical name; same list, kept for compatibility
        self.timestamps = []  # List of timestamps corresponding to the frames
        self.centers = []  # List of center points of the bounding boxes

        self.hits = 1    # number of successful matches when associating new detections to existing tracks
        self.age = 0     # total frames alive
        self.missed = 0  # frames since last match

    @classmethod
    def reset_ids(cls):
        """Restart id numbering at 0 for objects created afterwards."""
        cls._next_id = 0

    def predict(self):
        """
        Return the box expected for this object in the current frame.

        For now: no motion model, so this is simply the last known box.
        Later: Kalman filter lives here.
        """
        return self.bbox

    def update_timeticks(self, frame_id: int, fps: float):
        """
        Append the current state to the history lists.

        Stores ``frame_id``, the timestamp ``frame_id / fps`` and the centre of
        the current box. ``fps`` must be non-zero (division).
        """
        timestamp = frame_id / fps
        self.frame_ids.append(frame_id)
        self.centers.append(bbox_center(self.bbox))
        self.timestamps.append(timestamp)

    def update(self, detection: "Detection"):
        """Take box and confidence from a matched detection and count the hit."""
        self.bbox = detection.bbox
        self.conf = detection.conf
        self.missed = 0
        self.age += 1
        self.hits += 1

    def update_from_tracker(self, frame):
        """
        Advance the visual tracker by one frame.

        Returns:
            True if the tracker reported success (``bbox`` is replaced and
            ``missed`` reset), False otherwise (``missed`` is incremented).
            ``age`` grows by one in both cases.
        """
        ok, box = self.tracker.update(frame)
        # The object lived through this frame whether or not the tracker found
        # it, so age advances on both paths (as mark_missed does).
        self.age += 1
        if not ok:
            self.missed += 1
            return False

        x, y, w, h = box
        self.bbox = np.array([x, y, x + w, y + h])  # back to [x1, y1, x2, y2]
        self.missed = 0
        return True

    def mark_missed(self):
        '''
        Mark this object as missed in the current frame.
        '''
        self.missed += 1
        self.age += 1


In [None]:
def iou(boxA, boxB):
    '''
    Intersection over Union (IoU) of two boxes given as [x1, y1, x2, y2].

    The overlap rectangle is clamped to zero width/height when the boxes do
    not intersect. Returns 0 when the union area is not positive.
    '''
    # Corners of the overlap rectangle
    left = max(boxA[0], boxB[0])
    top = max(boxA[1], boxB[1])
    right = min(boxA[2], boxB[2])
    bottom = min(boxA[3], boxB[3])

    overlap_w = max(0, right - left)
    overlap_h = max(0, bottom - top)
    inter = overlap_w * overlap_h

    area_a = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    area_b = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
    union = area_a + area_b - inter

    if union > 0:
        return inter / union
    return 0


def bbox_center(box):
    '''
    Midpoint of an [x1, y1, x2, y2] box, returned as a 2-element numpy array.
    '''
    left, top, right, bottom = box
    mid_x = (left + right) / 2
    mid_y = (top + bottom) / 2
    return np.array([mid_x, mid_y])


def center_distance(boxA, boxB):
    '''
    Euclidean distance between the centers (see bbox_center) of two boxes.
    '''
    offset = bbox_center(boxA) - bbox_center(boxB)
    return np.linalg.norm(offset)

In [None]:
def associate_detections(tracked_objects, detections, iou_threshold=0.3):
    '''
    Greedily pair existing tracks with new detections by IoU.

    Tracks are visited in list order. Each track takes the still-unclaimed
    detection whose box overlaps its predicted box (track.predict()) the most,
    provided that IoU is strictly greater than iou_threshold.

    Returns a tuple (matches, unmatched_tracks, unmatched_dets):
      - matches: list of (track_idx, detection_idx) tuples
      - unmatched_tracks: set of track indices left without a detection
      - unmatched_dets: set of detection indices left without a track
    '''
    free_tracks = set(range(len(tracked_objects)))
    free_dets = set(range(len(detections)))
    pairs = []

    for track_pos, track in enumerate(tracked_objects):
        top_score, top_det = 0, None

        # Scan only detections nobody has claimed yet
        for det_pos in free_dets:
            overlap = iou(track.predict(), detections[det_pos].bbox)
            if overlap > top_score:
                top_score, top_det = overlap, det_pos

        if not (top_score > iou_threshold):
            continue

        pairs.append((track_pos, top_det))
        free_tracks.remove(track_pos)
        free_dets.remove(top_det)

    return pairs, free_tracks, free_dets

In [None]:
def overlaps_existing_track(detection, tracked_objects, iou_threshold=0.75):
    '''
    True if the detection's box has IoU strictly above iou_threshold with the
    current box of at least one tracked object, False otherwise.
    '''
    return any(
        iou(existing.bbox, detection.bbox) > iou_threshold
        for existing in tracked_objects
    )

def bbox_length_px(bbox):
    '''
    Longer side (width or height) of an [x1, y1, x2, y2] box.
    '''
    left, top, right, bottom = bbox
    return max(right - left, bottom - top)

def estimate_speed(track, car_length_m=4.3):
    '''
    Estimate a track's speed in metres per second from its last two history samples.

    The pixel displacement between the two most recent centers is converted to
    metres by assuming the longer side of the current box (bbox_length_px)
    spans car_length_m metres, then divided by the elapsed time between the
    two samples.

    Returns None when no estimate can be made: fewer than two samples, a
    non-positive time difference, or a box with non-positive length. Without
    these guards the computation would divide by zero.
    '''
    if len(track.centers) < 2 or len(track.timestamps) < 2:
        return None

    position_1 = track.centers[-2]
    position_2 = track.centers[-1]
    dt = track.timestamps[-1] - track.timestamps[-2]
    if dt <= 0:
        # Same frame recorded twice (or history out of order)
        return None

    L_px = bbox_length_px(track.bbox)
    if L_px <= 0:
        # Degenerate box: the pixel-to-metre scale is undefined
        return None

    dposition_px = np.linalg.norm(position_2 - position_1)
    meters_per_pixel = car_length_m / L_px

    speed_mps = dposition_px * meters_per_pixel / dt
    return speed_mps

def smooth_speed(prev, new, alpha=0.3):
    '''
    Exponential smoothing of a speed reading.

    With a previous value, returns alpha * new + (1 - alpha) * prev;
    with prev None (first reading), returns new unchanged.
    '''
    if prev is not None:
        return alpha * new + (1 - alpha) * prev
    return new

In [None]:
# Open the input video and fail early if it cannot be read: every property
# below would otherwise silently come back as 0.
cap = cv2.VideoCapture("Inputs/video.mp4")
if not cap.isOpened():
    raise IOError("Could not open input video: Inputs/video.mp4")

fps = cap.get(cv2.CAP_PROP_FPS)
width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Get length of the video
length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# fps is a divisor here and for all timestamps computed later
if fps <= 0:
    cap.release()
    raise ValueError(f"Input video reports an invalid frame rate: {fps}")

print(f"Total number of frames: {length}. ")
video_duration = length / fps
print(f"Video duration (s): {video_duration:.2f}. ")
print(f'Video FPS: {fps:.2f}, Width: {width}, Height: {height}')

# Lower-case tag for the .mp4 container. With the upper-case "MP4V" spelling
# OpenCV's FFmpeg backend typically logs an "unsupported tag" warning and
# falls back to "mp4v" anyway. (MJPG/XVID would need an .avi output file.)
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(
    "Outputs/tracked_output.mp4",
    fourcc,
    fps,
    (width, height),
)
# A writer that failed to open drops every frame silently, so check it now.
if not writer.isOpened():
    cap.release()
    raise IOError(
        "Could not open Outputs/tracked_output.mp4 for writing "
        "(does the Outputs/ directory exist and is the codec available?)"
    )


gif_writer = imageio.get_writer(
    "Outputs/tracked_output.gif",
    mode="I",
    fps=10,          # lower FPS - better visible what's going on
    loop=0,   # 0 = infinite loop
)

tracked_objects = []
# Restart track ids so re-running this cell numbers tracks from 0 again
TrackedObject.reset_ids()
# For FPS=30
MAX_MISSED = 30 # Max missed frames before deleting track
YOLO_INTERVAL = 15 # Run YOLO every N frames

In [None]:
frame_idx = 0

# Early stop for testing: index of the last frame to process
# (set to None to run through the whole video).
MAX_FRAMES = 30
# Only every N-th frame is added to the GIF.
GIF_FRAME_STEP = 4

# Outcome of the most recent YOLO pass. Initialised up front so the drawing and
# logging code below never touches an undefined name.
detections = []
matches = []
unmatched_tracks = set()
unmatched_dets = set()

# Query before release(): a released writer always reports False.
writer_opened = writer.isOpened()

try:
    while cap.isOpened():
        # `frame` must remain a module-level name: TrackedObject(det) below
        # initialises its visual tracker from it.
        ret, frame = cap.read()
        if not ret:
            break

        use_yolo = frame_idx % YOLO_INTERVAL == 0

        if use_yolo:
            # Separate name so the `results` list used by the image-exploration
            # cells is not overwritten with a single result object.
            frame_results = model.predict(frame, conf=0.3, verbose=False)[0]

            # Keep only detections of class "car"
            detections = []
            if frame_results.boxes is not None:
                for box, cls, conf in zip(
                    frame_results.boxes.xyxy.cpu().numpy(),
                    frame_results.boxes.cls.cpu().numpy(),
                    frame_results.boxes.conf.cpu().numpy(),
                ):
                    if frame_results.names[int(cls)] == "car":
                        detections.append(
                            Detection(bbox=box, conf=conf, cls=int(cls))
                        )

            # --- association ---
            # Runs on every YOLO frame (also with zero detections) so that the
            # match bookkeeping always describes the current pass.
            matches, unmatched_tracks, unmatched_dets = associate_detections(
                tracked_objects, detections
            )

            # update matched tracks
            for t_idx, d_idx in matches:
                tracked_objects[t_idx].update(detections[d_idx])

            # mark missed tracks
            for t_idx in unmatched_tracks:
                tracked_objects[t_idx].mark_missed()

            # create new tracks, skipping detections that duplicate an existing track
            for d_idx in unmatched_dets:
                det = detections[d_idx]
                if not overlaps_existing_track(det, tracked_objects):
                    tracked_objects.append(TrackedObject(det))

            # remove dead tracks
            tracked_objects = [
                t for t in tracked_objects if t.missed <= MAX_MISSED
            ]

            # --- draw all detections (red; OpenCV colours are BGR) ---
            for det in detections:
                x1, y1, x2, y2 = map(int, det.bbox)
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)

            # --- draw unmatched detections on top (blue in BGR) ---
            for d_idx in unmatched_dets:
                det = detections[d_idx]
                x1, y1, x2, y2 = map(int, det.bbox)
                cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)

        else:
            # Tracker-only update. update_from_tracker() already counts a miss
            # when the tracker fails, so tracks must NOT additionally be marked
            # as missed here; doing so penalised successfully tracked objects
            # and double-counted failures.
            for track in tracked_objects:
                track.update_from_tracker(frame)

            # remove dead tracks
            tracked_objects = [
                t for t in tracked_objects if t.missed <= MAX_MISSED
            ]

        if frame_idx % 30 == 0:
            # Debug aid: highest pairwise IoU values between tracked objects
            # (high values hint at duplicate tracks).
            iou_values = []
            for i in range(len(tracked_objects)):
                for j in range(i + 1, len(tracked_objects)):
                    iou_score = iou(tracked_objects[i].bbox, tracked_objects[j].bbox)
                    iou_values.append((tracked_objects[i].id, tracked_objects[j].id, iou_score))
            iou_values.sort(key=lambda x: x[2], reverse=True)
            print(f'Frame {frame_idx}: Top IOU values between tracked objects:')
            for id1, id2, score in iou_values[:5]:  # Print top 5 IOU values
                print(f'  IDs {id1} & {id2}: IOU = {score:.4f}')

        # In the top-left corner: frame idx, elapsed time, number of active tracks
        # and whether this was a YOLO frame or a tracker-only frame.
        info_text = f'Frame {frame_idx}; Duration {frame_idx/fps:.2f}s; Active Tracks: {len(tracked_objects)}; ' + \
                    ('[YOLO]' if use_yolo else '[Trac]')
        cv2.putText(
            frame,
            info_text,
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (0, 255, 255),
            2,
        )

        if frame_idx % 10 == 0:
            print(f'Frame {frame_idx}; Duration {frame_idx/fps:.2f}: {len(tracked_objects)} active tracks')
            # Association counts belong to the most recent YOLO pass, which on
            # tracker-only frames is an earlier frame.
            print(f'  Last YOLO pass - Matches: {len(matches)}, Unmatched Tracks: {len(unmatched_tracks)}, Unmatched Detections: {len(unmatched_dets)}')

        # --- draw tracks (green) ---
        for obj in tracked_objects:
            x1, y1, x2, y2 = map(int, obj.bbox)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(
                frame,
                f"ID {obj.id}",
                (x1, y1 - 5),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.6,
                (0, 255, 0),
                2,
            )

        # Record history for all tracked objects and report their speed
        for obj in tracked_objects:
            obj.update_timeticks(frame_idx, fps)

            obj_speed = estimate_speed(obj)
            if frame_idx % 10 == 0:
                if obj_speed is not None:
                    print(f'{obj.id} : {obj_speed:.2f} [m/s] : {(obj_speed*3.6):.2f} [km/h]')

        # Sub-sample the GIF: append only on every GIF_FRAME_STEP-th frame
        # (previously every frame was appended, repeating a stale image).
        if frame_idx % GIF_FRAME_STEP == 0:
            gif_writer.append_data(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        # Write before the early-stop check so the last processed frame is kept.
        writer.write(frame)

        if MAX_FRAMES is not None and frame_idx >= MAX_FRAMES:
            break

        frame_idx += 1
finally:
    # Always finalise the output files, even if processing raised.
    cap.release()
    writer.release()
    gif_writer.close()

print(f"Writer opened: {writer_opened}")


In [None]:
# Show the generated tracking GIF inline (embedded into the notebook output).
display(Image("Outputs/tracked_output.gif", embed=True))


In [None]:
# Show the input video inline (embedded into the notebook output).
display(Video("Inputs/video.mp4", embed=True))


In [None]:
# Diagnostics: which tracker factories does the installed OpenCV build expose?
# This replaces a bare `cv2.Tracker_create()` probe, which raises AttributeError
# on builds that lack that function and therefore stopped the cell.
print(cv2.__version__)
for label, namespace in (("cv2", cv2), ("cv2.legacy", getattr(cv2, "legacy", None))):
    if namespace is None:
        print(f"{label}: not available")
        continue
    factories = sorted(
        name for name in dir(namespace)
        if name.startswith("Tracker") and name.endswith("_create")
    )
    print(f"{label}: {factories}")
print(cv2.getBuildInformation())

# The issue is from opencv-python: uninstall it and install cv2 via
# opencv-contrib-python instead. That solved it!
# To pick a tracker by name, use create_tracker_specified(); the list printed
# above shows which factories this build actually provides.