In [2]:
# Premier League – Export xG Events for One Match (Understat)
# NOTE:
# 1) Change `match_id` below.
# 2) Run all cells. A CSV named `match_<match_id>_xg_events.csv` will be saved.

import re
import json
import requests
import pandas as pd
from bs4 import BeautifulSoup

def _extract_shots_json_from_script(script_text: str):
    """
    Robustly extract the JSON for shotsData from an Understat match page <script>.
    Handles both:
      - shotsData = JSON.parse('...');   (escaped JSON string)
      - shotsData = {...};               (direct JSON object)
    Returns a Python dict with keys like 'h' and 'a'.
    """
    text = script_text or ""
    # 1) Try JSON.parse('...') pattern (allow newlines)
    m = re.search(r"shotsData\s*=\s*JSON\.parse\((?P<q>['\"])(?P<data>.*?)(?P=q)\)\s*;", text, re.DOTALL)
    if m:
        escaped = m.group("data")
        # The captured data is a JS string literal containing JSON; we must unescape JS-style escapes.
        # Using 'unicode_escape' handles sequences like \n, \t, \uXXXX, \xNN.
        unescaped = bytes(escaped, "utf-8").decode("unicode_escape")
        return json.loads(unescaped)

    # 2) Fallback: direct assignment shotsData = {...};
    m2 = re.search(r"shotsData\s*=\s*(\{.*?\})\s*;", text, re.DOTALL)
    if m2:
        return json.loads(m2.group(1))

    raise RuntimeError("Could not extract shotsData JSON from script block.")

def fetch_understat_match_shots(match_id: int):
    """
    Fetch shots (with xG) for a single match from Understat.
    Returns a list of shot dicts with 'team_side' added (home/away).
    """
    url = f"https://understat.com/match/{match_id}"
    resp = requests.get(url, headers={
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    }, timeout=30)
    resp.raise_for_status()

    soup = BeautifulSoup(resp.text, "html.parser")
    shots_data = None

    # Search all scripts for one that contains 'shotsData'
    for s in soup.find_all("script"):
        txt = s.get_text() or ""
        if "shotsData" in txt:
            try:
                shots_data = _extract_shots_json_from_script(txt)
                break
            except Exception:
                # Try next script if this one didn't parse cleanly
                continue

    if shots_data is None:
        raise RuntimeError("shotsData not found. The page structure may have changed, or the match_id is invalid/private.")

    # shots_data should be dict with 'h' (home) and 'a' (away)
    all_shots = []
    for side_key, side_label in (("h", "home"), ("a", "away")):
        for shot in shots_data.get(side_key, []):
            item = dict(shot)
            item["team_side"] = side_label
            all_shots.append(item)
    return all_shots

def shots_to_dataframe(shots):
    """Convert Understat shots list into a clean DataFrame with common fields."""
    records = []
    for s in shots:
        records.append({
            "match_id": s.get("match_id"),
            "shot_id": s.get("id"),
            "team": s.get("team") or s.get("h_team") or s.get("a_team"),
            "team_side": s.get("team_side"),
            "player": s.get("player"),
            "minute": s.get("minute"),
            "second": s.get("second"),
            "result": s.get("result"),
            "situation": s.get("situation"),
            "shotType": s.get("shotType"),
            "x": s.get("x") or s.get("X"),
            "y": s.get("y") or s.get("Y"),
            "xG": float(s.get("xG")) if s.get("xG") not in (None, "") else None,
            "player_assisted": s.get("player_assisted"),
            "lastAction": s.get("lastAction"),
        })
    df = pd.DataFrame.from_records(records)
    if not df.empty:
        df = df.sort_values(by=["minute", "second", "team_side", "player"], na_position="last").reset_index(drop=True)
    return df

# 🔢 Enter your match ID here
match_id = 28780  # <-- CHANGE THIS (example ID). Use an Understat match ID.

# Fetch shots and save to CSV
shots = fetch_understat_match_shots(match_id)
df = shots_to_dataframe(shots)

csv_path = f"match_{match_id}_xg_events.csv"
df.to_csv(csv_path, index=False)
print(f"Saved {len(df)} xG/shot events to: {csv_path}")

# Show first few rows
df.head(10)

RuntimeError: shotsData not found. The page structure may have changed, or the match_id is invalid/private.

In [3]:
# ------------------------------------------------------------
# NOTE (read me):
# This script prints every EPL match ID (with the fixture text)
# in a given date window of the 2025/26 season, AND for each ID
# saves a CSV of all xG chances (shots) in that match.
#
# ✅ What you edit:
#   - Change START_DATE and END_DATE (YYYY-MM-DD).
#   - Optionally change LEAGUE and SEASON.
#   - Optionally change OUT_DIR (where the per-match CSVs go).
#
# Output:
#   - Console: lines like  ID 28778 — Liverpool v Bournemouth (2025-08-15 19:00)
#   - Files:   <OUT_DIR>/match_28778_shots.csv  (one file per match)
# ------------------------------------------------------------

