Author: Mark Ssenoga

In [None]:
# Install the third-party packages imported by the cells below.
%pip install SoccerNet opencv-python ultralytics scikit-learn deep-sort-realtime numpy

In [None]:
from SoccerNet.Downloader import SoccerNetDownloader

# Downloader that saves datasets under the project's dataset folder
# (presumably a Google Drive mounted in Colab - confirm the mount exists first).
mySoccerNetDownloader = SoccerNetDownloader(
    LocalDirectory="/content/drive/MyDrive/Research and Development Project/Datasets"
)

# Download the raw videos of the SoccerNet-Tracking dataset.
# NOTE(review): the password is intentionally blank in this file; presumably
# the real SoccerNet password must be filled in before running - confirm.
mySoccerNetDownloader.password = ""
mySoccerNetDownloader.downloadRAWVideo(dataset="SoccerNet-Tracking")

In [None]:
import cv2
import numpy as np
from deep_sort_realtime.deepsort_tracker import DeepSort
from sklearn.cluster import KMeans
from ultralytics import YOLO

# Load YOLO models.
# `model` is run on every full frame by process_video, which keeps only
# detections with class id 0 (presumably the "person" class - confirm).
# `pose_model` is run on each tracked player's crop to obtain keypoints.
model = YOLO('yolo11x.pt')
pose_model = YOLO('yolo11x-pose.pt')

# Initialize DeepSORT with its default settings. The tracker is module-level,
# so its state is shared by every call to process_video.
deep_sort = DeepSort()

# Pose descriptor memory buffer: track_id -> (descriptor, frame_number).
# Written by store_pose_descriptor and pruned by clean_old_pose_descriptors.
pose_memory = {}

# Input video and the annotated output file written by process_video.
video_path = "/content/drive/MyDrive/Research and Development Project/Datasets/1/1.mkv"
output_path = "output_processed.mp4"

# Maximum number of frames process_video handles (counted after the skipped
# lead-in), regardless of the video's length.
TARGET_FRAME_COUNT = 500

# Minimum YOLO detection confidence for a detection to be passed to the tracker.
CONFIDENCE_THRESHOLD = 0.5

# Maximum Euclidean distance (in HSV space) between a player's dominant jersey
# color and a team centroid for the player to be assigned to that team.
COLOR_DISTANCE_THRESHOLD = 50

# Pose descriptor parameters.
# A stored descriptor is ignored and removed once it is older than this many frames.
POSE_MEMORY_DURATION_FRAMES = 250
# Maximum Euclidean distance between normalized descriptors that still counts
# as a pose match in find_best_pose_match.
POSE_MATCH_THRESHOLD = 0.25

# Scale pose keypoints by the bounding box size and flatten them
def normalize_keypoints(keypoints, bbox):
    """Return a flat descriptor of keypoint coordinates scaled by the box size.

    Args:
        keypoints: Iterable of ``(x, y, confidence)`` triples. The confidence
            value is ignored.
        bbox: ``(x1, y1, x2, y2)`` box whose width and height are used as the
            scale factors.

    Returns:
        A 1-D NumPy array ``[x0/w, y0/h, x1/w, y1/h, ...]``.
    """
    left, top, right, bottom = bbox
    # Clamp each extent to at least 1 so a degenerate box cannot divide by zero.
    box_w = max(right - left, 1)
    box_h = max(bottom - top, 1)

    scaled = []
    for kp_x, kp_y, _conf in keypoints:
        scaled.append((kp_x / box_w, kp_y / box_h))
    return np.array(scaled).flatten()

# Remember the most recent pose descriptor seen for a track ID
def store_pose_descriptor(track_id, descriptor, frame_number):
    """Record ``descriptor`` for ``track_id`` in the module-level pose memory.

    Any descriptor previously stored for the same track is overwritten, so the
    memory holds one ``(descriptor, frame_number)`` pair per track.
    """
    entry = (descriptor, frame_number)
    pose_memory.update({track_id: entry})

# Drop pose descriptors that have outlived the memory window
def clean_old_pose_descriptors(current_frame):
    """Delete entries older than ``POSE_MEMORY_DURATION_FRAMES`` from pose memory.

    An entry is removed when ``current_frame`` minus the frame at which it was
    stored is strictly greater than the memory duration.
    """
    # Iterate over a snapshot because entries are deleted during the walk.
    for tid, (_, stored_frame) in list(pose_memory.items()):
        age = current_frame - stored_frame
        if age > POSE_MEMORY_DURATION_FRAMES:
            pose_memory.pop(tid)

