In [1]:
import socket
import time

import fastf1
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import seaborn as sns
from fastf1.api import SessionNotAvailableError
from fastf1.core import Session

# Configure matplotlib for inline plotting
%matplotlib inline



In [2]:
# FastF1 cache configuration. The path is relative, so it resolves against the
# kernel's working directory rather than a fixed location.
fastf1.Cache.enable_cache('../data')  # Cache data in the 'data' directory
# NOTE(review): with offline mode on, anything not already present in the cache
# presumably cannot be loaded — confirm against the FastF1 Cache documentation.
fastf1.Cache.offline_mode(True) # Enable offline mode to prevent network traffic

In [3]:
def get_new_ip(timeout=10):
    """Return the public IP address reported by api.ipify.org.

    Despite its name this function does not change anything: it only asks the
    ipify service which address the request came from.

    Args:
        timeout: Seconds to wait for the service before giving up. Without a
            timeout a stalled connection would block the notebook indefinitely.

    Returns:
        The response body with surrounding whitespace removed.

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            non-success HTTP status (so an error page is never returned as if
            it were an IP address).
    """
    response = requests.get('https://api.ipify.org', timeout=timeout)
    response.raise_for_status()
    return response.text.strip()

def change_ip_on_limit(retries=0):
    """Back off, then report whether the public and local IP addresses differ.

    The function sleeps 10 seconds for every retry still missing to reach the
    threshold (``200 // 60``), then compares the address returned by
    ``get_new_ip()`` with the local address of the outbound network interface.

    The previous recursive version discarded the result of the nested call, so
    any call that started below the threshold always returned False. The
    back-off is now a loop and the comparison result is actually returned.

    NOTE(review): the comparison is between a public address and the local
    interface address; behind NAT these presumably always differ, so the result
    may not indicate an actual IP change — confirm the intended check.

    Args:
        retries: Number of back-off rounds already performed.

    Returns:
        True if the local interface address differs from the reported public
        address, otherwise False.
    """
    retry_threshold = 200 // 60  # 200 requests per hour

    # Wait out the remaining back-off rounds before checking the address.
    while retries < retry_threshold:
        time.sleep(10)
        retries += 1

    new_ip = get_new_ip()
    print(f"New IP address: {new_ip}")

    # connect() on a UDP socket sends no packets; it only selects the local
    # interface that would be used, whose address getsockname() then exposes.
    # The context manager makes sure the socket is closed again.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(("8.8.8.8", 80))
        current_ip = s.getsockname()[0]

    return current_ip != new_ip

In [3]:
def collect_session_data(year, race_round, session_name):
    """Load one FastF1 session and return it only if it contains lap data.

    Args:
        year: Season passed to ``fastf1.get_session``.
        race_round: Round number passed to ``fastf1.get_session``.
        session_name: Session identifier passed to ``fastf1.get_session``
            (the notebook uses 'R' and 'Q').

    Returns:
        The loaded session object, or None when the session is reported as
        unavailable or its ``laps`` table is empty. A message is printed in
        both None cases.
    """
    try:
        loaded = fastf1.get_session(year, race_round, session_name)
        loaded.load()
        has_laps = not loaded.laps.empty
    except SessionNotAvailableError:
        print(f"Session not available for {year} round {race_round}")
        return None

    if has_laps:
        return loaded

    print(f"Laps data not available for {year} round {race_round} in {session_name} session.")
    return None

In [4]:
# Diagnostic: report how many 'X', 'Y' and 'Z' entries are not plain numbers
def check_data_types(telemetry):
    """Print, for each of the 'X', 'Y' and 'Z' columns, the count of non-numeric values.

    A value counts as non-numeric when it is not an instance of ``float`` or
    ``int``. Nothing is returned; the three counts are written to stdout.

    Args:
        telemetry: Table-like object whose 'X', 'Y' and 'Z' columns support
            ``.apply(...)``.
    """
    def _is_not_number(value):
        return not isinstance(value, (float, int))

    # Count every axis first so that a missing column fails before any output.
    bad_counts = {
        axis: telemetry[axis].apply(_is_not_number).sum()
        for axis in ('X', 'Y', 'Z')
    }

    for axis, bad_count in bad_counts.items():
        print(f"Non-numeric values in '{axis}': {bad_count}")

