In [1]:
# Import Libraries

import polars as pl
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patheffects as pe
from pathlib import Path
import ipywidgets as widgets
from ipywidgets import Dropdown, IntSlider, Play, Button, HBox, VBox, Output, jslink, link
from IPython.display import display, clear_output
from scipy.signal import savgol_filter
import plotly.graph_objects as go

# Let polars render up to 100 rows when a table is displayed.
pl.Config.set_tbl_rows(100)

# Render matplotlib figures inline in the notebook output.
%matplotlib inline

# Load the feature table and keep only rows whose matchup_type is "INTERIOR";
# the cells below all read from this filtered frame.
# NOTE(review): the path points at one user's Desktop — presumably the output of the
# feature engineering notebook; confirm, and consider a configurable data directory.
df = pl.read_csv("~/Desktop/ShrineBowlSumerSportsAnalyticsCompetition/Output/df_with_features.csv").filter(pl.col("matchup_type") == "INTERIOR")

##### Primary Metric: DL Depth Gained (For Evaluating DL) and DL Depth Allowed (For Evaluating OL) at Frame 25
The simplest metric that can be derived from the tracking data is the depth (x position) gained past the line of scrimmage by the defensive lineman at frame 25. Why frame 25? In 2025, [the median time to throw for NFL quarterbacks was about 2.68 seconds](https://sumersports.com/players/quarterback/?plays=100). Next Gen Stats has previously defined a "quick pressure" as [a pressure occurring within the first 2.5 seconds of a play](https://www.nfl.com/news/next-gen-stats-introduction-to-pressure-probability) and has said the average pressure occurred within 2.9 seconds. Likewise, ESPN Analytics' pass rush win rate metric quantifies ["how often a pass rusher is able to beat his block within 2.5 seconds"](https://www.espn.com/nfl/story/_/id/46138675/2025-nfl-win-rates-top-teams-players-rankings-pass-run-block). As such, we will use frame 25 (2.5 seconds into the rep at 10 frames per second) as the time at which to evaluate the depth gained by the defensive lineman. The basic idea is that depth penetration at 2.5 seconds is a potential proxy for whether the interior defensive lineman beat his block, walked the offensive lineman back, gained a negligible amount of depth, or stalled out. In turn, interior offensive linemen can be evaluated by examining how much depth they allowed the defensive lineman to gain.

In [2]:
# DL / OL Depth Gained
# Both leaderboards read the DL's x position at frame 25 and keep only players
# with more than two reps at that frame.

# Defensive linemen: median depth gained, deepest first.
dl_depth_at_25 = (
    df.filter(pl.col("frame_number").eq(25))
    .group_by("dl_player_name")
    .agg(
        count=pl.col("dl_x").len(),
        median_dl_x=pl.col("dl_x").median(),
    )
    .filter(pl.col("count").gt(2))
    .sort("median_dl_x", descending=True)
)

# Offensive linemen: median depth allowed to the opposing DL, stingiest first.
# Rows without an OL name are dropped before grouping.
ol_depth_at_25 = (
    df.filter(
        pl.col("frame_number").eq(25) & pl.col("ol_player_name").is_not_null()
    )
    .group_by("ol_player_name")
    .agg(
        count=pl.col("dl_x").len(),
        median_dl_x=pl.col("dl_x").median(),
    )
    .filter(pl.col("count").gt(2))
    .sort("median_dl_x")
)

display(dl_depth_at_25, ol_depth_at_25)

dl_player_name,count,median_dl_x
str,u32,f64
"""Tuli Letuligasenoa""",3,4.543264
"""Jowon Briggs""",4,4.171051
"""Logan Lee""",5,3.949351
"""Khristian Boyd""",6,3.602264
"""Jordan Miller""",3,3.6
"""Jamree Kromah""",3,3.534068
"""Myles Murphy""",7,3.472575
"""Evan Anderson""",4,3.379145
"""Fabien Lovett""",6,3.337219
"""Zion Logue""",4,3.082565


ol_player_name,count,median_dl_x
str,u32,f64
"""Mason McCormick""",4,2.383333
"""Dylan McMahon""",5,3.015523
"""Kaitori Leveston""",4,3.094915
"""Christian Mahogany""",4,3.221329
"""Hunter Nourzad""",4,3.379145
"""X'Zauvea Gadlin""",4,3.386719
"""Karsen Barnhart""",4,3.420354
"""C.J. Hanson""",10,3.445333
"""Matt Lee""",5,3.472575
"""Willis Patrick""",5,3.536041


##### Rep Examination
Before moving on, let's examine the reps with 1) the greatest depth gained and 2) the least depth gained by defensive linemen within the sample.

In [3]:
# Players who survived the minimum-rep filter in the DL depth leaderboard.
dls = dl_depth_at_25.get_column("dl_player_name").to_list()

# One row per qualifying rep at frame 25, ordered from the deepest DL position to
# the shallowest. Rows with a missing dl_x are removed first: polars places nulls at
# one end of a sorted frame, so a rep with no position reading could otherwise be
# picked as the "most" or "least" depth gained.
indiv_reps = (
    df
    .filter(
        pl.col("dl_player_name").is_in(dls)
        & (pl.col("frame_number") == 25)
        & pl.col("dl_x").is_not_null()
    )
    .sort("dl_x", descending=True)
    .select("session_name", "rep_number")
)

# Integer indexing returns one-row frames: first row = greatest depth, last = least.
most_gained = indiv_reps[0]
least_gained = indiv_reps[-1]

In [4]:
# `most_gained` and `least_gained` are the one-row frames selected in the previous
# cell; that selection used to be repeated here verbatim and is now reused instead.
most_session = most_gained["session_name"][0]
most_rep = most_gained["rep_number"][0]
least_session = least_gained["session_name"][0]
least_rep = least_gained["rep_number"][0]

# Axis limits (yards) for the interactive replay figures.
X_MIN, X_MAX = -15.0, 15.0
Y_MIN, Y_MAX = 10.0, 40.0

def create_interactive_rep_plot(session_name, rep_number, title_prefix=""):
    """Build an animated OL-vs-DL replay widget for a single rep.

    Rows for the rep are read from the module-level ``df`` (frames up to and
    including 30) and drawn on a Plotly ``FigureWidget`` with one marker per
    player. A Play control, a slider and two step buttons move through the
    frames; the figure title shows the frame currently on screen.

    Args:
        session_name: Value matched against the ``session_name`` column.
        rep_number: Value matched against the ``rep_number`` column.
        title_prefix: Text placed in front of the figure title.

    Returns:
        A ``VBox`` holding the controls and the figure, or ``None`` (after
        printing a message) when no rows match.
    """
    rep_frames = (
        df
        .filter(
            (pl.col("session_name") == session_name)
            & (pl.col("rep_number") == rep_number)
            & (pl.col("frame_number") <= 30)
        )
        .sort("frame_number")
    )

    if rep_frames.height == 0:
        print(f"No data found for {session_name} rep {rep_number}")
        return None

    # Pull the coordinate columns out once so the callbacks only index plain lists.
    tracks = {
        column: rep_frames[column].to_list()
        for column in ("ol_x", "ol_y", "dl_x", "dl_y")
    }
    frame_numbers = rep_frames["frame_number"].to_list()
    last_idx = len(frame_numbers) - 1

    def title_for(idx):
        return f"{title_prefix}{session_name} | rep {rep_number} | frame {frame_numbers[idx]}"

    fig = go.FigureWidget()

    # Vertical field lines every 5 yards; the x = 0 line is drawn thicker and in yellow.
    for yard in range(-15, 16, 5):
        is_zero_line = yard == 0
        fig.add_shape(
            type="line", x0=yard, x1=yard, y0=Y_MIN, y1=Y_MAX,
            line=dict(
                color="yellow" if is_zero_line else "rgba(255,255,255,0.5)",
                width=2 if is_zero_line else 1,
            ),
            layer="below",
        )

    # One single-point trace per player. Order matters: the update callback
    # addresses the OL as fig.data[0] and the DL as fig.data[1].
    for label, colour in (("OL", "dodgerblue"), ("DL", "red")):
        key = label.lower()
        fig.add_trace(go.Scatter(
            x=[tracks[f"{key}_x"][0]], y=[tracks[f"{key}_y"][0]],
            mode="markers", name=label,
            marker=dict(size=18, color=colour, line=dict(color="white", width=2)),
        ))

    fig.update_layout(
        width=900, height=500,
        title=title_for(0),
        showlegend=True, legend=dict(x=0.02, y=0.98),
        plot_bgcolor="#2e7d32", paper_bgcolor="white",
    )
    fig.update_xaxes(range=[X_MIN, X_MAX], title="X (yards)", showgrid=False)
    fig.update_yaxes(range=[Y_MIN, Y_MAX], title="Y (yards)", showgrid=False, scaleanchor="x", scaleratio=1)

    # Playback controls; all of them operate on list indices, not frame numbers.
    play = Play(interval=100, min=0, max=last_idx, step=1, value=0)
    frame_slider = IntSlider(min=0, max=last_idx, step=1, value=0, description="Frame")
    back_button = Button(description="◀")
    forward_button = Button(description="▶")

    def update_plot(frame_idx):
        # Clamp to the available rows before moving the markers.
        idx = max(0, min(frame_idx, last_idx))
        with fig.batch_update():
            fig.data[0].x = [tracks["ol_x"][idx]]
            fig.data[0].y = [tracks["ol_y"][idx]]
            fig.data[1].x = [tracks["dl_x"][idx]]
            fig.data[1].y = [tracks["dl_y"][idx]]
            fig.layout.title = title_for(idx)

    def on_frame_change(change):
        if change["name"] != "value":
            return
        update_plot(change["new"])

    def step_back(_button):
        frame_slider.value = max(0, frame_slider.value - 1)

    def step_forward(_button):
        frame_slider.value = min(last_idx, frame_slider.value + 1)

    # The slider is the single source of truth: Play drives it, and it drives the figure.
    link((play, "value"), (frame_slider, "value"))
    frame_slider.observe(on_frame_change, names="value")
    back_button.on_click(step_back)
    forward_button.on_click(step_forward)

    controls = HBox([play, back_button, forward_button, frame_slider])
    return VBox([controls, fig])

# Build both replay widgets first, then show whichever ones actually had data
# (create_interactive_rep_plot returns None when the rep has no rows).
ui_most = create_interactive_rep_plot(most_session, most_rep, title_prefix="MOST GAINED: ")
ui_least = create_interactive_rep_plot(least_session, least_rep, title_prefix="LEAST GAINED: ")

display(*(ui for ui in (ui_most, ui_least) if ui))

VBox(children=(HBox(children=(Play(value=0, max=29), Button(description='◀', style=ButtonStyle()), Button(desc…

VBox(children=(HBox(children=(Play(value=0, max=29), Button(description='◀', style=ButtonStyle()), Button(desc…

##### Secondary Metrics: Stall and Bull Score

We also make use of the **contact onset** frame from the feature engineering notebook as a reference point in each rep. In most reps, this frame lines up with a clear transition: the two players stop rapidly closing space (closing velocity drops sharply), which is a good sign the rep has moved out of the approach phase and into an **engaged** phase.

From that anchor, we measure two outcomes:

**Stall (OL strength / anchoring):**
A stall rep is one where the offensive lineman both:

* **halts the rusher’s forward progress** soon after contact onset (strong “arrest”), and
* **minimizes backward movement** over the next ~0.5–0.6 seconds (low displacement).

In plain terms: *the OL gets the defender stopped without getting walked back.*

**Bull (DL power / walk-back):**
A bull rep is one where the defender becomes engaged (again, strong “arrest”) and then:

* **drives the offensive lineman backward** over the next ~0.5–0.6 seconds (high walk-back).

In plain terms: *the DL locks in and moves the OL off his spot.*


In [6]:
# Scoring helpers
def _safe_mean(value):
    if value is None or (isinstance(value, float) and not np.isfinite(value)):
        return 0.0
    return float(value)


def _safe_std(value):
    if value is None or (isinstance(value, float) and (not np.isfinite(value) or value == 0)):
        return 1e-9
    return float(value)


def add_zscores(df: pl.DataFrame, cols: list[str], prefix: str = "z_") -> pl.DataFrame:
    """Append a z-scored copy of each column in ``cols``.

    The mean and standard deviation of every column are computed over the
    whole frame in one pass. Unusable statistics are replaced through
    ``_safe_mean`` / ``_safe_std``, so a constant or empty column does not
    trigger a division by zero.

    Args:
        df: Frame holding the columns to standardise.
        cols: Names of the columns to standardise.
        prefix: Prepended to each column name to form the output name.

    Returns:
        ``df`` with one extra ``{prefix}{col}`` column per entry in ``cols``;
        an empty frame is returned unchanged.
    """
    if df.height == 0:
        return df

    summary = df.select(
        [
            stat
            for col in cols
            for stat in (
                pl.col(col).mean().alias(f"{col}__mean"),
                pl.col(col).std().alias(f"{col}__std"),
            )
        ]
    ).to_dicts()[0]

    def _zscore(col: str) -> pl.Expr:
        centre = _safe_mean(summary.get(f"{col}__mean"))
        spread = _safe_std(summary.get(f"{col}__std"))
        return ((pl.col(col) - pl.lit(centre)) / pl.lit(spread)).alias(f"{prefix}{col}")

    return df.with_columns([_zscore(col) for col in cols])


def quantile_threshold_abs(df: pl.DataFrame, col: str, q: float):
    """Return the ``q`` quantile of ``|col|`` with nulls ignored.

    Returns ``None`` when the frame is empty or when the quantile comes back
    missing or non-finite; otherwise the quantile as a plain float.
    """
    if df.height == 0:
        return None

    magnitudes = pl.col(col).abs().drop_nulls()
    threshold = df.select(magnitudes.quantile(q)).item()

    if threshold is None:
        return None
    if isinstance(threshold, float) and not np.isfinite(threshold):
        return None
    return float(threshold)


def clip_min_expr(expr: pl.Expr, min_value: float) -> pl.Expr:
    """Return an expression equal to ``expr`` but raised to ``min_value`` wherever it is smaller."""
    below_floor = expr < min_value
    return pl.when(below_floor).then(min_value).otherwise(expr)



In [7]:
# Stall Score
# Parameters
FRAME_DT = 0.1  # multiplier that turns frame_delta into a per-row time step; presumably seconds per frame — TODO confirm
PRE_FRAMES = 6  # pre-contact window covers rel_frame -PRE_FRAMES .. -1
POST_FRAMES = 6  # post-contact window covers rel_frame 0 .. POST_FRAMES
MIN_AXIS_NORM = 0.05  # yards; guardrail for unstable axis
STALL_ALPHA = 0.15    # fraction of pre_v_close_par_mean
STALL_ABS_FLOOR = 0.05  # yards/s; absolute floor for stall threshold
STALL_PERSIST = 2     # consecutive frames
MIN_PRE_FRAMES = 3  # minimum finite axis-projected samples in the pre window for a rep to be valid
MIN_POST_FRAMES = 3  # minimum finite axis-projected samples in the post window for a rep to be valid
AXIS_SIGN = 1  # +1 = OL->DL pre-contact axis; set -1 to flip sign

# Frame-level table for interior reps that have a contact onset frame, with the
# per-rep key, time step, contact-relative frame index, pair midpoint and closing
# velocity that the per-rep metrics below are built from.
axis_df = (
    df
    .filter(pl.col("matchup_type") == "INTERIOR")
    .filter(pl.col("contact_onset_frame").is_not_null())
    .with_columns(
        # Rep key: session | rep number | DL id | OL id, with nulls rendered as "".
        rep_id=pl.concat_str(
            [
                pl.col("session_name").fill_null(""),
                pl.col("rep_number").cast(pl.Utf8).fill_null(""),
                pl.col("dl_zebra_id").cast(pl.Utf8).fill_null(""),
                pl.col("ol_zebra_id").cast(pl.Utf8).fill_null(""),
            ],
            separator="|",
        ),
        # Time step for the row: frame_delta * FRAME_DT, or FRAME_DT when frame_delta is null.
        frame_dt=pl.when(pl.col("frame_delta").is_not_null())
        .then(pl.col("frame_delta").cast(pl.Float64) * FRAME_DT)
        .otherwise(pl.lit(FRAME_DT)),
        # Frame index relative to contact onset (0 = onset, negative = before contact).
        rel_frame=(pl.col("frame_number") - pl.col("contact_onset_frame")),
        # Midpoint between the two players.
        center_x=(pl.col("dl_x") + pl.col("ol_x")) / 2,
        center_y=(pl.col("dl_y") + pl.col("ol_y")) / 2,
    )
    # Sort before shifting so shift(1) picks up the previous frame of the same rep.
    .sort(["rep_id", "frame_number"])
    .with_columns(
        # Closing velocity: previous distance minus current distance over the time step,
        # so positive values mean the gap is shrinking. Null on each rep's first row.
        v_close=(pl.col("pairwise_distance").shift(1).over("rep_id") - pl.col("pairwise_distance"))
        / pl.col("frame_dt")
    )
)

# Identifying columns carried along with each rep. "rep_id" is the grouping key;
# every other entry is taken from the rep's earliest frame.
rep_meta_cols = [
    "rep_id",
    "session_name", "rep_number",
    "dl_player_name", "dl_zebra_id",
    "ol_player_name", "ol_zebra_id",
    "contact_onset_frame",
    "window_start", "window_end",
]

# One row per rep with the first value of each metadata column.
rep_meta = (
    axis_df
    .sort(["rep_id", "frame_number"])
    .group_by("rep_id")
    .agg([pl.col(name).first().alias(name) for name in rep_meta_cols[1:]])
)


def _mean_or_nan(values):
    values = np.array(values, dtype=float, copy=True)
    if values.size == 0:
        return np.nan
    values = values[np.isfinite(values)]
    if values.size == 0:
        return np.nan
    return float(values.mean())


def _max_abs_or_nan(values):
    values = np.array(values, dtype=float, copy=True)
    if values.size == 0:
        return np.nan
    values = values[np.isfinite(values)]
    if values.size == 0:
        return np.nan
    return float(np.max(np.abs(values)))


def _series_to_float(series):
    """Convert a polars Series to a float64 NumPy array, with nulls turned into NaN."""
    as_float = series.cast(pl.Float64)
    return as_float.fill_null(np.nan).to_numpy()


def _interpolate_nans(values):
    values = np.array(values, dtype=float, copy=True)
    if values.size == 0:
        return values
    valid = np.isfinite(values)
    if not valid.any():
        return values
    idx = np.arange(values.size)
    values[~valid] = np.interp(idx[~valid], idx[valid], values[valid])
    return values


def _nan_to_none(value):
    if isinstance(value, (float, np.floating)) and not np.isfinite(value):
        return None
    return value


def compute_rep_metrics(g: pl.DataFrame) -> dict:
    """Compute contact-relative metrics for a single rep.

    Args:
        g: Rows of one rep. The function reads the columns ``rel_frame``,
            ``v_close``, ``frame_dt``, ``dl_x``/``dl_y``, ``ol_x``/``ol_y``,
            ``center_x``/``center_y``, ``dl_dir``, ``dl_s``, ``dl_a`` and ``rep_id``.

    Returns:
        A flat dict of metrics for the rep (see the ``metrics`` literal at the
        end of the function for the keys). Non-finite floats are converted to
        ``None`` before returning.

    Windows are defined on ``rel_frame``: "pre" is ``-PRE_FRAMES..-1``, "post" is
    ``0..POST_FRAMES`` and "strict post" is ``1..POST_FRAMES``.
    """
    g = g.sort("rel_frame")
    rel_frame = g["rel_frame"].to_numpy()
    pre_mask = (rel_frame >= -PRE_FRAMES) & (rel_frame <= -1)
    post_mask = (rel_frame >= 0) & (rel_frame <= POST_FRAMES)
    post_mask_strict = (rel_frame >= 1) & (rel_frame <= POST_FRAMES)

    # Raw (un-projected) closing velocity, keeping only finite samples per window.
    v_close = g["v_close"].to_numpy()
    pre_v_close = v_close[pre_mask]
    post_v_close = v_close[post_mask]
    pre_v_close = pre_v_close[np.isfinite(pre_v_close)]
    post_v_close = post_v_close[np.isfinite(post_v_close)]

    pre_v_close_mean = _mean_or_nan(pre_v_close)
    post_v_close_mean = _mean_or_nan(post_v_close)

    # Change and ratio of mean closing velocity across contact; the ratio is left
    # as NaN when the pre-contact mean is missing or effectively zero.
    delta_v_close = post_v_close_mean - pre_v_close_mean
    ratio_v_close = np.nan
    if np.isfinite(pre_v_close_mean) and abs(pre_v_close_mean) > 1e-6:
        ratio_v_close = post_v_close_mean / pre_v_close_mean

    # Sum of positive closing velocity times the time step over the strict post
    # window, i.e. how much the gap kept shrinking after contact onset.
    frame_dt = g["frame_dt"].to_numpy()
    frame_dt = np.where(np.isfinite(frame_dt), frame_dt, FRAME_DT)
    post_dt = frame_dt[post_mask_strict]
    post_v_close_strict = v_close[post_mask_strict]
    auc_pos_v_close_post = float(np.nansum(np.clip(post_v_close_strict, 0, None) * post_dt))

    # Axis definition from pre-contact geometry
    axis_defined = False
    axis_source = "undefined"
    axis_norm = np.nan
    u_x = np.nan
    u_y = np.nan
    axis_definition = "OL_to_DL_pre"
    if AXIS_SIGN < 0:
        axis_definition = "OL_to_DL_pre_flipped"

    dl_x = _series_to_float(g["dl_x"])
    dl_y = _series_to_float(g["dl_y"])
    ol_x = _series_to_float(g["ol_x"])
    ol_y = _series_to_float(g["ol_y"])

    # Pre-window frames where all four coordinates are present.
    pre_valid = (
        pre_mask
        & np.isfinite(dl_x)
        & np.isfinite(dl_y)
        & np.isfinite(ol_x)
        & np.isfinite(ol_y)
    )
    if np.any(pre_valid):
        # Preferred axis: unit vector along the mean OL->DL offset over the pre window.
        r_x = float(np.mean(dl_x[pre_valid] - ol_x[pre_valid]))
        r_y = float(np.mean(dl_y[pre_valid] - ol_y[pre_valid]))
        axis_norm = float(np.hypot(r_x, r_y))
        if np.isfinite(axis_norm) and axis_norm >= MIN_AXIS_NORM:
            u_x = AXIS_SIGN * (r_x / axis_norm)
            u_y = AXIS_SIGN * (r_y / axis_norm)
            axis_defined = True
            axis_source = "mean_pre"
        else:
            # Fallback: the mean offset is too short to normalise, so use the
            # OL->DL offset at the first valid pre-window frame instead.
            first_idx = np.where(pre_valid)[0][0]
            r_x = dl_x[first_idx] - ol_x[first_idx]
            r_y = dl_y[first_idx] - ol_y[first_idx]
            axis_norm = float(np.hypot(r_x, r_y))
            if np.isfinite(axis_norm) and axis_norm >= MIN_AXIS_NORM:
                u_x = AXIS_SIGN * (r_x / axis_norm)
                u_y = AXIS_SIGN * (r_y / axis_norm)
                axis_defined = True
                axis_source = "first_pre"

    # Axis-projected closing velocity
    pre_v_close_par_mean = np.nan
    post_v_close_par_mean = np.nan
    delta_v_close_par = np.nan
    ratio_v_close_par = np.nan
    auc_pos_v_close_par_post = np.nan
    pre_par_n = 0
    post_par_n = 0
    v_close_par = None

    if axis_defined:
        # Signed DL-minus-OL separation along the axis, and the rate at which it
        # decreases from one row to the next (first row has no predecessor -> NaN).
        sep_par = (dl_x - ol_x) * u_x + (dl_y - ol_y) * u_y
        v_close_par = np.full(len(g), np.nan, dtype=float)
        v_close_par[1:] = (sep_par[:-1] - sep_par[1:]) / frame_dt[1:]

        pre_v_close_par = v_close_par[pre_mask]
        post_v_close_par = v_close_par[post_mask]
        post_v_close_par_strict = v_close_par[post_mask_strict]

        # Finite sample counts per window; these feed the validity mask downstream.
        pre_par_n = int(np.isfinite(pre_v_close_par).sum())
        post_par_n = int(np.isfinite(post_v_close_par).sum())

        pre_v_close_par_mean = _mean_or_nan(pre_v_close_par)
        post_v_close_par_mean = _mean_or_nan(post_v_close_par)
        delta_v_close_par = post_v_close_par_mean - pre_v_close_par_mean
        if np.isfinite(pre_v_close_par_mean) and abs(pre_v_close_par_mean) > 1e-6:
            ratio_v_close_par = post_v_close_par_mean / pre_v_close_par_mean

        post_dt_arr = frame_dt[post_mask_strict]
        auc_pos_v_close_par_post = float(
            np.nansum(np.clip(post_v_close_par_strict, 0, None) * post_dt_arr)
        )

    # Displacement projections (endpoint displacement)
    delta_center_par = np.nan
    delta_center_perp = np.nan
    delta_ol_par = np.nan
    delta_ol_perp = np.nan

    if axis_defined and np.any(post_mask):
        # Perpendicular unit vector: the axis rotated by 90 degrees.
        v_x = -u_y
        v_y = u_x

        center_x = _series_to_float(g["center_x"])
        center_y = _series_to_float(g["center_y"])
        center_par = center_x * u_x + center_y * u_y
        center_perp = center_x * v_x + center_y * v_y
        ol_par = ol_x * u_x + ol_y * u_y
        ol_perp = ol_x * v_x + ol_y * v_y

        # Endpoints are the first and last rows that fall inside the post window.
        post_idx = np.where(post_mask)[0]
        start_idx = post_idx[0]
        end_idx = post_idx[-1]

        delta_center_par = center_par[end_idx] - center_par[start_idx]
        delta_center_perp = center_perp[end_idx] - center_perp[start_idx]
        delta_ol_par = ol_par[end_idx] - ol_par[start_idx]
        delta_ol_perp = ol_perp[end_idx] - ol_perp[start_idx]

    # Time-to-stall (relative to pre baseline, axis-projected)
    time_to_stall = np.nan
    pre_v_close_par_positive = False
    if axis_defined and np.isfinite(pre_v_close_par_mean) and pre_v_close_par_mean > 0:
        pre_v_close_par_positive = True
        # A frame counts as stalled when the projected closing speed is at or below
        # a fraction of the pre-contact mean (never lower than the absolute floor).
        stall_threshold = max(STALL_ALPHA * pre_v_close_par_mean, STALL_ABS_FLOOR)
        post_rel = rel_frame[post_mask_strict]
        post_vcp = v_close_par[post_mask_strict] if v_close_par is not None else np.array([])
        count = 0
        for rel_f, v in zip(post_rel, post_vcp):
            # A missing sample breaks the run of consecutive stalled frames.
            if not np.isfinite(v):
                count = 0
                continue
            if v <= stall_threshold:
                count += 1
                if count >= STALL_PERSIST:
                    # Recorded value is the rel_frame that completes the run,
                    # not the frame on which the run started.
                    time_to_stall = rel_f
                    break
            else:
                count = 0

    # COD / lateral proxies (DL) in pre window
    omega_max_pre = np.nan
    a_lat_max_pre = np.nan
    jerk_max_pre = np.nan

    dl_dir = _series_to_float(g["dl_dir"])
    if np.any(np.isfinite(dl_dir)):
        # dl_dir is treated as degrees: fill gaps, convert to radians and unwrap
        # so that differences are not distorted by wrap-around.
        dir_filled = _interpolate_nans(dl_dir)
        dir_rad = np.deg2rad(dir_filled)
        dir_unwrapped = np.unwrap(dir_rad)
        # Rate of change of direction per unit time.
        omega = np.full(len(g), np.nan, dtype=float)
        omega[1:] = np.diff(dir_unwrapped) / frame_dt[1:]
        # dl_s times turning rate — presumably speed, giving a lateral-acceleration proxy; TODO confirm.
        a_lat = _series_to_float(g["dl_s"]) * omega
        # Row-to-row change in dl_a over the time step — presumably acceleration, giving jerk; TODO confirm.
        jerk = np.full(len(g), np.nan, dtype=float)
        dl_a = _series_to_float(g["dl_a"])
        jerk[1:] = np.diff(dl_a) / frame_dt[1:]

        omega_max_pre = _max_abs_or_nan(omega[pre_mask])
        a_lat_max_pre = _max_abs_or_nan(a_lat[pre_mask])
        jerk_max_pre = _max_abs_or_nan(jerk[pre_mask])

    metrics = {
        "rep_id": g["rep_id"][0],
        "axis_defined": axis_defined,
        "axis_source": axis_source,
        "axis_definition": axis_definition,
        "axis_norm": axis_norm,
        "axis_u_x": u_x,
        "axis_u_y": u_y,
        "pre_n": int(pre_v_close.size),
        "post_n": int(post_v_close.size),
        "pre_par_n": pre_par_n,
        "post_par_n": post_par_n,
        "pre_v_close_mean": pre_v_close_mean,
        "post_v_close_mean": post_v_close_mean,
        "delta_v_close": delta_v_close,
        "ratio_v_close": ratio_v_close,
        "auc_pos_v_close_post": auc_pos_v_close_post,
        "pre_v_close_par_mean": pre_v_close_par_mean,
        "post_v_close_par_mean": post_v_close_par_mean,
        "delta_v_close_par": delta_v_close_par,
        "ratio_v_close_par": ratio_v_close_par,
        "auc_pos_v_close_par_post": auc_pos_v_close_par_post,
        "time_to_stall_frames": time_to_stall,
        "delta_center_par": delta_center_par,
        "delta_center_perp": delta_center_perp,
        "delta_ol_par": delta_ol_par,
        "delta_ol_perp": delta_ol_perp,
        "pre_v_close_par_positive": pre_v_close_par_positive,
        "omega_max_pre": omega_max_pre,
        "a_lat_max_pre": a_lat_max_pre,
        "jerk_max_pre": jerk_max_pre,
    }
    # NaN/inf become None so they land as nulls when the rows are loaded into polars.
    return {k: _nan_to_none(v) for k, v in metrics.items()}


# Build one metrics row per rep. maintain_order=True makes the groups come out in
# axis_df's sort order on every run; without it the group order (and therefore the
# row order of rep_metrics and the order of tied rows in later rankings) can change
# between runs.
rows = []
for rep_id, g in axis_df.group_by("rep_id", maintain_order=True):
    rows.append(compute_rep_metrics(g))

# Scan every row when inferring column types. Many metrics are None for reps where
# they are undefined, and the default inference only looks at the first 100 rows, so
# a column that happens to start with a long run of None could be mistyped.
rep_metrics = pl.DataFrame(rows, infer_schema_length=None)
rep_metrics = rep_metrics.join(rep_meta, on="rep_id", how="left")

# COD-heavy flag (p90 of omega_max_pre)
omega_vals = rep_metrics.get_column("omega_max_pre").drop_nulls().to_numpy()
if omega_vals.size > 0:
    omega_p90 = float(np.nanpercentile(omega_vals, 90))
    rep_metrics = rep_metrics.with_columns(
        (pl.col("omega_max_pre") >= omega_p90).alias("cod_heavy")
    )
else:
    # No usable turning-rate values at all: nothing can be flagged.
    omega_p90 = np.nan
    rep_metrics = rep_metrics.with_columns(pl.lit(False).alias("cod_heavy"))

# Derived scores for 2x2 grid
rep_metrics = rep_metrics.with_columns(
    # Drop in axis-projected closing velocity across contact (pre mean minus post mean).
    (pl.col("pre_v_close_par_mean") - pl.col("post_v_close_par_mean")).alias("arrest_score"),
    pl.col("delta_center_par").alias("displacement_score"),
    pl.col("delta_ol_par").alias("displacement_score_ol"),
)

# Valid mask for primary summaries: the axis must exist, the pair must have been
# closing before contact, and both windows need enough finite projected samples.
rep_metrics = rep_metrics.with_columns(
    (
        pl.col("axis_defined")
        & pl.col("pre_v_close_par_positive")
        & (pl.col("pre_par_n") >= MIN_PRE_FRAMES)
        & (pl.col("post_par_n") >= MIN_POST_FRAMES)
    ).alias("valid_mask")
)


# OL-level stall score
# Aggregate valid reps per offensive lineman. Displacement is the absolute OL
# movement along the contact axis (delta_center_par is the alternative input).
stall_ol = (
    rep_metrics
    .filter(pl.col("valid_mask"))
    .group_by(["ol_zebra_id", "ol_player_name"])
    .agg(
        n_reps=pl.len(),
        arrest_mean=pl.col("arrest_score").mean(),
        arrest_median=pl.col("arrest_score").median(),
        disp_abs_mean=pl.col("delta_ol_par").abs().mean(),
        disp_abs_median=pl.col("delta_ol_par").abs().median(),
        tts_mean=pl.col("time_to_stall_frames").cast(pl.Float64).mean(),
        auc_pos_mean=pl.col("auc_pos_v_close_par_post").mean(),
    )
)

if stall_ol.height > 0:
    # Standardise across linemen, then reward arrest and penalise displacement;
    # the v2 score additionally penalises residual closing after contact at half weight.
    stall_ol = add_zscores(
        stall_ol, ["arrest_mean", "disp_abs_mean", "auc_pos_mean"]
    ).with_columns(
        stall_score=pl.col("z_arrest_mean") - pl.col("z_disp_abs_mean"),
        stall_score_v2=(
            pl.col("z_arrest_mean")
            - pl.col("z_disp_abs_mean")
            - pl.lit(0.5) * pl.col("z_auc_pos_mean")
        ),
    )
    stall_ol_ranked = stall_ol.sort("stall_score", descending=True)
else:
    # Nothing to score: pass the empty aggregate through unchanged.
    stall_ol_ranked = stall_ol


# Rep-level stall score
# Same scoring as the OL-level table, but standardised across individual valid reps.
stall_rep = rep_metrics.filter(pl.col("valid_mask")).with_columns(
    pl.col("delta_ol_par").abs().alias("disp_abs")
)

stall_rep_scored = stall_rep
if stall_rep.height > 0:
    stall_rep_scored = add_zscores(
        stall_rep,
        ["arrest_score", "disp_abs", "auc_pos_v_close_par_post"],
    ).with_columns(
        stall_score=pl.col("z_arrest_score") - pl.col("z_disp_abs"),
        stall_score_v2=(
            pl.col("z_arrest_score")
            - pl.col("z_disp_abs")
            - pl.lit(0.5) * pl.col("z_auc_pos_v_close_par_post")
        ),
    )

# To inspect the ranking: stall_rep_scored.sort("stall_score", descending=True)

# Pass-filtered stall score.
# axis_df, rep_metrics and POST_FRAMES are all defined earlier in this same cell, so
# the former `in globals()` guards could never take their fallback branch and have
# been removed.
PASS_EPS = 0.05  # a rep counts as "passed" once the axis-projected DL-OL separation drops to this value or below
POST_FRAMES_STALL = POST_FRAMES

# Contact axis (unit vector) of every valid rep.
rep_axes = (
    rep_metrics
    .filter(pl.col("valid_mask"))
    .select(["rep_id", "axis_u_x", "axis_u_y"])
    .drop_nulls(["axis_u_x", "axis_u_y"])
)

# Frame-level separation along each rep's axis, restricted to the post-contact window.
sep_par_post = (
    axis_df
    .join(rep_axes, on="rep_id", how="inner")
    .with_columns(
        sep_par=(
            (pl.col("dl_x") - pl.col("ol_x")) * pl.col("axis_u_x")
            + (pl.col("dl_y") - pl.col("ol_y")) * pl.col("axis_u_y")
        )
    )
    .filter((pl.col("rel_frame") >= 0) & (pl.col("rel_frame") <= POST_FRAMES_STALL))
)

# Flag reps whose minimum post-contact separation reaches PASS_EPS or less.
passed_df = (
    sep_par_post
    .group_by("rep_id")
    .agg(pl.col("sep_par").min().alias("sep_par_min_post"))
    .with_columns(
        passed=pl.col("sep_par_min_post") <= PASS_EPS
    )
)

# Keep only valid reps that were not passed. Reps with no separation data are
# treated as passed (fill_null(True)) and therefore excluded as well.
stall_rep_base = (
    rep_metrics
    .filter(pl.col("valid_mask"))
    .join(passed_df, on="rep_id", how="left")
    .with_columns(passed=pl.col("passed").fill_null(True))
    .filter(~pl.col("passed"))
    .with_columns(
        residual_close_auc=pl.col("auc_pos_v_close_par_post"),
        # Less residual closing after contact means a better arrest, hence the sign flip.
        arrest_quality=-pl.col("auc_pos_v_close_par_post"),
        disp_abs=pl.col("delta_ol_par").abs(),
    )
)

if stall_rep_base.height == 0:
    stall_rep_fixed = stall_rep_base
    stall_ol_fixed = stall_rep_base
else:
    stall_rep_fixed = add_zscores(stall_rep_base, ["arrest_quality", "disp_abs"])
    stall_rep_fixed = stall_rep_fixed.with_columns([
        (pl.col("z_arrest_quality") - pl.col("z_disp_abs")).alias("stall_score_fixed"),
    ])

    # Per-OL summary, ranked by the median rep-level score.
    stall_ol_fixed = (
        stall_rep_fixed
        .group_by(["ol_zebra_id", "ol_player_name"])
        .agg(
            n_reps=pl.len(),
            stall_score_median=pl.col("stall_score_fixed").median(),
            arrest_quality_mean=pl.col("arrest_quality").mean(),
            disp_abs_mean=pl.col("disp_abs").mean(),
        )
        .sort("stall_score_median", descending=True)
    )

display(stall_ol_fixed)



ol_zebra_id,ol_player_name,n_reps,stall_score_median,arrest_quality_mean,disp_abs_mean
i64,str,u32,f64,f64,f64
1760000095,"""Hunter Nourzad""",3,1.088486,-0.215204,0.607621
1760000093,"""Kaitori Leveston""",3,0.942277,-0.208637,0.526596
1770000093,"""Willis Patrick""",5,0.922655,-0.209977,0.585936
1770000091,"""Mason McCormick""",4,0.874899,-0.249555,0.454472
1760000094,"""Christian Mahogany""",2,0.536401,-0.226965,0.650676
1770000092,"""Doug Nester""",1,0.195451,-0.040821,1.407318
1760000090,"""Karsen Barnhart""",4,0.078926,-0.225527,1.069111
1760000092,"""Trente Jones""",2,-0.002036,-0.07782,1.38667
1770000095,"""Dylan McMahon""",4,-0.138549,-0.218291,1.055349
1770000089,"""C.J. Hanson""",7,-0.450614,-0.171434,1.199424


##### Note: The leaderboard above was not used for the slides. The next cell adds a fix so that reps in which the DL wins laterally and beats the OL are not scored as strong stalls.

In [8]:
# Quantile of |delta_center_perp| at or above which a rep is treated as a lateral "wash".
LAT_Q = 0.90

if stall_rep_fixed.height > 0:
    lat_eps = quantile_threshold_abs(stall_rep_fixed, "delta_center_perp", LAT_Q)

    # Flag reps whose sideways drift of the pair midpoint reaches the threshold.
    # With no usable threshold, nothing is flagged.
    stall_rep_fixed_wash = stall_rep_fixed.with_columns(
        lat_wash=(
            pl.lit(False)
            if lat_eps is None
            else pl.col("delta_center_perp").abs() >= pl.lit(lat_eps)
        )
    )

    # Ranking with the flagged reps removed.
    # Note: the "stall_score_mean" column holds the per-OL median of the rep scores.
    stall_ol_no_wash = (
        stall_rep_fixed_wash
        .filter(pl.col("lat_wash").not_())
        .group_by(["ol_zebra_id", "ol_player_name"])
        .agg(
            n_reps=pl.len(),
            stall_score_mean=pl.col("stall_score_fixed").median(),
        )
        .sort("stall_score_mean", descending=True)
    )

    # Ranking with every rep kept, plus how often and how far each OL's reps drifted.
    stall_ol_with_wash = (
        stall_rep_fixed_wash
        .group_by(["ol_zebra_id", "ol_player_name"])
        .agg(
            n_reps=pl.len(),
            stall_score_mean=pl.col("stall_score_fixed").median(),
            wash_rate=pl.col("lat_wash").mean(),
            lat_perp_mean=pl.col("delta_center_perp").abs().mean(),
        )
        .sort("stall_score_mean", descending=True)
    )
else:
    # No scored reps: carry the empty frame through so later cells still find the names.
    stall_rep_fixed_wash = stall_rep_fixed.with_columns(
        pl.lit(False).alias("lat_wash")
    )
    stall_ol_no_wash = stall_rep_fixed
    stall_ol_with_wash = stall_rep_fixed


In [None]:
# Leaderboard used for slides
# Left-join the wash-filtered stall ranking with the frame-25 depth-allowed table so
# each OL row also carries `count` and `median_dl_x`. Linemen missing from
# ol_depth_at_25 (it keeps only players with more than two reps) get nulls there.
# NOTE(review): the join key is the player name rather than ol_zebra_id — confirm
# that names are unique.
stall_ol_no_wash.join(ol_depth_at_25, on="ol_player_name", how="left")

ol_zebra_id,ol_player_name,n_reps,stall_score_mean,count,median_dl_x
i64,str,u32,f64,u32,f64
1760000093,"""Kaitori Leveston""",3,0.942277,4.0,3.094915
1770000091,"""Mason McCormick""",4,0.874899,4.0,2.383333
1760000094,"""Christian Mahogany""",2,0.536401,4.0,3.221329
1760000095,"""Hunter Nourzad""",2,0.419064,4.0,3.379145
1770000092,"""Doug Nester""",1,0.195451,,
1770000093,"""Willis Patrick""",3,0.193087,5.0,3.536041
1760000090,"""Karsen Barnhart""",4,0.078926,4.0,3.420354
1760000092,"""Trente Jones""",2,-0.002036,4.0,3.55515
1770000095,"""Dylan McMahon""",4,-0.138549,5.0,3.015523
1770000089,"""C.J. Hanson""",7,-0.450614,10.0,3.445333


In [None]:
# stall small multiple goes here

import math
import numpy as np
import polars as pl
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter

# --- Animation settings for the stall small multiples -----------------------
MAX_FRAME = 25                      # last frame_number drawn (inclusive)
TOP_N = 6                           # number of reps (panels) to animate
N_COLS = 3                          # panels per row
X_MIN, X_MAX = -10.0, 10.0          # x-axis window of every panel
Y_MIN, Y_MAX = 15.0, 35.0           # y-axis window of every panel
FIELD_COLOR = "#2e7d32"
BG_COLOR = "#121317"
MARKER_SIZE = 12 ** 2               # scatter `s` is an area in points**2
MARKER_EDGE_COLOR = "white"
MARKER_EDGE_WIDTH = 2
TITLE_COLOR = "#e7e9eb"
GIF_PATH = "stall_top6.gif"
FPS = 10

# Guard on the frame this cell actually reads. `stall_rep_fixed_wash` is only
# created in the `else` branch of the stall-score cell, so checking for
# `stall_rep_fixed` alone would let this cell fail with a NameError.
if "stall_rep_fixed_wash" not in globals():
    print("stall_rep_fixed_wash not found; run the stall score (pass filtered) cell first.")
else:
    # Highest-scoring stall reps among those not flagged as lateral wash.
    top_reps = (
        stall_rep_fixed_wash
        .filter(~pl.col("lat_wash"))
        .sort("stall_score_fixed", descending=True)
        .head(TOP_N)
        .select([
            "session_name",
            "rep_number",
            "ol_player_name",
            "dl_player_name",
            "stall_score_fixed",
        ])
    )

    if top_reps.height == 0:
        print("No stall reps found to animate.")
    else:
        reps = top_reps.to_dicts()
        n_reps = len(reps)
        n_rows = max(1, math.ceil(n_reps / N_COLS))

        def format_score(value):
            """Return the score with two decimals, or "na" when it is None or non-finite."""
            if value is None:
                return "na"
            if isinstance(value, float) and not np.isfinite(value):
                return "na"
            return f"{value:.2f}"

        score_texts = [format_score(r.get("stall_score_fixed")) for r in reps]

        def build_track(session_name, rep_number):
            """Build per-frame OL/DL positions for one rep.

            Reads the rows of ``df`` matching ``session_name`` and ``rep_number``
            with ``frame_number <= MAX_FRAME``.

            Returns:
                ``None`` when no rows match; otherwise a dict with keys
                ``"ol_x"``, ``"ol_y"``, ``"dl_x"``, ``"dl_y"``, each a float
                array of length ``MAX_FRAME + 1`` indexed by frame number, with
                missing frames filled from neighbouring frames.
            """
            rep_df = (
                df
                .filter(
                    (pl.col("session_name") == session_name)
                    & (pl.col("rep_number") == rep_number)
                    & (pl.col("frame_number") <= MAX_FRAME)
                )
                .sort("frame_number")
            )

            if rep_df.height == 0:
                return None

            frames = rep_df["frame_number"].to_list()
            ol_x = rep_df["ol_x"].to_list()
            ol_y = rep_df["ol_y"].to_list()
            dl_x = rep_df["dl_x"].to_list()
            dl_y = rep_df["dl_y"].to_list()

            # One slot per frame number 0..MAX_FRAME; NaN marks "no sample yet".
            length = MAX_FRAME + 1
            ol_x_arr = np.full(length, np.nan, dtype=float)
            ol_y_arr = np.full(length, np.nan, dtype=float)
            dl_x_arr = np.full(length, np.nan, dtype=float)
            dl_y_arr = np.full(length, np.nan, dtype=float)

            for f, x, y, dx, dy in zip(frames, ol_x, ol_y, dl_x, dl_y):
                if f is None:
                    continue
                f_idx = int(f)
                # Ignore frame numbers outside the animated window (e.g. negatives).
                if 0 <= f_idx <= MAX_FRAME:
                    ol_x_arr[f_idx] = x
                    ol_y_arr[f_idx] = y
                    dl_x_arr[f_idx] = dx
                    dl_y_arr[f_idx] = dy

            def fill_forward(values):
                """Fill NaN gaps in place: back-fill the leading gap, forward-fill the rest."""
                if np.all(np.isnan(values)):
                    return values
                first = int(np.argmax(~np.isnan(values)))
                values[:first] = values[first]
                for i in range(first + 1, len(values)):
                    if np.isnan(values[i]):
                        values[i] = values[i - 1]
                return values

            return {
                "ol_x": fill_forward(ol_x_arr),
                "ol_y": fill_forward(ol_y_arr),
                "dl_x": fill_forward(dl_x_arr),
                "dl_y": fill_forward(dl_y_arr),
            }

        tracks = [build_track(r["session_name"], r["rep_number"]) for r in reps]

        fig, axes = plt.subplots(
            n_rows,
            N_COLS,
            figsize=(5.74, 3.96),
            sharex=True,
            sharey=True,
        )

        # Flatten so panels can be addressed by rep index for any grid shape.
        axes_flat = np.array(axes).reshape(-1)
        axes_active = axes_flat[:n_reps]

        # Hide grid cells that have no rep to show.
        for ax in axes_flat[n_reps:]:
            ax.set_axis_off()

        # One OL and one DL scatter artist per panel, in rep order.
        ol_pts = []
        dl_pts = []

        for ax, track, score_text in zip(axes_active, tracks, score_texts):
            # Vertical reference lines every 5 x-units; x == 0 is highlighted.
            for x_val in range(int(X_MIN), int(X_MAX) + 1, 5):
                if x_val == 0:
                    ax.axvline(x_val, color="yellow", linewidth=1.5, alpha=0.8)
                else:
                    ax.axvline(x_val, color="white", linewidth=0.8, alpha=0.4)

            ax.set_xlim(X_MIN, X_MAX)
            ax.set_ylim(Y_MIN, Y_MAX)
            ax.set_aspect("equal", adjustable="box")
            ax.set_facecolor(FIELD_COLOR)
            ax.set_xticks([])
            ax.set_yticks([])
            ax.set_xlabel("")
            ax.set_ylabel("")

            if track is None:
                # No tracking rows for this rep: keep empty placeholder artists
                # so ol_pts / dl_pts stay aligned with `tracks`.
                ax.set_title(f"Stall Score: {score_text}", color=TITLE_COLOR)
                ol_pts.append(
                    ax.scatter(
                        [], [],
                        s=MARKER_SIZE,
                        color="dodgerblue",
                        edgecolors=MARKER_EDGE_COLOR,
                        linewidths=MARKER_EDGE_WIDTH,
                    )
                )
                dl_pts.append(
                    ax.scatter(
                        [], [],
                        s=MARKER_SIZE,
                        color="red",
                        edgecolors=MARKER_EDGE_COLOR,
                        linewidths=MARKER_EDGE_WIDTH,
                    )
                )
                continue

            # Blue marker = OL, red marker = DL, both starting at frame 0.
            ol_pt = ax.scatter(
                [track["ol_x"][0]],
                [track["ol_y"][0]],
                s=MARKER_SIZE,
                color="dodgerblue",
                edgecolors=MARKER_EDGE_COLOR,
                linewidths=MARKER_EDGE_WIDTH,
            )
            dl_pt = ax.scatter(
                [track["dl_x"][0]],
                [track["dl_y"][0]],
                s=MARKER_SIZE,
                color="red",
                edgecolors=MARKER_EDGE_COLOR,
                linewidths=MARKER_EDGE_WIDTH,
            )
            ax.set_title(f"Stall Score: {score_text}", color=TITLE_COLOR)
            ol_pts.append(ol_pt)
            dl_pts.append(dl_pt)

        fig.subplots_adjust(wspace=0.08, hspace=0.22)

        fig.patch.set_facecolor(BG_COLOR)
        fig.patch.set_alpha(1.0)

        def update(frame_idx):
            """Move every panel's OL/DL markers to their positions at ``frame_idx``."""
            for track, ol_pt, dl_pt in zip(tracks, ol_pts, dl_pts):
                if track is None:
                    continue
                ol_pt.set_offsets([[track["ol_x"][frame_idx], track["ol_y"][frame_idx]]])
                dl_pt.set_offsets([[track["dl_x"][frame_idx], track["dl_y"][frame_idx]]])
            return ol_pts + dl_pts

        anim = FuncAnimation(
            fig,
            update,
            frames=list(range(0, MAX_FRAME + 1)),
            interval=100,
            blit=False,
        )

        # The figure is only written to disk, then closed so it is not also
        # rendered inline as a static image.
        anim.save(GIF_PATH, writer=PillowWriter(fps=FPS), savefig_kwargs={"facecolor": BG_COLOR})
        plt.close(fig)
        print(f"Saved {GIF_PATH}")


Saved stall_top6.gif


In [11]:
# Bull Score
#
# Keeps valid reps with a positive arrest_score and a walkback above
# WALKBACK_EPS, where walkback = clip_min_expr(-delta_ol_par, 0.0)
# (presumably the negated OL parallel displacement clamped at a minimum of 0 —
# TODO confirm against clip_min_expr). Each rep is then scored as
#     bull_score = z_walkback + 0.25 * z_arrest_score
# and the per-DL median is displayed as the leaderboard.

WALKBACK_EPS = 0.05

bull_rep = (
    rep_metrics
    .filter(pl.col("valid_mask"))
    .filter(pl.col("arrest_score") > 0)
    .with_columns(
        walkback=clip_min_expr(-pl.col("delta_ol_par"), 0.0)
    )
    .filter(pl.col("walkback") > WALKBACK_EPS)
)

if bull_rep.height == 0:
    # No qualifying reps: still expose the score columns (as typed nulls) so the
    # summary, the leaderboard below and the animation cell see the same schema
    # instead of failing on a missing "bull_score" column.
    bull_rep_scored = bull_rep.with_columns(
        pl.lit(None, dtype=pl.Float64).alias("z_walkback"),
        pl.lit(None, dtype=pl.Float64).alias("z_arrest_score"),
        pl.lit(None, dtype=pl.Float64).alias("bull_score"),
    )
else:
    # Walkback dominates the score; arrest_score contributes at quarter weight.
    bull_rep_scored = add_zscores(bull_rep, ["walkback", "arrest_score"]).with_columns(
        (pl.col("z_walkback") + pl.lit(0.25) * pl.col("z_arrest_score")).alias("bull_score")
    )

# Built outside the branch so both cases share one column layout.
bull_rep_summary = bull_rep_scored.select([
    "session_name",
    "rep_number",
    "ol_player_name",
    "dl_player_name",
    "walkback",
    "arrest_score",
    "bull_score",
])

# Per-DL leaderboard: median bull score and number of qualifying reps.
# pl.len() counts rows, whereas count() on the group key would skip nulls.
bull_rep_summary.group_by("dl_player_name").agg([
    pl.col("bull_score").median().alias("bull_score_median"),
    pl.len().alias("count"),
]).sort("bull_score_median", descending=True)


dl_player_name,bull_score_median,count
str,f64,u32
"""Jowon Briggs""",1.231916,3
"""Tuli Letuligasenoa""",0.975668,3
"""Myles Murphy""",0.968802,5
"""Jordan Miller""",0.773035,2
"""Logan Lee""",0.607201,2
"""Javontae Jean-Baptiste """,0.567021,1
"""Khristian Boyd""",0.490143,5
"""Justin Rogers""",0.290928,5
"""David Ugwoegbu""",-0.128886,2
"""Fabien Lovett""",-0.3894,3


In [12]:
# Top 6 bull reps animation (small multiples)

import math
import numpy as np
import polars as pl
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter

# Settings for the bull-score small-multiples GIF.
MAX_FRAME = 30
TOP_N = 6
N_COLS = 3
X_MIN, X_MAX = -10.0, 10.0
Y_MIN, Y_MAX = 15.0, 35.0
FIELD_COLOR = "#2e7d32"
BG_COLOR = "#121317"
MARKER_SIZE = 12 ** 2
MARKER_EDGE_COLOR = "white"
MARKER_EDGE_WIDTH = 2
TITLE_COLOR = "#e7e9eb"
GIF_PATH = "bull_top6.gif"
FPS = 10

if "bull_rep_summary" not in globals():
    print("bull_rep_summary not found; run the Bull Score cell first.")
else:
    # The TOP_N reps with the largest bull_score, one panel each.
    top_reps = (
        bull_rep_summary
        .sort("bull_score", descending=True)
        .head(TOP_N)
        .select(["session_name", "rep_number", "ol_player_name", "dl_player_name", "bull_score"])
    )

    if top_reps.height == 0:
        print("No bull reps found to animate.")
    else:
        reps = top_reps.to_dicts()
        n_reps = len(reps)
        n_rows = max(1, math.ceil(n_reps / N_COLS))

        def format_score(value):
            """Two-decimal text for a score; "na" for None or a non-finite float."""
            unusable = value is None or (isinstance(value, float) and not np.isfinite(value))
            return "na" if unusable else f"{value:.2f}"

        score_texts = [format_score(rep.get("bull_score")) for rep in reps]

        def build_track(session_name, rep_number):
            """Collect OL/DL coordinates of one rep into frame-indexed arrays.

            Rows are taken from ``df`` for the given session and rep, limited to
            ``frame_number <= MAX_FRAME``. Returns ``None`` if there are no such
            rows, else a dict of four float arrays ("ol_x", "ol_y", "dl_x",
            "dl_y") of length ``MAX_FRAME + 1`` with gaps filled.
            """
            same_rep = (
                (pl.col("session_name") == session_name)
                & (pl.col("rep_number") == rep_number)
            )
            within_window = pl.col("frame_number") <= MAX_FRAME
            rep_df = df.filter(same_rep & within_window).sort("frame_number")

            if rep_df.height == 0:
                return None

            coord_names = ("ol_x", "ol_y", "dl_x", "dl_y")
            frame_numbers = rep_df["frame_number"].to_list()
            coord_lists = [rep_df[name].to_list() for name in coord_names]

            # NaN-initialised slot for every frame number in 0..MAX_FRAME.
            n_slots = MAX_FRAME + 1
            arrays = {name: np.full(n_slots, np.nan, dtype=float) for name in coord_names}

            for frame, *coords in zip(frame_numbers, *coord_lists):
                if frame is None:
                    continue
                slot = int(frame)
                if slot < 0 or slot > MAX_FRAME:
                    continue
                for name, coord in zip(coord_names, coords):
                    arrays[name][slot] = coord

            def fill_forward(values):
                """In place: copy the first known value backwards, then carry values forward over NaNs."""
                known = ~np.isnan(values)
                if not known.any():
                    return values
                start = int(np.argmax(known))
                carried = values[start]
                values[:start] = carried
                for pos in range(start + 1, len(values)):
                    if np.isnan(values[pos]):
                        values[pos] = carried
                    else:
                        carried = values[pos]
                return values

            return {name: fill_forward(arrays[name]) for name in coord_names}

        tracks = [build_track(rep["session_name"], rep["rep_number"]) for rep in reps]

        fig, axes = plt.subplots(
            nrows=n_rows,
            ncols=N_COLS,
            figsize=(5.74, 3.96),
            sharex=True,
            sharey=True,
        )

        # Flat view of the grid: first n_reps panels are used, the rest hidden.
        axes_flat = np.asarray(axes).ravel()
        axes_active = axes_flat[:n_reps]

        for ax in axes_flat[n_reps:]:
            ax.set_axis_off()

        def add_marker(ax, x_vals, y_vals, color):
            """Create one player marker (scatter artist) on ``ax`` with the shared styling."""
            return ax.scatter(
                x_vals,
                y_vals,
                s=MARKER_SIZE,
                color=color,
                edgecolors=MARKER_EDGE_COLOR,
                linewidths=MARKER_EDGE_WIDTH,
            )

        ol_pts = []
        dl_pts = []

        for ax, track, score_text in zip(axes_active, tracks, score_texts):
            # Reference lines every 5 x-units, with x == 0 drawn in yellow.
            for x_val in range(int(X_MIN), int(X_MAX) + 1, 5):
                at_zero = x_val == 0
                ax.axvline(
                    x_val,
                    color="yellow" if at_zero else "white",
                    linewidth=1.5 if at_zero else 0.8,
                    alpha=0.8 if at_zero else 0.4,
                )

            ax.set_xlim(X_MIN, X_MAX)
            ax.set_ylim(Y_MIN, Y_MAX)
            ax.set_aspect("equal", adjustable="box")
            ax.set_facecolor(FIELD_COLOR)
            ax.set_xticks([])
            ax.set_yticks([])
            ax.set_xlabel("")
            ax.set_ylabel("")
            ax.set_title(f"Bull Score: {score_text}", color=TITLE_COLOR)

            if track is None:
                # Rep without tracking rows: empty artists keep the lists aligned.
                ol_start = ([], [])
                dl_start = ([], [])
            else:
                ol_start = ([track["ol_x"][0]], [track["ol_y"][0]])
                dl_start = ([track["dl_x"][0]], [track["dl_y"][0]])

            # OL (blue) is added before DL (red), as in the draw order.
            ol_pts.append(add_marker(ax, ol_start[0], ol_start[1], "dodgerblue"))
            dl_pts.append(add_marker(ax, dl_start[0], dl_start[1], "red"))

        fig.subplots_adjust(wspace=0.08, hspace=0.22)

        fig.patch.set_facecolor(BG_COLOR)
        fig.patch.set_alpha(1.0)

        def update(frame_idx):
            """Reposition all markers for animation frame ``frame_idx``; returns the artists."""
            for track, ol_artist, dl_artist in zip(tracks, ol_pts, dl_pts):
                if track is not None:
                    ol_artist.set_offsets([[track["ol_x"][frame_idx], track["ol_y"][frame_idx]]])
                    dl_artist.set_offsets([[track["dl_x"][frame_idx], track["dl_y"][frame_idx]]])
            return [*ol_pts, *dl_pts]

        anim = FuncAnimation(
            fig,
            update,
            frames=list(range(MAX_FRAME + 1)),
            interval=100,
            blit=False,
        )

        # Write the GIF, then close the figure so no static copy renders inline.
        anim.save(GIF_PATH, writer=PillowWriter(fps=FPS), savefig_kwargs={"facecolor": BG_COLOR})
        plt.close(fig)
        print(f"Saved {GIF_PATH}")

Saved bull_top6.gif
