In [157]:
from urllib.request import urlopen
import numpy as np
import pandas as pd
import csv
import torch, torch.nn as nn, torch.nn.functional as F, torch.optim as optim
import math
import os
from tqdm.auto import tqdm
from io import BytesIO
from datetime import datetime, timedelta

# Data Preparation

Create one CSV file per race; each row holds the information for one lap.
- The 0th lap is each driver's best qualifying lap

In [426]:
# Download the full OpenF1 session list and cache it locally as CSV.
os.makedirs("LapData", exist_ok=True)  # to_csv does not create missing directories
# The context manager closes the HTTP response once the payload has been read.
with urlopen('https://api.openf1.org/v1/sessions?csv=true') as res:
    data = pd.read_csv(BytesIO(res.read()))
data.to_csv("LapData/Sessions.csv", index=False)

In [427]:
# Read the session cache from the same path the download cell writes it to
# ("LapData/Sessions.csv"); the bare "Sessions.csv" pointed at a different file.
Sessions = pd.read_csv("LapData/Sessions.csv")
# Full-length races only (sprint races share session_type 'Race' but are named 'Sprint').
races = Sessions.query("session_type == 'Race' and session_name != 'Sprint'")
# Main qualifying sessions only.
qualis = Sessions.query("session_type == 'Qualifying' and session_name == 'Qualifying'")



For each track:
- Find all qualifying and race data (every lap each driver completed)
- Build a dataframe of every driver and all the laps they did, including whether they pitted, retired, etc.
- Each row needs to include the lap time

In [420]:
# Keep only the rows whose session_name is exactly 'Qualifying' or 'Race'.
wanted_session_names = ['Qualifying', 'Race']
is_quali_or_race = Sessions['session_name'].isin(wanted_session_names)
QualiAndRaceSessions = Sessions[is_quali_or_race]
print(QualiAndRaceSessions)

     circuit_key circuit_short_name country_code  country_key   country_name  \
6             63             Sakhir          BRN           36        Bahrain   
7             63             Sakhir          BRN           36        Bahrain   
11           149             Jeddah          KSA          153   Saudi Arabia   
12           149             Jeddah          KSA          153   Saudi Arabia   
16            10          Melbourne          AUS            5      Australia   
..           ...                ...          ...          ...            ...   
171            2        Silverstone          GBR            2  Great Britain   
175            4        Hungaroring          HUN           14        Hungary   
176            4        Hungaroring          HUN           14        Hungary   
180            7  Spa-Francorchamps          BEL           16        Belgium   
181            7  Spa-Francorchamps          BEL           16        Belgium   

                      date_end         

In [421]:
def addRow(circuitkey, driver_ids, currentposition, pits, laps,status):
    """Flatten one lap's per-driver values into a single record.

    The record starts with ``CircuitID`` and is followed, for every driver slot
    (numbered from 1 in ``driver_ids`` order), by ``driver_ID_<n>``,
    ``position<n>``, ``inPit<n>``, ``status<n>`` and ``laptime<n>``.

    All per-driver sequences are read by index, so each must be at least as
    long as ``driver_ids``. The values are copied into the record; the input
    lists themselves are not stored.
    """
    row = {'CircuitID': circuitkey}
    for slot, driver_id in enumerate(driver_ids, start=1):
        idx = slot - 1
        row.update({
            f'driver_ID_{slot}': driver_id,
            f'position{slot}': currentposition[idx],
            f'inPit{slot}': pits[idx],
            f'status{slot}': status[idx],
            f'laptime{slot}': laps[idx],
        })
    return row

In [432]:
# Build one lap-by-lap CSV per race weekend.
# QualiAndRaceSessions is expected to hold alternating rows (Qualifying, then Race) for each
# weekend, so consecutive rows are consumed in pairs.
OUTPUT_DIR = "LapData"  # relative to the notebook; the same folder the session cache is written to


