In [1]:
import gymnasium as gym

import time
import numpy as np
import matplotlib.pyplot as plt
import itertools
from tqdm.notebook import tqdm
import pandas as pd
import pickle

In [2]:
configuration = {

    # Parametrization bellow cannot be changed
    "lanes_count" : 10, # The environment must always have 10 lanes
    "vehicles_count": 50, # The environment must always have 50 other vehicles
    "duration": 120,  # [s] The environment must terminate never before 120 seconds
    "other_vehicles_type": "highway_env.vehicle.behavior.IDMVehicle", # This is the policy of the other vehicles
    "initial_spacing": 2, # Initial spacing between vehicles needs to be at most 2

    # Refer to refer to https://highway-env.farama.org/observations/ to change observation space type
    "observation": {
        "type": "Kinematics"
    },

    # Refer to refer to https://highway-env.farama.org/actions/ to change action space type
    "action": {
        "type": "DiscreteMetaAction",
    },

    # Parameterization bellow can be changed (as it refers mostly to the reward system)
    # "collision_reward": -10,  # The reward received when colliding with a vehicle. (Can be changed)
    # "reward_speed_range": [20, 30],  # [m/s] The reward for high speed is mapped linearly from this range to [0, HighwayEnv.HIGH_SPEED_REWARD]. (Can be changed)
    "simulation_frequency": 15, #15,  # [Hz] (Can be changed)
    "policy_frequency": 5, #5,  # [Hz] (Can be changed)

    "collision_reward": -100,  # The reward received when colliding with a vehicle.
    "right_lane_reward": 0.1,  # The reward received when driving on the right-most lanes, linearly mapped to
    # zero for other lanes.
    "high_speed_reward": 100,  # The reward received when driving at full speed, linearly mapped to zero for
    # lower speeds according to config["reward_speed_range"].
    "lane_change_reward": 0,  # The reward received at each lane change action.
    "reward_speed_range": [20, 30],
    
    # Parameters defined bellow are purely for visualiztion purposes! You can alter them as you please
    "screen_width": 800,  # [px]
    "screen_height": 600,  # [px]
    "centering_position": [0.5, 0.5],
    "scaling": 5,
    "show_trajectories": True,
    "render_agent": True,
    "offscreen_rendering": False
}

default_config = configuration.copy()

In [72]:
def fix_reward(reward, colision_reward=-100, skew_speed=1): 
    """
    This function is used to correct the reward function, which is not correctly outputted by the environment
    Params: 
        reward: float, the reward to fix
        colision_reward: float, the colision reward to fix
        skew_speed: float, the skew speed to fix. It is an exponent applied to the reward
    """
    if reward < 0.01:
        return colision_reward
    else:
        return reward**skew_speed

def decode_meta_action(action):
    """
    Function to output the corresponding action in text-form
    """
    assert action in range(5), "The action must be between 0 and 4"
    if action == 0:
        return "LANE_LEFT"
    elif action == 1:
        return "IDLE"
    elif action == 2:
        return "LANE_RIGHT"
    elif action == 3:
        return "FASTER"
    elif action == 4:
        return "SLOWER"


