In [10]:
# Import libraries
import numpy as np
import pandas as pd
import os 
import cv2
import skimage
from PIL import Image
from ultralytics import YOLO
from matplotlib import pyplot as plt
import json
import time

In [11]:
import cv2
import json

# Load the original image to get its dimensions
original_image = cv2.imread('trainingw.jpg')
original_height, original_width = original_image.shape[:2]

# New dimensions to which the frame is resized
# window_width = 854
# window_height = 480

keypoints_path = "trainingW.json"

window_width = original_width
window_height = original_height
print(window_width, window_height)

# Calculate scaling factors
x_scale = window_width / original_width
y_scale = window_height / original_height

# Load coordinates from JSON file
with open(f'coordinates/{keypoints_path}', 'r') as f:
    vocational_keypoints = json.load(f)

# Update coordinates with the scaling factor
for key, value in vocational_keypoints.items():
    if value:  # Check if the list is not empty
        x, y = value
        new_x = int(x * x_scale)
        new_y = int(y * y_scale)
        vocational_keypoints[key] = [new_x, new_y]

# Optionally, save the updated coordinates back to a file
with open('coordinates/updated.json', 'w') as f:
    json.dump(vocational_keypoints, f, indent=2)

with open('coordinates/updated.json', 'r') as f:
    vocational_keypoints = json.load(f)

with open('coordinates/map_keypoints.json', 'r') as f:
    map_keypoints = json.load(f)

# Filter out keypoints that are not visible
image_points = []
map_points = []
label_points = {}
labels_dic = {
    0:"player",
    1:"referee",
    2:"ball",
}

i = 0 

for key in vocational_keypoints:
    if vocational_keypoints[key] and map_keypoints[key]:  # Ensure both keypoints are visible
        image_points.append(vocational_keypoints[key])
        map_points.append(map_keypoints[key])
        label_points[key] = i 
        i+=1

# Convert to NumPy arrays
image_points = np.array(image_points, dtype=np.float32)
map_points = np.array(map_points, dtype=np.float32)

# Calculate the homography matrix
H, status = cv2.findHomography(image_points, map_points, cv2.RANSAC, 5.0)
H

1920 1080


array([[    0.13249,     0.22899,      -581.7],
       [  -0.062489,     -1.2935,      232.57],
       [ 0.00030539,  -0.0039285,           1]])

In [12]:
video_path = './videos/KVS_H1_TEST.mp4'
video_path = './videos/TrainingW_TEST.mp4'
tac_map = cv2.imread('./map.jpg')

# Define team colors
nbr_team_colors = 2
colors_dic = {
    "Purple":[(105,105,160), (105,105,160)], # DKU Colors (Players kit color, GK kit color)
    "Yellow":[(200,200,100), (200,200,100)] # KVS Colors (Players kit color, GK kit color)
}

colors_list = colors_dic["Purple"]+colors_dic["Yellow"] # Define color list to be used for detected player team prediction
color_list_lab = [skimage.color.rgb2lab([i/255 for i in c]) for c in colors_list] # Converting color_list to L*a*b* space

In [13]:
# Load the YOLOv8 players detection model
model_players = YOLO("weights/best.pt")

In [14]:
# Open video file
cap = cv2.VideoCapture(video_path)

# Initialize frame counter
frame_nbr = 0

# Set keypoints average displacement tolerance level (in pixels) [set to -1 to always update homography matrix]
keypoints_displacement_mean_tol = 10

# Set confidence thresholds for players and field keypoints detections
player_model_conf_thresh = 0.50

# Set variable to record the time when we processed last frame 
prev_frame_time = 0
# Set variable to record the time at which we processed current frame 
new_frame_time = 0

# Store the ball track history
ball_track_history = {'src':[],
                      'dst':[]
}

# Count consecutive frames with no ball detected
nbr_frames_no_ball = 0
# Threshold for number of frames with no ball to reset ball track (frames)
nbr_frames_no_ball_thresh = 30
# Distance threshold for ball tracking (pixels)
ball_track_dist_thresh = 100
# Maximum ball track length (detections)
max_track_length = 35



In [15]:
frame_save_path = 'saved_frames'
os.makedirs(frame_save_path, exist_ok=True)  # Create the directory if it doesn't exist

