In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
races_data = pd.read_csv('../../../data/processed/fully_integrated_data.csv')
lap_time_data = pd.read_csv("../../../data/processed/lap_times.csv")

# Merge datasets
merged_data = pd.merge(
    races_data,  # Changed from race_data to races_data
    lap_time_data,
    on=["raceId", "driverId"],
    how="inner"
)

if "Unnamed: 0" in merged_data.columns:
    merged_data = merged_data.drop(columns=["Unnamed: 0"])

merged_data.columns


Index(['raceId', 'season', 'raceNumber', 'circuitId', 'prixName', 'raceDate',
       'driverId', 'constructorId', 'driverStartGridPos', 'driverFinalGridPos',
       'driverFinalRank', 'driverRacePoints', 'driverLapCount',
       'driverFatestLapNumber', 'driverFastestLapTime',
       'driverFastestLapSpeed', 'constructorName', 'constructorNationality',
       'constructorChampionshipStandingPoints',
       'constructorChampionshipStandingPosition',
       'constructorChampionshipStandingWins', 'constructorRacePoints',
       'driverDateOfBirth', 'driverNationality',
       'driverChampionshipStandingPoints',
       'driverChampionshipStandingPosition', 'driverChampionshipStandingWins',
       'circuitName', 'circuitLocation', 'circuitCountry', 'lat', 'lng', 'alt',
       'driverRaceResultStatus', 'driverName', 'driverAge', 'race_time',
       'driverRaceLapNumber', 'driverRaceFinalPosition', 'driverLapTime',
       'driverLapTimeInMilliseconds'],
      dtype='object')

In [3]:

merged_data["TireDegradation"] = (
    merged_data.groupby(["raceId", "driverId"])["driverLapTimeInMilliseconds"]
    .diff() / merged_data.groupby("raceId")["driverLapTimeInMilliseconds"].transform("mean")
)

# Fill missing values with median
merged_data["TireDegradation"] = (
    merged_data.groupby(["raceId", "driverId"])["driverLapTimeInMilliseconds"]
    .diff()
    .fillna(method='bfill')  
) / merged_data.groupby("raceId")["driverLapTimeInMilliseconds"].transform("mean")
merged_data["TireDegradation"] = merged_data.groupby(
    ["circuitId"]
)["TireDegradation"].transform(lambda x: x.fillna(x.median()))

#normalize TireDegradation
merged_data["TireDegradation"] = (
    merged_data["TireDegradation"] - merged_data["TireDegradation"].mean()
) / merged_data["TireDegradation"].std()


  .fillna(method='bfill')


