In [None]:
import os
import cv2
import numpy as np
import pandas as pd
pd.set_option('display.max_columns', None)
import matplotlib.pyplot as plt
from ultralytics import YOLO
from scipy.signal import savgol_filter,find_peaks

MODEL_PATH = "../models/shuttle/best.pt"
VIDEO_PATH = "../data/videos/baddy_sample.mp4"

assert os.path.exists(MODEL_PATH), "Model not found"
assert os.path.exists(VIDEO_PATH), "Video not found"

model = YOLO(MODEL_PATH)
model.info()


In [None]:
START_T = 0.0   # seconds
END_T   = 72.0  # seconds

In [None]:
cap = cv2.VideoCapture(VIDEO_PATH)
FPS = cap.get(cv2.CAP_PROP_FPS)
print(FPS)
start_frame = int(START_T * FPS)
end_frame   = int(END_T * FPS)

cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)


In [None]:
shuttle = []  # <-- this is what was missing
frame_id = 0

while cap.isOpened():
    if frame_id > end_frame:
        break
    ret, frame = cap.read()
    if not ret:
        break

    results = model(
        frame,
        imgsz=1280,
        conf=0.15,
        iou=0.25,
        max_det=1,
        verbose=False
    )

    boxes = results[0].boxes
    # always append
    if boxes is not None and len(boxes) > 0:
        best = max(boxes, key=lambda b: float(b.conf[0]))
        x1, y1, x2, y2 = best.xyxy[0]
        cx = int((x1 + x2) / 2)
        cy = int((y1 + y2) / 2)
        conf = float(best.conf[0])
    else:
        cx, cy, conf = np.nan, np.nan, 0.0

    shuttle.append((frame_id, cx, cy, conf))
    frame_id += 1


print(f"Detected shuttle in {len(shuttle)} frames")

In [None]:
df = pd.DataFrame(
    shuttle,
    columns=["frame", "x", "y", "conf"]
)
df.to_csv("../data/interim/shuttle_detections.csv", index=False)


In [None]:
df = pd.read_csv("../data/interim/shuttle_detections.csv")
df.head()

In [None]:
plt.figure(figsize=(6,10))
plt.scatter(df["x"], df["y"], s=5)
plt.gca().invert_yaxis()  # image coordinate system
plt.title("Raw Shuttle Detections")
plt.xlabel("x")
plt.ylabel("y")
plt.show()


In [None]:
df_all = df.copy()   # raw tracker output, every frame
print("Total rows in dataframe: ",len(df_all))
print("Total N/A records in x: ",df_all.x.isnull().sum())
print("Total N/A records in y: ",df_all.y.isnull().sum())

if len(df[df.isna().any(axis=1)])/len(df_all) > 0.75:
    print("Video quality is good, able to detect shuttle frames.")
else:
    print("Video quality is poor, unable to detect enough shuttle frames.")


MAX_GAP = int(0.5*FPS) #Number of consecutive frames to fill

df_all["x_phys"] = df_all["x"].interpolate(limit=MAX_GAP,limit_direction='both')
df_all["y_phys"] = df_all["y"].interpolate(limit=MAX_GAP)
df_all["phys_valid"] = (
    df_all["x_phys"].notna() &
    df_all["y_phys"].notna()
)

print("Post Interpolation - Total N/A records in x: ",df_all.x_phys.isnull().sum())
print("Post Interpolation - Total N/A records in y: ",df_all.y_phys.isnull().sum())

In [None]:
df_all["dx"] = df_all["x_phys"].diff()
df_all["dy"] = df_all["y_phys"].diff()

df_all["dt"] = df_all["frame"].diff() / FPS
df_all["time_sec"] = df_all["frame"] / FPS

df_all.loc[~df_all["phys_valid"], ["dx", "dy"]] = np.nan

#df_all["speed"] = (np.sqrt(df_all["dx"]**2 + df_all["dy"]**2) / df_all["dt"]).apply(lambda x: '{:.2f}'.format(x))
df_all["speed"] = (np.sqrt(df_all["dx"]**2 + df_all["dy"]**2) / df_all["dt"])

In [None]:
df_all["vx"] = df_all["dx"] / df_all["dt"]
df_all["vy"] = df_all["dy"] / df_all["dt"]
df_all["ax"] = df_all["vx"].diff() / df_all["dt"]
df_all["ay"] = df_all["vy"].diff() / df_all["dt"]

