In [18]:
# ==========================================
# CONFIGURAZIONE E IMPORT
# ==========================================
import os
import sys

# =============================================================================
# 1. ENVIRONMENT SANITIZATION (Reset)
# =============================================================================
# We force a return to the container root and wipe the existing repository.
# This prevents "State Leakage" where old config files or artifacts from a
# previous run might corrupt the current execution.
print(">>> Executing Environment Reset (Clean Slate)...")
try:
    os.chdir('/content/')
    # Force remove (-rf) to handle non-empty directories and git history without prompts
    !rm -rf Hawk-AI-CV-Project
except Exception as e:
    # Pass silently if directory doesn't exist (first run)
    pass

# =============================================================================
# 2. SOURCE CODE RETRIEVAL (Clone)
# =============================================================================
# Pulling the latest version of the codebase (HEAD) from the remote origin.
!git clone https://github.com/leonardoCosta02/Hawk-AI-CV-Project.git

# =============================================================================
# 3. RUNTIME CONTEXT & PATH CONFIGURATION
# =============================================================================
repo_name = "Hawk-AI-CV-Project"

# Change the working directory of the shell/notebook kernel to the project root.
%cd $repo_name

# CRITICAL: UPDATING PYTHONPATH
# By default, Python looks for modules in site-packages and the script directory.
# We explicitly inject the current working directory (os.getcwd()) to the TOP (index 0)
# of sys.path.
# WHY? This allows us to use absolute imports like `from src import config`
# regardless of which sub-folder we are currently executing code in.
sys.path.insert(0, os.getcwd())

# =============================================================================
# 4. DEPENDENCY RESOLUTION
# =============================================================================
print(">>> Installing Runtime Dependencies...")
# - numpy: Vector math
# - opencv-python: Computer Vision algorithms (Canny, Hough, Homography)
# - matplotlib: Visualization and plotting
!pip install numpy opencv-python matplotlib

# =============================================================================
# 5. VERIFICATION
# =============================================================================
print("\n>>> Setup Complete. Active Runtime Root:")
print(os.getcwd())
# Check if imports work immediately to fail fast if something went wrong
try:
    import numpy as np
    import cv2
    print(f">>> Dependencies verified: OpenCV {cv2.__version__}")
except ImportError as e:
    print(f"!!! CRITICAL ERROR: Dependency installation failed - {e}")

