In [61]:
from ultralytics import YOLO
from PIL import Image
import cv2
from tqdm import tqdm
import numpy as np
from deep_sort.deep_sort import DeepSort
import yaml
import os
import glob
from data_utils import parse_pascalvoc_annotations, parse_cvat_annotations
import torch

%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [133]:
def calculate_speed(tracking_history, fps, pixel_to_meter_ratio, max_positions=10, default_ratio=0.05):
    """Estimate an object's current speed in km/h from its most recent positions.

    The length of the path through the latest positions is measured in pixels,
    converted to meters with the calibration entry that matches the object's
    latest y-coordinate, and divided by the elapsed time (consecutive history
    entries are treated as exactly one frame apart).

    Args:
        tracking_history: Sequence of (x, y) pixel positions, oldest first.
        fps: Frame rate used to turn frame intervals into seconds.
        pixel_to_meter_ratio: Mapping ``{(y_upper, y_lower): meters_per_pixel}``.
            The first entry with ``y_lower <= y < y_upper`` is used.
        max_positions: Upper bound on how many trailing positions are considered.
            At most ``len(tracking_history) - 1`` positions are ever used, so the
            window spans at most ``max_positions - 1`` frame intervals.
        default_ratio: Meters-per-pixel value used when no calibration entry
            matches the latest y-coordinate.

    Returns:
        The speed in km/h as a float, or None when fewer than 3 positions are
        available or ``fps`` is not positive (no elapsed time can be derived).
    """
    if len(tracking_history) < 3:
        return None  # Not enough data to calculate speed
    if fps <= 0:
        return None  # An invalid frame rate would otherwise divide by zero

    # Never use fewer than two positions, otherwise there is no interval to time.
    window = max(2, min(max_positions, len(tracking_history) - 1))
    recent = np.asarray(tracking_history[-window:], dtype=float)

    # Sum of Euclidean distances between consecutive positions (in pixels).
    steps = np.diff(recent, axis=0)
    total_distance = np.linalg.norm(steps, axis=1).sum()

    # Pick the calibration band that contains the object's latest y-coordinate.
    latest_y = tracking_history[-1][1]
    meters_per_pixel = default_ratio
    for y_range, value in pixel_to_meter_ratio.items():
        if y_range[1] <= latest_y < y_range[0]:
            meters_per_pixel = value
            break

    total_distance_meters = total_distance * meters_per_pixel
    total_time_seconds = (len(recent) - 1) / fps
    speed_mps = total_distance_meters / total_time_seconds
    return float(speed_mps * 3.6)  # m/s -> km/h

