![image.png](attachment:image.png)

# <font color=#003091> Assignment: Navigating traffic </font>
### <font color=#0098df> Master in Computer Science & Business Technology</font>
#### <font color=#a6a6a6> Year: 2023</font>  
**David Kremer**


This notebook provides a guideline for the Assignment, and shows the steps to build a RL algorithm that navigates through traffic.

# The RL Toolbox

In the previous sessions we saw how to interact with environments through the Gym interface, how to collect data with the Agent class given a policy, and how to train a policy with the QPolicy class. 

![toolbox2.png](attachment:toolbox2.png)

The Memory, Agent and QPolicy classes can be loaded by simply importing them as:

In [1]:
from rl.agent import Memory, Agent
from rl.dqn import QPolicy

They can be used in the same way as in the previous sessions. 

# The environment

For this assignment the environment is a traffic simulator that we can import in the following way:

In [2]:
import gym
from rl.traffic import TrafficEnv
from rl.traffic import Car

2023-07-04 00:02:25.851 Python[41180:1148084] ApplePersistenceIgnoreState: Existing state will not be touched. New state will be written to /var/folders/qd/gcz7v8b52fjf42gd3137l9g40000gn/T/org.python.python.savedState


To create an environment variable that we can interact with in the same way as with the Gym environments, we just have to use the class TrafficEnv:

In [3]:
env = TrafficEnv(nlanes=4, ncars=5, images=True)

The TrafficEnv accepts 3 arguments for creating the environment:
 - nlanes: number of lanes in the road
 - ncars: number of cars in the road
 - images(true/false): if true, the observations are images of the road, if false the observations are the state of environment
 
 
In the environment, the agent can control a car that is able to accelerate, break and change lanes to navigate through traffic.

# The Task

The task is to build and train a RL algorithm, using the provided toolbox, that interacts with the Traffic environment and achives the following goals:
 - Not crashing
 - Covering the most distance possible in a given time
 - Driving "efficiently" (that is, change lanes the least possible)

The following sections outline the different steps to take to achive that.

## 1 - Understanding the Env

Before starting the training and sampling, it is important to understand the environment. In particular, it is important to understand the action space and the observation space.

In the case of our particular environment, we have two choices of observations: images from the road, or the road's state (that is, positions and velocities of the cars). 

In this stage you should:
 - Understand and describe the shape and type of the action space
 - Understand and describe the shape and type of the observation space for both the images and the state cases.
 - Decide which of the two observation spaces you will choose, and justify your decision.

In [4]:
# Print the shape of observation space and action space
print("Observation space shape:", env.observation_space.shape)

print("Action space discrete shape:", env.action_space[0])
print("Action space box shape:", env.action_space[1])

Observation space shape: (25, 50)
Action space discrete shape: Discrete(3)
Action space box shape: Box(0.0, 1.0, (2,), float64)


Action space is divided into a tuple composed of 2 elements:
- The first element corresponds to three integer values: 0 for moving left, 1 for straight and 2 to the right
- The second value corresponds to a list of two values, the first is the acceleration, and the second braking.

For the observation space:


## 2 - Sampling from the Env

Once the action and observation spaces are known, we should do some "manual" sampling to familiarize ourselves with the environment.

In this stage you should:
 - Rendering the environment and manually input actions.
 - Use the previous functionality to understand what each of the actions mean (and describe them).
 - Use the previous functionality to understand what the final states are (when the simulator returns a end_state). 
 - Build a function (as in previous sessions) that samples the environment with the random policy, and collects the [observation, action, reward, observation, final_state] data.

Reset the environment and render the env

In [5]:
env.reset()
#env.render()

(array([[0.  , 0.75, 0.  , ..., 0.  , 0.75, 0.  ],
        [0.  , 0.  , 0.  , ..., 0.  , 0.  , 0.  ],
        [0.  , 0.  , 0.  , ..., 0.25, 0.25, 0.  ],
        ...,
        [0.  , 0.  , 0.  , ..., 0.  , 0.  , 0.  ],
        [0.  , 0.  , 0.  , ..., 0.  , 0.  , 0.  ],
        [0.  , 0.75, 0.  , ..., 0.  , 0.75, 0.  ]]),
 {})

Creating some inputs 

In [6]:
# action = env.action_space.sample()
# for i in range(50):
#     env.step(action)
#     env.render()