In [5]:
# Function to extract lap and telemetry data for each driver
def extract_lap_telemetry_data(session: Session):
    """Aggregate car and position telemetry into one summary row per driver.

    For every driver in ``session.drivers`` the car data (speed, throttle,
    brake, RPM, gear) and position data (X, Y, Z, Status) of all their laps are
    reduced to means/maxima.

    Changes compared with the earlier version:
      * Drivers without any laps are skipped. Previously reading the first
        'Driver' value of an empty lap table raised IndexError, and the caller
        then discarded the whole race.
      * Coordinates that cannot be parsed as numbers are left as NaN, which
        ``mean()`` ignores, instead of being replaced by 0.0, which pulled the
        averages toward the origin.

    Args:
        session: A loaded session exposing ``laps`` and ``drivers``.

    Returns:
        A DataFrame with the columns 'Driver', 'Avg Speed', 'Max Speed',
        'Avg Throttle', 'Avg Brake', 'Avg RPM', 'Max RPM', 'Avg Gear',
        'Avg X Pos', 'Avg Y Pos', 'Avg Z Pos' and 'OnTrack Percentage'.
        The frame has no columns at all when no driver had laps.
    """
    # Get lap data for all drivers
    laps = session.laps

    # Collect telemetry data for each driver
    telemetry_data = []
    for driver in session.drivers:
        driver_laps = laps.pick_drivers(driver)
        if driver_laps.empty:
            # Nothing to aggregate for a driver who has no laps in this session.
            continue

        # Get car data (speed, throttle, brake, etc.)
        car_data = driver_laps.get_car_data()
        # Get position data (X, Y, Z coordinates)
        pos_data = driver_laps.get_pos_data()

        # Coerce coordinates to numbers; unparseable entries become NaN and are
        # excluded from the means below.
        x_pos, y_pos, z_pos = (
            pd.to_numeric(pos_data[axis], errors='coerce')
            for axis in ('X', 'Y', 'Z')
        )

        # Status (OnTrack/OffTrack): share of samples flagged 'OnTrack', in percent
        status_share = pos_data['Status'].value_counts(normalize=True)
        on_track_percentage = status_share.get('OnTrack', 0) * 100

        telemetry_data.append({
            'Driver': driver_laps['Driver'].values[0],
            'Avg Speed': car_data['Speed'].mean(),
            'Max Speed': car_data['Speed'].max(),
            'Avg Throttle': car_data['Throttle'].mean(),
            'Avg Brake': car_data['Brake'].mean(),
            'Avg RPM': car_data['RPM'].mean(),
            'Max RPM': car_data['RPM'].max(),
            'Avg Gear': car_data['nGear'].mean(),
            'Avg X Pos': x_pos.mean(),
            'Avg Y Pos': y_pos.mean(),
            'Avg Z Pos': z_pos.mean(),
            'OnTrack Percentage': on_track_percentage
        })

    return pd.DataFrame(telemetry_data)

In [6]:
def _summarise_pit_stops(driver_pit_laps):
    """Return (pit-lane entries, total time between entry and next exit) for one driver."""
    pit_in_times = list(driver_pit_laps['PitInTime'].dropna().sort_values())
    pit_out_times = list(driver_pit_laps['PitOutTime'].dropna().sort_values())

    total_pit_time = pd.Timedelta(0)
    for index, pit_in in enumerate(pit_in_times):
        next_pit_in = pit_in_times[index + 1] if index + 1 < len(pit_in_times) else None
        pit_out = next((t for t in pit_out_times if t > pit_in), None)
        if pit_out is None:
            # The car entered the pit lane and never left it again.
            continue
        if next_pit_in is not None and pit_out > next_pit_in:
            # No exit recorded before the following entry; do not pair this
            # entry with an exit that belongs to a later stop.
            continue
        total_pit_time += pit_out - pit_in

    return len(pit_in_times), total_pit_time


