In [27]:
from google.colab import drive
import os
import sys

drive.mount('/content/drive')

#root folder of the project on the mounted Drive
PROJECT_ROOT = '/content/drive/My Drive/car-behaviour-project'

#key directories used by this notebook
#input: frames extracted earlier (expected to exist already, see Notebook 1)
EXTRACTED_FRAMES_INPUT_DIR = os.path.join(PROJECT_ROOT, 'data/extracted_frames')

#outputs: tracking results and (optionally) frames with the tracks drawn on them
TRACKING_OUTPUTS_DIR = os.path.join(PROJECT_ROOT, 'data/tracking_outputs')
VISUALIZED_FRAMES_DIR = os.path.join(PROJECT_ROOT, 'data/visualized_tracked_frames') #optional

#where the YOLOv8 weights are looked up
MODEL_DIR = os.path.join(PROJECT_ROOT, 'models/yolov8')

#create any directory that is missing; exist_ok keeps the cell safe to re-run
for _required_dir in (EXTRACTED_FRAMES_INPUT_DIR, TRACKING_OUTPUTS_DIR, VISUALIZED_FRAMES_DIR, MODEL_DIR):
    os.makedirs(_required_dir, exist_ok=True)

#summary of the resolved locations, one per line
print(
    f"Project Root: {PROJECT_ROOT}",
    f"Reading extracted frames from: {EXTRACTED_FRAMES_INPUT_DIR}",
    f"Saving tracking data to: {TRACKING_OUTPUTS_DIR}",
    f"Saving visualized frames to: {VISUALIZED_FRAMES_DIR}",
    f"Models directory: {MODEL_DIR}",
    sep="\n",
)



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Project Root: /content/drive/My Drive/car-behaviour-project
Reading extracted frames from: /content/drive/My Drive/car-behaviour-project/data/extracted_frames
Saving tracking data to: /content/drive/My Drive/car-behaviour-project/data/tracking_outputs
Saving visualized frames to: /content/drive/My Drive/car-behaviour-project/data/visualized_tracked_frames
Models directory: /content/drive/My Drive/car-behaviour-project/models/yolov8


In [28]:
# Ultralytics for YOLO, filterpy for Kalman Filter in SORT, scipy for Hungarian Algorithm in SORT
!pip install ultralytics opencv-python numpy filterpy scipy matplotlib



In [29]:
#Python implementation of SORT tracker
import numpy as np
from filterpy.kalman import KalmanFilter
from scipy.optimize import linear_sum_assignment

def iou_batch(bb_test, bb_gt):
    """Computes the pairwise Intersection Over Union (IOU) between two sets of bounding boxes.

    bb_test:  Nx4 array-like of N bounding boxes (x1,y1,x2,y2); a single flat box is also accepted
    bb_gt:    Mx4 array-like of M bounding boxes (x1,y1,x2,y2); a single flat box is also accepted
    Returns:  NxM float IOU matrix, entry [i, j] is the IOU of bb_test[i] and bb_gt[j].
              Pairs whose union area is not positive (e.g. two zero-area boxes) get 0
              instead of NaN, so the matrix is always safe to feed to an assignment solver."""
    bb_gt = np.asarray(bb_gt, dtype=float)
    bb_test = np.asarray(bb_test, dtype=float)

    #promote a single flat box to a 1x4 array
    if bb_gt.ndim == 1:
        bb_gt = bb_gt[np.newaxis, :]
    if bb_test.ndim == 1:
        bb_test = bb_test[np.newaxis, :]

    #column vectors for the test boxes broadcast against row vectors for the gt boxes -> NxM
    t_x1, t_y1 = bb_test[:, 0, np.newaxis], bb_test[:, 1, np.newaxis]
    t_x2, t_y2 = bb_test[:, 2, np.newaxis], bb_test[:, 3, np.newaxis]
    g_x1, g_y1, g_x2, g_y2 = bb_gt[:, 0], bb_gt[:, 1], bb_gt[:, 2], bb_gt[:, 3]

    #intersection rectangle, clamped to zero size when the boxes do not overlap
    inter_w = np.maximum(0., np.minimum(t_x2, g_x2) - np.maximum(t_x1, g_x1))
    inter_h = np.maximum(0., np.minimum(t_y2, g_y2) - np.maximum(t_y1, g_y1))
    inter = inter_w * inter_h

    union = (t_x2 - t_x1) * (t_y2 - t_y1) + (g_x2 - g_x1) * (g_y2 - g_y1) - inter

    #divide only where the union is positive; everything else keeps the 0 it was initialised with
    iou = np.zeros_like(union)
    np.divide(inter, union, out=iou, where=union > 0)
    return iou

