In [1]:
import pandas as pd
import numpy as np
import csv
import matplotlib
import matplotlib.pyplot as plt
matplotlib.use('TkAgg')  # Cambiar el backend a TkAgg
from pathlib import Path

import cv2
import ultralytics
from ultralytics import YOLO

In [2]:
# NAMES OF THE INPUT VIDEO AND OF EVERY FILE DERIVED FROM IT

file_name = "videotest7.mp4"
video_folder = "videos/"
csv_folder = "csv/"
name = Path(file_name).stem  # video file name without its extension

# INPUTS: THE RAW VIDEO AND THE CSV FILES THAT SHARE ITS BASE NAME
video_path = f"{video_folder}{file_name}"
df_file = f"{csv_folder}{name}.csv"
df_pose_file = f"{csv_folder}{name}_pose.csv"
df_pose_homography = f"{csv_folder}{name}_homography.csv"

# OUTPUTS: ONE ANNOTATED VIDEO PER PIPELINE STAGE
output_video_poss = f"{video_folder}{name}_ballposs.mp4"
output_video_hoop = f"{video_folder}{name}_hoop.mp4"
output_video_projection2D = f"{video_folder}{name}_projection2D.mp4"
output_video_combined = f"{video_folder}{name}_combined.mp4"
output_video_withshot = f"{video_folder}{name}_withshot.mp4"

In [3]:
# READ THE CSV DATAFILES

# Object detections for the video. Later cells read the columns `frame`,
# `class_id` (0 = ball, 1 = hoop, 2 = player), `tracking_id`, `x_center`,
# `y_center`, `width`, `height` and `confidence` from this table.
df = pd.read_csv(df_file)
# Court key-point detections. Later cells read `frame`, `class_id`,
# `kp1_x` .. `kp6_y` and `conf_kp1` .. `conf_kp6` from this table.
df_pose = pd.read_csv(df_pose_file)

In [5]:
# ELIMINATE DUPLICATE OF BALLS IN SAME FRAME

def UNIQUE_BALLS(df):
    """Keep at most one ball detection (class_id 0) per frame.

    Frames are visited in ascending order. When a frame holds several ball
    detections, the one whose ``tracking_id`` equals the ball kept in the
    previous ball-bearing frame wins; if none matches, the first detection of
    the frame is kept. Rows of every other class are always kept.

    Parameters
    ----------
    df : pandas.DataFrame
        Detections with at least ``frame``, ``class_id`` and ``tracking_id``.
        The frame is NOT modified (no scratch column is left behind).

    Returns
    -------
    pandas.DataFrame
        A new frame with the surplus ball rows removed and a fresh RangeIndex.
    """
    is_ball = (df['class_id'] == 0).to_numpy()
    keep = ~is_ball  # positional mask: non-ball rows are always kept

    # Work with row positions so duplicated index labels cannot mark extra rows.
    ball_positions = np.flatnonzero(is_ball)
    ball_rows = pd.DataFrame({
        'frame': df['frame'].to_numpy()[ball_positions],
        'position': ball_positions,
    })
    track_ids = df['tracking_id'].to_numpy()

    last_ball_id = None
    # One pass over the ball rows grouped by frame instead of re-filtering the
    # whole table for every frame. Rows without a frame number are dropped by
    # groupby and therefore never kept.
    for _, positions in ball_rows.groupby('frame', sort=True)['position']:
        positions = positions.to_numpy()
        chosen = positions[0]
        if len(positions) > 1:
            same_track = [p for p in positions if track_ids[p] == last_ball_id]
            if same_track:
                chosen = same_track[0]
        keep[chosen] = True
        last_ball_id = track_ids[chosen]

    return df[keep].reset_index(drop=True)

df = UNIQUE_BALLS(df)

In [4]:
# LINEARLY INTERPOLATE THE POSITION OF THE BALL AND THE HOOP ON FRAMES WHERE THERE IS NO DETECTION

def INTERPOLATE_BALL_HOOP (df):
    """Put every class on a gap-free frame axis and fill ball/hoop gaps.

    A frame axis running from the smallest to the largest ``frame`` value is
    built, and the ball (class 0), hoop (class 1) and player (class 2) rows
    are each left-joined onto it. Ball and hoop gaps lying between two
    detections are filled by linear interpolation; player rows are only
    joined, not interpolated. Only the columns ``frame``, ``tracking_id``,
    ``x_center``, ``y_center``, ``width``, ``height``, ``confidence`` and
    ``class_id`` are present in the result.

    Returns a new DataFrame sorted by ``frame`` then ``class_id``.
    """
    tracked_columns = ['frame', 'tracking_id', 'x_center', 'y_center', 'width', 'height', 'confidence']
    first_frame = df['frame'].min()
    last_frame = df['frame'].max()
    frame_axis = pd.DataFrame({'frame': range(first_frame, last_frame + 1)})
    fill_gaps = dict(method='linear', limit_direction='both', limit_area='inside')

    def on_frame_axis(class_id):
        # Rows of one class, left-joined onto the complete frame axis.
        detections = df.loc[df['class_id'] == class_id, tracked_columns]
        return frame_axis.merge(detections, on='frame', how='left')

    # Ball: tracking_id is part of the index, so it is not interpolated.
    ball = on_frame_axis(0).set_index(['frame', 'tracking_id']).interpolate(**fill_gaps).reset_index()
    ball['class_id'] = 0

    # Hoop: only frame is the index, so tracking_id is interpolated as well.
    hoop = on_frame_axis(1).set_index(['frame']).interpolate(**fill_gaps).reset_index()
    hoop['class_id'] = 1

    players = on_frame_axis(2)
    players['class_id'] = 2

    combined = pd.concat([ball, hoop, players])
    return combined.sort_values(by=['frame','class_id']).reset_index(drop=True)

