In [8]:
import os
from ultralytics import YOLO

# Define the path to the best model checkpoint
run_dir = 'runs/detect/train23'  
best_model_path = os.path.join(run_dir, 'weights', 'best.pt')

# Load the YOLO model
model = YOLO(best_model_path)



In [9]:
import cv2
from ultralytics import YOLO

# load the best YOLO model
model = YOLO('runs/detect/train23/weights/best.pt')  

In [10]:
image_path = 'samples/sample_image.jpg'
image = cv2.imread(image_path)

results = model(image)


0: 384x640 10 Vehicles, 54.6ms
Speed: 11.2ms preprocess, 54.6ms inference, 8.6ms postprocess per image at shape (1, 3, 384, 640)


In [14]:
detections = results[0].boxes.xyxy.cpu().numpy()

In [15]:
scores = results[0].boxes.conf.cpu().numpy()

In [16]:
classes = results[0].boxes.cls.cpu().numpy()

In [17]:
# draw bounding boxes
for i, bbox in enumerate(detections):
    if classes[i] == 0:  
        x1, y1, x2, y2 = map(int, bbox)
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(image, f'{scores[i]:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

cv2.imwrite('output_samples/sample_detections.jpg', image)

True

In [19]:
from sort.sort import Sort
import numpy as np
import cv2

sort_tracker = Sort()

video_path = 'samples/sample_video2.mp4'
cap = cv2.VideoCapture(video_path)

# video dimensions
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)