def convert_bbox_to_z(bbox):
    """Converts a corner-form box [x1,y1,x2,y2] into the measurement vector [x,y,s,r].

    x,y is the centre of the box, s is its area (scale) and r is the width/height
    aspect ratio (0 when the height is 0). Only the first four entries of bbox are read.
    Returns a 4x1 numpy column vector."""
    left, top, right, bottom = bbox[0], bbox[1], bbox[2], bbox[3]
    width = right - left
    height = bottom - top

    #guard the division so a zero-height box yields ratio 0 instead of an error
    if height != 0:
        aspect = width / float(height)
    else:
        aspect = 0

    centre_x = left + width/2.
    centre_y = top + height/2.
    area = width * height
    return np.array([centre_x, centre_y, area, aspect]).reshape((4, 1))

def convert_x_to_bbox(x, score=None):
    """Converts a centre-form state [x,y,s,r,...] into a corner-form box [x1,y1,x2,y2].

    x:      flat sequence or column vector whose first four entries are centre x, centre y,
            area s and aspect ratio r (width/height); any further entries are ignored
    score:  optional value appended as a fifth column
    Returns a 1x4 array, or a 1x5 array [x1,y1,x2,y2,score] when score is given.
    A negative s*r yields NaN coordinates (square root of a negative number)."""
    #flatten first so a 7x1 Kalman state and a plain list behave the same; without this the
    #score variant would mix 1-element arrays with a scalar and build a ragged array
    state = np.asarray(x, dtype=float).reshape(-1)
    cx, cy, area, ratio = state[0], state[1], state[2], state[3]

    w = np.sqrt(area * ratio)
    h = area / w if w != 0 else 0.
    corners = [cx - w/2., cy - h/2., cx + w/2., cy + h/2.]

    if score is None: #identity test: `== None` is evaluated element-wise for array scores
        return np.array(corners).reshape((1, 4))
    return np.append(corners, score).reshape((1, 5))

class KalmanBoxTracker(object):
    """Internal state of one tracked object, observed as a bounding box and filtered with a Kalman filter.

    The 7-dimensional state is [x, y, s, r, dx, dy, ds]: box centre, area, aspect ratio and the
    rates of change of the first three. Measurements are the 4-vector [x, y, s, r]."""
    count = 0 #class-wide counter; its current value becomes the id of the next tracker

    def __init__(self,bbox):
        """Creates a tracker whose initial position is the given box [x1,y1,x2,y2]."""
        kf = KalmanFilter(dim_x=7, dim_z=4)

        #constant velocity model: x, y and s each advance by their own velocity every step,
        #the aspect ratio r has no velocity term
        transition = np.eye(7, dtype=int)
        transition[0, 4] = 1
        transition[1, 5] = 1
        transition[2, 6] = 1
        kf.F = transition
        #the measurement picks out the first four state entries
        kf.H = np.eye(4, 7, dtype=int)

        kf.R[2:,2:] *= 10. #measurement noise covariance matrix
        kf.P[4:,4:] *= 1000. #give high uncertainty to the unobservable initial velocities
        kf.P *= 10.
        kf.Q[-1,-1] *= 0.01 #process noise covariance matrix
        kf.Q[4:,4:] *= 0.01

        kf.x[:4] = convert_bbox_to_z(bbox)
        self.kf = kf

        #bookkeeping read by Sort
        self.time_since_update = 0 #predict() calls since the last update()
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.history = [] #predicted boxes since the last update()
        self.hits = 0 #total number of update() calls
        self.hit_streak = 0 #update() calls without a missed frame in between
        self.age = 0 #total number of predict() calls

    def update(self,bbox):
        """Corrects the state with an observed box [x1,y1,x2,y2] and resets the miss counters."""
        self.kf.update(convert_bbox_to_z(bbox))
        self.hits += 1
        self.hit_streak += 1
        self.history = []
        self.time_since_update = 0

    def predict(self):
        """Advances the state by one step and returns the predicted box as a 1x4 array."""
        state = self.kf.x
        #the area must not become non-positive: if s + ds <= 0, zero the area velocity first
        if (state[6] + state[2]) <= 0:
            state[6] *= 0.0
        self.kf.predict()
        self.age += 1
        #a second consecutive prediction without an update in between breaks the streak
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        predicted_box = convert_x_to_bbox(self.kf.x)
        self.history.append(predicted_box)
        return predicted_box

    def get_state(self):
        """Returns the current box estimate as a 1x4 array [x1,y1,x2,y2]."""
        return convert_x_to_bbox(self.kf.x)