df = INTERPOLATE_BALL_HOOP (df)

In [6]:
# IDENTIFY WHICH PLAYER OR HOOP HAS POSSESSION OF THE BALL
# AND THE POSITION OF THE BALL ON THE COURT (POSITION OF THE BALL HANDLER OR HOOP)

def IDENTIFY_POSSESSION(df, min_consecutive_frames=5):
    """Label every ball row with the tracking id of whoever holds the ball.

    Adds the columns ``ball_pos``, ``is_shot``, ``x_min``, ``x_max``,
    ``y_min`` and ``y_max`` to ``df`` in place and returns ``df``.

    For each ball row:

    * If the ball centre lies above a hoop (``ball_y <= hoop y_min``) and
      within 1.5 hoop widths of the hoop centre on each side, ``ball_pos`` is
      the hoop's tracking id and ``is_shot`` is True.
    * Otherwise, among the player/hoop boxes overlapping the ball box, the one
      that held the ball on the previous frame is preferred, else the one
      whose centre is closest to the ball. The id is written to ``ball_pos``
      only once the same holder has been seen on ``min_consecutive_frames``
      consecutive frames.
    * If nothing overlaps the ball, ``ball_pos`` stays None and the streak
      resets.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs ``frame``, ``class_id`` (0 ball, 1 hoop, 2 player),
        ``tracking_id``, ``x_center``, ``y_center``, ``width``, ``height``.
        Row labels must be unique because results are written with ``df.loc``.
    min_consecutive_frames : int, default 5
        Frames a holder must persist before possession is recorded.
    """
    df['ball_pos'] = None
    df['is_shot'] = None
    df['x_min'] = df['x_center'] - df['width'] / 2
    df['x_max'] = df['x_center'] + df['width'] / 2
    df['y_min'] = df['y_center'] - df['height'] / 2
    df['y_max'] = df['y_center'] + df['height'] / 2

    # Class masks do not change inside the loop, so build them once.
    is_ball = df['class_id'] == 0
    is_hoop = df['class_id'] == 1
    is_player = df['class_id'] == 2

    previous_possession = None
    consecutive_frames = 0

    for frame in df['frame'].unique():
        in_frame = df['frame'] == frame
        ball_data = df[in_frame & is_ball]
        if ball_data.empty:
            continue
        hoop_data = df[in_frame & is_hoop]
        players_data = df[in_frame & is_player]
        # Only ball rows are written below, so this snapshot stays valid.
        potential_possessions = pd.concat([players_data, hoop_data])

        for ball_index, ball_row in ball_data.iterrows():
            ball_x = ball_row['x_center']
            ball_y = ball_row['y_center']
            ball_x_min = ball_x - ball_row['width'] / 2
            ball_x_max = ball_x + ball_row['width'] / 2
            ball_y_min = ball_y - ball_row['height'] / 2
            ball_y_max = ball_y + ball_row['height'] / 2

            # IF THE BALL IS OVER THE HOOP, THE POSSESSION IS IN THE HOOP
            hoop_possession = None
            for _, hoop_row in hoop_data.iterrows():
                # The "over the hoop" zone is three hoop widths wide in total.
                hoop_x_min = hoop_row['x_center'] - 3 * hoop_row['width'] / 2
                hoop_x_max = hoop_row['x_center'] + 3 * hoop_row['width'] / 2
                if hoop_x_min <= ball_x <= hoop_x_max and ball_y <= hoop_row['y_min']:
                    hoop_possession = hoop_row['tracking_id']
                    break

            # Compare against None explicitly: a tracking id of 0 is a valid
            # hoop id but is falsy, so a truthiness test would miss the shot.
            if hoop_possession is not None:
                df.loc[ball_index, 'ball_pos'] = hoop_possession
                df.loc[ball_index, 'is_shot'] = True
                previous_possession = hoop_possession
                consecutive_frames = 0
                continue

            touching_bbox = potential_possessions[
                (ball_x_min <= potential_possessions['x_max']) &
                (ball_x_max >= potential_possessions['x_min']) &
                (ball_y_min <= potential_possessions['y_max']) &
                (ball_y_max >= potential_possessions['y_min'])]

            if touching_bbox.empty:
                # IF THE BALL IS NOT IN ANY BOUNDING BOX, POSSESSION IS NONE
                df.loc[ball_index, 'ball_pos'] = None
                previous_possession = None
                consecutive_frames = 0
                continue

            if previous_possession in touching_bbox['tracking_id'].values:
                # PRIORITIZE THE OBJECT WITH THE PREVIOUS POSSESSION
                closest_object = touching_bbox[touching_bbox['tracking_id'] == previous_possession].iloc[0]
            else:
                # NO PREVIOUS HOLDER AMONG THEM: TAKE THE CLOSEST BOUNDING BOX
                touching_bbox = touching_bbox.assign(
                    distance_to_ball=((touching_bbox['x_center'] - ball_x) ** 2 +
                                      (touching_bbox['y_center'] - ball_y) ** 2) ** 0.5)
                closest_object = touching_bbox.loc[touching_bbox['distance_to_ball'].idxmin()]

            current_possession = closest_object['tracking_id']
            if current_possession == previous_possession:
                consecutive_frames += 1
            else:
                consecutive_frames = 1
                previous_possession = current_possession

            # ASSIGN POSSESSION ONLY AFTER ENOUGH CONSECUTIVE FRAMES
            if consecutive_frames >= min_consecutive_frames:
                df.loc[ball_index, 'ball_pos'] = current_possession
            else:
                df.loc[ball_index, 'ball_pos'] = None

    return df

