## Imports & Global Setup

In [1]:
import os
import cv2
import numpy as np
from tqdm import tqdm
from ultralytics import YOLO
from sklearn.cluster import KMeans

import re
from collections import defaultdict
import easyocr
import torch


## GPU Optimizations (PyTorch / cuDNN)

In [2]:
# Enable cuDNN's autotuner so it can benchmark convolution algorithms and reuse
# the fastest one. NOTE(review): this pays off mainly when input shapes stay
# constant between calls -- confirm that holds for the tracker/OCR inputs.
torch.backends.cudnn.benchmark = True


## OCR Reader Initialization (English Model, GPU; Digits Enforced per Call via `allowlist`)

In [3]:
# Single shared EasyOCR reader (English model, GPU requested). The reader itself
# is not restricted to digits; callers pass an `allowlist` on each readtext() call.
OCR_READER = easyocr.Reader(['en'], gpu=True)


## Debug Controls (Crop Saving for OCR Inspection)

In [4]:
# Debug switches used by annotate_video() when dumping player crops to disk.
DEBUG_OCR = True          # master switch for crop saving and the debug overlay
DEBUG_SAVE_EVERY = 15     # crops are saved only on frames whose index is a multiple of N
DEBUG_MAX_FRAMES = 900    # crops are saved only while frame index <= N
DEBUG_DIR = "debug_ocr"   # output folder for saved crops
# Create the folder up front so cv2.imwrite() has somewhere to write.
os.makedirs(DEBUG_DIR, exist_ok=True)


## Core Settings & Persistence Buffers

In [5]:
JERSEY_MODE = "back"      # "back" or "front"; passed to detect_jersey_number() to pick the ROI set
DEVICE_ID = 0             # cuda:0; passed as `device` to model.track()

# Start loose then you can tighten later
CONF_TH = 0.30            # reads below this confidence are ignored when voting
MIN_READS = 5             # minimum history length before a stable number is considered
MAXLEN_READS = 140        # per-track history is trimmed to the most recent N reads
LOCK_MARGIN = 1.10        # best score must be at least this multiple of the runner-up

# show number only if locked OR very strong single read
SHOW_SINGLE_IF_CONF_GE = 0.60

# Memory / Persistence
FPS_ASSUMED = 30          # frame rate assumed when converting the TTL to frames
TTL_SECONDS = 3.0         # how long an unseen track keeps its state
TTL_FRAMES = int(TTL_SECONDS * FPS_ASSUMED)

# Module-level state shared by the voting/locking helpers and annotate_video().
track_numbers = defaultdict(list)  # track_id -> [(num, conf), ...]
locked_number = {}                 # track_id -> locked number
last_seen_frame = {}               # track_id -> last frame idx


### Grass Color Helper (BGR Mean of Green Mask)

In [6]:
def get_grass_color(img):
    """Estimate the pitch colour of a BGR image.

    Pixels whose HSV value falls inside a fixed green band
    (H 30-80, S 40-255, V 40-255) are selected, and the mean of the
    original BGR image over those pixels is returned.

    Args:
        img: BGR image as accepted by ``cv2.cvtColor``.

    Returns:
        The first three channels of ``cv2.mean`` (B, G, R means) over the
        green mask.
    """
    hsv_img = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    green_mask = cv2.inRange(
        hsv_img,
        np.array([30, 40, 40]),
        np.array([80, 255, 255]),
    )
    # cv2.mean yields four channel means; only B, G, R are meaningful here.
    channel_means = cv2.mean(img, mask=green_mask)
    return channel_means[:3]


### Extract Player Crops from YOLO Result

In [7]:
def get_players_boxes(result):
    """Collect the image crop and detection box of every person in ``result``.

    Args:
        result: detection result exposing ``boxes`` (each with ``cls`` and
            ``xyxy`` tensors) and ``orig_img`` (the frame the boxes refer to).

    Returns:
        ``(crops, boxes)``: two parallel lists holding, for each class-0
        detection with a non-empty crop, the cropped image and its box.
    """
    person_class = 0
    crops, kept_boxes = [], []

    for det in result.boxes:
        if int(det.cls.cpu().numpy()[0]) != person_class:
            continue

        left, top, right, bottom = (int(v) for v in det.xyxy[0].cpu().numpy())
        crop = result.orig_img[top:bottom, left:right]
        if not crop.size:
            # Degenerate box: nothing to show, nothing to classify.
            continue

        crops.append(crop)
        kept_boxes.append(det)

    return crops, kept_boxes