def fetch_openf1_csv(endpoint, session_key):
    """Download one OpenF1 endpoint for a session and return it as a DataFrame.

    The HTTP response is closed as soon as the payload has been read.
    """
    url = f"https://api.openf1.org/v1/{endpoint}?csv=true&session_key={session_key}"
    with urlopen(url) as res:
        return pd.read_csv(BytesIO(res.read()))


# Stop one short of the end so a trailing unpaired row can never index past the frame.
for i in tqdm(range(0, len(QualiAndRaceSessions) - 1, 2)):
    quali_session = QualiAndRaceSessions.iloc[i]
    race_session = QualiAndRaceSessions.iloc[i + 1]
    if quali_session['session_name'] != 'Qualifying' or race_session['session_name'] != 'Race':
        # A missing session would silently pair a qualifying with the wrong race.
        print(f"Skipping rows {i}-{i + 1}: expected a Qualifying/Race pair")
        continue

    location = quali_session['location']
    year = quali_session['year']
    circuitkey = quali_session['circuit_key']
    QualifyingKey = quali_session['session_key']
    RaceKey = race_session['session_key']

    print(location,year, QualifyingKey, RaceKey)

    # Qualifying data. Only QualiLaps is consumed below; the other frames are fetched for the
    # planned per-lap features (pit stops, flags, tyre age / compound).
    QualiLaps = fetch_openf1_csv("laps", QualifyingKey)
    QualiPits = fetch_openf1_csv("pit", QualifyingKey)
    QualiRaceControl = fetch_openf1_csv("race_control", QualifyingKey)
    QualiStints = fetch_openf1_csv("stints", QualifyingKey)

    # Race data.
    RacePositions = fetch_openf1_csv("position", RaceKey)
    # Starting grid = each driver's first position record. Taking the first row per driver
    # (rather than the first 20 rows) stays correct when fewer than 20 cars take the start.
    # Assumes the API returns rows in chronological order, as the [:20] slice also did.
    RaceStartPos = (RacePositions
                    .drop_duplicates(subset='driver_number', keep='first')
                    .sort_values(by='position', ascending=True))
    RaceLaps = fetch_openf1_csv("laps", RaceKey)
    RacePits = fetch_openf1_csv("pit", RaceKey)
    RaceControl = fetch_openf1_csv("race_control", RaceKey)
    RaceStints = fetch_openf1_csv("stints", RaceKey)
    RaceMeeting = fetch_openf1_csv("sessions", RaceKey)

    RaceStartTime = RaceMeeting.iloc[0]['date_start']
    race_start = datetime.fromisoformat(RaceStartTime)
    RaceStartDict = RaceStartPos.to_dict('records')

    QualiLaps_sorted = QualiLaps.sort_values(by='driver_number', ascending=False)
    R_NumLaps = RaceLaps['lap_number'].max()

    # Drivers that appear in the race lap data, highest car number first. Every per-driver
    # list below (status, laps, pits, positions, crossing times) is indexed in this order.
    driver_ids = [int(number) for number in
                  RaceLaps.sort_values(by='driver_number', ascending=False)['driver_number'].unique()]

    # PositionChanges is the same object as RacePositions; the parsed dates land on both names.
    PositionChanges = RacePositions
    PositionChanges['date'] = pd.to_datetime(PositionChanges['date'], format='ISO8601')

    # Per-driver views are loop-invariant, so slice them once instead of on every lap.
    laps_by_driver = {number: RaceLaps.loc[RaceLaps['driver_number'] == number]
                      for number in driver_ids}
    positions_by_driver = {number: PositionChanges.loc[PositionChanges['driver_number'] == number]
                           for number in driver_ids}

    startinggrid = {entry['driver_number']: entry['position'] for entry in RaceStartDict}
    totaltimes = {number: 0 for number in startinggrid}
    # Time at which each driver last crossed the line; starts at the scheduled session start.
    # Held as datetime throughout so later timedelta arithmetic is always valid.
    crossfinishtime = [race_start for _ in driver_ids]

    laplist = []
    # Status - 1 is running, 0 is retired
    status = [1 for _ in range(len(driver_ids))]

    # 0th Lap (Qualifying Lap)
    laps = []
    pits = []
    currentposition = []
    for driver_number in driver_ids:
        grid_rows = RaceStartPos.loc[RaceStartPos['driver_number'] == driver_number]
        currentposition.append(int(grid_rows['position'].iloc[0]))
        q_driver_laps = QualiLaps_sorted.loc[QualiLaps_sorted['driver_number'] == driver_number]
        if not q_driver_laps.empty:
            # min() skips missing durations, matching an ascending sort with NaN placed last.
            fastest_lap = q_driver_laps['lap_duration'].min()
            laps.append(float(fastest_lap))
            pits.append(False)
        else:
            print("No laps found")
            pits.append(True)
            laps.append(0)

    laplist.append(addRow(circuitkey=circuitkey,
                          driver_ids=driver_ids,
                          currentposition=currentposition,
                          pits=pits,
                          laps=laps,
                          status = status))

    # 1st Lap (Including Formation lap and countdown): measured from the session start to the
    # moment the driver begins lap 2. Drivers with no lap-2 record keep -1 and are marked retired.
    SecondLapStarts = RaceLaps[RaceLaps['lap_number']==2]
    FirstLapTimes = [-1 for _ in range(len(driver_ids))]
    for driver_idx, driver_number in enumerate(driver_ids):
        lap_two = SecondLapStarts[SecondLapStarts['driver_number'] == driver_number]
        if not lap_two.empty:
            lap_two_start = datetime.fromisoformat(lap_two.iloc[0]['date_start'])
            crossfinishtime[driver_idx] = lap_two_start
            FirstLapTimes[driver_idx] = (lap_two_start - race_start).total_seconds()
        else:
            status[driver_idx] = 0

    laps = list(FirstLapTimes)
    pits = [False for _ in driver_ids]

    laplist.append(addRow(circuitkey=circuitkey,
                          driver_ids=driver_ids,
                          currentposition=currentposition,
                          pits=pits,
                          laps=laps,
                          status=status))

    # Rest of race
    retired = {}
    # Current position value being used from starting grid - only needs to be updated each lap to see if anyone has made a change
    for lap_num in range(2, int(R_NumLaps) + 1):
        laps = []
        pits = []
        for driver_idx, driver_number in enumerate(driver_ids):
            r_driver_laps = laps_by_driver[driver_number]
            if r_driver_laps.empty:  # Driver did not start
                laps.append(-1)
                pits.append(False)
                retired[driver_number] = len(retired)
                status[driver_idx] = 0
                continue

            lapdata = r_driver_laps.loc[r_driver_laps['lap_number'] == lap_num]
            if lapdata.empty:  # Driver Retired (no record for this lap)
                laps.append(-1)
                pits.append(False)
                status[driver_idx] = 0
                continue

            laptime = lapdata.iloc[0]['lap_duration']
            pitted = lapdata.iloc[0]['is_pit_out_lap']
            if not laptime:
                print("No laps found")
            if math.isnan(laptime):
                # Missing duration: record 0 and leave position / crossing time untouched.
                # NOTE(review): the crossing time is not advanced for this lap, so later
                # position look-ups for this driver run early by the missing duration.
                laps.append(0)
                pits.append(False)
                continue

            laps.append(laptime)
            pits.append(pitted)
            totaltimes[driver_number] = totaltimes.get(driver_number, 0) + laptime
            crossfinishtime[driver_idx] += timedelta(seconds = laptime)
            # Check if their position has changed: take the latest position record at or
            # before the moment the driver completed this lap.
            driver_positions = positions_by_driver[driver_number]
            filteredpositions = driver_positions[driver_positions['date'] <= crossfinishtime[driver_idx]]
            if not filteredpositions.empty:
                currentposition[driver_idx] = filteredpositions.iloc[-1]['position']

        laplist.append(addRow(circuitkey=circuitkey,
                              driver_ids=driver_ids,
                              currentposition=currentposition,
                              pits=pits,
                              laps=laps,
                              status=status))

    df = pd.DataFrame(laplist)
    # Write to <OUTPUT_DIR>/<year>/<location>.csv, creating the season folder on first use.
    race_dir = os.path.join(OUTPUT_DIR, str(year))
    os.makedirs(race_dir, exist_ok=True)
    df.to_csv(os.path.join(race_dir, f'{location}.csv'))


  0%|          | 0/36 [00:00<?, ?it/s]

