<a href="https://colab.research.google.com/github/kennethajensen/FormulaOne/blob/main/F1_Download_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from urllib.request import urlopen
from urllib.error import HTTPError
import json
import pandas as pd
import numpy as np
from numpy import empty
import time
import os
from google.colab import drive

# Root URL of the OpenF1 API; endpoint names are appended to it
base_url = 'https://api.openf1.org/v1/'

# Mount Google Drive; every data file is stored below data_path
drive.mount('/content/drive')
data_path = '/content/drive/MyDrive/Data Science/[02] Articles - Formula 1 [Work-in-progress]/Data/'

# File name of each locally stored dataset
(
    meetings_file_name,
    sessions_file_name,
    laps_file_name,
    fastest_laps_file_name,
    car_data_file_name,
    location_file_name,
) = (
    'meetings.csv',
    'qualifying_sessions.csv',
    'qualifying_laps.csv',
    'qualifying_fastest_laps.csv',
    'car_data.csv',
    'location.csv',
)

# Full path of each dataset, listed in the same order as the names above
(
    meetings_path,
    sessions_path,
    laps_path,
    fastest_laps_path,
    car_data_path,
    location_path,
) = (
    os.path.join(data_path, file_name)
    for file_name in (
        meetings_file_name,
        sessions_file_name,
        laps_file_name,
        fastest_laps_file_name,
        car_data_file_name,
        location_file_name,
    )
)

In [None]:
def get_data(endpoint, filter='', max_retries=5, initial_retry_delay=2):
    """
    Retrieves data from an API endpoint below `base_url` as a DataFrame.
    Optionally, applies a filter to select the records to include.

    A request answered with 429 (Too Many Requests) is retried with
    exponential backoff. Any other error is re-raised immediately.

    Args:
        endpoint (str): The endpoint name to query.
        filter (str): The query string appended after '?'. An empty string
                      requests the endpoint without a filter.
        max_retries (int): The maximum number of retries to attempt when encountering
                           a 429 (Too many requests) error.
        initial_retry_delay (int): The delay in seconds before the first retry.
                                   The delay doubles after every retry.

    Returns:
        pd.DataFrame: The decoded JSON response loaded into a DataFrame.

    Raises:
        HTTPError: For any HTTP error other than 429.
        RuntimeError: When the request is still rate limited after
                      `max_retries` retries.
    """
    url = base_url + endpoint
    if filter:
        url += '?' + filter

    retry_delay = initial_retry_delay
    last_error = None

    # One initial attempt followed by at most `max_retries` retries
    for attempt in range(max_retries + 1):
        try:
            # The context manager closes the connection even if reading fails
            with urlopen(url) as response:
                data = json.loads(response.read().decode('utf-8'))
            df = pd.DataFrame(data)
            time.sleep(1)   # Pause for 1 second to avoid rate limiting
            return df
        except HTTPError as e:
            if e.code != 429:
                raise   # Re-raise other HTTP errors immediately
            last_error = e
            if attempt == max_retries:
                # Out of retries: fail now instead of sleeping once more
                break
            print(f"Rate limit hit for {endpoint}?{filter}. Retrying in {retry_delay} seconds (Retry {attempt + 1}/{max_retries})...")
            time.sleep(retry_delay)
            retry_delay *= 2   # Exponential backoff
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            raise   # Re-raise other unexpected errors

    raise RuntimeError(
        f"Failed to retrieve data for {endpoint}?{filter} after {max_retries} retries."
    ) from last_error

In [None]:
def save_dataframe_to_csv(df, path):
    """
    Saves a DataFrame to a CSV file. If the file already holds data, the rows
    are appended without headers. Otherwise, the file is created with headers.

    A file that exists but is empty (zero bytes) has no header row yet, so it
    is treated like a missing file. A DataFrame without any columns is not
    written at all, because it would produce a file without a usable header.

    Args:
        df (pd.DataFrame): The DataFrame to save.
        path (str): The file path where the CSV should be saved.
    """
    if len(df.columns) == 0:
        print(f"No columns to write, file left untouched: {path}")
        return

    # Only append when the file already contains a header row
    file_has_content = os.path.exists(path) and os.path.getsize(path) > 0

    if file_has_content:
        df.to_csv(path, mode='a', header=False, index=False)
        print(f"Appended {len(df)} rows to existing file: {path}")
    else:
        df.to_csv(path, mode='w', header=True, index=False)
        print(f"Created new file and wrote {len(df)} rows: {path}")

print("Defined save_dataframe_to_csv function.")

## Meetings
First get the complete data set with all **Meetings** where each meeting is either a testing or racing event covering multiple sessions and days.\
Save the data to a CSV file replacing any previous file.