class Sort(object):
    """SORT multi-object tracker: one KalmanBoxTracker per object, matched to detections by IoU."""

    def __init__(self, max_age=1, min_hits=3, iou_threshold=0.3):
        """Sets key parameters for SORT.

        max_age:        a track is dropped once it has gone more than this many frames without a match
        min_hits:       matched detections a track needs before it is reported (waived while
                        frame_count <= min_hits)
        iou_threshold:  minimum IoU for a detection/track pair to count as a match"""
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.trackers = []
        self.frame_count = 0
        KalmanBoxTracker.count = 0 #reset static counter for multiple Sort instances

    def update(self, dets=np.empty((0, 5))):
        """Params:
          dets - detections in the format [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...]; a numpy array,
                 a nested list, or a single flat detection. Only the first four columns are used.
        Requires: this method must be called once for each frame even with empty detections (use np.empty((0, 5)) for frames without detections).
        Returns a similar array [[x1,y1,x2,y2,track_id],...], where the last column is the object ID
        (starting at 1), or an empty (0,5) array when no track qualifies.
        NOTE: The number of objects returned may be different from the number of detections provided"""
        self.frame_count += 1

        #normalise the input so lists, flat single detections and empty inputs all work
        dets = np.asarray(dets, dtype=float)
        if dets.size == 0:
            dets = np.empty((0, 5))
        elif dets.ndim == 1:
            dets = dets[np.newaxis, :]

        #get predicted locations from existing trackers; a tracker whose prediction is not finite
        #(NaN or inf) is dropped together with its row so rows and trackers always stay aligned
        surviving_trackers = []
        predicted_boxes = []
        for trk in self.trackers:
            pos = trk.predict()[0]
            if np.all(np.isfinite(pos)):
                surviving_trackers.append(trk)
                predicted_boxes.append(pos[:4])
        self.trackers = surviving_trackers
        trks = np.array(predicted_boxes, dtype=float).reshape(-1, 4)

        #associate detections with existing trackers
        matched_detections = set()
        if dets.shape[0] > 0 and trks.shape[0] > 0:
            iou_matrix = iou_batch(dets[:, :4], trks)
            #the assignment solver rejects NaN entries; treat an undefined IoU as "no overlap"
            iou_matrix = np.nan_to_num(iou_matrix, nan=0.0, posinf=0.0, neginf=0.0)
            #hungarian algorithm for assignment; we want to maximize IoU, so negate
            row_ind, col_ind = linear_sum_assignment(-iou_matrix)
            for det_idx, trk_idx in zip(row_ind, col_ind):
                #assignments below the threshold are not real matches
                if iou_matrix[det_idx, trk_idx] >= self.iou_threshold:
                    self.trackers[trk_idx].update(dets[det_idx, :4])
                    matched_detections.add(int(det_idx))

        #create and initialise new trackers for unmatched detections
        for det_idx in range(dets.shape[0]):
            if det_idx not in matched_detections:
                self.trackers.append(KalmanBoxTracker(dets[det_idx, :4]))

        #collect the tracks to report, newest tracker first
        ret = []
        for trk in reversed(self.trackers):
            #only output tracks that have met min_hits and are not too old
            recently_updated = trk.time_since_update < self.max_age
            confirmed = trk.hits >= self.min_hits or self.frame_count <= self.min_hits
            if recently_updated and confirmed:
                d = trk.get_state()[0]
                ret.append(np.concatenate((d,[trk.id+1])).reshape(1,-1)) #id+1 as MOT benchmark requires positive

        #remove dead tracklets
        self.trackers = [trk for trk in self.trackers if trk.time_since_update <= self.max_age]

        if len(ret) > 0:
            return np.concatenate(ret)
        return np.empty((0,5))