class OccupancyGrid():
    def __init__(
            self,
            grid_size=[[-50, 50], [-50, 50]],  # X controls the lane-width, Y controls how far ahead
            grid_step=[1, 1],
            n_closest=3,
            ss_bins=[5,6],
            crop_dist=[[-10,10], [-10,25]],
            policy=None,
            sim_frequency=15,
            policy_frequency=3,
            render_mode = 'human',
            seed = 50,
    ):
        """
        Occupancy view class constructor
        Arguments:
            grid_size: list of lists, the size of the grid in the x and y direction, where x controls the lane-width and y controls how far ahead. Lanes are 5m wide, and the car position is (0,0)
            grid_step: list, the step size of the grid in the x and y direction, in meters
            n_closest: int, the number of closest cars to consider in the state space
            ss_bins: list, the number of bins to divide the x and y directions
            crop_dist: list of lists, the distance to crop the x and y directions, above which the values will be clipped
            policy: function, the policy to use in the simulation
            sim_frequency: int, the frequency of the simulation
            policy_frequency: int, the frequency of the policy
            render_mode: str, the mode to render the simulation
            seed: int, the seed to use in the simulation
        """

        self.grid_size = grid_size
        self.grid_step = grid_step
        self.config = default_config.copy()
        self.config["observation"] =  {
            "type": "OccupancyGrid",
            "features": ["presence"],
            "grid_size": grid_size,    # X controls how many lanes, Y controls how far ahead
            "grid_step": grid_step,
        }
        self.config["simulation_frequency"] = sim_frequency
        self.config["policy_frequency"] = policy_frequency
        self.render_mode = render_mode
        self.seed = seed
        self.policy = policy
        self.n_closest, self.ss_bins, self.crop_dist = n_closest, ss_bins, crop_dist
        self.initialize_states()


    def initialize_states(self):
        """
        Initialize the states of the occupancy grid
        """
        # Start the environment
        with gym.make('highway-v0', render_mode=self.render_mode, config=self.config) as env:
            obs, info = env.reset(seed = self.seed)
            self.current_obs = obs
            self.env = env
        
        # States will be stored in a dictionary, with the key being ((x1,x2,...,xn), (y1,y2,...,yn)), and n is the number of neighbors
        # Make ss_bins[0] from the crop_dist[0] and ss_bins[1] from crop_dist[1]
        self.x_bins = np.linspace(self.crop_dist[0][0], self.crop_dist[0][1], self.ss_bins[0])
        self.y_bins = np.linspace(self.crop_dist[1][0], self.crop_dist[1][1], self.ss_bins[1])

        # Each of the nearest neighbors will have a state of the form (x,y). Create the first key of the dictionary in the form (x1,x2,...xn)
        x_keys = list(itertools.product(self.x_bins, repeat=self.n_closest))
        y_keys = list(itertools.product(self.y_bins, repeat=self.n_closest))
        self.states = list(itertools.product(x_keys, y_keys))

    def get_car_positions(self):
        """
        Get the car positions in the occupancy grid
        Returns:
            car_positions: np.array, the car positions in the occupancy grid, in the form ([x1,y1], [x2,y2], ...)
        """ 
        positions = np.nonzero(self.current_obs[0])
        car_positions = np.array([positions[0]*self.grid_step[0] + self.grid_size[0][0], positions[1]*self.grid_step[1] + self.grid_size[1][0]]).T
        return car_positions

    def get_n_closest(self):
        """
        Get the n closest cars to the agent
        Returns:
            closest_car_positions: np.array, the positions of the n closest cars to the agent. If there are less than n_closest cars, the array is padded with the crop_dist values
        """

        car_positions = self.get_car_positions()
        distances = np.linalg.norm(car_positions, axis=1)

        # Remove the agent position
        closest = np.argsort(distances)[1:self.n_closest+1]
        closest_car_positions = car_positions[closest]

        # If there are less than n_closest cars, pad the array with the crop_dist values
        if len(closest_car_positions) < self.n_closest:
            n_missing = self.n_closest - len(closest_car_positions)
            closest_car_positions = np.pad(closest_car_positions, ((0, n_missing), (0,0)), 'constant', constant_values=(self.crop_dist[0][0], self.crop_dist[1][0]))

        # Values that are 
        return closest_car_positions
    
    def get_state(self, type='n_neighbours', decode=False):
        """
        Get the state of the environment
        Arguments:
            type: str, the type of state to get. Options are 'n_neighbours' or 'lane-wise' :
                n_neighbours: the state is the n closest neighbours in the form (x1,x2,...,xn), (y1,y2,...,yn)
                lane-wise: the state is a matrix with the binned distances of the agent to the car in front, back, left-lane and right-lane
            decode: bool, whether to decode the state
        """
        assert type in ['n_neighbours', 'lane-wise'], "The type of state must be either 'n_neighbours' or 'lane-wise'"
        if type == 'n_neighbours':
            state = self.state_n_neighbours()
        elif type == 'lane-wise':
            state = self.state_lane_wise(decode=decode)
        return state

    def state_n_neighbours(self):
        """
        Get the state of the environment in a neighbour-wise manner
        Returns:
            state: tuple, the state of the environment in the form (x1,x2,...,xn), (y1,y2,...,yn)
        """
        n_closest = self.get_n_closest()
        # For the closest cars, get the state
        state_x, state_y = [], []
        # Get the bin values for each of the x,y positions, and return a tuple with the values
        for car in n_closest:
            x = np.digitize(car[0], self.x_bins) - 1
            y = np.digitize(car[1], self.y_bins) - 1
            x_val, y_val = self.x_bins[x], self.y_bins[y]
            state_x.append(x_val)
            state_y.append(y_val)
        state = (tuple(state_x), tuple(state_y))
        return tuple(state)
    
    def state_lane_wise(self, decode=False):
        """
        Get the state of the environment in a lane-wise manner
        Arguments:
            state: np.array, the state of the environment with the binned distances of the agent to the car in front, back, left-lane and right-lane 
            decode: bool, whether to return the state in a decoded manner
        """
        # Get the car positions in relation to the agent. The agent is at position (0,0)
        car_positions = self.get_car_positions()

        # The same lane cars are the ones with the same x+-2, while front and back are the ones with y > 0 and y < 0, respectively
        same_lane = car_positions[np.abs(car_positions[:,0]) <= 2]
        front = same_lane[(same_lane[:,1] > 0) & (same_lane[:,1] < 30)]
        back = same_lane[(same_lane[:,1] < 0) & (same_lane[:,1] > -30)]

        # The other lane cars have to be in the range of (2,7] and [-2,-7), for the left and right lanes, respectively
        left_lane = car_positions[(car_positions[:,0] < -2) & (car_positions[:,0] >= -7) & (np.abs(car_positions[:,1]) < 20)]
        right_lane = car_positions[(car_positions[:,0] > 2) & (car_positions[:,0] <= 7) & (np.abs(car_positions[:,1]) < 20)]
        
        # Now we need to get the cars that are the closest from the arrays above
        front_dist = np.min(front[:,1]) if len(front) > 0 else 30            # The one with the smallest y value, e.g.: (0, 4) is closer than (0,12)
        back_dist = -np.max(back[:,1]) if len(back) > 0 else 30               # The one with the largest y value, e.g.: (0, -4) is closer than (0,-12)     
        
        # For the left and right lanes, we need to get the closest euclidean distance
        left_closest = left_lane[np.argmin(np.linalg.norm(left_lane, axis=1))] if len(left_lane) > 0 else np.array([30,30])
        right_closest = right_lane[np.argmin(np.linalg.norm(right_lane, axis=1))] if len(right_lane) > 0 else np.array([30,30])
        
        # If the left or right cars are in front of the agent, then the euclidean distance will be positive, otherwise negative
        left_dist = np.linalg.norm(left_closest) if left_closest[1] > 0 else -np.linalg.norm(left_closest)
        right_dist = np.linalg.norm(right_closest) if right_closest[1] > 0 else -np.linalg.norm(right_closest)

        # Now lets put the space in bins. 5 means that the distance is below 5m, 10 means that the distance is between 5 and 10m, and so on.
        bins_front = [5,10,15,30]

        # The back car is less important
        bins_back = [5,10,30]
        bins_left_right = [-20,-10,-5,5,10,20]

        front_dist = np.digitize(front_dist, bins_front)
        back_dist = np.digitize(back_dist, bins_back)
        left_dist = np.digitize(left_dist, bins_left_right)
        right_dist = np.digitize(right_dist, bins_left_right)

        state = np.array([front_dist, back_dist, left_dist, right_dist]) if not decode else "Front: {}, Back: {}, Left: {}, Right: {}".format(front_dist, back_dist, left_dist, right_dist)
        return state


    def test_env(self):
        """
        Function to test the environment with a random policy, or with a policy
        """
        obs, info = self.env.reset(seed = self.seed)
        self.current_obs = obs
        done = False
        while not done:
            # start = time.time()
            if self.policy is None:
                action = self.env.action_space.sample()
            else:
                action = self.policy()
            obs, reward, done, truncate, info = self.env.step(action)
            self.current_obs = obs
            print(self.get_state(type='lane-wise', decode=True), decode_meta_action(action), reward)
            time.sleep(1)
            # end = time.time()
            # print(f"Time taken: {end-start}")
        self.env.close()
        return info["score"]


