In [19]:
# !pip install dill

In [20]:
import numpy as np
from copy import deepcopy
import matplotlib.pyplot as plt
from tqdm import tqdm
from itertools import product
import dill as pickle
import random
import sqlite3
import itertools
import os
import pandas as pd
from collections import defaultdict

np.set_printoptions(precision=3)

### RADIUS SET TO 900

In [21]:
class Agent:
    def __init__(self):
        pass

    def actRules(self, state):
        return 1
    
    def actNaively(self):
        return 4

class Car:
    def __init__(self, tyre="Intermediate"):
        self.default_tyre = tyre
        self.possible_tyres = ["Ultrasoft", "Soft", "Intermediate", "Fullwet"]
        self.pitstop_time = 23
        self.reset()
    
    
    def reset(self):
        self.change_tyre(self.default_tyre)
    
    
    def degrade(self, w, r):
        if self.tyre == "Ultrasoft":
            self.condition *= (1 - 0.0050*w - (2500-r)/90000)
        elif self.tyre == "Soft":
            self.condition *= (1 - 0.0051*w - (2500-r)/93000)
        elif self.tyre == "Intermediate":
            self.condition *= (1 - 0.0052*abs(0.5-w) - (2500-r)/95000)
        elif self.tyre == "Fullwet":
            self.condition *= (1 - 0.0053*(1-w) - (2500-r)/97000)
        
        
    def change_tyre(self, new_tyre):
        assert new_tyre in self.possible_tyres
        self.tyre = new_tyre
        self.condition = 1.00
    
    
    def get_velocity(self):
        if self.tyre == "Ultrasoft":
            vel = 80.7*(0.2 + 0.8*self.condition**1.5)
        elif self.tyre == "Soft":
            vel = 80.1*(0.2 + 0.8*self.condition**1.5)
        elif self.tyre == "Intermediate":
            vel = 79.5*(0.2 + 0.8*self.condition**1.5)
        elif self.tyre == "Fullwet":
            vel = 79.0*(0.2 + 0.8*self.condition**1.5)
        return vel

    
class Track:
    def __init__(self, car=Car()):
        # self.radius and self.cur_weather are defined in self.reset()
        self.total_laps = 162
        self.car = car
        self.possible_weather = ["Dry", "20% Wet", "40% Wet", "60% Wet", "80% Wet", "100% Wet"]
        self.wetness = {
            "Dry": 0.00, "20% Wet": 0.20, "40% Wet": 0.40, "60% Wet": 0.60, "80% Wet": 0.80, "100% Wet": 1.00
        }
        self.p_transition = {
            "Dry": {
                "Dry": 0.987, "20% Wet": 0.013, "40% Wet": 0.000, "60% Wet": 0.000, "80% Wet": 0.000, "100% Wet": 0.000
            },
            "20% Wet": {
                "Dry": 0.012, "20% Wet": 0.975, "40% Wet": 0.013, "60% Wet": 0.000, "80% Wet": 0.000, "100% Wet": 0.000
            },
            "40% Wet": {
                "Dry": 0.000, "20% Wet": 0.012, "40% Wet": 0.975, "60% Wet": 0.013, "80% Wet": 0.000, "100% Wet": 0.000
            },
            "60% Wet": {
                "Dry": 0.000, "20% Wet": 0.000, "40% Wet": 0.012, "60% Wet": 0.975, "80% Wet": 0.013, "100% Wet": 0.000
            },
            "80% Wet": {
                "Dry": 0.000, "20% Wet": 0.000, "40% Wet": 0.000, "60% Wet": 0.012, "80% Wet": 0.975, "100% Wet": 0.013
            },
            "100% Wet": {
                "Dry": 0.000, "20% Wet": 0.000, "40% Wet": 0.000, "60% Wet": 0.000, "80% Wet": 0.012, "100% Wet": 0.988
            }
        }
        self.reset()
    
    
    def reset(self):
        self.radius = np.random.randint(600,1201)
        # self.radius = 900
        self.cur_weather = np.random.choice(self.possible_weather)
        self.is_done = False
        self.pitstop = False
        self.laps_cleared = 0
        self.car.reset()
        return self._get_state()
    
    
    def _get_state(self):
        return [self.car.tyre, self.car.condition, self.cur_weather, self.radius, self.laps_cleared]
        
    
    def transition(self, action=0):
        """
        Args:
            action (int):
                0. Make a pitstop and fit new ‘Ultrasoft’ tyres
                1. Make a pitstop and fit new ‘Soft’ tyres
                2. Make a pitstop and fit new ‘Intermediate’ tyres
                3. Make a pitstop and fit new ‘Fullwet’ tyres
                4. Continue the next lap without changing tyres
        """
        ## Pitstop time will be added on the first eight of the subsequent lap
        time_taken = 0
        if self.laps_cleared == int(self.laps_cleared):
            if self.pitstop:
                self.car.change_tyre(self.committed_tyre)
                time_taken += self.car.pitstop_time
                self.pitstop = False
        
        ## The environment is coded such that only an action taken at the start of the three-quarters mark of each lap matters
        if self.laps_cleared - int(self.laps_cleared) == 0.75:
            if action < 4:
                self.pitstop = True
                self.committed_tyre = self.car.possible_tyres[action]
            else:
                self.pitstop = False
        
        self.cur_weather = np.random.choice(
            self.possible_weather, p=list(self.p_transition[self.cur_weather].values())
        )
        # we assume that degration happens only after a car has travelled the one-eighth lap
        velocity = self.car.get_velocity()
        time_taken += (2*np.pi*self.radius/8) / velocity
        reward = 0 - time_taken
        self.car.degrade(
            w=self.wetness[self.cur_weather], r=self.radius
        )
        self.laps_cleared += 0.125
        
        if self.laps_cleared == self.total_laps:
            self.is_done = True
        
        next_state = self._get_state()
        return reward, next_state, self.is_done, velocity
    
    def step(self, action):
        return self.transition(action)