In [4]:
def create_state_space(df):
    
    df['total_laps'] = df.groupby('raceId')['driverLapCount'].transform('max')
    
    # Create normalized lap percentage and bin using FIXED thresholds
    df['lap_pct'] = df['driverRaceLapNumber'] / df['total_laps']
    df['current_lap_stage'] = pd.cut(
        df['lap_pct'],
        bins=[0, 0.2, 0.4, 0.6, 0.8, 1.0],
        labels=[0, 1, 2, 3, 4],
        include_lowest=True
    )
    
    # Track position using final rank (ensure no NaN values)
    df['track_position'] = pd.cut(
        df['driverFinalRank'].fillna(20),  # Assume missing = backmarker
        bins=[0, 3.9, 10.9, 20],  # Exclusive right edges
        labels=[0, 1, 2]  # Leading, Midfield, Back
    )
    
    # Tire age calculation with proper grouping
    df['tire_age'] = df.groupby(['raceId', 'driverId']).cumcount() + 1
    
    # Identify pit stops using lap time spikes (customize threshold per circuit)
    pit_stop_threshold = df.groupby('circuitId')['driverLapTimeInMilliseconds'].transform(
        lambda x: x.quantile(0.95)
    )
    df['pit_flag'] = (df['driverLapTimeInMilliseconds'] > pit_stop_threshold).astype(int)
    
    
    df['tire_age'] = df.groupby(['raceId', 'driverId']).apply(
        lambda g: g['tire_age'].where(g['pit_flag'] == 0, 0).cumsum()
    ).reset_index(drop=True)
    
    df['tire_age'] = pd.cut(
        df['tire_age'],
        bins=[-1, 10, 20, 30, 50],  
        labels=[0, 1, 2, 3]
    )
    
    
    avg_lap = df.groupby('raceId')['driverLapTimeInMilliseconds'].transform('mean')
    std_lap = df.groupby('raceId')['driverLapTimeInMilliseconds'].transform('std')
    df['traffic_z'] = (df['driverLapTimeInMilliseconds'] - avg_lap) / std_lap
    df['traffic_density'] = pd.cut(
        df['traffic_z'],
        bins=[-np.inf, -1, 1, np.inf],
        labels=[0, 1, 2]  # Low, Medium, High traffic
    )
    

    df['pit_stop_loss'] = pd.qcut(
        df['driverLapTimeInMilliseconds'],
        q=[0, 0.3, 0.7, 1],
        labels=[0, 1, 2]  # Low, Medium, High
    )
    
    state_cols = [
        'TireDegradation',
        'current_lap_stage',
        'tire_age',
        'track_position',
        'traffic_density',
        'pit_stop_loss',
        'raceId',
    ]
    
    return df[state_cols].fillna(0).astype(int)

