In [1]:
import numpy as np
import gym
import matplotlib.pyplot as plt
from f110_gym.envs.base_classes import Integrator
import random
import os
import pandas as pd
import time
from sklearn.preprocessing import normalize
from scipy.sparse import csr_matrix
import sys
from f110_gym.envs.f110_env import F110Env

# Fewer actions 

In [2]:
class F1Tenth_navigation:

    def __init__(self,gym_env_code='f110_gym:f110-v0', num_agents=1, map_path=['./f1tenth_racetracks/Austin/Austin_map'], map_ext='.png', sx=0., sy=0., stheta=None, map_centers_file=None, save_path=None, track_name=None, inference=None,reward_file=None,collision_file=None):

        # Environment setup
        self.path_counter = 0
        self.sx, self.sy, self.stheta = sx, sy, stheta
        self.save_path = save_path
        self.track_name = track_name
        self.num_agents = num_agents
        self.map_path = map_path
        self.map_ext = map_ext
        self.map_centers_file = map_centers_file

    
        self.env = gym.make(gym_env_code, map=self.map_path[self.path_counter], map_ext=self.map_ext, num_agents=self.num_agents, timestep=0.01, integrator=Integrator.RK4)
        self.env.add_render_callback(self.render_callback)
       
        file = pd.read_csv(self.map_centers_file[self.path_counter])
        file.columns = ['x', 'y', 'w_r', 'w_l']
        file.index = file.index.astype(int)
        self.map_centers = file.values[:, :2]
        self.track_width = file.loc[0,'w_r'] + file.loc[0,'w_l']
        self.reward_file = reward_file
        self.track_headings = self.calculate_track_headings(self.map_centers)
        self.collision_file = collision_file

        # Random Seed
        self.random_seed = 42
        np.random.seed(self.random_seed)

        # Environment Observation Parameters
        self.num_beams = 1080
        self.n_features = 11
        self.angle = 220

        # LiDAR downsampling parameters
        self.n_sectors = 22
        self.normalized_lidar = np.zeros((1,self.n_sectors))

        # Action Space Parameters
        self.num_angles = self.n_sectors
        self.num_speeds = 5


        # State Space Parameters
        self.num_states = 2 ** self.n_features

        # Speed Parameters
        self.min_speed = 0.8
        self.max_speed = 1.8

        # Action Space
        self.angles_deg = np.linspace(-self.angle // 2, self.angle // 2, self.num_angles)[::-1]
        self.angles = np.radians(self.angles_deg)
        self.speeds = np.linspace(self.min_speed, self.max_speed, self.num_speeds)
    
        # State Space - Q-Table
        if inference is not None:
            self.weights = np.load(inference)
            self.num_collisions = int(inference.split('_')[-1].split('.')[0])
            print(f'Loaded Weights')
        else:
            self.weights = np.zeros((self.num_states,self.num_angles,self.num_speeds))
            self.num_collisions = 0
        
        self.max_weight = 5

        # ELigibility Trace
        self.ET = np.zeros((1,self.num_states))
        self.IS = np.zeros((self.num_angles,self.num_speeds))

        # projection matrix
        if self.n_features == 10:
            zero_prob = 0.85
            one_prob = 0.15
        if self.n_features == 11:
            zero_prob = 0.8
            one_prob = 0.2
        self.projection_matrix = self.get_projection_matrix(zero_prob=zero_prob,one_prob=one_prob)
        # self.bias = np.linspace(-0.5,0.5,self.n_features).reshape(1,-1)
        self.bias = np.zeros((1,self.n_features))

        # binary powers
        self.binary_powers = np.array([2 ** i for i in range(self.n_features)])

        # Training Variables
        self.curr_state = None
        self.next_state = None
        
        self.action_threshold_decay = 0.9998
        self.action_threshold = 0.1 * (self.action_threshold_decay ** self.num_collisions)

        # Imported Classes
        # self.reward_class = Reward(min_speed=self.min_speed, max_speed=self.max_speed, map_centers=self.map_centers, track_width=self.track_width)
        # self.index_selector = IndexSelector(self.map_centers.shape[0])      

        # BTSP Parameters
        self.learning_rate = 1e-3
        self.ET_decay_rate = 0.9
        self.IS_decay_rate = 0.7

        # Reward
        self.reward = 0
        self.episode_reward = 0
        self.cumulative_reward = 0
        self.episodic_rewards = [0]

        # Time
        self.collision_times = [0]       


    def calculate_track_headings(self,track_centers, window_size=5):
        """
        Calculates orientations for track traversal.
        
        Args:
            track_centers (np.ndarray): Shape (N, 2) array of track center points (x, y)
            window_size (int): Number of points to consider for smoothing
        
        Returns:
            np.ndarray: Shape (N,) array of orientation angles in radians
        """
        num_points = track_centers.shape[0]
        half_window = window_size // 2
        
        # Create indices for the future points (with wraparound)
        future_indices = (np.arange(num_points) + half_window) % num_points
        
        # Get the future points
        future_points = track_centers[future_indices]
        
        # Calculate direction vectors
        direction_vectors = future_points - track_centers
        
        # Calculate angles using arctan2
        orientations = np.arctan2(direction_vectors[:, 1], direction_vectors[:, 0])
        
        return orientations
    
    def __update_map(self):
        if self.env.renderer is not None:
            self.env.renderer.close()
        self.path_counter += 1
        if self.path_counter == len(self.map_path):
            self.path_counter = 0
        self.env.map_name = self.map_path[self.path_counter]
        self.env.update_map(f'{self.map_path[self.path_counter]}.yaml',self.map_ext)
        F110Env.renderer = None
        file = pd.read_csv(self.map_centers_file[self.path_counter])
        file.columns = ['x', 'y', 'w_r', 'w_l']
        file.index = file.index.astype(int)
        self.map_centers = file.values[:, :2]
        self.track_width = file.loc[0,'w_r'] + file.loc[0,'w_l']
        self.track_headings = self.calculate_track_headings(self.map_centers)
        print(f'Map updated to {self.track_name[self.path_counter]}')
        
        
    def render_callback(self, env_renderer):
        e = env_renderer
        x = e.cars[0].vertices[::2]
        y = e.cars[0].vertices[1::2]
        top, bottom, left, right = max(y), min(y), min(x), max(x)
        e.score_label.x = left
        e.score_label.y = top - 700
        e.left = left - 800
        e.right = right + 800
        e.top = top + 800
        e.bottom = bottom - 800


    def get_statistical_properties(self,lidar_input,n_sectors=None):
        assert n_sectors is not None, "Number of sectors must be provided"
        #  The [100 :-100] is for selecting only those rays corresponding to 220 fov.
        sector_size = np.asarray(lidar_input[100:-100],dtype=np.float32).shape[0] // n_sectors
        sectors = lidar_input[:sector_size * n_sectors].reshape(n_sectors, sector_size)
        return np.median(sectors, axis=1).reshape(1,-1)
    
    def binarize_vector(self,vector):
        '''
        Function that is used to binarize the input.
        This function takes the projected downsampled-Lidar and binarizes it based on the threshold.
        Args:
            vector (np.ndarray): Projected downsampled-Lidar input.
        Returns:
            np.ndarray: Binary represnetation of the Lidar data which is used as a state.
        '''

        # threshold = (np.min(vector)+ np.max(vector))/2
        # return np.where(vector > threshold, 1, 0)
        return np.where(vector > 0, 1, 0)

    def get_projection_matrix(self,zero_prob=0.5,one_prob=0.5):
        '''
        Function that is used to generate the projection matrix.
        This function takes the number of features and the number of angles and generates a random projection matrix.
        This function is called only once to generate the projection matrix and saves it to a file.
        Args:
            zero_prob (float): Probability of selecting 0.
            one_prob (float): Probability of selecting 1.
        Returns:
            np.ndarray: Projection matrix.
        '''
        # Generate a random matrix with values 0 and 1 based on the given probabilities [prob_0,prob_1]
        if not os.path.exists('Projection_matrices'):
            os.mkdir('Projection_matrices')
        if not os.path.exists(os.path.join('Projection_matrices', f'projection_{self.n_features}f_{self.num_angles}a_s{self.random_seed}.npy')):
            std = np.sqrt(1/self.n_features)
            # matrix = np.random.choice([-1/std, 1/std], size=(self.n_sectors,self.n_features),p=[zero_prob, one_prob])
            # matrix = np.random.choice([0, 1], size=(self.n_sectors, self.n_features), p=[zero_prob,one_prob])
            matrix = np.random.normal(loc=0.0, scale=1/std, size=(self.n_sectors, self.n_features))
            np.save(os.path.join('Projection_matrices', f'projection_{self.n_features}f_{self.num_angles}a_s{self.random_seed}.npy'), matrix)
        else:
            matrix = np.load(os.path.join('Projection_matrices', f'projection_{self.n_features}f_{self.num_angles}a_s{self.random_seed}.npy'))
        return matrix

    def get_binary_representation(self,lidar_input):
        '''
        Function that is used to get the binary representation of the Lidar input.
        This function takes the Lidar input and projects it using the projection matrix.
        It then binarizes the projected Lidar input and returns the binary representation.
        the bias used is here is some types of non linear projections. It is set to zero here.
        Args:
            lidar_input (np.ndarray): Lidar input.
        Returns:
            np.ndarray: Binary representation of the Lidar input.
        '''
        self.normalized_lidar = normalize(lidar_input,axis=1)
        # Do not normalize, just use the raw data
        return self.binarize_vector(np.dot(lidar_input,self.projection_matrix) + self.bias)
    

    def get_state(self, binary):
        return np.dot(binary[0], self.binary_powers)
    

    def select_action(self, state):
        random_number = np.random.rand()
        if random_number < self.action_threshold:
            angle_index = np.random.randint(0, self.num_angles)
            speed_index = np.random.randint(0, self.num_speeds)
        else:
            max_value = np.max(self.weights[state])
            max_indices = np.argwhere(self.weights[state] == max_value)
            angle_index, speed_index  = max_indices[np.random.randint(len(max_indices))]

        self.action_threshold *= self.action_threshold_decay

        return angle_index, speed_index

    def select_action_inference(self, state):
        max_indices = np.argwhere(self.weights[state] == np.max(self.weights[state]))
        angle_index, speed_index  = max_indices[np.random.choice(np.arange(len(max_indices)))]
        return angle_index, speed_index

    def save_reward_time(self):
        if not os.path.exists(os.path.join(self.save_path)):
            os.mkdir(os.path.join(self.save_path))
        
        if self.reward_file is not None:
            r = np.append(np.load(self.reward_file), self.episodic_rewards)
            t = np.append(np.load(self.collision_file), self.collision_times)
            np.save(os.path.join(self.save_path, f'rewards.npy'), np.array(r))
            np.save(os.path.join(self.save_path, f'times.npy'), np.array(t))
        else:
            np.save(os.path.join(self.save_path, f'rewards.npy'), np.array(self.episodic_rewards))
            np.save(os.path.join(self.save_path, f'times.npy'), np.array(self.collision_times))

    def save_weights(self):
        if not os.path.exists(os.path.join(self.save_path)):
            os.mkdir(os.path.join(self.save_path))
        np.save(os.path.join(self.save_path, f'{self.track_name[self.path_counter]}_{self.num_collisions + 1}.npy'), self.weights)
        # print(f'File saved')

    def inference(self):
        try:
            obs, step_reward, done, info = self.env.reset(np.array([[self.sx, self.sy, self.stheta[self.track_name[self.path_counter]]]]))
            lidar = obs['scans'][0]
            lidar_down_sampled = self.get_statistical_properties(lidar,n_sectors=self.n_sectors)
            self.curr_state = self.get_state(self.get_binary_representation(lidar_down_sampled))
            angle_index,speed_index = self.select_action_inference(self.curr_state)
            while not done:
                steering_angle,speed = self.angles[angle_index],self.speeds[speed_index]
                obs, step_reward, done, info = self.env.step(np.array([[steering_angle, speed]]))
                lidar = obs['scans'][0]
                lidar_down_sampled = self.get_statistical_properties(lidar,n_sectors=self.n_sectors)
                self.next_state = self.get_state(self.get_binary_representation(lidar_down_sampled))
                angle_index,speed_index = self.select_action_inference(self.next_state)
                self.curr_state = self.next_state
                
                self.env.render(mode='human')
            raise Exception('Done')
        except Exception as e:
            print(f'Exception: {e}')
            self.env.renderer.close()
            self.env.close()
            F110Env.renderer = None
            

In [3]:
path = './f1tenth_racetracks'
all_map_paths=[]
map_centers = []
maps = []
track_lengths=[]
for folder in os.listdir(path):
    if folder not in ['README.md','.gitignore','convert.py','LICENSE','rename.py','.git']:
        folder_name=folder
        file_name=folder_name.replace(' ','')+'_map'
        map_center = folder_name.replace(' ','')+'_centerline.csv'
        track_lengths.append(len(pd.read_csv(f'{path}/{folder_name}/{map_center}')))
        maps.append(folder_name)
        all_map_paths.append(f'{path}/{folder_name}/{file_name}')
        map_centers.append(f'{path}/{folder_name}/{map_center}')

track_length_list = list(zip(maps,track_lengths))
track_length_list

[('Hockenheim', 914),
 ('Mexico City', 860),
 ('Oschersleben', 739),
 ('Shanghai', 1090),
 ('BrandsHatch', 781),
 ('Monza', 1159),
 ('Catalunya', 931),
 ('SaoPaulo', 862),
 ('Sepang', 1108),
 ('Silverstone', 1178),
 ('Nuerburgring', 1029),
 ('YasMarina', 1110),
 ('Spa', 1401),
 ('Sochi', 1169),
 ('Montreal', 872),
 ('Austin', 1102),
 ('Melbourne', 1060),
 ('Budapest', 876),
 ('Spielberg', 864),
 ('Zandvoort', 864),
 ('Sakhir', 1082),
 ('MoscowRaceway', 813)]

In [4]:
# ['Austin','Mexico City','Spa','Hockenheim','Shanghai','Nuerburgring','Montreal'] - old train maps
train_maps = ['Spa','Hockenheim','Shanghai','Nuerburgring','Montreal', 'Austin','Mexico City'] # 220 fov train maps
test_maps = [i[0] for i in track_length_list if i[0] not in train_maps]
print(f'Train Maps: {train_maps}')
print(f'Test Maps: {test_maps}')

Train Maps: ['Spa', 'Hockenheim', 'Shanghai', 'Nuerburgring', 'Montreal', 'Austin', 'Mexico City']
Test Maps: ['Oschersleben', 'BrandsHatch', 'Monza', 'Catalunya', 'SaoPaulo', 'Sepang', 'Silverstone', 'YasMarina', 'Sochi', 'Melbourne', 'Budapest', 'Spielberg', 'Zandvoort', 'Sakhir', 'MoscowRaceway']


In [5]:
global num_agents,map_path,map_ext,sx,sy,stheta,indices
num_agents = 1
map_ext = '.png'
sx = 0.
sy = 0.
stheta = {'Hockenheim': 2.02,
 'Mexico City': -0.15,
 'Oschersleben': 2.86,
 'Shanghai': -2.93,
 'BrandsHatch': 0.42,
 'Monza': 1.47,
 'Catalunya': -2.14,
 'SaoPaulo': -1.31,
 'Sepang': -3.06,
 'Silverstone': 0.94,
 'Nuerburgring': -2.38,
 'YasMarina': 0.13,
 'Spa': 2.13,
 'Sochi': -2.14,
 'Montreal': -1.35,
 'Austin': -0.65,
 'Melbourne': 2.37,
 'Budapest': 2.45,
 'Spielberg': -2.88,
 'Zandvoort': 1.2,
 'Sakhir': 1.53,
 'MoscowRaceway': 1.46}

# indices = [idx for idx,i in enumerate(maps) if i in test_maps]
indices = [idx for idx,i in enumerate(maps) if i in train_maps]
map_path_subset = [all_map_paths[i] for i in indices]
map_centers_subset = [map_centers[i] for i in indices]
map_names_subset = [maps[i] for i in indices]

In [6]:
# inference_file = 'BTSP_Multiple_training/Austin_32000.npy'
# reward_file='BTSP_Multiple_training/rewards.npy'
# collision_file='BTSP_Multiple_training/times.npy'

inference_file = 'BTSP_Multiple_training/Shanghai_10989.npy'
reward_file='BTSP_Multiple_training/rewards.npy'
collision_file='BTSP_Multiple_training/times.npy'

map_index = 0

map_path = [map_path_subset[map_index]]
map_center = [map_centers_subset[map_index]]
map_name = [map_names_subset[map_index]]

save_path = 'Weights_BTSP/'

simulator = F1Tenth_navigation(num_agents=num_agents,map_path=map_path,map_ext=map_ext,sx=sx,sy=sy,stheta=stheta,map_centers_file=map_centers,save_path=save_path,track_name=map_name,inference=inference_file,reward_file=reward_file,collision_file=collision_file)
print(f'BTSP Inference on {map_names_subset[map_index]}')
simulator.inference()



Loaded Weights
BTSP Inference on Hockenheim
Exception: Done


In [7]:
len(simulator.weights[np.sum(simulator.weights,axis=(1,2))!=0])

2048

In [8]:
# inference_file = 'SARSA_Multiple_training/Mexico City_33000.npy'
# reward_file='SARSA_Multiple_training/rewards.npy'
# collision_file='SARSA_Multiple_training/times.npy'

inference_file = 'SARSA_Multiple_training/Montreal_10000.npy'
reward_file='SARSA_Multiple_training/rewards.npy'
collision_file='SARSA_Multiple_training/times.npy'


# map_path_subset = [all_map_paths[i] for i in indices]
# map_centers_subset = [map_centers[i] for i in indices]
# map_names_subset = [map_names[i] for i in indices]

# map_path = map_path_subset[map_index]
# map_center = map_centers_subset[map_index]
# map_name = map_names_subset[map_index]

save_path = 'Weights_SARSA/'

simulator = F1Tenth_navigation(num_agents=num_agents,map_path=map_path,map_ext=map_ext,sx=sx,sy=sy,stheta=stheta,map_centers_file=map_center,save_path=save_path,track_name=map_name,inference=inference_file,reward_file=reward_file,collision_file=collision_file)
print(f'SARSA Inference on {map_names_subset[map_index]}')
simulator.inference()



Loaded Weights
SARSA Inference on Hockenheim
Exception: Done


In [None]:
len(simulator.weights[np.sum(simulator.weights,axis=(1,2))!=0])

In [None]:
inference_file = 'Q_Multiple_training/MoscowRaceway_33000.npy'
reward_file='Q_Multiple_training/rewards.npy'
collision_file='Q_Multiple_training/times.npy'

# map_path_subset = [all_map_paths[i] for i in indices]
# map_centers_subset = [map_centers[i] for i in indices]
# map_names_subset = [map_names[i] for i in indices]

# map_path = map_path_subset[map_index]
# map_center = map_centers_subset[map_index]
# map_name = map_names_subset[map_index]

save_path = 'Weights_Q/'

simulator = F1Tenth_navigation(num_agents=num_agents,map_path=map_path_subset,map_ext=map_ext,sx=sx,sy=sy,stheta=stheta,map_centers_file=map_centers_subset,save_path=save_path,track_name=map_names_subset,inference=inference_file,reward_file=reward_file,collision_file=collision_file)
print(f'SARSA Inference on {map_names_subset[map_index]}')
simulator.inference()

In [None]:
len(simulator.weights[np.sum(simulator.weights,axis=(1,2))!=0])

# Simple track inference

In [None]:
# Code to run the easy track with no curves
inference_file = 'BTSP_Multiple_training/Mexico City_1765.npy'
# inference_file = 'SARSA_Multiple_training/Nuerburgring_52000.npy'
reward_file='BTSP_Multiple_training/rewards.npy'
collision_file='BTSP_Multiple_training/times.npy'

map_path_subset = ['/home/praneeth/shared_f1_tenth /IMS/IMS_map']
map_centers_subset = ['/home/praneeth/shared_f1_tenth /IMS/IMS_centerline.csv']
map_names_subset = ['IMS_map']

map_index = 0

# map_path = map_path_subset[map_index]
# map_center = map_centers_subset[map_index]
# map_name = map_names_subset[map_index]

save_path = 'Weights_BTSP/'

simulator = F1Tenth_navigation(num_agents=num_agents,map_path=map_path_subset,map_ext=map_ext,sx=sx,sy=sy,stheta=stheta,map_centers_file=map_centers_subset,save_path=save_path,track_name=map_names_subset,inference=inference_file,reward_file=reward_file,collision_file=collision_file)
print(f'BTSP Inference on {map_name}')
simulator.inference()

In [None]:
btsp_times = []
sarsa_times = []
q_times = []

inference_btsp = 'BTSP_Multiple_training/Austin_32000.npy'
inference_sarsa = 'SARSA_Multiple_training/Mexico City_33000.npy'
inference_q = 'Q_Multiple_training/MoscowRaceway_33000.npy'
reward_file='BTSP_Multiple_training/rewards.npy'
collision_file='BTSP_Multiple_training/times.npy'
save_path = 'Weights_BTSP/'
map_path_subset = [all_map_paths[i] for i in indices]
map_centers_subset = [map_centers[i] for i in indices]
map_names_subset = [map_names[i] for i in indices]

In [None]:
btsp_times_new = []
sarsa_times_new = []
q_times_new = []

In [None]:
for i in range(len(map_centers_subset)):
    map_path = map_path_subset[i]
    map_center = map_centers_subset[i]
    map_name = map_names_subset[i]

    btsp_simulator = F1Tenth_navigation(num_agents=num_agents,map_path=map_path,map_ext=map_ext,sx=sx,sy=sy,stheta=stheta,map_centers_file=map_center,save_path=save_path,track_name=map_name,inference=inference_btsp,reward_file=reward_file,collision_file=collision_file)
    start_time = time.time()
    btsp_simulator.inference()
    btsp_times_new.append(time.time() - start_time)
    print(f'BTSP Inference on {map_name} took {btsp_times_new[-1]} seconds')

    del btsp_simulator

    sarsa_simulator = F1Tenth_navigation(num_agents=num_agents,map_path=map_path,map_ext=map_ext,sx=sx,sy=sy,stheta=stheta,map_centers_file=map_center,save_path=save_path,track_name=map_name,inference=inference_sarsa,reward_file=reward_file,collision_file=collision_file)
    start_time = time.time()
    sarsa_simulator.inference()
    sarsa_times_new.append(time.time() - start_time)
    print(f'SARSA Inference on {map_name} took {sarsa_times_new[-1]} seconds')

    del sarsa_simulator

    q_simulator = F1Tenth_navigation(num_agents=num_agents,map_path=map_path,map_ext=map_ext,sx=sx,sy=sy,stheta=stheta,map_centers_file=map_center,save_path=save_path,track_name=map_name,inference=inference_q,reward_file=reward_file,collision_file=collision_file)
    start_time = time.time()
    q_simulator.inference()
    q_times_new.append(time.time() - start_time)
    print(f'Q Inference on {map_name} took {q_times_new[-1]} seconds')

    del q_simulator


In [None]:
plt.figure(figsize=(10, 6))
plt.boxplot([btsp_times, sarsa_times, q_times], labels=['BTSP', 'SARSA', 'Q'])
plt.title('Inference Times for Different Methods')
plt.ylabel('Time (seconds)')
plt.xlabel('Method')
plt.show()

In [None]:
plt.figure(figsize=(10, 6))
plt.boxplot([btsp_times_new, sarsa_times_new, q_times_new], labels=['HCL', 'SARSA', 'Q'])
plt.title('Time to 1st collision (test tracks)')
plt.ylabel('Time (seconds)')
plt.xlabel('Algorithm')
plt.show()

In [None]:
np.save('HCL_test_times.npy',btsp_times_new)    
np.save('SARSA_test_times.npy',sarsa_times_new)
np.save('Q_test_times.npy',q_times_new)

In [None]:
btsp_times = np.load('HCL_test_times.npy')
sarsa_times = np.load('SARSA_test_times.npy')
q_times = np.load('Q_test_times.npy')


In [None]:
boxplot = plt.boxplot([btsp_times,sarsa_times,q_times],labels=['HCL', 'SARSA($\lambda$)', 'Q-learning'],showfliers=False)
plt.grid(True, linestyle='--', alpha=0.3)
plt.ylabel('Time (s)')
plt.xlabel('Algorithm')
plt.title('Time to $1$ collision (Test)')

plt.tight_layout()
plt.show()