<a href="https://colab.research.google.com/github/jauger4/MLB-Pitch-Tunneling-Analysis-Dashboard/blob/main/tunneling_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Setup
For this project, you will need to install the necessary libraries. If you are working in a Colab environment, you will only need to install streamlit and pybaseball, but you will need to run install commands each time you restart the session. If you are working locally, all of these libraries must be installed beforehand, but once they are installed, you will not need to run the install commands again. The install command will look like:

```
!pip install streamlit
```
It is also important to note that if you are running this in a Jupyter Notebook, you must keep all code relating to the dashboard in one block so that Streamlit can read it properly. If you break the code up into multiple blocks, Streamlit will not correctly read your program.


In [None]:
%%writefile app.py
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
from pybaseball import statcast_pitcher, playerid_lookup
import time

st.set_page_config(page_title="MLB Tunneling", layout="wide")

# --- SESSION STATE ---
# Seed the keys that the results section reads so they exist on the very first
# script run; values already stored by an earlier rerun are left untouched.
_session_defaults = {'data': None, 'player_name': ""}
for _key, _initial in _session_defaults.items():
    if _key not in st.session_state:
        st.session_state[_key] = _initial

@st.cache_data(show_spinner=False)
def fetch_pitch_data(pitcher_name, year):
    """Download one season of Statcast pitches and add tunnel/pairing columns.

    Args:
        pitcher_name: Name as typed by the user, e.g. "Spencer Strider". The
            first word is used as the first name and the last word as the
            last name for the player-ID lookup.
        year: Season used to build the date ranges that are requested.

    Returns:
        On success, a DataFrame of pitches sorted by game, at-bat and pitch
        number, with ``t_tunnel``/``x_tunnel``/``z_tunnel`` columns and a
        ``prev_<col>`` column (the previous pitch of the same at-bat) for
        each of pitch_type, plate_x, plate_z, x_tunnel and z_tunnel that is
        present. On failure, a ``str`` holding a user-facing error message.

    NOTE(review): the function is wrapped in ``st.cache_data``, so an error
    string is cached like any other return value - confirm that is acceptable
    for transient lookup failures.
    """
    # Look up desired pitcher
    name_parts = pitcher_name.strip().split()
    if len(name_parts) < 2:
        return "Please enter full name."

    try:
        ids = playerid_lookup(name_parts[-1], name_parts[0])
        if ids.empty:
            return f"ID not found for {pitcher_name}."
        player_id = ids['key_mlbam'].values[0]
    except Exception:
        # `Exception` rather than a bare except, so KeyboardInterrupt,
        # SystemExit and other BaseException subclasses still propagate.
        return "Database connection failed."

    # Download data in chunks. Every end date must be a real calendar day:
    # the previous "-06-31" made the June request fail, and the failure was
    # swallowed, silently dropping the whole month.
    months = [
        (f"{year}-03-01", f"{year}-04-30"),
        (f"{year}-05-01", f"{year}-05-31"),
        (f"{year}-06-01", f"{year}-06-30"),
        (f"{year}-07-01", f"{year}-07-31"),
        (f"{year}-08-01", f"{year}-08-31"),
        (f"{year}-09-01", f"{year}-10-01"),
    ]

    all_dfs = []
    failed_ranges = []
    prog_bar = st.progress(0)
    status = st.empty()

    for i, (start, end) in enumerate(months):
        status.text(f"Fetching data: {start} to {end}...")
        try:
            chunk = statcast_pitcher(start, end, player_id)
        except Exception:
            # Best effort: keep going with the other ranges, but remember the
            # failure so the user is told the season is incomplete.
            failed_ranges.append(f"{start} to {end}")
            chunk = None
        if chunk is not None and not chunk.empty:
            all_dfs.append(chunk)
        prog_bar.progress((i + 1) / len(months))
        time.sleep(0.05)

    # Reset status and progress bar
    status.empty()
    prog_bar.empty()

    if failed_ranges:
        st.warning("Could not download data for: " + ", ".join(failed_ranges))

    if not all_dfs:
        return f"No pitch data found for {year}."

    # Combine all the data; a fresh index avoids duplicate labels across chunks
    data = pd.concat(all_dfs, ignore_index=True)

    # Physics to find position at the "tunnel" (23.8 ft, matching the chart titles)
    tunnel_y = 23.8
    physics_cols = ['release_pos_x', 'release_pos_y', 'release_pos_z',
                    'vx0', 'vy0', 'vz0', 'ax', 'az']
    if all(col in data.columns for col in physics_cols):
        # .copy() so the new columns are written to an independent frame
        # rather than to a filtered view of the concatenated data.
        data = data[data['vy0'] != 0].copy()
        # NOTE(review): treats vx0/vy0/vz0 as the velocities at release and
        # ignores acceleration along y - confirm against the Statcast field
        # definitions if more precision is needed.
        t_tunnel = (data['release_pos_y'] - tunnel_y) / -data['vy0']
        data['t_tunnel'] = t_tunnel
        data['x_tunnel'] = (data['release_pos_x'] + (data['vx0'] * t_tunnel)
                            + (0.5 * data['ax'] * t_tunnel**2))
        data['z_tunnel'] = (data['release_pos_z'] + (data['vz0'] * t_tunnel)
                            + (0.5 * data['az'] * t_tunnel**2))
    else:
        # Keep the schema stable so callers can drop NaNs instead of hitting
        # a KeyError on a missing tunnel column.
        for col in ('t_tunnel', 'x_tunnel', 'z_tunnel'):
            data[col] = np.nan

    # Find pitch pairings. game_pk (when present) keeps the two games of a
    # doubleheader apart; game_date + at_bat_number alone would merge them.
    game_keys = ['game_date', 'game_pk'] if 'game_pk' in data.columns else ['game_date']
    at_bat_keys = game_keys + ['at_bat_number']
    data = data.sort_values(at_bat_keys + ['pitch_number'])

    cols = ['pitch_type', 'plate_x', 'plate_z', 'x_tunnel', 'z_tunnel']
    for c in cols:
        if c in data.columns:
            data[f'prev_{c}'] = data.groupby(at_bat_keys)[c].shift(1)

    return data

