# Ultra-Marathon Analytics – Data Transformation Pipeline
**Project:** Ultra-Marathon Analytics Dashboard (PRD v3.0)  
**Dataset:** TWO_CENTURIES_OF_UM_RACES.csv (~7M raw records → ~4.6M clean, 1996–2022)  
**Pipeline stages:**
1. Config
2. Spark session
3. Ingest & clean raw CSV
4. Deduplication
5. Gender / null filtering
6. EDA
7. Star-schema export (dim + fact tables)
8. Pre-aggregated tables for Power BI

> **Path convention** – all output paths are derived from `BASE_DIR`, and the raw input file is set by `RAW_CSV`; both live in the config cell below.  
> Change those two variables to move the pipeline to any machine or cloud storage location.

## 0 · Configuration
Set `BASE_DIR` (output root) and `RAW_CSV` (raw input file) once; every output path in `PATHS` is derived from `BASE_DIR`.

In [None]:
import os

# ── Machine-specific locations: the only two values that need editing ───────
BASE_DIR = r"C:/Users/youse/Dropbox/PC/Downloads/usa_ultra_marathon50km_mil_data"
RAW_CSV  = r"C:/Users/youse/Dropbox/PC/Downloads/archive/TWO_CENTURIES_OF_UM_RACES.csv"

# ── Output file names, keyed by the logical name used by later cells ────────
_OUTPUT_FILES = {
    "raw_clean":    "ultra_marathon_clean_final.parquet",
    "deduped":      "ultra_marathon_clean_final_withoutduplicates.parquet",
    "final":        "fourmillion_fully_transformed_data.parquet",
    "dim_athletes": "dim_athletes.csv",
    "dim_events":   "dim_events.csv",
    "dim_date":     "dim_date.csv",
    "dim_distance": "dim_distance.csv",
    "dim_gender":   "dim_gender.csv",
    "fact_races":   "fact_races.parquet",
    "agg_yearly":   "agg_yearly.csv",
    "agg_country":  "agg_country.csv",
    "agg_seasonal": "agg_seasonality.csv",
    "agg_age":      "agg_age_performance.csv",
    "agg_elite":    "agg_elite_vs_avg.csv",
}

# ── Derived paths: each output lives directly under BASE_DIR ────────────────
PATHS = {
    key: os.path.join(BASE_DIR, file_name)
    for key, file_name in _OUTPUT_FILES.items()
}

# Make sure the output root exists before any stage tries to write into it.
os.makedirs(BASE_DIR, exist_ok=True)
print("Config loaded. BASE_DIR:", BASE_DIR)

## 1 · Spark Session
Requires PySpark ≥ 4.1.1 (see `requirements.txt`).

In [None]:
# Stop any Spark session left over from an earlier run of this cell
# (presumably so the builder settings below are applied to a fresh session).
try:
    spark.stop()
except NameError:
    # First run in this kernel: no `spark` variable exists yet, nothing to stop.
    pass
except Exception as exc:
    # Cleanup stays best-effort, but the failure is reported instead of hidden.
    print(f"Warning: could not stop previous Spark session ({exc!r}); continuing.")

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Session settings applied to the builder, in this order.
_SPARK_SETTINGS = {
    "spark.driver.memory": "8g",
    "spark.executor.memory": "4g",
    "spark.sql.shuffle.partitions": "100",
    "spark.driver.maxResultSize": "4g",
    "spark.network.timeout": "600s",
    "spark.sql.autoBroadcastJoinThreshold": "-1",
}

_builder = SparkSession.builder.appName("UltraMarathonCleaning")
for _setting, _value in _SPARK_SETTINGS.items():
    _builder = _builder.config(_setting, _value)

# Reuses an active session if one exists, otherwise creates a new one.
spark = _builder.getOrCreate()

print(f"PySpark version: {spark.version}")
print("Spark session ready.")

## 2 · Ingest & Clean
**Steps performed:**
- Load raw CSV, normalise column names (lowercase, underscores)
- Strip BOM / invisible characters from `event_name`
- Parse event dates (`dd.MM.yyyy`), distances (km & mi → km), and performance times (supports multi-day races)
- Validate `birth_year` (1900–2005) and derive `athlete_age`
- Validate `avg_speed_kmh` (physical ceiling: 0–25 km/h)
- Filter: year 1950–2023, ages 16–90, finishers > 0, performance not null

> **Dataset scope note:** The raw source covers 1950–2022 historically.  
> Usable race data starts from **1996** — pre-1996 coverage is too sparse after filtering.  
> See PRD §6 Known Limitations for context.

In [None]:
# ── Ingest: read the raw CSV with a header row and inferred column types ────
df = spark.read.csv(RAW_CSV, header=True, inferSchema=True)
print(f"Raw rows loaded: {df.count():,}")

# ── Normalise column names: lowercase, with ' ', '/' and '-' turned into '_' ─
_TO_UNDERSCORE = str.maketrans({" ": "_", "/": "_", "-": "_"})
new_cols = [name.lower().translate(_TO_UNDERSCORE) for name in df.columns]
df_clean = df.toDF(*new_cols)

# ── Transformations ─────────────────────────────────────────────────────────
# Adds the parsed/validated columns used by every later stage. Each numeric cast
# is wrapped in a when(...) guard so that values which do not match the expected
# pattern become NULL instead of being cast directly.
# Function names such as round/upper/trim/split come from the wildcard import of
# pyspark.sql.functions above, so `round` here is Spark's column function.
df_clean = (
    df_clean
    # Strip a leading run of BOM (U+FEFF) / zero-width-space (U+200B) characters;
    # the pattern is anchored with ^, so occurrences later in the name are kept.
    .withColumn("event_name",     regexp_replace(col("event_name"), r"^[\ufeff\u200b]+", ""))
    # Canonical form for the categorical codes: trimmed and upper-cased.
    .withColumn("athlete_country", upper(trim(col("athlete_country"))))
    .withColumn("athlete_gender",  upper(trim(col("athlete_gender"))))

    # ── Event date ────────────────────────────────────────────────────────────
    # Take the first dd.MM.yyyy substring found in event_dates and parse it.
    # No match yields an empty string, which try_to_date is expected to turn into NULL.
    .withColumn("event_date_str",
                regexp_extract(col("event_dates"), r"(\d{2}\.\d{2}\.\d{4})", 1))
    .withColumn("event_start_date",
                try_to_date(col("event_date_str"), "dd.MM.yyyy"))

    # ── Distance (km and mi) ─────────────────────────────────────────────────
    # distance_value: first numeric token in event_distance_length (NULL if none).
    .withColumn("distance_value",
        when(regexp_extract(col("event_distance_length"), r"(\d+\.?\d*)", 1) != "",
             regexp_extract(col("event_distance_length"), r"(\d+\.?\d*)", 1).cast(DoubleType()))
        .otherwise(None))
    # distance_unit: the letters that directly follow that number, lower-cased.
    .withColumn("distance_unit",
        when(regexp_extract(col("event_distance_length"), r"\d+\.?\d*\s*([a-zA-Z]+)", 1) != "",
             lower(regexp_extract(col("event_distance_length"), r"\d+\.?\d*\s*([a-zA-Z]+)", 1)))
        .otherwise(None))
    # distance_km: miles converted at 1.60934 km/mi, km kept as is; any other
    # unit results in NULL.
    .withColumn("distance_km",
        when(col("distance_unit").isin("mi", "miles"), col("distance_value") * 1.60934)
        .when(col("distance_unit") == "km", col("distance_value"))
        .otherwise(None))

    # ── Performance time (supports multi-day: e.g. '2d 14:22:00') ────────────
    # perf_days: integer in front of a 'd' marker, 0 when there is no such marker.
    .withColumn("perf_days",
        when(regexp_extract(col("athlete_performance"), r"(\d+)d", 1) != "",
             regexp_extract(col("athlete_performance"), r"(\d+)d", 1).cast(IntegerType()))
        .otherwise(lit(0)))
    # perf_time: first H:MM:SS / HH:MM:SS substring.
    # NOTE(review): the pattern is not anchored, so a three-digit hour such as
    # '120:30:00' would match as '20:30:00' — confirm the source never uses that form.
    .withColumn("perf_time",
                regexp_extract(col("athlete_performance"), r"(\d{1,2}:\d{2}:\d{2})", 1))
    # performance_seconds: days*86400 + h*3600 + m*60 + s; NULL when no clock
    # time was found (such rows are removed by the quality filters below).
    .withColumn("performance_seconds",
        when(col("perf_time") != "",
             col("perf_days") * 86400
             + split(col("perf_time"), ":").getItem(0).cast(IntegerType()) * 3600
             + split(col("perf_time"), ":").getItem(1).cast(IntegerType()) * 60
             + split(col("perf_time"), ":").getItem(2).cast(IntegerType()))
        .otherwise(None))
    # Same duration in hours, rounded to two decimals.
    .withColumn("performance_hours", round(col("performance_seconds") / 3600, 2))

    # ── Athlete age ───────────────────────────────────────────────────────────
    # birth_year is kept only inside 1900–2005 (inclusive); otherwise NULL.
    .withColumn("birth_year",
        when((col("athlete_year_of_birth") >= 1900) & (col("athlete_year_of_birth") <= 2005),
             col("athlete_year_of_birth").cast(IntegerType()))
        .otherwise(None))
    # Age in the event year; NULL when the birth year was rejected or missing.
    .withColumn("athlete_age",
        when(col("birth_year").isNotNull(), col("year_of_event") - col("birth_year"))
        .otherwise(None))

    # ── Speed (physical ceiling 25 km/h) ─────────────────────────────────────
    # Step 1: accept only plain non-negative decimals (digits, optional '.', digits).
    .withColumn("avg_speed_kmh",
        when(col("athlete_average_speed").rlike(r"^\d+\.?\d*$"),
             col("athlete_average_speed").cast(DoubleType()))
        .otherwise(None))
    # Step 2: keep values strictly between 0 and 25; everything else becomes NULL.
    .withColumn("avg_speed_kmh",
        when((col("avg_speed_kmh") > 0) & (col("avg_speed_kmh") < 25), col("avg_speed_kmh"))
        .otherwise(None))
)

# ── Quality filters (applied one after another, so a row must pass all) ─────
_quality_rules = [
    col("year_of_event").between(1950, 2023),
    # A missing age is tolerated; a known age must lie in 16–90.
    (col("athlete_age").isNull()) | col("athlete_age").between(16, 90),
    col("event_number_of_finishers") > 0,
    col("performance_seconds").isNotNull(),
    col("performance_seconds") > 0,
]
df_filtered = df_clean
for _rule in _quality_rules:
    df_filtered = df_filtered.filter(_rule)

# ── Columns kept for the stage-1 checkpoint (two are renamed *_original) ────
_stage1_columns = [
    "year_of_event", "event_start_date", "event_name",
    col("event_distance_length").alias("distance_original"),
    "distance_km", "distance_unit", "event_number_of_finishers",
    "athlete_id", "athlete_gender", "athlete_country",
    "birth_year", "athlete_age", "athlete_age_category", "athlete_club",
    col("athlete_performance").alias("performance_original"),
    "performance_seconds", "performance_hours", "avg_speed_kmh",
]
df_stage1 = df_filtered.select(*_stage1_columns)

# ── Save checkpoint ──────────────────────────────────────────────────────────
# The frame is collected to the driver and written with pandas rather than
# Spark's writer, to avoid Hadoop temp-file permission errors on Windows.
_stage1_pdf = df_stage1.toPandas()
_stage1_pdf.to_parquet(PATHS["raw_clean"])
print(f"Stage 1 saved → {PATHS['raw_clean']}")

## 3 · Deduplication
Remove duplicate athlete–event pairs (one row per `athlete_id` + `event_name`).

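> Timing systems sometimes record a runner twice if chip splits were captured separately.  
> Deduplication keeps one row per `athlete_id` + `event_name` pair (Spark's `dropDuplicates` does not guarantee which of the duplicate rows is retained), which aligns with the 4.6M figure in the PRD.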

In [None]:
# Reload the stage-1 checkpoint and report its size.
df = spark.read.parquet(PATHS["raw_clean"])
_rows_before = df.count()
print(f"Before dedup: {_rows_before:,} rows")

# One row per (athlete_id, event_name).
# NOTE(review): the key has no year/date component, so if event_name does not
# encode the edition year, repeat participations in different years are
# collapsed as well — confirm this is intended.
# NOTE(review): which of the duplicate rows survives is not defined by this call.
_dedup_key = ["athlete_id", "event_name"]
df_dedup = df.dropDuplicates(_dedup_key)
_rows_after = df_dedup.count()
print(f"After dedup:  {_rows_after:,} rows")

# Persist via pandas, as for the stage-1 checkpoint.
_dedup_pdf = df_dedup.toPandas()
_dedup_pdf.to_parquet(PATHS["deduped"])
print(f"Saved → {PATHS['deduped']}")

## 4 · Gender & Null Filtering
- **Drop `athlete_gender = 'X'`** – non-binary entries represent < 0.1 % of records  
  and are excluded to maintain consistent M/F comparisons across all dashboard visuals.  
  Rows with a null `athlete_gender` are removed by the same `!=` filter.
- **Drop null critical columns** – `distance_km`, `avg_speed_kmh`, `performance_seconds`.  
  Rows missing any of these three cannot contribute to any performance metric.

In [None]:
from pyspark.sql.functions import col, sum as _sum

df = spark.read.parquet(PATHS["deduped"])

# ── Drop 'X' gender ──────────────────────────────────────────────────────────
_not_x = col("athlete_gender") != "X"
df = df.filter(_not_x)
print(f"After dropping X gender: {df.count():,} rows")
df.select("athlete_gender").distinct().show()

# ── Null audit: one output column per audited field, holding its null count ──
audit_cols = ["athlete_gender", "athlete_age", "athlete_country",
              "athlete_club", "distance_km", "avg_speed_kmh"]
_null_counts = [
    _sum(col(name).isNull().cast("int")).alias(name)
    for name in audit_cols
]
df.select(_null_counts).show()

# ── Drop rows missing any of the critical measures ───────────────────────────
_critical_cols = ["distance_km", "avg_speed_kmh", "performance_seconds"]
df_clean_final = df.dropna(subset=_critical_cols)
print(f"After null drop: {df_clean_final.count():,} rows")

# ── Select and save final clean dataset ─────────────────────────────────────
_final_cols = [
    "athlete_id", "event_name", "distance_km", "avg_speed_kmh",
    "athlete_age", "athlete_gender", "athlete_country",
    "year_of_event", "event_start_date",
    "performance_seconds", "athlete_club",
    "performance_hours", "event_number_of_finishers",
]
dff_spark = df_clean_final.select(*_final_cols)
_final_pdf = dff_spark.toPandas()
_final_pdf.to_parquet(PATHS["final"])
print(f"Final dataset saved → {PATHS['final']}")

## 5 · Exploratory Data Analysis (EDA)
Switch to Pandas from here — the heavy lifting is done.

**All helper columns are created once in this setup cell** and reused across every chart,  
eliminating the late-definition bug in the original notebook where `dist_category` was  
referenced before it was created in some cells.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_style("whitegrid")

# ── Load final dataset and make sure the event date is a datetime column ────
dff = pd.read_parquet(PATHS["final"])
_event_dates = pd.to_datetime(dff["event_start_date"])
dff["event_start_date"] = _event_dates

# ── Month columns ─────────────────────────────────────────────────────────────
# `month` holds the month name (e.g. 'June'); `race_month` the number 1-12.
dff["month"]      = _event_dates.dt.month_name()
dff["race_month"] = _event_dates.dt.month

# ── Canonical distance bins (Big 4) ──────────────────────────────────────────
def bin_distance(dist):
    """Return the canonical ultra-distance label for a distance in km.

    Each canonical distance owns an inclusive tolerance window; a value that
    falls in none of the windows (including NaN) is labelled "Other".
    """
    windows = (
        (48, 52, "50km"),
        (78, 82, "50mi (80km)"),
        (98, 102, "100km"),
        (158, 165, "100mi (160km)"),
    )
    for lower, upper, label in windows:
        if lower <= dist <= upper:
            return label
    return "Other"

dff["dist_category"] = dff["distance_km"].map(bin_distance)

# ── Elite tier  (PRD §5 – 90th percentile per gender × distance) ─────────────
# A row is "Elite" when its speed is at or above the 90th-percentile speed of
# its own (athlete_gender, distance_km) group; every other row is "Average".
# NOTE(review): groups are keyed on the raw distance_km value, not on
# dist_category, so a group containing a single row is always labelled Elite.
_speed_by_group = dff.groupby(["athlete_gender", "distance_km"])["avg_speed_kmh"]
dff["speed_90th"] = _speed_by_group.transform(lambda speeds: speeds.quantile(0.90))

_is_elite = dff["avg_speed_kmh"] >= dff["speed_90th"]
dff["athlete_tier"] = np.where(_is_elite, "Elite", "Average")

print(f"Loaded {len(dff):,} rows | {dff['year_of_event'].min()}–{dff['year_of_event'].max()}")
for _title, _column in (("Gender:", "athlete_gender"), ("Tier:", "athlete_tier")):
    print(_title, dff[_column].value_counts().to_dict())

### 5.1 · Overview Dashboard – Age, Speed, Seasonality, Pace Decay

In [None]:
# Display order for the categorical axes used by the EDA charts.
MONTH_ORDER = [
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
]
MAIN_DIST = ["50km", "50mi (80km)", "100km", "100mi (160km)"]

fig, axes = plt.subplots(2, 2, figsize=(18, 12))
fig.suptitle("Ultra-Marathon EDA Dashboard: Who, When, and How Fast?", fontsize=20)
# Row-major unpacking: top-left, top-right, bottom-left, bottom-right.
ax_age, ax_speed, ax_season, ax_pace = axes.ravel()

# Top-left: age distribution split by gender
gender_colors = {"M": "#3498db", "F": "#e74c3c"}
sns.histplot(data=dff, x="athlete_age", hue="athlete_gender", bins=30, kde=True,
             palette=gender_colors, ax=ax_age)
ax_age.set_title("Distribution of Runner Ages", fontsize=14)
ax_age.set_xlim(18, 80)

# Top-right: speed distribution with the overall mean marked
mean_speed = dff["avg_speed_kmh"].mean()
sns.histplot(data=dff, x="avg_speed_kmh", bins=30, color="green", kde=True, ax=ax_speed)
ax_speed.axvline(mean_speed, color="red", linestyle="--",
                 label=f"Mean: {mean_speed:.1f} km/h")
ax_speed.set_title("Distribution of Average Speed", fontsize=14)
ax_speed.legend()

# Bottom-left: number of results per calendar month
sns.countplot(data=dff, x="month", order=MONTH_ORDER, palette="viridis", ax=ax_season)
ax_season.set_title("Seasonality: Which Months Are Most Popular?", fontsize=14)
ax_season.tick_params(axis="x", rotation=45)

# Bottom-right: speed by canonical distance (rows labelled "Other" are excluded)
in_main_dist = dff["dist_category"].isin(MAIN_DIST)
subset_dist = dff[in_main_dist]
sns.boxplot(data=subset_dist, x="dist_category", y="avg_speed_kmh",
            order=MAIN_DIST, palette="magma", ax=ax_pace)
ax_pace.set_title("Pace Decay: How Speed Drops with Distance", fontsize=14)

plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()

### 5.2 · Elite vs Average – Age Profile & Pace Decay

In [None]:
palette = {"Elite": "#e74c3c", "Average": "#95a5a6"}
fig, axes = plt.subplots(1, 2, figsize=(16, 6))
ax_age, ax_pace = axes

# Left: age density per tier
sns.kdeplot(data=dff, x="athlete_age", hue="athlete_tier", fill=True,
            palette=palette, alpha=0.3, ax=ax_age)
ax_age.set_title("Age Profile: Do You Have to Be Young to Be Elite?", fontsize=14)
ax_age.set_xlim(18, 70)

# Right: speed per canonical distance, one line per tier
in_main_dist = dff["dist_category"].isin(MAIN_DIST)
subset_dist = dff[in_main_dist]
sns.pointplot(data=subset_dist, x="dist_category", y="avg_speed_kmh", hue="athlete_tier",
              order=MAIN_DIST, palette=palette,
              markers=["o", "x"], linestyles=["-", "--"], ax=ax_pace)
ax_pace.set_title("Pace Decay – Elite vs Average", fontsize=14)

plt.tight_layout()
plt.show()

# Elite threshold summary
# NOTE(review): this is the 90th percentile pooled over all distances for each
# gender; the athlete_tier column is derived from a per (gender, distance_km)
# percentile, so the printed figure is not the cut-off actually applied per row.
for gender, label in (("M", "Male"), ("F", "Female")):
    gender_speeds = dff.loc[dff["athlete_gender"] == gender, "avg_speed_kmh"]
    threshold = gender_speeds.quantile(0.90)
    print(f"Elite threshold ({label}): > {threshold:.2f} km/h  (90th percentile)")

### 5.3 · The Ultra-Running Boom (1996–2022)

In [None]:
# Per-year totals from 1996 onwards: result rows and distinct event names.
since_1996 = dff[dff["year_of_event"] >= 1996]
growth = since_1996.groupby("year_of_event").agg(
    Total_Finishers=("athlete_id", "count"),
    Unique_Events=("event_name", "nunique"),
).sort_index()

years = growth.index
finisher_color, race_color = "royalblue", "crimson"

# Left axis: finishers as bars
fig, ax1 = plt.subplots(figsize=(14, 6))
ax1.bar(years, growth["Total_Finishers"], alpha=0.6, color=finisher_color)
ax1.set_ylabel("Number of Finishers", color=finisher_color, fontsize=12)
ax1.tick_params(axis="y", labelcolor=finisher_color)
ax1.set_xlabel("Year", fontsize=12)

# Right axis: number of distinct races as a line
ax2 = ax1.twinx()
ax2.plot(years, growth["Unique_Events"],
         color=race_color, marker="o", linewidth=2)
ax2.set_ylabel("Number of Races", color=race_color, fontsize=12)
ax2.tick_params(axis="y", labelcolor=race_color)

# COVID-19 annotation: dotted marker at 2020, label placed at 90 % of the axis top
ax1.axvline(2020, color="black", linestyle=":", alpha=0.6)
label_height = ax1.get_ylim()[1] * 0.9
ax1.text(2020.2, label_height, "COVID-19", fontsize=9)

plt.title("Ultra-Marathon Growth (1996–2022)", fontsize=16, pad=20)
plt.tight_layout()
plt.show()

### 5.4 · Rise of Female Ultra Runners

In [None]:
# Long format: one row per (year, gender) with its result count, 1996 onwards.
since_1996 = dff[dff["year_of_event"] >= 1996]
gender_growth = (
    since_1996.groupby(["year_of_event", "athlete_gender"])
    .size()
    .reset_index(name="count")
)

# Wide format: one row per year, one column per gender; missing combos count as 0.
gender_pivot = gender_growth.pivot(
    index="year_of_event", columns="athlete_gender", values="count"
).fillna(0)
gender_pivot["Total"] = gender_pivot["M"] + gender_pivot["F"]
gender_pivot["Female_Pct"] = gender_pivot["F"] / gender_pivot["Total"] * 100

female_color, female_label_color = "#e74c3c", "#c0392b"

# Left axis: absolute counts per gender
fig, ax1 = plt.subplots(figsize=(14, 7))
sns.lineplot(data=gender_growth, x="year_of_event", y="count", hue="athlete_gender",
             palette={"M": "#3498db", "F": female_color}, linewidth=2.5, ax=ax1)
ax1.set_ylabel("Number of Runners", fontsize=12)
ax1.legend(title="Gender", loc="upper left")

# Right axis: female share in percent, drawn as a shaded area with a dashed outline
ax2 = ax1.twinx()
years = gender_pivot.index
female_pct = gender_pivot["Female_Pct"]
ax2.fill_between(years, female_pct,
                 color=female_color, alpha=0.15)
ax2.plot(years, female_pct,
         color=female_color, linestyle="--", linewidth=1.5, alpha=0.6)
ax2.set_ylabel("Female Participation (%)", color=female_label_color, fontsize=12)
ax2.set_ylim(0, 40)
ax2.tick_params(axis="y", labelcolor=female_label_color)

# Label the most recent year's share just to the right of the last point.
latest = years.max()
latest_pct = gender_pivot.loc[latest, "Female_Pct"]
ax2.text(latest + 0.3, latest_pct,
         f"{latest_pct:.1f}%",
         color=female_label_color, fontweight="bold")

plt.title("Rise of Female Ultra Runners (1996–2022)", fontsize=16, pad=20)
plt.show()

### 5.5 · Peak Age Curve (50 km races)

In [None]:
# Rows for the curve: distance exactly 50 km, age 18–70 inclusive, gender M or F.
is_50km = dff["distance_km"] == 50
is_age_in_range = dff["athlete_age"].between(18, 70)
is_m_or_f = dff["athlete_gender"].isin(["M", "F"])
subset_50 = dff[is_50km & is_age_in_range & is_m_or_f]

# Speed against age, one line per gender.
plt.figure(figsize=(12, 6))
sns.lineplot(
    data=subset_50,
    x="athlete_age",
    y="avg_speed_kmh",
    hue="athlete_gender",
    palette=["#1f77b4", "#ff7f0e"],
)
plt.title("At What Age Are Ultra Runners Fastest? (50 km Races)", fontsize=16)
plt.xlabel("Athlete Age")
plt.ylabel("Average Speed (km/h)")
plt.grid(True, alpha=0.3)
plt.show()

## 6 · Star-Schema Export
Generates the **V1 Analytics-First Model** tables consumed by Power BI.

| Table | Grain | Power BI relationship |
|---|---|---|
| `dim_athletes` | One row per athlete | `athlete_id` → `fact_races` |
| `dim_events` | One row per event–distance | `event_name` → `fact_races` |
| `dim_date` | One row per calendar day (1950–2022) | `full_date` → `fact_races.event_start_date` |
| `dim_distance` | Four canonical distances | `distance_km` → `fact_races` |
| `dim_gender` | M / F | `athlete_gender` → `dim_athletes` |
| `fact_races` | One row per athlete performance | central fact |

In [None]:
# ── dim_athletes ──────────────────────────────────────────────────────────────
# Grain: exactly one row per athlete_id, so the column can act as the unique
# key on the dimension side of the relationship to fact_races.
# De-duplicating on all three columns is not enough: an athlete recorded with
# more than one gender/country value would produce several rows for one id.
# The first attribute combination encountered in dff is the one retained.
_athlete_attrs = dff[["athlete_id", "athlete_gender", "athlete_country"]].drop_duplicates()
dim_athletes = _athlete_attrs.drop_duplicates(subset="athlete_id", keep="first")

# Report how many conflicting attribute rows were collapsed, for data-quality review.
_n_conflicting = len(_athlete_attrs) - len(dim_athletes)
if _n_conflicting:
    print(f"dim_athletes : collapsed {_n_conflicting:,} conflicting attribute rows")

dim_athletes.to_csv(PATHS["dim_athletes"], index=False)
print(f"dim_athletes : {dim_athletes.shape}")

# ── dim_events ────────────────────────────────────────────────────────────────
# One row per distinct (event_name, distance_km) combination found in dff.
# Note: distance_km is the reported distance; actual course lengths vary by terrain (PRD §6)
_event_columns = ["event_name", "distance_km"]
dim_events = dff[_event_columns].drop_duplicates()
dim_events.to_csv(PATHS["dim_events"], index=False)
print(f"dim_events   : {dim_events.shape}")

# ── dim_date (full calendar 1950–2022) ───────────────────────────────────────
# One row per calendar day, with the usual date-part attributes alongside.
date_range = pd.date_range(start="1950-01-01", end="2022-12-31", freq="D")
dim_date = pd.DataFrame({"full_date": date_range})
_day = dim_date["full_date"].dt
dim_date = dim_date.assign(
    year=_day.year,
    quarter=_day.quarter,
    month=_day.month,
    month_name=_day.month_name(),
    day_of_week=_day.dayofweek + 1,  # 1=Mon, 7=Sun (ISO)
    week_of_year=_day.isocalendar().week,
)
dim_date.to_csv(PATHS["dim_date"], index=False)
print(f"dim_date     : {dim_date.shape}")

# ── dim_distance (Big 4) ──────────────────────────────────────────────────────
# Static lookup, written column by column; the rows are 50km, 50mi, 100km, 100mi.
# NOTE(review): the mile rows are keyed on a float (80.467 / 160.934); verify
# these match the computed distance_km values exactly before relating on them.
dim_distance = pd.DataFrame({
    "distance_km":       [50.0, 80.467, 100.0, 160.934],
    "distance_name":     ["50km", "50mi", "100km", "100mi"],
    "distance_category": ["Short Ultra", "Short Ultra", "Long Ultra", "Long Ultra"],
    "distance_type":     ["Standard", "Standard", "Standard", "Standard"],
})
dim_distance.to_csv(PATHS["dim_distance"], index=False)
print(f"dim_distance : {dim_distance.shape}")

# ── dim_gender ────────────────────────────────────────────────────────────────
# Gender code → display name.
dim_gender = pd.DataFrame({
    "athlete_gender":   ["M", "F"],
    "gender_full_name": ["Male", "Female"],
})
dim_gender.to_csv(PATHS["dim_gender"], index=False)
print(f"dim_gender   : {dim_gender.shape}")

# ── fact_races ────────────────────────────────────────────────────────────────
# Keys plus measures for each performance; fully identical rows are written once.
_fact_columns = [
    "athlete_id", "event_name", "event_start_date", "distance_km",
    "performance_seconds", "avg_speed_kmh", "athlete_age",
]
fact_races = dff[_fact_columns].drop_duplicates()
fact_races.to_parquet(PATHS["fact_races"], index=False)
print(f"fact_races   : {fact_races.shape}")

## 7 · Pre-Aggregated Tables for Power BI
Pre-computing these aggregations in Python ensures Power BI loads within  
the **< 3 s** target defined in PRD §4 Success Metrics.

| Table | Powers visual |
|---|---|
| `agg_yearly` | Global Growth Hub – dual-axis growth chart |
| `agg_country` | Global Growth Hub – top countries bar chart |
| `agg_seasonality` | Performance Lab – seasonality heatmap |
| `agg_age_performance` | Performance Lab – Peak Age Curve |
| `agg_elite_vs_avg` | Performance Lab – Pace Decay & Pain Cave Benchmarks |

In [None]:
# ── AGG 1: Yearly ─────────────────────────────────────────────────────────────
# Per (year, gender, distance): result count, distinct event names, mean speed, mean age.
_yearly_groups = dff.groupby(["year_of_event", "athlete_gender", "distance_km"])
_yearly_summary = _yearly_groups.agg(
    total_finishers=("athlete_id", "count"),
    total_races=("event_name", "nunique"),
    avg_speed=("avg_speed_kmh", "mean"),
    avg_age=("athlete_age", "mean"),
)
agg_yearly = _yearly_summary.reset_index().sort_values("year_of_event")
agg_yearly.to_csv(PATHS["agg_yearly"], index=False)
print(f"agg_yearly        : {agg_yearly.shape}")

# ── AGG 2: Country ────────────────────────────────────────────────────────────
# Result count per (country, year), largest counts first.
_country_groups = dff.groupby(["athlete_country", "year_of_event"])
_country_summary = _country_groups.agg(total_finishers=("athlete_id", "count"))
agg_country = _country_summary.reset_index().sort_values(
    "total_finishers", ascending=False
)
agg_country.to_csv(PATHS["agg_country"], index=False)
print(f"agg_country       : {agg_country.shape}")

# ── AGG 3: Monthly Seasonality ────────────────────────────────────────────────
# Result count per (month number, distance), ordered by month.
_month_groups = dff.groupby(["race_month", "distance_km"])
_month_summary = _month_groups.agg(race_count=("athlete_id", "count"))
agg_seasonality = _month_summary.reset_index().sort_values("race_month")
agg_seasonality.to_csv(PATHS["agg_seasonal"], index=False)
print(f"agg_seasonality   : {agg_seasonality.shape}")

# ── AGG 4: Age Performance (n > 10 filter for statistical significance) ────────
# Mean speed per (age, gender, distance); groups of 10 or fewer rows are dropped.
_age_groups = dff.groupby(["athlete_age", "athlete_gender", "distance_km"])
_age_summary = _age_groups.agg(
    avg_speed=("avg_speed_kmh", "mean"),
    sample_size=("athlete_id", "count"),
).reset_index()
agg_age = _age_summary[_age_summary["sample_size"] > 10]
agg_age.to_csv(PATHS["agg_age"], index=False)
print(f"agg_age_perf      : {agg_age.shape}")

# ── AGG 5: Elite vs Average (PRD §5 – 90th percentile per gender × distance) ──
# Mean speed and row count per (distance, gender, tier); the tier labels come
# from the athlete_tier column prepared in the EDA setup cell.
_tier_groups = dff.groupby(["distance_km", "athlete_gender", "athlete_tier"])
_tier_summary = _tier_groups.agg(
    avg_speed=("avg_speed_kmh", "mean"),
    total_athletes=("athlete_id", "count"),
)
agg_elite = _tier_summary.reset_index()
agg_elite.to_csv(PATHS["agg_elite"], index=False)
print(f"agg_elite_vs_avg  : {agg_elite.shape}")

print("\n✅ All pre-aggregated tables saved successfully.")

## Appendix · Memory Optimisation Reference
Apply these dtype casts to the final Pandas DataFrame before export if RAM is constrained.  
Reduces fact-table in-memory footprint by ~40 %.

| Column | Original | Optimised | Reason |
|---|---|---|---|
| `athlete_id` | int64 | int32 | IDs ≤ 2 B |
| `year_of_event` | int64 | int16 | Years ≤ 32 K |
| `athlete_age` | float64 | int8 | Ages ≤ 127; nulls → 0 |
| `distance_km` | float64 | float32 | Precision preserved |
| `avg_speed_kmh` | float64 | float32 | Precision preserved |
| `performance_seconds` | int64 | int32 | Handles up to ~68 years |
| `athlete_gender` | object | category | 2 unique values |
| `athlete_country` | object | category | ~100 unique values |

```python
import numpy as np

# Target dtype per column (see the table above for the rationale).
_COMPACT_DTYPES = {
    "athlete_id":          "int32",
    "year_of_event":       "int16",
    "athlete_age":         "int8",
    "distance_km":         "float32",
    "avg_speed_kmh":       "float32",
    "performance_seconds": "int32",
    "athlete_gender":      "category",
    "athlete_country":     "category",
}

# Work on a copy so the original dff keeps its dtypes.
df_final = dff.copy()
# Missing ages become 0 first, because int8 cannot hold nulls.
df_final["athlete_age"] = df_final["athlete_age"].fillna(0)
df_final = df_final.astype(_COMPACT_DTYPES)
```