In [None]:
import torch
import pandas as pd 
from ultralytics import YOLO
import numpy as np
import supervision as sv
import logging
from tkinter import Tk,filedialog
import os 
from pathlib import Path
import csv
import cv2
from typing import Optional
import warnings
from collections import deque

In [None]:
model = YOLO('../Models/2025_v0.pt')
#source_file =  add code for file diaglouge to open to laod video 
#target_file = get the file name freom sourcepath and add_processed before the file extgension. 
logging.basicConfig(level=logging.INFO)
logging.info('Started Inference')

root=Tk()
root.withdraw()
source_file = filedialog.askopenfilename(title="Select source video file",filetypes=[("Video files", "*.mp4;*.avi;*.mov"), ("All files", "*.*")])
if not source_file:
    logging.error("No source file selected. Exiting.")
    exit()
directory,filename = os.path.split(source_file)
name,ext = os.path.splitext(filename)
target_file_name = name + '_processed' + ext
target_file = os.path.join(directory,target_file_name)

In [None]:
def run_inference(
    model: YOLO,
    frame: np.ndarray,
    frame_number: int,
    *,
    state: dict,            # maps det_idx -> deque of (cx,cy)
    save_csv: bool = False,
    writer: Optional = None,
    conf_threshold: float = 0.25,
    iou_threshold: float = 0.45
) -> np.ndarray:
    """
    Runs YOLO → annotate all boxes → track per-detection trajectories → optionally log to CSV.

    Args:
      model         : your YOLO instance
      frame         : HxWx3 video frame array
      frame_number  : current frame index

    Keyword Args:
      state         : dict[int, deque], must start empty
      save_csv      : if True, writer must be provided
      writer        : csv.writer for logging
      conf_threshold: YOLO confidence cutoff
      iou_threshold : YOLO NMS IoU threshold
    """
    # sanity checks
    if save_csv and writer is None:
        raise ValueError("`writer` is required when save_csv=True")
    if not save_csv and writer is not None:
        raise ValueError("`writer` only allowed when save_csv=True")

    # 1) YOLO detect + built-in NMS
    yolo_res = model(frame, conf=conf_threshold, iou=iou_threshold)[0]
    dets     = sv.Detections.from_ultralytics(yolo_res)

    # 2) Annotate all boxes
    img = sv.BoxAnnotator().annotate(frame, dets)

    # 3) For each detection, record its center in a fixed-length deque
    for det_idx, (x1, y1, x2, y2) in enumerate(dets.xyxy, start=1):
        cx, cy = int((x1 + x2)/2), int((y1 + y2)/2)
        if det_idx not in state:
            state[det_idx] = deque(maxlen=5)
        state[det_idx].append((cx, cy))

    # 4) Draw each object’s trajectory when it has ≥3 points
    for pts in state.values():
        if pts and len(pts) > 2:
            pts_arr = np.array(pts, dtype=np.int32)
            cv2.polylines(
                img,
                [pts_arr],
                isClosed=False,
                color=(0, 255, 0),
                thickness=2
            )

    # 5) Overlay frame number
    cv2.putText(
        img,
        text=str(frame_number),
        org=(10, 30),
        fontFace=cv2.FONT_HERSHEY_COMPLEX,
        fontScale=1,
        color=(255, 0, 0),
        thickness=2,
        lineType=cv2.LINE_AA
    )

    # 6) CSV logging of all detections
    if save_csv:
        for det_idx, (xy, conf) in enumerate(zip(dets.xyxy, dets.confidence), start=1):
            x1, y1, x2, y2 = map(float, xy)
            c_x, c_y = (x1 + x2)/2, (y1 + y2)/2
            writer.writerow([
                frame_number,
                det_idx,   # per-frame detection index
                conf,
                c_x, c_y,
                x1, y1, x2, y2
            ])

    return img