In [7]:
env.action_space.sample()

(2, array([0.80595944, 0.68766995]))

Understanding the actions

In [8]:
# for i in range(20):
#     env.step((2,[1,0]))
#     env.render()

Obtaining states

In [9]:
env.is_final

False

In [10]:
env.get_state()

array([ 3.12500000e-01,  1.39245382e-02,  0.00000000e+00, -2.50000000e-01,
        4.15293737e-01,  6.25000000e-02,  5.63713839e-05, -2.50000000e-01,
        1.89608301e-01,  6.25000000e-02,  5.63713839e-05,  1.25000000e-01,
        2.66994080e-01,  4.37500000e-01, -8.19747046e-04,  0.00000000e+00,
        3.33565865e-01,  3.12500000e-01,  0.00000000e+00])

The states give an array composed by:
- Three elements first elements corresponding to the agend: position, velocity and acceleration of the agent.
- Four elements for each other car: relative x-y position wrt the agent, the absolute x position and velocity.


Creating a function that assigns a random policy and ouptuts observation, action, reward, observation and final_state

In [11]:
def randomSample(env):
    action = env.action_space.sample()
    env.step(action)
    obs = env.get_state()
    changing = True if obs[2] != 0 else False
    final_state = env.is_final
    reward = env.reward_func(obs[0], obs[1], changing, final_state)
    return [obs, action, reward, final_state]


## 3 - Defining the reward function

As you may have noticed from the previous section, the reward is always 0. That is because the environment does not define a reward function.

In this stage you should:
 - Define a reward function that will make the agent achive the goals described above (and justify your choice).
 - Test the reward function with the random policy, and obtain the average reward over 10 episodes. 
 
The reward function is a function that, at each frame, returns the reward just obtained at that frame. For building the reward function you have access to:
 - x_agent(float): the 'x' position of the agent (0 to 1, from left to right)
 - v_agent(float): the velocity of the agent
 - changing(bool): true if agent is changing lane, false otherwise
 - crashed(bool): true if agent just crashed in that frame, false otherwise
 
The following code shows how to define the function and include it in the env:

In [12]:
def reward_func(x_agent, y_agent, v_agent, changing, crashed, x_boundary_left, x_boundary_right):
    threshold_distance = 0.2
    threshold_y_difference = 0.2
    threshold_wall_distance = 0.1

    # Initialize penalties
    crash_penalty = 10000
    change_penalty = 15
    slow_penalty = 40
    close_car_penalty = 35
    alignment_penalty = 10
    overtaken_penalty = 50
    wall_penalty = 60

    # Calculate rewards and penalties
    reward = -crash_penalty if crashed else 10
    if not crashed:
        reward -= change_penalty if changing else 0
        reward += 15 if v_agent > 0.95 else (6 if v_agent > 0.75 else -slow_penalty)
        
        min_distance, min_y_difference, close_behind = float('inf'), float('inf'), False

        for car in [car for car in self.cars if car != self.cars[0]]:
            distance = abs(x_agent - car.px)
            min_distance = min(min_distance, distance)
            min_y_difference = min(min_y_difference, abs(y_agent - car.py))
            if car.px < x_agent and car.v > v_agent: 
                close_behind = True 

        reward -= close_car_penalty if min_distance < threshold_distance else 0
        reward -= alignment_penalty if min_y_difference < threshold_y_difference else 0
        reward -= overtaken_penalty if close_behind else 0
        reward -= wall_penalty if x_agent <= x_boundary_left + threshold_wall_distance or x_agent >= x_boundary_right - threshold_wall_distance else 0

    return reward



In [13]:
env.reward_func = reward_func

*Clue*: you don't _need_ to use all the input variables to compute the reward, some of them may be not useful for the task...

## 4 - Preparing the Agent

The action space contains a continuous component. If we want to be able to use the DQN algorithm to learn a policy we will have to discretize it. 

In this stage you should:
 - Create a wrapper that transforms current environment into one with a completely discrete action space.
 - If you chose to use images, you may need to also wrap the environment with the Stack wrapper (an observation wrapper, provided below) that stacks frames into 3 (as we did with FlappyBird).
 - Create an Agent and test it with the random policy for this new discrete environment.

In [14]:
from skimage.transform import resize
# import numpy as np

