# Notebook 01: ETL & Data Unification

**Purpose:** Load all 5 triathlon data sources, normalize schemas, engineer derived features, and output unified CSVs.

**Inputs:**
- `research/data/scraped/kaggle/Half_Ironman_df6.csv` (840K rows, times in seconds)
- `research/data/scraped/kaggle/postgres_public_tristat_stat.csv` (2.96M rows, times in HH:MM:SS)
- `research/data/scraped/kaggle/results.csv` (1.1M rows, CoachCox, times in HH:MM:SS)
- `research/data/scraped/kaggle/races.csv` + `series.csv` (event metadata)
- `research/data/scraped/t100/*.csv` (~500 rows, times in seconds)
- `research/data/scraped/wiki/*.json` (49 files, times in seconds)

**Outputs:**
- `research/data/cleaned/athlete_race.csv` (~4.8M rows)
- `research/data/cleaned/athlete_profile.csv` (~500K unique athletes)


In [3]:
! pip install -r ../requirements.txt

Collecting numpy>=1.24 (from -r ../requirements.txt (line 1))
  Downloading numpy-2.4.2-cp311-cp311-macosx_14_0_arm64.whl.metadata (6.6 kB)
Collecting pandas>=2.0 (from -r ../requirements.txt (line 2))
  Downloading pandas-3.0.0-cp311-cp311-macosx_11_0_arm64.whl.metadata (79 kB)
Collecting scipy>=1.11 (from -r ../requirements.txt (line 3))
  Downloading scipy-1.17.0-cp311-cp311-macosx_14_0_arm64.whl.metadata (62 kB)
Collecting scikit-learn>=1.3 (from -r ../requirements.txt (line 4))
  Downloading scikit_learn-1.8.0-cp311-cp311-macosx_12_0_arm64.whl.metadata (11 kB)
Collecting joblib>=1.3.0 (from scikit-learn>=1.3->-r ../requirements.txt (line 4))
  Downloading joblib-1.5.3-py3-none-any.whl.metadata (5.5 kB)
Collecting threadpoolctl>=3.2.0 (from scikit-learn>=1.3->-r ../requirements.txt (line 4))
  Downloading threadpoolctl-3.6.0-py3-none-any.whl.metadata (13 kB)
