In [None]:
import json
from math import hypot
from pprint import pprint
from time import sleep

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from tqdm.auto import tqdm
from understatapi import UnderstatClient


In [None]:
# League code forwarded to the Understat client; the options are listed on the right.
LEAGUE = "EPL"    # "EPL", "La_Liga", "Bundesliga", "Serie_A", "Ligue_1", "RFPL"
# Seasons to download, as a list of season strings (get_league_match_ids iterates over it).
SEASON = ["2024"]

In [None]:
def get_league_match_ids(league: str, seasons: list[str]):
    """Collect match ids and raw match records for one league across seasons.

    Args:
        league: League code forwarded to ``UnderstatClient.league``.
        seasons: Season identifiers; one ``get_match_data`` request is made
            per entry, in the given order.

    Returns:
        A ``(match_ids, matches)`` tuple: the ``"id"`` value of every match
        record, and the match records themselves, both in download order.
    """
    collected_ids = []
    collected_matches = []

    with UnderstatClient() as client:
        for season in seasons:
            season_matches = client.league(league=league).get_match_data(season=season)
            collected_ids.extend([match["id"] for match in season_matches])
            collected_matches.extend(season_matches)

    return collected_ids, collected_matches

# Download the fixture list once; both results are reused by later cells.
match_ids, raw_matches = get_league_match_ids(LEAGUE, SEASON)
print(f"Total de matches encontrados: {len(match_ids)}")
print("Ejemplos de match_ids:", match_ids[:10])

# Preview only the first ten fields of the first match record to keep the output short.
print("\nEjemplo de cómo viene un match:")
first_match = raw_matches[0]
preview_keys = list(first_match.keys())[:10]
pprint({key: first_match[key] for key in preview_keys})

In [None]:
def fetch_match_shots(match_id: str):
    """Download the raw shot data of one match with a short-lived client.

    A fresh ``UnderstatClient`` is opened for this single request and the
    result of ``get_shot_data()`` is returned untouched; the demo code that
    follows reads its ``"h"`` and ``"a"`` entries.

    NOTE(review): a later cell defines another ``fetch_match_shots`` that takes
    ``(match_id, client)`` and replaces this one once it has been executed.
    """
    with UnderstatClient() as client:
        return client.match(match=match_id).get_shot_data()

# Inspect a single match to see the structure Understat returns.
sample_mid = match_ids[0]
md = fetch_match_shots(sample_mid)

print(f"\nMatch {sample_mid} - raw keys:", md.keys())
home_shot_count = len(md.get("h", []))
away_shot_count = len(md.get("a", []))
print("Home shots:", home_shot_count, "| Away shots:", away_shot_count)

# Show one raw shot record, truncated to its first 800 characters.
print("\nFirst HOME shot (raw Understat JSON):")
first_home_shot_json = json.dumps(md["h"][0], indent=2)
print(first_home_shot_json[:800])


In [None]:
# METODOS de obtención y formateo de datos Optimizados
def fetch_match_shots(match_id: str, client: UnderstatClient) -> list[dict]:
    """Fetch one match's shots as a flat list tagged with side and match id.

    Args:
        match_id: Match identifier passed to ``client.match``.
        client: Already-open client used for the request, so one session can
            be shared across many matches.

    Returns:
        Copies of every shot found under the ``"h"`` and then the ``"a"`` key
        of the shot data, each with ``"h_a"`` set to its side and
        ``"match_id"`` set to ``match_id``. A missing side contributes nothing.
    """
    shot_data = client.match(match=match_id).get_shot_data()
    return [
        # dict(...) copies the record, so the client's own data is never mutated.
        dict(shot, h_a=side, match_id=match_id)
        for side in ("h", "a")
        for shot in shot_data.get(side, [])
    ]