# class Stack(gym.ObservationWrapper):
#     def __init__(self, ienv, frames=3):
#         super(Stack, self).__init__(ienv)   
#         self.frames = frames
#         self.im_size = tuple(ienv.observation_space.shape)
#         self.c_obs = np.zeros((self.frames,) + self.im_size)
#         self.observation_space = gym.spaces.Box(low=0, high=1, shape=self.c_obs.shape)

#     def observation(self, obs):
#         self.c_obs = np.concatenate( [self.c_obs[1:,:], obs[np.newaxis,:]], axis=0 )
#         return self.c_obs

In [15]:
# # example:
# stacked_env = Stack(env, frames=4)
!pip install tiles



In [16]:
import gym
from gym import spaces
import numpy as np
from collections import deque


class Stack(TrafficEnv):
    def __init__(self, nlanes, ncars):
        super().__init__(nlanes, ncars)
        self.l_lims = np.array((0.0, 0.0)) # f, b
        self.h_lims = np.array((2.0, 1.0)) # f, b
        self.action_space = spaces.Tuple((spaces.Discrete(3), spaces.Box(self.l_lims, self.h_lims, dtype=float)))

    def reward_func(x_agent, y_agent, v_agent, changing, crashed, x_boundary_left, x_boundary_right):
        threshold_distance = 0.2
        threshold_y_difference = 0.2
        threshold_wall_distance = 0.1

        # Initialize penalties
        crash_penalty = 10000
        change_penalty = 15
        slow_penalty = 40
        close_car_penalty = 35
        alignment_penalty = 10
        overtaken_penalty = 50
        wall_penalty = 60

        # Calculate rewards and penalties
        reward = -crash_penalty if crashed else 10
        if not crashed:
            reward -= change_penalty if changing else 0
            reward += 15 if v_agent > 0.95 else (6 if v_agent > 0.75 else -slow_penalty)
        
            min_distance, min_y_difference, close_behind = float('inf'), float('inf'), False

            for car in [car for car in self.cars if car != self.cars[0]]:
                distance = abs(x_agent - car.px)
                min_distance = min(min_distance, distance)
                min_y_difference = min(min_y_difference, abs(y_agent - car.py))
                if car.px < x_agent and car.v > v_agent: 
                    close_behind = True 

            reward -= close_car_penalty if min_distance < threshold_distance else 0
            reward -= alignment_penalty if min_y_difference < threshold_y_difference else 0
            reward -= overtaken_penalty if close_behind else 0
            reward -= wall_penalty if x_agent <= x_boundary_left + threshold_wall_distance or x_agent >= x_boundary_right - threshold_wall_distance else 0

        return reward



    


# Define the discrete actions
discrete_actions = [(0, [0.0, 0.0]), (0, [0.0, 0.5]), (0, [0.0, 1.0]),  # Left with different acceleration/braking levels
                    (0, [0.5, 0.0]), (0, [0.5, 0.5]), (0, [0.5, 1.0]),  # Left with different acceleration/braking levels
                    (0, [1.0, 0.0]), (0, [1.0, 0.5]), (0, [1.0, 1.0]),  # Left with different acceleration/braking levels

                    (1, [0.0, 0.0]), (1, [0.0, 0.5]), (1, [0.0, 1.0]),  # Straight with different acceleration/braking levels
                    (1, [0.5, 0.0]), (1, [0.5, 0.5]), (1, [0.5, 1.0]),  # Straight with different acceleration/braking levels
                    (1, [1.0, 0.0]), (1, [1.0, 0.5]), (1, [1.0, 1.0]),  # Straight with different acceleration/braking levels

                    (2, [0.0, 0.0]), (2, [0.0, 0.5]), (2, [0.0, 1.0]),  # Right with different acceleration/braking levels
                    (2, [0.5, 0.0]), (2, [0.5, 0.5]), (2, [0.5, 1.0]),  # Right with different acceleration/braking levels
                    (2, [1.0, 0.0]), (2, [1.0, 0.5]), (2, [1.0, 1.0])]  # Right with different acceleration/braking levels


# Map continuous actions to discrete actions
def map_action(action):
    return discrete_actions[action]

# Wrap the environment with a custom wrapper to handle the mapping of actions
class DiscreteActionWrapper(gym.ActionWrapper):
    def __init__(self, env):
        super().__init__(env)
        self.action_space = spaces.Discrete(len(discrete_actions))
    
    def action(self, action):
        return map_action(action)