### Compute Kit Color per Player (Upper Body, Grass Removed)


In [8]:
def get_kits_colors(players, grass_hsv=None, frame=None):
    """Compute one mean kit colour (BGR) per player crop.

    For every crop, pixels whose hue lies within +/-10 of the grass hue
    (and with S, V >= 40) are treated as pitch and excluded; of the rest,
    only the top half of the crop (upper body) contributes to the mean.

    Args:
        players: iterable of BGR player crops.
        grass_hsv: 1x1x3 HSV array describing the grass colour. When None it
            is estimated from ``frame`` via ``get_grass_color``.
        frame: full BGR frame, required only when ``grass_hsv`` is None.

    Returns:
        List of ``np.ndarray`` of length 3 (mean B, G, R), one per crop, in
        input order.

    Raises:
        ValueError: if both ``grass_hsv`` and ``frame`` are None.
    """
    if grass_hsv is None:
        if frame is None:
            raise ValueError("get_kits_colors needs either grass_hsv or frame")
        grass_color = get_grass_color(frame)
        grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV)

    # Work with a plain Python int: the hue is stored as uint8, and uint8
    # arithmetic can wrap around (e.g. 5 - 10 -> 251) under NumPy 2 promotion
    # rules, which would silently produce an empty grass mask.
    grass_hue = int(grass_hsv[0, 0, 0])
    # The bounds do not depend on the crop, so build them once. Clamp to the
    # valid OpenCV 8-bit hue range [0, 179].
    lower_green = np.array([max(grass_hue - 10, 0), 40, 40])
    upper_green = np.array([min(grass_hue + 10, 179), 255, 255])

    kits_colors = []
    for player_img in players:
        hsv = cv2.cvtColor(player_img, cv2.COLOR_BGR2HSV)
        # Everything that is NOT grass-coloured.
        not_grass = cv2.bitwise_not(cv2.inRange(hsv, lower_green, upper_green))

        # Restrict to the upper half of the crop, where the shirt is expected.
        upper_mask = np.zeros(player_img.shape[:2], np.uint8)
        upper_mask[0:player_img.shape[0] // 2, :] = 255
        shirt_mask = cv2.bitwise_and(not_grass, upper_mask)

        kit_color = np.array(cv2.mean(player_img, mask=shirt_mask)[:3])
        kits_colors.append(kit_color)

    return kits_colors


### KMeans Kit Classifier (2 Teams)

In [9]:
def get_kits_classifier(kits_colors):
    """Fit and return a 2-cluster KMeans model on the given kit colours.

    Args:
        kits_colors: sequence of colour vectors, one per player.

    Returns:
        The fitted ``KMeans`` instance (two clusters, ``n_init="auto"``).
    """
    # fit() returns the estimator itself, so construction and fitting chain.
    return KMeans(n_clusters=2, n_init="auto").fit(kits_colors)

def classify_kits(kits_classifer, kits_colors):
    """Return the label ``kits_classifer.predict`` assigns to each kit colour.

    Args:
        kits_classifer: fitted classifier exposing ``predict`` (the KMeans
            model from ``get_kits_classifier`` in this file).
        kits_colors: sequence of colour vectors to classify.

    Returns:
        Whatever ``predict`` returns for ``kits_colors`` (one label per row).
    """
    return kits_classifer.predict(kits_colors)


### Decide Which Cluster Is the "Left" Team

In [10]:
def get_left_team_label(players_boxes, kits_colors, kits_clf):
    """Decide which kit cluster (0 or 1) is the team standing further left.

    Each player is assigned to a cluster with ``classify_kits``; the left
    edges (x1) of the boxes are averaged per cluster and the cluster with
    the smaller average is reported as the left team.

    Args:
        players_boxes: detection boxes, each exposing an ``xyxy`` tensor.
        kits_colors: kit colour per player, index-aligned with the boxes.
        kits_clf: fitted kit classifier.

    Returns:
        1 if cluster 0 lies to the right of cluster 1 on average, else 0.
        An empty cluster is treated as having an average x of 0.
    """
    cluster0_xs, cluster1_xs = [], []

    for idx, det in enumerate(players_boxes):
        left_x, _top, _right, _bottom = map(int, det.xyxy[0].cpu().numpy())
        cluster = classify_kits(kits_clf, [kits_colors[idx]]).item()
        bucket = cluster0_xs if cluster == 0 else cluster1_xs
        bucket.append(left_x)

    # An empty cluster falls back to a single 0 so the average is defined.
    mean_x0 = np.average(np.array(cluster0_xs if cluster0_xs else [0]))
    mean_x1 = np.average(np.array(cluster1_xs if cluster1_xs else [0]))

    return 1 if mean_x0 > mean_x1 else 0


### ROI Generator for Jersey Area

In [11]:
def jersey_rois(player_img_bgr, mode="back"):
    """Cut candidate jersey-number regions out of a player crop.

    The windows keep a generous horizontal extent so that a thin leading
    '1' near the edge of the number is not cropped away.

    Args:
        player_img_bgr: player crop as an array of shape (H, W, ...).
        mode: ``"back"`` selects three windows; any other value selects the
            two "front" windows.

    Returns:
        List of non-empty sub-images (views into the input). Empty when the
        crop is smaller than 50 px high or 35 px wide.
    """
    h, w = player_img_bgr.shape[:2]
    if h < 50 or w < 35:
        return []

    # Each window is (top, bottom, left, right) as fractions of the crop size.
    if mode == "back":
        windows = (
            (0.12, 0.82, 0.12, 0.88),
            (0.18, 0.78, 0.18, 0.82),
            (0.25, 0.85, 0.15, 0.85),
        )
    else:
        windows = (
            (0.10, 0.75, 0.12, 0.88),
            (0.16, 0.70, 0.18, 0.82),
        )

    candidates = (
        player_img_bgr[int(h * top):int(h * bottom), int(w * left):int(w * right)]
        for top, bottom, left, right in windows
    )
    return [region for region in candidates if region is not None and region.size != 0]


### Image Variants for OCR (Gentle Thresholding)

In [12]:
def make_variants(roi_bgr):
    """Produce several pre-processed versions of an ROI for OCR.

    The processing is deliberately mild (2.8x bicubic upscale, CLAHE, Otsu
    and Gaussian adaptive thresholds) so thin strokes such as a '1' survive.

    Args:
        roi_bgr: BGR region of interest, or None.

    Returns:
        List of ``(tag, image)`` pairs in the order
        ``rgb, gray, thr, inv, ada, ada_inv``; empty for a None/empty input.
    """
    if roi_bgr is None or roi_bgr.size == 0:
        return []

    upscaled = cv2.resize(roi_bgr, None, fx=2.8, fy=2.8, interpolation=cv2.INTER_CUBIC)

    # Contrast-equalised grayscale is the base for every binarised variant.
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    equalized = equalizer.apply(cv2.cvtColor(upscaled, cv2.COLOR_BGR2GRAY))

    _, otsu = cv2.threshold(equalized, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    adaptive = cv2.adaptiveThreshold(
        equalized, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 21, 5
    )

    variants = []
    variants.append(("rgb", cv2.cvtColor(upscaled, cv2.COLOR_BGR2RGB)))
    variants.append(("gray", equalized))
    variants.append(("thr", otsu))
    variants.append(("inv", cv2.bitwise_not(otsu)))
    variants.append(("ada", adaptive))
    variants.append(("ada_inv", cv2.bitwise_not(adaptive)))
    return variants


### OCR Helpers (Digits Only + Best Candidate)

In [13]:
def _ocr_digits(img, allow="0123456789"):
    """Run the shared EasyOCR reader on ``img`` restricted to ``allow``.

    Args:
        img: image passed straight to ``OCR_READER.readtext``.
        allow: characters passed as ``allowlist`` (digits by default).

    Returns:
        The ``readtext`` result with ``detail=1`` and ``paragraph=False``;
        callers in this file unpack each entry as ``(bbox, text, conf)``.
    """
    return OCR_READER.readtext(img, detail=1, paragraph=False, allowlist=allow)

def _best_digits_from_results(results):
    best_num, best_conf = None, 0.0
    for bbox, text, conf in results:
        digits = re.sub(r"\D", "", text)
        if 1 <= len(digits) <= 2:
            conf = float(conf)
            if conf > best_conf:
                best_conf = conf
                best_num = digits
    return best_num, best_conf


### Prefix "1" Recovery (Fixes 14->4 Mistake)

In [14]:
def _detect_prefix_one(roi_bgr):
    """Look for a digit '1' in the left strip of an ROI.

    The strip is the leftmost 28% of the ROI width (at least 8 px). Every
    pre-processed variant of the strip is OCR'd with only '1' allowed.

    Args:
        roi_bgr: BGR region of interest.

    Returns:
        ``(True, conf)`` for the first reading equal to "1" with confidence
        of at least 0.25; ``(False, 0.0)`` otherwise or when the ROI is
        smaller than 20 px in either dimension.
    """
    min_conf = 0.25
    height, width = roi_bgr.shape[:2]
    if min(height, width) < 20:
        return False, 0.0

    strip_width = max(8, int(width * 0.28))
    left_strip = roi_bgr[:, :strip_width]

    for _tag, variant in make_variants(left_strip):
        for _bbox, text, conf in _ocr_digits(variant, allow="1"):
            only_digits = re.sub(r"\D", "", text)
            if only_digits == "1" and float(conf) >= min_conf:
                return True, float(conf)

    return False, 0.0


### Main Jersey Number Detector

In [15]:
def detect_jersey_number(player_img_bgr, mode="back"):
    """Read the jersey number from a player crop.

    Every ROI from ``jersey_rois`` is OCR'd in every variant from
    ``make_variants`` and the most confident 1-2 digit reading is kept
    across all of them. After each ROI, if the running best is a single
    digit, ``_detect_prefix_one`` is consulted on that ROI; on a hit the
    best becomes "1" + digit with its confidence raised to at least 0.60
    (capped at 0.99). This is the 14->4 fix.

    Args:
        player_img_bgr: BGR player crop.
        mode: ROI preset, forwarded to ``jersey_rois``.

    Returns:
        ``(num, conf, tag)``: digit string, confidence and the tag of the
        variant that produced it (prefixed with ``"prefix1+"`` when the
        leading '1' was recovered). ``(None, 0.0, None)`` if nothing is read.
    """
    top_num, top_conf, top_tag = None, 0.0, None

    for region in jersey_rois(player_img_bgr, mode=mode):
        # Plain 1-2 digit read over all variants of this region.
        for variant_tag, variant_img in make_variants(region):
            ocr_hits = _ocr_digits(variant_img, allow="0123456789")
            num, conf = _best_digits_from_results(ocr_hits)
            if num is None or conf <= top_conf:
                continue
            top_num, top_conf, top_tag = num, conf, variant_tag

        # Only a single-digit best can be missing a leading '1'.
        if top_num is None or len(top_num) != 1:
            continue

        found_one, _one_conf = _detect_prefix_one(region)
        if not found_one:
            continue

        boosted_conf = min(0.99, max(top_conf, 0.60))
        if boosted_conf >= top_conf:
            top_num = "1" + top_num
            top_conf = boosted_conf
            top_tag = "prefix1+" + (top_tag or "")

    return top_num, top_conf, top_tag


### Voting / Locking Logic (Per Track)
##### Update History Buffer

In [16]:
def update_track_number(track_id, num, conf, maxlen=MAXLEN_READS):
    """Record one OCR reading in the per-track history buffer.

    Args:
        track_id: tracker id; nothing is recorded when None.
        num: digit string read; nothing is recorded when None.
        conf: confidence of the reading (stored as ``float``).
        maxlen: history longer than this is cut down to its last ``maxlen``
            entries.
    """
    if num is None or track_id is None:
        return

    history = track_numbers[track_id]
    history.append((num, float(conf)))
    if len(history) > maxlen:
        # Keep only the most recent readings.
        track_numbers[track_id] = history[-maxlen:]


### Get Stable Number by Score (`freq * avg_conf`)

In [17]:
def get_stable_number(track_id, min_reads=MIN_READS, conf_th=CONF_TH, margin=LOCK_MARGIN):
    """Vote on the jersey number of a track from its reading history.

    Each distinct number is scored as ``count * average confidence`` over
    the readings with confidence >= ``conf_th``.

    Args:
        track_id: tracker id; None yields None.
        min_reads: minimum number of stored readings (before filtering).
        conf_th: readings below this confidence are ignored.
        margin: the winner's score divided by the runner-up's must reach
            this ratio, otherwise the vote is considered undecided.

    Returns:
        The winning digit string, or None when there is too little history,
        fewer than 3 confident readings, or the top two are too close.
    """
    if track_id is None:
        return None

    history = track_numbers.get(track_id, [])
    if len(history) < min_reads:
        return None

    confs_by_number = defaultdict(list)
    confident_reads = 0
    for number, conf in history:
        if conf >= conf_th:
            confs_by_number[number].append(conf)
            confident_reads += 1
    if confident_reads < 3:
        return None

    # Highest score first; ties fall back to comparing the digit strings.
    ranking = sorted(
        (
            (len(confs) * (sum(confs) / len(confs)), number)
            for number, confs in confs_by_number.items()
        ),
        reverse=True,
    )
    top_score, top_number = ranking[0]
    runner_up_score = ranking[1][0] if len(ranking) > 1 else 0.0

    undecided = runner_up_score > 0 and (top_score / runner_up_score) < margin
    return None if undecided else top_number


### Lock Policy (Avoid locking 1-digit too early)

In [18]:
def should_lock(track_id, stable_num, reads, prefer_two_digits=True):
    """Decide whether ``stable_num`` may be locked for a track.

    Single-digit numbers are held to a much stricter standard so that a
    partially read two-digit number (14 seen as 4) is not locked early.

    Args:
        track_id: tracker id (not consulted by the decision).
        stable_num: candidate digit string, or None.
        reads: reading history as ``(num, conf)`` pairs.
        prefer_two_digits: when True, a one-digit candidate needs at least
            10 readings of that exact number with confidence >= 0.80.

    Returns:
        True if the candidate may be locked, False otherwise.
    """
    if stable_num is None:
        return False

    if not (prefer_two_digits and len(stable_num) == 1):
        return True

    # Count only strong confirmations of this exact single digit.
    strong_hits = sum(
        1 for num, conf in reads if num == stable_num and conf >= 0.80
    )
    return strong_hits >= 10


### Upgrade Policy (1-digit lock -> 2-digit if strong later)

In [19]:
def maybe_upgrade_lock(track_id):
    """Replace a one-digit lock with a two-digit number seen often enough.

    If the track is locked to a single digit and its history holds at least
    five two-digit readings with confidence >= 0.55, the lock is replaced
    by the most frequent of those two-digit numbers.

    NOTE(review): the threshold of five applies to all two-digit readings
    together, not to the winning number alone -- confirm this is intended.

    Args:
        track_id: tracker id; ignored when None or not currently locked.
    """
    if track_id is None or track_id not in locked_number:
        return
    if len(locked_number[track_id]) != 1:
        return

    two_digit_reads = [
        num
        for num, conf in track_numbers.get(track_id, [])
        if len(num) == 2 and conf >= 0.55
    ]
    if len(two_digit_reads) < 5:
        return

    from collections import Counter
    (winner, _count), = Counter(two_digit_reads).most_common(1)
    locked_number[track_id] = winner


### Cleanup Old Tracks (TTL)

In [20]:
def cleanup_old_tracks(frame_idx):
    """Forget every track not seen for more than ``TTL_FRAMES`` frames.

    Args:
        frame_idx: index of the current frame.

    Expired ids are removed from ``last_seen_frame``, ``locked_number`` and
    ``track_numbers``.
    """
    # Collect first, delete afterwards: the dict must not change while iterated.
    expired_ids = [
        tid
        for tid, seen_at in last_seen_frame.items()
        if frame_idx - seen_at > TTL_FRAMES
    ]

    for tid in expired_ids:
        for store in (last_seen_frame, locked_number, track_numbers):
            store.pop(tid, None)


### 🎬 Main Pipeline: Track + Team Classify + OCR + Render
#### Video Annotation Function

In [21]:
def annotate_video(video_path, model):
    """Track, team-classify and OCR every frame of a video and write the result.

    For each frame the model's tracker is run, people are split into two
    teams by kit colour (KMeans fitted once, on the first frame that shows
    at least two players), jersey numbers are read and voted per track, and
    boxes, labels and numbers are drawn. The annotated video is written to
    ``output/<video stem>_out.mp4``.

    Args:
        video_path: path of the input video.
        model: detector exposing ``track(...)`` whose first result provides
            ``boxes`` and ``orig_img``.

    Returns:
        None. Prints an error and returns early if the input cannot be
        opened or the output cannot be created.

    Side effects:
        Updates the module-level ``track_numbers``, ``locked_number`` and
        ``last_seen_frame`` buffers; optionally saves debug crops under
        ``DEBUG_DIR``.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"ERROR: Could not open video: {video_path}")
        return

    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Write at the source frame rate so playback speed is preserved; fall
    # back to the assumed rate when the container does not report one.
    src_fps = cap.get(cv2.CAP_PROP_FPS)
    out_fps = float(src_fps) if src_fps and src_fps > 0 else float(FPS_ASSUMED)

    # The writer does not create missing folders, so make sure it exists.
    out_dir = "output"
    os.makedirs(out_dir, exist_ok=True)
    video_stem = os.path.splitext(os.path.basename(video_path))[0]
    out_path = os.path.join(out_dir, video_stem + "_out.mp4")

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    output_video = cv2.VideoWriter(out_path, fourcc, out_fps, (width, height))
    if not output_video.isOpened():
        cap.release()
        print(f"ERROR: Could not open output video for writing: {out_path}")
        return

    kits_clf = None
    left_team_label = 0
    grass_hsv = None
    botsort_warned = False

    pbar = tqdm(total=total_frames, unit="frame", desc="Annotating video")

    try:
        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                break

            frame_idx = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
            pbar.update(1)

            # Keep an untouched frame for detection and cropping and draw on a
            # separate copy: result.orig_img may share memory with the array
            # handed to the tracker, in which case overlays drawn for earlier
            # boxes would leak into the crops of later ones.
            clean_frame = cv2.resize(frame, (width, height))
            annotated_frame = clean_frame.copy()

            # tracking (try botsort, fall back to the model's default tracker)
            try:
                result = model.track(
                    clean_frame,
                    conf=0.5,
                    persist=True,
                    verbose=False,
                    device=DEVICE_ID,
                    tracker="botsort.yaml"
                )[0]
            except Exception as exc:
                # Best-effort fallback is intentional, but report the cause once
                # instead of hiding it.
                if not botsort_warned:
                    print(f"WARNING: botsort tracking failed ({exc!r}); using default tracker")
                    botsort_warned = True
                result = model.track(
                    clean_frame,
                    conf=0.5,
                    persist=True,
                    verbose=False,
                    device=DEVICE_ID
                )[0]

            # Team clustering init: only needed until the classifier exists,
            # so the per-frame kit colours are not computed afterwards.
            if kits_clf is None:
                players_imgs, players_boxes = get_players_boxes(result)
                if len(players_imgs) >= 2:
                    kits_colors = get_kits_colors(players_imgs, grass_hsv, clean_frame)
                    kits_clf = get_kits_classifier(kits_colors)
                    left_team_label = get_left_team_label(players_boxes, kits_colors, kits_clf)
                    grass_color = get_grass_color(result.orig_img)
                    grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV)

            for box in result.boxes:
                label = int(box.cls.cpu().numpy()[0])
                x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())

                track_id = None
                if getattr(box, "id", None) is not None:
                    track_id = int(box.id.cpu().numpy()[0])
                    last_seen_frame[track_id] = frame_idx

                jersey_text = None
                debug_info = None

                if label == 0:
                    crop = result.orig_img[y1:y2, x1:x2]
                    h_c, w_c = crop.shape[:2]

                    # team classification (skipped for degenerate boxes, which
                    # would make the colour conversion fail on an empty image)
                    if crop.size and kits_clf is not None and grass_hsv is not None:
                        kit_color = get_kits_colors([crop], grass_hsv)
                        team = classify_kits(kits_clf, kit_color).item()
                        label = 0 if team == left_team_label else 1

                    num, conf, tag = None, 0.0, None
                    if h_c >= 55 and w_c >= 35:
                        num, conf, tag = detect_jersey_number(crop, mode=JERSEY_MODE)
                        update_track_number(track_id, num, conf)
                        debug_info = (num, conf, tag, h_c, w_c)

                    # lock & keep
                    reads = track_numbers.get(track_id, [])
                    stable = get_stable_number(track_id)

                    if track_id is not None and track_id not in locked_number:
                        if should_lock(track_id, stable, reads, prefer_two_digits=True):
                            locked_number[track_id] = stable

                    maybe_upgrade_lock(track_id)

                    final_num = locked_number.get(track_id, None)
                    if final_num:
                        jersey_text = f"#{final_num}"
                    elif num is not None and conf >= SHOW_SINGLE_IF_CONF_GE:
                        jersey_text = f"#{num}"

                    # DEBUG: save crops
                    if (
                        DEBUG_OCR
                        and crop.size
                        and track_id is not None
                        and frame_idx <= DEBUG_MAX_FRAMES
                        and frame_idx % DEBUG_SAVE_EVERY == 0
                    ):
                        outp = os.path.join(DEBUG_DIR, f"f{frame_idx:05d}_id{track_id}_hc{h_c}_wc{w_c}.jpg")
                        cv2.imwrite(outp, crop)

                elif label == 1:
                    # Class 1 is split into left/right by which half of the
                    # frame the box starts in.
                    label = 2 if x1 < 0.5 * width else 3
                else:
                    # Remaining classes are shifted past the four player/GK slots.
                    label = label + 2

                # Draw box + label
                color = box_colors[str(label)]
                cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)
                cv2.putText(
                    annotated_frame,
                    labels[label],
                    (x1 - 30, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.9,
                    color,
                    2
                )

                if jersey_text is not None:
                    cv2.putText(
                        annotated_frame,
                        jersey_text,
                        (x1, y2 + 25),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.8,
                        (255, 255, 255),
                        2
                    )

                # Optional debug overlay: raw per-frame OCR read, its
                # confidence, the winning variant tag and the crop size.
                if DEBUG_OCR and debug_info is not None:
                    n, c, t, hc, wc = debug_info
                    txt = f"{n if n is not None else '-'} {c:.2f} {t or '-'} {hc}x{wc}"
                    cv2.putText(
                        annotated_frame,
                        txt,
                        (x1, y2 + 45),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.55,
                        (0, 255, 255),
                        2
                    )

            cleanup_old_tracks(frame_idx)
            output_video.write(annotated_frame)
    finally:
        # Always release the progress bar, writer and capture, even when a
        # frame fails, so the output file is finalised and handles are freed.
        pbar.close()
        output_video.release()
        cap.release()


### ▶️ Run Section
##### Labels, Colors, Model Load, and Execute

In [22]:
# Display names indexed by the final label computed in annotate_video():
# 0/1 are the two teams, 2/3 the left/right split of detector class 1, and
# detector classes >= 2 are shifted up by two.
labels = ["Player-L", "Player-R", "GK-L", "GK-R", "Ball", "Main Ref", "Side Ref", "Staff"]
# Box/text colour per final label; keys are the label index as a string and
# the tuples are passed directly to OpenCV drawing calls.
box_colors = {
    "0": (150, 50, 50),
    "1": (37, 47, 150),
    "2": (41, 248, 165),
    "3": (166, 196, 10),
    "4": (155, 62, 157),
    "5": (123, 174, 213),
    "6": (217, 89, 204),
    "7": (22, 11, 15)
}

# Input video and detector weights (paths relative to the working directory).
video_path = "test_videos/CV_Task.mp4"
model = YOLO("weights/best.pt")
# NOTE(review): hard-codes the first CUDA device; this cell presumably fails on
# a machine without CUDA -- confirm before running elsewhere.
model.to("cuda:0")

print("CUDA:", torch.cuda.is_available(), "|", torch.cuda.get_device_name(0))
annotate_video(video_path, model)


  ckpt = torch.load(file, map_location="cpu")


CUDA: True | NVIDIA GeForce RTX 3060 Laptop GPU


Annotating video: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñâ| 743/744 [10:10<00:00,  1.22frame/s]