# Function to extract race results (final positions) and pit stop data
def extract_race_results_and_pitstops(session: Session):
    """Combine each driver's race result with a summary of their pit stops.

    Pit-stop handling was corrected: the pit entry time is recorded on the
    in-lap and the pit exit time on the following out-lap, so subtracting the
    two columns row by row (as done before) does not measure a stop. Each
    entry is now paired with the next exit after it. 'Pit Stops' counts
    pit-lane entries rather than the number of in-laps plus out-laps.

    Args:
        session: A loaded session exposing ``results``, ``laps``, ``drivers``
            and ``event``.

    Returns:
        A DataFrame with the columns 'Driver', 'Final Position',
        'Qualifying Position' (taken from the results' 'GridPosition'),
        'Pit Stops' and 'Total Pit Time' (a Timedelta). Drivers missing from
        the results are reported and skipped. The frame has no columns when no
        driver could be processed.
    """
    # Race results
    results = session.results

    # Collect pit in/out laps
    pit_laps = session.laps.pick_box_laps('both')

    # Combine the race results with pit stop data
    driver_results = []
    for driver in session.drivers:
        driver_pit_laps = pit_laps.pick_drivers(driver)
        driver_result = results.loc[results['DriverNumber'] == driver]

        # Check if the driver exists in the race results
        if driver_result.empty:
            print(f"Driver {driver} not found in race results for {session.event['EventName']}.")
            continue  # Skip this driver if not found

        # Ensure position data exists
        if 'Position' not in driver_result.columns or 'GridPosition' not in driver_result.columns:
            print(f"Missing position data for driver {driver} in {session.event['EventName']}.")
            continue

        pit_stop_count, total_pit_time = _summarise_pit_stops(driver_pit_laps)

        driver_results.append({
            'Driver': driver_result['Abbreviation'].values[0],
            'Final Position': driver_result['Position'].values[0],
            'Qualifying Position': driver_result['GridPosition'].values[0],
            'Pit Stops': pit_stop_count,
            'Total Pit Time': total_pit_time
        })

    return pd.DataFrame(driver_results)

In [7]:
# Function to extract detailed weather data
def extract_weather_data(session):
    """Summarise a session's weather samples into a flat dictionary.

    The wind direction is averaged as a circular quantity: bearings are
    converted to unit vectors, averaged, and converted back. A plain arithmetic
    mean is wrong across the 0/360 boundary (350 and 10 degrees would average
    to 180 instead of 0).

    Args:
        session: Object exposing ``weather_data`` with the columns 'AirTemp',
            'Humidity', 'Pressure', 'TrackTemp', 'WindSpeed', 'WindDirection'
            and 'Rainfall'. 'WindDirection' is treated as degrees.

    Returns:
        A dict with the keys 'Avg Air Temp', 'Avg Humidity', 'Avg Pressure',
        'Avg Track Temp', 'Max Wind Speed', 'Wind Direction' (degrees in
        [0, 360)) and 'Rain (yes/no)' (1 if any sample reports rainfall,
        else 0).
    """
    weather_data = session.weather_data

    # Circular mean of the wind bearing. When the samples cancel out exactly
    # (e.g. 0 and 180 degrees) no meaningful mean direction exists and the
    # value is arbitrary.
    wind_radians = np.deg2rad(weather_data['WindDirection'].astype(float))
    mean_sin = np.sin(wind_radians).mean()
    mean_cos = np.cos(wind_radians).mean()
    wind_direction = float(np.degrees(np.arctan2(mean_sin, mean_cos)) % 360.0)
    if wind_direction >= 360.0:
        # Floating-point modulo of a tiny negative angle can round up to 360.
        wind_direction = 0.0

    # Rainfall (boolean flag)
    rain_flag = 1 if weather_data['Rainfall'].any() else 0

    return {
        'Avg Air Temp': weather_data['AirTemp'].mean(),
        'Avg Humidity': weather_data['Humidity'].mean(),
        'Avg Pressure': weather_data['Pressure'].mean(),
        'Avg Track Temp': weather_data['TrackTemp'].mean(),
        'Max Wind Speed': weather_data['WindSpeed'].max(),
        'Wind Direction': wind_direction,
        'Rain (yes/no)': rain_flag
    }

In [None]:
def _collect_race_frame(year, race_round):
    """Build the combined per-driver frame for one race, or return None if the round is skipped."""
    # Collect race and qualifying sessions. Both must be loadable for the
    # round to be kept, matching the previous behavior.
    race_session = collect_session_data(year, race_round, 'R')
    qual_session = collect_session_data(year, race_round, 'Q')

    # Ensure the race session is available
    if race_session is None or qual_session is None or race_session.results.empty:
        print(f"Skipping {year} round {race_round} due to unavailable session data.")
        return None

    # Extract data
    race_data = extract_race_results_and_pitstops(race_session)
    if race_data.empty or 'Driver' not in race_data.columns:
        print(f"Skipping {year} round {race_round} due to empty race data or missing driver data.")
        return None

    lap_data = extract_lap_telemetry_data(race_session)
    weather_data = extract_weather_data(race_session)

    # An empty telemetry frame has no 'Driver' column to merge on.
    if 'Driver' not in lap_data.columns:
        print(f"Skipping {year} round {race_round} due to missing driver data.")
        return None

    # Combine race, lap, and weather data
    combined_data = race_data.merge(lap_data, on='Driver', how='inner')
    combined_data['Race ID'] = f"{year}-{race_round}"
    combined_data['Year'] = year
    combined_data['Track'] = race_session.event['EventName']
    combined_data['Avg Temp'] = weather_data['Avg Air Temp']
    combined_data['Max Wind Speed'] = weather_data['Max Wind Speed']
    combined_data['Rain'] = weather_data['Rain (yes/no)']
    return combined_data


