In [1]:
from __future__ import annotations

import warnings
from pathlib import Path
from typing import Any

import polars as pl
import psutil

warnings.filterwarnings("ignore")

In [2]:
#config
DATA_DIR = Path(r"C:\Users\Solomon\OneDrive\Desktop\OMSA\SPRING 2026\Practicum\data")
STATSBOMB_DIR = DATA_DIR / "Statsbomb"
WIDTH = 80
TOP_N = 10

# Memory tracking
_process = psutil.Process()
_peak_memory_mb = 0.0


def get_memory_mb() -> float:
    """Get current process memory in MB (RSS - Resident Set Size)."""
    return _process.memory_info().rss / 1024**2


def update_peak() -> float:
    """Update and return peak memory."""
    global _peak_memory_mb
    current = get_memory_mb()
    _peak_memory_mb = max(_peak_memory_mb, current)
    return current


def header(title: str) -> None:
    print("\n" + "=" * WIDTH + f"\n  {title}\n" + "=" * WIDTH)


def mem_report() -> str:
    """Return current and peak memory usage."""
    current = update_peak()
    return f"Memory: {current:.1f} MB (peak: {_peak_memory_mb:.1f} MB)"


def sub(title: str) -> None:
    print(f"\n--- {title} ---")


def dist(lf: pl.LazyFrame, col: str, n: int = TOP_N) -> pl.DataFrame:
    """Print and return distribution for a column."""
    r = (
        lf.group_by(col)
        .agg(pl.len().alias("count"))
        .sort("count", descending=True)
        .head(n)
        .collect()
    )
    print(r)
    return r


def desc(lf: pl.LazyFrame, col: str) -> None:
    """Print describe stats for a column."""
    print(lf.select(col).collect()[col].describe())


def top(lf: pl.LazyFrame, cols: list[str], sort_col: str, n: int = TOP_N) -> None:
    """Print top N rows sorted by a column."""
    print(lf.select(cols).sort(sort_col, descending=True).head(n).collect())


def safe_run(func, name: str) -> dict[str, Any] | None:
    """Run analysis with error handling."""
    try:
        return func()
    except FileNotFoundError:
        print(f"\n[SKIP] {name}: File not found")
    except Exception as e:
        print(f"\n[ERROR] {name}: {e}")
    return None


In [3]:
#set up polar dataframes
tables = {
    "events": pl.scan_parquet(STATSBOMB_DIR / "events.parquet"),
    "lineups": pl.scan_parquet(STATSBOMB_DIR / "lineups.parquet"),
    "matches": pl.scan_parquet(STATSBOMB_DIR / "matches.parquet"),
    "360": pl.scan_parquet(STATSBOMB_DIR / "three_sixty.parquet"),
}

events = tables["events"]
lineups = tables["lineups"]
matches = tables["matches"]
ts = tables["360"]


### Direction Normalization

In [4]:
#Logic:
##Understand team orientation per match; which team attacks which direction?
##Ususally home team attacks left->right (0->120), away team attacks right->left
##Create team orientation lookup
##Use match data to determine home/away, then set attacking direction

#Get unique teams per match with their home/away status
team_orientation = (
    matches
    .select([
        "match_id",
        "home_team_id",
        "home_team",  
        "away_team_id",
        "away_team"   
    ])
    .collect()
)

#Create a lookup for each team's attacking direction in each match
#Home team: attacks --> goal @ x=120 (left to right)
# Away team: attacks --> goal @ x=0 (right to left)

#one row per team per match
home_teams = (
    team_orientation
    .select([
        "match_id",
        pl.col("home_team_id").alias("team_id"),
        pl.col("home_team").alias("team_name"),  
        pl.lit(1).alias("attacking_direction")  
    ])
)

away_teams = (
    team_orientation
    .select([
        "match_id",
        pl.col("away_team_id").alias("team_id"),
        pl.col("away_team").alias("team_name"),  
        pl.lit(-1).alias("attacking_direction")  
    ])
)

team_direction_lookup = pl.concat([home_teams, away_teams])

print("\nSample of direction mapping:")
print(team_direction_lookup.head(5))