class RewardWrapper(gym.RewardWrapper):
    def __init__(self, env):
        super().__init__(env)

    def reward_func(self, x_agent, y_agent, v_agent, changing, crashed, x_boundary_left, x_boundary_right):
        threshold_distance = 0.2
        threshold_y_difference = 0.2
        threshold_wall_distance = 0.1

        # Initialize penalties
        crash_penalty = 200
        change_penalty = 3
        slow_penalty = 10
        close_car_penalty = 8
        alignment_penalty = 3
        overtaken_penalty = 10
        wall_penalty = 12

        # Calculate rewards and penalties
        reward = -crash_penalty if crashed else 2
        if not crashed:
            reward -= change_penalty if changing else 0
            reward += 3 if v_agent > 0.95 else (1.5 if v_agent > 0.75 else -slow_penalty)
            
            min_distance, min_y_difference, close_behind = float('inf'), float('inf'), False

            for car in [car for car in self.env.cars if car != self.env.cars[0]]:
                distance = abs(x_agent - car.px)
                min_distance = min(min_distance, distance)
                min_y_difference = min(min_y_difference, abs(y_agent - car.py))
                if car.px < x_agent and car.v > v_agent: 
                    close_behind = True 

            reward -= close_car_penalty if min_distance < threshold_distance else 0
            reward -= alignment_penalty if min_y_difference < threshold_y_difference else 0
            reward -= overtaken_penalty if close_behind else 0
            reward -= wall_penalty if x_agent <= x_boundary_left + threshold_wall_distance or x_agent >= x_boundary_right - threshold_wall_distance else 0

        return reward

    def reward(self, reward, done=False):
        # Get necessary information
        x_agent = self.env.cars[0].px / self.env.rsx
        y_agent = self.env.cars[0].py
        v_agent = self.env.cars[0].v
        changing = self.env.cars[0].va != 0.0
        crashed = self.env.is_final
        x_boundary_left = 0
        x_boundary_right = self.env.cars[0].rsx

        # Use your custom reward function
        new_reward = self.reward_func(x_agent, y_agent, v_agent, changing, crashed, x_boundary_left, x_boundary_right)

        return new_reward


In [17]:
# # Take a step in the environment
# step_output = env.step(action)
# print("Returned value:", step_output)


In [18]:
# import gym

# # Create the environment
# env = TrafficEnv(nlanes=4, ncars=5)
# # Reset the environment
# obs = env.reset()

# # Run the episode
# done = False
# while not done:
#     # Choose an action
#     action = env.action_space.sample()

#     # Take a step in the environment
#     step_output = env.step(action)
#     print("Returned value:", step_output)

#     # Unpack the returned values
#     obs  = step_output

#     # Print the observation, reward, done, and info
#     print("Observation:", obs)
#     print("Done:", done)
#     #print("Info:", info)

#     # Render the environment
#     env.render()

# # Close the environment
# env.close()







In [19]:
#sample = randomSample(env)#

In [20]:
#env.render()

In [21]:
# env.close()

*Clue*: You may want to define the actions "manually", that is, define the act_dict dictionary by hand specifying a list of discrete actions that are combinations of the actions of the original space. In that way you can also filter combinations that don not make sense, like breaking and accelerating at the same time.

*Clue2*: If you chose to use images, you will have to apply two wrappers to the env. You can do this by simply wrapping an alredy wrapped env, and in this case the order should not matter.

## 5 - Preparing the DQN

With the agent ready, we just need to prepare the DQN algorithm. For that, we need to define the neural network for the Q function.

In this stage you should:
- Define the NN architecture. Remember the input shape will be the observation space, and the output shape should be the action space. (In particular, if you are using images, you may want to do a CNN).
- Implement the NN in Pytorch and create a QPolicy with that NN.
- Test it by collecting data with the agent created in the previous section, this time using the policy given by the untrained QPolicy instead of a random policy.

In [22]:
from stable_baselines3 import DQN
from stable_baselines3.dqn import MlpPolicy
import os
import time


# Saving logs to visulise in Tensorboard, saving models
models_dir = f"models/Highway-{time.time()}"
logdir = f"logs/Highway-{time.time()}"
if not os.path.exists(models_dir):
    os.makedirs(models_dir)
