In [1]:
from ultralytics import YOLO
import cv2
import supervision as sv
import numpy as np

In [2]:
# Load the YOLO weights used for detection.
# NOTE(review): absolute local path - only resolves on the original machine.
model_weights_path = 'D:/projects/football_analysis/models/best_11l.pt'
model = YOLO(model_weights_path)

In [3]:
# Read every frame of the input clip into memory as a list of BGR arrays.
# NOTE(review): absolute local path - only resolves on the original machine.
video_path = 'D:/projects/football_analysis/input_videos/08fd33_4.mp4'
cap = cv2.VideoCapture(video_path)

# Fail loudly here instead of ending up with an empty `frames` list that only
# breaks later (e.g. at `frames[0]`).
if not cap.isOpened():
    cap.release()
    raise IOError(f"Could not open video file: {video_path}")

frames = []
try:
    while True:
        ret, frame = cap.read()
        if not ret:  # no more frames (or a read error): stop reading
            break
        frames.append(frame)
finally:
    # Always release the capture handle, even if reading raised.
    cap.release()

In [4]:
# Sanity check: number of frames read from the video.
len(frames)

750

In [5]:
# Shape of the first frame.
frames[0].shape

(1080, 1920, 3)

In [6]:
# Shape of the second frame, to compare against the first.
frames[1].shape

(1080, 1920, 3)

In [7]:
# Run the detector over the whole clip in fixed-size batches and collect
# one result object per frame, in frame order.
batch_size = 20
detections = []
for batch_start in range(0, len(frames), batch_size):
    frame_batch = frames[batch_start:batch_start + batch_size]
    # conf=0.1 is the confidence threshold passed to the model.
    detections.extend(model.predict(frame_batch, conf=0.1))


0: 384x640 1 ball, 1 goalkeeper, 21 players, 3 referees, 668.2ms
1: 384x640 1 ball, 1 goalkeeper, 21 players, 3 referees, 668.2ms
2: 384x640 1 goalkeeper, 19 players, 3 referees, 668.2ms
3: 384x640 1 ball, 1 goalkeeper, 20 players, 3 referees, 668.2ms
4: 384x640 1 goalkeeper, 19 players, 3 referees, 668.2ms
5: 384x640 1 goalkeeper, 20 players, 3 referees, 668.2ms
6: 384x640 1 goalkeeper, 21 players, 3 referees, 668.2ms
7: 384x640 1 goalkeeper, 20 players, 3 referees, 668.2ms
8: 384x640 1 ball, 1 goalkeeper, 20 players, 3 referees, 668.2ms
9: 384x640 1 ball, 1 goalkeeper, 19 players, 3 referees, 668.2ms
10: 384x640 1 goalkeeper, 22 players, 3 referees, 668.2ms
11: 384x640 21 players, 3 referees, 668.2ms
12: 384x640 20 players, 3 referees, 668.2ms
13: 384x640 21 players, 3 referees, 668.2ms
14: 384x640 1 ball, 22 players, 3 referees, 668.2ms
15: 384x640 1 ball, 20 players, 3 referees, 668.2ms
16: 384x640 19 players, 3 referees, 668.2ms
17: 384x640 1 ball, 19 players, 3 referees, 668.2ms

In [8]:
# Sanity check: number of per-frame detection results collected.
len(detections)

750

In [9]:
# Inspect the type of a single detection result.
type(detections[0])

ultralytics.engine.results.Results

In [10]:
# Per-category track storage: each key maps to a list that the tracking loop
# fills with one dict per frame.
tracks = {category: [] for category in ("players", "referees", "ball")}

In [11]:
# ByteTrack tracker from supervision; the tracking loop feeds it per-frame
# detections and reads back a track id for each one.
tracker = sv.ByteTrack()

