# Baseball Biomechanics Analysis

This notebook provides a template for analyzing baseball player biomechanics using pose estimation data linked to Statcast metrics.

In [None]:
# Add project root to path
import sys
sys.path.insert(0, '..')

# Core imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime

# Project imports
from src.database import DatabaseOperations, init_db, get_session_factory, get_engine
from src.database.models import Play, Player, PoseSequence, PoseFrame, Keypoint, PlayerRole
from src.pose import MediaPipeBackend
from src.utils.video_utils import VideoProcessor

# Visualization settings
plt.style.use('seaborn-v0_8-whitegrid')
%matplotlib inline

## 1. Database Connection

In [None]:
# Open the project database. The SQLite path in the URL is relative, so it
# resolves against the directory the kernel was started in.
DATABASE_URL = "sqlite:///../data/baseball_biomechanics.db"

# engine -> session factory -> session -> operations wrapper. `session` and
# `ops` are reused by every cell below.
engine = get_engine(DATABASE_URL)
SessionLocal = get_session_factory(engine)
session = SessionLocal()
ops = DatabaseOperations(session)

# Print whatever counters the operations layer reports, one per line.
stats = ops.get_database_stats()
print("Database Statistics:")
for stat_name, stat_value in stats.items():
    print(f"  {stat_name}: {stat_value}")

## 2. Load Player Data

In [None]:
# Player under analysis; every later cell keys off this ID.
PLAYER_ID = 592789  # Replace with actual player ID

# Look the player up and print the profile fields, or report the miss.
player = ops.get_player(PLAYER_ID)
if not player:
    print(f"Player {PLAYER_ID} not found")
else:
    print(f"Player: {player.player_name}")
    print(f"Team: {player.team}")
    print(f"Position: {player.position}")
    print(f"Throws: {player.throws}")

# Stats are requested even when the lookup above missed; any counter absent
# from the returned mapping is shown as 0.
player_stats = ops.get_player_stats(PLAYER_ID)
print(f"\nPlayer Stats:")
for stat_label, stat_key in (
    ("Pitches thrown", 'pitches_thrown'),
    ("At bats", 'at_bats'),
    ("Pose sequences", 'pose_sequences'),
):
    print(f"  {stat_label}: {player_stats.get(stat_key, 0)}")

## 3. Query Plays and Pose Data

In [None]:
# Get all plays for the player (as pitcher)
plays = ops.get_plays_by_pitcher(PLAYER_ID)
print(f"Found {len(plays)} plays for pitcher {PLAYER_ID}")

# Column order of the flattened table. Passing it to the DataFrame constructor
# keeps the schema intact when the query returns no plays, so later cells can
# still index plays_df['pitch_type'] / plays_df['has_pose'] instead of raising
# KeyError on a column-less frame.
PLAY_COLUMNS = [
    'play_id',
    'pitch_type',
    'release_speed',
    'spin_rate',
    'pfx_x',
    'pfx_z',
    'plate_x',
    'plate_z',
    'events',
    'has_video',
    'has_pose',
]

# One record per play: Statcast fields plus two availability flags.
# NOTE(review): len(play.pose_sequences) presumably loads the relationship
# once per play; confirm this is acceptable for large result sets.
plays_data = [
    {
        'play_id': play.play_id,
        'pitch_type': play.pitch_type,
        'release_speed': play.release_speed,
        'spin_rate': play.spin_rate,
        'pfx_x': play.pfx_x,
        'pfx_z': play.pfx_z,
        'plate_x': play.plate_x,
        'plate_z': play.plate_z,
        'events': play.events,
        'has_video': play.video_local_path is not None,
        'has_pose': len(play.pose_sequences) > 0,
    }
    for play in plays
]

# Force the flags to bool so an empty frame can still be used as a boolean
# mask (empty columns would otherwise default to object dtype).
plays_df = pd.DataFrame(plays_data, columns=PLAY_COLUMNS).astype(
    {'has_video': bool, 'has_pose': bool}
)
plays_df.head(10)

In [None]:
# Summary statistics
if plays_df.empty:
    # With no plays the frame can lack the columns read below, so report the
    # situation instead of failing with a KeyError.
    print("No plays loaded; nothing to summarise")
else:
    print("Pitch Type Distribution:")
    print(plays_df['pitch_type'].value_counts())

    # The flags are booleans, so summing counts the True rows.
    print(f"\nPlays with video: {plays_df['has_video'].sum()}")
    print(f"Plays with pose data: {plays_df['has_pose'].sum()}")

## 4. Analyze Pose Data

