# AE4350 Assignment : Car Racing Gym

## 1. Imports

In [1]:
import os
import gym
import shimmy
import gymnasium
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
import matplotlib.pyplot as plt
import numpy as np
import copy
%matplotlib inline

## 2. Create Environment

This section defines a custom environment wrapper class, `BasicWrapper`, that extends the functionality of the Gym environment. The wrapper includes features such as preprocessing observations, modifying rewards based on specific conditions, and managing timers and flags for various in-game events. The wrapper also handles resetting the environment and rendering it. Additionally, a helper function, `rgb2gray`, is provided to convert RGB images to grayscale using the luminosity method.


### Creating Environment Wrapper

Helper function to convert RGB to grayscale using luminosity method


In [6]:
def rgb2gray(rgb):
    """
    Convert an RGB(A) image to grayscale using luminosity weights.

    Parameters:
    rgb (numpy.ndarray): Array whose last axis holds at least three colour channels (R, G, B).

    Returns:
    numpy.ndarray: Weighted sum of the first three channels; the last axis is removed.
    """
    luma_weights = [0.2989, 0.5870, 0.1140]
    # Only the first three channels are used, so an alpha channel is ignored
    colour_channels = rgb[..., :3]
    return np.dot(colour_channels, luma_weights)
    

Custom environment wrapper class

In [91]:
class BasicWrapper(gymnasium.Wrapper):
    """
    Training wrapper around the car-racing environment.

    The wrapper
    * exposes a discrete action space of 13 actions, each mapped to a
      [steering, gas, brake] command for the wrapped environment,
    * returns observations made of three stacked grayscale frames, and
    * shapes the reward (upper clipping, penalties for not accelerating and
      for staying on what the code treats as grass).
    """

    # Discrete action index -> [steering, gas, brake] command.
    _ACTION_TABLE = (
        [0, 0, 0],
        [-1, 0, 0],
        [1, 0, 0],
        [0, 1, 0],
        [0, 0, 0.8],
        [-0.5, 0.5, 0],
        [0.5, 0.5, 0],
        [-0.5, 0, 0.5],
        [0.5, 0, 0.5],
        [0, 1, 0.5],
        [0, 0.5, 0.8],
        [-0.3, 0.2, 0],
        [0.3, 0.2, 0],
    )

    def __init__(self, env):
        super().__init__(env)
        # Counters/flags used by the reward shaping
        self.grass_timer = 0
        self.grass_detected = False
        self.gas_timer = 0
        self.no_gas = False
        # Grayscale frames of the two previous time steps (None = no history yet)
        self.prev_observation = None
        self.prevprev_observation = None
        # Three stacked grayscale frames; assumes the wrapped env produces
        # frames that are 96 pixels wide -- TODO confirm against the env
        self.observation_space = gymnasium.spaces.Box(low=0, high=255, shape=(84, 96,3), dtype=np.uint8)
        self.action_space = gymnasium.spaces.Discrete(len(self._ACTION_TABLE))
        self.score  = 0

    def _preprocess_observation(self, observation):
        """
        Crop the frame, convert it to grayscale and stack it with the frame history.

        Parameters:
        observation (numpy.ndarray): The current RGB observation from the environment with shape (height, width, 3).

        Returns:
        numpy.ndarray: uint8 array of shape (84, width, 3) holding three grayscale
        frames along the last axis. While the history is incomplete the current
        frame is repeated to fill the missing slots.
        """
        # Keep the first 84 rows only (drops the bottom bar), then convert to
        # grayscale. The cast makes the result match the uint8 dtype declared
        # in `observation_space`.
        frame = rgb2gray(observation[:84, :, :]).astype(np.uint8)
        frame = np.expand_dims(frame, axis=-1)

        if self.prevprev_observation is not None:
            frames = [self.prevprev_observation, self.prev_observation, frame]
        elif self.prev_observation is not None:
            frames = [self.prev_observation, frame, frame]
        else:
            # No history yet: repeat the current frame
            frames = [frame, frame, frame]

        # np.concatenate allocates a new array, so the stored frames are never
        # aliased by the returned observation and no extra copies are needed.
        combined_obs = np.concatenate(frames, axis=-1)

        # Shift the history by one step
        self.prevprev_observation = self.prev_observation
        self.prev_observation = frame

        return combined_obs

    def modify_reward(self,reward, action, obs):
        """
        Shape the environment reward.

        Parameters:
        reward (float): The original reward value.
        action (sequence): The [steering, gas, brake] command that was applied.
        obs (numpy.ndarray): The raw RGB observation from the environment with shape (height, width, 3).

        Returns:
        float: The modified reward value.
        """
        # Cap the reward at 1.0 (no lower bound)
        reward = np.clip(reward, a_max=1.0, a_min=None)

        # Count consecutive steps in which gas minus brake is at most 0.3
        self.no_gas = action[1] - action[2] <= 0.3
        self.gas_timer = self.gas_timer + 1 if self.no_gas else 0

        if self.gas_timer >= 5:
            reward -= 0.5

        # Count consecutive steps in which any pixel of a fixed patch has a
        # green channel above 50 %. The code treats this as "on grass";
        # presumably the patch surrounds the car -- verify against the env.
        self.grass_detected = bool(np.any(obs[67:77, 46:50, 1] / 255 > 0.5))
        self.grass_timer = self.grass_timer + 1 if self.grass_detected else 0

        # The two penalties are cumulative once the timer exceeds 20
        if self.grass_timer > 4:
            reward -= 0.1
        if self.grass_timer > 20:
            reward -= 0.5

        return reward

    def action_list(self,action):
        """
        Translate a discrete action index into a [steering, gas, brake] command.

        Parameters:
        action (int or size-1 numpy.ndarray): Index in the range 0..12.

        Returns:
        list: New list with the three control values.

        Raises:
        ValueError: If `action` is not a single value or not a valid index.
        """
        index = np.asarray(action).item()
        if index not in range(len(self._ACTION_TABLE)):
            raise ValueError(
                f"Invalid discrete action {action!r}; expected an integer in "
                f"[0, {len(self._ACTION_TABLE) - 1}]"
            )
        # Return a copy so callers cannot modify the shared table
        return list(self._ACTION_TABLE[int(index)])

    def step(self, action):
        """
        Apply a discrete action and return the processed transition.

        Returns:
        tuple: (stacked observation, shaped reward, done, trun, info).
        """
        action_cont  = self.action_list(action)

        # Perform the action in the environment and retrieve the resulting state
        obs, reward, done, trun, info = self.env.step(action_cont)

        # Reward shaping looks at the raw RGB frame, so it runs before preprocessing
        reward = self.modify_reward(reward, action_cont, obs)

        # Preprocess the observation and return updated values
        pre_obs = self._preprocess_observation(obs)

        return pre_obs, reward, done, trun, info

    def render(self):
        # Render the environment
        return self.env.render()

    def close(self):
        # Close the environment
        return self.env.close()

    def reset(self, track_id=None, **kwargs):
        """
        Reset the wrapped environment and all wrapper state.

        Parameters:
        track_id: Accepted for interface compatibility; not used.
        **kwargs: Forwarded to the wrapped environment's `reset`.

        Returns:
        tuple: (stacked observation, info dict).
        """
        observation, info = self.env.reset(**kwargs)

        # Clear the frame history BEFORE preprocessing, otherwise the first
        # observation of the new episode would be stacked with frames from
        # the previous episode.
        self.prev_observation = None
        self.prevprev_observation = None
        observation = self._preprocess_observation(observation)

        if info is None:
            info = {}

        self.grass_timer = 0
        self.grass_detected = False
        self.gas_timer = 0
        self.no_gas = False
        return observation, info