df = IDENTIFY_POSSESSION(df)

In [7]:
# FUNCTION THAT PROCESSES THE VIDEO AND DISPLAYS A MESSAGE ABOUT THE BALL'S BOUNDING BOX

def VIDEO_SHOW_BALL_POSS (video_path, output_video_path, df):
    """Write a copy of the video with the ball box and its possession label.

    Every frame of ``video_path`` is written to ``output_video_path``. On
    frames where ``df`` holds a ball row (class_id 0) with a complete box, a
    green rectangle and the text ``Ball Pos: <id>`` / ``No possession`` are
    drawn. Video frames are numbered from 0 and matched to ``df['frame']`` by
    number, the same convention the other video cells of this notebook use.

    Parameters
    ----------
    video_path : str
        Source video.
    output_video_path : str
        Destination file (mp4v codec, same size and FPS as the source).
    df : pandas.DataFrame
        Needs ``frame``, ``class_id``, ``x_center``, ``y_center``, ``width``,
        ``height`` and ``ball_pos``.

    Raises
    ------
    IOError
        If the source video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open video: {video_path}")

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, cap.get(cv2.CAP_PROP_FPS),
                          (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))

    # Group the ball rows once instead of filtering the table on every frame.
    balls_by_frame = {frame: rows for frame, rows in df[df['class_id'] == 0].groupby('frame')}

    try:
        frame_number = 0
        while True:
            ret, frame_video = cap.read()
            if not ret:
                break

            ball_data = balls_by_frame.get(frame_number)
            if ball_data is not None:
                for _, ball_row in ball_data.iterrows():
                    ball_x = ball_row['x_center']
                    ball_y = ball_row['y_center']
                    ball_width = ball_row['width']
                    ball_height = ball_row['height']
                    ball_pos = ball_row['ball_pos']

                    # All four values are needed; int() fails on NaN.
                    if pd.isna([ball_x, ball_y, ball_width, ball_height]).any():
                        continue

                    x_min = int(ball_x - ball_width / 2)
                    y_min = int(ball_y - ball_height / 2)
                    x_max = int(ball_x + ball_width / 2)
                    y_max = int(ball_y + ball_height / 2)

                    cv2.rectangle(frame_video, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)

                    # notna() treats both None and NaN as "nobody has the ball".
                    ball_pos_message = f"Ball Pos: {ball_pos}" if pd.notna(ball_pos) else "No possession"
                    cv2.putText(frame_video, ball_pos_message, (x_min, y_min - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

            out.write(frame_video)
            frame_number += 1
    finally:
        # Release the handles even if drawing or writing fails midway.
        cap.release()
        out.release()

VIDEO_SHOW_BALL_POSS (video_path, output_video_poss, df)


In [8]:
# LINEARLY INTERPOLATE THE KEYPOINTS DATAFRAME

def INTERPOLATE_ALL_VALUES (df_pose):
    """Fill gaps in the pose table by row-wise linear interpolation.

    Every column from ``confidence`` through ``conf_kp6`` (inclusive, in the
    table's column order) is interpolated down the rows. Only gaps enclosed by
    valid values are filled; leading and trailing gaps are left as they are.
    ``df_pose`` is updated in place and also returned.
    """
    interpolated_span = slice('confidence', 'conf_kp6')
    gap_filled = df_pose.loc[:, interpolated_span].interpolate(
        method='linear',
        axis=0,
        limit_direction='both',
        limit_area='inside',
    )
    df_pose.loc[:, interpolated_span] = gap_filled

    return df_pose

df_pose =  INTERPOLATE_ALL_VALUES (df_pose)

In [9]:
# SELECT POINTS IN THE 2D BASKETBALL COURT

left_court_csv = 'csv/points2D_left_court.csv'
right_court_csv = 'csv/points2D_right_court.csv'

def POINTS_COURT (side_court, image_path='images/basketball_court.jpg', n_points=6):
    """Let the user click reference points on the 2D court image.

    The court image is first shown in an OpenCV window (any key closes it),
    then in a matplotlib figure where ``n_points`` clicks are collected with
    ``plt.ginput``.

    Parameters
    ----------
    side_court : str
        Name of the court half; only used in the figure title and the log line.
    image_path : str, default 'images/basketball_court.jpg'
        Court image to display.
    n_points : int, default 6
        Number of clicks to collect.

    Returns
    -------
    list of (x, y) tuples
        The clicked positions, in click order.

    Raises
    ------
    FileNotFoundError
        If the image cannot be read (``cv2.imread`` returns None instead of
        raising, which would otherwise surface later as an obscure cv2 error).
    """
    court = cv2.imread(image_path)
    if court is None:
        raise FileNotFoundError(f"Could not read court image: {image_path}")

    cv2.imshow('Basketball court', court)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

    # OpenCV loads BGR; matplotlib expects RGB.
    plt.imshow(cv2.cvtColor(court, cv2.COLOR_BGR2RGB))
    plt.title(f"Select {n_points} key points in the {side_court} court 2D")
    points_2D = plt.ginput(n_points)
    plt.close()

    print(f"Points selected in the 2D {side_court} court:", points_2D)

    return points_2D

# The quoted block below is a bare string literal, so it is NOT executed. It
# holds the interactive point selection that writes the two CSV files; run it
# by hand only when the reference points have to be picked again.
'''
left_court_2D = POINTS_COURT("left")
left_court_df = pd.DataFrame(left_court_2D, columns=['X', 'Y'])
left_court_df.to_csv(left_court_csv, index=False)
    
right_court_2D = POINTS_COURT("right")
right_court_df = pd.DataFrame(right_court_2D, columns=['X', 'Y'])
right_court_df.to_csv(right_court_csv, index=False)

'''

# Load the saved points as a list of tuples, one per CSV row. `skiprows=1`
# with `header=None` skips the first line of each file (presumably the X,Y
# header written by the disabled block above; verify against the files).
left_court_2D = pd.read_csv(left_court_csv, skiprows=1, header=None).apply(tuple, axis=1).tolist()
right_court_2D = pd.read_csv(right_court_csv, skiprows=1, header=None).apply(tuple, axis=1).tolist()

# Echo the loaded points so the cell output documents which ones were used.
print(f"Points selected in the 2D left court: {left_court_2D}")
print(f"Points selected in the 2D right court: {right_court_2D}")


Points selected in the 2D left court: [(141.44921922285442, 85.333192107493), (145.48020820723897, 292.25729330589513), (142.79288221764926, 483.05743856675946), (140.1055562280597, 691.3252027599565), (320.15639753056536, 293.60095630069), (318.8127345357705, 483.05743856675946)]
Points selected in the 2D right court: [(1080.6696525844327, 85.333192107493), (1079.325989589638, 292.25729330589513), (1080.6696525844327, 485.7447645563491), (1080.6696525844327, 691.3252027599565), (900.618811281927, 292.25729330589513), (903.3061372715168, 483.05743856675946)]


In [10]:
# MAKE THE HOMOGRAPHY MATRIX PER FRAME

def HOMOGRAPHY_MATRIX (df_pose, left_court_2D, right_court_2D):
    """Estimate one image-to-court homography for every pose row.

    For each row the six key points ``kp1`` .. ``kp6`` are paired with the
    reference points of the left court (``class_id == 0``) or of the right
    court (any other ``class_id``). Key points with confidence below 0.3 or
    with a missing coordinate are ignored. With at least four usable pairs
    ``cv2.findHomography`` is called; otherwise, or when OpenCV cannot find a
    solution, the row gets a 3x3 matrix of NaN.

    Parameters
    ----------
    df_pose : pandas.DataFrame
        Needs ``frame``, ``class_id``, ``kp{1..6}_x``, ``kp{1..6}_y`` and
        ``conf_kp{1..6}``. It is not modified.
    left_court_2D, right_court_2D : sequence of (x, y)
        Six reference points each, in key-point order.

    Returns
    -------
    pandas.DataFrame
        A copy of ``df_pose`` (fresh RangeIndex, same row count) with the
        added columns ``keypoints`` (list of six (x, y) tuples),
        ``conf_keypoints`` (list of six confidences) and ``Homo_matrix``
        (nested list, or an all-NaN ndarray when no homography is available).
    """
    min_confidence = 0.3
    min_pairs = 4  # a homography needs at least four point pairs

    # reset_index returns a new frame, so the caller's table is left untouched.
    df_pose = df_pose.reset_index(drop=True)

    xy_columns = [f'kp{k}_{axis}' for k in range(1, 7) for axis in ('x', 'y')]
    conf_columns = [f'conf_kp{k}' for k in range(1, 7)]
    xy = df_pose[xy_columns].to_numpy(dtype=float).reshape(-1, 6, 2)
    conf = df_pose[conf_columns].to_numpy(dtype=float)
    class_ids = df_pose['class_id'].to_numpy()

    def as_object_column(items):
        # Build a 1-D object array so pandas stores each item as one cell.
        column = np.empty(len(items), dtype=object)
        for position, item in enumerate(items):
            column[position] = item
        return column

    keypoints, confidences, matrices = [], [], []
    for i in range(len(df_pose)):
        points_2D = left_court_2D if class_ids[i] == 0 else right_court_2D

        # NaN confidences compare False and are therefore excluded too.
        usable = (conf[i] >= min_confidence) & ~np.isnan(xy[i]).any(axis=1)

        H = None
        if usable.sum() >= min_pairs:
            keypoints_forH = xy[i][usable].astype(np.float32).reshape(-1, 1, 2)
            points2D_forH = np.array([points_2D[j] for j in np.flatnonzero(usable)],
                                     dtype=np.float32).reshape(-1, 1, 2)
            # findHomography may return None for a degenerate configuration.
            H, _ = cv2.findHomography(keypoints_forH, points2D_forH)

        matrices.append(H.tolist() if H is not None else np.full((3, 3), np.nan))
        keypoints.append([(x, y) for x, y in xy[i]])
        confidences.append(conf[i].tolist())

    df_pose['keypoints'] = as_object_column(keypoints)
    df_pose['conf_keypoints'] = as_object_column(confidences)
    # Assign row by row instead of merging on `frame`: a merge multiplies the
    # rows whenever a frame has more than one pose row.
    df_pose['Homo_matrix'] = as_object_column(matrices)

    return df_pose

df_pose = HOMOGRAPHY_MATRIX (df_pose, left_court_2D, right_court_2D)

In [11]:
# CREATE THE PROJECTION OF THE HOOP AT THE FLOOR

def CALCULATE_HOOP_PROJECTION (df, df_pose):
    """Project the hoop vertically onto a reference line on the floor.

    For each frame, the line through the midpoint of key points 2 and 3 and
    the midpoint of key points 5 and 6 is built from the first pose row of
    that frame. The first hoop row (class_id 1) of the frame gets
    ``x_floor = x_center`` and ``y_floor`` = the line's y at that x.

    The columns ``x_floor`` and ``y_floor`` are (re)created on ``df`` in place
    with None everywhere else, and ``df`` is returned. When the reference line
    is vertical (both midpoints share the same x) ``y_floor`` is NaN; this is
    detected explicitly so no division by zero takes place.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs ``frame``, ``class_id`` and ``x_center``; row labels are assumed
        to be unique.
    df_pose : pandas.DataFrame
        Needs ``frame`` and ``kp2_x``, ``kp2_y``, ``kp3_x``, ``kp3_y``,
        ``kp5_x``, ``kp5_y``, ``kp6_x``, ``kp6_y``.
    """
    df['x_floor'] = None
    df['y_floor'] = None

    keypoint_columns = ['kp2_x', 'kp2_y', 'kp3_x', 'kp3_y',
                        'kp5_x', 'kp5_y', 'kp6_x', 'kp6_y']
    # First pose row of every frame, indexed by frame number.
    pose = (df_pose.dropna(subset=['frame'])
                   .drop_duplicates(subset='frame', keep='first')
                   .set_index('frame')[keypoint_columns]
                   .astype(float))

    # First hoop row of every frame that also has pose data.
    hoops = df[df['class_id'] == 1].drop_duplicates(subset='frame', keep='first')
    hoops = hoops[hoops['frame'].isin(pose.index)]
    if hoops.empty:
        return df

    pose = pose.reindex(hoops['frame'].to_numpy())
    pt23_x = ((pose['kp2_x'] + pose['kp3_x']) / 2).to_numpy()
    pt23_y = ((pose['kp2_y'] + pose['kp3_y']) / 2).to_numpy()
    pt56_x = ((pose['kp5_x'] + pose['kp6_x']) / 2).to_numpy()
    pt56_y = ((pose['kp5_y'] + pose['kp6_y']) / 2).to_numpy()
    hook_x = hoops['x_center'].to_numpy(dtype=float)

    run = pt56_x - pt23_x
    rise = pt56_y - pt23_y
    # Slope stays NaN where the line is vertical; division is skipped there.
    m_yellow = np.full(len(hoops), np.nan)
    np.divide(rise, run, out=m_yellow, where=(run != 0))
    b_yellow = pt23_y - m_yellow * pt23_x
    intersection_y = m_yellow * hook_x + b_yellow

    df.loc[hoops.index, 'x_floor'] = hook_x
    df.loc[hoops.index, 'y_floor'] = intersection_y

    return df

df = CALCULATE_HOOP_PROJECTION (df, df_pose)

In [12]:
# GENERATE A VIDEO DEMONSTRATING THE HOOP PROJECTION

def VIDEO_GENERATE_HOOP (df, df_pose, video_path, output_video_path):
    """Write a copy of the video with the hoop floor projection drawn on it.

    On every frame that has a hoop row in ``df`` (class_id 1) and a pose row
    in ``df_pose``, and where all values involved are finite, the following
    are drawn: the kp2-kp5 and kp3-kp6 lines (red), the line joining their
    midpoints (yellow), a vertical line through the hoop centre (green) and a
    black dot at (``x_center``, ``y_floor``). Frames are numbered from 0 in
    read order and matched to the ``frame`` columns by number.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs ``frame``, ``class_id``, ``x_center`` and ``y_floor``.
    df_pose : pandas.DataFrame
        Needs ``frame`` and the x/y columns of key points 2, 3, 5 and 6.
    video_path, output_video_path : str
        Source and destination video files.

    Raises
    ------
    IOError
        If the source video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open video: {video_path}")

    video_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    video_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Keep the fractional FPS (e.g. 29.97); truncating it changes the duration.
    video_fps = cap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, video_fps, (video_width, video_height))

    keypoint_columns = ['kp2_x', 'kp2_y', 'kp5_x', 'kp5_y', 'kp3_x', 'kp3_y', 'kp6_x', 'kp6_y']

    try:
        # Count frames explicitly instead of querying CAP_PROP_POS_FRAMES.
        frame_number = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_data = df_pose[df_pose['frame'] == frame_number]
            row = df[(df['frame'] == frame_number) & (df['class_id'] == 1)]

            if not row.empty and not frame_data.empty:
                kp2_x, kp2_y, kp5_x, kp5_y, kp3_x, kp3_y, kp6_x, kp6_y = \
                    frame_data[keypoint_columns].iloc[0].tolist()
                hook_x = row['x_center'].iloc[0]
                intersection_y = row['y_floor'].iloc[0]

                # Everything passed to int() must be finite; None becomes NaN here.
                drawn_values = np.array([kp2_x, kp2_y, kp5_x, kp5_y, kp3_x, kp3_y,
                                         kp6_x, kp6_y, hook_x, intersection_y], dtype=float)
                if np.isfinite(drawn_values).all():
                    pt23_x = (kp2_x + kp3_x) / 2
                    pt23_y = (kp2_y + kp3_y) / 2
                    pt56_x = (kp5_x + kp6_x) / 2
                    pt56_y = (kp5_y + kp6_y) / 2

                    cv2.line(frame, (int(kp2_x), int(kp2_y)), (int(kp5_x), int(kp5_y)), (0, 0, 255), 2)
                    cv2.line(frame, (int(kp3_x), int(kp3_y)), (int(kp6_x), int(kp6_y)), (0, 0, 255), 2)
                    cv2.line(frame, (int(pt23_x), int(pt23_y)), (int(pt56_x), int(pt56_y)), (0, 255, 255), 2)
                    cv2.line(frame, (int(hook_x), 0), (int(hook_x), video_height), (0, 255, 0), 2)
                    cv2.circle(frame, (int(hook_x), int(intersection_y)), 5, (0, 0, 0), -1)

            out.write(frame)
            frame_number += 1
    finally:
        # Release the handles even if a frame fails to process.
        cap.release()
        out.release()
    cv2.destroyAllWindows()