Sample of direction mapping:
shape: (5, 4)
┌──────────┬─────────┬────────────────────────┬─────────────────────┐
│ match_id ┆ team_id ┆ team_name              ┆ attacking_direction │
│ ---      ┆ ---     ┆ ---                    ┆ ---                 │
│ i32      ┆ i32     ┆ str                    ┆ i32                 │
╞══════════╪═════════╪════════════════════════╪═════════════════════╡
│ 9880     ┆ 217     ┆ Barcelona              ┆ 1                   │
│ 9912     ┆ 219     ┆ RC Deportivo La Coruña ┆ 1                   │
│ 9924     ┆ 217     ┆ Barcelona              ┆ 1                   │
│ 9855     ┆ 217     ┆ Barcelona              ┆ 1                   │
│ 9827     ┆ 208     ┆ Las Palmas             ┆ 1                   │
└──────────┴─────────┴────────────────────────┴─────────────────────┘


### Event Processing

In [5]:
#core events dataset
events_core = (
    events
    .select([
        "match_id",
        "possession",
        "possession_team_id",
        "team",
        "team_id",  
        "period",   
        "type",
        "minute",
        "second",
        "location_x",
        "location_y",
        "pass_length",
        "pass_angle",
        "pass_cross",
        "pass_through_ball",
        "pass_switch",
        "shot_statsbomb_xg"
    ])
    .with_columns(
        (pl.col("minute") * 60 + pl.col("second")).alias("event_time_sec")
    )
)

print("Core events created")

Core events created


### Join w Direction Info

In [6]:
#convert to LazyFrame for joining
team_direction_lazy = pl.LazyFrame(team_direction_lookup)

events_with_direction = (
    events_core
    .join(
        team_direction_lazy,
        left_on=["match_id", "team_id"],
        right_on=["match_id", "team_id"],
        how="left"
    )
)

#locations normalized based on attacking direction and period:
#Period 1: use attacking_direction as-is
#Period 2: flip the direction (teams switch sides)

events_normalized = (
    events_with_direction
    .with_columns([
        # Determine effective attacking direction for this period
        pl.when(pl.col("period") == 1)
        .then(pl.col("attacking_direction"))
        .when(pl.col("period") == 2)
        .then(pl.col("attacking_direction") * -1)  # Flip for second half
        .otherwise(pl.col("attacking_direction"))  # Handle extra time if needed
        .alias("effective_direction"),
        
        # Normalize X coordinate so forward is always positive
        # If attacking toward x=120: normalized_x = x (no change)
        # If attacking toward x=0: normalized_x = 120 - x (flip)
        pl.when(
            (pl.col("period") == 1) & (pl.col("attacking_direction") == 1)
        ).then(pl.col("location_x"))
        .when(
            (pl.col("period") == 1) & (pl.col("attacking_direction") == -1)
        ).then(120 - pl.col("location_x"))
        .when(
            (pl.col("period") == 2) & (pl.col("attacking_direction") == 1)
        ).then(120 - pl.col("location_x"))  # Flipped because switched sides
        .when(
            (pl.col("period") == 2) & (pl.col("attacking_direction") == -1)
        ).then(pl.col("location_x"))  # Flipped because switched sides
        .otherwise(pl.col("location_x"))
        .alias("normalized_x")
    ])
)

print("Direction normalization applied")

Direction normalization applied


### Possession agg

In [7]:
possessions_base = (
    events_normalized
    .group_by([
        "match_id",
        "possession",
        "possession_team_id"
    ])
    .agg([
        pl.count().alias("n_events"),
        pl.min("event_time_sec").alias("possession_start_time"),
        pl.max("event_time_sec").alias("possession_end_time"),
        pl.first("team").alias("team"),
        pl.first("period").alias("period")
    ])
    .with_columns(
        (pl.col("possession_end_time") - pl.col("possession_start_time"))
        .alias("duration_seconds")
    )
)

print("Base possession metrics created.")

events_normalized_sorted = events_normalized.sort(
    ["match_id", "possession", "event_time_sec"]
)

