# 🏁 F1 Bronze – Extended FastF1 Extraction (2018–present)

This notebook reads from `f1_bronze.events` and populates additional bronze tables:

- `f1_bronze.session_results`
- `f1_bronze.weather`
- `f1_bronze.track_status`
- `f1_bronze.race_control_messages`
- `f1_bronze.telemetry_raw` (optional)
- `f1_bronze.car_position` (optional)

Targeting modern era: **2018–present**.


In [0]:
%pip install fastf1 pandas

In [0]:
import fastf1
import pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.types import *
from datetime import datetime
import tempfile
import time

print("‚úÖ Libraries imported")

# FastF1 cache: tempfile.mkdtemp() creates a brand-new directory each time this
# cell runs, so the cache location is only meaningful for the current run.
cache_dir = tempfile.mkdtemp(prefix="f1-cache-")
fastf1.Cache.enable_cache(cache_dir)
print(f"‚úÖ FastF1 cache configured at: {cache_dir}")

# Config - season range used to filter f1_bronze.events (both ends inclusive)
START_YEAR = 2018
END_YEAR   = 2024   # adjust to latest completed season

# Session identifier passed to load_session() for every event.
SESSION_TYPE = "Race"  # we focus on Race for now

# Heavy tables - these flags gate both the extraction and the write of
# telemetry_raw / car_position; set to True when you're ready for big data.
ENABLE_TELEMETRY = False
ENABLE_POSITION  = False

# Echo the effective configuration so it is visible in the run output.
print(f"üîß Config -> Years: {START_YEAR}-{END_YEAR}, Session: {SESSION_TYPE}")
print(f"üîß Telemetry enabled? {ENABLE_TELEMETRY}")
print(f"üîß Position enabled?  {ENABLE_POSITION}")


In [0]:
%sql
-- Create the bronze schema if it is missing; a no-op when it already exists.
CREATE DATABASE IF NOT EXISTS f1_bronze
COMMENT 'Raw F1 data from FastF1 API (modern era)';


In [0]:
# Events for the configured seasons (START_YEAR and END_YEAR both included),
# ordered by season and then by round.
events_df = (
    spark.table("f1_bronze.events")
    .filter(col("year").between(START_YEAR, END_YEAR))
    .orderBy("year", "round_number")
)

# Bring the event list to the driver as plain dicts; the extraction loop
# iterates over it in Python.
events = [event_row.asDict() for event_row in events_df.collect()]

print(f"Found {len(events)} events in f1_bronze.events between {START_YEAR}‚Äì{END_YEAR}")
if events:
    # Short preview so the run output shows what is about to be processed.
    print("First few events:")
    for e in events[:5]:
        print(f"  {e['year']} R{e['round_number']}: {e['event_name']} ({e['country']})")


In [0]:
from typing import List, Dict, Any, Optional


def load_session(year: int, round_number: int, session_type: str = "Race"):
    """Create and load one FastF1 session on a best-effort basis.

    Args:
        year: Season passed to ``fastf1.get_session``.
        round_number: Round passed to ``fastf1.get_session``.
        session_type: Session identifier passed to ``fastf1.get_session``.

    Returns:
        The session object after ``session.load()`` has completed, or ``None``
        if any exception was raised along the way. The exception is printed,
        never re-raised, so a caller can simply skip the event.
    """
    try:
        print(f"\nüì¶ Loading session {year} R{round_number} ({session_type})...")
        session = fastf1.get_session(year, round_number, session_type)
        session.load()
        print("   ‚úÖ Session loaded")
    except Exception as e:
        print(f"   ‚ùå Failed to load session {year} R{round_number}: {e}")
        return None
    return session


