<a href="https://colab.research.google.com/github/mistryvivek/YRKCS-PRBX/blob/main/Model_2_Classifer_NN_pynb.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import numpy as np
import random
import pandas as pd
from torch.nn.utils.rnn import pad_sequence
from sklearn.preprocessing import OneHotEncoder

In [6]:
# --- Notebook-wide configuration ---------------------------------------------
# Root folder of the exported race data; the data-loading cells append
# "<year>/..." to this prefix when reading CSV files.
BASE_PATH = r"/content/drive/MyDrive/prbx_data/v1/"

# Running maximum of LapNumber across every race loaded so far; the
# data-loading cells raise it as they read each driver's laps.
max_race_size = 0

# Use the GPU when one is visible to PyTorch, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Using device: {device}')

Using device: cpu


## Change Model One to a multi-class classifier

* Each lap is labelled with one of six classes: either no pit stop, or the tyre compound fitted at the stop.
* 0: NO PIT, 1: SOFT, 2: MEDIUM, 3: HARD, 4: INTERMEDIATE, 5: WET

In [15]:
# TRAINING
# Builds one (inputs, targets) pair per driver per race from the 2022 season.
training_inputs = []
training_outputs = []

# Encoding tires type. 0 is not in the mapping: it is used below as the
# "stint does not change after this lap" label.
tires = {"SOFT": 1, "MEDIUM": 2, "HARD": 3, "INTERMEDIATE": 4, "WET": 5}


RaceCalender22 = pd.read_csv(BASE_PATH + r"2022/eventCalender2022.csv")
for _, row in RaceCalender22.iterrows():
    # Events whose format is 'testing' are skipped entirely.
    if row['EventFormat'] == 'testing':
        continue

    # Lap data lives at 2022/<round>_<event>/<round>_<event>_Race.csv with
    # spaces replaced by underscores (BASE_PATH itself is left untouched).
    race_stem = f"{row['RoundNumber']}_{row['OfficialEventName']}"
    TempRaceLoad = pd.read_csv(
        BASE_PATH + f"2022/{race_stem}/{race_stem}_Race.csv".replace(" ", "_")
    )

    for driver in TempRaceLoad['Driver'].unique():
        TempRaceLoadDriver = TempRaceLoad[TempRaceLoad['Driver'] == driver].sort_values(by='LapNumber', ascending=True)
        stint_array = TempRaceLoadDriver['Stint'].values
        compound_array = TempRaceLoadDriver['Compound'].values

        # Track the largest lap number seen so far. Series.max() is computed
        # once and skips NaN lap numbers.
        driver_last_lap = TempRaceLoadDriver['LapNumber'].max()
        if driver_last_lap > max_race_size:
            max_race_size = driver_last_lap

        # Label for row i: 0 when row i+1 is in the same stint, otherwise the
        # code of the compound recorded on row i+1. The final row has no
        # successor, so it is always labelled 0.
        # A compound missing from `tires` at a stint change raises KeyError.
        stint_change_array = np.array(
            [0 if stint_array[i] == stint_array[i + 1] else tires[compound_array[i + 1]]
             for i in range(len(stint_array) - 1)] + [0]
        )

        # NOTE(review): this flag is 1 while Stint == 1 and 0 afterwards,
        # which reads as the inverse of its name - confirm the intent.
        mandatory_change_made = np.array([1 if stint == 1 else 0 for stint in stint_array])

        # One row per lap, two columns: [stint_change, mandatory_change_made].
        # NOTE(review): the first input column is the same array that is
        # stored as the target below - confirm the training loop offsets
        # inputs and targets, otherwise the label is visible to the model.
        combined_array = np.stack([stint_change_array, mandatory_change_made], axis=1)

        training_inputs.append(combined_array)
        training_outputs.append(stint_change_array)

In [14]:
# TESTING
# Builds one (inputs, targets) pair per driver per race from the 2023 season.
testing_inputs = []
testing_outputs = []

# Encoding tires type. 0 is not in the mapping: it is used below as the
# "stint does not change after this lap" label.
tires = {"SOFT": 1, "MEDIUM": 2, "HARD": 3, "INTERMEDIATE": 4, "WET": 5}