# Main function to loop through races from 2018 to 2024 and collect data
def collect_f1_data(start_year=2018, end_year=2024, max_rounds=24):
    """Collect one row per driver per race for every round in the given seasons.

    A round is skipped (with a printed message) when its race or qualifying
    session cannot be loaded, when it yields no results or telemetry, or when
    any error is raised while processing it.

    Changes compared with the earlier version:
      * The merge with qualifying lap data was removed. Its result was
        immediately overwritten by a second race/telemetry merge, so it never
        reached the output. The grid position is still present as
        'Qualifying Position' from the race results.
      * The number of rounds tried per season is now a parameter and defaults
        to 24 instead of a hard-coded 23, so the last round of a 24-race
        season is no longer missed.

    Args:
        start_year: First season to collect (inclusive).
        end_year: Last season to collect (inclusive).
        max_rounds: Highest round number tried in every season. Rounds that do
            not exist in a shorter season are expected to raise inside the
            loading code and are reported by the error handler below.

    Returns:
        A DataFrame concatenating all collected races, or an empty DataFrame
        when nothing could be collected.
    """
    data = []

    # Loop through each year and race
    for year in range(start_year, end_year + 1):
        for race_round in range(1, max_rounds + 1):
            try:
                race_frame = _collect_race_frame(year, race_round)
            except Exception as e:
                # Deliberate best-effort: one broken round must not abort the run.
                print(f"Error with race {year} round {race_round}: {e}")
                continue

            if race_frame is not None:
                # Append to final dataset
                data.append(race_frame)

    # Check if any data was collected before concatenating
    if not data:
        print("No valid race data was collected.")
        return pd.DataFrame()  # Return an empty DataFrame instead of failing

    return pd.concat(data, ignore_index=True)

# Call the main function to collect the data
# Runs collect_f1_data with its default arguments and keeps the result in f1_data.
f1_data = collect_f1_data()
# Last expression of the cell, so the notebook renders it as the cell output.
f1_data.head()  # Display the first few rows

In [None]:
# Session loader variant: loads the sessions that are not marked as collected
def collect_f1_data(start_year=2018, end_year=2024, max_rounds=24):
    """Load race and qualifying sessions for every round not yet marked as collected.

    NOTE: this definition has the same name as the collector defined in the
    previous cell and replaces it once this cell runs. Unlike that function it
    builds no dataset; it only triggers the session loads and returns True.

    Seasons are looked up in ``already_collected``. A value of 'all' skips the
    whole season, a collection of round numbers skips just those rounds, and a
    season that is absent from the mapping is loaded in full. Previously an
    absent season raised KeyError.

    Args:
        start_year: First season to process (inclusive).
        end_year: Last season to process (inclusive).
        max_rounds: Highest round number tried in every season.

    Returns:
        Always True.
    """
    already_collected = {
        2018: 'all',
        2019: 'all',
        2020: 'all',
        2021: 'all',
        2022: 'all',
        2023: 'all',
        2024: 'all'
    }
    # Loop through each year and race
    for year in range(start_year, end_year + 1):
        collected = already_collected.get(year)
        for race_round in range(1, max_rounds + 1):
            if collected and (collected == 'all' or race_round in collected):
                print(f"Skipping {year} race round")
                continue
            try:
                # Load race and qualifying sessions; the returned objects are
                # not needed here.
                collect_session_data(year, race_round, 'R')
                collect_session_data(year, race_round, 'Q')
            except Exception as e:
                print(f"Error with race {year} round {race_round}: {e}")
                continue

    return True

# Run the session loader defined in this cell. It returns True rather than a
# dataset, so the result is stored under its own name instead of overwriting
# the f1_data DataFrame produced by the previous cell.
sessions_prefetched = collect_f1_data()