<a href="https://colab.research.google.com/github/zachrobel/Soccer_Analysis/blob/main/Final_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Initial Analysis
----
Initializes a pretrained YOLOv8 object detection model (yolov8x) and runs it on the match video. Out of the box, the model is only partially accurate, and most of its detections are irrelevant to this project's goals.


In [None]:
!pip install ultralytics
!pip install roboflow
!pip install supervision

In [None]:
from ultralytics import YOLO

model = YOLO('yolov8x')

results = model.predict('Soccer1.mp4', save=True)
print(results[0])
print('===============================')

for box in results[0].boxes:
  print(box)


# Get Dataset
---
Uses Roboflow for dataset management and preparation, exporting the data in the format expected by the YOLOv5 model used for object detection training. This involves accessing a labeled dataset from Roboflow, organizing the data for model training, and configuring training parameters for YOLOv5.

In [None]:
from roboflow import Roboflow
rf = Roboflow(api_key="Zc10kKvu944d9TXcsnXx")
project = rf.workspace("roboflow-jvuqo").project("football-players-detection-3zvbc")
version = project.version(1)
dataset = version.download("yolov5")

In [None]:
import shutil

shutil.move('football-players-detection-1/train', 'football-players-detection-1/football-players-detection-1/train')
shutil.move('football-players-detection-1/test', 'football-players-detection-1/football-players-detection-1/test')
shutil.move('football-players-detection-1/valid', 'football-players-detection-1/football-players-detection-1/valid')


# Training
---
The parameter epochs=100 specifies that the YOLOv5 model will be trained for 100 epochs, meaning it will pass through the entire training dataset 100 times during training. The parameter imgsz=640 sets the training image size to 640 pixels.

In [None]:
!yolo task=detect mode=train model=yolov5lu.pt data={dataset.location}/data.yaml epochs=100 imgsz=640
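
To make the epochs parameter concrete, the following sketch estimates how many batches a training run processes in total. The dataset size and batch size are placeholder assumptions chosen for illustration; they are not values taken from this project.

```python
import math

def total_training_steps(num_images, batch_size, epochs):
    """Return the number of batches processed over the whole run."""
    # One epoch is one full pass over the training images.
    steps_per_epoch = math.ceil(num_images / batch_size)
    return steps_per_epoch * epochs

# Hypothetical numbers: 600 training images, batch size 16, 100 epochs.
steps = total_training_steps(num_images=600, batch_size=16, epochs=100)
print(steps)
```

With these assumed numbers, each epoch takes 38 batches, so the run processes 3800 batches.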


# Updated Model Detection
---
The newly trained model is loaded here. The following steps build on its improved detections.

In [None]:
from ultralytics import YOLO

model = YOLO('best-4.pt')

results = model.predict('Soccer1.mp4', save=True)
print(results[0])
print('===============================')

for box in results[0].boxes:
  print(box)


# Video Utils
---
The read_video function takes the path to a video file (video_path) as input. It initializes a VideoCapture object using OpenCV to read frames from the specified video and iterates through the video in a while loop, reading the next frame with the read method on each iteration. If a frame is successfully read (ret is True), it is appended to the frames list. If no frame is read (ret is False, indicating the end of the video), the loop breaks. The function returns the list of frames read from the video.

The save_video function takes a list of video frames (output_video_frames) and the desired output path (output_video_path) as input. It initializes a VideoWriter object using OpenCV, specifying the output video path, codec (XVID), frame rate (24 frames per second), and frame size (taken from the first frame in output_video_frames). It then writes each frame to the output video using the write method of the out object and finally releases the writer.
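
Because the output frame rate is fixed at 24 frames per second, the written video only plays at the original speed if the source was also recorded at 24 fps. The sketch below illustrates the effect with plain arithmetic; the clip length and source frame rate are hypothetical.

```python
def playback_duration_seconds(num_frames, fps):
    """Duration of a clip with num_frames frames played at fps."""
    return num_frames / fps

# Hypothetical clip: 750 frames recorded at 25 fps.
source_duration = playback_duration_seconds(750, 25)
# The same 750 frames written out at the fixed 24 fps.
written_duration = playback_duration_seconds(750, 24)
print(source_duration, written_duration)
```

Under these assumptions the output runs 31.25 seconds instead of 30, so the annotated video plays slightly slower than the source.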

In [None]:
!pip install opencv-python

In [None]:
import cv2

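def read_video(video_path):
    cap = cv2.VideoCapture(video_path)
    frames = []
    while True:
        ret, frame = cap.read()
        # ret is False once the video has no more frames
        if not ret:
            break
        frames.append(frame)
    # Release the capture handle once all frames are read
    cap.release()
    return frames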

In [None]:
def save_video(output_video_frames,output_video_path):
    # XVID codec, 24 fps, frame size (width, height) taken from the first frame
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_video_path, fourcc, 24, (output_video_frames[0].shape[1], output_video_frames[0].shape[0]))
    for frame in output_video_frames:
        out.write(frame)
    out.release()

# BBOX Utils
----
The get_center_of_bbox function takes a bounding box (bbox) as input.
Calculates the center coordinates of the bounding box by taking the average of the x-coordinates (x1 and x2) and the average of the y-coordinates (y1 and y2). The coordinates (x_center, y_center) are returned as integers.

The get_bbox_width function takes a bounding box (bbox) as input. It calculates the width of the bounding box by subtracting the x-coordinate of the top-left corner (bbox[0]) from the x-coordinate of the bottom-right corner (bbox[2]).

The measure_distance function takes two points (p1 and p2) as input.
It calculates the Euclidean distance between the two points using the distance formula.

In [None]:
import cv2
def get_center_of_bbox(bbox):
    x1,y1,x2,y2 = bbox
    return int((x1+x2)/2),int((y1+y2)/2)
def get_bbox_width(bbox):
    return bbox[2]-bbox[0]
def measure_distance(p1,p2):
    return ((p1[0]-p2[0])**2 + (p1[1]-p2[1])**2)**0.5
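
As a quick sanity check of the three calculations described in this section, the sketch below works through them by hand on a made-up bounding box and a 3-4-5 triangle. The coordinates are hypothetical, and math.hypot is used only as an independent reference for the distance formula.

```python
import math

# Hypothetical bounding box in (x1, y1, x2, y2) pixel coordinates.
bbox = (100, 50, 140, 130)
x1, y1, x2, y2 = bbox

# Center: average of the x-coordinates and of the y-coordinates, as integers.
center = (int((x1 + x2) / 2), int((y1 + y2) / 2))

# Width: right edge minus left edge.
width = x2 - x1

# Euclidean distance between (0, 0) and (3, 4), written out and via math.hypot.
p1, p2 = (0, 0), (3, 4)
distance = ((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2) ** 0.5
reference = math.hypot(p2[0] - p1[0], p2[1] - p1[1])

print(center, width, distance, reference)
```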

# Team Assignment

----

The TeamAssigner class contains methods for assigning team colors to players based on their jersey colors.

The get_player_color method is called for each player's bounding box.
This method crops the player from the frame and keeps the top half of the crop, where the jersey is.
K-means clustering with two clusters is performed on the pixel values of that top half to separate the jersey from the background.
The cluster that appears in most of the four corner pixels is assumed to be background, and the center of the other cluster is taken as the player's jersey color.

Once the dominant colors are extracted for all players, K-means clustering is applied again to cluster these colors into two groups, representing the two teams.
The team colors are then assigned based on the centroids of these clusters.
Each player is assigned to a team based on the proximity of their jersey color to the centroids of the team color clusters.

For each player, their jersey color is compared to the centroids of the team color clusters.
The player is assigned to the team whose centroid is closest to their jersey color.
The code below does not treat a jersey color that differs significantly from both team colors as a special case; such a player is still assigned to the nearer centroid, so additional logic would be needed to handle that scenario.
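
The nearest-centroid rule can be sketched without scikit-learn as follows. This is an illustration only: the function name nearest_team and the RGB values are hypothetical, and the notebook itself relies on KMeans.predict for this step.

```python
def nearest_team(color, centroids):
    """Return the 1-based index of the centroid closest to color."""
    def squared_distance(a, b):
        return sum((x - y) ** 2 for x, y in zip(a, b))

    best_index = min(
        range(len(centroids)),
        key=lambda i: squared_distance(color, centroids[i]),
    )
    # Teams are numbered 1 and 2, while list indices start at 0.
    return best_index + 1

# Hypothetical team centroids: a red kit and a white kit.
centroids = [(200, 30, 30), (240, 240, 240)]

print(nearest_team((180, 50, 40), centroids))
print(nearest_team((230, 220, 235), centroids))
```

The first color is closest to the red centroid (team 1) and the second to the white centroid (team 2).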


In [None]:
from sklearn.cluster import KMeans

class TeamAssigner:
    def __init__(self):
        self.team_colors = {}
        self.player_team_dict = {}

    def get_clustering_model(self,image):
        # Reshape the image to 2D array
        image_2d = image.reshape(-1,3)

        # Perform K-means with 2 clusters
        kmeans = KMeans(n_clusters=2, init="k-means++",n_init=1)
        kmeans.fit(image_2d)

        return kmeans

    def get_player_color(self,frame,bbox):
        image = frame[int(bbox[1]):int(bbox[3]),int(bbox[0]):int(bbox[2])]

        top_half_image = image[0:int(image.shape[0]/2),:]

        # Get Clustering model
        kmeans = self.get_clustering_model(top_half_image)

        # Get the cluster labels for each pixel
        labels = kmeans.labels_

        # Reshape the labels to the image shape
        clustered_image = labels.reshape(top_half_image.shape[0],top_half_image.shape[1])

        # Get the player cluster
        corner_clusters = [clustered_image[0,0],clustered_image[0,-1],clustered_image[-1,0],clustered_image[-1,-1]]
        non_player_cluster = max(set(corner_clusters),key=corner_clusters.count)
        player_cluster = 1 - non_player_cluster

        player_color = kmeans.cluster_centers_[player_cluster]

        return player_color


    def assign_team_color(self,frame, player_detections):

        player_colors = []
        for _, player_detection in player_detections.items():
            bbox = player_detection["bbox"]
            player_color =  self.get_player_color(frame,bbox)
            player_colors.append(player_color)

        kmeans = KMeans(n_clusters=2, init="k-means++",n_init=10)
        kmeans.fit(player_colors)

        self.kmeans = kmeans

        self.team_colors[1] = kmeans.cluster_centers_[0]
        self.team_colors[2] = kmeans.cluster_centers_[1]


    def get_player_team(self,frame,player_bbox,player_id):
        if player_id in self.player_team_dict:
            return self.player_team_dict[player_id]

        player_color = self.get_player_color(frame,player_bbox)

        team_id = self.kmeans.predict(player_color.reshape(1,-1))[0]
        team_id+=1

        self.player_team_dict[player_id] = team_id

        return team_id

# Color Assignment Logic


In [None]:
import cv2
import matplotlib.pyplot as plt
import numpy as np
from sklearn.cluster import KMeans

In [None]:
image_path = "stubs/cropped_image.jpg"
image = cv2.imread(image_path)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

In [None]:
plt.imshow(image)
plt.show()

In [None]:
top_half_image=  image[0: int(image.shape[0]/2), :]
plt.imshow(top_half_image)
plt.show()

In [None]:
# Reshape the image into 2d array
image_2d = top_half_image.reshape(-1, 3)

# perform k-means clustering with 2 clusters
kmeans = KMeans(n_clusters=2, random_state=0)
kmeans.fit(image_2d)

# get the cluster labels
labels = kmeans.labels_

# reshape the labels into the original image shape
clustered_image = labels.reshape(top_half_image.shape[0], top_half_image.shape[1])

# Display the clustered image
plt.imshow(clustered_image)
plt.show()

In [None]:
corner_clusters = [clustered_image[0, 0], clustered_image[0, -1], clustered_image[-1, 0], clustered_image[-1, -1]]
non_player_cluster = max(set(corner_clusters), key=corner_clusters.count)
print(non_player_cluster)

In [None]:
player_cluster = 1-non_player_cluster
print(player_cluster)

In [None]:
kmeans.cluster_centers_[player_cluster]
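
The corner-voting step used in the cells above can be isolated in a small sketch. It assumes a two-cluster label grid given as nested lists; the function name split_clusters and the grid are hypothetical. Note that a 2-2 tie between the corners is resolved arbitrarily, both here and in the notebook code.

```python
from collections import Counter

def split_clusters(label_grid):
    """Return (background_label, player_label) for a 0/1 label grid."""
    corners = [
        label_grid[0][0], label_grid[0][-1],
        label_grid[-1][0], label_grid[-1][-1],
    ]
    # The label seen in most corners is assumed to be background.
    background = Counter(corners).most_common(1)[0][0]
    # With exactly two clusters, the player is the other label.
    return background, 1 - background

# Hypothetical clustered crop: label 1 forms a blob in the middle.
grid = [
    [0, 0, 1, 0],
    [0, 1, 1, 0],
    [0, 1, 1, 1],
]
print(split_clusters(grid))
```

Three of the four corners carry label 0, so label 0 is treated as background and label 1 as the player.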

# Player Ball Assignment
---

Here, the bounding box of the ball is extracted from each frame of the video.

For each player detected in the frame, the distance to the ball's position (the center of its bounding box) is calculated. The code measures from the bottom-left and bottom-right corners of the player's bounding box, which approximate the player's feet, and keeps the smaller of the two distances.

The player closest to the ball within a threshold distance (max_player_ball_distance, 70 pixels here) is considered to be in possession of the ball.
If multiple players are within the threshold distance, the player closest to the ball is assigned possession.

The assigned player's ID is returned for each frame, or -1 if no player is within the threshold distance of the ball. In that case, the main loop maintains the possession status from the previous frame.
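
The carry-over rule for frames without a nearby player can be sketched on its own. In this illustration None marks a frame in which no player is within the threshold; the function name fill_possession and the sample sequence are hypothetical.

```python
def fill_possession(team_per_frame):
    """Replace None entries with the most recent known team."""
    filled = []
    last_team = None  # stays None until some team first has the ball
    for team in team_per_frame:
        if team is not None:
            last_team = team
        filled.append(last_team)
    return filled

# Hypothetical per-frame possession: team 1, a gap, then team 2.
print(fill_possession([None, 1, None, None, 2, None]))
```

The leading None is kept because there is no earlier frame to copy from, which is an edge case the main loop also has to account for.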


In [None]:
class PlayerBallAssigner():
    def __init__(self):
        self.max_player_ball_distance = 70

    def assign_ball_to_player(self,players,ball_bbox):
        ball_position = get_center_of_bbox(ball_bbox)

        minimum_distance = 99999
        assigned_player=-1

        for player_id, player in players.items():
            player_bbox = player['bbox']

            # Distance from the ball to the bottom-left and bottom-right bbox corners
            distance_left = measure_distance((player_bbox[0],player_bbox[-1]),ball_position)
            distance_right = measure_distance((player_bbox[2],player_bbox[-1]),ball_position)
            distance = min(distance_left,distance_right)

            # Keep the closest player within the threshold
            if distance < self.max_player_ball_distance:
                if distance < minimum_distance:
                    minimum_distance = distance
                    assigned_player = player_id

        return assigned_player

# Tracking
---

The Tracker class takes the path to the YOLO model (model_path) as input and initializes a YOLO object for object detection.
Additionally, it initializes a ByteTrack object from the supervision library for object tracking.

Ball Position Interpolation (interpolate_ball_positions):

The interpolate_ball_positions method takes a list of ball positions across frames (ball_positions) as input. It interpolates missing or inaccurate ball positions to ensure a smoother trajectory. Missing positions are filled by linear interpolation between adjacent known positions. The method returns a list of interpolated ball positions.
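
The gap-filling idea can be sketched in plain Python for a single coordinate. This is an illustration rather than the notebook's method, which uses pandas: interior gaps are filled linearly, a leading gap takes the first known value, and a trailing gap holds the last known value. The function name interpolate_series and the sample values are hypothetical.

```python
def interpolate_series(values):
    """Fill None entries linearly; edge gaps take the nearest known value."""
    known = [i for i, v in enumerate(values) if v is not None]
    if not known:
        return list(values)  # nothing to interpolate from
    out = list(values)
    # Leading gap: back-fill with the first known value.
    for i in range(known[0]):
        out[i] = values[known[0]]
    # Interior gaps: straight line between the two neighbouring known values.
    for left, right in zip(known, known[1:]):
        step = (values[right] - values[left]) / (right - left)
        for i in range(left + 1, right):
            out[i] = values[left] + step * (i - left)
    # Trailing gap: hold the last known value.
    for i in range(known[-1] + 1, len(values)):
        out[i] = values[known[-1]]
    return out

# Hypothetical x1 coordinate of the ball; None marks frames with no detection.
print(interpolate_series([None, 10, None, None, 40, None]))
```

In the notebook the same treatment is applied to all four bounding box coordinates at once through a DataFrame.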

Object Detection (detect_frames):

The detect_frames method performs object detection on a batch of frames.
It takes a list of frames as input and utilizes the YOLO model to detect objects within each frame. The method returns a list of detections, where each detection contains information about detected objects (bounding boxes, class labels, confidence scores, etc.).
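
The batching pattern can be shown independently of the model. The sketch below only slices a list into consecutive batches; the helper name batched and the frame count are hypothetical, and integers stand in for the frames.

```python
def batched(items, batch_size):
    """Yield consecutive slices of items with at most batch_size elements."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

# Hypothetical video of 45 frames, processed 20 at a time.
frames = list(range(45))
batch_sizes = [len(batch) for batch in batched(frames, 20)]
print(batch_sizes)
```

The last batch is simply shorter when the number of frames is not a multiple of the batch size, so no frame is dropped.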

Object Tracking (get_object_tracks):

The get_object_tracks method combines object detection results with object tracking to provide object tracks over multiple frames. It takes a list of frames (frames) as input, along with the optional parameters read_from_stub and stub_path for loading previously computed tracks from a pickle file (a "stub") instead of recomputing them. It detects objects in each frame using the detect_frames method, relabels goalkeepers as players, and then updates the object tracks using the ByteTrack object. The method returns tracks for players, referees, and the ball, containing their bounding boxes and identities across frames. The ball is not assigned a tracker ID; its detection in each frame is stored under the fixed key 1.

Ellipse Drawing (draw_ellipse):

The draw_ellipse method draws an ellipse around an object's bounding box.
It takes parameters such as the frame, bounding box (bbox), color, and optional track ID. The method draws an ellipse around the bounding box using OpenCV's ellipse function and adds a rectangle and text for the track ID.

Triangle Drawing (draw_triangle):

The draw_triangle method (spelled draw_traingle in the code below) draws a filled triangle above an object's bounding box. It is used to mark the ball and to indicate possession (the player with the ball). Similar to draw_ellipse, it takes parameters such as the frame, bounding box (bbox), and color. The method draws the triangle using OpenCV's drawContours function.
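
The triangle geometry can be checked without OpenCV. The sketch below computes only the three vertices, using the same offsets as the code in this notebook (10 pixels to each side, 20 pixels up); the function name triangle_points and the bounding box are hypothetical.

```python
def triangle_points(bbox):
    """Return the vertices of a downward-pointing triangle above bbox."""
    x = int((bbox[0] + bbox[2]) / 2)  # horizontal center of the box
    y = int(bbox[1])                  # top edge of the box
    # The tip touches the top of the box; the base sits 20 px above it.
    return [(x, y), (x - 10, y - 20), (x + 10, y - 20)]

# Hypothetical bounding box in (x1, y1, x2, y2) pixel coordinates.
print(triangle_points((100, 50, 140, 130)))
```

Image y-coordinates grow downward, so subtracting 20 from y places the base of the triangle above the bounding box.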

Team Ball Control Visualization (draw_team_ball_control):

The draw_team_ball_control method visualizes ball control statistics for each team. It takes parameters such as the frame, frame number, and team ball control statistics. The method overlays semi-transparent rectangles on the frame to display ball control percentages for each team.
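
The percentage calculation behind the overlay can be sketched with plain lists. The function name ball_control_share and the sample sequence are hypothetical, and the guard for frames in which neither team has had the ball yet is an assumption of this sketch.

```python
def ball_control_share(team_per_frame, frame_num):
    """Return (team_1_share, team_2_share) up to and including frame_num."""
    window = team_per_frame[:frame_num + 1]
    team_1_frames = window.count(1)
    team_2_frames = window.count(2)
    total = team_1_frames + team_2_frames
    if total == 0:
        # Neither team has had the ball yet; avoid dividing by zero.
        return 0.0, 0.0
    return team_1_frames / total, team_2_frames / total

# Hypothetical possession per frame (1 = team 1, 2 = team 2).
possession = [1, 1, 2, 1, 2, 2, 2, 1]
print(ball_control_share(possession, 3))
```

Up to frame 3, team 1 holds the ball in three of four frames, giving shares of 0.75 and 0.25.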

Annotations Drawing (draw_annotations):

The draw_annotations method draws annotations (ellipses, triangles, team ball control visuals) on each frame. It takes a list of video frames (video_frames), object tracks (tracks), and team ball control statistics as input. The method iterates through each frame, draws annotations for players, referees, and the ball using the draw_ellipse, draw_triangle, and draw_team_ball_control methods, and returns a list of annotated frames.

In [None]:
from ultralytics import YOLO
import supervision as sv
import pickle
import os
import numpy as np
import pandas as pd

class Tracker:
    def __init__(self, model_path):
        self.model = YOLO(model_path)
        self.tracker = sv.ByteTrack()

    def interpolate_ball_positions(self,ball_positions):
        ball_positions = [x.get(1,{}).get('bbox',[]) for x in ball_positions]
        df_ball_positions = pd.DataFrame(ball_positions,columns=['x1','y1','x2','y2'])

        # Interpolate missing values
        df_ball_positions = df_ball_positions.interpolate()
        df_ball_positions = df_ball_positions.bfill()

        ball_positions = [{1: {"bbox":x}} for x in df_ball_positions.to_numpy().tolist()]

        return ball_positions


    def detect_frames(self, frames):
        batch_size=20
        detections = []
        for i in range(0,len(frames),batch_size):
            detections_batch = self.model.predict(frames[i:i+batch_size],conf=0.1)
            detections += detections_batch
        return detections

    def get_object_tracks(self, frames, read_from_stub=False, stub_path=None):

        if read_from_stub and stub_path is not None and os.path.exists(stub_path):
            with open(stub_path,'rb') as f:
                tracks = pickle.load(f)
            # Return the cached tracks instead of re-running detection
            return tracks

        detections = self.detect_frames(frames)

        tracks={
            "players":[],
            "referees":[],
            "ball":[]
        }

        for frame_num, detection in enumerate(detections):
            cls_names = detection.names
            cls_names_inv = {v:k for k,v in cls_names.items()}

            detection_supervision = sv.Detections.from_ultralytics(detection)

            # Convert GoalKeeper to player object
            for object_ind , class_id in enumerate(detection_supervision.class_id):
                if cls_names[class_id] == "goalkeeper":
                    detection_supervision.class_id[object_ind] = cls_names_inv["player"]

            detection_with_tracks = self.tracker.update_with_detections(detection_supervision)

            tracks["players"].append({})
            tracks["referees"].append({})
            tracks["ball"].append({})

            for frame_detection in detection_with_tracks:
                bbox = frame_detection[0].tolist()
                cls_id = frame_detection[3]
                track_id = frame_detection[4]

                if cls_id == cls_names_inv['player']:
                    tracks["players"][frame_num][track_id] = {"bbox":bbox}

                if cls_id == cls_names_inv['referee']:
                    tracks["referees"][frame_num][track_id] = {"bbox":bbox}

            for frame_detection in detection_supervision:
                bbox = frame_detection[0].tolist()
                cls_id = frame_detection[3]

                if cls_id == cls_names_inv['ball']:
                    tracks["ball"][frame_num][1] = {"bbox":bbox}


        if stub_path is not None:
            with open(stub_path,'wb') as f:
                pickle.dump(tracks,f)

        return tracks

    def draw_ellipse(self,frame,bbox,color,track_id=None):
        y2 = int(bbox[3])
        x_center, _ = get_center_of_bbox(bbox)
        width = get_bbox_width(bbox)

        cv2.ellipse(
            frame,
            center=(x_center,y2),
            axes=(int(width), int(0.35*width)),
            angle=0.0,
            startAngle=-45,
            endAngle=235,
            color = color,
            thickness=2,
            lineType=cv2.LINE_4
        )

        rectangle_width = 40
        rectangle_height=20
        x1_rect = x_center - rectangle_width//2
        x2_rect = x_center + rectangle_width//2
        y1_rect = (y2- rectangle_height//2) +15
        y2_rect = (y2+ rectangle_height//2) +15

        if track_id is not None:
            cv2.rectangle(frame,
                          (int(x1_rect),int(y1_rect) ),
                          (int(x2_rect),int(y2_rect)),
                          color,
                          cv2.FILLED)

            x1_text = x1_rect+12
            if track_id > 99:
                x1_text -=10

            cv2.putText(
                frame,
                f"{track_id}",
                (int(x1_text),int(y1_rect+15)),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.6,
                (0,0,0),
                2
            )

        return frame

    def draw_traingle(self,frame,bbox,color):
        y= int(bbox[1])
        x,_ = get_center_of_bbox(bbox)

        triangle_points = np.array([
            [x,y],
            [x-10,y-20],
            [x+10,y-20],
        ])
        cv2.drawContours(frame, [triangle_points],0,color, cv2.FILLED)
        cv2.drawContours(frame, [triangle_points],0,(0,0,0), 2)

        return frame
    def draw_team_ball_control(self,frame,frame_num,team_ball_control):
        # Draw a semi-transparent rectangle
        overlay = frame.copy()
        cv2.rectangle(overlay, (1350, 850), (1900,970), (255,255,255), -1 )
        alpha = 0.4
        cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame)

        team_ball_control_till_frame = team_ball_control[:frame_num+1]
        # Get the number of frames in which each team had ball control
        team_1_num_frames = team_ball_control_till_frame[team_ball_control_till_frame==1].shape[0]
        team_2_num_frames = team_ball_control_till_frame[team_ball_control_till_frame==2].shape[0]
        # Guard against division by zero before either team has had the ball
        total_frames = max(team_1_num_frames+team_2_num_frames, 1)
        team_1 = team_1_num_frames/total_frames
        team_2 = team_2_num_frames/total_frames

        # cv2 has no TIMES_NEW_ROMAN constant; use one of its Hershey fonts
        cv2.putText(frame, f"Team 1 Ball Control: {team_1*100:.2f}%",(1400,900), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,0), 3)
        cv2.putText(frame, f"Team 2 Ball Control: {team_2*100:.2f}%",(1400,950), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,0), 3)

        return frame

    def draw_annotations(self, video_frames, tracks, team_ball_control):
        output_video_frames = []
        for frame_num, frame in enumerate(video_frames):
            frame = frame.copy()

            if frame_num < len(tracks["players"]):  # Check if frame_num is within bounds
                player_dict = tracks["players"][frame_num]
                ball_dict = tracks["ball"][frame_num]
                referee_dict = tracks["referees"][frame_num]

                for track_id, player in player_dict.items():
                    color = player.get('team_color',(0,0,225))
                    frame = self.draw_ellipse(frame, player['bbox'], color, track_id)

                    if player.get('has_ball',False):
                        frame = self.draw_traingle(frame, player["bbox"],(0,0,255))
                for _, referee in referee_dict.items():
                    frame = self.draw_ellipse(frame, referee["bbox"], (0,255,255))
                for track_id, ball in ball_dict.items():
                    frame = self.draw_traingle(frame, ball["bbox"],(0,255,0))

                # Draw Team Ball Control
                frame = self.draw_team_ball_control(frame, frame_num, team_ball_control)

            output_video_frames.append(frame)

        return output_video_frames


# Main
---
Orchestrates the entire process of analyzing a soccer match video: object detection, object tracking, color assignment, player-ball assignment, and visualization.


In [None]:
def main():
    video_frames = read_video('Soccer1.mp4')
    tracker = Tracker('best-4.pt')
    tracks = tracker.get_object_tracks(video_frames,
                                        read_from_stub=True,
                                       stub_path='stubs/track_stubs.pkl')
    for track_id, player in tracks['players'][0].items():
        bbox = player['bbox']
        frame = video_frames[0]
        # crop bbox from frame
        cropped_image = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]
        # save the cropped image
        cv2.imwrite('stubs/cropped_image.jpg', cropped_image)
        # one sample crop is enough; stop instead of overwriting the file for every player
        break

    tracks["ball"] = tracker.interpolate_ball_positions(tracks["ball"])

    team_assigner = TeamAssigner()
    team_assigner.assign_team_color(video_frames[0],
                                    tracks['players'][0])

    for frame_num, player_track in enumerate(tracks['players']):
        for player_id, track in player_track.items():
            team = team_assigner.get_player_team(video_frames[frame_num],
                                                track['bbox'],
                                                player_id)
            tracks['players'][frame_num][player_id]['team'] = team
            tracks['players'][frame_num][player_id]['team_color'] = team_assigner.team_colors[team]

    player_assigner =PlayerBallAssigner()
    team_ball_control= []
    for frame_num, player_track in enumerate(tracks['players']):
        ball_bbox = tracks['ball'][frame_num][1]['bbox']
        assigned_player = player_assigner.assign_ball_to_player(player_track, ball_bbox)

        if assigned_player != -1:
            tracks['players'][frame_num][assigned_player]['has_ball'] = True
            team_ball_control.append(tracks['players'][frame_num][assigned_player]['team'])
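        else:
            # No player near the ball: keep the previous team (0 if no team has had the ball yet)
            team_ball_control.append(team_ball_control[-1] if team_ball_control else 0)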
    team_ball_control= np.array(team_ball_control)

    output_video_frames = tracker.draw_annotations(video_frames, tracks, team_ball_control)



    save_video(output_video_frames,'ouput_3.avi')

In [None]:
main()