Sakhir 2023 7768 7953
Jeddah 2023 7775 7779
Melbourne 2023 7783 7787
Baku 2023 9064 9070
Miami 2023 9074 9078
Monaco 2023 9090 9094
Barcelona 2023 9098 9102
Montréal 2023 9106 9110
Spielberg 2023 9112 9118
Silverstone 2023 9122 9126
Budapest 2023 9129 9133
Spa-Francorchamps 2023 9135 9141
Zandvoort 2023 9145 9149
Monza 2023 9153 9157
Marina Bay 2023 9161 9165
Suzuka 2023 9169 9173
Lusail 2023 9215 9221
Austin 2023 9207 9213
Mexico City 2023 9177 9181
São Paulo 2023 9304 9205
Las Vegas 2023 9314 9189
Yas Island 2023 9193 9197
Sakhir 2024 9468 9472
Jeddah 2024 9476 9480
Melbourne 2024 9484 9488
Suzuka 2024 9492 9496
Shanghai 2024 9664 9673
Miami 2024 9498 9507
Imola 2024 9511 9515
Monaco 2024 9519 9523
Montréal 2024 9527 9531
Barcelona 2024 9535 9539
Spielberg 2024 9541 9550
Silverstone 2024 9554 9558
Budapest 2024 9562 9566
Spa-Francorchamps 2024 9570 9574