In [None]:
def get_pose_data_for_play(play_id: int, role: str = 'pitcher'):
    """Extract pose keypoints for a play as one wide row per frame.

    Args:
        play_id: Identifier of the play whose pose data is wanted.
        role: 'pitcher' or 'batter' (case-insensitive), or a PlayerRole
            member.

    Returns:
        A DataFrame with `frame_number`, `timestamp_ms` and, for every
        keypoint, `<name>_x`, `<name>_y`, `<name>_z` and `<name>_conf`
        columns. Returns None when no sequence is stored for the play and
        role, or when the first sequence contains no frames.

    Raises:
        ValueError: If `role` is not a recognised role. Previously any value
            other than 'pitcher' was silently treated as the batter.
    """
    if isinstance(role, PlayerRole):
        player_role = role
    else:
        role_lookup = {
            'pitcher': PlayerRole.PITCHER,
            'batter': PlayerRole.BATTER,
        }
        try:
            player_role = role_lookup[role.strip().lower()]
        except (AttributeError, KeyError):
            raise ValueError(
                f"Unknown role {role!r}; expected 'pitcher' or 'batter'"
            ) from None

    sequences = ops.get_pose_sequences_for_play(play_id, player_role)
    if not sequences:
        return None

    # Only the first sequence is used; any further sequences are ignored.
    sequence = sequences[0]

    # NOTE(review): frames are taken in the order the relationship yields
    # them; confirm that order is by frame_number.
    frames_data = []
    for frame in sequence.frames:
        frame_dict = {
            'frame_number': frame.frame_number,
            'timestamp_ms': frame.timestamp_ms,
        }
        for kp in frame.keypoints:
            frame_dict[f'{kp.keypoint_name}_x'] = kp.x
            frame_dict[f'{kp.keypoint_name}_y'] = kp.y
            frame_dict[f'{kp.keypoint_name}_z'] = kp.z
            frame_dict[f'{kp.keypoint_name}_conf'] = kp.confidence
        frames_data.append(frame_dict)

    # A sequence without frames would give a column-less DataFrame that breaks
    # the plotting code, so report it as "no pose data".
    if not frames_data:
        return None

    return pd.DataFrame(frames_data)

# Example: Get pose data for a play with pose data
# Start from None so a re-run never leaves a stale pose_df from an earlier
# execution, and so later cells can rely on `pose_df is not None`.
pose_df = None

# An empty plays_df may have no 'has_pose' column at all, so only filter when
# there are rows to filter.
plays_with_pose = plays_df if plays_df.empty else plays_df[plays_df['has_pose']]

if not plays_with_pose.empty:
    example_play_id = plays_with_pose['play_id'].iloc[0]
    # Element access on a DataFrame can return a NumPy scalar, which database
    # drivers may refuse to bind as a query parameter; unwrap it to the
    # equivalent native Python value.
    if isinstance(example_play_id, np.generic):
        example_play_id = example_play_id.item()

    pose_df = get_pose_data_for_play(example_play_id)
    if pose_df is not None:
        print(f"Pose data for play {example_play_id}:")
        print(f"Frames: {len(pose_df)}")
        print(f"Columns: {len(pose_df.columns)}")
        # An expression nested inside an `if` is not auto-rendered by the
        # notebook, so show the preview explicitly.
        display(pose_df.head())
    else:
        print(f"No pitcher pose sequence found for play {example_play_id}")
else:
    print("No plays with pose data found")

## 5. Biomechanics Analysis

In [None]:
def calculate_joint_angle(row, point_a, point_b, point_c):
    """Calculate the angle at point_b formed by points A-B-C.

    Only the `_x` and `_y` columns are used, so this is the angle in the 2D
    image plane.

    Args:
        row: Mapping-like object (e.g. a DataFrame row or dict) holding
            `<point>_x` / `<point>_y` entries.
        point_a: Keypoint name at one end of the joint.
        point_b: Keypoint name at the vertex where the angle is measured.
        point_c: Keypoint name at the other end of the joint.

    Returns:
        The angle in degrees in [0, 180], or NaN when a coordinate is missing,
        not numeric, not finite, or when either segment has zero length.
    """
    try:
        a_x, a_y = float(row[f'{point_a}_x']), float(row[f'{point_a}_y'])
        b_x, b_y = float(row[f'{point_b}_x']), float(row[f'{point_b}_y'])
        c_x, c_y = float(row[f'{point_c}_x']), float(row[f'{point_c}_y'])
    except (KeyError, TypeError, ValueError):
        # Missing column, None/NA value or non-numeric value. Anything else
        # (including KeyboardInterrupt) is allowed to propagate.
        return np.nan

    ba = np.array([a_x - b_x, a_y - b_y])
    bc = np.array([c_x - b_x, c_y - b_y])

    # Coincident keypoints give a zero-length segment and an undefined angle;
    # NaN/inf coordinates make the product non-finite. Bail out before the
    # division rather than relying on a 0/0 warning.
    norm_product = np.linalg.norm(ba) * np.linalg.norm(bc)
    if not np.isfinite(norm_product) or norm_product == 0.0:
        return np.nan

    # Clip guards against rounding pushing the cosine just outside [-1, 1].
    cosine = np.clip(np.dot(ba, bc) / norm_product, -1.0, 1.0)
    return np.degrees(np.arccos(cosine))