## 3. Creating Model

This section defines the process of creating and training a model using the specified environment.


### Creating Parallel environments

In [92]:
# Environment id and number of environment copies held by the vectorized env
environment_name = 'CarRacing-v2'
num_envs = 6

# Directory that receives the TensorBoard logs
log_path = os.path.join('Training', 'Logs')


def make_env():
    """Build one CarRacing environment wrapped in BasicWrapper.

    The function itself (not its result) is handed to DummyVecEnv, which is
    expected to call it once for every environment it manages.
    """
    gym_env = shimmy.openai_gym_compatibility.GymV26CompatibilityV0(
        env_id=environment_name,
        make_kwargs={'lap_complete_percent': 0.95, 'render_mode': 'human'},
    )
    return BasicWrapper(gym_env)


# One factory entry per environment
env_list = [make_env] * num_envs

# Vectorized environment built from the factories
env = DummyVecEnv(env_list)


# PPO agent with a CNN policy, logging to TensorBoard
model = PPO('CnnPolicy', env, verbose=1, tensorboard_log=log_path)


Using cuda device
Wrapping the env in a VecTransposeImage.


Configuring gradient clipping (maximum gradient norm)

In [93]:
# Set maximum gradient norm
max_grad_norm = 0.5
# Retrieve PyTorch optimizer from the model (kept for inspection)
optimizer = model.policy.optimizer