Need to get the best qualifying lap time for each driver (this will be their 0th lap time).

Then loop over every lap completed in the race and add it to the dataframe in the same way.

In [401]:
# Using the 2023 Marina Bay weekend (session keys 9161 / 9165) as a worked example,
# since not all races have full data.
# Each download is wrapped in a context manager so the HTTP response is closed after reading.
QualifyingKey = 9161

with urlopen("https://api.openf1.org/v1/laps?csv=true&session_key=" + str(QualifyingKey)) as res:
    QualiLaps = pd.read_csv(BytesIO(res.read()))

with urlopen("https://api.openf1.org/v1/pit?csv=true&session_key="+str(QualifyingKey)) as res:
    QualiPits = pd.read_csv(BytesIO(res.read()))

with urlopen("https://api.openf1.org/v1/race_control?csv=true&session_key="+str(QualifyingKey)) as res:
    QualiRaceControl = pd.read_csv(BytesIO(res.read()))

with urlopen("https://api.openf1.org/v1/stints?csv=true&session_key="+str(QualifyingKey)) as res:
    QualiStints = pd.read_csv(BytesIO(res.read()))



RaceKey = 9165
with urlopen("https://api.openf1.org/v1/position?csv=true&session_key=" + str(RaceKey)) as res:
    RacePositions = pd.read_csv(BytesIO(res.read()))
# The first 20 position records are used as the starting grid, left in the order returned.
# RaceStartPos = RacePositions[:20].sort_values(by='position', ascending=True)
RaceStartPos = RacePositions[:20]


with urlopen("https://api.openf1.org/v1/laps?csv=true&session_key=" + str(RaceKey)) as res:
    RaceLaps = pd.read_csv(BytesIO(res.read()))

with urlopen("https://api.openf1.org/v1/pit?csv=true&session_key="+str(RaceKey)) as res:
    RacePits = pd.read_csv(BytesIO(res.read()))