RaceCalender23 = pd.read_csv(BASE_PATH + r"2023/eventCalender2023.csv")
for _, row in RaceCalender23.iterrows():
    # Events whose format is 'testing' are skipped entirely.
    if row['EventFormat'] == 'testing':
        continue

    # Lap data lives at 2023/<round>_<event>/<round>_<event>_Race.csv with
    # spaces replaced by underscores (BASE_PATH itself is left untouched).
    race_stem = f"{row['RoundNumber']}_{row['OfficialEventName']}"
    TempRaceLoad = pd.read_csv(
        BASE_PATH + f"2023/{race_stem}/{race_stem}_Race.csv".replace(" ", "_")
    )

    for driver in TempRaceLoad['Driver'].unique():
        TempRaceLoadDriver = TempRaceLoad[TempRaceLoad['Driver'] == driver].sort_values(by='LapNumber', ascending=True)
        stint_array = TempRaceLoadDriver['Stint'].values
        compound_array = TempRaceLoadDriver['Compound'].values

        # Track the largest lap number seen so far. Series.max() is computed
        # once and skips NaN lap numbers.
        driver_last_lap = TempRaceLoadDriver['LapNumber'].max()
        if driver_last_lap > max_race_size:
            max_race_size = driver_last_lap

        # Label for row i: 0 when row i+1 is in the same stint, otherwise the
        # code of the compound recorded on row i+1. The final row has no
        # successor, so it is always labelled 0.
        # A compound missing from `tires` at a stint change raises KeyError.
        stint_change_array = np.array(
            [0 if stint_array[i] == stint_array[i + 1] else tires[compound_array[i + 1]]
             for i in range(len(stint_array) - 1)] + [0]
        )

        # NOTE(review): this flag is 1 while Stint == 1 and 0 afterwards,
        # which reads as the inverse of its name - confirm the intent.
        mandatory_change_made = np.array([1 if stint == 1 else 0 for stint in stint_array])

        # One row per lap, two columns: [stint_change, mandatory_change_made].
        # NOTE(review): the first input column is the same array that is
        # stored as the target below - confirm the evaluation loop offsets
        # inputs and targets, otherwise the label is visible to the model.
        combined_array = np.stack([stint_change_array, mandatory_change_made], axis=1)

        testing_inputs.append(combined_array)
        testing_outputs.append(stint_change_array)

In [9]:
class RNNv1b(nn.Module):
    """Minimal tanh recurrent cell with a per-step classification head.

    The module processes ONE time step per call; the caller loops over the
    sequence and threads the hidden state through successive calls.

    Args:
        input_size: Number of features in each time-step input ``x``.
        hidden_size: Width of the hidden state.
        num_classes: Number of output logits per time step. Defaults to 6 to
            match the notebook's label set (0 = no pit, 1-5 = tyre compound),
            which is what ``nn.CrossEntropyLoss`` needs for class-index
            targets in 0..5. Pass ``num_classes=1`` to get the earlier
            single-logit head.
    """

    def __init__(self, input_size, hidden_size, num_classes=6):
        super().__init__()
        self.linear_x = nn.Linear(input_size, hidden_size)
        self.linear_h = nn.Linear(hidden_size, hidden_size)
        # One logit per class; a single output unit cannot represent a
        # multi-class distribution under cross-entropy.
        self.linear_y = nn.Linear(hidden_size, num_classes)
        self.tanh = nn.Tanh()

    def forward(self, x, hprev):
        """Advance the cell by one time step.

        Args:
            x: Floating-point input whose last dimension is ``input_size``.
            hprev: Previous hidden state whose last dimension is
                ``hidden_size``; it must broadcast against ``linear_x(x)``.

        Returns:
            Tuple ``(h, y)``: the new hidden state
            ``tanh(linear_x(x) + linear_h(hprev))`` and the unnormalised
            class logits ``linear_y(h)``.
        """
        h = self.tanh(self.linear_x(x) + self.linear_h(hprev))
        y = self.linear_y(h)
        return h, y

In [None]:
def calculate_loss(model,inputs,targets,hprev,vocab_size):
    """Run ``model`` over one sequence and return its cross-entropy loss.

    Args:
        model: Callable as ``model(x, hprev) -> (h, y)`` where ``y`` holds the
            class logits for a single time step.
        inputs: Sequence of per-time-step feature vectors (anything
            ``torch.as_tensor`` accepts). Each step is cast to float32 so
            integer feature arrays can be fed to floating-point layers.
        targets: One class index per time step. Cast to int64, the dtype
            ``nn.CrossEntropyLoss`` requires for class-index targets.
        hprev: Hidden-state tensor carried in from the previous
            (sub)sequence. Inputs and targets are placed on its device.
        vocab_size: Unused; kept so existing call sites keep working.

    Returns:
        Tuple ``(loss, hprev)``: the scalar loss over all time steps and the
        final hidden state, detached from the autograd graph.

    Raises:
        ValueError: If ``inputs`` is empty.
    """
    seq_length = len(inputs)
    if seq_length == 0:
        raise ValueError("calculate_loss() needs at least one time step")

    # Follow the hidden state's device instead of relying on a global.
    run_device = hprev.device
    loss_func = nn.CrossEntropyLoss()

    step_logits = []
    for t in range(seq_length):
        # Feed each time step through the model together with the previous
        # hidden state, keeping the logits produced at every step.
        x_t = torch.as_tensor(inputs[t], dtype=torch.float32, device=run_device)
        hprev, y = model(x_t, hprev)
        step_logits.append(y)

    # Collapse any singleton batch dimension so the loss always receives a
    # (seq_length, num_classes) matrix, whatever shape each step's y has.
    logits = torch.stack(step_logits).reshape(seq_length, -1)
    target_classes = torch.as_tensor(targets, dtype=torch.long, device=run_device)
    loss = loss_func(logits, target_classes)

    # For truncated backprop, the next subsequence will use the final hidden
    # state but will not backprop through it, so we need to detach.
    hprev = hprev.detach()

    return loss, hprev