possession_locations = (
    events_normalized_sorted
    .filter(pl.col("normalized_x").is_not_null())
    .group_by([
        "match_id",
        "possession",
        "possession_team_id"
    ])
    .agg([
        pl.first("normalized_x").alias("start_x"),  # Now guaranteed to be first event in time
        pl.first("location_y").alias("start_y"),
        pl.last("normalized_x").alias("end_x"),    # Now guaranteed to be last event in time
        pl.last("location_y").alias("end_y")
    ])
)

possessions = possessions_base.join(
    possession_locations,
    on=["match_id", "possession", "possession_team_id"],
    how="left"
)

possessions = possessions.with_columns([
    (pl.col("end_x") - pl.col("start_x")).alias("forward_progression"),
    # Also calculate absolute for comparison/validation
    (pl.col("end_x") - pl.col("start_x")).abs().alias("total_distance")
])

print("progression calculated using normalized coordinates.")

Base possession metrics created.
progression calculated using normalized coordinates.


### Sanity check: did normalization work?

In [9]:
validation = possessions.select([
    pl.mean("forward_progression").alias("mean_forward_progression"),
    pl.min("forward_progression").alias("min_forward_progression"),
    pl.max("forward_progression").alias("max_forward_progression"),
    pl.mean("start_x").alias("mean_start_x"),
    pl.mean("end_x").alias("mean_end_x"),
]).collect()

print(validation)
print("If normalization worked:")
print("--mean_forward_progression should be slightly positive")
print("--Both teams should show similar patterns")

#addtl. validation: Check if home/away have similar mean progressions
team_progression = (
    possessions
    .join(
        pl.LazyFrame(team_direction_lookup),
        left_on=["match_id", "possession_team_id"],
        right_on=["match_id", "team_id"],  # Key mismatch fix
        how="left"
    )
    .group_by("attacking_direction")
    .agg([
        pl.count().alias("n_possessions"),
        pl.mean("forward_progression").alias("mean_progression"),
    ])
    .collect()
)

print("\nProgression by team type (home=1, away=-1):")
print(team_progression)

home_prog = team_progression.filter(pl.col("attacking_direction") == 1)["mean_progression"][0] if len(team_progression.filter(pl.col("attacking_direction") == 1)) > 0 else None
away_prog = team_progression.filter(pl.col("attacking_direction") == -1)["mean_progression"][0] if len(team_progression.filter(pl.col("attacking_direction") == -1)) > 0 else None

if home_prog is not None and away_prog is not None:
    diff = abs(home_prog - away_prog)
    if diff > 5:
        print(f"\nOOPS: Large difference between home ({home_prog:.2f}) and away ({away_prog:.2f}) progression")
        print("Attacking direction assumption may be incorrect; consider validating with shot locations or manual inspection.")
    else:
        print(f"\nHome/away progression difference is acceptable ({diff:.2f} yards)")
        print("Direction normalization assumption appears valid.")


shape: (1, 5)
┌──────────────────────────┬─────────────────────────┬─────────────────────────┬──────────────┬────────────┐
│ mean_forward_progression ┆ min_forward_progression ┆ max_forward_progression ┆ mean_start_x ┆ mean_end_x │
│ ---                      ┆ ---                     ┆ ---                     ┆ ---          ┆ ---        │
│ f32                      ┆ f32                     ┆ f32                     ┆ f32          ┆ f32        │
╞══════════════════════════╪═════════════════════════╪═════════════════════════╪══════════════╪════════════╡
│ 0.040853                 ┆ -119.300003             ┆ 120.300003              ┆ 59.961128    ┆ 60.00198   │
└──────────────────────────┴─────────────────────────┴─────────────────────────┴──────────────┴────────────┘
If normalization worked:
--mean_forward_progression should be slightly positive
--Both teams should show similar patterns

Progression by team type (home=1, away=-1):
shape: (2, 3)
┌─────────────────────┬───────────────┬───


### Addt'l features