import os
import re
import json
import requests
import pandas as pd

# ======== EDIT THESE ========
LEAGUE = "EPL"
SEASON = 2025           # Understat "2025" = 2025/26 season
START_DATE = "2025-08-15"
END_DATE   = "2025-08-18"
OUT_DIR    = "data/understat/shots"   # folder to save per-match CSVs
# ============================

BASE = "https://understat.com"
S = requests.Session()
S.headers.update({
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Accept-Language": "en-US,en;q=0.9",
    "Referer": "https://www.google.com/",
})

# ---------- small helpers ----------
def _decode_js_escaped(s: str) -> str:
    """Decode \xNN / \uXXXX style escapes from JSON.parse('...') payloads."""
    return bytes(s, "utf-8").decode("unicode_escape")

def _extract_array(text: str) -> str:
    """Quote/escape-aware bracket scan that returns the JSON array substring."""
    i = text.find("[")
    if i == -1:
        raise RuntimeError("No '[' found in decoded datesData.")
    out, depth, esc, in_str, quote = [], 0, False, False, ""
    j = i
    while j < len(text):
        ch = text[j]
        out.append(ch)
        if in_str:
            if esc:
                esc = False
            elif ch == "\\":
                esc = True
            elif ch == quote:
                in_str = False
        else:
            if ch in ('"', "'"):
                in_str = True; quote = ch
            elif ch == "[":
                depth += 1
            elif ch == "]":
                depth -= 1
                if depth == 0:
                    break
        j += 1
    if depth != 0:
        raise RuntimeError("Unbalanced brackets while slicing datesData.")
    return "".join(out)

def _fetch_week_fixtures(league: str, season: int, week: int) -> pd.DataFrame:
    """Return a DataFrame of fixtures for a given week (or empty DF if none)."""
    url = f"{BASE}/league/{league}/{season}?week={week}"
    html = S.get(url, timeout=30)
    html.raise_for_status()
    text = html.text

    # A) datesData = JSON.parse(' ... ');
    m = re.search(r"datesData\s*=\s*JSON\.parse\('(.*?)'\);", text, flags=re.S)
    if m:
        decoded = _decode_js_escaped(m.group(1))
        arr_txt = _extract_array(decoded)
        data = json.loads(arr_txt)
    else:
        # B) Fallback: datesData = [ ... ];
        m2 = re.search(r"datesData\s*=\s*(\[[\s\S]*?\]);", text, flags=re.S)
        if not m2:
            return pd.DataFrame()  # week page without fixtures
        data = json.loads(m2.group(1))

    df = pd.json_normalize(data)

    # kickoff datetime (make tz-naive)
    for col in ("datetime", "date", "time"):
        if col in df.columns:
            df["kickoff"] = pd.to_datetime(df[col], errors="coerce")
            break
    else:
        df["kickoff"] = pd.NaT
    df["kickoff"] = df["kickoff"].dt.tz_localize(None)

    # Normalize a few common fields
    if "id" in df.columns:
        df["match_id"] = df["id"].astype("Int64")
    if "h.title" in df.columns:
        df.rename(columns={"h.title": "home_team", "a.title": "away_team"}, inplace=True)

    return df[["match_id", "home_team", "away_team", "kickoff"]].copy()

def _collect_fixtures_in_window(league: str, season: int, start_date: str, end_date: str) -> pd.DataFrame:
    """Scan a handful of weeks and return fixtures filtered+deduped by match_id."""
    start = pd.to_datetime(start_date)
    end_excl = pd.to_datetime(end_date) + pd.Timedelta(days=1)  # inclusive end
    all_rows = []

    for week in range(1, 6):  # early season window typically within first few weeks
        df = _fetch_week_fixtures(league, season, week)
        if df.empty:
            continue
        all_rows.append(df)
        wk_min = df["kickoff"].min()
        if pd.notna(wk_min) and wk_min > end_excl + pd.Timedelta(days=7):
            break

    if not all_rows:
        return pd.DataFrame(columns=["match_id", "home_team", "away_team", "kickoff"])

    fixtures = pd.concat(all_rows, ignore_index=True)
    in_window = fixtures[(fixtures["kickoff"] >= start) & (fixtures["kickoff"] < end_excl)]
    in_window = in_window.drop_duplicates(subset=["match_id"]).sort_values("kickoff").reset_index(drop=True)
    return in_window

# ---------- robust shots extractor ----------
def _find_in_obj(obj, key="shotsData"):
    """Recursively search dict/list for a key; return value or None."""
    if isinstance(obj, dict):
        if key in obj:
            return obj[key]
        for v in obj.values():
            found = _find_in_obj(v, key)
            if found is not None:
                return found
    elif isinstance(obj, list):
        for it in obj:
            found = _find_in_obj(it, key)
            if found is not None:
                return found
    return None

