# Installing Dependencies

In [None]:
!pip install -q ultralytics
!pip install -q supervision
!pip install -q opencv-python
!pip install -q pandas
!pip install -q numpy
!pip install -q scikit-learn
!pip install -q torchreid
!pip install -q torch

In [5]:
!pip install git+https://github.com/openai/CLIP.git

Collecting git+https://github.com/openai/CLIP.git
  Cloning https://github.com/openai/CLIP.git to /tmp/pip-req-build-wcfh9_6n
  Running command git clone --filter=blob:none --quiet https://github.com/openai/CLIP.git /tmp/pip-req-build-wcfh9_6n
  Resolved https://github.com/openai/CLIP.git to commit dcba3cb2e2827b402d2701e7e1c7d9fed8a20ef1
  Preparing metadata (setup.py) ... [?25l[?25hdone


# Read and Save video

In [6]:
import cv2 

def read_video(video_path):
    cap = cv2.VideoCapture(video_path) #capture the given video
    if not cap.isOpened():
        raise FileNotFoundError(f"❌ Could not open video: {video_path}")
    frames = []
    while True:
        ret, frame = cap.read()
        if not ret: # means if ret is false which tells that current thing is not a frame
            break # loop will break and video has ended
        frames.append(frame)
    return frames

def save_video(output_video_frames, output_video_path):
    #output_video_frames is list of frames
    
    fourcc = cv2.VideoWriter_fourcc(*'XVID') #format of video 
    # making the video
    out = cv2.VideoWriter(output_video_path, fourcc, 25, (output_video_frames[0].shape[1], output_video_frames[0].shape[0]))
    # output_video_frames[0].shape[1], output_video_frames[0].shape[0] --> height and width of frame
    for frame in output_video_frames:
                          out.write(frame) # write the frame to the videowriter
    out.release()





# Video_Utils

In [7]:
def get_center_of_bbox(bbox):
        x1,y1,x2,y2 = bbox
        return int((x1+x2)/2),int((y1+y2)/2)

    
    
def get_bbox_width(bbox):
        return bbox[2] - bbox[0]

# Tracking Of Objects

In [8]:
from ultralytics import YOLO
import supervision as sv
import pickle
import os
import numpy as np
import pandas as pd
import cv2
import sys 



class Tracker:
    def __init__(self, model_path):
        self.model = YOLO(model_path) 
        self.tracker = sv.ByteTrack()

    

    def detect_frames(self, frames):
        batch_size=20 
        detections = [] 
        for i in range(0,len(frames),batch_size):
            detections_batch = self.model.predict(frames[i:i+batch_size],conf=0.1)
            detections += detections_batch
        return detections

    def get_object_tracks(self, frames, read_from_stub=False, stub_path=None):
        
        if read_from_stub and stub_path is not None and os.path.exists(stub_path):
            with open(stub_path,'rb') as f:
                tracks = pickle.load(f)
            return tracks

        detections = self.detect_frames(frames)

        tracks={
            "players":[],
            "referees":[],
            "ball":[]
        }

        for frame_num, detection in enumerate(detections):
            cls_names = detection.names
            cls_names_inv = {v:k for k,v in cls_names.items()}

            # Covert to supervision Detection format
            detection_supervision = sv.Detections.from_ultralytics(detection)

            # Convert GoalKeeper to player object
            for object_ind , class_id in enumerate(detection_supervision.class_id):
                if cls_names[class_id] == "goalkeeper":
                    detection_supervision.class_id[object_ind] = cls_names_inv["player"]

            # Track Objects
            detection_with_tracks = self.tracker.update_with_detections(detection_supervision)

            tracks["players"].append({})
            tracks["referees"].append({})
            tracks["ball"].append({})

            for frame_detection in detection_with_tracks:
                bbox = frame_detection[0].tolist()
                cls_id = frame_detection[3]
                track_id = frame_detection[4]

                if cls_id == cls_names_inv['player']:
                    tracks["players"][frame_num][track_id] = {"bbox":bbox}
                
                if cls_id == cls_names_inv['referee']:
                    tracks["referees"][frame_num][track_id] = {"bbox":bbox}
            
            for frame_detection in detection_supervision:
                bbox = frame_detection[0].tolist()
                cls_id = frame_detection[3]

                if cls_id == cls_names_inv['ball']:
                    tracks["ball"][frame_num][1] = {"bbox":bbox}

        if stub_path is not None:
            with open(stub_path,'wb') as f:
                pickle.dump(tracks,f)

        return tracks

    
    def draw_traingle(self,frame,bbox,color):
        y= int(bbox[1])
        x,_ = get_center_of_bbox(bbox)

        triangle_points = np.array([
            [x,y],
            [x-10,y-20],
            [x+10,y-20],
        ])
        cv2.drawContours(frame, [triangle_points],0,color, cv2.FILLED)
        cv2.drawContours(frame, [triangle_points],0,(0,0,0), 2)

        return frame

    
    def draw_ellipse(self,frame,bbox,color,track_id=None):
        y2 = int(bbox[3])
        x_center, _ = get_center_of_bbox(bbox)
        width = get_bbox_width(bbox)

        cv2.ellipse(
            frame,
            center=(x_center,y2),
            axes=(int(width), int(0.35*width)),
            angle=0.0,
            startAngle=-45,
            endAngle=235,
            color = color,
            thickness=2,
            lineType=cv2.LINE_4
        )

        rectangle_width = 40
        rectangle_height=20
        x1_rect = x_center - rectangle_width//2
        x2_rect = x_center + rectangle_width//2
        y1_rect = (y2- rectangle_height//2) +15
        y2_rect = (y2+ rectangle_height//2) +15

        if track_id is not None:
            cv2.rectangle(frame,
                          (int(x1_rect),int(y1_rect) ),
                          (int(x2_rect),int(y2_rect)),
                          color,
                          cv2.FILLED)
            
            x1_text = x1_rect+12
            if track_id > 99:
                x1_text -=10
            
            cv2.putText(
                frame,
                f"{track_id}",
                (int(x1_text),int(y1_rect+15)),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.6,
                (0,0,0),
                2
            )

        return frame

    

    

    def draw_annotations(self,video_frames, tracks):
        output_video_frames= []
        for frame_num, frame in enumerate(video_frames):
            frame = frame.copy()

            player_dict = tracks["players"][frame_num]
            ball_dict = tracks["ball"][frame_num]
            referee_dict = tracks["referees"][frame_num]

            # Draw Players
            for track_id, player in player_dict.items():
                frame = self.draw_ellipse(frame, player['bbox'],(0,0,255), track_id)

            # Draw Referee
            for _, referee in referee_dict.items():
                frame = self.draw_ellipse(frame, referee["bbox"],(0,255,255))
                
            # Draw ball 
            for track_id, ball in ball_dict.items():
                frame = self.draw_traingle(frame, ball["bbox"],(0,255,0))
            

            output_video_frames.append(frame)

        return output_video_frames

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


# Saving the detections

In [None]:
# Read Video
video_frames = read_video('/kaggle/input/football-clip/15sec_input_720p.mp4')
print(f'number of frames : {len(video_frames)}')
# Initialize Tracker
tracker = Tracker('/kaggle/input/best.pt/pytorch/default/1/best.pt')

tracks = tracker.get_object_tracks(video_frames,
                                   read_from_stub=True,
                                   stub_path='/kaggle/working/track_players.pkl')

In [10]:
import pickle
with open('/kaggle/input/player-id/track_players.pkl','rb') as f:
    tracks = pickle.load(f)

len(tracks['players']) #number of frames

375

# Capturing Players Images

In [11]:
output_dir = "/kaggle/working/players_images"

# Iterate over frames
for frame_idx, frame_players in enumerate(tracks['players']):
    # Make a directory for this frame
    frame_folder = os.path.join(output_dir, f"frame_{frame_idx:04d}")
    os.makedirs(frame_folder, exist_ok=True)
    
    frame = video_frames[frame_idx]

    for track_id, player in frame_players.items():
        bbox = player['bbox']
        
        # Crop the player from the frame
        x1, y1, x2, y2 = map(int, bbox)
        cropped_image = frame[y1:y2, x1:x2]

        # Save the cropped image in the current frame's folder
        save_path = os.path.join(frame_folder, f"player_{track_id}.jpg")
        cv2.imwrite(save_path, cropped_image)

# Reidentification

## CLIP_Model

In [55]:
import os
import cv2
import numpy as np
from PIL import Image
from sklearn.metrics.pairwise import cosine_similarity

import torch
import clip  

def assign_consistent_ids_clip(video_frames, tracks, cropped_dir, threshold):
    device = "cuda" if torch.cuda.is_available() else "cpu"
    
    # Load CLIP model and preprocessing
    model, preprocess = clip.load("ViT-B/32", device=device)
    model.eval()

    reid = {}  # global_id: feature_vector
    # update_track_id's
    updated_track = {}
    updated_track['players'] = []
    updated_track['referees'] = tracks['referees']
    updated_track['ball'] = tracks['ball']

    

    for i in range(len(video_frames)):
        # entering the first frame folder
        frame_folder = os.path.join(cropped_dir, f"frame_{i:04d}")
        # output = /kaggle/working/players_images/frame_0000
        
        # creating a dictionary inside the list updated_tracks['players'] for each frame
        updated_track['players'].append({}) # creating 1 element of list that is dictionary 
        for filename in os.listdir(frame_folder):

            
            if not filename.endswith(".jpg"):
                continue
            # output - player_17.jpg
            
            # Original track ID
            track_id = int(filename.split('_')[1].split('.')[0])
            # output - 17

            # Image_path
            img_path = os.path.join(frame_folder, filename)
            #output - /kaggle/working/players_images/frame_0000/player_17.jpg

            # Preprocess and extract CLIP feature
            try:
                image = preprocess(Image.open(img_path).convert("RGB")).unsqueeze(0).to(device)
            except:
                continue

            with torch.no_grad():
                feature = model.encode_image(image)
                feature = feature / feature.norm(dim=-1, keepdim=True)  # L2 normalize

            feature_np = feature.cpu().numpy() # shape (1,512)
            feature_vector = feature_np.flatten() # shape(512)
            
            # First frame 
            if i == 0:
                reid[track_id] = feature_vector
                updated_track['players'][i][track_id] = tracks['players'][i][track_id]
            
            else:
                matched_id = None
               # Step 1: Direct match (track_id seen before)  
                matched_id = None
                if track_id in reid:
                    matched_id = track_id
                    
                else:
                    max_sim = 0 # max similarity score
                    # Step 2: Check for similarity score with existing global IDs
                    for rid, ref_feat in reid.items():
                        sim = cosine_similarity(feature_np, ref_feat.reshape(1, -1))[0][0]
                        if sim >= threshold:
                            if sim > max_sim:
                                max_sim = sim # update the max_sim score
                                matched_id = rid
                            

                
                # Step 3: Assign and update
                if matched_id is not None:
                    updated_track['players'][i][matched_id] = tracks['players'][i][track_id]
                    
                else:
                    new_id = max(reid.keys(), default=0) + 1
                    reid[new_id] = feature_vector
                    updated_track['players'][i][new_id] = tracks['players'][i][track_id]
                
   # saving the file
    import pickle

    with open('/kaggle/working/updated_track_id.pkl', 'wb') as f:
        pickle.dump(updated_track, f)

    with open('/kaggle/working/reid.pkl', 'wb') as f:
        pickle.dump(reid, f)

## OS_Net Model

In [None]:
#pip install torchreid

In [None]:
'''import torch
import torchreid

# Set device
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Load model
model = torchreid.models.build_model(
    name='osnet_x1_0',
    num_classes=1000,  
    pretrained=False
)

# Load pre-trained weights
state_dict = torch.load("/kaggle/input/os_net/pytorch/default/1/osnet_x1_0_imagenet.pth", map_location=device)
model.load_state_dict(state_dict)

model.to(device)
model.eval()


from torchvision import transforms
from PIL import Image

transform = transforms.Compose([
    transforms.Resize((256, 128)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225]),
])


model

import os
import cv2
import numpy as np
from PIL import Image
from sklearn.metrics.pairwise import cosine_similarity

import torch
import clip  # make sure clip is installed: !pip install git+https://github.com/openai/CLIP.git

def assign_consistent_ids_clip(video_frames, tracks, cropped_dir, threshold):
    device = "cuda" if torch.cuda.is_available() else "cpu"
    
    

    reid = {}  # global_id: feature_vector
    updated_track = {}
    updated_track['players'] = []
    updated_track['referees'] = tracks['referees']
    updated_track['ball'] = tracks['ball']

    

    for i in range(len(video_frames)):
        # entering the first frame folder
        frame_folder = os.path.join(cropped_dir, f"frame_{i:04d}")
        # output = /kaggle/working/players_images/frame_0000
        
        # creating a dictionary inside the list updated_tracks['players'] for each frame
        updated_track['players'].append({}) # creating 1 element of list that is dictionary 
        for filename in os.listdir(frame_folder):

            
            if not filename.endswith(".jpg"):
                continue
            # output - player_17.jpg
            
            # Original track ID
            track_id = int(filename.split('_')[1].split('.')[0])
            # output - 17

            # Image_path
            img_path = os.path.join(frame_folder, filename)
            #output - /kaggle/working/players_images/frame_0000/player_17.jpg

            # Preprocess and extract CLIP feature
            try:
                img = Image.open(img_path).convert("RGB")
                img_tensor = transform(img).unsqueeze(0).to(device)
            except:
                continue

            with torch.no_grad():
                feature = model(img_tensor)
                
            
            feature_np = feature.cpu().numpy()
            feature_vector = feature_np.flatten()
            if i == 0:
                reid[track_id] = feature_vector
                updated_track['players'][i][track_id] = tracks['players'][i][track_id]
            
            else:
                matched_id = None
                
                # Step 1: Direct match (track_id seen before)    
                if track_id in reid:
                    matched_id = track_id
                    #if i == 2:
                        #print(f"Frame {i}, replacing track_id {track_id} with consistent ID {matched_id}")
                else:
                    # Step 2: Check for similarity with existing IDs
                    for rid, ref_feat in reid.items():
                        sim = cosine_similarity(feature_np, ref_feat.reshape(1, -1))[0][0]
                        if sim >= threshold:
                            matched_id = rid
                            #if i == 2:
                                #print(f"Frame {i}, replacing track_id {track_id} with consistent ID {matched_id}, sim = {sim:.2f}")
                            break
                # Step 3: Assign and update
                if matched_id is not None:
                    updated_track['players'][i][matched_id] = tracks['players'][i][track_id]
                else:
                    new_id = max(reid.keys(), default=0) + 1
                    reid[new_id] = feature_vector
                    updated_track['players'][i][new_id] = tracks['players'][i][track_id]
                
            

                    
        

   # saving the file
    import pickle

    with open('/kaggle/working/updated_track_id.pkl', 'wb') as f:
        pickle.dump(updated_track, f)

    with open('/kaggle/working/reid.pkl', 'wb') as f:
        pickle.dump(reid, f)'''

## Calling update function

In [56]:
updated_tracks = assign_consistent_ids_clip(
    video_frames=video_frames,
    tracks=tracks,
    cropped_dir="/kaggle/working/players_images",
    threshold=0.85
)


## Opening the update track_id file

In [57]:

with open('/kaggle/working/updated_track_id.pkl', 'rb') as f:
    updated_tracks = pickle.load(f)

In [58]:
len(updated_tracks['players'])

375

In [59]:

with open('/kaggle/working/reid.pkl', 'rb') as f:
    reid = pickle.load(f)

In [60]:
reid.keys()

dict_keys([16, 8, 13, 11, 14, 12, 5, 7, 9, 2, 4, 3, 10, 1, 6, 17, 18, 19, 20, 21, 22, 23])

# OUTPUT Video

In [61]:
 
tracker = Tracker('/kaggle/input/best.pt/pytorch/default/1/best.pt')
ouput_video_frames = tracker.draw_annotations(video_frames, updated_tracks)
# Save video
save_video(ouput_video_frames, '/kaggle/working/output_video.avi') # same video just as output