Downloading numpy-2.4.2-cp311-cp311-macosx_14_0_arm64.whl (5.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m 

## 1. Setup & Utility Functions

In [4]:
import pandas as pd
import numpy as np
import hashlib
import json
import os
import re
import glob
import warnings
from pathlib import Path

warnings.filterwarnings('ignore', category=pd.errors.DtypeWarning)
pd.set_option('display.max_columns', 40)
pd.set_option('display.width', 200)

# Paths
BASE_DIR = Path('..').resolve()
SCRAPED_DIR = BASE_DIR / 'data' / 'scraped'
CLEANED_DIR = BASE_DIR / 'data' / 'cleaned'
CLEANED_DIR.mkdir(parents=True, exist_ok=True)

print(f"Base directory: {BASE_DIR}")
print(f"Scraped data:  {SCRAPED_DIR}")
print(f"Cleaned output: {CLEANED_DIR}")


Base directory: /Users/mykyta/projects/indie/racedayai/research
Scraped data:  /Users/mykyta/projects/indie/racedayai/research/data/scraped
Cleaned output: /Users/mykyta/projects/indie/racedayai/research/data/cleaned


In [5]:
# ─── Time Conversion ───────────────────────────────────────────────
def hh_mm_ss_to_seconds(time_str):
    """Convert HH:MM:SS or H:MM:SS or M:SS string to seconds.
    Returns NaN for missing/invalid values."""
    if pd.isna(time_str) or str(time_str).strip() in ('', '0:00', '0:00:00', '--', 'DNS', 'DNF', 'DQ'):
        return np.nan
    try:
        time_str = str(time_str).strip()
        parts = time_str.split(':')
        if len(parts) == 3:
            h, m, s = parts
            return int(h) * 3600 + int(m) * 60 + float(s)
        elif len(parts) == 2:
            m, s = parts
            total = int(m) * 60 + float(s)
            # If > 3600 seconds from MM:SS, it's probably HH:MM misinterpreted
            return total
        else:
            return float(time_str)
    except (ValueError, TypeError):
        return np.nan


def normalize_gender(val):
    """Normalize gender to M/F."""
    if pd.isna(val):
        return np.nan
    val = str(val).strip().upper()
    if val in ('M', 'MALE'):
        return 'M'
    elif val in ('F', 'FEMALE'):
        return 'F'
    return np.nan


def extract_age_group(text, default=None):
    """Extract age group like '40-44' from various formats.
    Handles: '40-44', 'M40-44', 'F45-49', 'MAGE40-44', etc."""
    if pd.isna(text):
        return default
    text = str(text).strip()
    match = re.search(r'(\d{2,3})-(\d{2,3})', text)
    if match:
        return f"{match.group(1)}-{match.group(2)}"
    # Handle single age like "70+" or "75+"
    match = re.search(r'(\d{2,3})\+', text)
    if match:
        return f"{match.group(1)}+"
    return default


def extract_age_band(age_group):
    """Extract lower bound of age group as integer. E.g., '40-44' -> 40."""
    if pd.isna(age_group):
        return np.nan
    match = re.search(r'(\d{2,3})', str(age_group))
    if match:
        return int(match.group(1))
    return np.nan


def hash_athlete(name, country, gender):
    """Create a deterministic hash for athlete deduplication."""
    if pd.isna(name) or str(name).strip() == '':
        return None
    key = f"{str(name).strip().lower()}|{str(country).strip().lower()}|{str(gender).strip().upper()}"
    return hashlib.sha256(key.encode()).hexdigest()[:16]


print("Utility functions loaded.")


Utility functions loaded.


In [6]:
# ─── Data Quality Rules (from Plan 07, Appendix A) ─────────────────
QUALITY_RULES = {
    'min_swim_sec': 600,       # 10 min minimum swim
    'max_swim_sec': 7200,      # 2 hours max swim
    'min_bike_sec': 1800,      # 30 min minimum bike
    'max_bike_sec': 36000,     # 10 hours max bike
    'min_run_sec': 900,        # 15 min minimum run
    'max_run_sec': 25200,      # 7 hours max run
    'min_total_sec': 3600,     # 1 hour minimum total
    'max_total_sec': 61200,    # 17 hours maximum total
    'max_transition_sec': 600, # 10 min max per transition
    'sum_tolerance': 120,      # total must equal sum of splits ± 120 sec
    'min_split_pct': 0.02,     # no segment < 2% of total
    'max_split_pct': 0.70,     # no segment > 70% of total
}


def apply_quality_filters(df, label=""):
    """Apply quality rules and return cleaned DataFrame + anomaly count."""
    n_before = len(df)
    mask = pd.Series(True, index=df.index)
    reasons = []

    # Total time bounds
    if 'total_sec' in df.columns:
        bad = (df['total_sec'] < QUALITY_RULES['min_total_sec']) | (df['total_sec'] > QUALITY_RULES['max_total_sec'])
        mask &= ~bad
        reasons.append(f"total_sec bounds: {bad.sum()}")

    # Segment bounds (only check if column exists and is not all NaN)
    for seg, col in [('swim', 'swim_sec'), ('bike', 'bike_sec'), ('run', 'run_sec')]:
        if col in df.columns and df[col].notna().any():
            bad = (df[col].notna()) & ((df[col] < QUALITY_RULES[f'min_{seg}_sec']) | (df[col] > QUALITY_RULES[f'max_{seg}_sec']))
            mask &= ~bad
            reasons.append(f"{col} bounds: {bad.sum()}")

    # Transition bounds
    for col in ['t1_sec', 't2_sec']:
        if col in df.columns and df[col].notna().any():
            bad = (df[col].notna()) & (df[col] > QUALITY_RULES['max_transition_sec'])
            mask &= ~bad
            reasons.append(f"{col} bounds: {bad.sum()}")

    # Sum tolerance: check that splits sum to total
    split_cols = [c for c in ['swim_sec', 't1_sec', 'bike_sec', 't2_sec', 'run_sec'] if c in df.columns]
    if len(split_cols) >= 3 and 'total_sec' in df.columns:
        split_sum = df[split_cols].sum(axis=1)
        # Only check where all splits are non-null
        all_present = df[split_cols].notna().all(axis=1)
        bad = all_present & ((split_sum - df['total_sec']).abs() > QUALITY_RULES['sum_tolerance'])
        mask &= ~bad
        reasons.append(f"sum tolerance: {bad.sum()}")

    # Split percentage bounds (no segment > 70% or < 2%)
    if 'total_sec' in df.columns:
        for col in ['swim_sec', 'bike_sec', 'run_sec']:
            if col in df.columns and df[col].notna().any():
                pct = df[col] / df['total_sec']
                bad = (pct.notna()) & ((pct < QUALITY_RULES['min_split_pct']) | (pct > QUALITY_RULES['max_split_pct']))
                mask &= ~bad
                reasons.append(f"{col} pct bounds: {bad.sum()}")

    # Negative or zero total
    if 'total_sec' in df.columns:
        bad = df['total_sec'] <= 0
        mask &= ~bad
        reasons.append(f"non-positive total: {bad.sum()}")

    df_clean = df[mask].copy()
    n_after = len(df_clean)
    print(f"  [{label}] Quality filter: {n_before:,} → {n_after:,} ({n_before - n_after:,} removed)")
    for r in reasons:
        if not r.endswith(': 0'):
            print(f"    - {r}")
    return df_clean


print("Quality rules loaded.")


Quality rules loaded.


## 2. Load df6 (Kaggle Half-Ironman) — 840K rows

In [7]:
# Load df6 — times are already in seconds
df6_path = SCRAPED_DIR / 'kaggle' / 'Half_Ironman_df6.csv'
df6_raw = pd.read_csv(df6_path)
print(f"df6 loaded: {len(df6_raw):,} rows, {len(df6_raw.columns)} columns")
print(f"Columns: {list(df6_raw.columns)}")
df6_raw.head(3)


df6 loaded: 840,075 rows, 13 columns
Columns: ['Gender', 'AgeGroup', 'AgeBand', 'Country', 'CountryISO2', 'EventYear', 'EventLocation', 'SwimTime', 'Transition1Time', 'BikeTime', 'Transition2Time', 'RunTime', 'FinishTime']


Unnamed: 0,Gender,AgeGroup,AgeBand,Country,CountryISO2,EventYear,EventLocation,SwimTime,Transition1Time,BikeTime,Transition2Time,RunTime,FinishTime
0,M,40-44,40,Andorra,AD,2019,IRONMAN 70.3 South American Championship Bueno...,1679,119,9107,95,5515,16514
1,M,45-49,45,Andorra,AD,2019,IRONMAN 70.3 South American Championship Bueno...,2070,177,9160,132,6070,17609
2,M,45-49,45,Andorra,AD,2020,IRONMAN 70.3 Bariloche,1667,161,9891,122,5190,17031


In [8]:
# Normalize df6 into unified schema
df6 = pd.DataFrame({
    'athlete_hash': None,
    'athlete_name': None,
    'gender': df6_raw['Gender'],
    'age_group': df6_raw['AgeGroup'],
    'age_band': df6_raw['AgeBand'].astype(float),
    'country': df6_raw['Country'],
    'country_iso2': df6_raw['CountryISO2'],
    'is_pro': False,
    'event_name': df6_raw['EventLocation'],  # EventLocation contains the full event name
    'event_year': df6_raw['EventYear'].astype(int),
    'event_location': df6_raw['EventLocation'].str.extract(r'IRONMAN 70\.3 (.+)')[0],
    'event_distance': '70.3',
    'source': 'kaggle_df6',
    'swim_sec': df6_raw['SwimTime'].astype(float),
    't1_sec': df6_raw['Transition1Time'].astype(float),
    'bike_sec': df6_raw['BikeTime'].astype(float),
    't2_sec': df6_raw['Transition2Time'].astype(float),
    'run_sec': df6_raw['RunTime'].astype(float),
    'total_sec': df6_raw['FinishTime'].astype(float),
    'overall_rank': np.nan,
    'gender_rank': np.nan,
    'age_group_rank': np.nan,
    'division_rank': np.nan,
    'finish_status': 'finisher',
})

# Apply quality filters
df6 = apply_quality_filters(df6, label='df6')
print(f"\ndf6 final: {len(df6):,} rows")
print(f"Gender distribution:\n{df6['gender'].value_counts()}")
print(f"\nDistance: {df6['event_distance'].unique()}")
print(f"Year range: {df6['event_year'].min()} - {df6['event_year'].max()}")


  [df6] Quality filter: 840,075 → 840,073 (2 removed)
    - sum tolerance: 1
    - bike_sec pct bounds: 2

df6 final: 840,073 rows
Gender distribution:
gender
M    635678
F    204395
Name: count, dtype: int64

Distance: <StringArray>
['70.3']
Length: 1, dtype: str
Year range: 2004 - 2020


## 3. Load TriStat — 2.96M rows (HH:MM:SS format)

In [9]:
# Load TriStat in chunks to manage memory
tristat_path = SCRAPED_DIR / 'kaggle' / 'postgres_public_tristat_stat.csv'

# Read with low_memory=False for type inference
print("Loading TriStat (this may take a moment)...")
tristat_raw = pd.read_csv(tristat_path, low_memory=False)
print(f"TriStat loaded: {len(tristat_raw):,} rows, {len(tristat_raw.columns)} columns")
print(f"Columns: {list(tristat_raw.columns)}")
tristat_raw.head(3)


Loading TriStat (this may take a moment)...
TriStat loaded: 2,961,502 rows, 12 columns
Columns: ['event_link', 'gender', 'person_link', 'person_flag', 'person_name', 'person_event_group', 'person_event_swim_time_text', 'person_event_t1_time_text', 'person_event_cycle_time_text', 'person_event_t2_time_text', 'person_event_run_time_text', 'person_event_finish_time_text']


Unnamed: 0,event_link,gender,person_link,person_flag,person_name,person_event_group,person_event_swim_time_text,person_event_t1_time_text,person_event_cycle_time_text,person_event_t2_time_text,person_event_run_time_text,person_event_finish_time_text
0,/rus/result/ironman/ireland-cork/full/2019,M,/irl/profile/halliwell-mark,IRL,"Halliwell, Mark",M45-49,0:00,0:00,7:52:05,19:00,5:26:34,13:37:39
1,/rus/result/ironman/ireland-cork/full/2019,F,/usa/profile/harris-polly,USA,"Harris, Polly",F50-54,0:00,0:00,7:58:53,16:37,5:22:09,13:37:40
2,/rus/result/ironman/ireland-cork/full/2019,M,/fra/profile/peugeot-rodolphe,FRA,"Peugeot, Rodolphe",M25-29,0:00,0:00,7:57:05,10:40,5:30:10,13:37:55


In [10]:
# Convert TriStat HH:MM:SS times to seconds
print("Converting TriStat times from HH:MM:SS to seconds...")
time_cols_map = {
    'person_event_swim_time_text': 'swim_sec',
    'person_event_t1_time_text': 't1_sec',
    'person_event_cycle_time_text': 'bike_sec',
    'person_event_t2_time_text': 't2_sec',
    'person_event_run_time_text': 'run_sec',
    'person_event_finish_time_text': 'total_sec',
}

for src_col, dst_col in time_cols_map.items():
    tristat_raw[dst_col] = tristat_raw[src_col].apply(hh_mm_ss_to_seconds)
    n_valid = tristat_raw[dst_col].notna().sum()
    print(f"  {src_col} → {dst_col}: {n_valid:,} valid values ({n_valid/len(tristat_raw)*100:.1f}%)")

# Check how many have valid total_sec
print(f"\nRecords with valid total_sec: {tristat_raw['total_sec'].notna().sum():,}")
print(f"Records with all NaN times: {tristat_raw[list(time_cols_map.values())].isna().all(axis=1).sum():,}")


Converting TriStat times from HH:MM:SS to seconds...
  person_event_swim_time_text → swim_sec: 2,848,108 valid values (96.2%)
  person_event_t1_time_text → t1_sec: 2,618,460 valid values (88.4%)
  person_event_cycle_time_text → bike_sec: 2,895,272 valid values (97.8%)
  person_event_t2_time_text → t2_sec: 2,756,030 valid values (93.1%)
  person_event_run_time_text → run_sec: 2,907,335 valid values (98.2%)
  person_event_finish_time_text → total_sec: 2,961,502 valid values (100.0%)

Records with valid total_sec: 2,961,502
Records with all NaN times: 0


In [11]:
# Parse TriStat metadata
# Extract event info from event_link (e.g., '/rus/result/ironman/ireland-cork/full/2019')
def parse_tristat_event_link(link):
    """Parse event_link to extract distance and event info."""
    if pd.isna(link):
        return '140.6', None, None, None  # default to full distance
    link = str(link).lower()

    # Distance detection
    if '70.3' in link or 'half' in link:
        distance = '70.3'
    elif 'sprint' in link:
        distance = 'sprint'
    elif 'olympic' in link:
        distance = 'olympic'
    else:
        distance = '140.6'  # default full

    # Extract location (between /ironman/ and /full|half/)
    parts = link.strip('/').split('/')
    location = None
    year = None
    for i, p in enumerate(parts):
        if p == 'ironman' and i + 1 < len(parts):
            location = parts[i + 1].replace('-', ' ').title()
        # Year is usually the last numeric part
        if re.match(r'^\d{4}$', p):
            year = int(p)

    event_name = f"Ironman {distance} {location}" if location else None
    return distance, event_name, location, year

# Apply parsing
parsed = tristat_raw['event_link'].apply(parse_tristat_event_link)
tristat_raw['event_distance'] = parsed.apply(lambda x: x[0])
tristat_raw['event_name_parsed'] = parsed.apply(lambda x: x[1])
tristat_raw['event_location_parsed'] = parsed.apply(lambda x: x[2])
tristat_raw['event_year_parsed'] = parsed.apply(lambda x: x[3])

print(f"Distance distribution:\n{tristat_raw['event_distance'].value_counts()}")
print(f"\nYear range: {tristat_raw['event_year_parsed'].min()} - {tristat_raw['event_year_parsed'].max()}")


Distance distribution:
event_distance
70.3       1492594
140.6      1020358
olympic     277907
sprint      170643
Name: count, dtype: int64

Year range: 1983.0 - 2022.0


In [12]:
# Build normalized TriStat DataFrame
tristat = pd.DataFrame({
    'athlete_hash': tristat_raw.apply(
        lambda r: hash_athlete(r['person_name'], r.get('person_flag', ''), r['gender']), axis=1
    ),
    'athlete_name': tristat_raw['person_name'],
    'gender': tristat_raw['gender'].apply(normalize_gender),
    'age_group': tristat_raw['person_event_group'].apply(extract_age_group),
    'age_band': tristat_raw['person_event_group'].apply(extract_age_group).apply(extract_age_band),
    'country': tristat_raw['person_flag'],
    'country_iso2': tristat_raw['person_flag'],  # person_flag is ISO code
    'is_pro': False,  # TriStat is mostly AG
    'event_name': tristat_raw['event_name_parsed'],
    'event_year': tristat_raw['event_year_parsed'],
    'event_location': tristat_raw['event_location_parsed'],
    'event_distance': tristat_raw['event_distance'],
    'source': 'tristat',
    'swim_sec': tristat_raw['swim_sec'],
    't1_sec': tristat_raw['t1_sec'],
    'bike_sec': tristat_raw['bike_sec'],
    't2_sec': tristat_raw['t2_sec'],
    'run_sec': tristat_raw['run_sec'],
    'total_sec': tristat_raw['total_sec'],
    'overall_rank': np.nan,
    'gender_rank': np.nan,
    'age_group_rank': np.nan,
    'division_rank': np.nan,
    'finish_status': 'finisher',
})

# Drop rows where total_sec is NaN (no valid time data at all)
n_before = len(tristat)
tristat = tristat.dropna(subset=['total_sec'])
print(f"TriStat after dropping NaN total: {n_before:,} → {len(tristat):,}")

# Apply quality filters
tristat = apply_quality_filters(tristat, label='tristat')
print(f"\nTriStat final: {len(tristat):,} rows")
print(f"Gender: {tristat['gender'].value_counts().to_dict()}")
print(f"Distances: {tristat['event_distance'].value_counts().to_dict()}")

# Free raw memory
del tristat_raw
import gc; gc.collect()


TriStat after dropping NaN total: 2,961,502 → 2,961,502
  [tristat] Quality filter: 2,961,502 → 2,426,245 (535,257 removed)
    - total_sec bounds: 38953
    - swim_sec bounds: 61421
    - bike_sec bounds: 34878
    - run_sec bounds: 42399
    - t1_sec bounds: 342247
    - t2_sec bounds: 220598
    - sum tolerance: 22772
    - swim_sec pct bounds: 8608
    - bike_sec pct bounds: 5373
    - run_sec pct bounds: 17656

TriStat final: 2,426,245 rows
Gender: {'M': 1855771, 'F': 570469}
Distances: {'70.3': 1363349, '140.6': 690598, 'olympic': 259692, 'sprint': 112606}


0

## 4. Load CoachCox — 1.1M rows (HH:MM:SS format)

In [13]:
# Load CoachCox results + races + series
cc_results_path = SCRAPED_DIR / 'kaggle' / 'results.csv'
cc_races_path = SCRAPED_DIR / 'kaggle' / 'races.csv'
cc_series_path = SCRAPED_DIR / 'kaggle' / 'series.csv'

cc_raw = pd.read_csv(cc_results_path, low_memory=False)
cc_races = pd.read_csv(cc_races_path)
cc_series = pd.read_csv(cc_series_path)

print(f"CoachCox results: {len(cc_raw):,} rows")
print(f"CoachCox races: {len(cc_races):,} rows")
print(f"CoachCox series: {len(cc_series):,} rows")
print(f"\nResults columns: {list(cc_raw.columns)}")
print(f"\nFinish status distribution:\n{cc_raw['finishStatus'].value_counts()}")
cc_raw.head(3)


CoachCox results: 1,096,719 rows
CoachCox races: 587 rows
CoachCox series: 69 rows

Results columns: ['bib', 'Name', 'athleteLink', 'Country', 'Gender', 'Division', 'divLink', 'divisionRank', 'overallTime', 'overallRank', 'swimTime', 'swimRank', 'bikeTime', 'bikeRank', 'runTime', 'runRank', 'finishStatus', 'raceID', 'athleteID']

Finish status distribution:
finishStatus
Finisher    892707
DNF         105522
DNS          95203
DQ            3169
NC             118
Name: count, dtype: int64


Unnamed: 0,bib,Name,athleteLink,Country,Gender,Division,divLink,divisionRank,overallTime,overallRank,swimTime,swimRank,bikeTime,bikeRank,runTime,runRank,finishStatus,raceID,athleteID
0,40,Nils Frommhold,https://www.coachcox.co.uk/imstats/athlete/28963/,Germany,Male,MPRO,,1.0,8:03:13,1.0,48:19,1.0,4:22:45,3.0,2:48:06,1.0,Finisher,1,28963.0
1,58,Paul Matthews,https://www.coachcox.co.uk/imstats/athlete/143...,United States,Male,MPRO,,2.0,8:04:58,2.0,48:27,4.0,4:24:31,7.0,2:48:27,2.0,Finisher,1,143770.0
2,3,Tj Tollakson,https://www.coachcox.co.uk/imstats/athlete/33054/,United States,Male,MPRO,,3.0,8:07:36,3.0,48:34,9.0,4:19:03,2.0,2:56:01,8.0,Finisher,1,33054.0


In [14]:
# Join races and series for event metadata
cc_races_enriched = cc_races.merge(cc_series, left_on='seriesID', right_on='id', suffixes=('_race', '_series'))
print(f"Enriched races: {len(cc_races_enriched):,}")
print(f"Sample location: {cc_races_enriched['location'].head(3).tolist()}")

# Join results with race metadata
cc_raw = cc_raw.merge(
    cc_races_enriched[['id_race', 'year', 'location', 'continent', 'dnf', 'finishers']],
    left_on='raceID', right_on='id_race', how='left'
)
print(f"Results after join: {len(cc_raw):,}")


Enriched races: 587
Sample location: ['Ironman Alaska', 'Ironman Arizona', 'Ironman Arizona']
Results after join: 1,096,719


In [15]:
# Convert CoachCox times
print("Converting CoachCox times from HH:MM:SS to seconds...")
for src_col, dst_col in [('swimTime', 'swim_sec'), ('bikeTime', 'bike_sec'),
                          ('runTime', 'run_sec'), ('overallTime', 'total_sec')]:
    cc_raw[dst_col] = cc_raw[src_col].apply(hh_mm_ss_to_seconds)
    n_valid = cc_raw[dst_col].notna().sum()
    print(f"  {src_col} → {dst_col}: {n_valid:,} valid ({n_valid/len(cc_raw)*100:.1f}%)")

# CoachCox doesn't have T1/T2 separately
cc_raw['t1_sec'] = np.nan
cc_raw['t2_sec'] = np.nan

# Detect pro athletes
cc_raw['is_pro'] = cc_raw['Division'].str.upper().isin(['MPRO', 'FPRO'])
print(f"\nPro athletes: {cc_raw['is_pro'].sum():,} ({cc_raw['is_pro'].mean()*100:.1f}%)")
print(f"Division examples: {cc_raw['Division'].value_counts().head(10).to_dict()}")


Converting CoachCox times from HH:MM:SS to seconds...
  swimTime → swim_sec: 947,570 valid (86.4%)
  bikeTime → bike_sec: 935,797 valid (85.3%)
  runTime → run_sec: 889,711 valid (81.1%)
  overallTime → total_sec: 890,912 valid (81.2%)

Pro athletes: 20,727 (1.9%)
Division examples: {'M40-44': 176419, 'M35-39': 154891, 'M45-49': 146198, 'M30-34': 121919, 'M50-54': 101959, 'M25-29': 66923, 'M55-59': 49929, 'F40-44': 38088, 'F35-39': 34576, 'F30-34': 31687}


In [16]:
# Build normalized CoachCox DataFrame
coachcox = pd.DataFrame({
    'athlete_hash': cc_raw.apply(
        lambda r: hash_athlete(r['Name'], r['Country'], r['Gender']), axis=1
    ),
    'athlete_name': cc_raw['Name'],
    'gender': cc_raw['Gender'].apply(normalize_gender),
    'age_group': cc_raw['Division'].apply(extract_age_group),
    'age_band': cc_raw['Division'].apply(extract_age_group).apply(extract_age_band),
    'country': cc_raw['Country'],
    'country_iso2': np.nan,  # Not available directly
    'is_pro': cc_raw['is_pro'],
    'event_name': cc_raw['location'],
    'event_year': cc_raw['year'],
    'event_location': cc_raw['location'],
    'event_distance': '140.6',  # CoachCox is full Ironman
    'source': 'coachcox',
    'swim_sec': cc_raw['swim_sec'],
    't1_sec': cc_raw['t1_sec'],
    'bike_sec': cc_raw['bike_sec'],
    't2_sec': cc_raw['t2_sec'],
    'run_sec': cc_raw['run_sec'],
    'total_sec': cc_raw['total_sec'],
    'overall_rank': pd.to_numeric(cc_raw['overallRank'], errors='coerce'),
    'gender_rank': np.nan,
    'age_group_rank': np.nan,
    'division_rank': pd.to_numeric(cc_raw['divisionRank'], errors='coerce'),
    'finish_status': cc_raw['finishStatus'].str.lower().str.strip(),
})

# Drop rows without valid total time
n_before = len(coachcox)
coachcox = coachcox.dropna(subset=['total_sec'])
print(f"CoachCox after dropping NaN total: {n_before:,} → {len(coachcox):,}")

# Apply quality filters (only to finishers; DNF/DNS won't have valid times)
cc_finishers = coachcox[coachcox['finish_status'] == 'finisher'].copy()
cc_nonfinishers = coachcox[coachcox['finish_status'] != 'finisher'].copy()

cc_finishers = apply_quality_filters(cc_finishers, label='coachcox_finishers')

# Keep non-finishers (DNF/DNS/DQ) without quality filtering their times
coachcox = pd.concat([cc_finishers, cc_nonfinishers], ignore_index=True)
print(f"\nCoachCox final: {len(coachcox):,} rows")
print(f"  Finishers: {len(cc_finishers):,}, Non-finishers: {len(cc_nonfinishers):,}")
print(f"  Gender: {coachcox['gender'].value_counts().to_dict()}")

del cc_raw, cc_races, cc_series, cc_races_enriched
gc.collect()


CoachCox after dropping NaN total: 1,096,719 → 890,912
  [coachcox_finishers] Quality filter: 890,154 → 864,224 (25,930 removed)
    - total_sec bounds: 350
    - swim_sec bounds: 8655
    - bike_sec bounds: 19
    - run_sec bounds: 16225
    - swim_sec pct bounds: 1489
    - bike_sec pct bounds: 32
    - run_sec pct bounds: 1951

CoachCox final: 864,982 rows
  Finishers: 864,224, Non-finishers: 758
  Gender: {'M': 705117, 'F': 159683}


0

## 5. Load T100 Pro Results — ~500 rows

In [17]:
# Load all T100 CSV files
t100_files = sorted(glob.glob(str(SCRAPED_DIR / 't100' / '*.csv')))
print(f"Found {len(t100_files)} T100 files")

t100_dfs = []
for f in t100_files:
    df = pd.read_csv(f)
    t100_dfs.append(df)
    print(f"  {Path(f).name}: {len(df)} rows")

t100 = pd.concat(t100_dfs, ignore_index=True)
print(f"\nT100 combined: {len(t100):,} rows")

# T100 is already in the target schema, just need minor adjustments
t100_out = pd.DataFrame({
    'athlete_hash': t100.apply(
        lambda r: hash_athlete(r['athlete_name'], r.get('country', ''), r.get('gender', '')), axis=1
    ),
    'athlete_name': t100['athlete_name'],
    'gender': t100['gender'],
    'age_group': t100.get('age_group', pd.Series(dtype=str)),
    'age_band': np.nan,
    'country': t100.get('country', pd.Series(dtype=str)),
    'country_iso2': np.nan,
    'is_pro': True,  # All T100 are pro
    'event_name': t100['event_name'],
    'event_year': t100['event_year'],
    'event_location': t100['event_name'].str.replace(' T100', '').str.replace(' t100', ''),
    'event_distance': t100['event_distance'],
    'source': 't100',
    'swim_sec': t100['swim_sec'].astype(float),
    't1_sec': t100['t1_sec'].astype(float),
    'bike_sec': t100['bike_sec'].astype(float),
    't2_sec': t100['t2_sec'].astype(float),
    'run_sec': t100['run_sec'].astype(float),
    'total_sec': t100['total_sec'].astype(float),
    'overall_rank': pd.to_numeric(t100.get('overall_rank'), errors='coerce'),
    'gender_rank': pd.to_numeric(t100.get('gender_rank'), errors='coerce'),
    'age_group_rank': pd.to_numeric(t100.get('age_group_rank'), errors='coerce'),
    'division_rank': np.nan,
    'finish_status': 'finisher',
})

t100_out = apply_quality_filters(t100_out, label='t100')
print(f"T100 final: {len(t100_out):,} rows")


Found 14 T100 files
  dubai-t100-2024.csv: 36 rows
  dubai-t100-2025.csv: 34 rows
  french-riviera-t100-2025.csv: 38 rows
  ibiza-t100-2024.csv: 37 rows
  london-t100-2024.csv: 38 rows
  london-t100-2025.csv: 34 rows
  miami-t100-2024.csv: 27 rows
  qatar-t100-2025.csv: 44 rows
  san-francisco-t100-2024.csv: 34 rows
  san-francisco-t100-2025.csv: 40 rows
  singapore-t100-2024.csv: 32 rows
  singapore-t100-2025.csv: 32 rows
  spain-t100-2025.csv: 34 rows
  vancouver-t100-2025.csv: 36 rows

T100 combined: 496 rows
  [t100] Quality filter: 496 → 454 (42 removed)
    - sum tolerance: 42
T100 final: 454 rows


## 6. Load Wiki World Championship Results — 49 JSON files

In [18]:
# Load all Wiki JSON files
wiki_files = sorted(glob.glob(str(SCRAPED_DIR / 'wiki' / '*.json')))
print(f"Found {len(wiki_files)} Wiki JSON files")

wiki_records = []
for f in wiki_files:
    with open(f) as fh:
        data = json.load(fh)
    records = data.get('records', [])
    for r in records:
        wiki_records.append(r)
    if records:
        print(f"  {Path(f).name}: {len(records)} records")

wiki_df = pd.DataFrame(wiki_records)
print(f"\nWiki combined: {len(wiki_df):,} rows")
if len(wiki_df) > 0:
    print(f"Columns: {list(wiki_df.columns)}")
    print(f"Sample event_distance values: {wiki_df['event_distance'].unique()[:5]}")


Found 49 Wiki JSON files
  challenge-roth.json: 40 records
  ironman-703-wc-2007.json: 158 records
  ironman-703-wc-2008.json: 194 records
  ironman-703-wc-2009.json: 221 records
  ironman-703-wc-2010.json: 254 records
  ironman-703-wc-2011.json: 248 records
  ironman-703-wc-2012.json: 362 records
  ironman-703-wc-2013.json: 374 records
  ironman-703-wc-2014.json: 20 records
  ironman-703-wc-2015.json: 20 records
  ironman-703-wc-2016.json: 20 records
  ironman-703-wc-2017.json: 20 records
  ironman-hawaii.json: 282 records
  ironman-wc-2005.json: 20 records
  ironman-wc-2006.json: 20 records
  ironman-wc-2007.json: 20 records
  ironman-wc-2008.json: 146 records
  ironman-wc-2009.json: 151 records
  ironman-wc-2010.json: 164 records
  ironman-wc-2011.json: 158 records
  ironman-wc-2012.json: 194 records
  ironman-wc-2013.json: 188 records
  ironman-wc-2014.json: 20 records
  ironman-wc-2015.json: 20 records
  ironman-wc-2016.json: 20 records
  ironman-wc-2017.json: 20 records
  ironman

In [19]:
# Normalize Wiki data
if len(wiki_df) > 0:
    wiki_out = pd.DataFrame({
        'athlete_hash': wiki_df.apply(
            lambda r: hash_athlete(r.get('athlete_name'), r.get('country', ''), r.get('gender', '')), axis=1
        ),
        'athlete_name': wiki_df.get('athlete_name'),
        'gender': wiki_df.get('gender', pd.Series(dtype=str)).apply(
            lambda x: normalize_gender(x) if pd.notna(x) and str(x).strip() else np.nan
        ),
        'age_group': wiki_df.get('age_group', pd.Series(dtype=str)),
        'age_band': np.nan,
        'country': wiki_df.get('country'),
        'country_iso2': np.nan,
        'is_pro': True,  # Wiki results are championship podiums
        'event_name': wiki_df.get('event_name'),
        'event_year': pd.to_numeric(wiki_df.get('event_year'), errors='coerce'),
        'event_location': wiki_df.get('event_name'),
        'event_distance': wiki_df.get('event_distance'),
        'source': 'wiki',
        'swim_sec': pd.to_numeric(wiki_df.get('swim_sec'), errors='coerce'),
        't1_sec': pd.to_numeric(wiki_df.get('t1_sec'), errors='coerce'),
        'bike_sec': pd.to_numeric(wiki_df.get('bike_sec'), errors='coerce'),
        't2_sec': pd.to_numeric(wiki_df.get('t2_sec'), errors='coerce'),
        'run_sec': pd.to_numeric(wiki_df.get('run_sec'), errors='coerce'),
        'total_sec': pd.to_numeric(wiki_df.get('total_sec'), errors='coerce'),
        'overall_rank': pd.to_numeric(wiki_df.get('overall_rank'), errors='coerce'),
        'gender_rank': pd.to_numeric(wiki_df.get('gender_rank'), errors='coerce'),
        'age_group_rank': pd.to_numeric(wiki_df.get('age_group_rank'), errors='coerce'),
        'division_rank': np.nan,
        'finish_status': 'finisher',
    })

    # Drop rows without valid total
    wiki_out = wiki_out.dropna(subset=['total_sec'])
    wiki_out = apply_quality_filters(wiki_out, label='wiki')
    print(f"Wiki final: {len(wiki_out):,} rows")
else:
    wiki_out = pd.DataFrame()
    print("No wiki records found.")


  [wiki] Quality filter: 5,492 → 5,488 (4 removed)
    - total_sec bounds: 4
    - swim_sec pct bounds: 1
    - bike_sec pct bounds: 1
    - run_sec pct bounds: 1
Wiki final: 5,488 rows


## 7. Combine All Sources into Unified DataFrame

In [20]:
# Concatenate all sources
print("Combining all sources...")
all_dfs = [df6, tristat, coachcox, t100_out]
if len(wiki_out) > 0:
    all_dfs.append(wiki_out)

df_unified = pd.concat(all_dfs, ignore_index=True)
print(f"\nCombined: {len(df_unified):,} total rows")
print(f"\nSource breakdown:")
print(df_unified['source'].value_counts())
print(f"\nDistance breakdown:")
print(df_unified['event_distance'].value_counts())
print(f"\nGender breakdown:")
print(df_unified['gender'].value_counts())
print(f"\nPro vs AG: {df_unified['is_pro'].sum():,} pro, {(~df_unified['is_pro']).sum():,} AG")

# Free individual DataFrames
del df6, tristat, coachcox, t100_out, wiki_out
gc.collect()


Combining all sources...

Combined: 4,137,242 total rows

Source breakdown:
source
tristat       2426245
coachcox       864982
kaggle_df6     840073
wiki             5488
t100              454
Name: count, dtype: int64

Distance breakdown:
event_distance
70.3             2205311
140.6            1558492
olympic           260350
sprint            112606
100km                454
olympic-relay         29
Name: count, dtype: int64

Gender breakdown:
gender
M    3196793
F     934774
Name: count, dtype: int64

Pro vs AG: 19,529 pro, 4,117,713 AG


0

## 8. Derived Feature Engineering

In [21]:
# ─── Split Percentages ─────────────────────────────────────────────
print("Computing derived features...")

# Split ratios (as fraction of total)
for seg in ['swim', 'bike', 'run']:
    col = f'{seg}_sec'
    df_unified[f'{seg}_pct'] = np.where(
        (df_unified['total_sec'] > 0) & df_unified[col].notna(),
        df_unified[col] / df_unified['total_sec'],
        np.nan
    )

# Transition percentage
df_unified['transition_pct'] = np.where(
    (df_unified['total_sec'] > 0) & df_unified['t1_sec'].notna() & df_unified['t2_sec'].notna(),
    (df_unified['t1_sec'] + df_unified['t2_sec']) / df_unified['total_sec'],
    np.nan
)

# Bike-run ratio
df_unified['bike_run_ratio'] = np.where(
    (df_unified['run_sec'] > 0) & df_unified['bike_sec'].notna(),
    df_unified['bike_sec'] / df_unified['run_sec'],
    np.nan
)

print("Split percentages computed.")
print(f"  swim_pct median: {df_unified['swim_pct'].median():.3f}")
print(f"  bike_pct median: {df_unified['bike_pct'].median():.3f}")
print(f"  run_pct median:  {df_unified['run_pct'].median():.3f}")


Computing derived features...
Split percentages computed.
  swim_pct median: 0.109
  bike_pct median: 0.506
  run_pct median:  0.360


In [22]:
# ─── Cohort Statistics ─────────────────────────────────────────────
# Compute median/std per cohort (gender × age_group × distance) for AG only
print("Computing cohort statistics...")

# Filter to AG only for cohort computation
ag_mask = ~df_unified['is_pro']
cohort_cols = ['gender', 'age_group', 'event_distance']

cohort_stats = df_unified[ag_mask].groupby(cohort_cols).agg(
    cohort_median_total=('total_sec', 'median'),
    cohort_std_total=('total_sec', 'std'),
    cohort_median_swim=('swim_sec', 'median'),
    cohort_median_bike=('bike_sec', 'median'),
    cohort_median_run=('run_sec', 'median'),
    cohort_count=('total_sec', 'count'),
).reset_index()

# Only keep cohorts with >= 30 samples
cohort_stats = cohort_stats[cohort_stats['cohort_count'] >= 30]
print(f"Cohorts with ≥30 samples: {len(cohort_stats):,}")
print(f"Total athletes in valid cohorts: {cohort_stats['cohort_count'].sum():,}")

# Merge back to main df
df_unified = df_unified.merge(cohort_stats, on=cohort_cols, how='left')
print(f"Records with cohort stats: {df_unified['cohort_median_total'].notna().sum():,}")


Computing cohort statistics...
Cohorts with ≥30 samples: 278
Total athletes in valid cohorts: 3,835,792
Records with cohort stats: 3,835,792


In [23]:
# ─── Fade Ratio & Cohort Percentile ────────────────────────────────
# Fade ratio: actual run / cohort median run (> 1 means slower than typical)
df_unified['fade_ratio'] = np.where(
    (df_unified['cohort_median_run'] > 0) & df_unified['run_sec'].notna(),
    df_unified['run_sec'] / df_unified['cohort_median_run'],
    np.nan
)

# Cohort percentile: rank within cohort (0-100, lower = faster)
print("Computing cohort percentiles...")
df_unified['cohort_percentile'] = np.nan

for (g, ag, d), group in df_unified.groupby(cohort_cols):
    mask = group.index
    valid = group['total_sec'].notna()
    if valid.sum() >= 30:
        ranks = group.loc[valid, 'total_sec'].rank(pct=True)
        df_unified.loc[ranks.index, 'cohort_percentile'] = ranks.values

# Implied bike intensity factor
df_unified['implied_bike_if'] = np.where(
    (df_unified['cohort_median_bike'] > 0) & df_unified['bike_sec'].notna(),
    df_unified['bike_sec'] / df_unified['cohort_median_bike'],
    np.nan
)

print(f"Fade ratio: median={df_unified['fade_ratio'].median():.3f}, std={df_unified['fade_ratio'].std():.3f}")
print(f"Cohort percentile: {df_unified['cohort_percentile'].notna().sum():,} records with percentile")
print(f"Implied bike IF: median={df_unified['implied_bike_if'].median():.3f}")


Computing cohort percentiles...
Fade ratio: median=1.000, std=0.209
Cohort percentile: 3,835,792 records with percentile
Implied bike IF: median=1.000


## 9. Deduplication

In [24]:
# Deduplication strategy:
# 1. Within-source exact duplicates (same athlete + event + time)
# 2. Cross-source: keep all records but flag potential duplicates

n_before = len(df_unified)

# Within-source dedup: drop exact duplicates on key columns
dedup_cols = ['source', 'athlete_name', 'event_name', 'event_year', 'total_sec', 'gender', 'age_group']
# For df6 (no names), dedup on all demographics + event + time
df_unified = df_unified.drop_duplicates(
    subset=[c for c in dedup_cols if c in df_unified.columns],
    keep='first'
)

n_after = len(df_unified)
print(f"Deduplication: {n_before:,} → {n_after:,} ({n_before - n_after:,} duplicates removed)")
print(f"\nFinal source breakdown:")
print(df_unified['source'].value_counts())


Deduplication: 4,137,242 → 4,124,345 (12,897 duplicates removed)

Final source breakdown:
source
tristat       2419629
coachcox       864947
kaggle_df6     833897
wiki             5418
t100              454
Name: count, dtype: int64


## 10. Athlete Profile Aggregation

In [25]:
# Build athlete profiles for athletes with known identity (hash != None)
print("Building athlete profiles...")

# Filter to athletes with hash
named_athletes = df_unified[df_unified['athlete_hash'].notna()].copy()
print(f"Records with athlete identity: {len(named_athletes):,}")
print(f"Unique athletes: {named_athletes['athlete_hash'].nunique():,}")

# Aggregate by athlete_hash
profiles = named_athletes.groupby('athlete_hash').agg(
    athlete_name=('athlete_name', 'first'),
    gender=('gender', 'first'),
    country=('country', 'first'),
    total_races=('total_sec', 'count'),
    first_race_year=('event_year', 'min'),
    latest_race_year=('event_year', 'max'),
    pb_swim_sec=('swim_sec', 'min'),
    pb_bike_sec=('bike_sec', 'min'),
    pb_run_sec=('run_sec', 'min'),
    pb_total_sec=('total_sec', 'min'),
    avg_total_sec=('total_sec', 'mean'),
    std_total_sec=('total_sec', 'std'),
    avg_fade_ratio=('fade_ratio', 'mean'),
    avg_swim_pct=('swim_pct', 'mean'),
    avg_bike_pct=('bike_pct', 'mean'),
    avg_run_pct=('run_pct', 'mean'),
).reset_index()

# Years active
profiles['years_active'] = profiles['latest_race_year'] - profiles['first_race_year'] + 1

# Distances raced (comma-separated list)
dist_agg = named_athletes.groupby('athlete_hash')['event_distance'].apply(
    lambda x: ','.join(sorted(x.dropna().unique()))
).reset_index()
dist_agg.columns = ['athlete_hash', 'distances_raced']
profiles = profiles.merge(dist_agg, on='athlete_hash', how='left')

# Consistency CV (coefficient of variation)
profiles['consistency_cv'] = profiles['std_total_sec'] / profiles['avg_total_sec']
profiles.loc[profiles['total_races'] < 2, 'consistency_cv'] = np.nan

print(f"Profiles built: {len(profiles):,}")
print(f"Races distribution: {profiles['total_races'].describe()}")


Building athlete profiles...
Records with athlete identity: 3,286,137
Unique athletes: 1,629,366
Profiles built: 1,629,366
Races distribution: count    1.629366e+06
mean     2.016819e+00
std      2.639530e+00
min      1.000000e+00
25%      1.000000e+00
50%      1.000000e+00
75%      2.000000e+00
max      4.220000e+02
Name: total_races, dtype: float64


In [26]:
# ─── Improvement Slope (linear fit of total_sec over years) ────────
from scipy import stats as scipy_stats

def compute_improvement_slope(group):
    """Compute linear regression slope of total_sec vs event_year."""
    valid = group.dropna(subset=['event_year', 'total_sec'])
    if len(valid) < 2:
        return np.nan
    years = valid['event_year'].values.astype(float)
    times = valid['total_sec'].values
    if years.std() == 0:
        return 0.0
    slope, _, _, _, _ = scipy_stats.linregress(years, times)
    return slope  # negative = improving

print("Computing improvement slopes (this may take a minute)...")
slopes = named_athletes.groupby('athlete_hash').apply(compute_improvement_slope)
slopes.name = 'improvement_slope'
profiles = profiles.merge(slopes.reset_index(), on='athlete_hash', how='left')
print(f"Improvement slopes computed. Median: {profiles['improvement_slope'].median():.1f} sec/year")


Computing improvement slopes (this may take a minute)...
Improvement slopes computed. Median: 0.0 sec/year


In [27]:
# ─── Discipline Strength Z-Scores ──────────────────────────────────
# For each athlete, compute their avg segment time relative to their cohort

# Get the most common cohort for each athlete
cohort_lookup = named_athletes.groupby('athlete_hash').agg({
    'gender': 'first',
    'age_group': lambda x: x.mode()[0] if len(x.mode()) > 0 else np.nan,
    'event_distance': lambda x: x.mode()[0] if len(x.mode()) > 0 else np.nan,
}).reset_index()

# Merge cohort stats
cohort_lookup = cohort_lookup.merge(
    cohort_stats[['gender', 'age_group', 'event_distance',
                  'cohort_median_swim', 'cohort_median_bike', 'cohort_median_run',
                  'cohort_std_total']],
    on=['gender', 'age_group', 'event_distance'],
    how='left'
)

# Get athlete average times
athlete_avg = named_athletes.groupby('athlete_hash').agg({
    'swim_sec': 'mean',
    'bike_sec': 'mean',
    'run_sec': 'mean',
}).reset_index()
athlete_avg.columns = ['athlete_hash', 'avg_swim', 'avg_bike', 'avg_run']

z_data = cohort_lookup.merge(athlete_avg, on='athlete_hash', how='left')

# Z-scores (negative = faster than cohort median = STRONGER)
z_data['swim_strength_z'] = -(z_data['avg_swim'] - z_data['cohort_median_swim']) / z_data['cohort_std_total'].clip(lower=300)
z_data['bike_strength_z'] = -(z_data['avg_bike'] - z_data['cohort_median_bike']) / z_data['cohort_std_total'].clip(lower=300)
z_data['run_strength_z'] = -(z_data['avg_run'] - z_data['cohort_median_run']) / z_data['cohort_std_total'].clip(lower=300)

# Dominant discipline
def get_dominant(row):
    vals = {'swim': row.get('swim_strength_z', 0), 'bike': row.get('bike_strength_z', 0), 'run': row.get('run_strength_z', 0)}
    if all(pd.isna(v) for v in vals.values()):
        return np.nan
    return max(vals, key=lambda k: vals[k] if pd.notna(vals[k]) else -999)

z_data['dominant_discipline'] = z_data.apply(get_dominant, axis=1)

# Merge z-scores into profiles
profiles = profiles.merge(
    z_data[['athlete_hash', 'swim_strength_z', 'bike_strength_z', 'run_strength_z', 'dominant_discipline']],
    on='athlete_hash', how='left'
)

print("Discipline z-scores computed.")
print(f"Dominant discipline distribution:\n{profiles['dominant_discipline'].value_counts()}")


Discipline z-scores computed.
Dominant discipline distribution:
dominant_discipline
swim    624169
run     460124
bike    455819
Name: count, dtype: int64


In [28]:
# ─── DNF Rate ──────────────────────────────────────────────────────
# Only from CoachCox which has finishStatus
dnf_data = named_athletes[named_athletes['source'] == 'coachcox'].copy()
if len(dnf_data) > 0:
    dnf_agg = dnf_data.groupby('athlete_hash').agg(
        dnf_count=('finish_status', lambda x: (x != 'finisher').sum()),
        total_cc_races=('finish_status', 'count'),
    ).reset_index()
    dnf_agg['dnf_rate'] = dnf_agg['dnf_count'] / dnf_agg['total_cc_races']

    profiles = profiles.merge(dnf_agg[['athlete_hash', 'dnf_count', 'dnf_rate']], on='athlete_hash', how='left')
    profiles['dnf_count'] = profiles['dnf_count'].fillna(0).astype(int)
    profiles['dnf_rate'] = profiles['dnf_rate'].fillna(0)
    print(f"DNF stats added. Athletes with DNF > 0: {(profiles['dnf_count'] > 0).sum():,}")
else:
    profiles['dnf_count'] = 0
    profiles['dnf_rate'] = 0.0

# Clean up temporary columns
profiles = profiles.drop(columns=['avg_total_sec', 'std_total_sec', 'avg_swim_pct', 'avg_bike_pct', 'avg_run_pct'], errors='ignore')

print(f"\nFinal profile shape: {profiles.shape}")
print(f"Profile columns: {list(profiles.columns)}")
profiles.head(3)


DNF stats added. Athletes with DNF > 0: 747

Final profile shape: (1629366, 22)
Profile columns: ['athlete_hash', 'athlete_name', 'gender', 'country', 'total_races', 'first_race_year', 'latest_race_year', 'pb_swim_sec', 'pb_bike_sec', 'pb_run_sec', 'pb_total_sec', 'avg_fade_ratio', 'years_active', 'distances_raced', 'consistency_cv', 'improvement_slope', 'swim_strength_z', 'bike_strength_z', 'run_strength_z', 'dominant_discipline', 'dnf_count', 'dnf_rate']


Unnamed: 0,athlete_hash,athlete_name,gender,country,total_races,first_race_year,latest_race_year,pb_swim_sec,pb_bike_sec,pb_run_sec,pb_total_sec,avg_fade_ratio,years_active,distances_raced,consistency_cv,improvement_slope,swim_strength_z,bike_strength_z,run_strength_z,dominant_discipline,dnf_count,dnf_rate
0,00002270c5ed2e96,Хабиев Данияр,M,RUS,7,2015.0,2021.0,1178.0,2555.0,1597.0,5537.0,1.143262,7.0,"70.3,olympic,sprint",0.516717,-772.81875,-0.371914,-0.853019,-0.604193,swim,0,0.0
1,000028766426829f,Mark Van Der Pol,M,Netherlands,5,2012.0,2024.0,4262.0,21797.0,17721.0,45224.0,1.088875,13.0,140.6,0.043329,201.755102,0.011835,-0.309085,-0.284992,swim,0,0.0
2,00003fdcfe8312ba,"Chipping, James",M,GBR,2,2016.0,2018.0,1921.0,10242.0,9207.0,21980.0,1.202657,3.0,"140.6,70.3",0.466142,10807.0,0.24242,0.8331,0.406317,bike,0,0.0


## 11. Output to CSV

In [29]:
# ─── Save athlete_race.csv ─────────────────────────────────────────
# Select final columns in order
race_columns = [
    'athlete_hash', 'athlete_name', 'gender', 'age_group', 'age_band',
    'country', 'country_iso2', 'is_pro',
    'event_name', 'event_year', 'event_location', 'event_distance', 'source',
    'swim_sec', 't1_sec', 'bike_sec', 't2_sec', 'run_sec', 'total_sec',
    'overall_rank', 'gender_rank', 'age_group_rank', 'division_rank', 'finish_status',
    'swim_pct', 'bike_pct', 'run_pct', 'transition_pct',
    'bike_run_ratio', 'fade_ratio', 'cohort_percentile', 'implied_bike_if',
]

# Keep only columns that exist
race_columns = [c for c in race_columns if c in df_unified.columns]
df_out = df_unified[race_columns].copy()

# Drop cohort helper columns
df_out = df_out.drop(columns=[c for c in df_out.columns if c.startswith('cohort_median_') or c.startswith('cohort_std_') or c == 'cohort_count'], errors='ignore')

output_path = CLEANED_DIR / 'athlete_race.csv'
df_out.to_csv(output_path, index=False)
print(f"Saved athlete_race.csv: {len(df_out):,} rows → {output_path}")
print(f"File size: {output_path.stat().st_size / 1024 / 1024:.1f} MB")


Saved athlete_race.csv: 4,124,345 rows → /Users/mykyta/projects/indie/racedayai/research/data/cleaned/athlete_race.csv
File size: 1201.5 MB


In [30]:
# ─── Save athlete_profile.csv ──────────────────────────────────────
profile_columns = [
    'athlete_hash', 'athlete_name', 'gender', 'country',
    'total_races', 'years_active', 'distances_raced',
    'pb_swim_sec', 'pb_bike_sec', 'pb_run_sec', 'pb_total_sec',
    'first_race_year', 'latest_race_year',
    'improvement_slope', 'consistency_cv',
    'swim_strength_z', 'bike_strength_z', 'run_strength_z',
    'dominant_discipline',
    'dnf_count', 'dnf_rate', 'avg_fade_ratio',
]

profile_columns = [c for c in profile_columns if c in profiles.columns]
profiles_out = profiles[profile_columns].copy()

output_path = CLEANED_DIR / 'athlete_profile.csv'
profiles_out.to_csv(output_path, index=False)
print(f"Saved athlete_profile.csv: {len(profiles_out):,} rows → {output_path}")
print(f"File size: {output_path.stat().st_size / 1024 / 1024:.1f} MB")


Saved athlete_profile.csv: 1,629,366 rows → /Users/mykyta/projects/indie/racedayai/research/data/cleaned/athlete_profile.csv
File size: 305.6 MB


## 12. Summary & QC

In [31]:
# ─── Final Summary ─────────────────────────────────────────────────
print("=" * 70)
print("ETL SUMMARY")
print("=" * 70)
print(f"\nathlete_race.csv:")
print(f"  Total rows:      {len(df_out):,}")
print(f"  Sources:          {df_out['source'].value_counts().to_dict()}")
print(f"  Distances:        {df_out['event_distance'].value_counts().to_dict()}")
print(f"  Gender:           {df_out['gender'].value_counts().to_dict()}")
print(f"  Pro athletes:     {df_out['is_pro'].sum():,} ({df_out['is_pro'].mean()*100:.1f}%)")
print(f"  Year range:       {df_out['event_year'].min():.0f} - {df_out['event_year'].max():.0f}")
print(f"  Finish status:    {df_out['finish_status'].value_counts().to_dict()}")

print(f"\nathlete_profile.csv:")
print(f"  Total athletes:   {len(profiles_out):,}")
print(f"  With 3+ races:    {(profiles_out['total_races'] >= 3).sum():,}")
print(f"  With 5+ races:    {(profiles_out['total_races'] >= 5).sum():,}")
print(f"  Median races:     {profiles_out['total_races'].median():.0f}")
print(f"  Gender:           {profiles_out['gender'].value_counts().to_dict()}")

print(f"\nDerived features (athlete_race.csv):")
for col in ['swim_pct', 'bike_pct', 'run_pct', 'fade_ratio', 'cohort_percentile', 'bike_run_ratio']:
    if col in df_out.columns:
        valid_pct = df_out[col].notna().mean() * 100
        print(f"  {col:25s}: median={df_out[col].median():.3f}, valid={valid_pct:.1f}%")

print(f"\nDerived features (athlete_profile.csv):")
for col in ['improvement_slope', 'consistency_cv', 'swim_strength_z', 'bike_strength_z', 'run_strength_z']:
    if col in profiles_out.columns:
        valid_pct = profiles_out[col].notna().mean() * 100
        print(f"  {col:25s}: median={profiles_out[col].median():.3f}, valid={valid_pct:.1f}%")

print("\n✅ ETL complete. Ready for Notebook 02.")


ETL SUMMARY

athlete_race.csv:
  Total rows:      4,124,345
  Sources:          {'tristat': 2419629, 'coachcox': 864947, 'kaggle_df6': 833897, 'wiki': 5418, 't100': 454}
  Distances:        {'70.3': 2198944, '140.6': 1558226, 'olympic': 257249, 'sprint': 109443, '100km': 454, 'olympic-relay': 29}
  Gender:           {'M': 3188641, 'F': 930099}
  Pro athletes:     19,459 (0.5%)
  Year range:       1978 - 2025
  Finish status:    {'finisher': 4123587, 'dq': 310, 'dns': 267, 'dnf': 181}

athlete_profile.csv:
  Total athletes:   1,629,366
  With 3+ races:    314,034
  With 5+ races:    124,927
  Median races:     1
  Gender:           {'M': 1267791, 'F': 360620}

Derived features (athlete_race.csv):
  swim_pct                 : median=0.109, valid=97.0%
  bike_pct                 : median=0.506, valid=98.6%
  run_pct                  : median=0.360, valid=98.8%
  fade_ratio               : median=1.000, valid=92.2%
  cohort_percentile        : median=0.500, valid=92.8%
  bike_run_ratio    