In [None]:
# Import necessary libraries for the data creation step of the project
import fastf1 as f1
import pandas as pd
import seaborn as sns
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F        

# 1. Data creation and engineering

## 1-1. Data Creation

The primary data source is the FastF1 library (https://github.com/theOehrly/Fast-F1). We will use telemetry data from the 2023 season to predict position changes for all drivers. The justification for using data from 2023 is given in the data-analysis.ipynb file. 

In [None]:
# Select all race events in 2023
Events = f1.get_event_schedule(2023)
Events_Race = Events[Events['Session5'] == 'Race']
# NOTE(review): the row label 1 is hard-coded, so exactly one event is processed;
# presumably this is the first race of the season -- confirm against the schedule.
Event = Events_Race.loc[1, :]

# Load session object
session = f1.core.Session(Event, session_name = 'Race', f1_api_support = True)
session.load(laps = True, telemetry = True, weather = True, messages = True)

# Load laps and results data
sesh_l = session.laps
sesh_r = session.results

# Attain all the drivers from the lap
drivers = list(sesh_l['Driver'].unique())

# Telemetry channels that are not kept as features
unused_channels = ['Time', 'Source', 'DriverAhead', 'DistanceToDriverAhead']

driver_frames = []
for drv in drivers:
    # Filter this driver's laps once instead of re-filtering for every lap and field
    drv_laps = sesh_l.pick_drivers(drv)

    lap_frames = []
    # Iterate over the laps that actually exist; indexing positionally up to the
    # last LapNumber raises IndexError whenever a lap is missing from the data
    for _, lap in drv_laps.iterlaps():
        temp_tele = lap.get_telemetry().add_distance()
        temp_tele['Brake'] = temp_tele['Brake'].astype(int)

        # Adding data from session.Laps
        # Lap: which lap the driver is in
        # Compound: Which compound it's in
        # TyreLife: How long the Tyre has been used
        # TrackStatus: What the track status is
        temp_tele['Lap'] = int(lap['LapNumber'])
        temp_tele['Compound'] = lap['Compound']
        temp_tele['TyreLife'] = lap['TyreLife']
        temp_tele['TrackStatus'] = lap['TrackStatus']

        lap_frames.append(temp_tele.reset_index(drop = True))

    # Combine all laps of the driver in a single concat (avoids quadratic re-copying)
    total_drv = pd.concat(lap_frames, axis = 0)

    # Drop columns we don't need
    total_drv = total_drv.drop(columns = unused_channels)

    # Add a status column for each telemetry input: every sample is 'Finished'
    # except the very last one, which records how the driver's race ended
    outcome = sesh_r[sesh_r['Abbreviation'] == drv]['Status'].values[0]
    total_drv['Status'] = 'Finished'
    if outcome != 'Finished':
        final_status = 'Lapped' if '+' in outcome else 'DNF'
        # Positional write on the frame itself; the chained form
        # total_drv['Status'].iloc[-1] = ... may silently write to a temporary copy
        total_drv.iloc[-1, total_drv.columns.get_loc('Status')] = final_status

    # Prefix every column with the driver abbreviation, e.g. 'VER_Speed'
    total_drv = total_drv.add_prefix(drv + '_')
    driver_frames.append(total_drv.reset_index(drop = True))

# Concatenate all the data from a single race together, one column group per driver
total_tele = pd.concat(driver_frames, axis = 1) if driver_frames else pd.DataFrame()


The data consists of the following columns for each driver:

| Feature | Type | Description |
| --- | --- | --- |
| Date | Timestamp | The timestamp of when the data was collected |
| SessionTime | TimeDelta | The time elapsed since the start of the session |
| RPM | int | The RPM of the vehicle |
| Speed | int | The speed of the vehicle |
| nGear | int | The gear status of the vehicle |
| Throttle | int | The % of throttle pressure |
| Brake | Bool | The brake status |
| DRS | Bool | The DRS status |
| RelativeDistance | int | Distance driven since first sample |
| X | int | X position (1/10 m) |
| Y | int | Y position (1/10 m) |
| Z | int | Z position (1/10 m) |
| Status | Cat (str) | Current status of the driver (DNF, Finished etc) |
| TrackStatus | Cat (str) | Flag (Yellow flag, Safety Car, Red Flag, Virtual Safety Car) | 
| Compound | Cat (str)|The Tyre Compound (Soft, Medium, Hard, Intermediate, Wet) |
| PitIn | Bool | Driver pit in status |
| PitOut | Bool | Driver pit out status |
| Distance | int | The total distance driven for the lap |
| Corner | int| The distance to the nearest turn |
| Angle | Cat (str) | The severity of the turn divided into 4 classes (Low (0-45), Med-Low (45-90), Med-High (90-120), High (120-180)) |

Here, the categorical and boolean values will be encoded separately as dummy variables. 

In [None]:
# Find the Date and Session Time column with the most input amongst drivers and use that
# SessionTime is kept to merge weather data 
def mx_len(df, col_name):
    """Collapse all columns whose name contains ``col_name`` into one column.

    Among the matching columns, the one with the most non-null entries is kept
    and renamed to ``col_name``; ties keep the earliest column. All other
    matching columns are dropped.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to process. It is modified in place (columns dropped / renamed).
    col_name : str
        Substring used to select candidate columns, and the name given to the
        surviving column.

    Returns
    -------
    pandas.DataFrame
        A frame with ``col_name`` as the first column, followed by the
        remaining columns in their existing order.

    Raises
    ------
    KeyError
        If no matching column holds at least one non-null value.
    """
    best_col = None
    best_count = 0
    discarded = []

    for col in df.columns:
        if col_name not in col:
            continue
        # Count on the frame that was passed in, not on a module-level global
        filled = df[col].count()
        if filled > best_count:
            if best_col is not None:
                discarded.append(best_col)
            best_col, best_count = col, filled
        else:
            discarded.append(col)

    df.drop(columns = discarded, inplace = True)
    if best_col is not None:
        df.rename(columns = {best_col: col_name}, inplace = True)

    # Move the surviving column to the front
    return df[[col_name] + [col for col in df.columns if col != col_name]]

# Reduce the per-driver 'Date' and 'SessionTime' columns to one shared column each
for shared_col in ('Date', 'SessionTime'):
    total_tele = mx_len(total_tele, shared_col)

We will also add the weather data for each timestamp from the FastF1 library. `weather_data` is a time series with the columns described below, all of which will be included in our data.

| Feature | Type | Description |
| --- | --- | --- |
| AirTemp | Int | Temperature |
| Humidity | Int | Humidity |
| Pressure | Int | Air pressure|
| Rainfall | bool | Shows whether there is rainfall |
| TrackTemp | Int | Temperature of the track |
| WindDirection | Int | Direction of the wind |
| WindSpeed | Int | Speed of the wind | 

In [None]:
# Fetch the session weather samples and normalise the merge key and rain flag
weather_data = session.weather_data
weather_data['Rainfall'] = weather_data['Rainfall'].astype(int)
weather_data['Time'] = pd.to_timedelta(weather_data['Time'])

# Attach to each telemetry row the weather sample nearest in session time,
# then discard the weather frame's own 'Time' key
total_tele = pd.merge_asof(
    total_tele,
    weather_data,
    left_on = 'SessionTime',
    right_on = 'Time',
    direction = 'nearest',
).drop(columns = ['Time'])


## 1-2. Feature Engineering

Here, our existing data will undergo feature engineering for the model (Transformer) of our choice. 

### 1-2-1. Distance 

Re-scale the values of the 'Distance' columns such that they are all within the range (0, 1)

In [None]:
# Divide every driver's 'Distance' column by the constant 5412
# NOTE(review): 5412 is presumably the lap length of this circuit in metres -- confirm
distance_cols = [drv + '_Distance' for drv in drivers]
total_tele[distance_cols] = total_tele[distance_cols] / 5412

### 1-2-2. TrackStatus

In [None]:
# Feature Engineer TrackStatus to attain just the crucial part of the track
# Define function to return the largest integer value within the track status input
def return_max(col):
    """Return the largest element of ``col``, or ``col`` itself if it is a float.

    Floats (such as NaN placeholders) are passed through unchanged. Any other
    value is treated as an iterable -- for a string, its characters -- and the
    maximum element is returned.
    """
    if not isinstance(col, float):
        return max(list(col))
    return col

# Reduce every TrackStatus column to its largest status character
track_status_cols = [name for name in total_tele.columns if 'TrackStatus' in name]
for name in track_status_cols:
    total_tele[name] = total_tele[name].apply(return_max)

### 1-2-3. Imputation

In [None]:
# Imputation for continuous features
# NOTE(review): '_Corner' produces column names with a double underscore
# ('<driver>__Corner'), unlike every other entry -- confirm the real column name.
cts = ['RPM', 'Speed', 'nGear', 'Brake', 'Throttle', 'DRS', 'X', 'Y', 'Z', 'TyreLife', 'Distance', '_Corner']
for cols in cts:
    for drv in drivers:
        name = drv + '_' + cols
        # Assign the filled column back: fillna(inplace=True) on a column selection
        # is a chained in-place update and may act on a temporary copy
        total_tele[name] = total_tele[name].fillna(0)

# Imputation for the compound column
# 'DONE' (upper case) matches the level used when Compound is one-hot encoded;
# a differently-cased placeholder would be encoded as missing
for drv in drivers:
    name = drv + '_' + 'Compound'
    total_tele[name] = total_tele[name].fillna('DONE')

# Imputation for the status column: extend the driver's last recorded status
for drv in drivers:
    i = drv + '_' + 'Status'
    final_rec = total_tele[i].dropna().iloc[-1]
    total_tele[i] = total_tele[i].fillna(final_rec)

# Imputing the angle column
for drv in drivers:
    name = drv + '_Angle'
    total_tele[name] = total_tele[name].fillna('Done')

# Imputing the Lap and TrackStatus columns
def impute(col, val):
    """Fill the missing entries of every driver's ``<driver>_<col>`` column.

    Drivers whose result status is exactly ``'Finished'`` get ``val``; all
    other drivers get their own last recorded (non-null) value.

    Reads the module-level ``drivers`` and ``sesh_r`` and updates the
    module-level ``total_tele`` frame.

    Parameters
    ----------
    col : str
        Feature name without the driver prefix (e.g. ``'Lap'``).
    val : object
        Fill value used for drivers that finished the race.

    Raises
    ------
    IndexError
        If a non-finishing driver's column has no recorded value at all.
    """
    for drv in drivers:
        name = drv + '_' + col
        outcome = sesh_r[sesh_r['Abbreviation'] == drv]['Status'].values[0]
        if outcome == 'Finished':
            fill_value = val
        else:
            fill_value = total_tele[name].dropna().iloc[-1]
        # Assign back instead of a chained fillna(inplace=True), which may
        # operate on a temporary copy and leave total_tele unchanged
        total_tele[name] = total_tele[name].fillna(fill_value)

# imputing Acc_Distance
def impute_acc_dist():
    """Extend every driver's ``<driver>_Acc_Distance`` column with its last value.

    Reads the module-level ``drivers`` and updates the module-level
    ``total_tele`` frame.

    Raises
    ------
    IndexError
        If a driver's column has no recorded value at all.
    """
    for drv in drivers:
        name = drv + '_' + 'Acc_Distance'
        final_rec = total_tele[name].dropna().iloc[-1]
        # Assign back instead of a chained fillna(inplace=True), which may
        # operate on a temporary copy and leave total_tele unchanged
        total_tele[name] = total_tele[name].fillna(final_rec)
            
# Fill the Lap and TrackStatus columns, then the accumulated distance
# NOTE(review): 57 is presumably the lap count of this race and '9' a
# "race over" status marker -- confirm both.
for feature, filler in (('Lap', 57), ('TrackStatus', '9')):
    impute(feature, filler)
impute_acc_dist()

### 1-2-4. Dummy Variables

In [None]:
# Set the categorical columns as dummy variables.
# Each feature is encoded against a fixed level list so that every driver yields
# the same dummy columns. Any value that is not in the list becomes NaN and is
# therefore encoded as an all-zero row.
# NOTE(review): the TrackStatus levels do not contain '9' although that value is
# used as the TrackStatus fill value during imputation, while the nGear levels do
# contain '9' -- confirm which list the '9' was meant for. The level lists are left
# as they are because adding or removing a level changes the number of feature columns.
category_levels = {
    'Status': ['Finished', 'DNF', 'Lapped'],
    'TrackStatus': ['1', '2', '4', '5', '6', '7', '8', '0'],
    'Compound': ['SOFT', 'MEDIUM', 'HARD', 'INTERMEDIATE', 'WET', 'DONE'],
    'nGear': ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'],
    'Angle': ['Low', 'Med-Low', 'Med-High', 'High', 'Done'],
}

cols = []
for drv in drivers:
    for feature, levels in category_levels.items():
        name = drv + '_' + feature
        values = total_tele[name]
        if feature == 'nGear':
            # The gear levels are strings, so numeric gears (e.g. 3 or 3.0) must be
            # rendered as '3' first; otherwise no value matches any level
            values = pd.to_numeric(values, errors = 'coerce').astype('Int64').astype(str)
        total_tele[name] = pd.Categorical(values, categories = levels)
        cols.append(name)

temp = pd.get_dummies(total_tele[cols], dtype = int)

# Swap the categorical columns for their dummy encodings
total_tele = pd.concat([total_tele.drop(columns = cols), temp], axis = 1)

### 1-2-5. Pit_in & Pit_out

In [None]:
# Session-time axis of the telemetry; identical for every driver, so computed once
temp = pd.to_timedelta(total_tele['SessionTime'])

for drv in drivers:
    sesh = sesh_l.pick_drivers(drv)

    # Pit entry / exit times of this driver. Working on derived Series avoids
    # assigning new columns onto a slice of the laps table.
    In_df = pd.to_timedelta(sesh['PitInTime']).dropna().to_frame()
    Out_df = pd.to_timedelta(sesh['PitOutTime']).dropna().to_frame()

    # Snap each pit time to the nearest telemetry timestamp
    in_temp = pd.merge_asof(In_df, temp, left_on = 'PitInTime', right_on = 'SessionTime', direction = 'nearest').SessionTime
    out_temp = pd.merge_asof(Out_df, temp, left_on = 'PitOutTime', right_on = 'SessionTime', direction = 'nearest').SessionTime

    pit_col = drv + '_Pit'
    total_tele[pit_col] = 0

    exit_times = list(out_temp)
    for pit_in in in_temp:
        # Pair each pit entry with the first exit that follows it. Pairing purely by
        # position breaks (KeyError or misaligned intervals) as soon as the number of
        # entries and exits differ.
        later_exits = [pit_out for pit_out in exit_times if pit_out >= pit_in]
        if later_exits:
            in_pit = (pit_in <= total_tele['SessionTime']) & (total_tele['SessionTime'] <= min(later_exits))
        else:
            # Entry without a following exit: flag only the entry sample itself
            in_pit = total_tele['SessionTime'] == pit_in
        # Write through the frame's .loc; total_tele[col].loc[mask] = 1 is a chained
        # assignment and may modify a temporary copy
        total_tele.loc[in_pit, pit_col] = 1

### 1-2-6. Scaling the Integer Columns

In [None]:
# Attain the max value for the columns: RPM, Speed, Throttle, Distance
def id_max(col_name):
    """Return the largest value of ``<driver>_<col_name>`` across all drivers.

    Reads the module-level ``drivers`` and ``total_tele``. The result is never
    below 0, which is the starting value of the comparison.

    Parameters
    ----------
    col_name : str
        Feature name without the driver prefix (e.g. ``'Speed'``).
    """
    # Series.max() skips missing values and runs vectorised; the builtin max()
    # over a Series is NaN-order-dependent and iterates element by element
    per_driver_max = [total_tele[drv + '_' + col_name].max() for drv in drivers]
    return max([0] + per_driver_max)

# Largest observed value per feature, used as the scaling denominator
info = {feature: id_max(feature) for feature in ['RPM', 'Speed', 'Throttle']}

In [None]:
# Divide each driver's RPM / Speed / Throttle column by the feature-wide maximum
for feature, peak in info.items():
    for drv in drivers:
        total_tele[drv + '_' + feature] /= peak

## 1-3. Wrap Up

In [None]:
# SessionTime and Date are no longer needed from here on; remove them
total_tele = total_tele.drop(columns = ['SessionTime', 'Date'])

In [None]:
# Remove every driver's RelativeDistance column in a single drop
relative_distance_cols = [drv + '_RelativeDistance' for drv in drivers]
total_tele.drop(columns = relative_distance_cols, inplace = True)

In [None]:
# Display the non-zero VER_Distance samples
total_tele.loc[total_tele['VER_Distance'] != 0, 'VER_Distance']

# 2. Sequence and Target Generation (Data Wrangling)

This section of the code will generate sequences and targets using the data that we've just created. 

In [None]:
# Create sequences and labels for the data.
# The rows are cut into consecutive, non-overlapping windows of `window` rows.
# Window k (all features) is the model input and the per-driver distances of
# window k+1 are its target.
window = 50

lab = []
drivers_distance = [i + '_Distance' for i in drivers]
distances = total_tele[drivers_distance]

# Number of complete windows. Integer division also keeps the final window when
# the row count is an exact multiple of `window`; range(window, len, window)
# stops one window short in that case.
n_windows = len(total_tele) // window
if n_windows < 2:
    raise ValueError(
        f"Need at least {2 * window} rows to build one (sequence, target) pair, "
        f"got {len(total_tele)}"
    )
usable_rows = n_windows * window

feature_windows = (
    total_tele.iloc[:usable_rows].to_numpy().astype(float).reshape(n_windows, window, -1)
)
distance_windows = (
    distances.iloc[:usable_rows].to_numpy().astype(float).reshape(n_windows, window, -1)
)

# Rows left over after the last complete window, starting at the first unused row
remainder = [total_tele.iloc[usable_rows:].to_numpy()]

# Inputs are all windows but the last; targets are shifted one window ahead
seq = torch.tensor(feature_windows[:-1], dtype = torch.float)
target = torch.tensor(distance_windows[1:], dtype = torch.float)

In [None]:
# Shuffle the (sequence, target) pairs with a fixed seed, then split 80 / 10 / 10
# into training, validation and test sets
np.random.seed(12)
shuffled_indices = np.random.permutation(len(seq))
seq, target = seq[shuffled_indices], target[shuffled_indices]

# Split boundaries: [0, train_size) train, [train_size, val_size) validation, rest test
train_size, val_size = int(len(seq)*0.8), int(len(seq)*0.9)

train_seq, val_seq, test_seq = seq[:train_size], seq[train_size:val_size], seq[val_size:]
train_target, val_target, test_target = (
    target[:train_size],
    target[train_size:val_size],
    target[val_size:],
)


In [None]:
from torch.utils.data import DataLoader

bs = 16

# Batch the training inputs and targets with separate loaders; neither shuffles,
# so iterating both in lockstep keeps inputs and targets aligned
input_dataloader, target_dataloader = (
    DataLoader(split, batch_size=bs) for split in (train_seq, train_target)
)

# Same for the validation split (the variable names say "test" but hold val_* data)
test_input_dataloader, test_target_dataloader = (
    DataLoader(split, batch_size=bs) for split in (val_seq, val_target)
)

In [None]:
# Loaders over the held-out test split
realtest_input_dataloader, realtest_target_dataloader = (
    DataLoader(split, batch_size=bs) for split in (test_seq, test_target)
)

# 3. Setting up the model 

In [None]:
class FFN(nn.Module):
    """Residual feed-forward block: ``LayerNorm(GELU(Linear(x)) + x)``.

    The linear layer maps ``hidden_dim`` to ``hidden_dim``, so the output has
    the same shape as the input.
    """

    def __init__(self, hidden_dim):
        super().__init__()
        self.gelu = nn.GELU()
        self.ffn = nn.Linear(hidden_dim, hidden_dim)
        self.norm = nn.LayerNorm(hidden_dim)

    def forward(self, x):
        # Activation is applied before the skip connection is added
        activated = self.gelu(self.ffn(x))
        return self.norm(activated + x)

class SelfAttnLayer(nn.Module):
    """Residual self-attention block: ``LayerNorm(GELU(MHA(x, x, x)) + x)``.

    The attention module is created with ``batch_first=True``. The attention
    weights it returns are discarded.
    """

    def __init__(self, hidden_dim, num_heads):
        super().__init__()
        self.gelu = nn.GELU()
        self.mha = nn.MultiheadAttention(hidden_dim, num_heads, batch_first=True)
        self.norm = nn.LayerNorm(hidden_dim)

    def forward(self, x):
        # Query, key and value are all the input itself
        attended = self.mha(x, x, x)[0]
        return self.norm(self.gelu(attended) + x)


class Model(torch.nn.Module):
    """Stack of self-attention / feed-forward blocks between two linear layers.

    The input is projected from ``in_features`` to ``hidden_dim``, passed
    through ``n_layers`` repetitions of (SelfAttnLayer, FFN, Dropout) and
    projected to ``out_features``.

    Parameters
    ----------
    hidden_dim : int
        Width of the hidden representation.
    num_heads : int
        Number of attention heads in every SelfAttnLayer.
    n_layers : int
        Number of (SelfAttnLayer, FFN, Dropout) repetitions.
    rate : float
        Dropout probability.
    in_features : int, optional
        Size of the last dimension of the input. Defaults to 927, the value
        that was previously hard-coded.
    out_features : int, optional
        Size of the last dimension of the output. Defaults to 40, the value
        that was previously hard-coded.
    """

    def __init__(self, hidden_dim, num_heads, n_layers, rate,
                 in_features=927, out_features=40):
        super(Model, self).__init__()
        self.linear_in = nn.Linear(in_features, hidden_dim)
        self.gelu = nn.GELU()

        module_lst = []
        for _ in range(n_layers):
            module_lst.append(SelfAttnLayer(hidden_dim, num_heads))
            module_lst.append(FFN(hidden_dim))
            module_lst.append(nn.Dropout(p = rate))

        self.module_lst = nn.ModuleList(module_lst)

        # linear2 is not used in forward(); it is kept so that the parameter
        # initialisation order and the state_dict keys stay unchanged
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.out_head = nn.Linear(hidden_dim, out_features)

    def forward(self, x):
        """Map ``x`` (last dimension ``in_features``) to last dimension ``out_features``."""
        x = self.gelu(self.linear_in(x))
        for module in self.module_lst:
            x = module(x)
        return self.out_head(x)


# 4. Training the model

# 4-1. Hyperparameter Tuning

For Hyperparameter tuning we will be using the optuna library for the following parameters:

| Hyperparameter | Range |
| --- | --- |
| hidden_dim | [256, 512, 768, 1024] |
| num_heads | [2, 4, 8] |
| n_layers | [1, 2, 3, 4] |
| rate | [0.1, 0.75, 0.5] |


In [None]:
import optuna

In [None]:
def objective(trial):
    """Optuna objective: train a ``Model`` and return its final validation MAE.

    Samples ``n_layers``, ``hidden_dim``, ``num_heads`` and ``rate`` from the
    trial, trains for up to 300 epochs with AdamW (lr 0.0003) and L1 loss on the
    module-level training loaders, and after every epoch reports the mean L1
    loss over the module-level validation loaders so that Optuna can prune.

    Parameters
    ----------
    trial : optuna.trial.Trial
        Trial used to sample hyperparameters and report intermediate values.

    Returns
    -------
    float
        Mean validation L1 loss of the last epoch.

    Raises
    ------
    optuna.exceptions.TrialPruned
        When the pruner decides to stop the trial early.
    """
    n_layers = trial.suggest_int("n_layers", 1, 4)
    hidden_dim = trial.suggest_int("hidden_dim", 256, 1024, step = 256)
    num_heads = trial.suggest_categorical("num_heads", [2, 4, 8])
    rate = trial.suggest_categorical('rate', [0.1, 0.75, 0.5])

    # Fall back to the CPU when no GPU is available instead of failing
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = Model(hidden_dim, num_heads, n_layers, rate).to(device).float()
    loss = nn.L1Loss()
    optimizer = torch.optim.AdamW(model.parameters(), lr = 0.0003)

    for epoch in range(300):
        model.train()
        # Inputs and targets come from two unshuffled loaders iterated in lockstep
        for s, t in zip(input_dataloader, target_dataloader):
            s = s.to(device)
            t = t.to(device)
            optimizer.zero_grad()
            mae_loss = loss(model(s), t)
            mae_loss.backward()
            optimizer.step()

        model.eval()
        test_loss_val = 0.0
        # No gradients are needed for validation; skip building the autograd graph
        with torch.no_grad():
            for s, t in zip(test_input_dataloader, test_target_dataloader):
                s = s.to(device)
                t = t.to(device)
                test_loss_val += loss(model(s), t).item()

        test_avg_loss = test_loss_val / len(test_input_dataloader)

        trial.report(test_avg_loss, epoch)

        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return test_avg_loss

# Run 10 trials minimising the validation loss and report the best one
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=10)