# Gradient clipping in Stable-Baselines3 PPO is governed by the algorithm's
# `max_grad_norm` attribute, which is read during each training update.
# Setting an attribute on the optimizer object has no effect because nothing
# reads it, so the value is applied to the model instead.
model.max_grad_norm = max_grad_norm

### Training Model

In [None]:
# Train the agent for 1.5 million time steps
model.learn(total_timesteps=1_500_000)

Logging to Training\Logs\PPO_193
------------------------------
| time/              |       |
|    fps             | 29    |
|    iterations      | 1     |
|    time_elapsed    | 420   |
|    total_timesteps | 12288 |
------------------------------
-----------------------------------------
| time/                   |             |
|    fps                  | 27          |
|    iterations           | 2           |
|    time_elapsed         | 892         |
|    total_timesteps      | 24576       |
| train/                  |             |
|    approx_kl            | 0.017545039 |
|    clip_fraction        | 0.237       |
|    clip_range           | 0.2         |
|    entropy_loss         | -4.26       |
|    explained_variance   | -0.0143     |
|    learning_rate        | 0.0003      |
|    loss                 | 0.32        |
|    n_updates            | 10          |
|    policy_gradient_loss | -0.0157     |
|    std                  | 1           |
|    value_loss           | 1.36    

Saving the trained model

In [2]:
# Destination of the saved model, relative to the current working directory
Org_path = os.path.join('Training', 'Test Models', 'PPO_fin3')
# Write the trained model to that path (requires `model` from the training cells)
model.save(Org_path)

NameError: name 'model' is not defined

## 4. Evaluate Model

Evaluation wrapper that pre-processes the observations so that the trained agent can act on them. It is the same as `BasicWrapper` but without the reward shaping, so that the agent can be evaluated on the environment's original reward.

In [10]:
class EvalWrapper(gymnasium.Wrapper):
    """
    Evaluation wrapper around the car-racing environment.

    Observations are preprocessed exactly like during training (three stacked
    grayscale frames) while the reward of the wrapped environment is returned
    unchanged. A single discrete action index, as produced by a policy
    trained on BasicWrapper's Discrete(13) action space, is translated to a
    [steering, gas, brake] command; any other action is forwarded as is.
    """

    # Discrete action index -> [steering, gas, brake] command.
    # Must stay identical to the mapping in BasicWrapper.action_list.
    _DISCRETE_ACTIONS = (
        [0, 0, 0],
        [-1, 0, 0],
        [1, 0, 0],
        [0, 1, 0],
        [0, 0, 0.8],
        [-0.5, 0.5, 0],
        [0.5, 0.5, 0],
        [-0.5, 0, 0.5],
        [0.5, 0, 0.5],
        [0, 1, 0.5],
        [0, 0.5, 0.8],
        [-0.3, 0.2, 0],
        [0.3, 0.2, 0],
    )

    def __init__(self, env):
        super().__init__(env)
        # Kept for parity with BasicWrapper; not used for reward shaping here
        self.grass_timer = 0
        self.grass_detected = False
        self.gas_timer = 0
        self.no_gas = False
        # Grayscale frames of the two previous time steps (None = no history yet)
        self.prev_observation = None
        self.prevprev_observation = None
        # Three stacked grayscale frames; assumes the wrapped env produces
        # frames that are 96 pixels wide -- TODO confirm against the env
        self.observation_space = gymnasium.spaces.Box(low=0, high=255, shape=(84, 96,3), dtype=np.uint8)
        self.score  = 0

    def _preprocess_observation(self, observation):
        """
        Crop the frame, convert it to grayscale and stack it with the frame history.

        Parameters:
        observation (numpy.ndarray): The current RGB observation from the environment with shape (height, width, 3).

        Returns:
        numpy.ndarray: uint8 array of shape (84, width, 3) holding three grayscale
        frames along the last axis. While the history is incomplete the current
        frame is repeated to fill the missing slots.
        """
        # Keep the first 84 rows only (drops the bottom bar), then convert to
        # grayscale. The cast makes the result match the uint8 dtype declared
        # in `observation_space`.
        frame = rgb2gray(observation[:84, :, :]).astype(np.uint8)
        frame = np.expand_dims(frame, axis=-1)

        if self.prevprev_observation is not None:
            frames = [self.prevprev_observation, self.prev_observation, frame]
        elif self.prev_observation is not None:
            frames = [self.prev_observation, frame, frame]
        else:
            # No history yet: repeat the current frame
            frames = [frame, frame, frame]

        # np.concatenate allocates a new array, so no extra copies are needed
        combined_obs = np.concatenate(frames, axis=-1)

        # Shift the history by one step
        self.prevprev_observation = self.prev_observation
        self.prev_observation = frame

        return combined_obs

    def _to_env_action(self, action):
        """
        Convert a policy action into the form expected by the wrapped environment.

        A single value is interpreted as a discrete action index only when the
        wrapped environment has a 3-element Box action space; in every other
        case the action is returned untouched.

        Raises:
        ValueError: If a discrete index is outside the known range.
        """
        env_space = self.env.action_space
        wants_controls = (
            isinstance(env_space, gymnasium.spaces.Box) and env_space.shape == (3,)
        )
        if not wants_controls or np.size(action) != 1:
            return action

        index = np.asarray(action).item()
        if index not in range(len(self._DISCRETE_ACTIONS)):
            raise ValueError(
                f"Invalid discrete action {action!r}; expected an integer in "
                f"[0, {len(self._DISCRETE_ACTIONS) - 1}]"
            )
        # Return a copy so the shared table cannot be modified
        return list(self._DISCRETE_ACTIONS[int(index)])

    def step(self, action):
        """
        Apply an action and return the transition with a preprocessed observation.

        Returns:
        tuple: (stacked observation, unmodified reward, done, trun, info).
        """
        # Perform the action in the environment and retrieve the resulting state
        obs, reward, done, trun, info = self.env.step(self._to_env_action(action))

        # Preprocess the observation and return updated values
        pre_obs = self._preprocess_observation(obs)

        return pre_obs, reward, done, trun, info

    def render(self):
        # Render the environment
        return self.env.render()

    def close(self):
        # Close the environment
        return self.env.close()

    def reset(self, track_id=None, **kwargs):
        """
        Reset the wrapped environment and the wrapper state.

        Parameters:
        track_id: Accepted for interface compatibility; not used.
        **kwargs: Forwarded to the wrapped environment's `reset`.

        Returns:
        tuple: (stacked observation, info dict).
        """
        observation, info = self.env.reset(**kwargs)

        # Clear the frame history BEFORE preprocessing, otherwise the first
        # observation of the new episode would be stacked with frames from
        # the previous episode.
        self.prev_observation = None
        self.prevprev_observation = None
        observation = self._preprocess_observation(observation)

        if info is None:
            info = {}

        self.grass_timer = 0
        self.grass_detected = False
        self.gas_timer = 0
        self.no_gas = False
        return observation, info

