In [1]:
# Scrape a player's home run videos
# Using Baseball Savant endpoints
# For a given time period

In [2]:
import os
import re
import shutil
import subprocess
import warnings

import jupyter_black
import pandas as pd
import requests
from bs4 import BeautifulSoup
from tqdm.notebook import tqdm

In [3]:
# Turn on Black auto-formatting for the notebook cells
jupyter_black.load()

# Loosen the pandas display limits so wide/long frames are not truncated
pd.set_option("display.max_columns", 2000)
pd.set_option("display.max_rows", 1000)
pd.set_option("display.max_colwidth", None)

In [4]:
# Raw downloads are stored here; make sure the folder exists
input_dir = "../videos/"
os.makedirs(input_dir, exist_ok=True)

# Every processed variant is written somewhere beneath this folder
output_dir_base = "../processed_videos/"
os.makedirs(output_dir_base, exist_ok=True)

In [5]:
# Browser-style User-Agent passed along with the requests below
user_agent = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/129.0.0.0 Safari/537.36"
)
headers = {"User-Agent": user_agent}

In [6]:
# Inputs that drive the requests: which event, which batter, which dates
event = "home_run"
player = "660271"
game_start = "2024-03-28"
game_end = "2024-09-30"

# Season year (as a string) taken from the end date
game_season = str(pd.Timestamp(game_end).year)

# Current date as YYYY-MM-DD
today = pd.Timestamp.today().date().isoformat()

# Root that relative video links get appended to
base_video_url = "https://baseballsavant.mlb.com"

In [7]:
# Get every pitch the player hit
# Query parameters for the spray endpoint; the season and batter come from
# the variables defined above, everything else is left at its blank default
params = {
    "warehouse": "true",
    "hfPT": "",
    "hfAB": "",
    "hfGT": "",
    "hfPR": "",
    "hfZ": "",
    "hfStadium": "",
    "hfBBL": "",
    "hfNewZones": "",
    "hfPull": "",
    "hfC": "",
    "hfSea": f"{game_season}|",
    "hfSit": "",
    "player_type": "batter",
    "batters_lookup[]": player,
    "hfFlag": "",
    "metric_1": "",
    "group_by": "name",
    "min_pitches": "0",
    "min_results": "0",
    "min_pas": "0",
    "sort_col": "pitches",
    "player_event_sort": "",
    "sort_order": "desc",
    "type": "details",
    "player_id": player,
    "minors": "false",
}

# Make the request; the timeout keeps a stalled connection from hanging the
# notebook indefinitely
spray_response = requests.get(
    "https://baseballsavant.mlb.com/player/spray",
    params=params,
    headers=headers,
    timeout=30,
)

# Stop here with a clear HTTP error rather than a confusing JSON decode
# failure if the server answered with an error page
spray_response.raise_for_status()

# Read the response as json and convert to dataframe
spray_data = spray_response.json()
spray_df = pd.DataFrame(spray_data)

In [19]:
# Slim the dataframe to just the columns we need
keep_cols = [
    "batter_name",
    "pitcher",
    "pitcher_name",
    "venue",
    "game_date",
    "game_year",
    "hit_distance_sc",
    "launch_angle",
    "launch_speed",
    "hc_x",
    "hc_y",
    "hc_x_ft",
    "hc_y_ft",
    "pitch_name",
    "events",
]

# Limit the number of columns in the dataframe
# Also filter to date range, player and event type, e.g. "home_run"
# The bounds are inclusive so games played on game_start or game_end
# themselves are not silently dropped.
# NOTE(review): assumes game_date values are plain YYYY-MM-DD strings, so the
# inclusive end bound matches exactly — confirm against the response.
df = spray_df.query(
    f'game_date >= "{game_start}" and game_date <= "{game_end}" and batter == {player} and events == "{event}"'
)[keep_cols]

# Sort the dataframe, so the events are in chronological order
df_sorted = df.sort_values("game_date").reset_index(drop=True)

# Get a list of all the players who threw the pitches
shohei_hr_pitchers = list(df_sorted["pitcher"].unique())

In [20]:
# Number of distinct pitcher ids collected from the filtered events
len(shohei_hr_pitchers)

54

In [None]:
# Use the pitcher IDs to get data on the pitches
video_urls = []