def extract_session_results(session, year: int, round_number: int) -> List[Dict[str, Any]]:
    """Turn ``session.results`` into one flat record per row.

    Args:
        session: Object exposing ``results`` (a pandas DataFrame or ``None``)
            and ``name``.
        year: Season stamped onto every record.
        round_number: Round stamped onto every record.

    Returns:
        A list of dicts. Values that ``pd.notna`` reports as missing (and
        columns absent from the row) become ``None``; the lap-time style
        fields are converted with ``.total_seconds()``. The list is empty when
        there are no results. If an exception occurs, it is printed and the
        records collected up to that point are returned.
    """

    def _convert(value, caster):
        # Apply the cast only to present values; missing ones map to None.
        return caster(value) if pd.notna(value) else None

    def _as_seconds(value):
        # Duration-like value -> float seconds, or None when missing.
        return value.total_seconds() if pd.notna(value) else None

    records = []
    try:
        classification = session.results  # pandas DataFrame
        if classification is None or len(classification) == 0:
            print("   ‚ö†Ô∏è No session.results available")
            return records

        for _, driver_row in classification.iterrows():
            records.append({
                "year": year,
                "round_number": round_number,
                "session_name": session.name,  # e.g. 'Race'
                "driver_number": _convert(driver_row.get("DriverNumber"), str),
                "driver_abbreviation": driver_row.get("Abbreviation"),
                "broadcast_name": driver_row.get("BroadcastName"),
                "team_name": driver_row.get("TeamName"),
                "team_color": driver_row.get("TeamColor"),
                "position": _convert(driver_row.get("Position"), int),
                "grid_position": _convert(driver_row.get("GridPosition"), int),
                "status": driver_row.get("Status"),
                "points": _convert(driver_row.get("Points"), float),
                "laps": _convert(driver_row.get("Laps"), int),
                "best_lap_time_seconds": _as_seconds(driver_row.get("BestLapTime")),
                "q1_time_seconds": _as_seconds(driver_row.get("Q1")),
                "q2_time_seconds": _as_seconds(driver_row.get("Q2")),
                "q3_time_seconds": _as_seconds(driver_row.get("Q3")),
                "extraction_timestamp": datetime.now(),
            })
    except Exception as e:
        print(f"   ‚ùå Error extracting session results: {e}")
    return records


def extract_weather_data(session, year: int, round_number: int) -> List[Dict[str, Any]]:
    """Turn ``session.weather_data`` into one flat record per row.

    Args:
        session: Object exposing ``weather_data`` (a pandas DataFrame or
            ``None``) and ``name``.
        year: Season stamped onto every record.
        round_number: Round stamped onto every record.

    Returns:
        A list of dicts. ``timestamp`` carries the source ``Time`` value
        unchanged (NOTE(review): confirm whether that is an absolute time or a
        session-relative offset); every measurement is cast to ``float`` or set
        to ``None`` when ``pd.notna`` reports it missing. The list is empty
        when there is no weather data. If an exception occurs, it is printed
        and the records collected up to that point are returned.
    """
    # Output key -> source column for the float-valued measurements, in the
    # order the keys appear in each record.
    measurement_columns = (
        ("air_temperature", "AirTemp"),
        ("track_temperature", "TrackTemp"),
        ("humidity", "Humidity"),
        ("pressure", "Pressure"),
        ("wind_speed", "WindSpeed"),
        ("wind_direction", "WindDirection"),
        ("rainfall", "Rainfall"),
    )

    records = []
    try:
        weather = session.weather_data  # pandas DataFrame
        if weather is None or len(weather) == 0:
            print("   ‚ö†Ô∏è No weather_data available")
            return records

        # Move the index into a regular column and expose "Time" as "Timestamp".
        weather = weather.reset_index().rename(columns={"Time": "Timestamp"})

        for _, sample in weather.iterrows():
            record = {
                "year": year,
                "round_number": round_number,
                "session_name": session.name,
                "timestamp": sample.get("Timestamp"),
            }
            for out_key, source_col in measurement_columns:
                reading = sample.get(source_col)
                record[out_key] = float(reading) if pd.notna(reading) else None
            record["extraction_timestamp"] = datetime.now()
            records.append(record)
    except Exception as e:
        print(f"   ‚ùå Error extracting weather data: {e}")
    return records