# Calculate elbow angle over time (if pose data exists)
# The `in dir()` test keeps the cell runnable when pose_df was never defined;
# the `.empty` test skips a frame with no rows, which would otherwise fail on
# the missing 'timestamp_ms' column below.
if 'pose_df' in dir() and pose_df is not None and not pose_df.empty:
    elbow_sides = ('right', 'left')

    # Adds (or overwrites) '<side>_elbow_angle' columns on pose_df in place.
    for side in elbow_sides:
        pose_df[f'{side}_elbow_angle'] = pose_df.apply(
            lambda row: calculate_joint_angle(
                row, f'{side}_shoulder', f'{side}_elbow', f'{side}_wrist'
            ),
            axis=1
        )

    # Plot elbow angle over time
    fig, ax = plt.subplots(figsize=(12, 6))
    for side in elbow_sides:
        ax.plot(
            pose_df['timestamp_ms'],
            pose_df[f'{side}_elbow_angle'],
            label=f'{side.capitalize()} Elbow',
        )
    ax.set_xlabel('Time (ms)')
    ax.set_ylabel('Angle (degrees)')
    ax.set_title('Elbow Angle During Pitch')
    ax.legend()
    plt.show()

## 6. Correlate Biomechanics with Performance

In [None]:
def analyze_pitch_biomechanics(play_id: int, side: str = 'right'):
    """Analyze biomechanics for a single pitch.

    Args:
        play_id: Identifier of the play to analyze.
        side: Which arm to measure, 'right' (default, the previous hard-coded
            behaviour) or 'left'.
            NOTE(review): the player record exposes a `throws` field; callers
            may want to derive `side` from it. Its values are not visible
            here, so no automatic mapping is attempted.

    Returns:
        A dict with the play's `pitch_type`, `release_speed` and `spin_rate`
        plus `max_elbow_angle`, `min_elbow_angle`, `elbow_angle_range` and
        `max_shoulder_angle` (degrees) for the chosen arm. Returns None when
        the play does not exist or has no pose frames.

    Raises:
        ValueError: If `side` is neither 'left' nor 'right'.
    """
    side = str(side).strip().lower()
    if side not in ('left', 'right'):
        raise ValueError(f"side must be 'left' or 'right', got {side!r}")

    # Check the play first so a missing play does not cost a pose query.
    play = ops.get_play(play_id)
    if play is None:
        return None

    pose_df = get_pose_data_for_play(play_id)
    # An empty frame would only yield an all-NaN record, so skip it.
    if pose_df is None or pose_df.empty:
        return None

    # Elbow angle: shoulder-elbow-wrist, measured at the elbow.
    elbow_angle = pose_df.apply(
        lambda row: calculate_joint_angle(
            row, f'{side}_shoulder', f'{side}_elbow', f'{side}_wrist'
        ),
        axis=1
    )
    # Shoulder angle: elbow-shoulder-hip, measured at the shoulder.
    shoulder_angle = pose_df.apply(
        lambda row: calculate_joint_angle(
            row, f'{side}_elbow', f'{side}_shoulder', f'{side}_hip'
        ),
        axis=1
    )

    max_elbow = elbow_angle.max()
    min_elbow = elbow_angle.min()

    return {
        'play_id': play_id,
        'pitch_type': play.pitch_type,
        'release_speed': play.release_speed,
        'spin_rate': play.spin_rate,
        'max_elbow_angle': max_elbow,
        'min_elbow_angle': min_elbow,
        'elbow_angle_range': max_elbow - min_elbow,
        'max_shoulder_angle': shoulder_angle.max(),
    }

# Analyze all plays with pose data
if not plays_with_pose.empty:
    # Collect one metrics record per play; plays without usable data come back
    # falsy and are skipped.
    biomech_results = []
    for play_id in plays_with_pose['play_id'].head(10):  # Analyze first 10
        result = analyze_pitch_biomechanics(play_id)
        if not result:
            continue
        biomech_results.append(result)

    if biomech_results:
        biomech_df = pd.DataFrame(biomech_results)
        print("Biomechanics Analysis Results:")
        display(biomech_df)

        # Correlation analysis: scatter plots only, drawn once at least three
        # pitches are available.
        if len(biomech_df) >= 3:
            fig, axes = plt.subplots(1, 2, figsize=(14, 5))

            # (metric column, x-axis label, panel title) for each panel;
            # release speed is always on the y-axis.
            panel_specs = (
                ('max_elbow_angle', 'Max Elbow Angle (degrees)', 'Elbow Angle vs Velocity'),
                ('max_shoulder_angle', 'Max Shoulder Angle (degrees)', 'Shoulder Angle vs Velocity'),
            )
            for panel, (metric, x_label, panel_title) in zip(axes, panel_specs):
                panel.scatter(biomech_df[metric], biomech_df['release_speed'])
                panel.set_xlabel(x_label)
                panel.set_ylabel('Release Speed (mph)')
                panel.set_title(panel_title)

            plt.tight_layout()
            plt.show()