state_df = create_state_space(merged_data)
state_df.head()

  df['tire_age'] = df.groupby(['raceId', 'driverId']).apply(


Unnamed: 0,TireDegradation,current_lap_stage,tire_age,track_position,traffic_density,pit_stop_loss,raceId
0,0,0,0,0,1,1,1
1,0,0,0,0,1,1,1
2,0,0,0,0,1,1,1
3,0,0,0,0,1,1,1
4,0,0,1,0,1,1,1


In [5]:
print(state_df.isnull().sum())
print(state_df.describe(include='all'))

TireDegradation      0
current_lap_stage    0
tire_age             0
track_position       0
traffic_density      0
pit_stop_loss        0
raceId               0
dtype: int64
       TireDegradation  current_lap_stage       tire_age  track_position  \
count    589081.000000      589081.000000  589081.000000   589081.000000   
mean          0.000716           1.903015       0.196937        1.237981   
std           0.969342           1.405782       0.666225        0.743334   
min         -49.000000           0.000000       0.000000        0.000000   
25%           0.000000           1.000000       0.000000        1.000000   
50%           0.000000           2.000000       0.000000        1.000000   
75%           0.000000           3.000000       0.000000        2.000000   
max          49.000000           4.000000       3.000000        2.000000   

       traffic_density  pit_stop_loss         raceId  
count    589081.000000  589081.000000  589081.000000  
mean          1.070591       0.

In [6]:
actions = [
    "PIT_NEXT_LAP", 
    "PIT_IN_2_LAPS", 
    "PIT_IN_3_LAPS", 
    "PIT_IN_4_LAPS", 
    "PIT_IN_5_LAPS", 
    "NO_PIT"
]


In [7]:
# Define pit stop time penalties in milliseconds
pit_stop_loss = {
    'Low': 20000,     # 20 seconds
    'Medium': 25000,  # 25 seconds
    'High': 30000     # 30 seconds
}

def update_state(state, action):
    # Convert list state to dictionary for easier handling
    state_dict = {
        "current_lap_stage": state[0],
        "tire_age": state[1],
        "track_position": state[2],
        "traffic_density": state[3],
        "pit_stop_loss": state[4]
    }
    new_state = state_dict.copy()
    
    # Update tire age (within bounds 0-3)
    new_state["tire_age"] = min(3, new_state["tire_age"] + 1)
    
    # Apply pit stop consequences
    if action != "NO_PIT":
        pit_lap = int(action.split("_")[-2])  # Extract lap number from action
        new_state["current_lap_stage"] = 0  # Reset lap stage after pit
        if state_dict["current_lap_stage"] >= pit_lap:
            new_state["tire_age"] = 0  # Fresh tires
            # Lose positions based on traffic density
            if state_dict["traffic_density"] == 2:  # High traffic
                new_state["track_position"] = min(3, new_state["track_position"] + 1)
    
    # Convert back to list format matching state_dims order
    return [
        new_state["current_lap_stage"],
        new_state["tire_age"],
        new_state["track_position"],
        new_state["traffic_density"],
        new_state["pit_stop_loss"]
    ]

In [8]:
import gym
import numpy as np
from gym import spaces

class F1PitStrategyEnv(gym.Env):
    def __init__(self, state_df):
        super(F1PitStrategyEnv, self).__init__()
        
        # Ensure state_df has the required columns
        required_columns = ['raceId', 'current_lap_stage', 'tire_age', 'track_position', 'traffic_density', 'pit_stop_loss']
        if not all(col in state_df.columns for col in required_columns):
            raise ValueError(f"state_df must contain the following columns: {required_columns}")
        
        self.state_df = state_df
        self.races = self.state_df['raceId'].unique()
        self.current_race_idx = 0
        self.current_step = 0
        
        # State space dimensions
        self.observation_space = spaces.Box(
            low=np.array([0, 0, 0, 0, 0]),  # Min values for each feature
            high=np.array([
                self.state_df['current_lap_stage'].max(),
                self.state_df['tire_age'].max(),
                self.state_df['track_position'].max(),
                self.state_df['traffic_density'].max(),
                self.state_df['pit_stop_loss'].max()
            ]),
            dtype=np.int32
        )
        
        # Action space: 6 discrete options
        self.action_space = spaces.Discrete(6)
        
    def reset(self):
        # Get initial state for a new race
        self.current_race = self.races[self.current_race_idx]
        race_data = self.state_df[self.state_df['raceId'] == self.current_race]
        self.race_steps = race_data.drop(columns=['raceId']).values  # Exclude raceId from state
        self.current_step = 0
        self.current_race_idx = (self.current_race_idx + 1) % len(self.races)
        return self.race_steps[self.current_step]
    
    def step(self, action):
        # Get current state
        state = self.race_steps[self.current_step]
        
        # Execute action (simple version)
        new_state = self.race_steps[min(self.current_step + 1, len(self.race_steps)-1)]
        
        # Calculate reward
        reward = self._calculate_reward(state, new_state, action)
        
        # Update step
        self.current_step += 1
        done = self.current_step >= len(self.race_steps)
        
        return new_state, reward, done, {}
    
    def _calculate_reward(self, state, new_state, action):
        state = np.array(state, dtype=int)
        new_state = np.array(new_state, dtype=int)
        
        # Simplified reward function
        position_improvement = float(state[2] - new_state[2])  # Lower position number is better
        tire_penalty = float(-0.1 * state[1])  # Older tires get penalty
        pit_penalty = -1.0 if action < 5 else 0.0  # Less discouraging
        
        return position_improvement + tire_penalty + pit_penalty
    # Hypothetical reward function adjustments
def get_reward(state, action):
    base_reward = -0.1  # Small penalty per step to encourage efficiency
    
    if action == "PIT_NEXT_LAP":
        # Penalize pit time but less severely
        return base_reward - 0.5
    elif action == "NO_PIT":
        # Penalize high tire wear
        tire_wear = state[2]  # Assuming index 2 tracks tire wear
        return base_reward - (tire_wear * 0.1)
    # Reward for lap completion
    if state[0] % 1 == 0:  # After completing a lap
        return base_reward + 1.0
    return base_reward

In [9]:
# Initialize Q-table
state_dims = [
    int(state_df['current_lap_stage'].max() + 1),
    int(state_df['tire_age'].max() + 1),
    int(state_df['track_position'].max() + 1),
    int(state_df['traffic_density'].max() + 1),
    int(state_df['pit_stop_loss'].max() + 1),
]
num_actions = 6
q_table = np.zeros(shape=tuple(state_dims + [num_actions]))

# Hyperparameters
alpha = 0.1
gamma = 0.9
epsilon = 0.1
n_episodes = 1000

# Initialize environment
env = F1PitStrategyEnv(state_df)

# Training loop
for episode in range(n_episodes):
    state = env.reset()
    done = False
    total_reward = 0
    # During training loop (replace fixed epsilon)
    epsilon = max(0.01, 1.0 - episode / n_episodes)  # Decays from 1.0 to 0.01
    
    while not done:
        # Clip state values to valid range (only first 5 components)
        state = np.array(state[:5])  # Take only the first 5 components
        state = np.clip(state, 0, np.array(state_dims) - 1)
        
        # Convert state to a tuple of integers
        state_tuple = tuple(map(int, state))
        
        # Validate state tuple
        assert all(0 <= x < dim for x, dim in zip(state_tuple, state_dims)), f"Invalid state: {state_tuple}"
        
        # Epsilon-greedy action selection
        if np.random.random() < epsilon:
            action = np.random.randint(num_actions)
        else:
            action = np.argmax(q_table[state_tuple])
        
        # Execute action
        new_state, reward, done, _ = env.step(action)
        
        # Clip new_state values to valid range
        new_state = np.array(state[:5]) 
        new_state = np.clip(new_state, 0, np.array(state_dims) - 1)
        
        # Convert new_state to a tuple of integers
        new_state_tuple = tuple(map(int, new_state))
        
        # Validate new_state tuple
        assert all(0 <= x < dim for x, dim in zip(new_state_tuple, state_dims)), f"Invalid new_state: {new_state_tuple}"
        
        # Q-table update
        old_q = q_table[state_tuple][action]
        max_future_q = np.max(q_table[new_state_tuple])
        new_q = (1 - alpha) * old_q + alpha * (reward + gamma * max_future_q)
        q_table[state_tuple][action] = new_q
        
        # Update state and total reward
        state = new_state
        total_reward += reward
        
    if episode % 100 == 0:
        print(f"Episode {episode}, Total Reward: {total_reward}")

Episode 0, Total Reward: -1023.9
Episode 100, Total Reward: -1148.099999999999
Episode 200, Total Reward: -727.6000000000014
Episode 300, Total Reward: -917.3000000000002
Episode 400, Total Reward: -1027.2000000000005
Episode 500, Total Reward: -613.6000000000006
Episode 600, Total Reward: -516.100000000001
Episode 700, Total Reward: -441.3000000000011
Episode 800, Total Reward: -570.3000000000008
Episode 900, Total Reward: -329.19999999999993


In [10]:
def test_agent(env, q_table):
    state = env.reset()
    done = False
    total_reward = 0
    
    while not done:
        # Clip state values to valid range
        state = np.clip(state[:5], 0, np.array(state_dims) - 1)
        state_tuple = tuple(map(int, state))
        
        # Get action from Q-table
        action = np.argmax(q_table[state_tuple])
        
        # Take step in environment
        state, reward, done, _ = env.step(action)
        total_reward += reward
        
        print(f"Current state: {state}, Action: {actions[action]}, Reward: {reward}")
    
    print(f"Episode finished with total reward: {total_reward}")

test_agent(env, q_table)

Current state: [0 0 0 0 1 2], Action: NO_PIT, Reward: 0.0
Current state: [0 0 0 0 1 2], Action: NO_PIT, Reward: 0.0
Current state: [0 0 0 0 1 2], Action: NO_PIT, Reward: 0.0
Current state: [0 0 0 0 1 2], Action: NO_PIT, Reward: 0.0
Current state: [0 0 0 0 1 2], Action: NO_PIT, Reward: 0.0
Current state: [0 0 0 0 1 1], Action: NO_PIT, Reward: 0.0
Current state: [0 0 1 0 1 1], Action: NO_PIT, Reward: -1.0
Current state: [0 0 2 0 1 1], Action: PIT_NEXT_LAP, Reward: -2.0
Current state: [0 0 3 0 1 1], Action: PIT_NEXT_LAP, Reward: -2.0
Current state: [0 0 3 0 1 1], Action: PIT_NEXT_LAP, Reward: -1.0
Current state: [0 0 0 0 1 1], Action: PIT_NEXT_LAP, Reward: 2.0
Current state: [0 1 0 0 1 1], Action: NO_PIT, Reward: 0.0
Current state: [0 1 0 0 1 1], Action: NO_PIT, Reward: -0.1
Current state: [0 1 0 0 1 1], Action: NO_PIT, Reward: -0.1
Current state: [0 1 0 0 1 1], Action: NO_PIT, Reward: -0.1
Current state: [0 1 0 0 1 1], Action: NO_PIT, Reward: -0.1
Current state: [0 1 0 0 1 1], Action: NO

In [1]:
import numpy as np
import random
import logging
from collections import deque
import gym
from gym import spaces
import torch
import torch.nn as nn
import torch.optim as optim

# Configure logging for debugging; set to WARNING so that INFO messages are not printed
logging.basicConfig(level=logging.WARNING, filename='pit_stop_strategy.log')
logger = logging.getLogger(__name__)

# ==============================
# Environment for Pit Stop Strategy
# ==============================
class PitStopEnv(gym.Env):
    def __init__(self, max_laps=100):
        super(PitStopEnv, self).__init__()
        self.max_laps = max_laps
        
        # Define state: [current_lap, tire_age, position, gap_to_leader, traffic_density, extra_feature]
        self.state_dtype = np.int32  
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32)
        
        # Define action space with 6 actions:
        # 0 = PIT_NEXT_LAP, 1 = PIT_IN_2_LAPS, 2 = PIT_IN_3_LAPS, 3 = PIT_IN_4_LAPS, 4 = PIT_IN_5_LAPS, 5 = NO_PIT
        self.action_space = spaces.Discrete(6)
        self.actions = [
            "PIT_NEXT_LAP", 
            "PIT_IN_2_LAPS", 
            "PIT_IN_3_LAPS", 
            "PIT_IN_4_LAPS", 
            "PIT_IN_5_LAPS", 
            "NO_PIT"
        ]
        
        self.reset()
        
    def reset(self):
        self.current_lap = np.int32(1)
        self.tire_age = 0.0
        self.position = 0
        self.gap_to_leader = 0.0
        self.traffic_density = 0.0
        self.extra_feature = 0.0
        self.state = np.array([self.current_lap, self.tire_age, self.position,
                               self.gap_to_leader, self.traffic_density, self.extra_feature],
                              dtype=np.float32)
        self.total_reward = 0.0
        self.done = False
        return self.state
    
    def step(self, action):
        action_name = self.actions[action]
        # (Logging is now set to WARNING; INFO messages will not print to console)
        logger.info(f"Current state: {self.state}")
        reward = 0.0
        
        if action_name == "NO_PIT":
            reward += 0.1
            logger.info("Action: NO_PIT, rewarding progress: +0.1")
            self.tire_age += 1.0
        else:
            if action_name == "PIT_NEXT_LAP":
                pit_delay = 1
            else:
                pit_delay = int(action_name.split("_")[2])
            penalty = -0.5 - (pit_delay - 1) * 0.05
            reward += penalty
            logger.info(f"Action: {action_name}, applying penalty: {penalty}")
            self.tire_age = 0.0
        
        new_lap = self.current_lap + 1
        if new_lap < self.current_lap:
            logger.warning(f"Lap counter anomaly: {self.current_lap} -> {new_lap}")
        self.current_lap = np.int32(new_lap)
        
        self.position += np.random.choice([-1, 0, 1])
        self.gap_to_leader += np.random.uniform(-0.5, 0.5)
        self.traffic_density = np.random.uniform(0, 5)
        self.extra_feature = np.random.uniform(0, 1)
        
        new_state = np.array([self.current_lap, self.tire_age, self.position,
                              self.gap_to_leader, self.traffic_density, self.extra_feature],
                             dtype=np.float32)
        
        if abs(new_state[0] - self.state[0]) > 100:
            logger.warning(f"Drastic lap counter change: {self.state[0]} -> {new_state[0]}")
        
        self.state = new_state
        
        if self.current_lap >= self.max_laps:
            self.done = True
        
        self.total_reward += reward
        
        info = {"total_reward": self.total_reward}
        return self.state, reward, self.done, info

