In [1]:
# [N2-C01] Install
!pip -q install ultralytics==8.4.11 opencv-python pandas pyarrow

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m22.0 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25h

In [2]:
# [N2-C02] Paths (your model + your video)
import os

MODEL_PATH = "/kaggle/input/football-yolov8m-plrglkprrefball-4-class/pytorch/default/1/best.pt"
VIDEO_PATH = "/kaggle/input/football-match-sample/sample.mp4"

assert os.path.exists(MODEL_PATH), MODEL_PATH
assert os.path.exists(VIDEO_PATH), VIDEO_PATH

print("MODEL_PATH:", MODEL_PATH)
print("VIDEO_PATH:", VIDEO_PATH)

MODEL_PATH: /kaggle/input/football-yolov8m-plrglkprrefball-4-class/pytorch/default/1/best.pt
VIDEO_PATH: /kaggle/input/football-match-sample/sample.mp4


In [3]:
# [N2-C03] Run tracking and collect per-frame detections
# This creates a table with: frame, track_id, cls, conf, xyxy.
import pandas as pd
from ultralytics import YOLO

model = YOLO(MODEL_PATH)

rows = []
frame_idx = 0

# stream=True gives you results frame-by-frame
for r in model.track(
    source=VIDEO_PATH,
    tracker="bytetrack.yaml",   # try "botsort.yaml" later if you want stronger ID persistence
    conf=0.20,
    iou=0.50,
    imgsz=960,
    persist=True,
    stream=True,
    verbose=False
):
    boxes = r.boxes
    if boxes is None or len(boxes) == 0:
        frame_idx += 1
        continue

    xyxy = boxes.xyxy.cpu().numpy()
    cls  = boxes.cls.cpu().numpy().astype(int)
    conf = boxes.conf.cpu().numpy()
    tid  = None if boxes.id is None else boxes.id.cpu().numpy().astype(int)

    for i in range(len(xyxy)):
        rows.append({
            "frame": frame_idx,
            "track_id": int(tid[i]) if tid is not None else -1,
            "cls": int(cls[i]),
            "conf": float(conf[i]),
            "x1": float(xyxy[i][0]),
            "y1": float(xyxy[i][1]),
            "x2": float(xyxy[i][2]),
            "y2": float(xyxy[i][3]),
        })

    frame_idx += 1