In [10]:
possessions = possessions.with_columns([
    #field zones
    (pl.col("end_x") >= 80).alias("ends_in_final_third"),
    (pl.col("end_x") >= 102).alias("ends_in_penalty_box"),
    #starting zone
    (pl.col("start_x") <= 40).alias("starts_in_defensive_third"),
    (pl.col("start_x").is_between(40, 80)).alias("starts_in_middle_third"),
    #progression "intensity"
    (pl.col("forward_progression") >= 20).alias("high_progression"),
    (pl.col("forward_progression") < 0).alias("regressive"),
])

#crossing info
crosses_by_possession = (
    events_normalized_sorted 
    .filter(pl.col("pass_cross") == True)
    .group_by(["match_id", "possession", "possession_team_id"])
    .agg(pl.lit(True).alias("has_cross"))
)

possessions = possessions.join(
    crosses_by_possession,
    on=["match_id", "possession", "possession_team_id"],
    how="left"
).with_columns(
    pl.col("has_cross").fill_null(False)
)

print("tactical features added")

tactical features added


### Classifying Possessions (the fun stuff)

In [11]:
possessions = possessions.with_columns(
    
    #fast counter-attack: quick, direct, forward
    pl.when(
        (pl.col("duration_seconds") <= 12) &          
        (pl.col("n_events") <= 10) &                  
        (pl.col("forward_progression") >= 40) &   
        (pl.col("forward_progression") > 0)   
    ).then(pl.lit("Fast counter-attack"))

    
    #sustained buildup: "patient" possession  
    .when(
        (pl.col("duration_seconds") >= 30) &
        (pl.col("n_events") >= 20) &
        (pl.col("forward_progression") < 25) &       
        (pl.col("forward_progression") > -10)        
    ).then(pl.lit("Sustained buildup"))

    .when(
        (pl.col("duration_seconds").is_between(12, 30)) &  # Changed from (8, 30)
        (pl.col("forward_progression") >= 20)
    ).then(pl.lit("Transitional attack"))

    
    #wide/crossing play: any possession with cross
    .when(
        pl.col("has_cross") == True
    ).then(pl.lit("Wide / crossing"))


    #regressive possession: moving backward
    .when(
        pl.col("forward_progression") < -5
    ).then(pl.lit("Regressive / defensive"))

    .otherwise(pl.lit("Standard circulation"))
    .alias("possession_type")
)

print("Possession types classified")

Possession types classified


### Possession type distribution

In [12]:
type_dist = (
    possessions
    .group_by("possession_type")
    .agg([
        pl.count().alias("n_possessions"),
        pl.mean("forward_progression").alias("mean_forward_prog"),
        pl.mean("duration_seconds").alias("mean_duration"),
        pl.mean("n_events").alias("mean_events")
    ])
    .sort("n_possessions", descending=True)
    .collect()
)

print(type_dist)

shape: (6, 5)
┌────────────────────────┬───────────────┬───────────────────┬───────────────┬─────────────┐
│ possession_type        ┆ n_possessions ┆ mean_forward_prog ┆ mean_duration ┆ mean_events │
│ ---                    ┆ ---           ┆ ---               ┆ ---           ┆ ---         │
│ str                    ┆ u32           ┆ f32               ┆ f64           ┆ f64         │
╞════════════════════════╪═══════════════╪═══════════════════╪═══════════════╪═════════════╡
│ Regressive / defensive ┆ 249922        ┆ -41.51144         ┆ 19.874613     ┆ 17.83821    │
│ Standard circulation   ┆ 229171        ┆ 20.101048         ┆ 17.297568     ┆ 14.091246   │
│ Transitional attack    ┆ 82188         ┆ 56.667809         ┆ 19.398574     ┆ 20.464934   │
│ Wide / crossing        ┆ 57157         ┆ -13.61142         ┆ 32.846685     ┆ 30.533111   │
│ Fast counter-attack    ┆ 30790         ┆ 60.021141         ┆ 6.193764      ┆ 5.896038    │
│ Sustained buildup      ┆ 18669         ┆ 7.374985     

### Integrating xG

In [13]:
possession_xg = (
    events_normalized_sorted 
    .filter(pl.col("shot_statsbomb_xg").is_not_null())
    .group_by(["match_id", "possession", "possession_team_id"])
    .agg(
        pl.sum("shot_statsbomb_xg").alias("total_xg"),
        pl.count().alias("n_shots")
    )
)