# Look up the stored descriptor closest to a newly observed one
def find_best_pose_match(new_descriptor, current_frame):
    """Return the track ID whose stored descriptor best matches ``new_descriptor``.

    Only entries stored within the last ``POSE_MEMORY_DURATION_FRAMES`` frames
    are considered, and a candidate must be strictly closer (Euclidean
    distance) than ``POSE_MATCH_THRESHOLD``. Returns ``None`` when nothing
    qualifies.
    """
    winner = None
    winner_dist = float('inf')

    for candidate_id, entry in pose_memory.items():
        stored_desc, seen_at = entry
        if current_frame - seen_at > POSE_MEMORY_DURATION_FRAMES:
            continue  # too old to be trusted

        distance = np.linalg.norm(new_descriptor - stored_desc)
        if distance < POSE_MATCH_THRESHOLD and distance < winner_dist:
            winner, winner_dist = candidate_id, distance

    return winner

# Helper to compute the dominant color of an image region
def get_dominant_color(image, k=1):
    """Return the dominant color of ``image`` as a length-3 array.

    The pixels are clustered into ``k`` groups and the center of the most
    populated group is returned. No color-space conversion happens here: the
    result is expressed in whatever space ``image`` uses (the caller in this
    file passes an HSV crop).

    Args:
        image: Array that can be reshaped to ``(-1, 3)``, e.g. an H x W x 3
            crop.
        k: Number of color clusters. Values larger than the number of pixels
            are reduced to the pixel count.

    Returns:
        The per-channel color of the largest cluster, or ``None`` when the
        image contains no pixels.
    """
    data = image.reshape((-1, 3))
    n_pixels = len(data)
    if n_pixels == 0:
        return None

    # KMeans cannot fit more clusters than there are samples.
    n_clusters = min(k, n_pixels)

    if n_clusters == 1:
        # A single cluster's centroid is the mean of all samples, so skip the
        # comparatively expensive KMeans fit (this runs per player per frame).
        return data.mean(axis=0)

    kmeans = KMeans(n_clusters=n_clusters, random_state=42).fit(data)
    # "Dominant" means the cluster holding the most pixels, which is not
    # necessarily the cluster stored at index 0.
    counts = np.bincount(kmeans.labels_, minlength=n_clusters)
    return kmeans.cluster_centers_[int(np.argmax(counts))]

# Render confident pose keypoints of one player onto the frame
def draw_pose_keypoints(frame, keypoints, bbox, confidence_threshold=0.5):
    """Draw a filled red dot for every sufficiently confident keypoint.

    Args:
        frame: Image that is drawn on in place.
        keypoints: Iterable of ``(x, y, confidence)`` triples whose
            coordinates are offsets from the top-left corner of ``bbox``.
        bbox: ``(x1, y1, x2, y2)`` box; only its top-left corner is used.
        confidence_threshold: Keypoints whose confidence is not strictly
            above this value are skipped.

    Returns:
        The same ``frame`` object that was passed in.
    """
    origin_x, origin_y, _, _ = bbox

    for kp_x, kp_y, kp_conf in keypoints:
        if not kp_conf > confidence_threshold:
            continue
        # Shift from box-relative to frame coordinates before drawing.
        center = (int(origin_x + kp_x), int(origin_y + kp_y))
        cv2.circle(frame, center, 5, (0, 0, 255), -1)

    return frame

# Clamp a bounding box to the frame so it is safe to use for array slicing
def _clamp_box(x1, y1, x2, y2, frame_width, frame_height):
    """Return ``(x1, y1, x2, y2)`` limited to the frame area.

    NumPy interprets negative slice bounds as offsets from the end of the
    axis, so a box that extends past the left or top edge would otherwise
    select the wrong pixels or an empty region.
    """
    x1 = min(max(x1, 0), frame_width)
    y1 = min(max(y1, 0), frame_height)
    x2 = min(max(x2, 0), frame_width)
    y2 = min(max(y2, 0), frame_height)
    return x1, y1, x2, y2


# Decide whether one track ID was issued before another
def _is_older_track(candidate_id, current_id):
    """Return True when ``candidate_id`` is numerically lower than ``current_id``.

    NOTE(review): deep_sort_realtime appears to issue track IDs as strings
    (optionally prefixed with a date and an underscore) - confirm. Comparing
    such strings directly is lexicographic ("10" < "9"), so the trailing
    number is compared instead. Plain integer IDs work as well. IDs that
    cannot be parsed fall back to a direct comparison.
    """
    try:
        candidate_num = int(str(candidate_id).rsplit("_", 1)[-1])
        current_num = int(str(current_id).rsplit("_", 1)[-1])
    except (TypeError, ValueError):
        return candidate_id < current_id
    return candidate_num < current_num