In [22]:
new_car = Car()
env = Track(new_car)

In [23]:
class TDLambdaAgent:
    def __init__(self, epsilon=0.1, epsilon_decay=0.995, alpha=0.1,
                  gamma=0.99, td_lambda=0, n_actions=5,
                    no_change_after_lap=150, state_space_discretization=100):
        self.epsilon = epsilon
        self.alpha = alpha
        self.gamma = gamma
        self.Q = defaultdict(lambda: np.zeros(self.n_actions))
        self.E = defaultdict(lambda: np.zeros(self.n_actions))  # Eligibility traces
        self.n_actions = n_actions
        self.state_space_discretization = state_space_discretization
        self.epsilon_decay = epsilon_decay
        self.no_change_after_lap = no_change_after_lap
        self.td_lambda = td_lambda
        
    def _discretize_state(self, state):
        tyre, condition, weather, radius, laps_cleared = state
        condition = int(condition * self.state_space_discretization)
        
        # Discretizing the radius as well by rounding to nearest hundred
        radius = round(radius, -2)
        return (tyre, condition, weather, radius, laps_cleared)
    
    def act(self, state):
        state = self._discretize_state(state)
        _, _, _, _, laps_cleared = state
        
        if np.random.rand() < self.epsilon:
            if laps_cleared >= self.no_change_after_lap:
                return 4  # Don't change tires
            return np.random.choice(self.n_actions)
        else:
            action = np.argmax(self.Q.get(state, np.zeros(self.n_actions)))
            if laps_cleared >= self.no_change_after_lap and action < 4:
                return 4  # Don't change tires
            return action
        
    def update(self, state, action, reward, next_state):
        state = self._discretize_state(state)
        next_state = self._discretize_state(next_state)
        
        best_next_action = np.argmax(self.Q[next_state])
        td_error = reward + self.gamma * self.Q[next_state][best_next_action] - self.Q[state][action]
        
        # Increment the eligibility trace for the current state-action pair
        self.E[state][action] += 1  
        
        # Update Q-values for all state-action pairs using TD error and the eligibility traces
        for s, actions in self.Q.items():
            for a in range(self.n_actions):
                self.Q[s][a] += self.alpha * td_error * self.E[s][a]
                
                # Decay the eligibility trace for the state-action pair
                self.E[s][a] *= self.gamma * self.td_lambda


retieving best combinations for all td(lambda) models

In [24]:
#KEY PARAMETERS

lambda_value = 0.8
table_name = f"gs_results_td_lambda_{lambda_value}".replace(".", "")
table_names = [f"gs_results_td_lambda_{l}".replace(".", "") for l in [0,0.2,0.4,0.6,0.8,1]]
directory = "e-greedy/agents" #dir to create
gs_db_name = 'e-greedy/grid_search_greedy_r900.db'

In [25]:
import pandas as pd
import sqlite3
from IPython.display import display

#KEY PARAMETERS
lambda_values = [0, 0.2, 0.4, 0.6, 0.8, 1]
table_names = [f"gs_results_td_lambda_{l}".replace(".", "") for l in lambda_values]
directory = "e-greedy/agents" #dir to create
gs_db_name = 'e-greedy/grid_search_greedy_r900.db'