with urlopen("https://api.openf1.org/v1/race_control?csv=true&session_key="+str(RaceKey)) as res:
    RaceControl = pd.read_csv(BytesIO(res.read()))

with urlopen("https://api.openf1.org/v1/stints?csv=true&session_key="+str(RaceKey)) as res:
    RaceStints = pd.read_csv(BytesIO(res.read()))

with urlopen("https://api.openf1.org/v1/sessions?csv=true&session_key="+str(RaceKey)) as res:
    RaceMeeting = pd.read_csv(BytesIO(res.read()))

In [406]:
# Scheduled start of the race session (first row of the session lookup).
first_session_row = RaceMeeting.iloc[0]
RaceStartTime = first_session_row['date_start']
# One dict per starting-grid row, for easy iteration.
RaceStartDict = RaceStartPos.to_dict(orient='records')
# Peek at the qualifying race-control messages (index labels 0 through 20).
print(QualiRaceControl.loc[0:20])

   category                       date  driver_number           flag  \
0     Other  2023-09-16 12:45:18+00:00            NaN            NaN   
1      Flag  2023-09-16 13:00:00+00:00            NaN          GREEN   
2     Other  2023-09-16 13:09:45+00:00            NaN            NaN   
3     Other  2023-09-16 13:12:14+00:00            NaN            NaN   
4      Flag  2023-09-16 13:18:00+00:00            NaN      CHEQUERED   
5      Flag  2023-09-16 13:19:10+00:00            NaN  DOUBLE YELLOW   
6     Other  2023-09-16 13:19:12+00:00            NaN            NaN   
7      Flag  2023-09-16 13:19:16+00:00            NaN            RED   
8      Flag  2023-09-16 13:19:17+00:00            NaN          CLEAR   
9     Other  2023-09-16 13:21:34+00:00            NaN            NaN   
10    Other  2023-09-16 13:26:42+00:00            NaN            NaN   
11    Other  2023-09-16 13:27:24+00:00            NaN            NaN   
12     Flag  2023-09-16 13:32:56+00:00            NaN          C

In [407]:
# Per-race bookkeeping derived from the frames loaded above.
QualiLaps_sorted = QualiLaps.sort_values(by='driver_number', ascending=False)
laps_descending = RaceLaps.sort_values(by='lap_number', ascending=False)
R_NumLaps = laps_descending.iloc[0]['lap_number']  # highest lap number recorded

# PositionChanges is the same object as RacePositions, so the parsed dates land on both names.
PositionChanges = RacePositions
PositionChanges['date'] = pd.to_datetime(PositionChanges['date'], format='ISO8601')

# Grid slot and accumulated race time, keyed by driver number.
startinggrid = {entry['driver_number']: entry['position'] for entry in RaceStartDict}
totaltimes = {entry['driver_number']: 0 for entry in RaceStartDict}
# One line-crossing timestamp per grid row, all starting at the session start time.
crossfinishtime = [RaceStartTime for _ in RaceStartDict]
currentposition = []

Currently have the correct start time of the event; still need to account for the formation lap and countdown.

In [413]:
# Drivers that appear in the race lap data, highest car number first.
race_numbers = RaceLaps.sort_values(by='driver_number', ascending=False)['driver_number'].unique()
driver_ids = [int(number) for number in race_numbers]
print(driver_ids)

# Replace with the drivers that appear in qualifying (QualiLaps_sorted is already ordered
# by car number, descending); this is the list the following cells use.
quali_numbers = QualiLaps_sorted['driver_number'].unique()
driver_ids = [int(number) for number in quali_numbers]
print(driver_ids)

[81, 77, 63, 55, 44, 40, 31, 27, 24, 23, 22, 20, 16, 14, 11, 10, 4, 2, 1]
[81, 77, 63, 55, 44, 40, 31, 27, 24, 23, 22, 20, 18, 16, 14, 11, 10, 4, 2, 1]