#confirmation message so running this cell produces visible output once the definitions above have executed
print("SORT Tracker classes defined.")

SORT Tracker classes defined.


In [30]:
#detection and tracking logic
import cv2
import torch #coz YOLOv8 uses PyTorch
from ultralytics import YOLO
import time
import pandas as pd #for saving tracking data to CSV
import matplotlib.pyplot as plt
from google.colab.patches import cv2_imshow #for displaying images in Colab

#configuration for this run
VIDEO_NAME_Processed = "video1"  #video name; also the name of the frames subfolder
FRAMES_SOURCE_DIR = os.path.join(EXTRACTED_FRAMES_INPUT_DIR, VIDEO_NAME_Processed)

#detector weights
#'yolov8s.pt' was tried first but gave many false positives and false negatives, hence the larger model
#the model will be downloaded automatically if not found in MODEL_DIR or cache
YOLO_MODEL_NAME = 'yolov8l.pt'

#classes to detect (COCO class names for vehicles)
#all COCO class names: https://docs.ultralytics.com/datasets/coco/#dataset-yaml
#(or check model.names after loading the model)
#common vehicle classes: 2: 'car', 3: 'motorcycle', 5: 'bus', 7: 'truck'
TARGET_CLASSES_INDICES = [2, 3, 5, 7]

#detections below this confidence are discarded
CONFIDENCE_THRESHOLD = 0.65

#SORT parameters
SORT_MAX_AGE = 30        #max frames to keep a track without new detections
SORT_MIN_HITS = 5        #matched detections a track needs before it is reported
SORT_IOU_THRESHOLD = 0.2 #IoU threshold for matching detections to tracks

#outputs
SAVE_VISUALIZED_FRAMES = True #set to True to save frames with bounding boxes
OUTPUT_CSV_FILENAME = f"{VIDEO_NAME_Processed}_tracked_vehicles.csv"
OUTPUT_CSV_PATH = os.path.join(TRACKING_OUTPUTS_DIR, OUTPUT_CSV_FILENAME)

#per-video folder for the visualized frames, created only when they are going to be written
VIDEO_VISUALIZED_OUTPUT_DIR = os.path.join(VISUALIZED_FRAMES_DIR, VIDEO_NAME_Processed)
if SAVE_VISUALIZED_FRAMES:
    os.makedirs(VIDEO_VISUALIZED_OUTPUT_DIR, exist_ok=True)


#load YOLOv8 Model
import shutil #only needed here, to cache downloaded weights

try:
    model_path = os.path.join(MODEL_DIR, YOLO_MODEL_NAME)
    weights_cached = os.path.exists(model_path)
    if not weights_cached:
        print(f"Model {YOLO_MODEL_NAME} not found in {MODEL_DIR}. YOLO will attempt to download it.")
        model_path = YOLO_MODEL_NAME #let ultralytics handle download

    yolo_model = YOLO(model_path)
    print(f"YOLOv8 model '{YOLO_MODEL_NAME}' loaded successfully.")
    #you can print yolo_model.names to see all class names it can detect
    #print("Available classes:", yolo_model.names)
except Exception as e:
    print(f"Error loading YOLOv8 model: {e}")
    #stop the cell; chaining keeps the original error available in the traceback
    raise SystemExit("YOLO Model loading failed.") from e

