This is the first working implementation of a discrete MDP wrapper for the highway environment

In [1]:
# Move up one directory level to import local instance of highway environment
%cd ..
%pwd

d:\Project Files\HoLab\Codes\Mine\HighwayEnv-TRI


'd:\\Project Files\\HoLab\\Codes\\Mine\\HighwayEnv-TRI'

Import necessary modules

In [2]:
import gymnasium as gym
import highway_env

from frozendict import frozendict
import numpy as np
import math
import copy

from matplotlib import pyplot as plt
%matplotlib inline

import warnings
import logging
logger = logging.getLogger()



Define MDP class

In [3]:
class GymDiscreteMDP:
    def __init__(self, *args, **kwargs):
        if "config" in kwargs:
            self.config = kwargs.get("config", None)
        self.env = gym.make(*args, **kwargs)
        self.obs, self.info = self.env.reset()
        action_space = self.env.action_space
        if isinstance(action_space, gym.spaces.discrete.Discrete):
            self.actions = range(action_space.start, action_space.n)
        else:
            raise NotImplementedError("Only discrete action spaces are currently supported")
            # |Can later be extended for other gym actionspaces


    def to_hashable_state(self, obs):
        """
        Create hashable variable using environment state information.
        """        
        raise NotImplementedError("Please Implement this method")

    def step(self, action):
        """
        The function takes an action and returns MDP-compatible state information.
        Can be overridden based on simulation-specific properties.
        """
        obs, reward, done, truncated, info = self.env.step(action)
        logging.debug(obs)
        next_state = self.to_hashable_state(obs)
        return next_state, reward, done, truncated, info

    def copy_env(self):
        """
        Return a copy of the current state of the environment,
        so it can be set back to simulate the outcome of various actions.
        """
        raise NotImplementedError("Please Implement this method")

    def set_env(self, env):
        """
        Set current environent state to the one passed in the argument.
        """
        raise NotImplementedError("Please Implement this method")




class HighwayDiscreteMDP(GymDiscreteMDP):
    '''
    The class currently only supports kinematics observation space in HighwayEnv
    '''
    def __init__(self, *args, **kwargs):
        if (not "config" in kwargs 
            or not "observation" in kwargs["config"]
            or not "features" in kwargs["config"]["observation"]
            or not "Kinematics" == kwargs["config"]["observation"]["type"]):
            kwargs["config"] = self.default_config()
            warnings.warn("Config not specified/does not match requirement. USING DEFAULT CONFIG.\n \
                  To use custom config, please use Kinematics observation space, \
                  and specify (at least) the following features in config:\n \
                  \tpresence, x, y, vx, vy, heading.")
        super().__init__(*args, **kwargs)
        # self.action_dict = self.env.unwrapped.action_type.actions_indexes
        # |Set perception distance to maximum. So, state of all cars in the environment 
        # |are available irrespective of whether they are in the visibility window.
        self.env.unwrapped.PERCEPTION_DISTANCE = float('inf')
        self.initial_state = self.to_hashable_state(self.obs)
    
    def get_env_properties(self):
        return self.initial_state, self.actions

    # def step(self, action):
    #     # set_vehicles(state, self.env)
    #     obs, reward, done, truncated, info = self.env.step(action)
    #     logging.debug(action, obs[0])
    #     next_state = self.to_hashable_state(obs)
    #     return next_state, reward, done, truncated, info

    def to_hashable_state(self, obs):
        road_objects = []
        for road_obj in obs:
            feature_vals = {k: v for k,v in zip(self.config["observation"]["features"], road_obj)}
            veh = {}
            veh["position"] = tuple(np.round((feature_vals["x"],feature_vals["y"]), 2))
            veh["speed"] = tuple(np.round((feature_vals["vx"],feature_vals["vy"]), 2))
            veh["heading"] = np.round(feature_vals["heading"], 3)
            road_objects.append(frozendict(veh))
        return tuple(road_objects)
    
    def default_config(self):
        return {
        "observation": {
            "type": "Kinematics",
            "vehicles_count": 50,
            "features": ["presence", "x", "y", "vx", "vy", "heading"],
            "normalize": False,
            "absolute": True,
            "order": "sorted",
            "observe_intentions": False,
            "include_obstacles": True
            }
        }

    def copy_env(self):
        return copy.deepcopy(self.env)

    def set_env(self, env):
        self.env = copy.deepcopy(env)

    # def set_vehicles(self, vehicles):
    #     # |Would like to avoid using this funtion if possible.
    #     for v, new_v in zip(self.env.unwrapped.road.vehicles, vehicles):
    #         assert id(v) == new_v['id']
    #         v.position = np.array(new_v['position'])
    #         v.heading = new_v['heading']
    #         v.speed = new_v['speed']

    # def step(self, state, action):
    #     # |Avoid using this overloaded function, use super step funtion instead
    #     self.set_vehicles(state, self.env)
    #     obs, reward, done, truncated, info = self.env.step(self.action_dict[action])
    #     next_state = self.to_hashable_state(obs)
    #     return next_state, reward, done, truncated, info