def extract_track_status(session, year: int, round_number: int) -> List[Dict[str, Any]]:
    """Turn ``session.track_status`` into one flat record per row.

    Args:
        session: Object exposing ``track_status`` (a pandas DataFrame or
            ``None``) and ``name``.
        year: Season stamped onto every record.
        round_number: Round stamped onto every record.

    Returns:
        A list of dicts holding the ``Time``, ``Status`` and ``Message`` values
        of each row, passed through unchanged. The list is empty when there is
        no track status. If an exception occurs, it is printed and the records
        collected up to that point are returned.
    """
    # Output key -> source column, in record order.
    # Status holds codes such as '1', '2', '4'; Message is the readable form.
    passthrough_columns = (
        ("timestamp", "Timestamp"),
        ("status", "Status"),
        ("message", "Message"),
    )

    records = []
    try:
        status_frame = session.track_status  # pandas DataFrame
        if status_frame is None or len(status_frame) == 0:
            print("   ‚ö†Ô∏è No track_status available")
            return records

        # Move the index into a regular column and expose "Time" as "Timestamp".
        status_frame = status_frame.reset_index().rename(columns={"Time": "Timestamp"})

        for _, entry in status_frame.iterrows():
            record = {
                "year": year,
                "round_number": round_number,
                "session_name": session.name,
            }
            for out_key, source_col in passthrough_columns:
                record[out_key] = entry.get(source_col)
            record["extraction_timestamp"] = datetime.now()
            records.append(record)
    except Exception as e:
        print(f"   ‚ùå Error extracting track status: {e}")
    return records


def extract_race_control_messages(session, year: int, round_number: int) -> List[Dict[str, Any]]:
    """Turn ``session.race_control_messages`` into one flat record per row.

    Args:
        session: Object exposing ``race_control_messages`` (a pandas DataFrame
            or ``None``) and ``name``.
        year: Season stamped onto every record.
        round_number: Round stamped onto every record.

    Returns:
        A list of dicts holding the ``Time``, ``Message``, ``Category``,
        ``Flag`` and ``Scope`` values of each row, passed through unchanged.
        The list is empty when there are no messages. If an exception occurs,
        it is printed and the records collected up to that point are returned.
    """
    # Output key -> source column, in record order.
    passthrough_columns = (
        ("timestamp", "Timestamp"),
        ("message", "Message"),
        ("category", "Category"),
        ("flag", "Flag"),
        ("scope", "Scope"),
    )

    records = []
    try:
        messages = session.race_control_messages  # pandas DataFrame
        if messages is None or len(messages) == 0:
            print("   ‚ö†Ô∏è No race_control_messages available")
            return records

        # Move the index into a regular column and expose "Time" as "Timestamp".
        messages = messages.reset_index().rename(columns={"Time": "Timestamp"})

        for _, entry in messages.iterrows():
            record = {
                "year": year,
                "round_number": round_number,
                "session_name": session.name,
            }
            for out_key, source_col in passthrough_columns:
                record[out_key] = entry.get(source_col)
            record["extraction_timestamp"] = datetime.now()
            records.append(record)
    except Exception as e:
        print(f"   ‚ùå Error extracting race control messages: {e}")
    return records


In [0]:
def extract_telemetry(session, year: int, round_number: int) -> List[Dict[str, Any]]:
    """Collect telemetry samples for every driver in ``session.drivers``.

    For each driver the laps are selected with ``session.laps.pick_driver``
    and their telemetry obtained through ``get_telemetry()``. One record is
    produced per telemetry row, so the result can be very large.

    Args:
        session: Object exposing ``drivers``, ``laps`` and ``name``.
        year: Season stamped onto every record.
        round_number: Round stamped onto every record.

    Returns:
        A list of dicts. ``date``, ``session_time`` and ``lap_time`` are passed
        through unchanged; the numeric channels are cast to ``float``/``int``
        or set to ``None`` when ``pd.notna`` reports them missing. A failure
        for one driver is printed and that driver is abandoned (records already
        collected are kept); a failure outside the per-driver handling is
        printed and ends the extraction.
    """

    def _convert(value, caster):
        # Apply the cast only to present values; missing ones map to None.
        return caster(value) if pd.notna(value) else None

    records = []
    try:
        for drv in session.drivers:
            try:
                driver_laps = session.laps.pick_driver(drv)
                if driver_laps is None or len(driver_laps) == 0:
                    continue

                samples = driver_laps.get_telemetry()  # pandas DataFrame
                if samples is None or len(samples) == 0:
                    continue

                for _, sample in samples.reset_index().iterrows():
                    records.append({
                        "year": year,
                        "round_number": round_number,
                        "session_name": session.name,
                        "driver": drv,
                        "date": sample.get("Date"),  # absolute timestamp
                        "session_time": sample.get("SessionTime"),  # session-relative time
                        "lap_time": sample.get("LapTime"),
                        "distance": _convert(sample.get("Distance"), float),
                        "speed": _convert(sample.get("Speed"), float),
                        "throttle": _convert(sample.get("Throttle"), float),
                        "brake": _convert(sample.get("Brake"), float),
                        "gear": _convert(sample.get("nGear"), int),
                        "rpm": _convert(sample.get("RPM"), float),
                        "drs": _convert(sample.get("DRS"), int),
                        "extraction_timestamp": datetime.now(),
                    })
            except Exception as e_drv:
                print(f"   ‚ö†Ô∏è Telemetry error for driver {drv}: {e_drv}")
                continue
    except Exception as e:
        print(f"   ‚ùå Error extracting telemetry: {e}")
    return records