# UI Design
# The title emoji was mis-encoded (UTF-8 bytes read in a legacy encoding).
st.title("⚾ MLB Pitch Tunneling")

c1, c2, c3 = st.columns([1, 2, 1])
with c1:
    year = st.selectbox("Year", [2025, 2024, 2023, 2022, 2021])
with c2:
    name = st.text_input("Pitcher Name", placeholder="e.g. Spencer Strider")
with c3:
    # Two empty writes push the button down so it lines up with the inputs.
    st.write("")
    st.write("")
    run_search = st.button("Analyze Pitcher", use_container_width=True)

st.divider()

# Collapse stray whitespace so "Spencer  Strider " and "Spencer Strider"
# share one cache entry and display identically.
pitcher_query = " ".join(name.split())

if run_search and pitcher_query:
    with st.spinner("Downloading Data..."):
        result = fetch_pitch_data(pitcher_query, year)
        # Persist across reruns: changing a selectbox below reruns the whole
        # script, and the button reads False again on that rerun.
        st.session_state['data'] = result
        st.session_state['player_name'] = pitcher_query
elif run_search:
    # Previously a click with an empty name did nothing at all.
    st.warning("Please enter a pitcher name.")

# Show results
def create_plot(data, x_col, y_col, title, is_pitcher_view=False):
    """Build a dark-themed scatter of pitch locations with a strike-zone box.

    Args:
        data: Frame holding ``x_col``, ``y_col`` and a ``pitch_type`` column
            used for the point colours.
        x_col: Column plotted on the horizontal axis.
        y_col: Column plotted on the vertical axis.
        title: Chart title.
        is_pitcher_view: When True the x axis is reversed (2.5 -> -2.5).

    Returns:
        The Plotly figure.
    """
    # Pitcher View = Invert X Axis
    x_range = [2.5, -2.5] if is_pitcher_view else [-2.5, 2.5]

    fig = px.scatter(data, x=x_col, y=y_col, color='pitch_type',
                     title=title, template='plotly_dark',
                     range_x=x_range, range_y=[0, 5],
                     opacity=0.6)  # Slight transparency to see overlap

    # Add Strike Zone
    fig.add_shape(type="rect", x0=-0.83, y0=1.5, x1=0.83, y1=3.5,
                  line=dict(color="White", width=2))

    fig.update_layout(
        height=400,
        margin=dict(l=20, r=20, t=40, b=20),
        xaxis_title="Horizontal (ft)",
        yaxis_title="Vertical (ft)",
        legend_title="Pitch Sequence"
    )

    # Lock the axes to 1:1 so the strike zone keeps realistic proportions
    fig.update_yaxes(scaleanchor="x", scaleratio=1)

    return fig


df = st.session_state['data']

if df is None:
    pass  # Nothing has been searched yet
elif isinstance(df, str):
    # The stored result is an error message rather than a frame
    st.error(df)
elif 'pitch_type' not in df.columns:
    st.error("Data missing pitch types.")