In [12]:
# Build per-frame tracks for players, referees and the ball.
for frame_num, detection in enumerate(detections):
    # Class id -> name mapping reported by the model, and its inverse.
    cls_names = detection.names
    cls_names_inv = {name: idx for idx, name in cls_names.items()}

    detection_supervision = sv.Detections.from_ultralytics(detection)

    # Relabel goalkeepers as players so they end up in the "players" tracks.
    for obj_idx, class_id in enumerate(detection_supervision.class_id):
        if cls_names[class_id] != "goalkeeper":
            continue
        detection_supervision.class_id[obj_idx] = cls_names_inv["player"]

    detection_with_tracks = tracker.update_with_detections(detection_supervision)

    # One fresh dict per category for this frame.
    players_in_frame, referees_in_frame, ball_in_frame = {}, {}, {}
    tracks["players"].append(players_in_frame)
    tracks["referees"].append(referees_in_frame)
    tracks["ball"].append(ball_in_frame)

    # Tracked detections: element 0 is the box, 3 the class id, 4 the track id.
    for xyxy, _, _, cls_id, track_id, *_ in detection_with_tracks:
        bbox = xyxy.tolist()

        if cls_id == cls_names_inv["player"]:
            players_in_frame[track_id] = {"bbox": bbox}

        if cls_id == cls_names_inv["referee"]:
            referees_in_frame[track_id] = {"bbox": bbox}

    # The ball is taken from the raw (untracked) detections and stored under
    # the fixed key 1; if several ball boxes exist, the last one wins.
    for xyxy, _, _, cls_id, *_ in detection_supervision:
        bbox = xyxy.tolist()

        if cls_id == cls_names_inv["ball"]:
            ball_in_frame[1] = {"bbox": bbox}

In [23]:
import pandas as pd

In [24]:
# Collect the ball box of every frame; frames without a ball contribute an
# empty list, which becomes a row of missing values in the DataFrame.
ball_positions = [
    frame_ball.get(1, {}).get("bbox", []) for frame_ball in tracks["ball"]
]

df_ball_positions = (
    pd.DataFrame(ball_positions, columns=['x1', 'y1', 'x2', 'y2'])
    # Fill gaps between detections with pandas' default interpolation.
    .interpolate()
    # Leading gaps (before the first detection) cannot be interpolated, so
    # back-fill them from the first available detection.
    .bfill()
)

# Write the gap-free positions back in the same per-frame structure.
tracks["ball"] = [
    {1: {"bbox": bbox}} for bbox in df_ball_positions.to_numpy().tolist()
]

In [25]:
def get_center_of_bbox(bbox):
    """Return the integer (x, y) center of an (x1, y1, x2, y2) box.

    Each coordinate is the midpoint of the two edges, truncated toward zero
    by ``int``.
    """
    left, top, right, bottom = bbox
    center_x = int((left + right) / 2)
    center_y = int((top + bottom) / 2)
    return center_x, center_y

def get_bbox_width(bbox):
    """Return the horizontal extent of a box: its x2 (index 2) minus x1 (index 0)."""
    left, right = bbox[0], bbox[2]
    return right - left

In [26]:
def draw_ellipse(frame, bbox, color, track_id=None):
    """Draw an open ellipse at the bottom of a box, plus an optional ID badge.

    Args:
        frame: Image to draw on.
        bbox: Box as (x1, y1, x2, y2).
        color: Color used for the ellipse and the badge background.
        track_id: If given, a filled rectangle with this id printed in black
            is drawn just below the ellipse center.

    Returns:
        The same ``frame`` object that was passed in.
    """
    bottom_y = int(bbox[3])                  # bottom edge of the box
    x_center, _ = get_center_of_bbox(bbox)   # horizontal center of the box
    width = get_bbox_width(bbox)

    # Arc from -45 to 235 degrees (a 280-degree sweep); the minor axis is
    # 35% of the major axis.
    cv2.ellipse(
        frame,
        center=(x_center, bottom_y),
        axes=(int(width), int(0.35 * width)),
        angle=0.0,
        startAngle=-45,
        endAngle=235,
        color=color,
        thickness=2,
        lineType=cv2.LINE_4,
    )

    # Without a track id there is no badge to draw.
    if track_id is None:
        return frame

    rectangle_width = 40
    rectangle_height = 20
    vertical_offset = 15  # pushes the badge below the ellipse center
    half_width = rectangle_width // 2
    half_height = rectangle_height // 2

    badge_left = x_center - half_width
    badge_right = x_center + half_width
    badge_top = (bottom_y - half_height) + vertical_offset
    badge_bottom = (bottom_y + half_height) + vertical_offset

    cv2.rectangle(
        frame,
        (int(badge_left), int(badge_top)),
        (int(badge_right), int(badge_bottom)),
        color,
        cv2.FILLED,
    )

    # Shift the text left for three-digit ids.
    text_x = badge_left + 12
    if track_id > 99:
        text_x -= 10
    text_y = badge_top + 15

    cv2.putText(
        frame,
        f"{track_id}",
        (int(text_x), int(text_y)),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.6,        # font scale
        (0, 0, 0),  # black text
        2,          # thickness
    )
    return frame