if not os.path.exists(logdir):
    os.makedirs(logdir)



# # The learning agent and hyperparameters
# model = DQN(
#     policy=MlpPolicy,
#     env=env,
#     verbose=1,
# )

In [23]:
import gym
from gym import spaces
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
import torch

# Create the environment
env = TrafficEnv(nlanes=4, ncars=5)
stacked_env = Stack(nlanes=4, ncars=5)
stacked_env.reset()

# Wrap the environment with the DiscreteActionWrapper to handle the mapping of actions
env = DiscreteActionWrapper(stacked_env)


device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)

# Define hyperparameters
policy = "MlpPolicy"  # Policy architecture
learning_rate = 0.00001  # Learning rate
buffer_size = 10000  # Replay buffer size
batch_size = 50  # Minibatch size
gamma = 0.2  # Discount factor
exploration_fraction = 0.3  # Fraction of time spent exploring
exploration_initial_eps = 1.0  # Initial exploration rate
exploration_final_eps = 0.05  # Final exploration rate
target_update_interval = 1000  # Update the target network every X steps
train_freq = 1  # Update the model every X steps
gradient_steps = 1  # Number of gradient steps to take during training
policy_kwargs = dict()  # Additional arguments for the policy

# Initialize the DQN model with hyperparameters
model = DQN(
    policy=policy,
    env=env,
    learning_rate=learning_rate,
    buffer_size=buffer_size,
    batch_size=batch_size,
    gamma=gamma,
    exploration_fraction=exploration_fraction,
    exploration_initial_eps=exploration_initial_eps,
    exploration_final_eps=exploration_final_eps,
    target_update_interval=target_update_interval,
    train_freq=train_freq,
    gradient_steps=gradient_steps,
    policy_kwargs=policy_kwargs,
    verbose=1,
    device=device
)

# TIMESTEPS = 20000

# for i in range(50):
#     model.learn(total_timesteps=TIMESTEPS, reset_num_timesteps=False, tb_log_name="DQN")
#     model.save(f"{models_dir}/{TIMESTEPS * (i + 1)}")

# # Save the trained model
# model.save("dqn_traffic")

# # Evaluate the trained agent
# mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=10)
# print(f"Mean reward: {mean_reward}")

# # To load the trained model
# model = DQN.load("dqn_traffic")

# # Test the trained agent
# obs = env.reset()
# while True:
#     print("Observation:", obs)
#     action, _states = model.predict(obs)
#     print("Predicted action:", action)
#     obs, rewards, dones, info = env.step(action)
#     print("Action executed:", action)
#     env.render()
#     if dones:
#         break

Using mps device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




## 6 - Training the DQN

With all the "ingredients" ready, we can finally train the policy.

In this stage you should:
 - Define and implement the training loop (as we did in previous sessions).
 - Train the policy and collect training stats such as mean reward per iteration.
 - Display the training stats and mean rewards obtained by your agent.
 - (optional) Tune the hyperparameters to get better results. 
 - (optional) Visualize the Q values during an episode.
 - (optional) Enjoy watching how your agent drives.

*Clue*: Training the agent may take a long time and it may be difficult to find good hyperparameters, so if it does not converge quickly it does not necessarily mean you did something wrong. If you want to try, a road with fewer cars (or just 1 car) and fewer lanes (like 3) will be easier to learn.

In [41]:
from IPython import display
import numpy as np

# Assuming `TrafficEnv` and `DiscreteActionWrapper` are defined somewhere
env = TrafficEnv(nlanes=4, ncars=5)
env = RewardWrapper(env)
env = DiscreteActionWrapper(env)
env.reset()

# Load the model
models_dir = "models/Highway-1688422589.4609852/"
model_path = f"{models_dir}80000" #180000
best_model = DQN.load(model_path, env=env)
obs, _ = env.reset()

while True:
    action, _states = best_model.predict(obs)
    action = int(action)
    step_output = env.step(action)  # Call step and store result in variable

    # Ensure you're handling the correct number of returned values
    if len(step_output) == 5:
        obs, rewards, dones, extra_value, info = step_output
    else:
        obs, rewards, dones, info = step_output

    env.render()

    # Convert numpy array to PIL Image
    
    if dones:
        env.reset()
        break



NameError: name 'RewardWrapper' is not defined

In [30]:
env.close()