## Status - 1 is running, 0 is retired

In [410]:
# Builds the lap-by-lap table for the single race loaded in the cells above.
# Row 0 is qualifying, row 1 is the first lap, rows 2+ are the remaining race laps.
# NOTE(review): `circuitkey` is not assigned in this cell; within this notebook it is only
# assigned by the per-race loop cell, so this cell relies on that cell having run first.
# NOTE(review): the saved output of this cell is an IndexError. The `.iloc[0]` start-position
# lookup below raises that error when RaceStartPos has no row for a driver in `driver_ids`
# - presumably a driver who qualified but did not start; TODO confirm.
laplist = []
# Status - 1 is running, 0 is retired
status = [1 for _ in range(len(driver_ids))]

laps = []
pits = []
currentposition = []
# 0th Lap (Qualifying Lap)
for driver_number in driver_ids:
    # Starting position of this driver, taken from the start-of-race position records.
    currentposition.append(int(RaceStartPos.loc[RaceStartPos['driver_number'] == driver_number]['position'].iloc[0]))
    q_driver_laps = QualiLaps_sorted.loc[QualiLaps_sorted['driver_number'] == driver_number]
    if not q_driver_laps.empty:
        # Shortest recorded qualifying lap duration for this driver.
        fastest_lap = q_driver_laps.sort_values(by='lap_duration', ascending=True).iloc[0]['lap_duration']
        laps.append(float(fastest_lap))
        pits.append(False)
    else:
        # No qualifying laps at all: record a lap time of 0 and flag the pit column.
        print("No laps found")
        pits.append(True)
        laps.append(0)

laplist.append(addRow(circuitkey=circuitkey, 
                      driver_ids=driver_ids, 
                      currentposition=currentposition, 
                      pits=pits, 
                      laps=laps,
                      status = status))
# 1st Lap (Including Formation lap and countdown)
# The first-lap time is measured from the session start time to the start of the driver's lap 2.
SecondLapStarts = RaceLaps[RaceLaps['lap_number']==2]
FirstLapTimes = [-1 for _ in range(len(driver_ids))]
for i in range(len(driver_ids)):
    LapStartTime = SecondLapStarts[SecondLapStarts['driver_number']==driver_ids[i]]
    if not LapStartTime.empty:
        LapStartTime = LapStartTime.iloc[0]['date_start']
        # Remember when the driver crossed the line; later laps add their duration to this.
        crossfinishtime[i] = (datetime.fromisoformat(LapStartTime))
        FirstLapTimes[i] = ((datetime.fromisoformat(LapStartTime)-datetime.fromisoformat(RaceStartTime)).total_seconds())
    else:
        # No lap-2 record: the lap time stays -1 and the driver is marked as retired.
        status[i] = 0
        
laps = []
pits = []
for driver_idx in range(len(driver_ids)):
    pits.append(False)
    laps.append(FirstLapTimes[driver_idx])

laplist.append(addRow(circuitkey=circuitkey, 
                      driver_ids=driver_ids, 
                      currentposition=currentposition, 
                      pits=pits, 
                      laps=laps,
                      status=status))