VIDEO_GENERATE_HOOP (df, df_pose, video_path, output_video_hoop)

In [13]:
# CREATE THE POINTS OF THE FLOOR OF THE BALL AND PLAYERS

def POINTS_FLOOR(df):
    """Fill ``x_floor`` / ``y_floor`` for the players and for the ball.

    * Players (class_id 2): ``x_floor`` is the box centre x and ``y_floor``
      the bottom edge of the box (``y_center + height / 2``).
    * Ball (class_id 0): when a row in the same frame has the tracking id
      stored in the ball's ``ball_pos``, the ball gets its own ``x_center``
      and that row's ``y_floor``; otherwise both values are cleared. The
      cleared values are then filled by linear interpolation over the ball
      rows, including leading and trailing gaps.

    ``df`` is modified in place and returned.
    """
    is_player = df['class_id'] == 2
    df.loc[is_player, 'x_floor'] = df['x_center']
    df.loc[is_player, 'y_floor'] = df['y_center'] + (df['height'] / 2)

    for ball_label in df.index[df['class_id'] == 0]:
        holder_id = df.at[ball_label, 'ball_pos']  # TRACKING_ID OF THE POSSESSION
        in_same_frame = df['frame'] == df.at[ball_label, 'frame']
        holder_rows = df[in_same_frame & (df['tracking_id'] == holder_id)]
        if holder_rows.empty:
            df.at[ball_label, 'x_floor'] = None
            df.at[ball_label, 'y_floor'] = None
        else:
            df.at[ball_label, 'x_floor'] = df.at[ball_label, 'x_center']
            df.at[ball_label, 'y_floor'] = holder_rows.iloc[0]['y_floor']

    # Interpolate the ball's floor track across the frames left without a holder.
    is_ball = df['class_id'] == 0
    floor_axes = ['x_floor', 'y_floor']
    ball_track = df.loc[is_ball, ['frame'] + floor_axes].copy()
    for axis in floor_axes:
        ball_track[axis] = pd.to_numeric(ball_track[axis], errors='coerce')
    ball_track = ball_track.set_index('frame').interpolate(method='linear', limit_direction='both').reset_index()
    df.loc[is_ball, floor_axes] = ball_track[floor_axes].values

    return df