for pitcher in tqdm(shohei_hr_pitchers):

    # One search per pitcher; the season and batter follow the variables
    # defined earlier instead of being hard-coded.
    # NOTE(review): "hfAB" is still the literal home-run filter and does not
    # follow `event` — confirm the encoding before changing the event type.
    params = {
        "hfPT": "",
        "hfAB": "home\\.\\.run|",
        "hfGT": "",
        "hfPR": "",
        "hfZ": "",
        "hfStadium": "",
        "hfBBL": "",
        "hfNewZones": "",
        "hfPull": "",
        "hfC": "",
        "hfSea": f"{game_season}|",
        "hfSit": "",
        "player_type": "pitcher",
        "hfOuts": "",
        "hfOpponent": "",
        "pitcher_throws": "",
        "batter_stands": "",
        "hfSA": "",
        "game_date_gt": "",
        "game_date_lt": "",
        "hfMo": "",
        "hfTeam": "",
        "home_road": "",
        "hfRO": "",
        "position": "",
        "hfInfield": "",
        "hfOutfield": "",
        "hfInn": "",
        "hfBBT": "",
        "batters_lookup[]": player,
        "hfFlag": "",
        "metric_1": "",
        "group_by": "name",
        "min_pitches": "0",
        "min_results": "0",
        "min_pas": "0",
        "sort_col": "velocity",
        "player_event_sort": "api_p_release_speed",
        "sort_order": "desc",
        "chk_stats_velocity": "on",
        "type": "details",
        "player_id": pitcher,
    }

    # Request the pitch data; time out rather than hang, and fail loudly on
    # an HTTP error instead of scraping links out of an error page
    pitch_response = requests.get(
        "https://baseballsavant.mlb.com/statcast_search",
        params=params,
        headers=headers,
        timeout=30,
    )
    pitch_response.raise_for_status()

    # Parse out the video ids from the markup to form a list of video display page urls
    soup = BeautifulSoup(pitch_response.content, "html.parser")

    # Find all the video links (in case there are multiple home runs against same pitcher)
    # href=True skips anchors without an href, which would otherwise raise KeyError
    video_links = soup.find_all("a", href=True)

    for link in video_links:
        video_id = link["href"]
        video_url = base_video_url + video_id
        video_urls.append(video_url)

# De-duplicate once after the loop, keeping first-seen order; this is always
# defined, even when no links were found
video_urls_unique = list(dict.fromkeys(video_urls))

In [23]:
# Loop through those video pages, fetching the actual video file urls
video_file_urls = []

# Visit each distinct page once (first-seen order kept) so that a repeated
# link does not produce a duplicate download later
for video_url in tqdm(list(dict.fromkeys(video_urls))):
    video_page_response = requests.get(video_url, headers=headers, timeout=30)
    video_page_response.raise_for_status()
    video_page_content = BeautifulSoup(video_page_response.content, "html.parser")

    # A page without a <source src=...> has nothing to download; warn and move
    # on instead of failing with a TypeError on the None lookup
    source_tag = video_page_content.find("source")
    if source_tag is None or not source_tag.get("src"):
        warnings.warn(f"No video source found on {video_url}; skipping")
        continue

    video_file_url = source_tag["src"]
    video_file_urls.append(video_file_url)

  0%|          | 0/55 [00:00<?, ?it/s]

In [24]:
# Loop through each video file URL and download the file
for idx, video_url in enumerate(tqdm(video_file_urls, desc="Downloading videos")):
    # Define the filename and path (inside the raw-video input directory)
    video_filename = os.path.join(input_dir, f"otani_hr_{idx+1}.mp4")

    # Stream the video; the context manager releases the connection when done
    with requests.get(
        video_url, headers=headers, stream=True, timeout=30
    ) as video_response:
        # Do not write an HTTP error body to disk as if it were a video
        video_response.raise_for_status()

        # Write the video content to the file in 1 MiB chunks
        with open(video_filename, "wb") as video_file:
            for chunk in video_response.iter_content(chunk_size=1024 * 1024):
                video_file.write(chunk)

print("All videos downloaded successfully!!!")

Downloading videos:   0%|          | 0/55 [00:00<?, ?it/s]

All videos downloaded successfully!!!