# Rest of race
lap_num = 2
# NOTE(review): `retired` is written below but never read in this cell.
retired = {}
# Current position value being used from starting grid - only needs to be updated each lap to see if anyone has made a change
while (lap_num < R_NumLaps+1):
    laps = []
    pits = []
    for driver_idx in range(len(driver_ids)):
        r_driver_laps =RaceLaps.loc[RaceLaps['driver_number'] == driver_ids[driver_idx]]
        if not r_driver_laps.empty:
            lapdata = r_driver_laps.loc[r_driver_laps['lap_number']==lap_num]
            if not lapdata.empty:
                laptime = lapdata.iloc[0]['lap_duration']
                pitted = lapdata.iloc[0]['is_pit_out_lap']
                # Only fires for a duration of exactly 0; a NaN duration is truthy and is
                # handled by the isnan branch below.
                if not laptime:
                    print("No laps found")
                if(math.isnan(laptime)):
                    # Missing duration: record 0 and leave position and crossing time unchanged.
                    laps.append(0)
                    pits.append(False)
                else:
                    laps.append(laptime)
                    pits.append(pitted)
                    totaltimes[driver_ids[driver_idx]] += laptime
                    crossfinishtime[driver_idx] += timedelta(seconds = laptime)
                    # Check if their position has changed
                    # (latest position record at or before the end of this lap).
                    filteredpositions = PositionChanges[(PositionChanges['date']<=(crossfinishtime[driver_idx]))&(PositionChanges['driver_number']==driver_ids[driver_idx])]
                    if not filteredpositions.empty:
                        closestrow = filteredpositions.iloc[-1]
                        currentposition[driver_idx] = closestrow['position']
            else:# Driver Retired
                laps.append(-1)
                pits.append(False)
                status[driver_idx] = 0
                
        else: #Driver did not start
            laps.append(-1)
            pits.append(False)
            retired[driver_ids[driver_idx]] = len(retired)
            status[driver_idx] = 0

    laplist.append(addRow(circuitkey=circuitkey, 
                          driver_ids=driver_ids, 
                          currentposition=currentposition, 
                          pits=pits, 
                          laps=laps,
                          status=status))
    
    lap_num += 1
    
# One row per lap, one column group per driver slot.
df = pd.DataFrame(laplist)

IndexError: single positional indexer is out-of-bounds

Make a Dataset:

- Helper functions for getting information from each id

DataSet Class:

- <strike> Initializes with a directory containing race data files organized by year </strike>
- <strike> 3 methods for setting current year, round and next round to automatically load appropriate file </strike>
- method for processing race data for a given index, transforming into tensors 
    - Handles NaN values by replacing them with 0 and scales features like driver IDs and lap times
    - prepares sequences of lap times up to current lap for model input
    - extracts and scales relevant features from dataset to create input-output pairs for model training
- <strike> method for returning length of dataset, accounts for fact that last lap is a label </strike>


In [11]:
class RaceDataSet(torch.utils.data.Dataset):
    """Lap-by-lap race data with one race (CSV file) loaded at a time.

    ``dir`` must contain one sub-directory per season (``2023/``, ``2024/``, ...)
    holding one CSV file per race. Rounds are 1-based indices into the
    alphabetically sorted list of CSV files of the current season; note that
    this order is deterministic but is not the calendar order of the races.

    Attributes:
        dir: root directory of the per-season folders.
        year: season currently selected (starts at 2023).
        round: 1-based round currently selected (starts at 1).
        currentyear: sorted CSV file names of the selected season.
        currentrace: DataFrame of the currently loaded race.
    """

    def __init__(self, dir):
        self.dir = dir
        self.year = 2023
        self.round = 1

        self.currentyear = self._list_races(self.year)
        self.currentrace = self._load_round(self.round)

    def _list_races(self, year):
        """Return the sorted CSV file names found in the folder of ``year``.

        Non-CSV entries (e.g. ``.DS_Store``) are ignored and the result is sorted,
        because ``os.listdir`` returns entries in arbitrary order.
        """
        year_dir = os.path.join(self.dir, str(year))
        return sorted(name for name in os.listdir(year_dir) if name.endswith('.csv'))

    def _load_round(self, round):
        """Read the CSV for a 1-based ``round`` of the current season.

        A round past the end of the season falls back to the last race. The listed
        file names already carry the ``.csv`` extension, so they are used as-is.
        """
        if not self.currentyear:
            raise IndexError(f"no race CSV files found for season {self.year}")
        index = min(round, len(self.currentyear)) - 1
        return pd.read_csv(os.path.join(self.dir, str(self.year), self.currentyear[index]))

    def set_year(self, year):
        """Select a season and refresh its file list (the loaded race is unchanged).

        Raises FileNotFoundError (from ``os.listdir``) when the season folder is missing.
        """
        # List first so a missing folder leaves the current state untouched.
        races = self._list_races(year)
        self.year = year
        self.currentyear = races

    def set_round(self, round):
        """Select a 1-based round of the current season and load its CSV."""
        self.round = round
        # NOTE(review): 2025 is skipped without loading anything, presumably because no
        # race data exists for that season yet - confirm.
        if(self.year==2025):
            return
        self.currentrace = self._load_round(self.round)

    def nextround(self):
        """Advance to the next race, rolling over to round 1 of the next season."""
        self.round += 1
        # `<=` so the final round of a season is visited before rolling over.
        if(self.round <= len(self.currentyear)):
            self.set_round(self.round)
        else:
            self.round = 1
            self.set_year(self.year+1)
            self.set_round(self.round)

    def __len__(self):
        """Number of samples in the current race: one fewer than its rows."""
        return len(self.currentrace)-1


    # method for processing race data for a given index, transforming into tensors
    # - Handles NaN values by replacing them with 0 and scales features like driver IDs and lap times
    # - prepares sequences of lap times up to current lap for model input
    # - extracts and scales relevant features from dataset to create input-output pairs for model training