# ------------------- SARSA -------------------    
class Sarsa(OccupancyGrid):
    def __init__(
        self,s
        alpha=0.75,
        gamma=0.8,
        m=50,
        epsilon=0.6,
        print_stats=False,
        **kwargs,
        ):
        """
        SARSA class constructor
        Arguments:
            alpha: float, the learning rate
            gamma: float, the discount factor
            m: int, the number of episodes to train the agent for
            epsilon: float, the epsilon value for the epsilon-greedy policy
            print_stats: bool, whether to print the statistics during initialization
        """

        super().__init__(**kwargs)
        self.initialize_Q()
        self.alpha, self.gamma, self.m, self.epsilon = alpha, gamma, m, epsilon

    def policy_Q(self, state):
        values = [self.Q[(state, action)] for action in range(5)]
        return np.argmax(values)   

    def initialize_Q(self):
        # Combine the possible states with the possible actions
        keys = list(itertools.product(self.states, range(5)))       # 5 possible actions, 0-4: left, idle, right, accelerate, decelerate
        if len(keys) > 150000:
            print("Warning: The number of states is too large, consider reducing the number of states")
        self.Q = {key: 0 for key in keys}

    def epsilon_greedy(self, state):
        if np.random.rand() < self.epsilon:
            return np.random.randint(5)
        else:
            values = [self.Q[(state, action)] for action in range(5)]
            return np.argmax(values)
        
    def train(self): 
        env = gym.make('highway-v0', render_mode=None, config=self.config)
        obs, info = env.reset(seed = self.seed)
        self.current_obs = obs
        done = False
        state = self.get_state()
        action = self.epsilon_greedy(state)
        for i in tqdm(range(self.m)):
            env.reset()
            done = False
            while not done:
                next_obs, reward, done, truncate, info = env.step(action)
                reward = fix_reward(reward)
                next_state = self.get_state()
                next_action = self.epsilon_greedy(next_state)
                print(next_state, decode_meta_action(next_action), reward)
                self.Q[(state, action)] += self.alpha*(reward + self.gamma*self.Q[(next_state, next_action)] - self.Q[(state, action)])
                state, action = next_state, next_action
                self.current_obs = next_obs
        env.close()

    def test(self):
        env = gym.make('highway-v0', render_mode=self.render_mode, config=self.config)
        obs, info = env.reset(seed = self.seed)
        self.current_obs = obs
        done = False
        state = self.get_state()
        action = self.policy_Q(state)
        while not done:
            next_obs, reward, done, truncate, info = env.step(action)
            next_state = self.get_state()
            next_action = self.policy_Q(next_state)
            state, action = next_state, next_action
            self.current_obs = next_obs
            print(next_state)
        env.close()
        return info["score"]