# ==============================
# DQN Model in PyTorch
# ==============================
class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, 24)
        self.fc2 = nn.Linear(24, 24)
        self.fc3 = nn.Linear(24, action_size)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# ==============================
# DQN Agent Implementation in PyTorch
# ==============================
class DQNAgent:
    def __init__(self, state_size, action_size, device):
        self.state_size = state_size
        self.action_size = action_size
        self.device = device
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = DQN(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.criterion = nn.MSELoss()
        
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
        
    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            act_values = self.model(state_tensor)
        return torch.argmax(act_values[0]).item()
    
    def replay(self, batch_size):
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0).to(self.device)
            target = reward
            if not done:
                with torch.no_grad():
                    next_q = self.model(next_state_tensor)
                target = reward + self.gamma * torch.max(next_q).item()
            current_q = self.model(state_tensor)
            target_f = current_q.clone().detach()
            target_f[0][action] = target
            self.optimizer.zero_grad()
            output = self.model(state_tensor)
            loss = self.criterion(output, target_f)
            loss.backward()
            self.optimizer.step()
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

# ==============================
# Training Loop in PyTorch
# ==============================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = PitStopEnv(max_laps=100)
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size, device)
episodes = 1000
batch_size = 32

for e in range(episodes):
    state = env.reset()
    total_episode_reward = 0
    for t in range(env.max_laps):
        action = agent.act(state)
        next_state, reward, done, info = env.step(action)
        total_episode_reward += reward
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        if done:
            break
    if len(agent.memory) > batch_size:
        agent.replay(batch_size)
    # Only final summary is printed via logger at WARNING level or higher
    if (e+1) % 100 == 0:
        print(f"Episode: {e+1}/{episodes}, Total Reward: {total_episode_reward:.2f}, Epsilon: {agent.epsilon:.2f}")

# Save the trained model to disk
model_save_path = "pit_stop_strategy_model.pth"
torch.save(agent.model.state_dict(), model_save_path)
print(f"Training complete. Model saved to {model_save_path}")


Episode: 100/1000, Total Reward: -29.90, Epsilon: 0.61
Episode: 200/1000, Total Reward: -12.05, Epsilon: 0.37
Episode: 300/1000, Total Reward: -8.20, Epsilon: 0.22
Episode: 400/1000, Total Reward: 4.35, Epsilon: 0.13
Episode: 500/1000, Total Reward: 4.20, Epsilon: 0.08
Episode: 600/1000, Total Reward: 9.10, Epsilon: 0.05
Episode: 700/1000, Total Reward: 7.70, Epsilon: 0.03
Episode: 800/1000, Total Reward: 8.35, Epsilon: 0.02
Episode: 900/1000, Total Reward: 9.20, Epsilon: 0.01
Episode: 1000/1000, Total Reward: 9.90, Epsilon: 0.01
Training complete. Model saved to pit_stop_strategy_model.pth
