In [1]:
import cv2 as cv
import numpy as np
%matplotlib tk
from ultralytics import YOLO
import math as m
import os

In [2]:
#Represents each object to be tracked and projected---
#center_pt -- center point of the bounding box of a tracked object, used to track position in frame, represented
# as a coordinate pair [x,y]
#ctr_pts -- list of center point values over time, used to mark path, corresponds with timestp_list
class TrackedObject:
    def __init__(self, center_pt, id):
        self.center_pt = center_pt
        self.ctr_pts = [self.center_pt]
        self.id = id

    #Updating list of center point locations
    def update_pts(self, new_pt):
        self.ctr_pts.append(new_pt)

#Uses Yolov11 to track
def object_tracking(video_path):
    model = YOLO("yolo11n.pt")
    results = model.track(source=video_path, show=True, tracker="bytetrack.yaml")

    #Visualize the results on the frame
    #annotated_frame = results[0].plot()
    # check https://docs.ultralytics.com/modes/predict/#inference-arguments for tracking model configs

    #Dictionary of tracked objects
    box_info = {}
    #https://docs.ultralytics.com/reference/engine/results/#ultralytics.engine.results.Boxes
    
    for result in results:
        if (not result.boxes is None):
            for detection in result.boxes:
                x = int(detection.id)
                cls = int(detection.cls) #Class 0 = person

                #Storing center point coordinates
                center_x = detection.xywh.numpy()[0,0] 
                center_y = detection.xywh.numpy()[0,1]
                new_pt = [center_x, center_y]
                if x in box_info: #Update TrackedObject
                    box_info[x].update_pts(new_pt)

                #Only creating TrackedObjects for people
                elif cls == 0:
                    #creating new TrackObject for new bounding box
                    box_info[x] = TrackedObject(new_pt, x)

                

    return box_info


#Gathering video and image points for homography transformation
#Mouse callback function to store clicked points
def select_points(event, x, y, flags, param):
    
    if event == cv.EVENT_LBUTTONDOWN:
        param.append((x, y))
        print(f"Point selected: {x}, {y}")

#Displays image and lets user select four DISTINCT points to calculate homography matrix
def point_selection(image_path, window_name):
    
    image = cv.imread(cv.samples.findFile(image_path))
    points = []
    
    cv.namedWindow(window_name)
    cv.setMouseCallback(window_name, select_points, points)

    print(f"Select four points in {window_name}.")
    while True:
        temp_image = image.copy()
        for point in points:
            cv.circle(temp_image, point, 5, (0, 0, 255), -1)
        cv.imshow(window_name, temp_image)
        
        key = cv.waitKey(1) & 0xFF
        if key == ord('q') or len(points) == 4:
            break

    cv.destroyAllWindows()
    return np.array(points, dtype=np.float32)


#Transforming object coordinates
#x_vid-- x coordinate of TrackedObject position
#y_vid-- y coordinate of TrackedObject position
def transform_coordinates(x_vid, y_vid, H):
    #Transforming TrackedObject coordinates
    point = np.array([[x_vid, y_vid]], dtype=np.float32).reshape(1, 1, 2)
    transformed_point = cv.perspectiveTransform(point, H)
    return transformed_point[0][0]


In [3]:
#Main
def transform(vid_path, img_path):
    #Performing object tracking on video
    box_info = object_tracking(vid_path)
    boxes = []
    for x in box_info:
        boxes.append(box_info[x].id)
    print("Tracked Object IDs:")
    print(*boxes)
    id = input("Enter the key of an object to transform: ")
    
    #Obtain 1st video frame for point selection
    video = cv.VideoCapture(vid_path)
    vid_img_path = "vid_frame1.jpg"
    status, frame = video.read()
    if status == True:
        cv.imwrite(vid_img_path, frame)
    else: 
        print("Issue extracting video frame")
    
    #Selecting points with which to get homography matrix
    video_points = []
    topdown_points = []
    video_points = point_selection(vid_img_path, "Video Frame (press 'q' to quit)")
    topdown_points = point_selection(img_path, "Top-Down Image (press 'q' to quit)")
    print("Point selection completed")

    #Computing homography matrix
    H, _ = cv.findHomography(video_points, topdown_points, method=cv.RANSAC)

    #Transforming tracked center points using the homography matrix
    trans_coords = []
    for x in box_info[int(id)].ctr_pts:
        trans_coords.append(transform_coordinates(x[0], x[1], H))
    
    #Drawing newly transformed center points onto image 
    tr_img = cv.imread(cv.samples.findFile(img_path))
    for p in trans_coords:
        cv.circle(tr_img, (int(p[0]), int(p[1])), radius=2, color=(0, 0, 255), thickness=-1)


    #Displaying image
    cv.imshow("Transformed Path Points", tr_img)
    cv.waitKey(0)
    cv.destroyAllWindows()



In [None]:
#Video input
vpath = r"4min12fps_300clemantis.mp4"

#Corresponding image
imgpath = r"gearth_blank.png"
transform(vpath, imgpath)


inference results will accumulate in RAM unless `stream=True` is passed, causing potential out-of-memory
errors for large sources or long-running streams and videos. See https://docs.ultralytics.com/modes/predict/ for help.

Example:
    results = model(source=..., stream=True)  # generator of Results objects
    for r in results:
        boxes = r.boxes  # Boxes object for bbox outputs
        masks = r.masks  # Masks object for segment masks outputs
        probs = r.probs  # Class probabilities for classification outputs

video 1/1 (frame 1/3428) C:\Users\garne\OneDrive\Desktop\cctv-projector\mobintel-cctv-trajectories\digital_twin_processing\4min12fps_300clemantis.mp4: 384x640 2 persons, 1 truck, 226.0ms
video 1/1 (frame 2/3428) C:\Users\garne\OneDrive\Desktop\cctv-projector\mobintel-cctv-trajectories\digital_twin_processing\4min12fps_300clemantis.mp4: 384x640 2 persons, 1 truck, 233.4ms
video 1/1 (frame 3/3428) C:\Users\garne\OneDrive\Desktop\cctv-projector\mobintel-cctv-trajecto