In [None]:
# Download every meeting and overwrite the local CSV copy
meetings = get_data(endpoint='meetings')
meetings.to_csv(path_or_buf=meetings_path, index=False)

## Qualifying sessions
Second, get all of the qualifying sessions. This includes both the sprint qualifying and the qualifying for the feature race.\
Save the data to a CSV file replacing any previous file.



In [None]:
# Query string for the 'sessions' endpoint, one condition per entry
sessions_filter = '&'.join([
    'session_type=Qualifying',
    'year>=2023',
    'country_code=USA',
])

sessions = get_data('sessions', sessions_filter)

# Parse both timestamps so they can be compared with the current time
sessions = sessions.assign(
    date_start=pd.to_datetime(sessions['date_start']),
    date_end=pd.to_datetime(sessions['date_end']),
)
# Keep only the sessions that have already ended
sessions = sessions.loc[sessions['date_end'] <= pd.Timestamp.now(tz='UTC')]

sessions.to_csv(sessions_path, index=False)

# Collect the session keys; each key is expected to appear only once
all_session_keys = sessions['session_key'].unique()
print(f"Total number of sessions: {len(all_session_keys)}")

# Laps
Retrieve the Lap data and store it in a local file before identifying the fastest qualifying laps for each driver in each session.
- First check for an existing CSV file and the laps that are already stored in the file.
- Get the lap data from the API but only from any qualifying sessions that have not been retrieved earlier.
- Then append the newly retrieved data to the existing CSV file.



In [None]:
laps_csv_file_exists = os.path.exists(laps_path)

if laps_csv_file_exists:
    # Load the laps stored by earlier runs
    laps_from_csv = pd.read_csv(laps_path)
    # Sessions whose laps are already in the file
    retrieved_session_keys = laps_from_csv['session_key'].unique()
else:
    # No file yet: start from an empty DataFrame (not a list) so that
    # laps_from_csv can be passed to pd.concat later on
    laps_from_csv = pd.DataFrame()
    retrieved_session_keys = empty(0)

# Identify the sessions that do not appear in the 'Laps' data file
# The laps from missing sessions will need to be retrieved
unretrieved_session_keys = list(set(all_session_keys) - set(retrieved_session_keys))

print(f"Number of sessions already retrieved: {len(retrieved_session_keys)}")
print(f"Number of sessions to be retrieved: {len(unretrieved_session_keys)}")
# The lap data from the qualifying in Baku in 2025 is missing from OpenF1.org
# and the program will try to pick it up every time it is executed

In [None]:
laps = []

# Get the laps from every session not already retrieved
for session_key in unretrieved_session_keys:
    laps_by_session = get_data('laps', f'session_key={session_key}')
    # Skip sessions for which the API returned no laps, in the same way the
    # car data and location loops skip empty responses
    if not laps_by_session.empty:
        laps.append(laps_by_session)

print(f"Fetched lap data for {len(laps)} sessions.")

## Save the lap data to a file
* Append the newly retrieved laps to the existing `CSV` file or create the file if the file does not already exist.
* If `laps` is currently a list of DataFrames, we will concatenate it into a single DataFrame.


In [None]:
if not laps:
    # Nothing was downloaded; keep `laps` a DataFrame for the cells below
    laps = pd.DataFrame()
    print("No new lap data to append or save.")
else:
    # `laps` is a list with one DataFrame per session;
    # stack them into a single DataFrame before saving
    laps = pd.concat(laps, ignore_index=True)
    save_dataframe_to_csv(laps, laps_path)

## Identify the fastest lap from each driver in every session

To find the fastest lap for each driver in each session, we need to:
1. Ensure the `laps` DataFrame contains data.\
 If `laps` is currently a list of DataFrames, we will concatenate it into a single DataFrame.
2. Group the DataFrame by `session_key` and `driver_number`.
3. For each group, find the row with the minimum `lap_duration`.


In [None]:
if not laps.empty:
    # Work on an independent copy of the new laps with a fresh 0..n-1 index
    fastest_laps = laps.copy().reset_index(drop=True)

    # Convert is_pit_out_lap to boolean and drop the pit-out laps.
    # The explicit copy keeps the column assignments below from writing
    # to a slice of the unfiltered frame (SettingWithCopyWarning).
    fastest_laps['is_pit_out_lap'] = fastest_laps['is_pit_out_lap'].astype(bool)
    fastest_laps = fastest_laps[~fastest_laps['is_pit_out_lap']].copy()

    # Ensure 'date_start' is datetime and 'lap_duration' is numeric
    fastest_laps['date_start'] = pd.to_datetime(fastest_laps['date_start'],
                                                format='ISO8601')
    fastest_laps['lap_duration'] = pd.to_numeric(fastest_laps['lap_duration'],
                                                 errors='coerce')
    # Drop rows where lap_duration is NaN (couldn't be converted)
    fastest_laps.dropna(subset=['lap_duration'], inplace=True)

    # Calculate 'date_end' by adding 'lap_duration' (seconds) to 'date_start'
    fastest_laps['date_end'] = fastest_laps['date_start'] \
                               + pd.to_timedelta(fastest_laps['lap_duration'], unit='s')

    # Keep, for each driver in each session, the row with the shortest lap
    fastest_lap_index = fastest_laps.groupby(['session_key', 'driver_number'])['lap_duration'].idxmin()
    fastest_laps = fastest_laps.loc[fastest_lap_index]

    print(f"Found {len(fastest_laps)} fastest laps.")

    save_dataframe_to_csv(fastest_laps, fastest_laps_path)