# video writer settings 
output_path = 'output_samples/output_video2.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# store trajectories in a dictionary 
trajectories = {}
score_threshold = 0.5
frame_idx = 0
c = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # vehicle detection in the frame
    results = model(frame)
    cv2.imwrite(f'images/{c}.jpg', frame)
    c = c + 1
    
    detections = results[0].boxes.xyxy.cpu().numpy()  
    scores = results[0].boxes.conf.cpu().numpy()
    classes = results[0].boxes.cls.cpu().numpy()

    # detections for SORT 
    sort_detections = []
    for i, cls in enumerate(classes):
        if cls == 0 and scores[i] >= score_threshold:
            bbox = detections[i]
            score = scores[i]
            sort_detections.append([bbox[0], bbox[1], bbox[2], bbox[3], score])

    # update tracker with detections 
    if len(sort_detections) > 0:
        sort_tracks = sort_tracker.update(np.array(sort_detections))
    else:
        sort_tracks = []

    # show bounding boxes, track IDs, and trajectories on the frame
    for track in sort_tracks:
        x1, y1, x2, y2, track_id = track
        cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
        cv2.putText(frame, str(int(track_id)), (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
        
        # update trajectories
        if track_id not in trajectories:
            trajectories[track_id] = []
        center = (int((x1 + x2) / 2), int((y1 + y2) / 2))
        trajectories[track_id].append(center)
        
        # draw trajectory line in the frame for showing mapping
        for j in range(1, len(trajectories[track_id])):
            cv2.line(frame, trajectories[track_id][j-1], trajectories[track_id][j], (0, 0, 255), 2)

    out.write(frame)
    
    frame_idx += 1

cap.release()
out.release()



0: 384x640 8 Vehicles, 62.2ms
Speed: 7.7ms preprocess, 62.2ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 Vehicles, 36.6ms
Speed: 1.6ms preprocess, 36.6ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 Vehicles, 38.4ms
Speed: 1.4ms preprocess, 38.4ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 Vehicles, 36.8ms
Speed: 1.4ms preprocess, 36.8ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 Vehicles, 29.4ms
Speed: 1.4ms preprocess, 29.4ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 Vehicles, 31.6ms
Speed: 1.5ms preprocess, 31.6ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 Vehicles, 28.5ms
Speed: 1.3ms preprocess, 28.5ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 Vehicles, 27.7ms
Speed: 1.2ms preprocess, 27.7ms inference, 0.4ms postprocess per ima

In [11]:
import cv2
import numpy as np

In [12]:
# Points in the frame (image space)
points_frame = np.array([
    [672, 261],
    [1186, 278],
    [2668, 1079],
    [-990, 1079]
], dtype='float32')

# Corresponding points in real-world space (or second frame)
points_realworld = np.array([
    [0, 0],
    [29, 0],
    [29, 299],
    [0, 299]
], dtype='float32')

## Apply homography

In [13]:
import numpy as np

def dlt_homography(points_src, points_dst):
    """ Compute homography that maps from points_src to points_dst using DLT """
    num_points = points_src.shape[0]
    A = np.zeros((2 * num_points, 9))
    
    for i in range(num_points):
        x_src, y_src = points_src[i]
        x_dst, y_dst = points_dst[i]
        
        A[2 * i] = [-x_src, -y_src, -1, 0, 0, 0, x_dst * x_src, x_dst * y_src, x_dst]
        A[2 * i + 1] = [0, 0, 0, -x_src, -y_src, -1, y_dst * x_src, y_dst * y_src, y_dst]
    
    U, S, Vt = np.linalg.svd(A)
    H = Vt[-1].reshape(3, 3)
    
    return H / H[2, 2]  

def compute_homography_ransac(points_src, points_dst, num_iters=1000, threshold=2):
    """ RANSAC algorithm to compute homography with outlier rejection """
    max_inliers = 0
    best_homography = None
    num_points = points_src.shape[0]

    for _ in range(num_iters):
        # randomly select 4 points for the minimal set
        idxs = np.random.choice(num_points, 4, replace=False)
        subset_src = points_src[idxs]
        subset_dst = points_dst[idxs]

        H = dlt_homography(subset_src, subset_dst)
        if H is None:
            continue

        # apply homography 
        points_src_hom = np.column_stack((points_src, np.ones(num_points)))
        estimated_dst = points_src_hom @ H.T
        estimated_dst /= estimated_dst[:, 2][:, np.newaxis]  # Convert from homogeneous

        # errors computation
        errors = np.linalg.norm(points_dst - estimated_dst[:, :2], axis=1)
        num_inliers = np.sum(errors < threshold)

        # update the homography
        if num_inliers > max_inliers:
            max_inliers = num_inliers
            best_homography = H

    return best_homography, max_inliers

H, inliers = compute_homography_ransac(points_frame, points_realworld)
print("Best Homography Matrix:\n", H)

Best Homography Matrix:
 [[  -0.068625    -0.13943      82.507]
 [    0.11383     -3.4418      821.82]
 [ 0.00038072  -0.0098906           1]]


In [14]:
H

array([[  -0.068625,    -0.13943,      82.507],
       [    0.11383,     -3.4418,      821.82],
       [ 0.00038072,  -0.0098906,           1]])

In [16]:
def compute_homography():
    points_frame = np.array([[100, 200], [200, 200], [200, 300], [100, 300]], dtype='float32')
    points_realworld = np.array([[0, 0], [10, 0], [10, 10], [0, 10]], dtype='float32')
    H, _ = cv2.findHomography(points_frame, points_realworld, cv2.RANSAC)
    return H

def apply_homography(H, point):
    point_hom = np.append(point, 1)  
    transformed_point = np.dot(H, point_hom)
    return transformed_point[:2] / transformed_point[2] 

In [22]:
from sort.sort import Sort
import numpy as np
import cv2
    

sort_tracker = Sort()

video_path = 'samples/sample_video2.mp4'
cap = cv2.VideoCapture(video_path)

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)

output_path = 'output_samples/output_video2.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

scale_f = 10
trajectory_width = 300  
trajectory_height = 3000 
trajectory_video_path = 'output_samples/trajectory_video.mp4'
out_trajectory = cv2.VideoWriter(trajectory_video_path, fourcc, fps, (trajectory_width, trajectory_height))

# homography points
points_frame = np.array([
    [672, 261],
    [1186, 278],
    [2668, 1079],
    [-990, 1079]
], dtype='float32')
points_realworld = np.array([
    [0, 0],
    [29, 0],
    [29, 179],
    [0, 179]
], dtype='float32')

# homography computation
H, inliers = compute_homography_ransac(points_frame, points_realworld)
print("Best Homography Matrix:\n", H)
print("Number of Inliers:", inliers)

trajectories = {}
real_world_trajectories = {}
score_threshold = 0.5
frame_idx = 0

# create a blank image for trajectories
trajectory_image = np.zeros((trajectory_height, trajectory_width, 3), dtype=np.uint8)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    results = model(frame)
    detections = results[0].boxes.xyxy.cpu().numpy()  # xyxy format
    scores = results[0].boxes.conf.cpu().numpy()
    classes = results[0].boxes.cls.cpu().numpy()

    sort_detections = []
    for i, cls in enumerate(classes):
        if cls == 0 and scores[i] >= score_threshold:
            bbox = detections[i]
            score = scores[i]
            sort_detections.append([bbox[0], bbox[1], bbox[2], bbox[3], score])

    sort_tracks = sort_tracker.update(np.array(sort_detections))

    # clear previous trajectory image for each frame to avoid clutter
    trajectory_image.fill(0)

    for track in sort_tracks:
        x1, y1, x2, y2, track_id = track
        if x1 >= 0 and y1 >= 0 and x2 <= width and y2 <= height:
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
            cv2.putText(frame, str(int(track_id)), (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

            if track_id not in trajectories:
                trajectories[track_id] = []
            center = (int((x1 + x2) / 2), int((y1 + y2) / 2))
            trajectories[track_id].append(center)

            real_world_point = apply_homography(H, center)
            real_world_point = np.int32(real_world_point * scale_f)
            if track_id not in real_world_trajectories:
                real_world_trajectories[track_id] = []

            real_world_trajectories[track_id].append(real_world_point)

            for j in range(1, len(real_world_trajectories[track_id])):
                pt1 = tuple(real_world_trajectories[track_id][j - 1])
                pt2 = tuple(real_world_trajectories[track_id][j])
                cv2.line(trajectory_image, pt1, pt2, (0, 255, 0), 2)

                if j == len(real_world_trajectories[track_id]) - 1:
                    cv2.putText(trajectory_image, str(track_id), (pt2[0] + 10, pt2[1]), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    out.write(frame)
    out_trajectory.write(trajectory_image)

cap.release()
out.release()
out_trajectory.release()

Best Homography Matrix:
 [[  -0.068625    -0.13943      82.507]
 [   0.068148     -2.0605      491.99]
 [ 0.00038072  -0.0098906           1]]
Number of Inliers: 4

0: 384x640 8 Vehicles, 51.1ms
Speed: 2.7ms preprocess, 51.1ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 Vehicles, 34.9ms
Speed: 1.6ms preprocess, 34.9ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 Vehicles, 33.1ms
Speed: 1.3ms preprocess, 33.1ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 Vehicles, 32.8ms
Speed: 1.4ms preprocess, 32.8ms inference, 0.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 Vehicles, 30.7ms
Speed: 1.4ms preprocess, 30.7ms inference, 0.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 Vehicles, 25.8ms
Speed: 1.2ms preprocess, 25.8ms inference, 0.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 Vehicles, 31.1ms
Speed: 1.2ms preprocess, 31.1ms i

## Speed 

In [25]:
import numpy as np
import cv2
from sort.sort import Sort

def compute_distance(point1, point2):
    return np.sqrt((point1[0] - point2[0]) ** 2 + (point1[1] - point2[1]) ** 2)

def compute_speed(distance, fps):
    time_seconds = 1 / fps
    speed_mps = distance / time_seconds  
    speed_kmph = speed_mps * 3.6  
    return speed_kmph

# Setup
video_path = 'samples/sample_video2.mp4'
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)

# Initialize SORT tracker and video output
sort_tracker = Sort()
output_path = 'output_samples/output_video2_speed.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))


def moving_average(speeds, window_size=25):
    if len(speeds) < window_size:
        return np.mean(speeds)
    else:
        return np.mean(speeds[-window_size:])

# init speed storage
speeds = {}

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    results = model(frame)
    detections = results[0].boxes.xyxy.cpu().numpy()  # xyxy format
    scores = results[0].boxes.conf.cpu().numpy()
    classes = results[0].boxes.cls.cpu().numpy()
    
    # Prepare detections for SORT
    sort_detections = [
        [det[0], det[1], det[2], det[3], scores[i]]
        for i, det in enumerate(detections) if classes[i] == 0
    ]
    sort_tracks = sort_tracker.update(np.array(sort_detections))

    for track in sort_tracks:
        x1, y1, x2, y2, track_id = track
        center = ((x1 + x2) / 2, (y1 + y2) / 2)
        real_world_point = apply_homography(H, np.array([center]))

        if track_id not in real_world_trajectories:
            real_world_trajectories[track_id] = []
        real_world_trajectories[track_id].append(real_world_point)

        if len(real_world_trajectories[track_id]) > 1:
            dist = compute_distance(real_world_trajectories[track_id][-1], real_world_trajectories[track_id][-2])
            current_speed = compute_speed(dist, fps)
            
            if track_id not in speeds:
                speeds[track_id] = []
            speeds[track_id].append(current_speed)
            
            smoothed_speed = moving_average(speeds[track_id])
            speed_display = f"{smoothed_speed:.1f} km/h"
        else:
            speed_display = "Calculating..."
        
        cv2.putText(frame, speed_display, (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

    out.write(frame)


cap.release()
out.release()



0: 384x640 8 Vehicles, 56.9ms
Speed: 2.0ms preprocess, 56.9ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 Vehicles, 25.7ms
Speed: 1.2ms preprocess, 25.7ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 Vehicles, 27.4ms
Speed: 1.2ms preprocess, 27.4ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 Vehicles, 27.4ms
Speed: 1.1ms preprocess, 27.4ms inference, 0.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 Vehicles, 25.4ms
Speed: 1.2ms preprocess, 25.4ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 Vehicles, 27.6ms
Speed: 1.3ms preprocess, 27.6ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 Vehicles, 26.2ms
Speed: 1.2ms preprocess, 26.2ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 Vehicles, 29.1ms
Speed: 1.1ms preprocess, 29.1ms inference, 0.2ms postprocess per ima

In [29]:
import cv2
import numpy as np

main_video_path = 'output_samples/output_video2.mp4'
inset_video_path = 'output_samples/trajectory_video.mp4'
main_cap = cv2.VideoCapture(main_video_path)
inset_cap = cv2.VideoCapture(inset_video_path)

if not main_cap.isOpened() or not inset_cap.isOpened():
    print("Error opening video streams")
    exit(1)

frame_width = int(main_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(main_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = main_cap.get(cv2.CAP_PROP_FPS)

main_new_width = int(frame_width * 0.8)
inset_new_width = frame_width - main_new_width

output_path = 'output_samples/output_video_with_trajectory.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

while main_cap.isOpened() and inset_cap.isOpened():
    ret_main, main_frame = main_cap.read()
    ret_inset, inset_frame = inset_cap.read()

    if ret_main and ret_inset:
        main_frame_resized = cv2.resize(main_frame, (main_new_width, frame_height))
        inset_frame_resized = cv2.resize(inset_frame, (inset_new_width, frame_height))

        combined_frame = np.hstack((main_frame_resized, inset_frame_resized))

        out.write(combined_frame)
    else:
        break

main_cap.release()
inset_cap.release()
out.release()
cv2.destroyAllWindows()