# Main video processing
def process_video(video_path, output_path, skip_seconds, debug=False):
    """Detect, track and label players in a video and write an annotated copy.

    For each processed frame the function runs the module-level YOLO ``model``
    for player detection, feeds the detections to the module-level
    ``deep_sort`` tracker, runs ``pose_model`` on each confirmed track's crop
    to build a pose descriptor for re-identification, and estimates a jersey
    color from the torso region. The first 50 jersey colors are clustered into
    two team colors that stay fixed afterwards. Boxes and labels are drawn on
    the frame, which is then written to ``output_path``. Detection confidence
    statistics are printed at the end.

    Args:
        video_path: Path of the input video.
        output_path: Path of the annotated video to write (``mp4v`` codec).
        skip_seconds: Seconds of video to skip before processing starts.
        debug: When True, also draws pose keypoints and the frame number and
            prints re-identification events.

    Returns:
        None. If the video cannot be opened an error is printed and the
        function returns without writing anything.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return

    out = None
    frame_count = 0
    all_confidences = []  # Collects confidence scores for YOLO player detections

    # try/finally guarantees the capture and writer are released even when a
    # model call or OpenCV operation raises part-way through the video.
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        if skip_seconds > 0:
            print(f"Skipping first {skip_seconds} seconds...")
            # Multiply before truncating so fractional frame rates (e.g. 29.97)
            # and fractional skip_seconds give the right frame count.
            for _ in range(int(fps * skip_seconds)):
                # grab() advances the stream without returning the image.
                if not cap.grab():
                    break

        color_samples = []
        locked_team_colors = None  # Will hold fixed centroids after enough samples

        while frame_count < TARGET_FRAME_COUNT:
            success, frame = cap.read()
            if not success:
                break

            frame_h, frame_w = frame.shape[:2]

            # YOLO Player Detection
            results = model(frame, verbose=False)

            detections = results[0].boxes.data.cpu().numpy() \
                         if results and results[0].boxes else []
            players = [
                det for det in detections
                if int(det[5]) == 0 and float(det[4]) >= CONFIDENCE_THRESHOLD
            ]

            # Prepare the list of detections for Deep SORT
            bboxes = []
            scores = []
            for det in players:
                x1, y1, x2, y2 = map(int, det[:4])
                conf = float(det[4])
                bboxes.append([x1, y1, x2, y2])
                scores.append(conf)
                all_confidences.append(conf)

            # Update Deep SORT tracker; each detection is passed as
            # [(left, top, width, height), score, class_name].
            trackers = deep_sort.update_tracks(
                [[(x1, y1, x2 - x1, y2 - y1), score, 'player']
                 for (x1, y1, x2, y2), score in zip(bboxes, scores)],
                frame=frame
            ) if bboxes else []

            track_list = []
            # Debug keypoints are drawn only after every crop has been
            # analyzed, so the red dots cannot leak into later pose or
            # jersey-color crops of the same frame.
            pose_overlays = []
            for tracker in trackers:
                if not tracker.is_confirmed():
                    continue  # Skip unconfirmed tracks

                player_conf = tracker.det_conf if tracker.det_conf is not None else 0.0
                if player_conf == 0.0:  # Skip tracks without a detection confidence
                    continue

                # Keep the box inside the frame before using it as slice bounds.
                x1, y1, x2, y2 = _clamp_box(
                    *map(int, tracker.to_tlbr()), frame_w, frame_h
                )

                # Extract pose keypoints (from YOLO pose model)
                player_frame = frame[y1:y2, x1:x2]
                descriptor = None
                matched_track_id = None

                if player_frame.size != 0:
                    pose_results = pose_model(player_frame, verbose=False)

                    if pose_results and pose_results[0].keypoints:
                        pose_keypoints = pose_results[0].keypoints.cpu().numpy().data[0]

                        # Normalize keypoints for descriptor
                        descriptor = normalize_keypoints(pose_keypoints, (x1, y1, x2, y2))

                        # Re-identify if possible
                        matched_track_id = find_best_pose_match(descriptor, frame_count)

                        if debug:
                            pose_overlays.append((pose_keypoints, (x1, y1, x2, y2)))

                # Assign consistent track ID if re-identified as an earlier track
                if matched_track_id is not None and _is_older_track(
                        matched_track_id, tracker.track_id):
                    if debug:
                        print(
                            f"[Frame {frame_count}] "
                            f"Re-identified player - new ID: {tracker.track_id} -> {matched_track_id}"
                        )
                    tracker.reid_id = matched_track_id
                elif not hasattr(tracker, 'reid_id'):
                    tracker.reid_id = tracker.track_id

                # Store/update descriptor for current player
                if descriptor is not None:
                    store_pose_descriptor(tracker.track_id, descriptor, frame_count)

                # Crop torso for jersey region: the middle 60% of the box
                # horizontally and the 20%-60% band vertically.
                box_w = x2 - x1
                box_h = y2 - y1
                roi_x1 = x1 + int(0.2 * box_w)
                roi_x2 = x1 + int(0.8 * box_w)
                roi_y1 = y1 + int(0.2 * box_h)
                roi_y2 = y1 + int(0.6 * box_h)

                jersey_crop = frame[roi_y1:roi_y2, roi_x1:roi_x2]
                dom_color = None

                if jersey_crop.size != 0 and roi_y2 > roi_y1 and roi_x2 > roi_x1:
                    hsv_crop = cv2.cvtColor(jersey_crop, cv2.COLOR_BGR2HSV)
                    dom_color = get_dominant_color(hsv_crop)

                    if dom_color is not None:
                        if locked_team_colors is None:
                            color_samples.append(dom_color)
                        else:
                            # Samples are only needed until the team colors
                            # are locked; afterwards just validate the color.
                            dists = [
                                np.linalg.norm(dom_color - team_color)
                                for team_color in locked_team_colors
                            ]

                            if min(dists) > COLOR_DISTANCE_THRESHOLD:
                                # This color does not match any known team -
                                # exclude from team assignment
                                dom_color = None

                tracker.color = dom_color
                track_list.append(tracker)

            # Lock team colors after collecting enough samples
            if locked_team_colors is None and len(color_samples) >= 50:
                kmeans = KMeans(n_clusters=2, random_state=42).fit(color_samples)
                locked_team_colors = kmeans.cluster_centers_

            team_colors = locked_team_colors if locked_team_colors is not None else []

            # Draw the deferred pose keypoints (debug mode only)
            for overlay_keypoints, overlay_box in pose_overlays:
                frame = draw_pose_keypoints(frame, overlay_keypoints, overlay_box)

            # Draw players with label
            for tracker in track_list:
                x1, y1, x2, y2 = map(int, tracker.to_tlbr())
                track_id = getattr(tracker, 'reid_id', tracker.track_id)
                dom_color = tracker.color
                player_conf = tracker.det_conf

                player_label = f"Player {player_conf:.2f}"

                team_label = ""
                box_color = (255, 255, 255)

                if dom_color is not None and len(team_colors) == 2:
                    dists = [
                        np.linalg.norm(dom_color - team_color)
                        for team_color in team_colors
                    ]

                    if min(dists) <= COLOR_DISTANCE_THRESHOLD:
                        team_idx = int(np.argmin(dists))
                        team_label = f"Team {'A' if team_idx == 0 else 'B'} #{track_id}"

                # Draw bounding box
                cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)

                label_y = y1 - 10

                if team_label:
                    # Draw two-line label: player on top, team below
                    cv2.putText(frame, player_label, (x1, label_y - 20),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, box_color, 2)
                    cv2.putText(frame, team_label, (x1, label_y),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, box_color, 2)
                else:
                    # Draw one-line label only
                    cv2.putText(frame, player_label, (x1, label_y),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, box_color, 2)

            # Overlay current frame number (top-left)
            if debug:
                cv2.putText(frame, f"Frame: {frame_count}", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)

            out.write(frame)
            frame_count += 1

            clean_old_pose_descriptors(frame_count)

            if frame_count % 100 == 0:
                print(f"Processed {frame_count} frames...")
    finally:
        cap.release()
        if out is not None:
            out.release()

    print(f"Processing complete. Output saved to: {output_path}\n")

    # Display performance metrics
    if all_confidences:
        print("YOLO Detection Confidence Stats (players only):")
        print(f"  Mean: {np.mean(all_confidences):.4f}")
        print(f"  Median: {np.median(all_confidences):.4f}")
        print(f"  Std Dev: {np.std(all_confidences):.4f}")
        print(f"  Max: {np.max(all_confidences):.4f}")
        print(f"  Min: {np.min(all_confidences):.4f}")
    else:
        print("No valid player detections found to compute confidence stats.")

# Run the processing function on the configured video: skip the first
# 70 seconds, then annotate up to TARGET_FRAME_COUNT frames into output_path
# with debug overlays disabled.
process_video(video_path, output_path, skip_seconds=70, debug=False)

In [None]:
# from google.colab import files
# files.download('output_processed.mp4')