trial = study.best_trial

print('Best trial:')
print(f"value: {trial.value:.4f}")
print("Params: ")
for name in trial.params:
    print(f"    {name}: {trial.params[name]}")


# 4-2. Model Fitting

In [None]:
# Fall back to the CPU when no GPU is available instead of failing
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Model requires n_layers and rate in addition to hidden_dim and num_heads;
# calling it with only two arguments raises a TypeError. hidden_dim = 1024 and
# num_heads = 4 are kept as written; the two missing values are taken from the
# best Optuna trial.
best_params = study.best_trial.params
model = Model(1024, 4, best_params['n_layers'], best_params['rate']).to(device)
loss = nn.MSELoss()
opt = torch.optim.AdamW(model.parameters(), lr = 0.0003) # 0.01, 0.005
opt_2 = torch.optim.SGD(model.parameters(), lr = 0.0003)

losses = []
test_losses = []

outputs = []
tot = []


def _snapshot(module):
    # state_dict() returns references to the live parameters, so the tensors
    # must be cloned to freeze the weights at this point in training
    return {key: value.detach().clone() for key, value in module.state_dict().items()}


min_val_loss = float('inf')
best_model = _snapshot(model)
for epoch in range(500):
    model.train()
    loss_val = 0
    # Inputs and targets come from two unshuffled loaders iterated in lockstep
    for s, t in zip(input_dataloader, target_dataloader):
        s = s.to(device)
        t = t.to(device)
        opt.zero_grad()
        logits = model(s)

        mse_loss = loss(logits, t)
        mse_loss.backward()
        opt.step()

        loss_val += (mse_loss.item())
    avg_loss = loss_val / len(input_dataloader)
    losses.append(avg_loss)

    model.eval()
    test_loss_val = 0
    # No gradients are needed for validation; skip building the autograd graph
    with torch.no_grad():
        for s, t in zip(test_input_dataloader, test_target_dataloader):
            s = s.to(device)
            t = t.to(device)
            logits = model(s)
            mse_loss = loss(logits, t)
            test_loss_val += (mse_loss.item())
    test_avg_loss = test_loss_val / len(test_input_dataloader)

    # Keep a copy of the weights with the lowest validation loss seen so far
    if test_avg_loss < min_val_loss:
        min_val_loss = test_avg_loss
        best_model = _snapshot(model)

    test_losses.append(test_avg_loss)
    print(f"Epoch {epoch}, {avg_loss= }, {test_avg_loss= }")


# 4-3. Model Evaluation

In [None]:
# Restore the stored best weights and compute the mean loss over the test loaders
model.load_state_dict(best_model)
model.eval()
test_loss_val = 0
# Evaluation only: disable gradient tracking so no autograd graph is built
with torch.no_grad():
    for s, t in zip(realtest_input_dataloader, realtest_target_dataloader):
        s = s.to(device)
        t = t.to(device)
        logits = model(s)
        mse_loss = loss(logits, t)
        test_loss_val += (mse_loss.item())
test_avg_loss = test_loss_val / len(realtest_input_dataloader)

# 5. Result Analysis