In [73]:
a = OccupancyGrid()

In [74]:
a.test_env()

Front: 1, Back: 4, Left: 6, Right: 6 IDLE 0.750013881947915
Front: 1, Back: 4, Left: 6, Right: 6 IDLE 0.750013881947915
Front: 5, Back: 4, Left: 6, Right: 4 IDLE 0.750013881947915
Front: 5, Back: 4, Left: 6, Right: 4 FASTER 0.8612260365439561
Front: 5, Back: 4, Left: 6, Right: 4 SLOWER 0.7005165723416802
Front: 5, Back: 4, Left: 6, Right: 3 SLOWER 0.6113342369426019
Front: 5, Back: 4, Left: 6, Right: 4 LANE_LEFT 0.5372146837978964
Front: 4, Back: 4, Left: 6, Right: 6 IDLE 0.5224288037788087
Front: 4, Back: 4, Left: 6, Right: 6 SLOWER 0.5155887104961816
Front: 3, Back: 4, Left: 6, Right: 6 LANE_RIGHT 0.5001388194791494
Front: 5, Back: 4, Left: 5, Right: 4 FASTER 0.6079475694315856
Front: 5, Back: 4, Left: 5, Right: 3 FASTER 0.6739213378304756
Front: 5, Back: 4, Left: 5, Right: 3 IDLE 0.7086255829442321
Front: 5, Back: 4, Left: 5, Right: 3 FASTER 0.8384392437884479
Front: 5, Back: 1, Left: 5, Right: 6 LANE_RIGHT 0.8884931108422481
Front: 5, Back: 1, Left: 4, Right: 6 IDLE 0.9407143482229

AttributeError: 'OccupancyGrid' object has no attribute 'current_obs'