df_all["acc"] = np.sqrt(df_all["ax"]**2 + df_all["ay"]**2)

In [None]:
eps = 1e-6
den = df_all["speed"].shift(1).clip(lower=eps)
df_all["speed_change_perc"] = ((df_all["speed"] - den) / den*100)

In [None]:
# Remove first two frames as we are calculating velocity and acceleration
df_all = df_all.iloc[2:]
df_all = df_all.reset_index(drop=True)

In [None]:
valid = (
    df_all["phys_valid"] &
    df_all["speed_change_perc"].notna() &
    df_all["acc"].notna()
)

speed_hard = df_all.loc[valid, "speed_change_perc"].quantile(0.90)
speed_soft = df_all.loc[valid, "speed_change_perc"].quantile(0.40)

acc_hard = df_all.loc[valid, "acc"].quantile(0.90)
acc_soft = df_all.loc[valid, "acc"].quantile(0.60)

In [None]:

df_all["rebound"] = (
    df_all["speed_change_perc"].shift(-1) - df_all["speed_change_perc"]
) / df_all["speed_change_perc"]*100

df_all["is_speed_min"] = (df_all["phys_valid"] &
    (df_all["speed"] < df_all["speed"].shift(1)) &
    (df_all["speed"] < df_all["speed"].shift(-1))
)


df_all["shot_contact"] = (
    df_all["phys_valid"] &
    (
        # CASE 1: hard impulse (smash / drive)
        (df_all["acc"] > acc_hard)
        |
        # CASE 2: soft shot with speed dip (drop / net / lift)
        (
            (df_all["speed_change_perc"] > speed_soft) &
            (df_all["acc"] > acc_soft) &
            (
                df_all["is_speed_min"] |
                (df_all["rebound"] > 0.15)
            )
        )
    )
)


min_gap = 2 #2 frames

final_idx = []
last_f = -1e9

for i, r in df_all[df_all["shot_contact"]].iterrows():
    if r["frame"] - last_f >= min_gap:
        final_idx.append(i)
        last_f = r["frame"]

df_all["shot_contact_final"] = False
df_all.loc[final_idx, "shot_contact_final"] = True

print("Detected shots:", df_all["shot_contact_final"].sum())

In [None]:
def compute_shot_confidence(df, i):
    WIN = 2

    # detector support
    conf_win = df.loc[max(0,i-WIN):min(len(df)-1,i+WIN),"conf"]
    C_det = np.clip(conf_win.max()/0.5, 0, 1)

    # interpolation penalty
    phys_win = df.loc[max(0,i-WIN):min(len(df)-1,i+WIN),"phys_valid"]
    interp_frac = 1 - phys_win.mean()
    C_gap = np.clip(1 - interp_frac/0.4, 0, 1)

    # physics
    acc = abs(df.loc[i,"acc"])
    acc_norm = acc / df["acc"].quantile(0.95)

    drop_norm = df.loc[i,"speed_change_perc"] / df["speed_change_perc"].quantile(0.9)
    C_phys = np.clip(0.6*acc_norm + 0.4*drop_norm, 0, 1)

    # time sanity
    t = df.loc[i,"time_sec"]
    C_time = 0.3 if t < 0.5 else 1.0

    return (C_det**0.4) * (C_gap**0.3) * (C_phys**0.3) * C_time


In [None]:
df_final = df_all.loc[df_all["shot_contact_final"]].copy()
df_final["shot_confidence"] = [
    compute_shot_confidence(df_all, i)
    for i in df_final.index
]

shots = df_final[df_final["shot_confidence"] >= 0.45].copy()
print("Final Shots Detected based on Shot Confidence: ",len(shots))

In [None]:
plt.figure(figsize=(14,4))
plt.plot(shots["frame"], shots["speed"])
plt.scatter(
    shots.loc[shots["shot_contact_final"], "frame"],
    shots.loc[shots["shot_contact_final"], "speed"],
    marker="x"
)
plt.title("Speed changes with detected contacts")
plt.show()


In [None]:
speed_hi = shots["speed"].quantile(0.70)     # fast shots
vy_down  = shots["vy"].quantile(0.30)        # downward shots

def classify_shot(row):
    if row.speed > speed_hi and row.vy < vy_down:
        return "smash"
    elif row.vy > 0:
        return "clear_or_lift"
    else:
        return "drop_or_net"

shots["shot_type"] = shots.apply(classify_shot, axis=1)



In [None]:
shots