In [23]:
# Environment id used for evaluation
environment_name = 'CarRacing-v2'

# Keyword arguments forwarded when the underlying gym environment is created
make_kwargs = dict(lap_complete_percent=0.95, render_mode='human')

# Build the environment through the shimmy compatibility layer and add the
# evaluation preprocessing on top of it
eval_env = EvalWrapper(
    shimmy.openai_gym_compatibility.GymV26CompatibilityV0(
        env_id=environment_name,
        make_kwargs=make_kwargs,
    )
)

# Function to test the trained model
def test(model, env, n_eval_episodes=10, render=True):
    """
    Evaluate the performance of a trained model on a given environment.

    Parameters:
        model (BaseAlgorithm): The trained reinforcement learning model to be tested.
        env (gym.Env): The evaluation environment.
        n_eval_episodes (int): Number of episodes to run for evaluation.
        render (bool): Whether to render the environment during evaluation.

    Returns:
        tuple: A tuple containing the average and standard deviation of episode rewards.
    """
    episode_rewards = []

    # Evaluate the model over a number of episodes
    for _ in range(n_eval_episodes):
        obs, info = env.reset()
        done = False
        trun = False
        episode_reward = 0.0

        # Run the simulation for one episode
        while not done and not trun:
            # Get the model's action prediction
            action, _ = model.predict(obs.copy())
            # Take a step in the environment
            obs, reward, done, trun, _ = env.step(action)
            episode_reward += reward
            # Render the environment if specified
            if render:
                env.render()

        episode_rewards.append(episode_reward)

    # Calculate and return the average and standard deviation of episode rewards
    return np.average(episode_rewards), np.std(episode_rewards)

# Test the trained model using the evaluation environment. The environment
# is closed in a `finally` clause so that its window/resources are released
# even when the evaluation raises.
try:
    a = test(model, eval_env, n_eval_episodes=5, render=True)
finally:
    # Close the evaluation environment
    eval_env.close()

# Print the average and standard deviation of episode rewards
print(a)


MemoryError: Unable to allocate 1.11 GiB for an array with shape (2048, 6, 3, 84, 96) and data type float32

In [71]:
# Display the (mean, standard deviation) pair returned by test()
a

(773.164075564791, 112.73922707228856)