# Loop through the video frames
while cap.isOpened():

    # Read a frame from the video
    success, frame = cap.read()
    if not success:
        print("Failed to read frame or end of video reached.")
        break  # Exit the loop if no frame is read or end of video is reached

    frame = cv2.resize(frame, (window_width, window_height))

    # Save each frame to the folder
    frame_nbr += 1
    
    # Reset tactical map image for each new frame
    tac_map_copy = tac_map.copy()

    # Reset ball tracks
    if nbr_frames_no_ball>nbr_frames_no_ball_thresh:
            ball_track_history['dst'] = []
            ball_track_history['src'] = []

    # Process the frame if it was successfuly read
    if success:
        
        #################### Part 1 ####################
        # Object Detection & Coordiante Transofrmation #
        ################################################

        # Run YOLOv8 players inference on the frame
        results_players = model_players(frame, conf=player_model_conf_thresh)

        ## Extract detections information
        bboxes_p = results_players[0].boxes.xyxy.cpu().numpy()                          # Detected players, referees and ball (x,y,x,y) bounding boxes
        bboxes_p_c = results_players[0].boxes.xywh.cpu().numpy()                        # Detected players, referees and ball (x,y,w,h) bounding boxes    
        labels_p = list(results_players[0].boxes.cls.cpu().numpy())                     # Detected players, referees and ball labels list
        confs_p = list(results_players[0].boxes.conf.cpu().numpy())                     # Detected players, referees and ball confidence level
                                
        bboxes_p_c_0 = bboxes_p_c[[i==0 for i in labels_p],:]                       # Get bounding boxes information (x,y,w,h) of detected players (label 0)
        bboxes_p_c_2 = bboxes_p_c[[i==2 for i in labels_p],:]                       # Get bounding boxes information (x,y,w,h) of detected ball(s) (label 2)

        # Get coordinates of detected players on frame (x_cencter, y_center+h/2)
        detected_ppos_src_pts = bboxes_p_c_0[:,:2]  + np.array([[0]*bboxes_p_c_0.shape[0], bboxes_p_c_0[:,3]/2]).transpose()
        # Get coordinates of the first detected ball (x_center, y_center)
        detected_ball_src_pos = bboxes_p_c_2[0,:2] if bboxes_p_c_2.shape[0]>0 else None

        # Transform players coordinates from frame plane to tactical map plance using the calculated Homography matrix
        pred_dst_pts = []                                                           # Initialize players tactical map coordiantes list
        for pt in detected_ppos_src_pts:     
            # print("Coor:", pt)
            pt = np.append(np.array(pt), np.array([1]), axis=0)                     # Covert to homogeneous coordiantes
            dest_point = np.matmul(H, np.transpose(pt))                              # Apply homography transofrmation
            dest_point = dest_point/dest_point[2]           
            # print("map:", list(np.transpose(dest_point)[:2]))
            pred_dst_pts.append(list(np.transpose(dest_point)[:2]))                 # Update players tactical map coordiantes list
        pred_dst_pts = np.array(pred_dst_pts)

        # Transform ball coordinates from frame plane to tactical map plane using the calculated Homography matrix
        if detected_ball_src_pos is not None:
            pt = np.append(np.array(detected_ball_src_pos), np.array([1]), axis=0)
            dest_point = np.matmul(H, np.transpose(pt))
            dest_point = dest_point/dest_point[2]
            detected_ball_dst_pos = np.transpose(dest_point)

            # Update track ball position history
            if len(ball_track_history['src'])>0 :
                if np.linalg.norm(detected_ball_src_pos-ball_track_history['src'][-1])<ball_track_dist_thresh:
                    ball_track_history['src'].append((int(detected_ball_src_pos[0]), int(detected_ball_src_pos[1])))
                    ball_track_history['dst'].append((int(detected_ball_dst_pos[0]), int(detected_ball_dst_pos[1])))
                else:
                    ball_track_history['src']=[(int(detected_ball_src_pos[0]), int(detected_ball_src_pos[1]))]
                    ball_track_history['dst']=[(int(detected_ball_dst_pos[0]), int(detected_ball_dst_pos[1]))]
            else:
                ball_track_history['src'].append((int(detected_ball_src_pos[0]), int(detected_ball_src_pos[1])))
                ball_track_history['dst'].append((int(detected_ball_dst_pos[0]), int(detected_ball_dst_pos[1])))
                
        # Remove oldest tracked ball postion if track exceedes threshold        
        if len(ball_track_history) > max_track_length:
                ball_track_history['src'].pop(0)
                ball_track_history['dst'].pop(0)

        ######### Part 2 ########## 
        # Players Team Prediction #
        ###########################

        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)                                      # Convert frame to RGB
        obj_palette_list = []                                                                   # Initialize players color palette list
        palette_interval = (0,5)                                                                # Color interval to extract from dominant colors palette (1rd to 5th color)
        annotated_frame = frame                                                                 # Create annotated frame 

        ## Loop over detected players (label 0) and extract dominant colors palette based on defined interval
        for i, j in enumerate(list(results_players[0].boxes.cls.cpu().numpy())):
            if int(j) == 0:
                bbox = results_players[0].boxes.xyxy.cpu().numpy()[i,:]                         # Get bbox info (x,y,x,y)
                obj_img = frame_rgb[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]       # Crop bbox out of the frame
                obj_img_w, obj_img_h = obj_img.shape[1], obj_img.shape[0]

                center_filter_x1 = np.max([(obj_img_w//2)-(obj_img_w//5), 1])
                center_filter_x2 = (obj_img_w//2)+(obj_img_w//5)
                center_filter_y1 = np.max([(obj_img_h//3)-(obj_img_h//5), 1])
                center_filter_y2 = (obj_img_h//3)+(obj_img_h//5)
                center_filter = obj_img[center_filter_y1:center_filter_y2, 
                                        center_filter_x1:center_filter_x2]

                # obj_pil_img = Image.fromarray(np.uint8(center_filter))                          # Convert to pillow image
                obj_pil_img = Image.fromarray(np.uint8(center_filter))                          # Convert to pillow image

                reduced = obj_pil_img.convert("P", palette=Image.Palette.WEB)                   # Convert to web palette (216 colors)
                palette = reduced.getpalette()                                                  # Get palette as [r,g,b,r,g,b,...]
                palette = [palette[3*n:3*n+3] for n in range(256)]                              # Group 3 by 3 = [[r,g,b],[r,g,b],...]
                color_count = [(n, palette[m]) for n,m in reduced.getcolors()]                  # Create list of palette colors with their frequency
                RGB_df = pd.DataFrame(color_count, columns = ['cnt', 'RGB']).sort_values(       # Create dataframe based on defined palette interval
                                      by = 'cnt', ascending = False).iloc[
                                          palette_interval[0]:palette_interval[1],:]
                palette = list(RGB_df.RGB)                                                      # Convert palette to list (for faster processing)
                annotated_frame = cv2.rectangle(annotated_frame,                                # Add center filter bbox annotations
                                                (int(bbox[0])+center_filter_x1, 
                                                 int(bbox[1])+ center_filter_y1),  
                                                (int(bbox[0])+center_filter_x2, 
                                                 int(bbox[1])+center_filter_y2), (0,0,0), 2)
                
                # Update detected players color palette list
                obj_palette_list.append(palette)
        
        ## Calculate distances between each color from every detected player color palette and the predefined teams colors
        players_distance_features = []
        # Loop over detected players extracted color palettes
        for palette in obj_palette_list:
            palette_distance = []
            palette_lab = [skimage.color.rgb2lab([i/255 for i in color]) for color in palette]  # Convert colors to L*a*b* space
            # Loop over colors in palette
            for color in palette_lab:
                distance_list = []
                # Loop over predefined list of teams colors
                for c in color_list_lab:
                    #distance = np.linalg.norm([i/255 - j/255 for i,j in zip(color,c)])
                    distance = skimage.color.deltaE_cie76(color, c)                             # Calculate Euclidean distance in Lab color space
                    distance_list.append(distance)                                              # Update distance list for current color
                palette_distance.append(distance_list)                                          # Update distance list for current palette
            players_distance_features.append(palette_distance)                                  # Update distance features list

        ## Predict detected players teams based on distance features
        players_teams_list = []
        # Loop over players distance features
        for distance_feats in players_distance_features:
            vote_list=[]
            # Loop over distances for each color 
            for dist_list in distance_feats:
                team_idx = dist_list.index(min(dist_list))//nbr_team_colors                     # Assign team index for current color based on min distance
                vote_list.append(team_idx)                                                      # Update vote voting list with current color team prediction
            players_teams_list.append(max(vote_list, key=vote_list.count))                      # Predict current player team by vote counting


        #################### Part 3 #####################
        # Updated Frame & Tactical Map With Annotations #
        #################################################

        ball_color_bgr = (0,0,255)                                                                          # Color (GBR) for ball annotation on tactical map
        j=0                                                                                                 # Initializing counter of detected players
        palette_box_size = 5                                                                            # Set color box size in pixels (for display)
        

        # Loop over all detected object by players detection model
        for i in range(bboxes_p.shape[0]):
            conf = confs_p[i]                                                                               # Get confidence of current detected object
            if labels_p[i]==0:                                                                              # Display annotation for detected players (label 0)
                
                # Display extracted color palette for each detected player
                palette = obj_palette_list[j]                                                               # Get color palette of the detected player
                for k, c in enumerate(palette):
                    c_bgr = c[::-1]                                                                         # Convert color to BGR
                    annotated_frame = cv2.rectangle(annotated_frame, (int(bboxes_p[i,2])+3,                 # Add color palette annotation on frame
                                                            int(bboxes_p[i,1])+k*palette_box_size),
                                                            (int(bboxes_p[i,2])+palette_box_size,
                                                            int(bboxes_p[i,1])+(palette_box_size)*(k+1)),
                                                              c_bgr, -1)

                team_name = list(colors_dic.keys())[players_teams_list[j]]                                  # Get detected player team prediction
                color_rgb = colors_dic[team_name][0]                                                        # Get detected player team color
                color_bgr = color_rgb[::-1]                                                                 # Convert color to bgr

                annotated_frame = cv2.rectangle(annotated_frame, (int(bboxes_p[i,0]), int(bboxes_p[i,1])),  # Add bbox annotations with team colors
                                                (int(bboxes_p[i,2]), int(bboxes_p[i,3])), color_bgr, 1)
                
                cv2.putText(annotated_frame, team_name + f" {conf:.2f}",                                    # Add team name annotations
                             (int(bboxes_p[i,0]), int(bboxes_p[i,1])-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                               color_bgr, 2)
                
                if color_bgr == (255, 255, 255):
                    color_bgr = (0,0,0) 
                
                # Add tactical map player postion color coded annotation
                tac_map_copy = cv2.circle(tac_map_copy, (abs(int(pred_dst_pts[j][0])),abs(int(pred_dst_pts[j][1]))),
                                          radius=5, color=color_bgr, thickness=-1)

                j+=1           
                                                                                             # Update players counter
            else:                                                                                           # Display annotation for otehr detections (label 1, 2)
                annotated_frame = cv2.rectangle(annotated_frame, (int(bboxes_p[i,0]), int(bboxes_p[i,1])),  # Add white colored bbox annotations
                                                 (int(bboxes_p[i,2]), int(bboxes_p[i,3])), (255,255,255), 1)
                cv2.putText(annotated_frame, labels_dic[int(labels_p[i])] + f" {conf:.2f}",                      # Add white colored label text annotations
                            (int(bboxes_p[i,0]), int(bboxes_p[i,1])-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                              (255,255,255), 2)

                # Add tactical map ball postion annotation if detected
                if detected_ball_src_pos is not None:
                    tac_map_copy = cv2.circle(tac_map_copy, (int(detected_ball_dst_pos[0]), 
                                                   int(detected_ball_dst_pos[1])), radius=5, 
                                                   color=ball_color_bgr, thickness=3)
        
        # Plot the ball tracks on tactical map
        if len(ball_track_history['src'])>0:
            points = np.hstack(ball_track_history['dst']).astype(np.int32).reshape((-1, 1, 2))
            tac_map_copy = cv2.polylines(tac_map_copy, [points], isClosed=False, color=(0, 0, 100), thickness=2)
        
        # Combine annotated frame and tactical map in one image with colored border separation
        border_color = [255,255,255]                                                                        # Set border color (BGR)
        annotated_frame=cv2.copyMakeBorder(annotated_frame, 40, 10, 10, 10,                                 # Add borders to annotated frame
                                            cv2.BORDER_CONSTANT, value=border_color)
        tac_map_copy = cv2.copyMakeBorder(tac_map_copy, 70, 50, 10, 10, cv2.BORDER_CONSTANT,                # Add borders to tactical map 
                                           value=border_color)     

        # Calculate the position to start the overlay (top right corner)
        start_y = 0
        start_x = annotated_frame.shape[1] - tac_map_copy.shape[1]  # Start at the right edge of the annotated_frame

        annotated_frame[start_y:start_y + tac_map_copy.shape[0], start_x:start_x + tac_map_copy.shape[1]] = tac_map_copy

        final_img = annotated_frame

        ## Add info annotation
        cv2.putText(final_img, "Press 'p' to pause & 'q' to quit", (300,30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,0,0), 2)

        new_frame_time = time.time()                                                                        # Get time after finished processing current frame
        fps = 1/(new_frame_time-prev_frame_time)                                                            # Calculate FPS as 1/(frame proceesing duration)
        prev_frame_time = new_frame_time                                                                    # Save current time to be used in next frame
        cv2.putText(final_img, "FPS: " + str(int(fps)), (20,30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,0,0), 2)
        
        # Display the final annotated frame
        cv2.imshow("YOLOv8 Players and Field Keypoints Detection with Team Prediction and Tactical Map",    
                    final_img)
        
        frame_filename = os.path.join(frame_save_path, f'frame_{frame_nbr:04d}.jpg')
        cv2.imwrite(frame_filename, final_img)  # Save the frame as JPEG file

        # Treat keyboard user inputs ("p" for pause/unpause & "q" for quit)
        key = cv2.waitKey(1)
        # Break the loop if 'q' is pressed
        if key == ord("q"):
            break
        if key == ord('p'):
            cv2.waitKey(-1) #wait until any key is pressed
    else:
        # Break the loop if the end of the video is reached
        break

# Release the video capture object and close the display window
cap.release()
cv2.destroyAllWindows()


0: 384x640 10 players, 1 referee, 386.1ms
Speed: 2.7ms preprocess, 386.1ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 players, 1 referee, 402.5ms
Speed: 2.0ms preprocess, 402.5ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 13 players, 1 referee, 382.5ms
Speed: 2.0ms preprocess, 382.5ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 players, 448.8ms
Speed: 2.1ms preprocess, 448.8ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 players, 684.8ms
Speed: 2.9ms preprocess, 684.8ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 players, 413.4ms
Speed: 2.2ms preprocess, 413.4ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 players, 1 referee, 456.7ms
Speed: 2.3ms preprocess, 456.7ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 players, 1 referee, 385.2ms
Spee

In [16]:
import cv2
import os
import numpy as np

def create_video_from_frames(frame_folder, output_video_path, fps=30, codec='XVID'):
    # Define the codec and create VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*codec)
    frame_files = sorted([os.path.join(frame_folder, f) for f in os.listdir(frame_folder) if f.endswith('.jpg')])
    
    # Read the first frame to get the size
    first_frame = cv2.imread(frame_files[0])
    height, width, layers = first_frame.shape
    video = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    for frame_file in frame_files:
        frame = cv2.imread(frame_file)
        video.write(frame)
    
    video.release()

# Usage
frame_folder = 'saved_frames'  # Folder containing all the frames
output_video_path = 'output_video.avi'  # Path where the output video will be saved
fps = 20  # Frames per second of the output video

create_video_from_frames(frame_folder, output_video_path, fps)


[ERROR:0@1075.093] global /private/var/folders/nz/j6p8yfhx1mv_0grj5xl4650h0000gp/T/abs_d9lyif19nl/croot/opencv-suite_1676472756314/work/modules/videoio/src/cap.cpp (597) open VIDEOIO(CV_IMAGES): raised OpenCV exception:

OpenCV(4.6.0) /private/var/folders/nz/j6p8yfhx1mv_0grj5xl4650h0000gp/T/abs_d9lyif19nl/croot/opencv-suite_1676472756314/work/modules/videoio/src/cap_images.cpp:253: error: (-5:Bad argument) CAP_IMAGES: can't find starting number (in the name of file): output_video.avi in function 'icvExtractPattern'




In [9]:
import cv2
import os
import glob

# Path to the directory containing images
folder_path = 'saved_frames'
# Output video file
output_file = 'output_video.avi'
# Frame rate of the output video
fps = 15

# Retrieve a list of image filenames
images = glob.glob(f"{folder_path}/*.jpg")  # Adjust the extension if needed
images.sort()  # Sort the images by name

# Read the first image to get the dimensions
frame = cv2.imread(images[0])
height, width, layers = frame.shape

# Define the codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'XVID')  # 'XVID' is the codec used for AVI files
out = cv2.VideoWriter(output_file, fourcc, fps, (width, height))

# Read each image and write it to the video
for image in images:
    frame = cv2.imread(image)
    out.write(frame)  # Write the frame to the video file

# Release the VideoWriter object
out.release()


In [None]:
print(cv2.__version__) #4.5.5.64

In [None]:
!pip install --upgrade opencv-python

In [None]:
!pip install opencv-python==4.5.5.64