df = pd.DataFrame(rows)
print(df.head())
print("rows:", len(df), "frames:", df["frame"].nunique())

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
[31m[1mrequirements:[0m Ultralytics requirement ['lap>=0.5.12'] not found, attempting AutoUpdate...
Using Python 3.12.12 environment at: /usr
Resolved 2 packages in 257ms
Prepared 1 package in 75ms
Installed 1 package in 3ms
 + lap==0.5.12

[31m[1mrequirements:[0m AutoUpdate success ✅ 0.8s



[aac @ 0x3f0a9700] Input buffer exhausted before END element found


   frame  track_id  cls      conf           x1          y1           x2  \
0      0         1    0  0.907156   548.546631  388.484192   576.100525   
1      0         2    0  0.904687   577.584229  333.184326   608.443970   
2      0         3    0  0.903775   270.721527  312.516479   290.960785   
3      0         4    0  0.903154   668.158813  483.364258   697.090759   
4      0         5    0  0.899520  1065.475098  216.175598  1095.029175   

           y2  
0  441.450104  
1  382.262115  
2  361.712433  
3  538.519897  
4  256.805542  
rows: 9578 frames: 449


In [4]:
# [N2-C04] Save raw tracks
RAW_PATH = "/kaggle/working/tracks_raw.parquet"
df.to_parquet(RAW_PATH, index=False)
print("Saved:", RAW_PATH)

Saved: /kaggle/working/tracks_raw.parquet


In [5]:
# [N2-C05] Track-level class smoothing (majority vote weighted by confidence)
import numpy as np
import pandas as pd

CLASS_NAMES = {0:"player", 1:"goalkeeper", 2:"referee", 3:"ball"}

# Only smooth person-like classes; ball can be handled separately if desired
PERSON_CLASSES = {0, 1, 2}

df_s = df.copy()

# Compute a stable class per track_id using confidence-weighted voting
track_best_cls = {}
for tid, g in df_s[df_s["track_id"] >= 0].groupby("track_id"):
    # weighted vote per class
    score = g.groupby("cls")["conf"].sum().to_dict()
    best_cls = max(score, key=score.get)
    track_best_cls[int(tid)] = int(best_cls)

# Apply stable class back to all rows for that track
def stable_cls(row):
    tid = int(row["track_id"])
    if tid >= 0 and tid in track_best_cls and int(row["cls"]) in PERSON_CLASSES:
        return track_best_cls[tid]
    return int(row["cls"])

df_s["cls_smooth"] = df_s.apply(stable_cls, axis=1)

print(df_s[["track_id","cls","cls_smooth"]].head(10))

   track_id  cls  cls_smooth
0         1    0           0
1         2    0           0
2         3    0           0
3         4    0           0
4         5    0           0
5         6    0           0
6         7    0           0
7         8    0           0
8         9    0           0
9        10    0           0


In [6]:
# [N2-C07] Write smoothed annotated video
import cv2
from collections import defaultdict

IN = VIDEO_PATH
OUT = "/kaggle/working/annotated_smoothed.mp4"

cap = cv2.VideoCapture(IN)
fps = cap.get(cv2.CAP_PROP_FPS) or 25
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(OUT, fourcc, fps, (w, h))

# group detections by frame for fast lookup
by_frame = {k: v for k, v in df_s.groupby("frame")}

# colors (BGR)
COLORS = {
    0: (255, 0, 0),    # player - blue
    1: (0, 255, 255),  # goalkeeper - yellow
    2: (0, 165, 255),  # referee - orange
    3: (0, 255, 0),    # ball - green
}

frame_i = 0
while True:
    ok, frame = cap.read()
    if not ok:
        break

    if frame_i in by_frame:
        g = by_frame[frame_i]
        for _, row in g.iterrows():
            cls = int(row["cls_smooth"])
            conf = float(row["conf"])
            x1,y1,x2,y2 = map(int, [row["x1"],row["y1"],row["x2"],row["y2"]])
            tid = int(row["track_id"])

            color = COLORS.get(cls, (255,255,255))
            cv2.rectangle(frame, (x1,y1), (x2,y2), color, 2)

            # Reduce clutter: only show label if confidence is high enough
            if conf >= 0.50:
                label = f"id:{tid} {CLASS_NAMES.get(cls,cls)} {conf:.2f}"
                cv2.putText(frame, label, (x1, max(15,y1-5)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    writer.write(frame)
    frame_i += 1

cap.release()
writer.release()

print("Saved:", OUT)

[aac @ 0x5a5b4a40] Input buffer exhausted before END element found


Saved: /kaggle/working/annotated_smoothed.mp4


In [7]:
# [N2-C08] Save smoothed tracks
SMOOTH_PATH = "/kaggle/working/tracks_smoothed.parquet"
df_s.to_parquet(SMOOTH_PATH, index=False)
print("Saved:", SMOOTH_PATH)

Saved: /kaggle/working/tracks_smoothed.parquet


In [8]:
# [N2-C09] Install sklearn for KMeans
!pip -q install scikit-learn

In [9]:
# [N2-C10-NEW] Robust per-track jersey features (Lab mean with grass removal)
import cv2
import numpy as np
import pandas as pd
from collections import defaultdict

# expects df_s exists and has: frame, track_id, cls_smooth, x1,y1,x2,y2
assert "cls_smooth" in df_s.columns

CLS_PLAYER, CLS_GK, CLS_REF, CLS_BALL = 0, 1, 2, 3

FRAME_STRIDE = 2
MAX_SAMPLES_PER_TRACK = 80

cap = cv2.VideoCapture(VIDEO_PATH)
ok, frame0 = cap.read()
cap.release()
assert ok, "Cannot read VIDEO_PATH"
H, W = frame0.shape[:2]

by_frame = {k: v for k, v in df_s.groupby("frame")}

# accumulators: sum Lab + count of valid pixels
lab_sum = defaultdict(lambda: np.zeros(3, dtype=np.float64))
lab_cnt = defaultdict(int)

def clamp(v, lo, hi): 
    return max(lo, min(hi, v))

def crop_torso(frame, x1, y1, x2, y2):
    """
    Torso crop for top-view:
    - take top 55% of bbox
    - take center 45% width to reduce grass/arms/shorts
    """
    x1 = clamp(int(x1), 0, W-1); x2 = clamp(int(x2), 0, W-1)
    y1 = clamp(int(y1), 0, H-1); y2 = clamp(int(y2), 0, H-1)
    if x2 <= x1 or y2 <= y1:
        return None

    bw = x2 - x1
    bh = y2 - y1

    y2_t = y1 + int(0.55 * bh)
    x1_c = x1 + int(0.275 * bw)
    x2_c = x1 + int(0.725 * bw)

    if y2_t <= y1 + 2 or x2_c <= x1_c + 2:
        return None
    crop = frame[y1:y2_t, x1_c:x2_c]
    return crop if crop.size else None

def non_grass_mask(bgr_crop):
    """
    mask out grass pixels using HSV threshold
    """
    hsv = cv2.cvtColor(bgr_crop, cv2.COLOR_BGR2HSV)
    lower = np.array([30, 30, 30])   # grass-ish
    upper = np.array([95, 255, 255])
    grass = cv2.inRange(hsv, lower, upper) > 0
    return ~grass

cap = cv2.VideoCapture(VIDEO_PATH)
assert cap.isOpened(), "Failed to open video"

frame_i = 0
while True:
    ok, frame = cap.read()
    if not ok:
        break
    if frame_i % FRAME_STRIDE != 0:
        frame_i += 1
        continue

    g = by_frame.get(frame_i, None)
    if g is not None:
        for _, r in g.iterrows():
            tid = int(r["track_id"])
            if tid < 0:
                continue
            cls = int(r["cls_smooth"])
            if cls not in (CLS_PLAYER, CLS_GK, CLS_REF):   # ignore ball
                continue
            if lab_cnt[tid] >= MAX_SAMPLES_PER_TRACK:
                continue

            crop = crop_torso(frame, r["x1"], r["y1"], r["x2"], r["y2"])
            if crop is None:
                continue

            mask = non_grass_mask(crop)
            if mask.mean() < 0.10:
                # too much grass -> skip this sample
                continue

            lab = cv2.cvtColor(crop, cv2.COLOR_BGR2LAB).astype(np.float32)
            pixels = lab[mask]
            if pixels.shape[0] < 30:
                continue

            mean_lab = pixels.mean(axis=0)  # (L,a,b)
            lab_sum[tid] += mean_lab
            lab_cnt[tid] += 1

    frame_i += 1

cap.release()

track_ids = sorted([tid for tid, c in lab_cnt.items() if c > 0])
X = np.stack([lab_sum[tid] / lab_cnt[tid] for tid in track_ids], axis=0)

feat_df = pd.DataFrame({
    "track_id": track_ids,
    "n_samples": [lab_cnt[tid] for tid in track_ids],
    "L": X[:, 0], "a": X[:, 1], "b": X[:, 2]
})
print("Tracks with jersey features:", len(feat_df))
feat_df.head()

[aac @ 0x358d6cc0] Input buffer exhausted before END element found
[aac @ 0x358d6cc0] Input buffer exhausted before END element found


Tracks with jersey features: 77


Unnamed: 0,track_id,n_samples,L,a,b
0,1,20,234.471094,120.710394,130.556712
1,2,18,104.198877,126.481853,157.1006
2,4,80,225.472643,124.615981,125.758854
3,5,80,101.535563,137.127912,153.434158
4,6,6,208.058413,127.173448,121.981766


In [10]:
# [N2-C11-NEW] Cluster player tracks into 3 groups => 2 teams + referee-like cluster
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans

CLS_PLAYER, CLS_GK, CLS_REF, CLS_BALL = 0, 1, 2, 3

# dominant predicted class per track from df_s
track_cls = (
    df_s[df_s["track_id"] >= 0]
    .groupby("track_id")["cls_smooth"]
    .agg(lambda s: int(s.mode().iloc[0]))
    .to_dict()
)

# Use only tracks predicted as player for clustering (ref may be mispredicted as player)
player_tids = [tid for tid in track_ids if track_cls.get(tid) == CLS_PLAYER]

assert len(player_tids) >= 6, "Not enough player tracks for clustering."

tid_to_idx = {tid: i for i, tid in enumerate(track_ids)}
Xp = np.stack([X[tid_to_idx[tid]] for tid in player_tids], axis=0)

kmeans3 = KMeans(n_clusters=3, random_state=0, n_init="auto")
labels3 = kmeans3.fit_predict(Xp)

tmp = pd.DataFrame({"track_id": player_tids, "cluster": labels3})
cluster_sizes = tmp["cluster"].value_counts().sort_values(ascending=False)
print("cluster sizes:", cluster_sizes.to_dict())

# take the TWO largest clusters as teams
team_clusters = list(cluster_sizes.index[:2])
ref_cluster = int(cluster_sizes.index[-1])  # smallest cluster

track_team = {}      # track_id -> team_id {0,1} or -1 for referee
track_role = {}      # track_id -> override role "referee"/"player"

# Map cluster -> team_id (0/1)
cluster_to_team = {team_clusters[0]: 0, team_clusters[1]: 1}

for tid, c in zip(player_tids, labels3):
    c = int(c)
    if c == ref_cluster:
        track_team[tid] = -1
        track_role[tid] = "referee"   # recover referee tracks
    else:
        track_team[tid] = cluster_to_team[c]
        track_role[tid] = "player"

print("Recovered referee-like tracks:", sum(1 for t in track_role.values() if t=="referee"))

cluster sizes: {0: 45, 1: 24, 2: 7}
Recovered referee-like tracks: 7


In [11]:
# [N2-C11b-NEW] Assign GK team using average x-position relative to team player centroids
import numpy as np

# compute average x-center per track from df_s
df_s["xc"] = (df_s["x1"] + df_s["x2"]) / 2.0
track_xc = df_s[df_s["track_id"] >= 0].groupby("track_id")["xc"].mean().to_dict()

# team average x (using player tracks only that are assigned to teams)
team0_x = np.mean([track_xc[tid] for tid in track_team if track_team[tid] == 0 and tid in track_xc] or [W*0.25])
team1_x = np.mean([track_xc[tid] for tid in track_team if track_team[tid] == 1 and tid in track_xc] or [W*0.75])

gk_tids = [tid for tid in track_ids if track_cls.get(tid) == CLS_GK]
for tid in gk_tids:
    if tid not in track_xc:
        continue
    x = track_xc[tid]
    # assign to nearest team x-centroid
    track_team[tid] = 0 if abs(x - team0_x) <= abs(x - team1_x) else 1
    track_role[tid] = "goalkeeper"

print("GK assigned:", {tid: track_team.get(tid) for tid in gk_tids[:10]})

GK assigned: {}


In [12]:
# [N2-C12-NEW] Build df_out with team + recovered referees
import pandas as pd

CLASS_NAMES = {0:"player", 1:"goalkeeper", 2:"referee", 3:"ball"}

df_out = df_s.copy()

# apply recovered referee role (override cls_smooth)
def apply_role(row):
    tid = int(row["track_id"])
    if tid >= 0 and track_role.get(tid) == "referee":
        return 2  # referee
    return int(row["cls_smooth"])

df_out["cls_final"] = df_out.apply(apply_role, axis=1)

# team assignment: referee=-1, ball=None
def apply_team(row):
    tid = int(row["track_id"])
    cls = int(row["cls_final"])
    if cls == 3:      # ball
        return None
    if cls == 2:      # referee
        return -1
    return track_team.get(tid, None)

df_out["team_id"] = df_out.apply(apply_team, axis=1)
df_out["class_name"] = df_out["cls_final"].map(CLASS_NAMES)

df_out["team_name"] = df_out["team_id"].map({0:"team_A", 1:"team_B", -1:"referee"}).fillna("unknown")

df_out.head()

Unnamed: 0,frame,track_id,cls,conf,x1,y1,x2,y2,cls_smooth,xc,cls_final,team_id,class_name,team_name
0,0,1,0,0.907156,548.546631,388.484192,576.100525,441.450104,0,562.323578,0,0.0,player,team_A
1,0,2,0,0.904687,577.584229,333.184326,608.44397,382.262115,0,593.014099,0,1.0,player,team_B
2,0,3,0,0.903775,270.721527,312.516479,290.960785,361.712433,0,280.841156,0,,player,unknown
3,0,4,0,0.903154,668.158813,483.364258,697.090759,538.519897,0,682.624786,0,0.0,player,team_A
4,0,5,0,0.89952,1065.475098,216.175598,1095.029175,256.805542,0,1080.252136,0,1.0,player,team_B


In [13]:
# [N2-C13] Optional: render annotated video with TEAM colors
import cv2

IN = VIDEO_PATH
OUT = "/kaggle/working/annotated_team.mp4"

cap = cv2.VideoCapture(IN)
fps = cap.get(cv2.CAP_PROP_FPS) or 25
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(OUT, fourcc, fps, (w, h))

by_frame2 = {k: v for k, v in df_out.groupby("frame")}

# Team colors (BGR)
TEAM_COLOR = {
    "team_A": (0, 0, 255),    # red
    "team_B": (255, 0, 0),    # blue
    "referee": (0, 165, 255), # orange
    "unknown": (200, 200, 200),
}
BALL_COLOR = (0, 255, 0)

frame_i = 0
while True:
    ok, frame = cap.read()
    if not ok:
        break

    g = by_frame2.get(frame_i, None)
    if g is not None:
        for _, row in g.iterrows():
            cls = int(row["cls_smooth"])
            conf = float(row["conf"])
            x1,y1,x2,y2 = map(int, [row["x1"],row["y1"],row["x2"],row["y2"]])
            tid = int(row["track_id"])

            if cls == CLS_BALL:
                color = BALL_COLOR
                label = f"ball {conf:.2f}"
            else:
                team_name = row["team_name"]
                color = TEAM_COLOR.get(team_name, TEAM_COLOR["unknown"])
                label = f"id:{tid} {row['class_name']} {team_name} {conf:.2f}"

            cv2.rectangle(frame, (x1,y1), (x2,y2), color, 2)
            if conf >= 0.50:
                cv2.putText(frame, label, (x1, max(15,y1-5)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    writer.write(frame)
    frame_i += 1

cap.release()
writer.release()
print("Saved:", OUT)

[aac @ 0x5dbe5880] Input buffer exhausted before END element found


Saved: /kaggle/working/annotated_team.mp4


In [14]:
# [N2-C14] Recommended tracking settings (use these in your tracking cell)
TRACKER_NAME = "botsort.yaml"  # better ID persistence than bytetrack in many cases
INFER_IMGSZ = 1280             # helps small/blurred players and ball (slower)
INFER_CONF = 0.15              # lower conf improves recall; smoothing/gap-fill will handle jitter
INFER_IOU  = 0.50

In [15]:
# [N2-C15] Gap fill config (short gaps only)
MAX_GAP_PERSON = 8   # players/goalkeepers/referees
MAX_GAP_BALL   = 4   # ball changes fast; fill fewer frames

In [17]:
# [N2-C16-FIX] Gap fill preserving cls_final + team fields (safe mode)
import numpy as np
import pandas as pd

CLASS_NAMES = {0:"player", 1:"goalkeeper", 2:"referee", 3:"ball"}

def _best_per_frame(g: pd.DataFrame) -> pd.DataFrame:
    return g.sort_values("conf", ascending=False).drop_duplicates(["track_id", "frame"], keep="first")

def safe_mode(s: pd.Series, default=None):
    """Mode that handles all-NaN/all-None series."""
    if s is None:
        return default
    s2 = s.dropna()
    if len(s2) == 0:
        return default
    m = s2.mode()
    if len(m) == 0:
        return default
    return m.iloc[0]

def gap_fill_dense_with_team(df_in: pd.DataFrame, max_gap_person=8, max_gap_ball=4) -> pd.DataFrame:
    df = df_in.copy()

    # Ensure required columns exist
    required = ["frame","track_id","conf","x1","y1","x2","y2","cls_final"]
    for c in required:
        assert c in df.columns, f"Missing required column: {c}"

    # Optional columns
    if "team_id" not in df.columns:
        df["team_id"] = None
    if "team_name" not in df.columns:
        df["team_name"] = "unknown"
    if "class_name" not in df.columns:
        df["class_name"] = df["cls_final"].map(CLASS_NAMES)

    df = df[df["track_id"].notna()].copy()
    df["track_id"] = df["track_id"].astype(int)
    df = df[df["track_id"] >= 0].copy()

    df = _best_per_frame(df)
    df["is_interpolated"] = False

    out_parts = [df]

    for tid, g in df.groupby("track_id"):
        g = g.sort_values("frame").reset_index(drop=True)

        cls_track = int(safe_mode(g["cls_final"], default=int(g["cls_final"].iloc[0])))
        max_gap = max_gap_ball if cls_track == 3 else max_gap_person

        team_id = safe_mode(g["team_id"], default=None)
        team_name = safe_mode(g["team_name"], default="unknown")
        class_name = safe_mode(g["class_name"], default=CLASS_NAMES.get(cls_track, str(cls_track)))

        frames = g["frame"].to_numpy()

        for i in range(len(frames) - 1):
            f1, f2 = int(frames[i]), int(frames[i + 1])
            gap = f2 - f1 - 1
            if gap <= 0 or gap > max_gap:
                continue

            r1 = g.iloc[i]
            r2 = g.iloc[i + 1]

            for k in range(1, gap + 1):
                t = k / (gap + 1)

                new = r1.to_dict()
                new["frame"] = f1 + k
                new["x1"] = float((1 - t) * r1["x1"] + t * r2["x1"])
                new["y1"] = float((1 - t) * r1["y1"] + t * r2["y1"])
                new["x2"] = float((1 - t) * r1["x2"] + t * r2["x2"])
                new["y2"] = float((1 - t) * r1["y2"] + t * r2["y2"])

                new["conf"] = float(min(float(r1["conf"]), float(r2["conf"])) * 0.50)

                new["cls_final"] = cls_track
                new["team_id"] = team_id
                new["team_name"] = team_name
                new["class_name"] = class_name

                new["is_interpolated"] = True
                out_parts.append(pd.DataFrame([new]))

    df_filled = pd.concat(out_parts, ignore_index=True)
    df_filled = _best_per_frame(df_filled)
    df_filled = df_filled.sort_values(["frame", "track_id"]).reset_index(drop=True)
    return df_filled

df_final = gap_fill_dense_with_team(df_out, max_gap_person=8, max_gap_ball=4)
print("df_out rows:", len(df_out), "df_final rows:", len(df_final))
print("Interpolated rows:", int(df_final["is_interpolated"].sum()))

df_out rows: 9578 df_final rows: 9880
Interpolated rows: 302


  df_filled = pd.concat(out_parts, ignore_index=True)


In [19]:
# [N2-C17] Export FINAL gap-filled table
OUT_PARQUET = "/kaggle/working/tracks_final.parquet"
OUT_CSV     = "/kaggle/working/tracks_final.csv"

df_final.to_parquet(OUT_PARQUET, index=False)
df_final.to_csv(OUT_CSV, index=False)

print("Saved:", OUT_PARQUET)
print("Saved:", OUT_CSV)

Saved: /kaggle/working/tracks_final.parquet
Saved: /kaggle/working/tracks_final.csv


In [21]:
# [N2-C18] Render video with gap-filled boxes (interpolated boxes drawn dashed color)
import cv2

CLASS_NAMES = {0:"player", 1:"goalkeeper", 2:"referee", 3:"ball"}

IN = VIDEO_PATH
OUT = "/kaggle/working/annotated_gapfilled.mp4"

cap = cv2.VideoCapture(IN)
fps = cap.get(cv2.CAP_PROP_FPS) or 25
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(OUT, fourcc, fps, (w, h))

by_frame = {k: v for k, v in df_final.groupby("frame")}

# BGR colors
COLORS = {0:(255,0,0), 1:(0,255,255), 2:(0,165,255), 3:(0,255,0)}
INTERP_COLOR = (180, 180, 180)  # gray for interpolated

frame_i = 0
while True:
    ok, frame = cap.read()
    if not ok:
        break

    g = by_frame.get(frame_i, None)
    if g is not None:
        for _, row in g.iterrows():
            cls = int(row["cls_smooth"]) if "cls_smooth" in row else int(row["cls"])
            tid = int(row["track_id"])
            conf = float(row["conf"])
            interp = bool(row.get("is_interpolated", False))

            x1,y1,x2,y2 = map(int, [row["x1"],row["y1"],row["x2"],row["y2"]])

            color = INTERP_COLOR if interp else COLORS.get(cls, (255,255,255))
            cv2.rectangle(frame, (x1,y1), (x2,y2), color, 2)

            # show text only if not interpolated OR high confidence
            if (not interp and conf >= 0.45) or (interp and conf >= 0.30):
                label = f"id:{tid} {CLASS_NAMES.get(cls,cls)} {conf:.2f}"
                cv2.putText(frame, label, (x1, max(15,y1-5)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    writer.write(frame)
    frame_i += 1

cap.release()
writer.release()
print("Saved:", OUT)

[aac @ 0x5edbdc40] Input buffer exhausted before END element found


Saved: /kaggle/working/annotated_gapfilled.mp4


In [22]:
# [N2-C19] FINAL VIDEO: everything applied
import cv2

IN = VIDEO_PATH
FINAL_VIDEO = "/kaggle/working/final_result.mp4"

cap = cv2.VideoCapture(IN)
fps = cap.get(cv2.CAP_PROP_FPS) or 25
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(FINAL_VIDEO, fourcc, fps, (w, h))

by_frame = {k: v for k, v in df_final.groupby("frame")}

TEAM_COLOR = {
    "team_A": (0, 0, 255),    # red
    "team_B": (255, 0, 0),    # blue
    "referee": (0, 165, 255), # orange
    "unknown": (200, 200, 200),
}
BALL_COLOR = (0, 255, 0)
INTERP_COLOR = (180, 180, 180)

frame_i = 0
while True:
    ok, frame = cap.read()
    if not ok:
        break

    g = by_frame.get(frame_i, None)
    if g is not None:
        for _, r in g.iterrows():
            cls = int(r["cls_final"])
            conf = float(r["conf"])
            interp = bool(r.get("is_interpolated", False))
            x1,y1,x2,y2 = map(int, [r["x1"],r["y1"],r["x2"],r["y2"]])

            if cls == 3:
                color = BALL_COLOR
                label = f"ball {conf:.2f}"
            else:
                tn = r.get("team_name", "unknown")
                base_color = TEAM_COLOR.get(tn, TEAM_COLOR["unknown"])
                color = INTERP_COLOR if interp else base_color
                tid = int(r["track_id"])
                label = f"id:{tid} {r.get('class_name','?')} {tn} {conf:.2f}"

            cv2.rectangle(frame, (x1,y1), (x2,y2), color, 2)

            # reduce text clutter
            if (not interp and conf >= 0.45) or (interp and conf >= 0.30):
                cv2.putText(frame, label, (x1, max(15, y1-5)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    writer.write(frame)
    frame_i += 1

cap.release()
writer.release()
print("Saved FINAL video:", FINAL_VIDEO)

[aac @ 0x5da64dc0] Input buffer exhausted before END element found


Saved FINAL video: /kaggle/working/final_result.mp4


In [23]:
# [N2-C20] Export FINAL analytics table
OUT_PARQUET = "/kaggle/working/tracks_final.parquet"
OUT_CSV = "/kaggle/working/tracks_final.csv"

df_final.to_parquet(OUT_PARQUET, index=False)
df_final.to_csv(OUT_CSV, index=False)

print("Saved:", OUT_PARQUET)
print("Saved:", OUT_CSV)

Saved: /kaggle/working/tracks_final.parquet
Saved: /kaggle/working/tracks_final.csv