def mdp_factory(*args, gym_env, **kwargs):
    if gym_env == "highway":
        highwaymdp = HighwayDiscreteMDP(*args, **kwargs)    
        return highwaymdp
    return 0


Set up MDP

In [5]:
import cProfile

logger.setLevel(logging.INFO)    # Other options: INFO, WARNING, ERROR, CRITICAL

import functools
import multiprocessing as mp
# pool = mp.Pool((mp.cpu_count()-1)) #This uses all processors instead of manually putting the number

In [6]:
config = {
        "observation": {
            "type": "Kinematics",
            "vehicles_count": 50,
            "features": ["presence", "x", "y", "vx", "vy", "heading"],
            "normalize": False,
            "absolute": True,
            "order": "sorted",
            "observe_intentions": False,
            "include_obstacles": True
            }
        }

highway_mdp = mdp_factory('highway-v0', gym_env="highway", config=config, render_mode='human')

initial_state, actions = highway_mdp.get_env_properties()

In [7]:
max_depth = 2   # The number of steps to plan ahead

def run_code():
    visited = set()
    transitions = {}
    frontier = {(initial_state, 0, highway_mdp.copy_env())}
    loop_counter = 0
    while frontier:
        state, depth, curr_env = frontier.pop()
        visited.add(state)
        if depth < max_depth:
            for action in actions:
                highway_mdp.set_env(curr_env)
                # logging.debug(highway_mdp.env.unwrapped.road.vehicles[0])
                if (state[0], action) not in transitions:
                    transitions[(state[0], action)] = {}
                next_state, reward, done, truncated, info = highway_mdp.step(action)
                logging.debug(str(state[0]) + ' | ' + str(action) + ' | ' + str(next_state[0]))
                if next_state[0] not in transitions[(state[0], action)]:
                    transitions[(state[0], action)][next_state[0]] = 0
                transitions[(state[0], action)][next_state[0]] += 1
                if next_state not in visited:
                    frontier.add((next_state, depth + 1, highway_mdp.copy_env()))
                loop_counter += 1
        MDPstatus = "Current Depth: " + str(depth) + " | Frontier: " + str(len(frontier)) +\
                    " | Visited: " + str(len(visited)) + " | Transitions:" + str(len(transitions))
        logging.info(MDPstatus)
    print("Number of execution calls: ", loop_counter)

run_code()

# cProfile.run('run_code()')


INFO:root:Current Depth: 0 | Frontier: 5 | Visited: 1 | Transitions:5
INFO:root:Current Depth: 1 | Frontier: 9 | Visited: 2 | Transitions:10
INFO:root:Current Depth: 1 | Frontier: 13 | Visited: 2 | Transitions:10
INFO:root:Current Depth: 1 | Frontier: 17 | Visited: 3 | Transitions:15
INFO:root:Current Depth: 2 | Frontier: 16 | Visited: 4 | Transitions:15
INFO:root:Current Depth: 2 | Frontier: 15 | Visited: 5 | Transitions:15
INFO:root:Current Depth: 2 | Frontier: 14 | Visited: 6 | Transitions:15
INFO:root:Current Depth: 2 | Frontier: 13 | Visited: 7 | Transitions:15
INFO:root:Current Depth: 2 | Frontier: 12 | Visited: 8 | Transitions:15
INFO:root:Current Depth: 2 | Frontier: 11 | Visited: 8 | Transitions:15
INFO:root:Current Depth: 2 | Frontier: 10 | Visited: 8 | Transitions:15
INFO:root:Current Depth: 2 | Frontier: 9 | Visited: 9 | Transitions:15
INFO:root:Current Depth: 2 | Frontier: 8 | Visited: 9 | Transitions:15
INFO:root:Current Depth: 2 | Frontier: 7 | Visited: 10 | Transitions:

Number of execution calls:  30


In [109]:
print(highway_mdp.env.unwrapped.action_type.actions_indexes)

transitions

{'LANE_LEFT': 0, 'IDLE': 1, 'LANE_RIGHT': 2, 'FASTER': 3, 'SLOWER': 4}