In [None]:
try:
    csv_file = '../Data/detections.csv'
    # 1) Initialize
    tracker = BallTracker(
        max_history=10,
        static_thresh=5,
        stable_frames=5,
        lock_frames=3,
        max_age=5
    )
        f = open(csv_file,mode='w',newline='')
    writer = csv.writer(f)
    writer.writerow(['frame_number','tracker_id','detection_id','confidence','c_x','c_y','x1','x2','y1','y2'])
    tracker = sv.ByteTrack(
    track_activation_threshold=0.10,   # accept detections ≥10% conf
    minimum_matching_threshold=0.30,   # match on ≥30% IoU
    lost_track_buffer=60,              # keep “dead” tracks alive for 60 frames
    frame_rate=30,                     # (optional, for motion model timing)
    minimum_consecutive_frames=1       # start a track after 1 consecutive detection
    )
    state   = {"ball_id": None, "centers": []}
    warnings.filterwarnings("error", message="The truth value of an empty array is ambiguous")
    sv.process_video(source_path=source_file,
                     target_path=target_file,
                     callback=lambda frame,frame_number:run_inference(model, frame,frame_number,state=state,
                                                                      save_csv=True,writer=writer))
    logging.info('Inference completed')
finally:
    f.close()
    


In [1]:
# ——— your BallTracker class (from previous message) ———
from collections import deque
import numpy as np

class BallTracker:
    def __init__(self,
                 max_history: int = 10,
                 static_thresh: float = 5.0,
                 stable_frames: int = 5,
                 lock_frames: int = 3,
                 max_age: int = 5):
        self.max_history      = max_history
        self.static_thresh    = static_thresh
        self.stable_frames    = stable_frames
        self.lock_frames      = lock_frames
        self.max_age          = max_age

        # per-candidate histories and ages
        self.histories        = {}            # cid -> deque[(cx,cy)]
        self.ages             = {}            # cid -> frames since last match

        self.locked_id        = None          # once decided, this is the cid
        self.locked_count     = 0             # how many frames in a row exactly one mover
        self.last_center      = None

    def update(self, xyxy: np.ndarray):
        """
        xyxy: (N,4) array of [x1,y1,x2,y2]
        Returns: (chosen_cid, chosen_box, chosen_center)
                 or (None, None, None) if not locked yet
        """
        centers = [((x1+x2)/2,(y1+y2)/2) for x1,y1,x2,y2 in xyxy]

        # 1) age out
        for cid in list(self.ages):
            self.ages[cid] += 1
            if self.ages[cid] > self.max_age:
                del self.ages[cid], self.histories[cid]

        # 2) match or spawn
        new_ages = {}
        for c in centers:
            # find nearest existing cid
            best = min(
                ((cid, hist[-1]) for cid, hist in self.histories.items()),
                key=lambda kv: np.hypot(kv[1][0]-c[0], kv[1][1]-c[1]),
                default=(None, None)
            )
            cid, last = best
            if cid is not None and np.hypot(last[0]-c[0], last[1]-c[1]) < self.static_thresh:
                # matched
                self.histories[cid].append(c)
                new_ages[cid] = 0
            else:
                # new candidate
                new_cid = max(self.histories.keys(), default=-1) + 1
                self.histories[new_cid] = deque(maxlen=self.max_history)
                self.histories[new_cid].append(c)
                new_ages[new_cid] = 0

        self.ages = new_ages

        # 3) static filter & lock-in
        if self.locked_id is None:
            movers = [
                cid for cid, hist in self.histories.items()
                if len(hist) >= self.stable_frames and
                   np.hypot(hist[-1][0]-hist[0][0], hist[-1][1]-hist[0][1]) >= self.static_thresh
            ]
            if len(movers) == 1:
                self.locked_count += 1
                if self.locked_count >= self.lock_frames:
                    self.locked_id   = movers[0]
                    self.last_center = self.histories[self.locked_id][-1]
            else:
                self.locked_count = 0

        # 4) steady-state: pick closest to last_center
        if self.locked_id is not None:
            dists = [
                (i, np.hypot(cx-self.last_center[0], cy-self.last_center[1]))
                for i, (cx, cy) in enumerate(centers)
            ]
            chosen_idx, _   = min(dists, key=lambda t: t[1])
            chosen_box      = xyxy[chosen_idx]
            chosen_center   = centers[chosen_idx]
            self.last_center = chosen_center
            return chosen_idx, chosen_box, chosen_center

        return None, None, None