---

In [None]:
# Edit videos

In [25]:
def natural_sort_key(file_name):
    """Helper function to sort filenames with numbers in a natural order."""
    return [
        int(text) if text.isdigit() else text for text in re.split(r"(\d+)", file_name)
    ]


def process_video(
    input_file,
    output_file,
    crop_params=None,
    start_time="00:00:00.5",
    duration=None,
    scale=True,
):
    """Crop, trim, and scale a video with ffmpeg.

    Args:
    - input_file (str): Path to the input video file.
    - output_file (str): Path to save the processed video (overwritten if it exists).
    - crop_params (str): Crop parameters (e.g., '405:720:(in_w-405)/2:0'). If None, no cropping.
    - start_time (str): When to start the video (e.g., '00:00:00.5'). If falsy, starts at the beginning.
    - duration (str or number): How long the video should be (e.g., '5'). If None, keeps the original length.
    - scale (bool): Whether to scale to 1080x1920. If False, no scaling is applied.

    A warning is issued (no exception) when ffmpeg exits with a non-zero code.
    """
    # Prefer whichever ffmpeg is on PATH; fall back to the Homebrew location
    ffmpeg_path = shutil.which("ffmpeg") or "/opt/homebrew/bin/ffmpeg"

    # Basic ffmpeg command
    command = [ffmpeg_path, "-i", input_file]

    # Optional start time and duration; str() lets callers pass numbers too,
    # since subprocess only accepts string arguments
    if start_time:
        command.extend(["-ss", str(start_time)])
    if duration:
        command.extend(["-t", str(duration)])

    # Build the filter chain: crop first (if requested), then scale
    filters = []
    if crop_params:
        filters.append(f"crop={crop_params}")
    if scale:
        filters.append("scale=1080:1920")
    if filters:
        command.extend(["-vf", ",".join(filters)])

    # Only log errors, overwrite existing file
    command.extend(["-loglevel", "error", "-y", output_file])

    # Run ffmpeg, keeping stderr so a failure can be reported instead of
    # silently leaving a missing or broken output file
    result = subprocess.run(
        command,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        text=True,
        errors="replace",
    )
    if result.returncode != 0:
        details = (result.stderr or "").strip()
        warnings.warn(
            f"ffmpeg exited with code {result.returncode} for {input_file}: {details}"
        )


# Where the raw clips are read from and where processed output is written
input_dir = "../videos/"
output_dir_base = "../processed_videos/"
os.makedirs(output_dir_base, exist_ok=True)

# Gather the .mp4 files, then order them by their embedded number
video_files = [name for name in os.listdir(input_dir) if name.endswith(".mp4")]
video_files.sort(key=natural_sort_key)

In [26]:
# Example usage: Experiment with different versions of the video
# Each entry describes one output variant: the sub-folder it goes in plus the
# crop / duration / scale settings handed to process_video
versions = [
    # No cropping, no trimming, just wide
    dict(output_subdir="wide", crop_params=None, duration=None, scale=False),
    # Cropping, trimming, and scaling for TikTok/YouTube Shorts
    dict(
        output_subdir="cropped_trimmed",
        crop_params="405:720:(in_w-405)/2:0",
        duration="5",
        scale=True,
    ),
    # Cropping but no trimming (keep full duration)
    dict(
        output_subdir="cropped_full",
        crop_params="405:720:(in_w-405)/2:0",
        duration=None,
        scale=True,
    ),
    # Trimming but no cropping (just wide YouTube)
    dict(output_subdir="trimmed_wide", crop_params=None, duration="5", scale=False),
]

for idx, video_file in enumerate(video_files):
    input_file = os.path.join(input_dir, video_file)

    # Produce every variant of the current clip
    for version in versions:
        # Each variant gets its own sub-folder under the output base
        output_dir = os.path.join(output_dir_base, version["output_subdir"])
        os.makedirs(output_dir, exist_ok=True)
        output_file = os.path.join(output_dir, f"processed_hr_{idx+1}.mp4")

        # Run the processing function with this variant's settings
        process_video(
            input_file,
            output_file,
            crop_params=version["crop_params"],
            start_time="00:00:00.5",  # Adjust this if needed per video
            duration=version["duration"],
            scale=version["scale"],
        )