In [12]:
!pip install ultralytics opencv-python deep_sort_realtime

Collecting deep_sort_realtime
  Downloading deep_sort_realtime-1.3.2-py3-none-any.whl.metadata (12 kB)
Downloading deep_sort_realtime-1.3.2-py3-none-any.whl (8.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.4/8.4 MB[0m [31m62.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: deep_sort_realtime
Successfully installed deep_sort_realtime-1.3.2


# Save the tracking data to a CSV file

## Upload the video and the model weights (`best.pt`) before running

In [49]:
import cv2
import csv
import os
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

# Detect ball / goalkeeper / referees / players in every frame of the input
# video, track players with Deep SORT, and write
#   * an annotated copy of the video  -> output.mp4
#   * one CSV row per frame           -> tracking_output.csv
# A coordinate pair of (0, 0) in the CSV means "not detected in this frame".

# Load the detection model from the local weights file.
model = YOLO("best.pt")
class_names = model.names  # ['ball', 'goalkeeper', 'player', 'referee']

# Open video.
# NOTE(review): the following cell describes tracking_output.csv as coming from
# the *broadcast* video, yet this cell reads "tacticam.mp4" — confirm that this
# is the intended input file.
cap = cv2.VideoCapture("tacticam.mp4")
if not cap.isOpened():
    # Without this check a wrong path silently produces an empty CSV/video
    # and still prints the success message.
    raise OSError("Could not open input video 'tacticam.mp4'")
width, height = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter("output.mp4", fourcc, fps, (width, height))

# Deep SORT tracker; only "player" detections are fed to it below.
tracker = DeepSort(max_age=30, n_init=2)

# CSV layout: Frame, ball position + velocity, 3 referees, goalkeeper, 22 players.
csv_path = "tracking_output.csv"
header = ["Frame", "Ball_X", "Ball_Y", "Ball_Vx", "Ball_Vy"]
for i in range(1, 4):
    header += [f"Ref_{i}_X", f"Ref_{i}_Y"]
header += ["GK_X", "GK_Y"]
for i in range(1, 23):
    header += [f"Player_{i}_X", f"Player_{i}_Y"]

# Track ID mapping for players: Deep SORT track_id -> CSV slot 1..22.
# Slots are handed out in order of first appearance and never recycled.
player_id_map = {}
next_player_slot = 1

# Ball tracking state (velocity is in pixels per frame).
prev_ball_pos = None
ball_velocity = (0, 0)
frame_idx = 0

f = open(csv_path, mode='w', newline='')
try:
    csv_writer = csv.writer(f)
    csv_writer.writerow(header)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        results = model(frame, verbose=False)[0]
        detections = []  # Deep SORT input: ([left, top, w, h], conf, class)

        # Storage for non-player detections.
        ball_pos = (0, 0)
        gk_pos = (0, 0)
        ball_box = None
        gk_box = None
        ball_conf = -1.0
        gk_conf = -1.0
        ref_candidates = []  # (conf, (cx, cy), (x1, y1, x2, y2))

        for box in results.boxes:
            cls_id = int(box.cls[0])
            conf = float(box.conf[0])
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            cx = (x1 + x2) // 2
            cy = (y1 + y2) // 2
            label = class_names[cls_id]

            if label == "player":
                w, h = x2 - x1, y2 - y1
                detections.append(([x1, y1, w, h], conf, "player"))
            elif label == "ball":
                # Keep the most confident ball rather than whichever
                # detection happens to be iterated last.
                if conf > ball_conf:
                    ball_conf = conf
                    ball_pos = (cx, cy)
                    ball_box = (x1, y1, x2, y2)
            elif label == "referee":
                ref_candidates.append((conf, (cx, cy), (x1, y1, x2, y2)))
            elif label == "goalkeeper":
                if conf > gk_conf:
                    gk_conf = conf
                    gk_pos = (cx, cy)
                    gk_box = (x1, y1, x2, y2)

        # The CSV has room for three referees: keep the three most confident.
        ref_candidates.sort(key=lambda cand: cand[0], reverse=True)
        ref_positions = [cand[1] for cand in ref_candidates[:3]]
        ref_boxes = [cand[2] for cand in ref_candidates[:3]]

        # Track players.
        tracks = tracker.update_tracks(detections, frame=frame)
        player_centroids = [(0, 0)] * 22

        for track in tracks:
            if not track.is_confirmed() or track.get_det_class() != "player":
                continue

            track_id = track.track_id
            x1, y1, x2, y2 = map(int, track.to_ltrb())
            cx, cy = (x1 + x2) // 2, (y1 + y2) // 2

            # Assign slot 1..22 for each unique track_id; once all 22 slots
            # are taken, additional track ids are ignored.
            if track_id not in player_id_map and next_player_slot <= 22:
                player_id_map[track_id] = next_player_slot
                next_player_slot += 1

            if track_id in player_id_map:
                slot = player_id_map[track_id]
                player_centroids[slot - 1] = (cx, cy)
                label = f"player #{slot}"
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

        # Draw boxes for non-tracked classes.
        if ball_box is not None:
            cv2.rectangle(frame, (ball_box[0], ball_box[1]), (ball_box[2], ball_box[3]), (0, 0, 255), 2)
            cv2.putText(frame, "ball", (ball_box[0], ball_box[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
        for i, ref_box in enumerate(ref_boxes):
            cv2.rectangle(frame, (ref_box[0], ref_box[1]), (ref_box[2], ref_box[3]), (255, 255, 0), 2)
            cv2.putText(frame, f"referee {i+1}", (ref_box[0], ref_box[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
        if gk_box is not None:
            cv2.rectangle(frame, (gk_box[0], gk_box[1]), (gk_box[2], gk_box[3]), (255, 0, 255), 2)
            cv2.putText(frame, "goalkeeper", (gk_box[0], gk_box[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 2)

        # Ball velocity: update it when the ball is seen; otherwise extrapolate
        # the position from the last known position and velocity.
        if prev_ball_pos is not None and ball_pos != (0, 0):
            ball_velocity = (ball_pos[0] - prev_ball_pos[0], ball_pos[1] - prev_ball_pos[1])
        elif ball_pos == (0, 0) and prev_ball_pos is not None:
            ball_pos = (prev_ball_pos[0] + ball_velocity[0], prev_ball_pos[1] + ball_velocity[1])

        if ball_pos != (0, 0):
            prev_ball_pos = ball_pos

        # Write row to CSV (same column order as `header`).
        row = [frame_idx, *ball_pos, *ball_velocity]
        for i in range(3):
            if i < len(ref_positions):
                row += list(ref_positions[i])
            else:
                row += [0, 0]
        row += list(gk_pos)
        for cx, cy in player_centroids:
            row += [cx, cy]
        csv_writer.writerow(row)

        # Write annotated frame.
        out.write(frame)
        frame_idx += 1
finally:
    # Release the capture, writer and CSV handle even if a frame fails, so
    # the partial outputs are flushed and the files are not left locked.
    cap.release()
    out.release()
    f.close()

print("Video and CSV saved successfully.")


Video and CSV saved successfully.


In [53]:
import numpy as np
import cv2
import csv
from scipy.optimize import linear_sum_assignment
from collections import deque
from deep_sort_realtime.deepsort_tracker import DeepSort
from ultralytics import YOLO

# Cross-view matching: detect players in the tacticam video and, frame by
# frame, give each one the Player slot number (1..22) of the reference track in
# tracking_output.csv that it best aligns with. Writes
#   * an annotated copy of the video -> tacticam_matched.mp4
#   * matched player centroids       -> matched_output.csv


def _match_players_to_reference(known_pts, player_pts):
    """Pair detected players with reference points via a similarity transform.

    Args:
        known_pts: sequence of (x, y) reference positions.
        player_pts: sequence of (x, y) detected positions in the current frame.

    Returns:
        List of ``(player_index, known_index)`` pairs; each index appears at
        most once. Empty when the alignment is degenerate.
    """
    A = np.asarray(known_pts, dtype=float)
    B = np.asarray(player_pts, dtype=float)

    # Pass 1: tentative correspondences from raw pixel distances.
    cost_matrix = np.linalg.norm(A[:, None, :] - B[None, :, :], axis=2)
    row_ind, col_ind = linear_sum_assignment(cost_matrix)
    A_matched = A[row_ind]
    B_matched = B[col_ind]

    # Pad with z = 0 so the rotation is solved as a 3x3 problem.
    A3D = np.hstack([A_matched, np.zeros((A_matched.shape[0], 1))])
    B3D = np.hstack([B_matched, np.zeros((B_matched.shape[0], 1))])

    centroid_A = A3D.mean(axis=0)
    centroid_B = B3D.mean(axis=0)
    AA = A3D - centroid_A
    BB = B3D - centroid_B

    spread_B = np.linalg.norm(BB)
    if spread_B == 0:
        # All matched detections coincide: scale would divide by zero.
        return []
    scale = np.linalg.norm(AA) / spread_B

    # Rotation from the SVD of the cross-covariance of the centred point sets.
    H = (BB * scale).T @ AA
    U, _, Vt = np.linalg.svd(H)
    R = Vt.T @ U.T
    if np.linalg.det(R) < 0:
        Vt[2, :] *= -1
        R = Vt.T @ U.T

    # Translation must remove B's centroid: x' = s*R*(x - c_B) + c_A.
    # (The centred BB has mean ~0, so using it here would drop c_B entirely.)
    t = centroid_A - scale * (R @ centroid_B)

    B_all_3D = np.hstack([B, np.zeros((B.shape[0], 1))])
    transformed = scale * (R @ B_all_3D.T).T + t

    # Pass 2: re-assign using distances measured after alignment.
    final_cost = np.linalg.norm(transformed[:, None, :2] - A[None, :, :], axis=2)
    player_idx, known_idx = linear_sum_assignment(final_cost)
    return list(zip(player_idx.tolist(), known_idx.tolist()))


# Load the reference CSV (tracking_output.csv, written by the tracking cell).
# ref_data maps frame index -> list of the remaining columns as floats.
ref_data = {}
with open("tracking_output.csv", newline='') as f:
    reader = csv.reader(f)
    headers = next(reader)
    for row in reader:
        ref_data[int(row[0])] = list(map(float, row[1:]))

# Index of Player_1_X inside a ref_data row. Looked up from the header instead
# of hard-coded: the CSV has four ball columns (X, Y, Vx, Vy), so a hand-counted
# offset is easy to get wrong. The -1 accounts for the dropped "Frame" column.
player_offset = headers.index("Player_1_X") - 1

# Load YOLO model
model = YOLO("best.pt")
class_names = model.names

# Open second video (tacticam)
cap = cv2.VideoCapture("tacticam.mp4")
if not cap.isOpened():
    raise OSError("Could not open input video 'tacticam.mp4'")
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter("tacticam_matched.mp4", fourcc, fps, (width, height))

# Tracker
# NOTE(review): this tracker is created but never used in this cell; kept in
# case a later cell relies on it — remove if not.
tracker = DeepSort(max_age=30, n_init=2)

frame_idx = 0
ball_vel_history = deque(maxlen=3)
ball_pos_history = deque(maxlen=3)

# Save matched centroids: all X columns first, then all Y columns.
csv_out = open("matched_output.csv", 'w', newline='')
try:
    csv_writer = csv.writer(csv_out)
    csv_writer.writerow(["Frame"] + [f"Player_{i}_X" for i in range(1, 41)] + [f"Player_{i}_Y" for i in range(1, 41)])

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        results = model(frame, verbose=False)[0]

        players = []  # (cx, cy) per detected player
        boxes = []    # (x1, y1, x2, y2), parallel to `players`
        best_single = {}  # label -> (conf, (cx, cy)) for ball / referee / goalkeeper

        for box in results.boxes:
            cls_id = int(box.cls[0])
            conf = float(box.conf[0])
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
            label = class_names[cls_id]

            if label == "player":
                players.append((cx, cy))
                boxes.append((x1, y1, x2, y2))
            elif label in ("ball", "referee", "goalkeeper"):
                # Keep the most confident detection of each single-instance
                # class instead of whichever one is iterated last.
                if label not in best_single or conf > best_single[label][0]:
                    best_single[label] = (conf, (cx, cy))

        ball = best_single["ball"][1] if "ball" in best_single else None
        referee = best_single["referee"][1] if "referee" in best_single else None
        goalkeeper = best_single["goalkeeper"][1] if "goalkeeper" in best_single else None

        if ball is not None:
            if len(ball_pos_history) > 0:
                vx = ball[0] - ball_pos_history[-1][0]
                vy = ball[1] - ball_pos_history[-1][1]
                ball_vel_history.append((vx, vy))
            ball_pos_history.append(ball)
        elif len(ball_vel_history) > 0 and len(ball_pos_history) > 0:
            # Ball not detected: extrapolate from the last seen position using
            # the mean of the recent per-frame velocities.
            avg_vx = sum(v[0] for v in ball_vel_history) / len(ball_vel_history)
            avg_vy = sum(v[1] for v in ball_vel_history) / len(ball_vel_history)
            last_x, last_y = ball_pos_history[-1]
            ball = (int(last_x + avg_vx), int(last_y + avg_vy))

        matched_ids = [-1] * len(players)
        # NOTE(review): matching is skipped when no ball position is available,
        # although the ball is not used in the matching itself — confirm that
        # this gate is intended.
        if frame_idx in ref_data and ball is not None:
            ref_frame = ref_data[frame_idx]

            # Collect reference players present in this frame and remember
            # which Player slot each came from, so the assigned id is the
            # real slot number rather than a position in the filtered list.
            known = []
            known_ids = []
            for slot in range(1, 23):
                x = ref_frame[player_offset + 2 * (slot - 1)]
                y = ref_frame[player_offset + 2 * (slot - 1) + 1]
                if x != 0 or y != 0:
                    known.append((x, y))
                    known_ids.append(slot)

            if len(known) >= 3 and len(players) >= 3:
                for player_i, known_i in _match_players_to_reference(known, players):
                    matched_ids[player_i] = known_ids[known_i]

        xs = [0] * 40
        ys = [0] * 40

        for i, (cx, cy) in enumerate(players):
            global_id = matched_ids[i]
            is_matched = 1 <= global_id <= 40
            draw_id = global_id if is_matched else 0  # "#0" marks unmatched
            if is_matched:
                xs[global_id - 1] = cx
                ys[global_id - 1] = cy
            x1, y1, x2, y2 = boxes[i]
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 255), 2)
            cv2.putText(frame, f"#{draw_id}", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1)

        # Draw ball, ref, gk
        if ball is not None:
            cv2.circle(frame, ball, 5, (0, 0, 255), -1)
            cv2.putText(frame, "Ball", (ball[0] + 5, ball[1] - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
        if referee is not None:
            cv2.circle(frame, referee, 5, (255, 0, 0), -1)
            cv2.putText(frame, "Ref", (referee[0] + 5, referee[1] - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)
        if goalkeeper is not None:
            cv2.circle(frame, goalkeeper, 5, (0, 255, 0), -1)
            cv2.putText(frame, "GK", (goalkeeper[0] + 5, goalkeeper[1] - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

        csv_writer.writerow([frame_idx] + xs + ys)
        out.write(frame)
        frame_idx += 1
finally:
    # Always release the handles so partial outputs are flushed on error.
    cap.release()
    out.release()
    csv_out.close()

print("Cross-view matching complete.")


Cross-view matching complete.
