In [1]:
import pandas as pd
import pandas as pd
from datetime import datetime, timedelta

WEATHER_DATA = "data/weather_data_v2.csv"
RACE_DATA = "data/race_final/global/data.csv"

weather_data = pd.read_csv(WEATHER_DATA, parse_dates=['date'])
weather_dfs = {(city, state): group \
                    for (city, state), group in weather_data.groupby(["city","state"])}

race_df = pd.read_csv(RACE_DATA, parse_dates=['date'])

def parse_date_column(col:str) -> datetime:
    (month, day, year) = col.split("_")
    return datetime(int("20"+year), int(month), int(day))

race_df["date"] = race_df["date"].apply(parse_date_column)
race_df.head()

  race_df = pd.read_csv(RACE_DATA, parse_dates=['date'])


Unnamed: 0,age,sex,time,race,date,city,state
0,50.0,M,181.65,dog_lake_marathon,2016-10-15,apache junction,az
1,39.0,M,188.566667,dog_lake_marathon,2016-10-15,kennewick,wa
2,27.0,M,196.483333,dog_lake_marathon,2016-10-15,yakima,wa
3,33.0,M,218.666667,dog_lake_marathon,2016-10-15,mesa,az
4,26.0,F,219.016667,dog_lake_marathon,2016-10-15,costa mesa,ca


In [7]:
EARLIEST_WEATHER = weather_data["date"].min()
print(f"Earliest weather date: {EARLIEST_WEATHER}")
race_df_filtered_date = race_df[race_df["date"] > EARLIEST_WEATHER + timedelta(days=365)]
print(f"date filtering: filtered {len(race_df) - len(race_df_filtered_date):,} records, {len(race_df_filtered_date):,} remain")

Earliest weather date: 1999-01-01 00:00:00
date filtering: filtered 0 records, 7,411,671 remain


In [8]:
available_weather_cities = weather_dfs.keys()

# Create a set of lowercase (city, state) tuples for fast lookup
available_weather_cities_lower = {(city.lower(), state.lower()) for (city, state) in available_weather_cities}

# Filter to only records with matching weather data
race_df_filtered = race_df_filtered_date[
    race_df_filtered_date.apply(
        lambda row: (row['city'], row['state']) in available_weather_cities_lower, 
        axis=1
    )
]

print(f"Weather city filtering: filtered {len(race_df_filtered_date) - len(race_df_filtered):,} records, {len(race_df_filtered):,} remain")
race_df_filtered.head()

Weather city filtering: filtered 5,721,668 records, 1,690,003 remain


Unnamed: 0,age,sex,time,race,date,city,state
7,44.0,F,226.316667,dog_lake_marathon,2016-10-15,seattle,wa
8,52.0,M,230.7,dog_lake_marathon,2016-10-15,vancouver,wa
13,62.0,M,239.766667,dog_lake_marathon,2016-10-15,seattle,wa
16,39.0,F,248.383333,dog_lake_marathon,2016-10-15,portland,or
31,39.0,M,184.55,dog_lake_marathon,2015-10-24,seattle,wa


In [22]:
import src.iterator

import importlib
importlib.reload(src.iterator)

from src.iterator import CheckpointIterator

def handle_race(race_meta:tuple[str,str], group:list):
    (race, race_date_dt) = race_meta
    output_data = []
    cities = group.groupby(["city","state"])
    print(f"examining {race}-{race_date_dt}, {len(group)} records, {len(cities)} cities")

    for (city, state), records in group.groupby(["city","state"]):
        lookup = (city.lower(), state.lower())
        if lookup not in weather_dfs:
            print(f"--skipping, no weather data for {city} {state}")
            continue

        weather_records = weather_dfs[lookup]
        full_training_early_date = race_date_dt - timedelta(days=90)
        peak_training_early_date = race_date_dt - timedelta(days=30)

        full_training_records = weather_records[weather_records["date"].between(full_training_early_date, race_date_dt, inclusive='left')]
        peak_training_records = weather_records[weather_records["date"].between(peak_training_early_date, race_date_dt, inclusive='left')]
        
        if len(full_training_records) <= 5 or len(peak_training_records) <= 5:
            print(f"skipping, not enough weather data on {race_date_dt}")
            continue
        
        for period, weather_records in zip(["full","peak"], [full_training_records, peak_training_records]):
            overall_min = weather_records["temp_min"].min()
            overall_max = weather_records["temp_max"].max()
            overall_median_min = weather_records["temp_min"].median()
            overall_median_max = weather_records["temp_max"].median()
            overall_rain = weather_records["precip"].sum()
            overall_days_of_rain = len(weather_records[weather_records["precip"] > 0.2])

            weekend_days = weather_records[weather_records["date"].dt.dayofweek.isin([5, 6])]
            overall_weekend_days_with_rain = len(weekend_days[weekend_days["precip"] > 0.2])
            
            records[period+"_temp_min"] = overall_min
            records[period+"_temp_max"] = overall_max
            records[period+"_temp_median_min"] = overall_median_min
            records[period+"_temp_median_max"] = overall_median_max
            records[period+"_overall_precip"] = overall_rain
            records[period+"_overall_days_of_precip"] = overall_days_of_rain
            records[period+"_overall_weekend_days_of_precip"] = overall_weekend_days_with_rain
        
        output_data.append(records)
    return output_data