df = POINTS_FLOOR(df)

In [14]:
# APPLY HOMOGRAPHY TO POINTS IN THE FLOOR OF THE OBJECTS

def APPLY_HOMOGRAPHY (df, df_pose):
    """Map every floor point to 2D court coordinates with its frame's matrix.

    ``df`` is left-joined with the ``frame`` / ``Homo_matrix`` columns of
    ``df_pose``. Each row's (``x_floor``, ``y_floor``) is written in
    homogeneous form, multiplied by the row's matrix and divided by the third
    component. Rows whose matrix is entirely NaN (including rows without a
    matching frame in ``df_pose``) get NaN.

    Returns the joined DataFrame with the added columns ``Homo_matrix``,
    ``x_2D`` and ``y_2D``; the ``df`` passed in is not modified.
    """
    matrices_by_frame = df_pose[['frame', 'Homo_matrix']].copy()
    df = df.merge(matrices_by_frame, on='frame', how='left')

    floor_xy = df[['x_floor', 'y_floor']].to_numpy(dtype=np.float64, copy=True)
    # Append w = 1 to every point: (x, y) -> (x, y, 1).
    floor_xyw = np.column_stack([floor_xy, np.ones(floor_xy.shape[0])])

    x_2D, y_2D = [], []
    for matrix, point in zip(df['Homo_matrix'], floor_xyw):
        H = np.array(matrix)
        if np.isnan(H).all():  # NO HOMOGRAPHY AVAILABLE FOR THIS ROW
            x_2D.append(np.nan)
            y_2D.append(np.nan)
            continue
        u, v, w = H @ point
        x_2D.append(u / w)
        y_2D.append(v / w)

    df['x_2D'] = x_2D
    df['y_2D'] = y_2D

    return df