In [27]:
def draw_triangle(frame, bbox, color):
    """Draw a filled, black-outlined inverted triangle above a box.

    The triangle's tip sits at the top-center of ``bbox`` and its base lies
    20 pixels above it, 20 pixels wide.

    Args:
        frame: Image to draw on.
        bbox: Box as (x1, y1, x2, y2).
        color: Fill color of the triangle.

    Returns:
        The same ``frame`` object that was passed in.
    """
    y = int(bbox[1])
    x, _ = get_center_of_bbox(bbox)

    # OpenCV contour drawing needs 32-bit integer points. A plain np.array of
    # Python ints is int64 on most platforms, which cv2.drawContours rejects,
    # so the dtype is pinned explicitly.
    triangle_points = np.array(
        [
            [x, y],
            [x - 10, y - 20],
            [x + 10, y - 20],
        ],
        dtype=np.int32,
    )
    cv2.drawContours(frame, [triangle_points], 0, color, cv2.FILLED)  # filled triangle
    cv2.drawContours(frame, [triangle_points], 0, (0, 0, 0), 2)       # black border
    return frame

In [28]:
from sklearn.cluster import KMeans

In [29]:
def get_clustering_model(image):
    """Fit a 2-cluster K-means model on the pixels of ``image``.

    The image is flattened to one row per pixel with 3 values each before
    fitting. No ``random_state`` is passed, so results may differ between runs.

    Returns:
        The fitted ``KMeans`` instance.
    """
    pixels = image.reshape(-1, 3)

    model = KMeans(n_clusters=2, init='k-means++', n_init=1)
    model.fit(pixels)
    return model

def get_player_color(frame, bbox):
    """Estimate a player's color from the top half of their box.

    The top half of the crop is split into two pixel clusters. The cluster
    that is most common among the four corner pixels is treated as the
    non-player cluster, and the center of the other cluster is returned.

    Args:
        frame: Image containing the player.
        bbox: Player box as (x1, y1, x2, y2).

    Returns:
        The cluster center (one value per channel) of the player cluster.
    """
    x1, y1, x2, y2 = (int(bbox[i]) for i in range(4))
    player_crop = frame[y1:y2, x1:x2]
    top_half_img = player_crop[0:int(player_crop.shape[0] / 2), :]

    kmeans = get_clustering_model(top_half_img)

    # Put the per-pixel cluster labels back into the image layout.
    height, width = top_half_img.shape[0], top_half_img.shape[1]
    clustered_image = kmeans.labels_.reshape(height, width)

    # Labels at the four corners, in the order: top-left, top-right,
    # bottom-left, bottom-right.
    corner_clusters = [
        clustered_image[row, col] for row in (0, -1) for col in (0, -1)
    ]
    non_player_cluster = max(set(corner_clusters), key=corner_clusters.count)

    # With exactly two clusters (0 and 1) the player cluster is the other one.
    return kmeans.cluster_centers_[1 - non_player_cluster]

def assign_team_color(frame, player_detections):
    """Cluster the colors of all players in one frame into two team colors.

    Args:
        frame: Image the detections belong to.
        player_detections: Mapping whose values are dicts with a 'bbox' entry.

    Returns:
        A tuple ``(kmeans, color_0, color_1)``: the fitted 2-cluster model and
        its two cluster centers. No ``random_state`` is passed, so which team
        gets which index may differ between runs.
    """
    player_colors = [
        get_player_color(frame, detection['bbox'])
        for detection in player_detections.values()
    ]

    team_model = KMeans(n_clusters=2, init='k-means++', n_init=1)
    team_model.fit(player_colors)

    first_center, second_center = (
        team_model.cluster_centers_[0],
        team_model.cluster_centers_[1],
    )
    return team_model, first_center, second_center