def extract_position_data(session, year: int, round_number: int) -> List[Dict[str, Any]]:
    """Collect position samples for every driver in ``session.drivers``.

    For each driver the laps are selected with ``session.laps.pick_driver``
    and their position data obtained through ``get_pos_data()``. One record is
    produced per position row.

    Args:
        session: Object exposing ``drivers``, ``laps`` and ``name``.
        year: Season stamped onto every record.
        round_number: Round stamped onto every record.

    Returns:
        A list of dicts. ``date`` and ``session_time`` are passed through
        unchanged; ``x``, ``y``, ``z`` and ``distance`` are cast to ``float``
        or set to ``None`` when ``pd.notna`` reports them missing. A failure
        for one driver is printed and that driver is abandoned (records already
        collected are kept); a failure outside the per-driver handling is
        printed and ends the extraction.
    """
    # Output key -> source column for the float-valued fields, in record order.
    float_columns = (("x", "X"), ("y", "Y"), ("z", "Z"), ("distance", "Distance"))

    records = []
    try:
        for drv in session.drivers:
            try:
                driver_laps = session.laps.pick_driver(drv)
                if driver_laps is None or len(driver_laps) == 0:
                    continue

                trace = driver_laps.get_pos_data()  # pandas DataFrame
                if trace is None or len(trace) == 0:
                    continue

                for _, point in trace.reset_index().iterrows():
                    record = {
                        "year": year,
                        "round_number": round_number,
                        "session_name": session.name,
                        "driver": drv,
                        "date": point.get("Date"),
                        "session_time": point.get("SessionTime"),
                    }
                    for out_key, source_col in float_columns:
                        coordinate = point.get(source_col)
                        record[out_key] = float(coordinate) if pd.notna(coordinate) else None
                    record["extraction_timestamp"] = datetime.now()
                    records.append(record)
            except Exception as e_drv:
                print(f"   ‚ö†Ô∏è Position error for driver {drv}: {e_drv}")
                continue
    except Exception as e:
        print(f"   ‚ùå Error extracting position data: {e}")
    return records


In [0]:
# One in-memory accumulator per bronze table; every processed session appends
# its records here and the write cell consumes the lists afterwards.
session_results_rows, weather_rows, track_status_rows, rcm_rows = [], [], [], []
telemetry_rows, position_rows = [], []

# (accumulator, extractor) pairs in the order they run for each session.
# Core tables (cheap) always run; the heavy ones only when their flag is on.
extraction_plan = [
    (session_results_rows, extract_session_results),
    (weather_rows, extract_weather_data),
    (track_status_rows, extract_track_status),
    (rcm_rows, extract_race_control_messages),
]
if ENABLE_TELEMETRY:
    extraction_plan.append((telemetry_rows, extract_telemetry))
if ENABLE_POSITION:
    extraction_plan.append((position_rows, extract_position_data))

for ev in events:
    year = int(ev["year"])
    rnd = int(ev["round_number"])

    print(f"\n==================== {year} R{rnd} ‚Äì {ev['event_name']} ====================")

    session = load_session(year, rnd, SESSION_TYPE)
    if session is None:
        # load_session already reported the failure; move on to the next event.
        continue

    for accumulator, extractor in extraction_plan:
        accumulator.extend(extractor(session, year, rnd))

    # Tiny sleep to be nice to upstream resources
    time.sleep(1)