def perform_tracking(source, config, save_path, pixel_to_meter_ratio, annotation=None, model=None, deepsort=None):
    """Track objects through a video, render the result and collect speed estimates.

    Boxes are taken from ``annotation`` when it is truthy; otherwise every frame is
    passed to ``model`` and the class-id-2 detections are fed to ``deepsort``.
    For each box the id, bounding box, trajectory and estimated speed are drawn
    on the frame, and every frame read is written to ``save_path + '.mp4'``.

    Args:
        source: Video source handed to ``cv2.VideoCapture``.
        config: Optional mapping; when truthy, ``config["video"]["scale_video_size"]``
            scales the frame size used in the detection path.
        save_path: Output path without extension (``.mp4`` is appended).
        pixel_to_meter_ratio: Calibration mapping forwarded to ``calculate_speed``.
        annotation: Optional per-frame boxes, indexed by frame number; each box is
            read as ``[left, top, right, bottom, object_id, ...]``.
        model: Detector called as ``model(frame, verbose=False)`` when no
            annotation is supplied.
        deepsort: Tracker whose ``update(xywh, conf, frame)`` result is read like
            an annotation box list.

    Returns:
        dict mapping object id to the list of speed estimates (km/h) gathered on
        the frames where a speed could be computed.
    """
    use_annotations = bool(annotation)
    cap = cv2.VideoCapture(source)
    video = None
    speeds = {}

    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep the fractional frame rate (e.g. 29.97): truncating it to an int
        # biases every time interval and therefore every speed estimate.
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Default to the native size so the detection path also works without a config.
        size = (width, height)
        if config:
            scale = config["video"]['scale_video_size']
            size = (int(width * scale), int(height * scale))

        # Only the detection path resizes frames, and the writer must be opened
        # with the size of the frames it actually receives.
        output_size = (width, height) if use_annotations else size

        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)  # starting frame = 0
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        video = cv2.VideoWriter(save_path + '.mp4', fourcc, fps, output_size, True)

        frame_count = -1
        colors = {}
        tracking_history = {}

        while cap.isOpened():
            ret, frame = cap.read()
            frame_count += 1
            if ret is not True:
                print("Finish!")
                break

            bboxes = []
            if use_annotations:
                bboxes = annotation[frame_count]
            else:
                results = model(frame, verbose=False)
                frame_detections = results[0].boxes
                # Keep class id 2 only (presumably "car" for COCO-trained weights; confirm for custom models).
                filtered_pred = [box.xywh[0].to(int).tolist() + box.conf.tolist() for box in frame_detections if box.cls == 2]
                # NOTE(review): detections are computed on the full-size frame but the
                # tracker receives the resized one; these only agree when the scale is 1.
                frame = cv2.resize(frame, size, interpolation=cv2.INTER_AREA)

                dets = np.array(filtered_pred)
                if len(dets) > 0:
                    bboxes = deepsort.update(dets[:, :4], dets[:, -1:], frame)

            for bbox in bboxes:
                obj_id = int(bbox[4])
                left, top, right, bottom = (int(value) for value in bbox[:4])

                # Assign a unique color if new object (plain ints keep OpenCV's scalar parsing happy).
                if obj_id not in colors:
                    colors[obj_id] = tuple(int(np.random.randint(0, 255)) for _ in range(3))
                color = colors[obj_id]

                # Draw the bounding box and the object id
                start_point = (left, top)
                end_point = (right, bottom)
                frame = cv2.rectangle(frame, start_point, end_point, color, 2)
                frame = cv2.putText(frame, str(obj_id), start_point, cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2, cv2.LINE_AA)

                # Update tracking history with the box center
                center_position = ((left + right) // 2, (top + bottom) // 2)
                trail = tracking_history.setdefault(obj_id, [])
                trail.append(center_position)

                # Draw the trajectory through all historical positions
                for previous_point, next_point in zip(trail, trail[1:]):
                    cv2.line(frame, previous_point, next_point, color, 2)

                # Estimate speeds in km/h
                speed = calculate_speed(trail, fps, pixel_to_meter_ratio)
                if speed is not None:
                    frame = cv2.putText(frame, f"{speed:.2f} km/h", (left, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2, cv2.LINE_AA)
                    speeds.setdefault(obj_id, []).append(speed)

            # Write every frame, including those without boxes, so the output
            # keeps the timing of the source video.
            video.write(frame)
    finally:
        # Release the capture and writer even if processing fails midway.
        cap.release()
        if video is not None:
            video.release()

    return speeds

In [134]:
def compute_pixel_to_meter_ratio(measurements, frame_path="frame.jpg", frame_height=None):
    """Build a y-band to meters-per-pixel calibration table from reference measurements.

    Each measurement is a pair of pixel rows ``(y_start, y_end)`` spanning a
    reference segment that is assumed to be 1 meter long. Measurements are walked
    in the given order; measurement ``i`` is assigned the band whose upper bound is
    the previous measurement's ``y_start`` (the frame height for the first one)
    and whose lower bound is its own ``y_start`` (0 for the last one).

    Args:
        measurements: Sequence of ``(y_start, y_end)`` pixel rows.
        frame_path: Image whose height bounds the first band. Only read when
            ``frame_height`` is not given.
        frame_height: Optional frame height in pixels; skips reading the image.

    Returns:
        dict mapping ``(band_upper, band_lower)`` to meters per pixel.

    Raises:
        FileNotFoundError: If the reference image cannot be read.
        ValueError: If a measurement spans zero pixels.
    """
    if frame_height is None:
        image = cv2.imread(frame_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read calibration frame: {frame_path!r}")
        frame_height = image.shape[0]

    pixel_to_meter_ratio = {}
    band_upper = frame_height
    last_index = len(measurements) - 1
    for index, measurement in enumerate(measurements):
        y_start, y_end = measurement[0], measurement[1]
        # The last measurement's band extends to the top edge of the image.
        band_lower = 0 if index == last_index else y_start

        distance_pixels = y_start - y_end
        if distance_pixels == 0:
            raise ValueError(f"Measurement {index} spans zero pixels: {measurement!r}")

        # Assume the distance we are measuring is 1 meter long
        pixel_to_meter_ratio[(band_upper, band_lower)] = 1 / distance_pixels
        band_upper = y_start  # The next band starts where this one ends
    return pixel_to_meter_ratio

In [135]:
# Run configuration; the alternatives tried for each option are listed next to it.
config = dict(
    model_weights=None,  # ["yolov8n.pt", "last.pt", None]
    config_deepsort_file="./config.yaml",
    source="../../data/S03/c010/vdo.avi",  # ["../../data/S03/c010/vdo.avi", "../../data/UAB.mp4"]
    save_path="./results/out",
    annotation_format="pascalvoc",  # ["cvat", "pascalvoc", None]
    annotation_path="../../data/ai_challenge_s03_c010-full_annotation.xml",  # ["../../data/ai_challenge_s03_c010-full_annotation.xml", "../../data/annotations_uab.xml", None]
)

# Pairs of pixel rows, one pair per reference segment, ordered from larger to smaller y.
measurements = [
    (851, 794),
    (671, 631),
    (545, 518),
    (454, 435),
    (393, 379),
    (349, 338),
    (306, 297),
    (275, 268),
    (249, 244),
]

In [136]:
# Build the detector and tracker (only when weights are configured), load the
# annotations (only when a format is configured), then run the tracking pass.
model, config_deepsort, deepsort, annotation = None, None, None, None
if config['model_weights']:
    model = YOLO(config['model_weights'])
    # The context manager closes the YAML file instead of leaking the handle.
    with open(config['config_deepsort_file']) as config_file:
        config_deepsort = yaml.safe_load(config_file)
    tracker_cfg = config_deepsort['deepsort_tracker']
    deepsort = DeepSort(model_path=tracker_cfg['model_path'],
                        max_dist=tracker_cfg['max_dist'],
                        min_confidence=tracker_cfg['min_confidence'],
                        nms_max_overlap=tracker_cfg['nms_max_overlap'],
                        max_iou_distance=tracker_cfg['max_iou_distance'],
                        max_age=tracker_cfg['max_age'],
                        n_init=tracker_cfg['n_init'],
                        nn_budget=tracker_cfg['nn_budget'],
                        use_cuda=tracker_cfg['use_cuda'])

if config['annotation_format'] == "cvat":
    annotation = parse_cvat_annotations(config['annotation_path'])
elif config['annotation_format'] == "pascalvoc":
    annotation = parse_pascalvoc_annotations(config['annotation_path'], add_track_id=True)

# Fail early with a clear message instead of crashing on the first frame
# inside perform_tracking when there is no source of bounding boxes.
if model is None and not annotation:
    raise ValueError("Set config['model_weights'] or provide annotations: no source of bounding boxes.")

pixel_to_meter_ratio = compute_pixel_to_meter_ratio(measurements)

speeds = perform_tracking(config["source"], config_deepsort, config["save_path"], pixel_to_meter_ratio, annotation=annotation, model=model, deepsort=deepsort)

Finish!


In [137]:
# Write one line per object, ordered by id, with the mean of its speed estimates.
speeds = dict(sorted(speeds.items()))
report_path = config["save_path"] + ".txt"
with open(report_path, 'w') as report:
    for car_id in speeds:
        mean_speed = np.mean(speeds[car_id])
        report.write(f"Car {car_id}: {mean_speed:.2f} km/h\n")