## 7. Visualize Pose Sequence

In [None]:
def plot_pose_sequence(pose_df, keypoints_to_plot=None, num_frames=5):
    """Plot pose keypoints at different time points.

    Draws one panel per selected frame, with the keypoints as dots and a fixed
    set of skeleton connections as blue lines.

    Args:
        pose_df: DataFrame with `frame_number`, `timestamp_ms` and
            `<keypoint>_x` / `<keypoint>_y` columns, one row per frame.
        keypoints_to_plot: Keypoint names to draw as dots. Defaults to a
            13-point body set (nose, shoulders, elbows, wrists, hips, knees,
            ankles). Skeleton lines are drawn regardless of this list.
        num_frames: Number of evenly spaced frames to show. Capped at the
            number of rows so that no frame is drawn twice.

    Returns:
        None. The figure is shown with `plt.show()`. Nothing is drawn when
        `pose_df` is None or empty, or when `num_frames` is below 1.
    """
    if pose_df is None or len(pose_df) == 0 or num_frames < 1:
        print("No pose frames to plot")
        return

    if keypoints_to_plot is None:
        keypoints_to_plot = [
            'nose', 'left_shoulder', 'right_shoulder',
            'left_elbow', 'right_elbow', 'left_wrist', 'right_wrist',
            'left_hip', 'right_hip', 'left_knee', 'right_knee',
            'left_ankle', 'right_ankle'
        ]

    # Skeleton connections are the same for every frame, so build them once.
    connections = [
        ('left_shoulder', 'right_shoulder'),
        ('left_shoulder', 'left_elbow'),
        ('left_elbow', 'left_wrist'),
        ('right_shoulder', 'right_elbow'),
        ('right_elbow', 'right_wrist'),
        ('left_shoulder', 'left_hip'),
        ('right_shoulder', 'right_hip'),
        ('left_hip', 'right_hip'),
        ('left_hip', 'left_knee'),
        ('left_knee', 'left_ankle'),
        ('right_hip', 'right_knee'),
        ('right_knee', 'right_ankle'),
    ]

    # Select evenly spaced frames, never more panels than there are rows.
    num_frames = min(num_frames, len(pose_df))
    frame_indices = np.linspace(0, len(pose_df) - 1, num_frames, dtype=int)

    # squeeze=False always returns a 2D array of Axes, so a single panel
    # (num_frames == 1) can be indexed like any other.
    fig, axes = plt.subplots(
        1, num_frames, figsize=(4 * num_frames, 6), squeeze=False
    )

    for ax, frame_idx in zip(axes[0], frame_indices):
        row = pose_df.iloc[frame_idx]

        # Plot keypoints
        for kp in keypoints_to_plot:
            x_col = f'{kp}_x'
            y_col = f'{kp}_y'
            if x_col in row and y_col in row:
                ax.scatter(row[x_col], row[y_col], s=50)

        # Draw connections, skipping any whose keypoints are not in the data.
        for start, end in connections:
            try:
                xs = [row[f'{start}_x'], row[f'{end}_x']]
                ys = [row[f'{start}_y'], row[f'{end}_y']]
            except KeyError:
                continue
            ax.plot(xs, ys, 'b-', linewidth=2)

        # A row taken from an all-numeric frame is upcast to float, so format
        # the frame number without decimals ("Frame 12", not "Frame 12.0").
        ax.set_title(
            f'Frame {row["frame_number"]:.0f}\n{row["timestamp_ms"]:.0f}ms'
        )
        ax.invert_yaxis()  # Image coordinates
        ax.set_aspect('equal')

    plt.suptitle('Pose Sequence Over Time')
    plt.tight_layout()
    plt.show()

# Plot pose sequence if data exists
# The `.empty` test matters: a pose_df with no rows would reach
# `pose_df.iloc[...]` inside plot_pose_sequence and raise IndexError.
if 'pose_df' in dir() and pose_df is not None and not pose_df.empty:
    plot_pose_sequence(pose_df, num_frames=5)

## 8. Cleanup

In [None]:
# Close database session
# `session` is the object created in the Database Connection cell, and `ops`
# was constructed around it, so run this cell last.
session.close()
print("Session closed")