df = APPLY_HOMOGRAPHY (df, df_pose)

In [15]:
# CREATED THE PROJECTED VIDEO TO 2D

def VIDEO_PROJECTION_2D (df, output_video_projection2D, fps=30, court_image_path='images/basketball_court.jpg'):
    """Render the projected positions on the 2D court image, one frame each.

    For every distinct ``frame`` value (ascending) the court image is copied
    and a filled circle plus an ``ID <tracking_id>`` label is drawn for each
    row with finite ``x_2D`` / ``y_2D``: black for the ball (class_id 0),
    yellow for the hoop (class_id 1) and a random per-tracking-id colour for
    everything else.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs ``frame``, ``class_id``, ``tracking_id``, ``x_2D``, ``y_2D``.
    output_video_projection2D : str
        Destination video (mp4v codec, same size as the court image).
    fps : float, default 30
        Frame rate of the written video.
    court_image_path : str, default 'images/basketball_court.jpg'
        Background image.

    Raises
    ------
    FileNotFoundError
        If the court image cannot be read.
    """
    background_img = cv2.imread(court_image_path)
    if background_img is None:
        raise FileNotFoundError(f"Could not read court image: {court_image_path}")
    height, width = background_img.shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_video_projection2D, fourcc, fps, (width, height))

    # All colours are BGR, the channel order cv2.imread and VideoWriter use.
    ball_color = (0, 0, 0)          # BLACK FOR THE BALL
    hoop_color = (0, 255, 255)      # YELLOW FOR THE HOOP
    unknown_color = (128, 128, 128) # rows of other classes without a tracking id

    unique_ids = df['tracking_id'].dropna().unique().astype(int)
    colors = {int(tid): tuple(np.random.randint(0, 255, size=3).tolist()) for tid in unique_ids}

    try:
        for frame in sorted(df['frame'].unique()):
            frame_img = background_img.copy()
            frame_data = df[df['frame'] == frame]
            for _, row in frame_data.iterrows():
                x_2D, y_2D = row['x_2D'], row['y_2D']
                # int() fails on NaN and on +/-inf, so require finite values.
                if pd.isna(x_2D) or pd.isna(y_2D) or not (np.isfinite(x_2D) and np.isfinite(y_2D)):
                    continue
                x = int(x_2D)
                y = int(y_2D)
                tracking_id = int(row['tracking_id']) if pd.notna(row['tracking_id']) else None
                class_id = int(row['class_id'])

                if class_id == 0:
                    color = ball_color
                elif class_id == 1:
                    color = hoop_color
                else:
                    # RANDOM COLOUR PER TRACK_ID; fall back when there is no id
                    color = colors.get(tracking_id, unknown_color)

                cv2.circle(frame_img, (x, y), 10, color, -1)
                cv2.putText(frame_img, f"ID {tracking_id}", (x + 15, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            # The image is already BGR: write it as is. An RGB->BGR conversion
            # here would swap the red and blue channels of the court picture.
            video_writer.write(frame_img)
    finally:
        video_writer.release()

VIDEO_PROJECTION_2D (df, output_video_projection2D)

In [16]:
# CREATED BOTH VIDEOS IN SAME FILE

def VIDEO_COMBINED_PROJECTION (video_path, output_video_projection2D, output_video_combined):
    """Stack the original video on top of the 2D projection video.

    Both inputs are read frame by frame until either one ends. Each frame is
    scaled to the width of the wider input, keeping its aspect ratio, and the
    two are concatenated vertically. The output uses the first video's frame
    rate.

    Parameters
    ----------
    video_path : str
        Top video (original footage).
    output_video_projection2D : str
        Bottom video (2D court projection).
    output_video_combined : str
        Destination file (mp4v codec).

    Raises
    ------
    IOError
        If either input video cannot be opened.
    """
    cap1 = cv2.VideoCapture(video_path)
    cap2 = cv2.VideoCapture(output_video_projection2D)
    if not cap1.isOpened() or not cap2.isOpened():
        cap1.release()
        cap2.release()
        raise IOError(f"Could not open input videos: {video_path}, {output_video_projection2D}")

    # Keep the fractional FPS (e.g. 29.97); truncating it changes the duration.
    fps = cap1.get(cv2.CAP_PROP_FPS)
    width1 = int(cap1.get(cv2.CAP_PROP_FRAME_WIDTH))
    height1 = int(cap1.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width2 = int(cap2.get(cv2.CAP_PROP_FRAME_WIDTH))
    height2 = int(cap2.get(cv2.CAP_PROP_FRAME_HEIGHT))
    output_width = max(width1, width2)

    def scaled_height(width, height):
        # Height after scaling to output_width without changing the aspect ratio.
        return max(1, round(height * output_width / width))

    out_height1 = scaled_height(width1, height1)
    out_height2 = scaled_height(width2, height2)
    output_height = out_height1 + out_height2

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_combined, fourcc, fps, (output_width, output_height))

    try:
        while True:
            ret1, frame1 = cap1.read()
            ret2, frame2 = cap2.read()
            if not ret1 or not ret2:
                break

            frame1_resized = cv2.resize(frame1, (output_width, out_height1))
            frame2_resized = cv2.resize(frame2, (output_width, out_height2))
            out.write(cv2.vconcat([frame1_resized, frame2_resized]))
    finally:
        # Release the handles even if a frame fails to process.
        cap1.release()
        cap2.release()
        out.release()
    cv2.destroyAllWindows()

VIDEO_COMBINED_PROJECTION (video_path, output_video_projection2D, output_video_combined)

In [17]:
# DETECT THE MOMENT OF THE SHOT

def DETECT_SHOT (df, video_path):
    """Extend every detected shot to the neighbouring frames with no holder.

    Takes the ball rows (class_id 0) of ``df`` and, starting from each row
    already flagged ``is_shot == True``, marks the rows before and after it as
    shots too, stopping in each direction at the first row that has a
    possession (non-missing ``ball_pos``). Missing means None or NaN.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs ``class_id``, ``ball_pos`` and ``is_shot``. It is not modified.
    video_path : str
        Unused; kept so existing calls keep working.

    Returns
    -------
    pandas.DataFrame
        The ball rows with a fresh RangeIndex (the old labels are kept in an
        ``index`` column) and the extended ``is_shot`` flags.
    """
    df_shot = df[df['class_id'] == 0].copy().reset_index()

    # notna() handles both None and NaN; an `is not None` test would treat NaN
    # as a possession and stop the propagation immediately.
    has_possession = df_shot['ball_pos'].notna().to_numpy()
    shot_rows = np.flatnonzero((df_shot['is_shot'] == True).to_numpy())
    n_rows = len(df_shot)

    # PROPAGATE BACK AND AHEAD
    propagated = set()
    for idx in shot_rows:
        i = idx - 1
        while i >= 0 and not has_possession[i]:
            propagated.add(int(i))
            i -= 1

        i = idx + 1
        while i < n_rows and not has_possession[i]:
            propagated.add(int(i))
            i += 1

    if propagated:
        # After reset_index the labels equal the row positions.
        df_shot.loc[sorted(propagated), 'is_shot'] = True

    return df_shot

df_shot = DETECT_SHOT (df, video_path)

In [18]:
# SHOW THE SHOT IN THE VIDEO

def VIDEO_SHOT_DETECTION (df_shot, video_path, output_path_withshot):
    """Write a copy of the video with 'SHOT DETECTED!' on the shot frames.

    Frames are numbered from 0 in read order; the text is drawn on every frame
    whose number appears in ``df_shot['frame']`` with ``is_shot == True``.

    Parameters
    ----------
    df_shot : pandas.DataFrame
        Needs ``frame`` and ``is_shot``.
    video_path : str
        Source video.
    output_path_withshot : str
        Destination file (mp4v codec, same size and FPS as the source).

    Raises
    ------
    IOError
        If the source video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open video: {video_path}")

    # Keep the fractional FPS (e.g. 29.97); truncating it changes the duration.
    video_fps = cap.get(cv2.CAP_PROP_FPS)
    video_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    video_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    # Write to the path passed in, not to a notebook-level global.
    out = cv2.VideoWriter(output_path_withshot, fourcc, video_fps, (video_width, video_height))

    # Build the lookup once; a set gives constant-time membership per frame.
    shot_frames = set(df_shot.loc[df_shot['is_shot'] == True, 'frame'].tolist())

    try:
        frame_index = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_index in shot_frames:
                cv2.putText(frame, 'SHOT DETECTED!', (50, 50),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
            out.write(frame)
            frame_index += 1
    finally:
        # Release the handles even if a frame fails to process.
        cap.release()
        out.release()

VIDEO_SHOT_DETECTION (df_shot, video_path, output_video_withshot)


In [19]:
# EXTRACT THE POINTS OF THE SHOT

def EXTRACT_SHOT_POINTS (df_shot):
    """Return the 2D court position just before and just after every shot.

    The rows are sorted by ``frame`` and every run of consecutive rows with
    ``is_shot == True`` counts as one shot. For each run, the 2D position of
    the row right before it is reported as the shot origin and that of the row
    right after it as the rebound. A run touching the start or the end of the
    table has None for the missing side.

    Parameters
    ----------
    df_shot : pandas.DataFrame
        Needs ``frame``, ``is_shot``, ``x_2D`` and ``y_2D``. It is not modified.

    Returns
    -------
    pandas.DataFrame
        One row per shot with the columns ``x_2D_shot``, ``y_2D_shot``,
        ``x_2D_rebound`` and ``y_2D_rebound``. The columns are present even
        when no shot is found.
    """
    result_columns = ['x_2D_shot', 'y_2D_shot', 'x_2D_rebound', 'y_2D_rebound']

    df_shot = df_shot.sort_values(by='frame').reset_index(drop=True)

    # Explicit boolean mask: anything that is not True (None, NaN, False)
    # counts as "no shot". This avoids fillna on an object-dtype column.
    in_shot = df_shot['is_shot'].eq(True).to_numpy(dtype=bool)

    # Pad with zeros so runs touching either end still produce both edges.
    edges = np.diff(np.concatenate(([0], in_shot.astype(int), [0])))
    first_rows = np.flatnonzero(edges == 1)
    last_rows = np.flatnonzero(edges == -1) - 1  # inclusive end of each run

    x_2D = df_shot['x_2D'].to_numpy()
    y_2D = df_shot['y_2D'].to_numpy()
    last_position = len(df_shot) - 1

    results = []
    for first_true_idx, last_true_idx in zip(first_rows, last_rows):
        if first_true_idx > 0:  # avoid a negative index
            x_2D_previous, y_2D_previous = x_2D[first_true_idx - 1], y_2D[first_true_idx - 1]
        else:
            x_2D_previous, y_2D_previous = None, None

        if last_true_idx < last_position:
            x_2D_next, y_2D_next = x_2D[last_true_idx + 1], y_2D[last_true_idx + 1]
        else:
            x_2D_next, y_2D_next = None, None

        results.append({
            'x_2D_shot': x_2D_previous,
            'y_2D_shot': y_2D_previous,
            'x_2D_rebound': x_2D_next,
            'y_2D_rebound': y_2D_next
        })

    return pd.DataFrame(results, columns=result_columns)

df_shot_points = EXTRACT_SHOT_POINTS(df_shot)
df_shot_points

Unnamed: 0,x_2D_shot,y_2D_shot,x_2D_rebound,y_2D_rebound
0,248.460705,271.421426,287.698243,304.483562
1,181.259734,399.640868,179.025315,400.166312