else:
    fastest_laps = pd.DataFrame() # Ensure fastest_laps is always a DataFrame, even if empty
    print("No lap data available to process.")

# Get a complete list of all qualifying laps


*   Combine the data from the CSV file with any newly retrieved qualifying laps
*   Get every unique combination of the session and the driver
* Merge in the start and end time for each of the fastest qualifying laps

This will be used to get the car and location data for just those laps.



In [None]:
# Stack the laps loaded from the CSV file and the newly retrieved laps
# into one complete list
all_qualifying_laps = pd.concat(objs=[laps_from_csv, laps], ignore_index=True)

# One row per session/driver combination
unique_laps = all_qualifying_laps.loc[:, ['session_key', 'driver_number']].drop_duplicates()

display(unique_laps)

In [None]:
# Show the unique session/driver combinations
display(unique_laps)

In [None]:
if unique_laps.empty or fastest_laps.empty:
    print("Either unique_laps or fastest_laps DataFrame is empty, cannot add dates.")
else:
    # Only the keys and the start/end time of the fastest lap are needed
    fastest_laps_for_merge = fastest_laps.loc[:, ['session_key', 'driver_number', 'date_start', 'date_end']]

    # Left join: every session/driver combination is kept, with or without dates
    unique_laps = unique_laps.merge(
        fastest_laps_for_merge,
        how='left',
        on=['session_key', 'driver_number'],
    )

    print("Added 'date_start' and 'date_end' to unique_laps_keys.")
    display(unique_laps.head())

# Car data

In [None]:
car_data_csv_file_exists = os.path.exists(car_data_path)

if car_data_csv_file_exists:
    # Load the car data stored by earlier runs
    car_data_from_csv = pd.read_csv(car_data_path)
    # Session/driver combinations that already have car data
    retrieved_car_data_laps = car_data_from_csv.loc[:, ['session_key', 'driver_number']].drop_duplicates()
else:
    # No file yet: empty frames with the expected key columns
    car_data_from_csv = pd.DataFrame(columns=['session_key', 'driver_number'])
    retrieved_car_data_laps = pd.DataFrame(columns=['session_key', 'driver_number'])

# Turn each frame into a set of (session_key, driver_number) tuples
# so the missing combinations can be found with a set difference
set_unique_laps = set(map(tuple, unique_laps[['session_key', 'driver_number']].values))
set_retrieved_car_data_laps = set(map(tuple, retrieved_car_data_laps.values))

# Combinations that have laps but no car data yet
unretrieved_combinations_set = set_unique_laps - set_retrieved_car_data_laps

# Back to a DataFrame with one row per missing combination
unretrieved_car_data_laps = pd.DataFrame(list(unretrieved_combinations_set), columns=['session_key', 'driver_number'])


if unretrieved_car_data_laps.empty or fastest_laps.empty:
    unretrieved_car_data_laps = pd.DataFrame(columns=['session_key', 'driver_number', 'date_start', 'date_end'])
else:
    # Look up date_start and date_end of each missing combination in unique_laps
    unretrieved_car_data_laps = unretrieved_car_data_laps.merge(
        unique_laps[['session_key', 'driver_number', 'date_start', 'date_end']],
        how='left',
        on=['session_key', 'driver_number'],
    )
    # Keep only the records that have both a date_start and a date_end
    unretrieved_car_data_laps = unretrieved_car_data_laps.dropna(subset=['date_start', 'date_end'])

print(f"Number of laps already retrieved: {len(retrieved_car_data_laps)}")
print(f"Number of laps to be retrieved: {len(unretrieved_car_data_laps)}")

In [None]:
car_data = []