else:
    st.success(f"Loaded {len(df)} pitches for {st.session_state['player_name']}")

    types = sorted(df['pitch_type'].dropna().unique())
    s1, s2 = st.columns(2)
    p1 = s1.selectbox("Pitch 1 (Setup)", types, index=0)
    # Default the payoff pitch to a different type when one exists
    idx2 = 1 if len(types) > 1 else 0
    p2 = s2.selectbox("Pitch 2 (Payoff)", types, index=idx2)

    # Filter to rows where the previous pitch was p1 and this pitch is p2
    subset = df[(df['prev_pitch_type'] == p1) & (df['pitch_type'] == p2)].copy()

    st.write(f"**Sample Size:** {len(subset)} pairs")

    if subset.empty:
        st.info(f"No sequence found: {p1} -> {p2}")
    else:
        # A pair is only usable when BOTH pitches have tunnel and plate
        # coordinates. The prev_* columns were previously left out of the
        # NaN filter, so half-missing pairs reached the plots and were
        # silently skipped by the metric means.
        pair_cols = ['x_tunnel', 'z_tunnel', 'plate_x', 'plate_z',
                     'prev_x_tunnel', 'prev_z_tunnel',
                     'prev_plate_x', 'prev_plate_z']
        if all(col in subset.columns for col in pair_cols):
            clean = subset.dropna(subset=pair_cols)
        else:
            # Columns absent altogether: show the warning below instead of
            # letting dropna raise a KeyError.
            clean = subset.iloc[0:0]

        if clean.empty:
            st.warning("Physics data missing for these pitches.")
        else:
            # Metrics: mean straight-line distance between the two pitches
            # of each pair, at the tunnel point and at the plate.
            t_sep = np.sqrt((clean['x_tunnel'] - clean['prev_x_tunnel'])**2 +
                            (clean['z_tunnel'] - clean['prev_z_tunnel'])**2).mean()
            p_sep = np.sqrt((clean['plate_x'] - clean['prev_plate_x'])**2 +
                            (clean['plate_z'] - clean['prev_plate_z'])**2).mean()
            # Guard the division when every pair overlaps exactly at the tunnel
            ratio = p_sep / t_sep if t_sep > 0 else 0

            m1, m2, m3 = st.columns(3)
            m1.metric("Tunnel Sep", f"{t_sep:.2f} ft")
            m2.metric("Plate Sep", f"{p_sep:.2f} ft")
            m3.metric("Deception Ratio", f"{ratio:.2f}x")

            st.divider()

            # Restructure the data so that each pitch is its own group

            # Group 1: The Setup Pitches (Prev)
            df_p1 = clean[['prev_x_tunnel', 'prev_z_tunnel', 'prev_plate_x', 'prev_plate_z']].copy()
            df_p1.columns = ['x_tunnel', 'z_tunnel', 'plate_x', 'plate_z']  # Standardize names
            df_p1['pitch_type'] = p1 + " (Setup)"  # Label for Legend

            # Group 2: The Payoff Pitches (Curr)
            df_p2 = clean[['x_tunnel', 'z_tunnel', 'plate_x', 'plate_z']].copy()
            df_p2['pitch_type'] = p2 + " (Payoff)"  # Label for Legend

            # Combine
            plot_data = pd.concat([df_p1, df_p2], axis=0)

            # One row of charts per perspective: tunnel on the left, plate on the right
            perspectives = (
                ("#### Catcher's Perspective", False),
                ("#### Pitcher's Perspective", True),
            )
            for heading, pitcher_view in perspectives:
                st.write(heading)
                left, right = st.columns(2)
                with left:
                    st.plotly_chart(create_plot(plot_data, 'x_tunnel', 'z_tunnel', "Tunnel (23.8 ft)", pitcher_view), use_container_width=True)
                with right:
                    st.plotly_chart(create_plot(plot_data, 'plate_x', 'plate_z', "Plate Arrival", pitcher_view), use_container_width=True)

## Viewing the Dashboard
There are multiple ways to view the dashboard, such as localhost, ngrok, and cloudflared. I found that cloudflared worked best for me. I suggest doing your own research on methods to see what works best for you. I wrote this program in Colab, so Gemini was able to help me construct the correct commands to be able to run this project.

In [None]:
# 1. Kill any lingering processes from previous attempts
!pkill streamlit
!pkill cloudflared
!pkill ngrok

# 2. Download Cloudflare Tunnel (Silent & Fast)
!wget -q -O cloudflared https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64
!chmod +x cloudflared

# 3. Start Streamlit in the background
import subprocess
import time
# We use the standard run command
subprocess.Popen(["streamlit", "run", "app.py", "--server.enableCORS=false", "--server.enableXsrfProtection=false", "--server.address=localhost"])

# 4. Start the Cloudflare Tunnel
# This points the tunnel to port 8501 where Streamlit is running
subprocess.Popen(["./cloudflared", "tunnel", "--url", "http://localhost:8501"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

# 5. Extract and Print the Link
print("‚è≥ Initializing connection... (allow 10-15 seconds)")
time.sleep(5) # Give it a moment to handshake

# We grep the log file to find the URL
import re
import os

# Cloudflare logs the URL to a specific file in the temp directory usually,
# but since we can't easily capture the subprocess output in Colab without blocking,
# we will use a small hack to query the tunnel metrics or just retry the setup if it fails.
# simpler approach for Colab:

print("‚úÖ Streamlit is running.")
print("üëá CLICK THE LINK BELOW (It ends in .trycloudflare.com):")
print("-" * 50)

# We use a python loop to grab the log output
p = subprocess.Popen(["./cloudflared", "tunnel", "--url", "http://localhost:8501"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
for line in p.stderr:
    if ".trycloudflare.com" in line:
        url = re.search(r'https://[a-zA-Z0-9-]+\.trycloudflare\.com', line)
        if url:
            print(f"üîó \033[1m{url.group(0)}\033[0m")
            print("-" * 50)
            break