iterator = CheckpointIterator(
    items = race_df_filtered.groupby(["race","date"]),
    item_key= lambda x:x[0],
    process_func = handle_race,
    output_path="data/featurized_race_data_v2a.csv",
    key_fields=["race","date"]
)

featurized_df = iterator.process()


EmptyDataError: No columns to parse from file

In [9]:
output_data = []
for (race, race_date_dt), group in race_df_filtered.groupby(["race","date"]):
    cities = group.groupby(["city","state"])
    print(f"examining {race}-{race_date_dt}, {len(group)} records, {len(cities)} cities")

    for (city, state), records in group.groupby(["city","state"]):
        lookup = (city.lower(), state.lower())
        if lookup not in weather_dfs:
            print(f"--skipping, no weather data for {city} {state}")
            continue

        weather_records = weather_dfs[lookup]
        full_training_early_date = race_date_dt - timedelta(days=90)
        peak_training_early_date = race_date_dt - timedelta(days=30)

        full_training_records = weather_records[weather_records["date"].between(full_training_early_date, race_date_dt, inclusive='left')]
        peak_training_records = weather_records[weather_records["date"].between(peak_training_early_date, race_date_dt, inclusive='left')]
        
        if len(full_training_records) <= 5 or len(peak_training_records) <= 5:
            print(f"skipping, not enough weather data on {race_date_dt}")
            continue
        
        for period, weather_records in zip(["full","peak"], [full_training_records, peak_training_records]):
            overall_min = weather_records["temp_min"].min()
            overall_max = weather_records["temp_max"].max()
            overall_median_min = weather_records["temp_min"].median()
            overall_median_max = weather_records["temp_max"].median()
            overall_rain = weather_records["precip"].sum()
            overall_days_of_rain = len(weather_records[weather_records["precip"] > 0.2])

            weekend_days = weather_records[weather_records["date"].dt.dayofweek.isin([5, 6])]
            overall_weekend_days_with_rain = len(weekend_days[weekend_days["precip"] > 0.2])
            
            records[period+"_temp_min"] = overall_min
            records[period+"_temp_max"] = overall_max
            records[period+"_temp_median_min"] = overall_median_min
            records[period+"_temp_median_max"] = overall_median_max
            records[period+"_overall_precip"] = overall_rain
            records[period+"_overall_days_of_precip"] = overall_days_of_rain
            records[period+"_overall_weekend_days_of_precip"] = overall_weekend_days_with_rain
        
        output_data.append(records)

featurized_frame = pd.concat(output_data) 

examining "last_chance_for_boston"_marathon-2003-02-02 00:00:00, 18 records, 9 cities
examining "last_chance_for_boston"_marathon-2004-02-01 00:00:00, 22 records, 9 cities
examining "last_chance_for_boston"_marathon-2005-02-20 00:00:00, 18 records, 7 cities
examining "last_chance_for_boston"_marathon-2007-02-11 00:00:00, 28 records, 3 cities
examining "last_chance_for_boston"_marathon-2010-02-28 00:00:00, 26 records, 6 cities
examining "last_chance_for_boston"_marathon-2011-02-27 00:00:00, 8 records, 3 cities
examining "last_chance_for_boston"_marathon-2012-02-26 00:00:00, 12 records, 3 cities
examining "last_chance_for_boston"_marathon-2016-02-21 00:00:00, 10 records, 3 cities
examining "running_for_the_bay!"_marathon-2010-10-23 00:00:00, 38 records, 13 cities
examining "running_for_the_bay!"_marathon-2011-10-23 00:00:00, 19 records, 7 cities
examining "running_for_the_bay!"_marathon-2012-10-21 00:00:00, 12 records, 6 cities
examining "running_for_the_bay!"_marathon-2013-10-20 00:00:0