print("\n‚úÖ Extraction loop complete")
print(f"   Session results rows: {len(session_results_rows)}")
print(f"   Weather rows        : {len(weather_rows)}")
print(f"   Track status rows   : {len(track_status_rows)}")
print(f"   Race control rows   : {len(rcm_rows)}")
print(f"   Telemetry rows      : {len(telemetry_rows)}")
print(f"   Position rows       : {len(position_rows)}")


In [0]:
# Helper to write if we have any rows
def write_bronze_table(rows, table_name: str, partition_cols: Optional[list] = None):
    """Write a list of record dicts to a Delta table, replacing its contents.

    Args:
        rows: Records to write (anything ``pd.DataFrame`` accepts). When empty,
            nothing is written and a notice is printed instead.
        table_name: Target table, passed to ``saveAsTable``.
        partition_cols: Optional column names passed to ``partitionBy``.

    Returns:
        None.

    Notes:
        The write uses ``mode("overwrite")``.
        NOTE(review): that presumably replaces the whole table, so a run over a
        narrower year range would drop previously loaded years - confirm this
        is intended.
    """
    if not rows:
        print(f"‚ö†Ô∏è No data for {table_name}, skipping write")
        return

    pdf = pd.DataFrame(rows)
    sdf = spark.createDataFrame(pdf)  # infer schema

    writer = sdf.write.format("delta").mode("overwrite")
    if partition_cols:
        writer = writer.partitionBy(*partition_cols)

    writer.saveAsTable(table_name)
    # Report the count from the driver-side frame: calling sdf.count() here
    # would evaluate the Spark DataFrame a second time just for this log line.
    print(f"‚úÖ Wrote {len(pdf)} rows to {table_name}")


# Core tables are always written. Heavy tables carry their ENABLE_* flag and
# are only written when it is on. Every table is partitioned by "year".
bronze_writes = [
    (True, session_results_rows, "f1_bronze.session_results"),
    (True, weather_rows, "f1_bronze.weather"),
    (True, track_status_rows, "f1_bronze.track_status"),
    (True, rcm_rows, "f1_bronze.race_control_messages"),
    (ENABLE_TELEMETRY, telemetry_rows, "f1_bronze.telemetry_raw"),
    (ENABLE_POSITION, position_rows, "f1_bronze.car_position"),
]

for write_enabled, table_rows, target_table in bronze_writes:
    if write_enabled:
        write_bronze_table(table_rows, target_table, partition_cols=["year"])


In [0]:
%sql
-- Sanity check for f1_bronze.session_results: per season, the raw row count
-- next to the number of distinct (year, round_number, driver_number) keys.
-- A gap between the two columns would point at duplicate keys.
-- NOTE(review): concat() presumably yields NULL when driver_number is NULL, and
-- COUNT(DISTINCT ...) skips NULLs, so such rows would also widen the gap - confirm.
SELECT 
  year,
  COUNT(*) AS rows,
  COUNT(DISTINCT concat(year, '-', round_number, '-', driver_number)) AS driver_sessions
FROM f1_bronze.session_results
GROUP BY year
ORDER BY year;


In [0]:
%sql
-- Sanity check for f1_bronze.weather: per season, the row count and the
-- smallest / largest value of the timestamp column.
-- NOTE(review): timestamp holds whatever the source "Time" column contained;
-- confirm whether that is an absolute time or a session-relative offset
-- before reading first_ts / last_ts as calendar dates.
SELECT 
  year,
  COUNT(*) AS rows,
  MIN(timestamp) AS first_ts,
  MAX(timestamp) AS last_ts
FROM f1_bronze.weather
GROUP BY year
ORDER BY year;


In [0]:
print("üèÅ Extended F1 Bronze Extraction Complete!")
print("=" * 40)

# Every table this notebook can produce, including the optional heavy ones.
bronze_tables = (
    "f1_bronze.session_results",
    "f1_bronze.weather",
    "f1_bronze.track_status",
    "f1_bronze.race_control_messages",
    "f1_bronze.telemetry_raw",
    "f1_bronze.car_position",
)

for tbl in bronze_tables:
    try:
        c = spark.table(tbl).count()
        summary_line = f"üìä {tbl}: {c:,} rows"
    except Exception:
        # Any failure while reading or counting is reported as "not created".
        summary_line = f"üìä {tbl}: (not created)"
    print(summary_line)

print()
print(f"üìÅ FastF1 cache location: {cache_dir}")