#the download lands in the current working directory, which does not persist between sessions;
#copy it into MODEL_DIR so the next run finds it there instead of downloading again (best effort)
if not weights_cached and os.path.isfile(YOLO_MODEL_NAME):
    try:
        shutil.copy2(YOLO_MODEL_NAME, os.path.join(MODEL_DIR, YOLO_MODEL_NAME))
        print(f"Cached {YOLO_MODEL_NAME} in {MODEL_DIR} for future runs.")
    except OSError as copy_error:
        print(f"Warning: could not cache {YOLO_MODEL_NAME} in {MODEL_DIR}: {copy_error}")


#initialize SORT tracker with the parameters from the config section
#(positional order of Sort.__init__: max_age, min_hits, iou_threshold)
mot_tracker = Sort(SORT_MAX_AGE, SORT_MIN_HITS, SORT_IOU_THRESHOLD)
print("SORT tracker initialized.")

#process frames
import re
import traceback

#frame files are named like 'video1_f00049_orig000147.png'; the digits right after '_f' are the frame number.
#names that do not carry such a number fall back to the position of the file in the sorted listing
FRAME_NUMBER_PATTERN = re.compile(r'_f(\d+)')

if not os.path.exists(FRAMES_SOURCE_DIR):
    print(f"ERROR: Extracted frames directory not found: {FRAMES_SOURCE_DIR}")
    print(f"Please ensure '{VIDEO_NAME_Processed}' matches a subfolder in '{EXTRACTED_FRAMES_INPUT_DIR}'")
    print(f"This subfolder should have been created by Notebook 1.")