# Iterate through each of the missing laps
for index, row in unretrieved_car_data_laps.iterrows():

    # Get the session and driver number from the current row
    session_key = row['session_key']
    driver_number = row['driver_number']

    # The request uses whole seconds. Round the start down and the end up so
    # the window covers the complete lap; formatting alone would truncate
    # the end and cut off the last fraction of a second.
    date_start_utc = row['date_start'].tz_convert('UTC').floor('s')
    date_end_utc = row['date_end'].tz_convert('UTC').ceil('s')

    formatted_date_start = date_start_utc.strftime('%Y-%m-%dT%H:%M:%SZ')
    formatted_date_end = date_end_utc.strftime('%Y-%m-%dT%H:%M:%SZ')

    data_filter = (
        f"session_key={session_key}"
        f"&driver_number={driver_number}"
        f"&date>={formatted_date_start}"
        f"&date<={formatted_date_end}"
    )

    # Fetching 'car_data'; empty responses are skipped
    car_data_by_session_driver = get_data('car_data', data_filter)
    if not car_data_by_session_driver.empty:
        car_data.append(car_data_by_session_driver)

In [None]:
if not car_data:
    print("No new car data to append or save.")
else:
    # `car_data` is a list with one DataFrame per session/driver combination;
    # stack them into a single DataFrame before saving
    car_data = pd.concat(car_data, ignore_index=True)
    save_dataframe_to_csv(df=car_data, path=car_data_path)

# Location

In [None]:
# Check for the location file itself (not the car data file); otherwise
# read_csv fails when car_data.csv exists but location.csv does not
location_csv_file_exists = os.path.exists(location_path)

if not location_csv_file_exists:
    # Create an empty dataframe with the expected columns
    location_from_csv = pd.DataFrame(columns=['session_key', 'driver_number'])
    retrieved_location_laps = pd.DataFrame(columns=['session_key', 'driver_number'])
else:
    # Load the data from the file
    location_from_csv = pd.read_csv(location_path)
    # Get a list of unique session_key and driver_number combinations that have location data
    retrieved_location_laps = location_from_csv[['session_key','driver_number']].drop_duplicates()

# Convert DataFrames to sets of tuples for efficient comparison
# This creates a unique identifier for each session-driver combination (session_key, driver_number)
set_unique_laps = set(tuple(row) for row in unique_laps[['session_key', 'driver_number']].values)
set_retrieved_location_laps = set(tuple(row) for row in retrieved_location_laps.values)

# Identify the unique session_key and driver_number combinations that have not been retrieved
unretrieved_combinations_set = set_unique_laps - set_retrieved_location_laps

# Convert the set of unretrieved combinations back into a DataFrame
unretrieved_location_laps = pd.DataFrame(list(unretrieved_combinations_set), columns=['session_key', 'driver_number'])

if not unretrieved_location_laps.empty and not fastest_laps.empty:
    # Now, merge with the unique_laps DataFrame to get the date_start and
    # date_end for these unretrieved combinations
    unretrieved_location_laps = pd.merge(
        unretrieved_location_laps,
        unique_laps[['session_key', 'driver_number', 'date_start', 'date_end']],
        on=['session_key', 'driver_number'],
        how='left'
    )
    # Remove records that do not have both a date_start and a date_end
    unretrieved_location_laps.dropna(subset=['date_start', 'date_end'], inplace=True)
else:
    unretrieved_location_laps = pd.DataFrame(columns=['session_key', 'driver_number', 'date_start', 'date_end'])

print(f"Number of laps already retrieved: {len(retrieved_location_laps)}")
print(f"Number of laps to be retrieved: {len(unretrieved_location_laps)}")

In [None]:
location = []

# Iterate through each of the missing laps
for index, row in unretrieved_location_laps.iterrows():
    # Get the session and driver number from the current row
    session_key = row['session_key']
    driver_number = row['driver_number']

    # The request uses whole seconds. Round the start down and the end up so
    # the window covers the complete lap; formatting alone would truncate
    # the end and cut off the last fraction of a second.
    date_start_utc = row['date_start'].tz_convert('UTC').floor('s')
    date_end_utc = row['date_end'].tz_convert('UTC').ceil('s')

    formatted_date_start = date_start_utc.strftime('%Y-%m-%dT%H:%M:%SZ')
    formatted_date_end = date_end_utc.strftime('%Y-%m-%dT%H:%M:%SZ')

    data_filter = (
        f"session_key={session_key}"
        f"&driver_number={driver_number}"
        f"&date>={formatted_date_start}"
        f"&date<={formatted_date_end}"
    )

    # Fetching 'location'; empty responses are skipped
    location_by_session_driver = get_data('location', data_filter)
    if not location_by_session_driver.empty:
        location.append(location_by_session_driver)

In [None]:
if not location:
    print("No new location data to append or save.")
else:
    # `location` is a list with one DataFrame per session/driver combination;
    # stack them into a single DataFrame before saving
    location = pd.concat(location, ignore_index=True)
    save_dataframe_to_csv(df=location, path=location_path)