def _fetch_match_shots(match_id: int) -> pd.DataFrame:
    """Fetch shots (xG events) for a single match_id and return a DataFrame."""
    url = f"{BASE}/match/{match_id}"
    html = S.get(url, timeout=30)
    html.raise_for_status()
    text = html.text

    shots = None

    # A) shotsData = JSON.parse(' ... ');
    m = re.search(r"shotsData\s*=\s*JSON\.parse\('(.*?)'\);", text, flags=re.S)
    if m:
        decoded = _decode_js_escaped(m.group(1))
        # For shotsData the root is an object: {"h":[...], "a":[...]}
        # Safely slice the JSON object:
        # find first { and its matching }
        t = decoded
        i = t.find("{")
        if i == -1:
            raise RuntimeError("Couldn't find '{' in shotsData payload.")
        out, depth, esc, in_str, quote = [], 0, False, False, ""
        j = i
        while j < len(t):
            ch = t[j]
            out.append(ch)
            if in_str:
                if esc:
                    esc = False
                elif ch == "\\":
                    esc = True
                elif ch == quote:
                    in_str = False
            else:
                if ch in ('"', "'"):
                    in_str = True; quote = ch
                elif ch == "{":
                    depth += 1
                elif ch == "}":
                    depth -= 1
                    if depth == 0:
                        break
            j += 1
        obj_txt = "".join(out)
        shots = json.loads(obj_txt)

    # B) window.__NUXT__ = {...}; recurse to 'shotsData'
    if shots is None:
        m2 = re.search(r"window\.__NUXT__\s*=\s*({[\s\S]*?});", text, flags=re.S)
        if m2:
            try:
                nuxt = json.loads(m2.group(1))
                shots = _find_in_obj(nuxt, key="shotsData")
            except Exception:
                shots = None

    # C) Fallback: shotsData = {...};
    if shots is None:
        m3 = re.search(r"shotsData\s*=\s*({[\s\S]*?});", text, flags=re.S)
        if m3:
            shots = json.loads(m3.group(1))

    if shots is None:
        raise RuntimeError(f"Could not extract shots for match {match_id}")

    # Normalize into rows
    rows = []
    for side in ("h", "a"):
        for ev in shots.get(side, []):
            rows.append({
                "match_id": match_id,
                "side": side,
                "team": ev.get("team"),
                "player": ev.get("player"),
                "player_id": ev.get("player_id"),
                "minute": int(ev.get("minute", 0)) if ev.get("minute") is not None else None,
                "second": int(ev.get("second", 0)) if ev.get("second") is not None else None,
                "xG": float(ev.get("xG", 0.0)) if ev.get("xG") is not None else None,
                "result": ev.get("result"),
                "situation": ev.get("situation"),
                "shotType": ev.get("shotType"),
                "X": float(ev.get("X")) if ev.get("X") is not None else None,
                "Y": float(ev.get("Y")) if ev.get("Y") is not None else None,
                "assist": ev.get("assist"),
                "assist_id": ev.get("assist_id"),
                "h_a": ev.get("h_a"),
                "lastAction": ev.get("lastAction"),
            })
    df = pd.DataFrame(rows).sort_values(["minute", "second"], na_position="last").reset_index(drop=True)
    return df

def _ensure_dir(p: str):
    if p and not os.path.isdir(p):
        os.makedirs(p, exist_ok=True)

# ---------- main ----------
def main():
    _ensure_dir(OUT_DIR)

    win = _collect_fixtures_in_window(LEAGUE, SEASON, START_DATE, END_DATE)
    if win.empty:
        print(f"No fixtures found between {START_DATE} and {END_DATE}.")
        return

    for _, r in win.iterrows():
        mid  = int(r["match_id"])
        home = r.get("home_team", "")
        away = r.get("away_team", "")
        ko   = r["kickoff"].strftime("%Y-%m-%d %H:%M")
        print(f"ID {mid} — {home} v {away} ({ko})")

        # Fetch shots and save CSV per match
        try:
            shots_df = _fetch_match_shots(mid)
            out_path = os.path.join(OUT_DIR, f"match_{mid}_shots.csv")
            shots_df.to_csv(out_path, index=False, encoding="utf-8-sig")
        except Exception as e:
            print(f"  △ Could not save shots for {mid}: {e}")
        else:
            print(f"  ✓ Saved {len(shots_df)} shots -> {out_path}")

# Run
if __name__ == "__main__":
    main()

SyntaxError: (unicode error) 'unicodeescape' codec can't decode bytes in position 7-8: truncated \xXX escape (3129987837.py, line 41)