def shots_to_df(shots: list[dict]) -> pd.DataFrame:
    """Convert tagged shot dicts into a tidy DataFrame, one row per shot.

    Args:
        shots: Shot records carrying ``match_id``, ``minute``, ``h_team``,
            ``a_team``, ``h_a``, ``player``, ``result``, ``X``, ``Y`` and
            ``situation`` keys. Missing keys are treated as missing values.

    Returns:
        A DataFrame with columns ``match_id, minute, team, h_a, player,
        result, X, Y, is_goal, situation``. ``team`` is ``h_team`` for home
        shots and ``a_team`` otherwise; ``is_goal`` is 1 when ``result`` is
        ``"Goal"``. Rows whose ``X`` or ``Y`` is missing or not numeric are
        dropped. An empty input yields an empty frame with the same columns.
    """
    columns = [
        "match_id", "minute", "team", "h_a", "player",
        "result", "X", "Y", "is_goal", "situation",
    ]
    rows = [
        {
            "match_id": s.get("match_id"),
            "minute": s.get("minute"),
            "team": s.get("h_team") if s.get("h_a") == "h" else s.get("a_team"),
            "h_a": s.get("h_a"),
            "player": s.get("player"),
            "result": s.get("result"),
            "X": s.get("X"),
            "Y": s.get("Y"),
            "is_goal": 1 if s.get("result") == "Goal" else 0,
            "situation": s.get("situation"),
        }
        for s in shots
    ]
    # Passing `columns` keeps the schema stable even when `shots` is empty;
    # without it the dropna below fails because "X"/"Y" do not exist.
    df = pd.DataFrame(rows, columns=columns)

    # Coerce instead of calling float() per row: a missing or malformed
    # coordinate becomes NaN and is dropped, rather than aborting the whole conversion.
    for col in ("X", "Y"):
        df[col] = pd.to_numeric(df[col], errors="coerce").astype(float)
    df = df.dropna(subset=["X", "Y"]).reset_index(drop=True)

    minute = pd.to_numeric(df["minute"], errors="coerce")
    if minute.notna().all():
        # Keep the integer dtype whenever every remaining shot has a usable minute.
        minute = minute.astype(int)
    df["minute"] = minute
    df["is_goal"] = df["is_goal"].astype(int)
    return df

In [None]:
# Download the shots of every match through one shared client session.
all_shots = []
failed_match_ids = []  # (match_id, error) pairs for matches whose download raised
with UnderstatClient() as us:
    for mid in tqdm(match_ids, desc=f"Descargando tiros {LEAGUE} {SEASON}"):
        try:
            all_shots.extend(fetch_match_shots(mid, us))
        except Exception as exc:
            # Best effort: keep downloading, but remember what was skipped so
            # missing matches are visible instead of silently lost.
            failed_match_ids.append((mid, repr(exc)))
        sleep(0.15)  # short pause between consecutive requests

if failed_match_ids:
    print(f"No se pudieron descargar {len(failed_match_ids)} partidos:", failed_match_ids[:10])

len(all_shots)

In [None]:
# --- Dataframe preview ---
df_raw = shots_to_df(all_shots)

print("Rows:", len(df_raw))

# Exclude penalties and direct free kicks; the remaining shots feed the distance model.
df_raw = df_raw[~df_raw["situation"].isin(["Penalty","DirectFreekick"])]

print("Rows filtrando Penalties y tiros libres:", len(df_raw))

# min() keeps the preview working when fewer than 8 shots remain (sample() raises
# otherwise), and the fixed seed shows the same rows on every run.
df_raw.sample(n=min(8, len(df_raw)), random_state=0).sort_values(["match_id","minute"]).head(8)