In [30]:
# Cache of player_id -> team id, filled lazily by get_player_team.
player_team_dict = {}

# Derive the two team colors from the players detected in the first frame.
kmeans, team_1_color, team_2_color = assign_team_color(frames[0], tracks['players'][0])
team_colors = {1: team_1_color, 2: team_2_color}

def get_player_team(frame, player_bbox, player_id, kmeans):
    """Return the team id (1 or 2) for a player, caching it per player id.

    The first time a player id is seen, the player's color is extracted from
    ``frame`` and classified with ``kmeans``; the predicted cluster index is
    shifted by one to give the team id. Later calls for the same id return
    the cached value from ``player_team_dict``.
    """
    try:
        return player_team_dict[player_id]
    except KeyError:
        pass

    color_sample = get_player_color(frame, player_bbox).reshape(1, -1)
    team_id = kmeans.predict(color_sample)[0] + 1  # clusters 0/1 -> teams 1/2

    player_team_dict[player_id] = team_id
    return team_id

# Annotate every player track in every frame with its team id and team color.
for frame_num, player_track in enumerate(tracks['players']):
    for player_id, track in player_track.items():
        team = get_player_team(frames[frame_num], track['bbox'], player_id, kmeans)
        # `track` is the same dict stored in tracks['players'][frame_num][player_id].
        track['team'] = team
        track['team_color'] = team_colors[team]

In [31]:
# Render the annotations onto a copy of every frame.
output_video_frames = []
for frame_num, frame in enumerate(frames):
    frame = frame.copy()  # keep the original frames untouched

    player_dict = tracks['players'][frame_num]
    referee_dict = tracks['referees'][frame_num]
    ball_dict = tracks['ball'][frame_num]

    # Players: ellipse in their team color (red fallback) with the track id.
    for track_id, player in player_dict.items():
        color = player.get("team_color", (0, 0, 255))
        frame = draw_ellipse(frame, player["bbox"], color, track_id)

    # Referees: yellow ellipse without an id badge.
    for _, referee in referee_dict.items():
        frame = draw_ellipse(frame, referee["bbox"], (0, 255, 255))

    # Ball: green triangle marker.
    for track_id, ball in ball_dict.items():
        frame = draw_triangle(frame, ball["bbox"], (0, 255, 0))

    output_video_frames.append(frame)

#### Save a cropped screenshot of one player

In [30]:
# Save a crop of the first player listed for frame 0 (nothing is written if
# that frame has no players).
first_frame_players = tracks['players'][0]
if first_frame_players:
    track_id, player = next(iter(first_frame_players.items()))
    bbox = player['bbox']
    frame = frames[0]

    # Crop the player's box out of the frame.
    top, bottom = int(bbox[1]), int(bbox[3])
    left, right = int(bbox[0]), int(bbox[2])
    cropped_image = frame[top:bottom, left:right]

    # Write the crop to disk.
    cv2.imwrite('cropped_img.jpg', cropped_image)

In [32]:
# Write the annotated frames to an XVID-encoded AVI file.
fourcc = cv2.VideoWriter_fourcc(*'XVID')
frame_height, frame_width = output_video_frames[0].shape[:2]
# VideoWriter expects the frame size as (width, height).
# NOTE(review): the frame rate is hard-coded to 24 - confirm it matches the source clip.
out = cv2.VideoWriter('output_video.avi', fourcc, 24, (frame_width, frame_height))

# A writer that failed to open (e.g. missing codec) would silently drop every
# frame, so fail explicitly instead.
if not out.isOpened():
    out.release()
    raise IOError("Could not open 'output_video.avi' for writing")

try:
    for frame in output_video_frames:
        out.write(frame)
finally:
    # Always finalize the file, even if writing a frame raised.
    out.release()