>>> Executing Environment Reset (Clean Slate)...
Cloning into 'Hawk-AI-CV-Project'...
remote: Enumerating objects: 803, done.[K
remote: Counting objects: 100% (28/28), done.[K
remote: Compressing objects: 100% (28/28), done.[K
remote: Total 803 (delta 9), reused 0 (delta 0), pack-reused 775 (from 2)[K
Receiving objects: 100% (803/803), 59.54 MiB | 31.06 MiB/s, done.
Resolving deltas: 100% (339/339), done.
/content/Hawk-AI-CV-Project
>>> Installing Runtime Dependencies...

>>> Setup Complete. Active Runtime Root:
/content/Hawk-AI-CV-Project
>>> Dependencies verified: OpenCV 4.12.0


In [19]:
# ==========================================
# CELLA 1: CONFIGURAZIONE E IMPORT
# ==========================================
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
import csv
import os
from google.colab import files

print("Librerie importate. Pronto per il giudizio.")

COURT_DIMENSIONS = {
    'X_MIN': 0.0,
    'X_MAX': 8.23, # Larghezza singolo
    'Y_MIN': 0.0,
    'Y_MAX': 5.485 # Limite della linea di servizio rispetto al fondo
}

def is_point_in(x, y):
    """Restituisce True se il punto (metri) Ã¨ dentro le righe."""
    in_x = COURT_DIMENSIONS['X_MIN'] <= x <= COURT_DIMENSIONS['X_MAX']
    in_y = COURT_DIMENSIONS['Y_MIN'] <= y <= COURT_DIMENSIONS['Y_MAX']
    return in_x and in_y

Librerie importate. Pronto per il giudizio.


In [20]:
# =============================================================================
# MODULE: HAWK-EYE JUDICIAL ENGINE (M3)
# Implementation of Step 3 (Parallax Correction) and Step 4 (Kinematic Analysis)
# =============================================================================
import cv2 as cv
import numpy as np

# Metric constraints synchronized with ITF standards and Module 3 calibration.
# Note: Y_MAX is 5.485m, representing the Baseline-to-Service Line distance
if 'COURT_DIMENSIONS' not in globals():
    COURT_DIMENSIONS = {
        'X_MIN': 0.0,   'X_MAX': 8.23,
        'Y_MIN': 0.0,   'Y_MAX': 5.485  # Real metric depth of the service box
    }

def is_point_in(x, y):
    """
    Evaluates if a projected world coordinate falls within the court boundaries.
    """
    return (COURT_DIMENSIONS['X_MIN'] <= x <= COURT_DIMENSIONS['X_MAX'] and
            COURT_DIMENSIONS['Y_MIN'] <= y <= COURT_DIMENSIONS['Y_MAX'])

class HawkEyeJudge:
    """
    Automated line-judging engine that maps pixel trajectories to metric space.
    """
    def __init__(self, homography_matrix):
        # Initializing with the 3x3 Homography Matrix from Module 3
        self.H = homography_matrix

    def pixel_to_world(self, u, v):
        """
        Transforms 2D Image Pixels (u,v) to 3D World Meters (X,Y) via Matrix H.
        """
        point_vec = np.array([u, v, 1.0])
        mapped_vec = np.dot(self.H, point_vec)

        # Normalize by the homogeneous scale factor (w)
        if mapped_vec[2] != 0:
            return mapped_vec[0] / mapped_vec[2], mapped_vec[1] / mapped_vec[2]
        return None, None

    def detect_bounces(self, trajectory):
        """
        STEP 4: KINEMATIC BOUNCE DETECTION
        Identifies impacts by analyzing vertical velocity (Vy) sign inversion.
        A physical bounce occurs when downward momentum is reversed.
        """
        bounces = []
        if len(trajectory) < 10: return bounces

        last_bounce_frame = -20 # Prevents duplicate triggers within a 20-frame window

        for t in range(2, len(trajectory) - 2):
            prev = trajectory[t-1]
            curr = trajectory[t]
            nxt  = trajectory[t+1]

            if prev is None or curr is None or nxt is None:
                continue

            # Calculate vertical delta (Velocity Proxy)
            # In pixel coordinates, Vy > 0 indicates descent (approaching court bottom)
            v_vel_in  = curr[1] - prev[1]  # Velocity entering current frame
            v_vel_out = nxt[1] - curr[1]   # Velocity exiting current frame

            # Logic: Bounce detected if velocity flips from positive (falling) to negative (rising)
            is_velocity_reversal = v_vel_in > 0 and v_vel_out <= 0

            if is_velocity_reversal:
                # Amplitude check: Ensure the change is not just tracking jitter
                if v_vel_in > 2:
                    if (t - last_bounce_frame) > 15:
                        bounces.append({
                            'frame': t,
                            'pixel': curr
                        })
                        last_bounce_frame = t
        return bounces

    def adjudicate_video(self, video_path, trajectory, output_path):
        """
        Processes the video, identifies bounces, and overlays the judicial verdict.
        """
        cap = cv.VideoCapture(video_path)
        fps = cap.get(cv.CAP_PROP_FPS)
        width, height = int(cap.get(cv.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
        out = cv.VideoWriter(output_path, cv.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

        # Identify impact frames using Step 4 velocity analysis
        bounces = self.detect_bounces(trajectory)
        print(f"ðŸŽ¾ Impact events identified: {len(bounces)}")

        decisions = {}
        for b in bounces:
            f_idx = b['frame']
            px, py = b['pixel']

            # STEP 3: PARALLAX MITIGATION (Ground Contact Offset)
            # YOLO detects the ball's center. We apply a +6px offset to py to target
            # the ball's base, ensuring the projection lies on the ground plane.
            py_contact = py + 6

            wx, wy = self.pixel_to_world(px, py_contact)

            if wx is not None:
                # Execute final spatial logic
                verdict = "IN" if is_point_in(wx, wy) else "OUT"
                decisions[f_idx] = {
                    'verdict': verdict,
                    'coords': (wx, wy),
                    'color': (0, 255, 0) if verdict == "IN" else (0, 0, 255)
                }
                print(f"   -> Frame {f_idx}: {verdict} (X={wx:.2f}m, Y={wy:.2f}m)")

        # Main Visualization Loop
        frame_count = 0
        display_timer = 0
        current_display = None
        while True:
            ret, frame = cap.read()
            if not ret: break

            if frame_count in decisions:
                current_display = decisions[frame_count]
                display_timer = int(fps * 2)

            # Draw ball trajectory
            if frame_count < len(trajectory) and trajectory[frame_count] is not None:
                cv.circle(frame, (int(trajectory[frame_count][0]), int(trajectory[frame_count][1])), 5, (0, 255, 255), -1)

            # Draw Verdict UI Overlay
            if display_timer > 0 and current_display:
                cv.rectangle(frame, (50, 150), (450, 320), (0,0,0), -1)
                cv.putText(frame, current_display['verdict'], (80, 240), cv.FONT_HERSHEY_SIMPLEX, 3, current_display['color'], 8)
                cv.putText(frame, f"X: {current_display['coords'][0]:.2f}m Y: {current_display['coords'][1]:.2f}m", (80, 300), cv.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
                display_timer -= 1

            out.write(frame)
            frame_count += 1

        cap.release()
        out.release()

In [21]:
# ==========================================
# CELLA 3: ESECUZIONE DEL GIUDICE (Versione Robusta)
# ==========================================
import pandas as pd
import numpy as np # Importante per la matrice
import glob
import os
from google.colab import files

# 1. LA TUA MATRICE H
# DA SOSTITUIRE (!!)
# -----------------------------------------------------------
H_MATRIX = np.array([
    [ 6.60767767e-02,  2.68232422e-02, -3.15310509e+01],
    [-4.44622299e-04, -2.13413995e-01,  1.07256989e+02],
    [ 1.94201000e-05,  6.47891119e-03,  1.00000000e+00]
])
# -----------------------------------------------------------

print("Inizializzazione Giudice...")
try:
    judge = HawkEyeJudge(H_MATRIX)
except NameError:
    print("ERRORE: Inizializzare class HawkEyeJudge !")
    judge = None

if judge:
    # Cerchiamo i file CSV che iniziano con DATI_
    csv_files = glob.glob("DATI_*.csv")

    if not csv_files:
        print("Nessun file 'DATI_*.csv' contenente le coordinate trovato nella cartella corrente.")
    else:
        print(f"Trovati {len(csv_files)} file dati da analizzare.")

        for csv_path in csv_files:
            print(f"\n" + "="*40)
            print(f"ðŸ”Ž Analisi file dati: {csv_path}")

            # A. Ricostruisci il nome del video originale
            # 1. Prendi solo il nome file (es. DATI_video1.csv) ignorando cartelle
            nome_file_pulito = os.path.basename(csv_path)

            # 2. Rimuovi il prefisso "DATI_" e il suffisso ".csv"
            nome_base = nome_file_pulito.replace("DATI_", "").replace(".csv", "")

            print(f"Cerco il video corrispondente a: '{nome_base}'")

            # 3. Cerca il video in varie posizioni possibili (da modificare !!)
            possibili_path = [
                os.path.join(os.getcwd(), 'data', 'videos'), # Cartella strutturata
                os.getcwd(),                                 # Cartella corrente (/content)
                '/content'                                   # Root esplicita
            ]

            video_path = None
            estensioni = ['.mp4', '.mov', '.avi']

            for cartella in possibili_path:
                for ext in estensioni:
                    test_path = os.path.join(cartella, nome_base + ext)
                    if os.path.exists(test_path):
                        video_path = test_path
                        break
                if video_path: break

            if not video_path:
                print(f"VIDEO NON TROVATO!")
                continue

            print(f"Video trovato: {video_path}")

            try:
                df = pd.read_csv(csv_path)

                # Trova il frame massimo
                max_frame = int(df['frame'].max())

                # Crea lista (aggiungo +50 di buffer per sicurezza)
                trajectory_list = [None] * (max_frame + 50)

                # Riempi la lista
                punti_validi = 0
                for _, row in df.iterrows():
                    try:
                        idx = int(row['frame'])
                        u, v = float(row['u']), float(row['v'])

                        # Controllo base per evitare valori assurdi
                        if not np.isnan(u) and not np.isnan(v):
                            trajectory_list[idx] = (u, v)
                            punti_validi += 1
                    except ValueError:
                        continue # Salta righe corrotte

                print(f"Caricati {punti_validi} punti di tracciamento.")

                # C. Esegui il giudizio
                output_filename = f"JUDGED_{nome_base}.mp4"

                judge.adjudicate_video(video_path, trajectory_list, output_filename)

                # D. Scarica
                if os.path.exists(output_filename):
                    print(f"   ðŸ’¾ Download in corso: {output_filename}")
                    files.download(output_filename)
                else:
                    print("Errore: Il video output non Ã¨ stato creato.")

            except Exception as e:
                print(f"Errore durante l'elaborazione di {nome_base}: {e}")

Inizializzazione Giudice...
Nessun file 'DATI_*.csv' contenente le coordinate trovato nella cartella corrente.


In [22]:
# ==========================================
# CELLA DI DEBUG: VISUALIZZA LA GRIGLIA
# ==========================================
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
from google.colab.patches import cv2_imshow

H_MATRIX = np.array([
    [6.50980565e-02, 2.65116930e-02, -3.04981716e+01],
    [3.86372362e-04, -1.97434738e-01, 7.91002378e+01],
    [1.76143090e-04, 6.49680276e-03, 1.00000000e+00]
])

# 2. Definiamo i 4 angoli del campo (in METRI) secondo il sistema M3
# Ordine: [0,0] (Servizio SX), [8.23, 0] (Servizio DX), [8.23, 6.40] (Rete DX), [0, 6.40] (Rete SX)
# Usiamo i punti chiave delle righe del singolare
punti_metri = np.array([
    [0.0,  0.0],   # Intersezione T servizio / laterale sinistra
    [8.23, 0.0],   # Intersezione T servizio / laterale destra
    [8.23, 6.40],  # Rete / laterale destra
    [0.0,  6.40],  # Rete / laterale sinistra
    [0.0, -5.49],  # Fondo / laterale sinistra
    [8.23, -5.49]  # Fondo / laterale destra
], dtype='float32')

def disegna_campo_su_frame(frame, H):
    # Calcoliamo l'inversa per proiettare Metri -> Pixel
    try:
        H_inv = np.linalg.inv(H)
    except:
        print("La matrice non Ã¨ invertibile!")
        return frame

    # Convertiamo i punti metri in coordinate omogenee [x, y, 1]
    h, w = frame.shape[:2]
    pts_pixel = []

    for pm in punti_metri:
        vec_m = np.array([pm[0], pm[1], 1])
        vec_px = np.dot(H_inv, vec_m)

        # Normalizza
        if vec_px[2] != 0:
            u = int(vec_px[0] / vec_px[2])
            v = int(vec_px[1] / vec_px[2])
            pts_pixel.append((u, v))

            # Disegna punto
            cv.circle(frame, (u, v), 10, (0, 0, 255), -1)
            cv.putText(frame, f"{pm[0]},{pm[1]}", (u+10, v), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,255), 2)

    return frame

# --- TEST SU UN FRAME A CASO DEL TUO VIDEO ---
video_path = "JUDGED_video1.mp4" # <--- CONTROLLA NOME FILE
cap = cv.VideoCapture(video_path)
ret, frame = cap.read()
if ret:
    frame_debug = disegna_campo_su_frame(frame, H_MATRIX)
    cv2_imshow(frame_debug)
else:
    print("Impossibile leggere il video per il debug.")
cap.release()

Impossibile leggere il video per il debug.