else:
    frame_files = sorted([f for f in os.listdir(FRAMES_SOURCE_DIR) if f.lower().endswith(('.png', '.jpg', '.jpeg'))])
    if not frame_files:
        print(f"No image frames found in {FRAMES_SOURCE_DIR}. Check the directory and file extensions.")
    else:
        print(f"Found {len(frame_files)} frames to process in {FRAMES_SOURCE_DIR}.")

        all_tracking_data = [] #one dict per tracked box, turned into the CSV at the end

        for frame_idx, frame_filename in enumerate(frame_files):
            frame_path = os.path.join(FRAMES_SOURCE_DIR, frame_filename)
            try:
                frame = cv2.imread(frame_path)
                if frame is None:
                    print(f"Warning: Could not read frame {frame_filename}. Skipping.")
                    continue

                #frame number from the filename (last '_f<digits>' group), else the loop index;
                #resolved without raising so a differently named file cannot discard tracker output
                frame_number_matches = FRAME_NUMBER_PATTERN.findall(frame_filename)
                frame_number = int(frame_number_matches[-1]) if frame_number_matches else frame_idx

                start_time = time.time()

                #perform detection
                results = yolo_model.predict(source=frame, verbose=False) #verbose=False to reduce console output
                detections = [] #list to store [x1, y1, x2, y2, score]

                for res in results: #iterate through results for the single image
                    boxes = res.boxes.cpu().numpy() #get boxes on CPU in numpy format
                    for box in boxes: #iterate through detected boxes
                        class_id = int(box.cls[0])
                        confidence = box.conf[0]

                        #keep only confident detections of the vehicle classes
                        if class_id in TARGET_CLASSES_INDICES and confidence >= CONFIDENCE_THRESHOLD:
                            x1, y1, x2, y2 = box.xyxy[0].astype(int) #bounding box coordinates
                            detections.append([x1, y1, x2, y2, confidence])

                detections_np = np.array(detections) if detections else np.empty((0, 5))

                #update SORT tracker
                #the dets format for SORT is [[x1,y1,x2,y2,score],...]
                tracked_objects = mot_tracker.update(detections_np)
                #tracked_objects format: [[x1,y1,x2,y2,track_id],...]

                end_time = time.time()
                processing_time = end_time - start_time

                #store tracking data and draw on frame
                for trk in tracked_objects:
                    x1_trk, y1_trk, x2_trk, y2_trk, track_id = trk.astype(int)
                    all_tracking_data.append({
                        'frame_id': frame_number, #or frame_idx
                        'track_id': track_id,
                        'x1': x1_trk,
                        'y1': y1_trk,
                        'x2': x2_trk,
                        'y2': y2_trk,
                    })
                    #You could try to find the original detection score if needed, but SORT output doesn't directly carry it over for the track.
                    #We can infer the class based on what we fed to YOLO if all are same, or do a more complex association if YOLO detects multiple classes we track.
                    #For now, we assume all tracked objects are from TARGET_CLASSES_INDICES

                    if SAVE_VISUALIZED_FRAMES:
                        #draw bounding box
                        cv2.rectangle(frame, (x1_trk, y1_trk), (x2_trk, y2_trk), (0, 255, 0), 2)
                        #put track ID
                        cv2.putText(frame, f"ID: {track_id}", (x1_trk, y1_trk - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                if SAVE_VISUALIZED_FRAMES:
                    cv2.putText(frame, f"Frame: {frame_number}", (10, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                    cv2.putText(frame, f"Time: {processing_time:.3f}s", (10, 70),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2, cv2.LINE_AA)

                    output_frame_path = os.path.join(VIDEO_VISUALIZED_OUTPUT_DIR, frame_filename)
                    #imwrite reports failure through its return value instead of raising
                    if not cv2.imwrite(output_frame_path, frame):
                        print(f"Warning: Could not write visualized frame {output_frame_path}.")

                # Optional: Display every Nth frame in Colab (can be slow)
                # if frame_idx % 20 == 0:
                #     print(f"Displaying frame {frame_number} (original filename: {frame_filename})")
                #     cv2_imshow(frame) # Simpler display in Colab

                if (frame_idx + 1) % 50 == 0: #print progress every 50 frames
                    print(f"Processed {frame_idx + 1}/{len(frame_files)} frames. Last frame: {frame_filename}")

            except Exception as e:
                #best effort: report the failing frame with its traceback and carry on with the next one
                print(f"Error processing frame {frame_filename}: {e}")
                traceback.print_exc()
                continue # Skip to next frame

        #save all tracking data to CSV
        if all_tracking_data:
            df_tracking = pd.DataFrame(all_tracking_data)
            df_tracking.to_csv(OUTPUT_CSV_PATH, index=False)
            print(f"\nTracking data saved to: {OUTPUT_CSV_PATH}")
            print(f"CSV Head:\n{df_tracking.head()}")
        else:
            print("\nNo tracking data was generated.")

        print("\nDetection and tracking complete.")
        if SAVE_VISUALIZED_FRAMES:
            print(f"Visualized frames saved in: {VIDEO_VISUALIZED_OUTPUT_DIR}")

Model yolov8l.pt not found in /content/drive/My Drive/car-behaviour-project/models/yolov8. YOLO will attempt to download it.
Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8l.pt to 'yolov8l.pt'...


100%|██████████| 83.7M/83.7M [00:00<00:00, 228MB/s]


YOLOv8 model 'yolov8l.pt' loaded successfully.
SORT tracker initialized.
Found 466 frames to process in /content/drive/My Drive/car-behaviour-project/data/extracted_frames/video1.
Processed 50/466 frames. Last frame: video1_f00049_orig000147.png
Processed 100/466 frames. Last frame: video1_f00099_orig000297.png
Processed 150/466 frames. Last frame: video1_f00149_orig000447.png
Processed 200/466 frames. Last frame: video1_f00199_orig000597.png
Processed 250/466 frames. Last frame: video1_f00249_orig000747.png
Processed 300/466 frames. Last frame: video1_f00299_orig000897.png
Processed 350/466 frames. Last frame: video1_f00349_orig001047.png
Processed 400/466 frames. Last frame: video1_f00399_orig001197.png
Processed 450/466 frames. Last frame: video1_f00449_orig001347.png

Tracking data saved to: /content/drive/My Drive/car-behaviour-project/data/tracking_outputs/video1_tracked_vehicles.csv
CSV Head:
   frame_id  track_id    x1    y1    x2    y2
0         0         5   240   873   348  