In [10]:
featurized_frame.to_csv("data/featurized_race_data_v2.csv", index=False)

In [22]:
missing_cities = race_cities_df[~race_cities_df.city.isin(merged.city)]

mapped_amount = race_cities_df.runner_count.sum() / (missing_cities.runner_count.sum() + race_cities_df.runner_count.sum())
print(f"Mapped: {race_cities_df.runner_count.sum()}")
print(f"Unmapped: {missing_cities.runner_count.sum()}")
print(f"Coverage: {mapped_amount*100:>11.2f}%")

Mapped: 3711663
Unmapped: 284532
Coverage:       92.88%


In [27]:
import pandas as pd
import requests
from time import sleep
from tqdm.notebook import tqdm
from pathlib import Path

def get_historical_weather(lat, lon, start_date, end_date):
    """Fetch daily weather from Open-Meteo archive API."""
    url = "https://archive-api.open-meteo.com/v1/archive"
    params = {
        "latitude": lat,
        "longitude": lon,
        "start_date": start_date,
        "end_date": end_date,
        "daily": ["temperature_2m_max", "temperature_2m_min", "precipitation_sum"],
        "temperature_unit": "fahrenheit",
        "timezone": "auto"
    }
    response = requests.get(url, params=params)
    if response.status_code == 200:
        return response.json()
    return None

def fetch_weather_for_cities(df, start_date, end_date, output_path="data/weather_data.csv", save_every=10):
    """
    Fetch weather for all cities, with progress bar and checkpoint saving.
    
    Args:
        df: DataFrame with city, state, lat, lng columns
        start_date: Start date string "YYYY-MM-DD"
        end_date: End date string "YYYY-MM-DD"
        output_path: Where to save intermediate/final results
        save_every: Save checkpoint every N cities
    """
    output_path = Path(output_path)
    
    # Load existing results if resuming
    if output_path.exists():
        existing_df = pd.read_csv(output_path)
        completed = set(zip(existing_df['city'], existing_df['state']))
        weather_records = existing_df.to_dict('records')
        print(f"Resuming: found {len(completed)} cities already processed")
    else:
        completed = set()
        weather_records = []
    
    # Filter to only cities we haven't done yet
    remaining = df[~df.apply(lambda r: (r['city'], r['state']) in completed, axis=1)]
    
    if len(remaining) == 0:
        print("All cities already processed!")
        return pd.read_csv(output_path)
    
    print(f"Fetching weather for {len(remaining)} cities...")
    
    for i, (idx, row) in enumerate(tqdm(remaining.iterrows(), total=len(remaining), desc="Fetching weather")):
        try:
            data = get_historical_weather(row['lat'], row['lng'], start_date, end_date)
            
            if data and 'daily' in data:
                daily = data['daily']
                for j, date in enumerate(daily['time']):
                    weather_records.append({
                        'city': row['city'],
                        'state': row['state'],
                        'date': date,
                        'temp_max': daily['temperature_2m_max'][j],
                        'temp_min': daily['temperature_2m_min'][j],
                        'precip': daily['precipitation_sum'][j]
                    })
            
            # Save checkpoint periodically
            if (i + 1) % save_every == 0:
                pd.DataFrame(weather_records).to_csv(output_path, index=False)
                
        except Exception as e:
            tqdm.write(f"Error fetching {row['city']}, {row['state']}: {e}")
        
        sleep(0.1)
    
    # Final save
    result_df = pd.DataFrame(weather_records)
    result_df.to_csv(output_path, index=False)
    print(f"Saved {len(result_df)} records to {output_path}")
    
    return result_df

In [29]:
# Fetch weather data (can be interrupted and resumed)
weather_df = fetch_weather_for_cities(
    merged, 
    "2015-01-01", 
    "2025-03-01",
    output_path="data/weather_data.csv",
    save_every=10
)
weather_df.head()

Resuming: found 32 cities already processed
Fetching weather for 458 cities...


Fetching weather:   0%|          | 0/458 [00:00<?, ?it/s]

Saved 233919 records to data/weather_data.csv


Unnamed: 0,city,state,date,temp_max,temp_min,precip
0,los angeles,ca,2015-01-01,53.1,30.8,0.0
1,los angeles,ca,2015-01-02,55.0,31.5,0.0
2,los angeles,ca,2015-01-03,55.5,31.5,0.0
3,los angeles,ca,2015-01-04,64.3,33.8,0.0
4,los angeles,ca,2015-01-05,73.1,41.5,0.0