In [None]:
# --- Estandarizar coordenadas ---
def standardize_shot_coordinates(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` where every ``X`` below 0.5 is mirrored to ``1 - X``.

    Rows with ``X >= 0.5`` (and rows with a missing ``X``) are left as they
    are. The input frame is not modified.
    """
    mirrored = df.copy()
    needs_flip = mirrored["X"].lt(0.5)
    mirrored["X"] = mirrored["X"].mask(needs_flip, 1 - mirrored["X"])
    return mirrored

# Mirror X for the filtered shots; the distance column is computed from this frame.
df_std = standardize_shot_coordinates(df_raw)


In [None]:
# Default pitch length in metres, used to scale the normalised X coordinate.
PITCH_LENGTH_M = 100.0
# PITCH_WIDTH_M  = 65.0

def add_longitudinal_distance_m(df: pd.DataFrame, pitch_length_m: "float | None" = None) -> pd.DataFrame:
    """Add ``dist_long_m``: distance along the pitch from the shot to the goal line.

    Args:
        df: Shots with a standardised ``X`` column, where the attacked goal
            line sits at ``X = 1``.
        pitch_length_m: Pitch length in metres. ``None`` (the default) uses the
            module-level ``PITCH_LENGTH_M`` as it is at call time.

    Returns:
        A copy of ``df`` with ``dist_long_m = (1 - X) * pitch_length_m``.

    Raises:
        ValueError: If ``pitch_length_m`` is not positive.
    """
    if pitch_length_m is None:
        pitch_length_m = PITCH_LENGTH_M
    if pitch_length_m <= 0:
        raise ValueError(f"pitch_length_m must be positive, got {pitch_length_m!r}")
    out = df.copy()
    # With X standardised towards x=1, the opponent's goal line is at X=1.
    out["dist_long_m"] = (1.0 - out["X"]) * pitch_length_m
    return out

# Add the dist_long_m column; df_long is the frame used by the binning and xG cells.
df_long = add_longitudinal_distance_m(df_std)


In [None]:
# Binning and fit configuration.
BIN_WIDTH = 1.0            # bin width in metres
ALPHA = 1.0                # additive smoothing term applied to goals / shots
LOW_N = 10                 # bins with fewer shots are treated as low-sample
D_MIN, D_MAX = 2.0, 33.0   # distance range (m) kept for the regression

# Left-closed bins [k, k + BIN_WIDTH) that cover the largest observed distance.
upper_edge = np.ceil(df_long["dist_long_m"].max()) + BIN_WIDTH
bins = np.arange(0.0, upper_edge, BIN_WIDTH)
df_long["dist_bin"] = pd.cut(df_long["dist_long_m"], bins=bins, right=False)

# Per-bin shot count, goal count and observed distance range.
shots_by_bin = df_long.groupby("dist_bin", observed=True)
bin_stats_long = shots_by_bin.agg(
    shots=("is_goal", "size"),
    goals=("is_goal", "sum"),
    d_min=("dist_long_m", "min"),
    d_max=("dist_long_m", "max"),
)
bin_stats_long = bin_stats_long.dropna(subset=["d_min", "d_max"]).reset_index(drop=True)

# Midpoint of the distances actually observed in the bin, and the smoothed goal rate.
bin_stats_long["d_mid"] = (bin_stats_long["d_min"] + bin_stats_long["d_max"]) / 2
smoothed_goals = bin_stats_long["goals"] + ALPHA
smoothed_shots = bin_stats_long["shots"] + 2*ALPHA
bin_stats_long["p_goal"] = smoothed_goals / smoothed_shots

In [None]:
# Full-range scatter that highlights the noisy tail (beyond D_MAX) and low-sample bins.
# LOW_N and D_MAX are taken from the binning configuration cell rather than being
# redefined here, so the plot cannot drift from the thresholds used for the fit.
tail_mask = bin_stats_long["d_mid"] > D_MAX
lown_mask = bin_stats_long["shots"] < LOW_N

plt.figure(figsize=(8,5))
plt.scatter(bin_stats_long["d_mid"], bin_stats_long["p_goal"], s=24, label="Todos los bins")
plt.scatter(bin_stats_long.loc[tail_mask, "d_mid"],
            bin_stats_long.loc[tail_mask, "p_goal"], s=36, label=f"> {D_MAX} m (cola)")
plt.scatter(bin_stats_long.loc[lown_mask, "d_mid"],
            bin_stats_long.loc[lown_mask, "p_goal"], s=36, label=f"tiros < {LOW_N}")
plt.axvline(D_MAX, ls="--", lw=1.5)
plt.xlabel("Distancia hasta la portería (m)")
plt.ylabel("Probabilidad de Gol calculada")
# The title is built from D_MAX so it always matches the dashed cutoff line.
plt.title(f"bins ruidosos al graficar >{D_MAX:g}m")
plt.legend()
plt.show()


In [None]:
# Quantify why the far bins are noisy: count the shots taken from beyond D_MAX.
beyond_d_max = df_long["dist_long_m"] > D_MAX
share_tail = beyond_d_max.mean()
shots_tail = int(beyond_d_max.sum())
print(f"Tiros más allá de {D_MAX} m: {shots_tail} ({share_tail:.1%} de todos los tiros)")

# Preview the flagged bins (tail or low sample), ordered by distance and then shot count.
display_cols = ["d_mid", "shots", "goals", "p_goal"]
flagged_bins = bin_stats_long[tail_mask | lown_mask]
flagged_sorted = flagged_bins.sort_values(["d_mid", "shots"], ascending=[True, True])
tail_preview = flagged_sorted[display_cols].head(12)
tail_preview


In [None]:
# Bins kept for the regression: enough shots and a midpoint inside [D_MIN, D_MAX].
fit_bins = bin_stats_long[
    (bin_stats_long["shots"] >= LOW_N) &
    (bin_stats_long["d_mid"] >= D_MIN) &
    (bin_stats_long["d_mid"] <= D_MAX)
].copy()

plt.figure(figsize=(8,5))
plt.scatter(bin_stats_long["d_mid"], bin_stats_long["p_goal"], s=18, alpha=0.35, label="Todos los bins")
plt.scatter(fit_bins["d_mid"], fit_bins["p_goal"], s=36, label="Conservamos para la regresión")
plt.axvspan(D_MIN, D_MAX, color="grey", alpha=0.1, label="Rango de regresión")
plt.xlabel("Distancia hasta la portería (m)")
plt.ylabel("Probabilidad de gol por tiro")
# The title is derived from the thresholds used in the filter above, so the stated
# range and minimum sample size always match the shaded span and the kept bins.
plt.title(f"Rango de regresión limpio: {D_MIN:g}–{D_MAX:g} m, bins con ≥{LOW_N} tiros")
plt.legend()
plt.show()


In [None]:
# Fit log(p) = log(a) + b * log(d) on the kept bins, weighting each bin by its shot count.
X_log = np.log(fit_bins["d_mid"].to_numpy()).reshape(-1, 1)
y_log = np.log(fit_bins["p_goal"].to_numpy())
w = fit_bins["shots"].to_numpy()

lin = LinearRegression().fit(X_log, y_log, sample_weight=w)

# Back-transform: the slope is the exponent b, exp(intercept) is the scale a.
a_hat = float(np.exp(lin.intercept_))
b_hat = lin.coef_[0]

In [None]:
def powercurve_xg(d_array, a: float, b: float):
    """Evaluate the power curve ``p(d) = a * d**b`` element-wise.

    Distances are converted to a float array and floored at 0.5 before the
    power is taken, so distances below 0.5 all share the value at ``d = 0.5``.
    """
    distances = np.asarray(d_array, dtype=float)
    floored = np.clip(distances, 0.5, None)
    return a * np.power(floored, b)

# Distance grids: d_all spans every bin midpoint, d_fit only the fitted range.
d_all = np.linspace(bin_stats_long["d_mid"].min(), bin_stats_long["d_mid"].max(), 400)
d_fit = np.linspace(D_MIN, D_MAX, 300)

# Fitted curve evaluated over the fit range only.
p_fit_wls = powercurve_xg(d_fit, a_hat, b_hat)
curve_label = f"WLS (bins) curve (a={a_hat:.4f}, b={b_hat:.4f})"

plt.figure(figsize=(8,5))
plt.scatter(bin_stats_long["d_mid"], bin_stats_long["p_goal"], s=22, alpha=0.28, label="All bins")
plt.scatter(fit_bins["d_mid"], fit_bins["p_goal"], s=40, alpha=0.95, label="Kept for fit")
plt.axvspan(D_MIN, D_MAX, color="grey", alpha=0.10, label="Fit range")
plt.plot(d_fit, p_fit_wls, lw=2, label=curve_label)

plt.xlabel("Distancia hasta la portería (m)")
plt.ylabel("Probabilidad de gol por tiro")
plt.title(f"Curva de potencia — {LEAGUE} {SEASON}")
plt.legend()
plt.tight_layout()
plt.show()


In [None]:
# Per-shot xG from the WLS power curve; the clip keeps every value strictly inside (0, 1).
# Note: this adds a column to df_long in place.
df_long["xg_wls"] = np.clip(powercurve_xg(df_long["dist_long_m"], a_hat, b_hat), 1e-9, 1 - 1e-9)

# 1) Per match-team (OPEN PLAY; penalties/direct free kicks were already filtered out earlier)
team_match_xg_open = (
    df_long.groupby(["match_id", "team"], as_index=False)
           .agg(
               shots_open_play=("is_goal", "size"),
               goals_open_play=("is_goal", "sum"),
               xg_open_play=("xg_wls", "sum"),
           )
)

# 2) Per team-season (OPEN PLAY)
# "matches" counts distinct match ids present in team_match_xg_open, i.e. matches
# in which the team has at least one row after the earlier filtering.
team_season_xg_open = (
    team_match_xg_open.groupby("team", as_index=False)
                      .agg(
                          matches=("match_id","nunique"),
                          shots_open_play=("shots_open_play","sum"),
                          goals_open_play=("goals_open_play","sum"),
                          xg_open_play=("xg_open_play","sum"),
                      )
)
# Goals minus xG: positive means the team scored more than the curve predicts.
team_season_xg_open["residual_open_play"] = (
    team_season_xg_open["goals_open_play"] - team_season_xg_open["xg_open_play"]
)

# 3) Home/away split (OPEN PLAY) — rename h_a → venue and fix category issues
df_long = df_long.copy()
df_long["venue"] = df_long["h_a"].map({"h": "home", "a": "away"})
# Use fixed categories to avoid mismatches for unobserved combinations
df_long["venue"] = pd.Categorical(df_long["venue"], categories=["home", "away"], ordered=False)

# Optional: guard in case the data contains nulls
df_ha = df_long.dropna(subset=["team", "venue"])

# Build home/away open-play aggregates
home_away_xg_open = (
    df_ha
    .groupby(["team", "venue"], observed=True)
    .agg(
        matches=("match_id", "nunique"),
        goals_open_play=("is_goal", "sum"),
        xg_open_play=("xg_wls", "sum"),
    )
    .reset_index()
)

# Average open-play xG per match for each (team, venue) pair.
home_away_xg_open["xg_per_match_open"] = (
    home_away_xg_open["xg_open_play"] / home_away_xg_open["matches"]
)

# One row per team with the home and away averages side by side.
home_away_pivot_open = (
    home_away_xg_open
        .pivot(index="team", columns="venue", values="xg_per_match_open")
        .rename(columns={"home": "home_xg_per_match_open", "away": "away_xg_per_match_open"})
        .rename_axis(None, axis=1)
        .reset_index()
)

# Optional: enforce column order (reindex also adds a missing venue column as NaN)
home_away_pivot_open = home_away_pivot_open.reindex(
    columns=["team", "home_xg_per_match_open", "away_xg_per_match_open"]
)

home_away_pivot_open

In [None]:
# --- 0) Funciones auxiliares ---
def safe_int(x, default=0):
    """Convert ``x`` to ``int`` without raising.

    The value goes through ``float`` first, so strings such as ``"2"`` or
    ``"2.0"`` are accepted and fractional values are truncated. ``None``, NaN
    and anything that cannot be converted return ``default``.
    """
    if x is None:
        return default
    try:
        as_float = float(x)
        return int(as_float)
    except Exception:
        return default

def _team_name_from_match_side(m: dict, side: str) -> str:
    """Extraer un nombre de equipo estable desde los metadatos del partido de Understat."""
    obj = m.get(side)
    if isinstance(obj, dict):
        for key in ("title", "team_title", "short_title", "name"):
            if key in obj and obj[key]:
                return str(obj[key])
        if "id" in obj and obj["id"] is not None:
            return str(obj["id"])
        return str(obj)
    return "" if obj is None else str(obj)

# --- 1) Marcador oficial por partido-equipo ---
def explode_match_meta(m: dict) -> pd.DataFrame:
    """Turn one match record into two rows (home, then away) with the official goals.

    Each row carries ``match_id`` (the record's ``id`` as a string), ``team``
    (resolved by ``_team_name_from_match_side``) and ``goals_official`` (the
    side's entry in ``goals`` passed through ``safe_int``, 0 when missing).
    """
    match_id = str(m.get("id"))
    goals_by_side = m.get("goals") or {}
    records = [
        {
            "match_id": match_id,
            "team": _team_name_from_match_side(m, side),
            "goals_official": safe_int(goals_by_side.get(side), 0),
        }
        for side in ("h", "a")
    ]
    return pd.DataFrame(records)

# Official score per match-team, built from the raw match records.
official_scores = pd.concat([explode_match_meta(m) for m in raw_matches], ignore_index=True)
official_scores["match_id"] = official_scores["match_id"].astype(str)
official_scores["team"] = official_scores["team"].astype(str)

# --- 2) Reuse the OPEN-PLAY aggregation from the previous cell (forcing the keys to str) ---
team_match_xg_open = (
    team_match_xg_open
    .assign(
        match_id=lambda d: d["match_id"].astype(str),
        team=lambda d: d["team"].astype(str),
    )
)

# --- 3) Merge (only the columns of interest) ---
team_match_minimal = (
    team_match_xg_open[["match_id","team","shots_open_play","goals_open_play","xg_open_play"]]
    .merge(official_scores, on=["match_id","team"], how="left")
)

# How many goals did not come from the open-play shots kept above
team_match_minimal["non_open_goals"] = (
    (team_match_minimal["goals_official"] - team_match_minimal["goals_open_play"]).clip(lower=0)
)

# --- 4) Quick check for one match ---
SAMPLE_MATCH_ID = "26965"  # hard-coded example match id
available_match_ids = team_match_minimal["match_id"]
if available_match_ids.empty or (available_match_ids == SAMPLE_MATCH_ID).any():
    mid = SAMPLE_MATCH_ID
else:
    # The example id only exists for one league/season; fall back to a match that is
    # actually present so this check (and the pivot cell that reuses `mid`) shows data.
    mid = available_match_ids.iloc[0]
    print(f"Match {SAMPLE_MATCH_ID} no está en los datos; usando {mid} como ejemplo.")

cols = [
    "match_id","team",
    "shots_open_play","goals_open_play","xg_open_play",
    "goals_official"  # <- final score including every goal
]
print(team_match_minimal.loc[team_match_minimal["match_id"] == mid, cols])

In [None]:
# Pivot for the selected match only: one row, one column per (metric, team).
match_xg_pivot = (
    team_match_minimal
        .loc[team_match_minimal["match_id"] == mid]
        .pivot(index="match_id", columns="team", values=["xg_open_play", "goals_official"])
        .reset_index()
)

# Flatten the column names (MultiIndex → plain strings). After reset_index the index
# column is labelled ("match_id", ""), so empty levels are skipped; otherwise the
# column would end up named "match_id_" with a trailing underscore.
match_xg_pivot.columns = [
    col if isinstance(col, str) else "_".join(str(part) for part in col if part != "")
    for col in match_xg_pivot.columns
]

print(match_xg_pivot)


In [None]:
# Plain (unweighted) R² on the probability scale, over the bins used for the fit.
# The regression itself was weighted and done in log space, so this is a
# descriptive check rather than the quantity the fit minimised.
p_pred_bin = np.clip(powercurve_xg(fit_bins["d_mid"].values, a_hat, b_hat), 1e-9, 1-1e-9)
r2_bin = r2_score(fit_bins["p_goal"].values, p_pred_bin)
print(f"a={a_hat:.6f}, b={b_hat:.6f}, R² (bin, longitudinal) = {r2_bin:.3f}")