{(frozendict.frozendict({'position': (178.19, 0.0), 'speed': (25.0, 0.0), 'heading': 0.0}),
  0): {frozendict.frozendict({'position': (296.98, 0.2), 'speed': (0.04, -0.01), 'heading': -0.173}): 1},
 (frozendict.frozendict({'position': (178.19, 0.0), 'speed': (25.0, 0.0), 'heading': 0.0}),
  1): {frozendict.frozendict({'position': (296.98, 0.2), 'speed': (0.04, -0.01), 'heading': -0.173}): 1},
 (frozendict.frozendict({'position': (178.19, 0.0), 'speed': (25.0, 0.0), 'heading': 0.0}),
  2): {frozendict.frozendict({'position': (296.98, 0.2), 'speed': (0.04, -0.01), 'heading': -0.173}): 1},
 (frozendict.frozendict({'position': (178.19, 0.0), 'speed': (25.0, 0.0), 'heading': 0.0}),
  3): {frozendict.frozendict({'position': (296.98, 0.2), 'speed': (0.04, -0.01), 'heading': -0.173}): 1},
 (frozendict.frozendict({'position': (178.19, 0.0), 'speed': (25.0, 0.0), 'heading': 0.0}),
  4): {frozendict.frozendict({'position': (296.98, 0.2), 'speed': (0.04, -0.01), 'heading': -0.173}): 1},
 (frozendi

# TEST CODE

In [127]:
# |DEBUGGING CODE

print(highway_mdp.env.unwrapped.road.vehicles[0])
tmp = highway_mdp.copy_env()
print(tmp.unwrapped.road.vehicles[0])
highway_mdp.step(1)
print(highway_mdp.env.unwrapped.road.vehicles[0])
print(tmp.unwrapped.road.vehicles[0])
highway_mdp.set_env(tmp)
print(highway_mdp.env.unwrapped.road.vehicles[0])


# dir(highway_mdp.env.unwrapped)

next_state, reward, done, truncated, info = highway_mdp.step(1)
# next_state


MDPVehicle #544: [207.38302935   8.        ]
MDPVehicle #248: [207.38302935   8.        ]
1 [1.   1.   0.08 1.   0.   0.  ]
MDPVehicle #544: [232.38302935   8.        ]
MDPVehicle #248: [207.38302935   8.        ]
MDPVehicle #248: [207.38302935   8.        ]


In [None]:
config = {
        "observation": {
            "type": "Kinematics",
            "vehicles_count": 50,
            "features": ["presence", "x", "y", "vx", "vy", "heading"],
            "normalize": False,
            "absolute": True,
            "order": "sorted",
            "observe_intentions": False,
            "include_obstacles": True
            }
        }

env = gym.make('highway-v0', config=config, render_mode='human')

In [75]:
vehicles = []
for veh_state in obs:
    feature_vals = {k: v for k,v in zip(config["observation"]["features"], veh_state)}
    veh = {}
    veh["position"] = tuple(np.round((feature_vals["x"],feature_vals["y"]), 2))
    veh["speed"] = tuple(np.round((feature_vals["vx"],feature_vals["vy"]), 2))
    veh["heading"] = np.round(feature_vals["heading"], 3)
    vehicles.append(frozendict(veh))
vehicles = tuple(vehicles)
vehicles

(frozendict.frozendict({'position': (1.0, 0.08), 'speed': (1.0, 0.0), 'heading': 0.0}),
 frozendict.frozendict({'position': (0.08, 0.0), 'speed': (-0.24, 0.0), 'heading': 0.0}),
 frozendict.frozendict({'position': (0.33, -0.04), 'speed': (-0.18, 0.0), 'heading': 0.003}),
 frozendict.frozendict({'position': (0.45, 0.04), 'speed': (-0.31, 0.0), 'heading': 0.005}),
 frozendict.frozendict({'position': (0.72, 0.0), 'speed': (-0.19, 0.0), 'heading': 0.0}),
 frozendict.frozendict({'position': (0.93, 0.04), 'speed': (-0.19, 0.0), 'heading': 0.0}),
 frozendict.frozendict({'position': (1.0, -0.08), 'speed': (-0.27, 0.0), 'heading': 0.0}),
 frozendict.frozendict({'position': (1.0, -0.04), 'speed': (-0.24, -0.0), 'heading': -0.0}),
 frozendict.frozendict({'position': (1.0, -0.0), 'speed': (-0.16, 0.0), 'heading': 0.001}),
 frozendict.frozendict({'position': (1.0, -0.08), 'speed': (-0.28, 0.0), 'heading': 0.0}),
 frozendict.frozendict({'position': (0.0, 0.0), 'speed': (0.0, 0.0), 'heading': 0.0}),


In [77]:
import sys

# sys.getsizeof(transitions)
sys.getsizeof(highway_mdp.copy_env())
# sys.getsizeof(int(10.0))


48