In [1]:
import sys
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import ultralytics
from ultralytics import YOLO

In [11]:
#Get OpenCV Perspective Transform (BEV) 
class ViewTransformer:
    def __init__(self, source: np.ndarray, target: np.ndarray) -> None:
        source = source.astype(np.float32)
        target = target.astype(np.float32)
        self.m = cv2.getPerspectiveTransform(source, target)

    def transform_points(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0: #No detections
            return points

        reshaped_points = points.reshape(-1, 1, 2).astype(np.float32)
        transformed_points = cv2.perspectiveTransform(reshaped_points, self.m)
        return transformed_points.reshape(-1, 2)

#Calculate the speed of each detected vehicle
def get_speed(points_vector, frame_time):
    global previous_y, first_loop

    if first_loop:
        previous_y = np.copy(points_vector)
        first_loop = False
        
    if len(previous_y) <= len(points_vector):
        previous_y_expanded = np.copy(points_vector)
        previous_y_expanded[:len(previous_y)] = previous_y
    else:
        previous_y_expanded = np.copy(previous_y)
        previous_y_expanded = previous_y_expanded[-len(points_vector):]
        
    #print(points_vector, previous_y_expanded)
    #print(abs(points_vector - previous_y_expanded))
        
    previous_y = np.append(previous_y, points_vector)
    previous_y = previous_y[-len(points_vector):]
    distance_vector = abs(points_vector - previous_y_expanded)
    speed_vector = (distance_vector / frame_time) * 3.6# speed = distance / time [m/s] * 3.6 = [km/h]
    
    return speed_vector

#Determine if a point is inside a polygon
def point_in_polygon(x, y, polygon):
    result = cv2.pointPolygonTest(polygon, (x, y), False)
    return result >= 0

#Draw text with bakground on image
def draw_text(img, text,
          font=cv2.FONT_HERSHEY_PLAIN,
          pos=(0, 0),
          font_scale=3,
          font_thickness=2,
          text_color=(0, 255, 0),
          text_color_bg=(0, 0, 0)
          ):

    x, y = pos
    text_size, _ = cv2.getTextSize(text, font, font_scale, font_thickness)
    text_w, text_h = text_size
    cv2.rectangle(img, pos, (x + text_w, y + text_h), text_color_bg, -1)
    cv2.putText(img, text, (int(x), int(y + text_h + font_scale - 1)), font, font_scale, text_color, font_thickness)

    return text_size           

In [34]:
#In Praga, where the video comes from, the lenght of the white line across the road is about 3m and 6m among they,
#so the polygon shaped by the vertexes may cover a total distance of 65m x 25m aproximately.
#Only for windows_view purpose we'll define view_transformer_for_window using vertexes * 10, because with 65m x 25m the windows would be too small

YOLO_model_path = r'.\models\yolov8m.pt'
video_path = 'road.mp4'
cap = cv2.VideoCapture(video_path)

codec = cv2.VideoWriter_fourcc(*'XVID')
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
fps = cap.get(cv2.CAP_PROP_FPS)
time_among_frames = 1/fps

vertexes = np.array([[587, 391], [818, 391], [1395, 719], [-50, 719]]) #Points that define the inference zone
vertexes = vertexes.reshape((-1, 1, 2))

TARGET_WIDTH = 25 
TARGET_HEIGHT = 65
target_vertexes = np.array([[0, 0], [TARGET_WIDTH - 1, 0], [TARGET_WIDTH - 1, TARGET_HEIGHT - 1], [0, TARGET_HEIGHT - 1]])
target_vertexes = target_vertexes.reshape((-1, 1, 2))
view_transformer = ViewTransformer(vertexes, target_vertexes)

#Only for view purposes
TARGET_WIDTH_for_view = 25 * 10 
TARGET_HEIGHT_for_view = 65 * 10
target_vertexes_for_view = np.array([[0, 0], [TARGET_WIDTH_for_view - 1, 0], [TARGET_WIDTH_for_view - 1, TARGET_HEIGHT_for_view - 1], [0, TARGET_HEIGHT_for_view - 1]])
target_vertexes_for_view = target_vertexes_for_view.reshape((-1, 1, 2))
view_transformer_for_window = ViewTransformer(vertexes, target_vertexes_for_view)

#output_video = cv2.VideoWriter(r'.\outputs\detected.mp4', codec, fps, (width, height)) #Save output video
model = YOLO(YOLO_model_path)

first_loop = True
previous_y = []
display_option = 'Annotated_original' #'Original','Annotated_original','BEV'

#Frame processing
# Loop until the end of the video
while (cap.isOpened()):
    
    # Capture frame-by-frame
    ret, frame = cap.read()

    if frame is not None:
        results = model.track(frame, show=False, persist=True, verbose=False, conf=0.3, iou=0.5, classes=[0,1,2,3,5,7], imgsz=736, tracker="botsort.yaml") #tracker="bytetrack.yaml"
        frame_cp = np.copy(frame)

        for result in results:
            boxes = result.boxes.xywh.cpu().numpy()
            ids = result.boxes.id.cpu().numpy()       
            poi_list = [[box[0], box[1] + box[3] / 2, int(id)] for box, id in zip(boxes, ids) if point_in_polygon(box[0], box[1] + box[3] / 2, vertexes)] #Use center down point (x, y, id)

            if poi_list:
                id_poi = [poi[-1] for poi in poi_list]
                coord_poi = [poi[:2] for poi in poi_list]
                transformed_poi = view_transformer.transform_points(np.array(coord_poi)) #Trasnform point from image coordinate to real coordinates
                y_transformed = transformed_poi[:,1] #Get only the y coordinate transformed for each ID
                
                speed_vector = get_speed(y_transformed, time_among_frames)
                speed_coord_id = zip(speed_vector, coord_poi, id_poi)

                for speed, coord, id in speed_coord_id:
                    draw_text(frame_cp, text=f'ID:{id} - {int(speed)} km/h', font=cv2.FONT_HERSHEY_SIMPLEX, pos=(int(coord[0]), int(coord[1]+5)),
                              font_scale=0.5, font_thickness=1, text_color=(0,0,0), text_color_bg=(0,255,255)) 

        #Display Options
        if display_option == 'BEV':
            if poi_list:
                # Display the annotated frame with points on each detected vehicle
                transformed_poi_window = view_transformer_for_window.transform_points(np.array([coord_poi])) #Trasnform point from image coordinate to real coordinates multiply by 10
                imgOutput = cv2.warpPerspective(frame, view_transformer_for_window.m, (TARGET_WIDTH_for_view, TARGET_HEIGHT_for_view), cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT, borderValue=(0,0,0))
                for xy_point in transformed_poi_window:
                    cv2.circle(imgOutput, (int(xy_point[0]),int(xy_point[1])), radius=4, color=(255,0,0), thickness=-1)
                cv2.imshow("YOLOv8 Tracking_BEV", imgOutput)
            else:
                transformed_poi_window = view_transformer_for_window.transform_points(np.array([coord_poi])) #Trasnform point from image coordinate to real coordinates multiply by 10
                imgOutput = cv2.warpPerspective(frame, view_transformer_for_window.m, (TARGET_WIDTH_for_view, TARGET_HEIGHT_for_view), cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT, borderValue=(0,0,0))
                cv2.imshow("YOLOv8 Tracking_BEV", imgOutput)
        
        elif display_option == 'Annotated_original':
            # Display the annotated frame with points
            for xy_point in poi_list:
                cv2.circle(frame_cp, (int(xy_point[0]),int(xy_point[1])), radius=2, color=(255,0,0), thickness=-1)
            frame_cp = cv2.polylines(frame_cp, [vertexes], isClosed=True, color=(0, 0, 255), thickness=1)
            cv2.imshow("YOLOv8 Tracking", frame_cp)
        
        elif display_option == 'Original':
            cv2.imshow("YOLOv8 Tracking", frame)             

    # define q as the exit button
    if cv2.waitKey(25) & 0xFF == ord('q'):
        break

# release the video capture object
cap.release()
# Closes all the windows currently opened.
cv2.destroyAllWindows()