def best_combination_by_eval_avg(table_name, db_name=gs_db_name):
    """
    Fetch the row with the best eval_avg for the given table.

    Args:
    - table_name (str): Name of the table to query.
    - db_name (str): Name of the SQLite database.

    Returns:
    - pd.DataFrame: A single-row DataFrame containing the best combination by eval_avg.
    """
    # Connect to the SQLite database
    conn = sqlite3.connect(db_name)
    # Fetch the top row ordered by eval_avg in descending order
    df = pd.read_sql_query(f"SELECT * from {table_name} ORDER BY eval_avg DESC LIMIT 1", conn)
    conn.close()
    return df

# Create an empty DataFrame to hold all results
all_results_df = pd.DataFrame()

for t_name, lambda_val in zip(table_names, lambda_values):
    result_df = best_combination_by_eval_avg(t_name)
    result_df["lambda_value"] = lambda_val
    all_results_df = pd.concat([all_results_df, result_df])

# Display the combined results
display(all_results_df)

Unnamed: 0,epsilon,epsilon_decay,alpha,gamma,no_change_after_lap,avg_last_50,min_last_50,max_last_50,eval_avg,overall_avg,lambda_value
0,0.2,0.995,0.2,0.6,160,-16054.310293,-16144.504846,-16002.402039,-15972.868982,-16054.310293,0.0
0,0.05,0.995,0.9,0.4,160,-16087.192161,-16146.33762,-15995.5945,-15930.752616,-16087.192161,0.2
0,0.2,0.995,0.8,1.0,160,-16106.523543,-16143.556947,-16028.776164,-15925.539765,-16106.523543,0.4
0,0.2,0.999,0.2,1.0,160,-16128.751981,-16156.691405,-16100.812556,-15953.534127,-16128.751981,0.6
0,0.1,0.99,0.6,0.2,160,-16067.208146,-16155.672758,-15996.008652,-15991.691644,-16067.208146,0.8
0,0.1,0.99,0.0,0.0,160,-16049.145336,-16091.438366,-15991.42796,-15997.234823,-16049.145336,1.0


track with random radius

In [26]:
from project_helper import *
import pandas as pd
import sqlite3
from IPython.display import display
from collections import defaultdict
import numpy as np
from tqdm import tqdm
import os

training_episodes = 25

def train_agent(td_lambda_value, num_episodes=25):
    agent = TDLambdaAgent(td_lambda=td_lambda_value)
    episode_rewards = []
    for episode in tqdm(range(num_episodes)):
        state = env.reset()
        done = False
        episode_reward = 0
        while not done:
            action = agent.act(state)
            reward, next_state, done, _ = env.step(action)
            agent.update(state, action, reward, next_state)
            state = next_state
            episode_reward += reward
        episode_rewards.append(episode_reward)
        agent.epsilon *= agent.epsilon_decay

    return agent

new_car = Car()
env = Track(new_car)

# Check and create directory if it doesn't exist
if not os.path.exists(directory):
    os.makedirs(directory)

best_agents = {}

# Assuming table_names is defined somewhere
for t_name, lambda_val in tqdm(zip(table_names, lambda_values)):
    best_df = best_combination_by_eval_avg(t_name)
    try:
        best_agent_params = best_df.iloc[0]  # Get the best parameters from the df
    except IndexError:
        print(f"No data for {t_name} with lambda value {lambda_val}, skipping...")
        continue  # Skip the rest of the loop for this lambda_val if the line fails

    agent_filepath = f"{directory}/td_lambda_{lambda_val}_agent.pkl"
    if os.path.exists(agent_filepath):
        print(f"Agent for lambda value {lambda_val} already exists, skipping...")
        continue

    # Train agent
    trained_agent = train_agent(lambda_val)
    best_agents[lambda_val] = trained_agent
    
    # Save agent using pickle
    with open(agent_filepath, "wb") as f:
        pickle.dump(trained_agent, f)

print("All agents trained and saved!")

0it [00:00, ?it/s]

Agent for lambda value 0 already exists, skipping...
Agent for lambda value 0.2 already exists, skipping...
Agent for lambda value 0.4 already exists, skipping...


100%|██████████| 25/25 [41:53<00:00, 100.55s/it]
6it [41:59, 419.94s/it]

Agent for lambda value 0.8 already exists, skipping...
Agent for lambda value 1 already exists, skipping...
All agents trained and saved!