Helper functions for displaying input/output tensors in English
- convert a tensor into a chart showing driver name, position, inPit, lap time and status

Creating the LSTM Model

In [12]:
class RacePredictor(nn.Module):
    """LSTM followed by a linear layer applied to every time step.

    Args:
        input_size: number of features per time step.
        output_size: number of values predicted per time step.
        lstm_hids: hidden size of the LSTM.
        lstm_layers: number of stacked LSTM layers.
        dropout: dropout between LSTM layers (passed straight to ``nn.LSTM``).
    """

    def __init__(self, input_size, output_size, lstm_hids, lstm_layers, dropout):
        super().__init__()

        self.input_size = input_size
        self.lstm_hids = lstm_hids
        self.lstm_layers = lstm_layers

        self.lstm = nn.LSTM(input_size=input_size, hidden_size=lstm_hids, num_layers=lstm_layers, dropout=dropout, batch_first=True)

        self.fc = nn.Linear(in_features=lstm_hids, out_features=output_size)

        # Xavier-initialise every weight matrix and zero every bias. Parameter names are
        # qualified by sub-module ("lstm.weight_ih_l0", "fc.bias"), so the check has to look
        # at the last component; testing the start of the full name never matches.
        for name, params in self.named_parameters():
            leaf = name.rsplit('.', 1)[-1]
            if leaf.startswith('weight'):
                nn.init.xavier_uniform_(params)
            elif leaf.startswith('bias'):
                nn.init.constant_(params, 0.0)

    def forward(self, ins, prev_states = None):
        """Run the LSTM and project every time step.

        Args:
            ins: batch-first input, as required by ``batch_first=True``.
            prev_states: optional ``(hidden, cell)`` tuple; ``None`` lets the LSTM
                start from zeros.

        Returns:
            ``(outs, next_states)``: per-step predictions and the final LSTM states.
        """
        lstm_outs, next_states = self.lstm(ins, prev_states)
        outs = self.fc(lstm_outs)
        return outs, next_states

    def zero_states(self, batch_size=1):
        """Return zeroed ``(hidden, cell)`` states of shape (layers, batch_size, hidden).

        The tensors are created on the same device and with the same dtype as the model
        parameters so they can be passed straight to ``forward``.
        """
        shape = (self.lstm_layers, batch_size, self.lstm_hids)
        reference = self.fc.weight
        hidden_state = reference.new_zeros(shape)
        cell_state = reference.new_zeros(shape)
        return(hidden_state, cell_state)

Helper functions for training the LSTM model
- training method, testing method and method to do both