possessions_with_xg = (
    possessions
    .join(
        possession_xg,
        on=["match_id", "possession", "possession_team_id"],
        how="left"
    )
    .with_columns([
        pl.col("total_xg").fill_null(0),
        pl.col("n_shots").fill_null(0),
    ])
    .with_columns([
        (pl.col("n_shots") > 0).alias("has_shot")
    ])
)

print("xG data integrated")

xG data integrated


### xG by possession type

In [14]:
xg_by_type = (
    possessions_with_xg
    .group_by("possession_type")
    .agg([
        pl.count().alias("n_possessions"),
        pl.mean("has_shot").alias("shot_rate"),
        pl.mean("total_xg").alias("mean_xg_per_possession"),
        pl.sum("total_xg").alias("total_xg"),
        #calculate mean xG only for possessions w/ shots
        pl.col("total_xg")
        .filter(pl.col("has_shot"))
        .mean()
        .alias("mean_xg_given_shot")
    ])
    .sort("mean_xg_per_possession", descending=True)
    .collect()
)

print(xg_by_type)

shape: (6, 6)
┌────────────────────────┬───────────────┬───────────┬────────────────────────┬─────────────┬────────────────────┐
│ possession_type        ┆ n_possessions ┆ shot_rate ┆ mean_xg_per_possession ┆ total_xg    ┆ mean_xg_given_shot │
│ ---                    ┆ ---           ┆ ---       ┆ ---                    ┆ ---         ┆ ---                │
│ str                    ┆ u32           ┆ f64       ┆ f32                    ┆ f32         ┆ f32                │
╞════════════════════════╪═══════════════╪═══════════╪════════════════════════╪═════════════╪════════════════════╡
│ Wide / crossing        ┆ 57157         ┆ 0.31158   ┆ 0.041768               ┆ 2387.359863 ┆ 0.134054           │
│ Sustained buildup      ┆ 18669         ┆ 0.141679  ┆ 0.014815               ┆ 276.577667  ┆ 0.104566           │
│ Transitional attack    ┆ 82188         ┆ 0.139327  ┆ 0.01465                ┆ 1204.039185 ┆ 0.105147           │
│ Regressive / defensive ┆ 249922        ┆ 0.099371  ┆ 0.011534   

### Summary stats

In [None]:
overall_stats = possessions_with_xg.select([
    pl.count().alias("total_possessions"),
    pl.mean("duration_seconds").alias("mean_duration_sec"),
    pl.mean("n_events").alias("mean_events_per_possession"),
    pl.mean("forward_progression").alias("mean_forward_progression"),
    pl.mean("has_shot").alias("pct_possessions_with_shot"),
    pl.mean("total_xg").alias("mean_xg_per_possession"),
]).collect()

print(overall_stats)

### Exports

In [15]:
#saving possessions info
output_path = STATSBOMB_DIR / "possessions.parquet"
possessions_with_xg.collect().write_parquet(output_path)
print(f"Saved possessions info to: {output_path}")

#save summary statistics
summary_path = STATSBOMB_DIR / "possession_type_summary.parquet"
type_dist_with_xg = (
    possessions_with_xg
    .group_by("possession_type")
    .agg([
        pl.count().alias("n_possessions"),
        pl.mean("forward_progression").alias("mean_forward_prog"),
        pl.mean("duration_seconds").alias("mean_duration"),
        pl.mean("n_events").alias("mean_events"),
        pl.mean("has_shot").alias("shot_rate"),
        pl.mean("total_xg").alias("mean_xg"),
    ])
    .sort("mean_xg", descending=True)
    .collect()
)
type_dist_with_xg.write_parquet(summary_path)
print(f"Saved possession type summary to: {summary_path}")

Saved possessions info to: C:\Users\Solomon\OneDrive\Desktop\OMSA\SPRING 2026\Practicum\data\Statsbomb\possessions.parquet
Saved possession type summary to: C:\Users\Solomon\OneDrive\Desktop\OMSA\SPRING 2026\Practicum\data\Statsbomb\possession_type_summary.parquet
