<a href="https://colab.research.google.com/github/serciex/lane-change/blob/main/v6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Installing Prerequisites

In [2]:
!pip install pygame
!pip install highway-env
!pip install stable-baselines3



Testing

In [None]:
#Import Libraries
from os import truncate
import math
import gymnasium
import highway_env
from matplotlib import pyplot as plt
import pygame
import numpy as np
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import tqdm
import gym
from random import randint
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from collections import deque
import random
import torch.optim as optim
import tqdm
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
from IPython.display import display


# Workaround for gym compatibility
if not hasattr(np, 'bool8'):
    np.bool8 = np.bool_

%matplotlib inline

Environment Wrapper

In [None]:
class ENVwrapper(gym.Wrapper):
 def __init__(self,env,parameters):
   super().__init__(env)
   self.observation_space = gym.spaces.Box(
      low=np.array([
          -1000,    # x: position can be negative
          -50,      # y: lateral position
          0,        # vx: velocity >= 0
          -5,       # acceleration: reasonable decel
          -np.pi,   # theta: angle bounds
          -1,       # lane_id: left lane change
          3.0,      # lane_width: minimum width
          -np.pi    # self_curvature: angle bounds
      ]),
      high=np.array([
          10000,    # x: far forward position
          50,       # y: lateral position
          50,       # vx: max highway speed
          5,        # acceleration: reasonable accel
          np.pi,    # theta: angle bounds
          1,        # lane_id: right lane change
          4.0,      # lane_width: maximum width
          np.pi     # self_curvature: angle bounds
      ]),
      dtype=np.float32)
   self.action_space = gym.spaces.Box(
   low=-0.5,  # max left steering
   high=0.5,  # max right steering
   shape=(1,),
   dtype=np.float32
   )

   self.desired_parameters,self.control_parameters, self.reward_weights, self.following_gap = parameters
   self.w1, self.w2, self.w3 = self.reward_weights
   self.current_obs = None
   self.previous_obs = None

   # Add these missing lines:
   self.s0, self.v0, self.epsilon = self.desired_parameters
   self.a, self.b, self.delta, self.T = self.control_parameters
   self.following_gap_threshold = self.following_gap
   self.last_lane = None
   self.gap_controller_checked = False
   self.target_id = 0

 def lane_check(self):
   """
   Determines which adjacent lane offers the largest front gap for a lane change.
   """
   obs = self.current_obs
   ego = self.unwrapped.vehicle
   current_lane_id = self.unwrapped.vehicle.lane_index[2]

   # Reset flag when lane changes
   if self.last_lane is not None and current_lane_id != self.last_lane:
       self.gap_controller_checked = False

   # Only check if not already checked
   if not self.gap_controller_checked:

       # Get front gaps using existing longitudinal_lead_state
       gap_current_front = obs[1][0]

       # Get right lane front gap
       current_lane = list(ego.lane_index)
       lane_right = (current_lane[0], current_lane[1], current_lane[2] + 1) if current_lane[2] < 2 else ego.lane_index
       neighbours_right = self.unwrapped.road.neighbour_vehicles(ego, lane_index=lane_right)
       gap_right_front = neighbours_right[0].position[0] - ego.position[0] if neighbours_right and neighbours_right[0] else -float('inf')

       # Get left lane front gap
       lane_left = (current_lane[0], current_lane[1], current_lane[2] - 1) if current_lane[2] > 0 else ego.lane_index
       neighbours_left = self.unwrapped.road.neighbour_vehicles(ego, lane_index=lane_left)
       gap_left_front = neighbours_left[0].position[0] - ego.position[0] if neighbours_left and neighbours_left[0] else -float('inf')

       # Find best lane
       front_gaps = [gap_current_front, gap_right_front, gap_left_front]
       best_index = np.argmax(front_gaps)

       if best_index == 1:
           direction = 1  # Right
           gap_right_follower = ego.position[0] - neighbours_right[1].position[0] if len(neighbours_right) > 1 and neighbours_right[1] else float('inf')
           self.target_id = direction if gap_right_follower >= self.following_gap_threshold else 0
       elif best_index == 2:
           direction = -1  # Left
           gap_left_follower = ego.position[0] - neighbours_left[1].position[0] if len(neighbours_left) > 1 and neighbours_left[1] else float('inf')
           self.target_id = direction if gap_left_follower >= self.following_gap_threshold else 0
       else:
           self.target_id = 0  # Stay in current lane

       self.gap_controller_checked = True

   self.last_lane = current_lane_id

   return self.target_id

 def idm_controller(self):
   obs = self.current_obs
   s0,v0,epsilon = self.desired_parameters
   a,b,delta,T = self.control_parameters

   v = obs[0][3]
   delta_v = obs[0][4]
   s = obs[0][5]

   # Desired gap: s* = s0 + v*T + (v * delta_v) / (2 * sqrt(a * b))
   desired_gap = self.s0 + max(0, v * self.T + ((v * delta_v) / (2 * math.sqrt(self.a * self.b))))

   # IDM acceleration: a_IDM = a * [ 1 - (v / v0)^delta - (s* / s)^2 ]
   acceleration = self.a * (1 - (v / self.v0)**self.delta - (desired_gap / (s + epsilon))**2)

   return acceleration

 def reward_function(self, obs_old, obs_new):
   target_id = self.lane_check()
   w1,w2,w3 = self.reward_weights
   obs_new = self.current_obs
   obs_old = self.previous_obs

   """
   Reward Function:

   Acceleration Reward: r_acce = w1*f_acce(a_yaw)
   a_yaw = yaw acceleration (rate of change of yaw rate)

   Rate Reward: r_rate = w2*f_rate(w_yaw)
   w_yaw = yaw rate (rate of change of heading)

   Time Reward: r_time = w3*f_time (delta_lat_deviation)
   delta_lat_deviation = change in lateral deviation (self.lat_off)

   Reward = Cummulative Sum of r_acce + Cummulative Sum of r_rate + Cummulative Sum of r_time

   """

   # Convert direction command to actual target lane
   current_lane_id = self.unwrapped.vehicle.lane_index[2]

   if target_id == +1:  # Move right
       actual_target_lane = min(2, current_lane_id + 1)  # Don't exceed rightmost lane
   elif target_id == -1:  # Move left
       actual_target_lane = max(0, current_lane_id - 1)  # Don't exceed leftmost lane
   else:  # target_id == 0, stay in current lane
       actual_target_lane = current_lane_id

   # Use current vehicle lane structure and set actual target lane
   current_lane_index = list(self.unwrapped.vehicle.lane_index)
   current_lane_index[2] = actual_target_lane  # Only change the lane number to actual lane
   self.target_lane = tuple(current_lane_index)

   target_lane_object = self.unwrapped.road.network.get_lane(self.target_lane)
   vehicle_s, _ = self.unwrapped.vehicle.lane.local_coordinates(self.unwrapped.vehicle.position)
   _ , self.delta_lat_deviaton = target_lane_object.local_coordinates(self.unwrapped.vehicle.position)

   obs = obs_new[0]
   obs_old = obs_old[0]

   # Extract lateral and longitudinal velocities from observations
   vx_current = obs[2]  # longitudinal velocity
   vy_current = obs[3]  # lateral velocity
   vx_old = obs_old[2]
   vy_old = obs_old[3]

   dt = 1.0 / self.unwrapped.config['policy_frequency']
   L = self.unwrapped.vehicle.LENGTH

   # Calculate both yaw rates in parallel using bicycle model
   vx_vals = np.array([vx_old, vx_current])
   vy_vals = np.array([vy_old, vy_current])

   # Vectorized calculations
   total_velocities = np.sqrt(vx_vals**2 + vy_vals**2)
   curvatures = np.divide(vy_vals, vx_vals * total_velocities + 1e-6,
                         out=np.zeros_like(vy_vals), where=(abs(vx_vals) > 1e-6))
   yaw_rates = total_velocities * curvatures

   previous_yaw_rate, current_yaw_rate = yaw_rates[0], yaw_rates[1]

   w_yaw = current_yaw_rate
   w_yaw_old = previous_yaw_rate

   self.w_acce = (w_yaw-w_yaw_old)*self.unwrapped.config['policy_frequency']

   # Acceleration Reward
   acce_reward = -w1*abs(self.w_acce)

   # Rate Reward
   rate_reward = -w2*abs(w_yaw)

   # Time Reward
   time_reward = -w3*((self.delta_lat_deviaton)**2)

   # Off Road
   if not self.unwrapped.vehicle.on_road:
       off_road_penalty = -10

   else:
       off_road_penalty = 0

   # Overall Reward
   self.reward = acce_reward + rate_reward + time_reward + off_road_penalty

   return self.reward


 def step(self, agent_action):
   if isinstance(agent_action, np.ndarray):
     agent_action = agent_action[0]

   obs_old = self.current_obs
   vehicle = self.unwrapped.vehicle
   idm_action = self.idm_controller()
   action = [idm_action, agent_action]

   # Prior Observation
   v_prev = obs_old[0][2] if obs_old is not None else 0

   obs, reward, done, info = super().step(action)
   self.previous_obs = obs_old
   self.current_obs = obs
   custom_reward = self.reward_function(obs_old,obs)

   done = done if vehicle.on_road and not info['crashed'] else True

   x = obs[0][0]
   y = obs[0][1]
   vx = obs[0][2]
   thetha = obs[0][4]
   lane_width = vehicle.lane.width

   lane_id = self.lane_check()

   self_curvature = vehicle.lane.heading_at(np.clip(
       vehicle.lane.local_coordinates(vehicle.position)[0],
       0, vehicle.lane.length))

   acceleration = (vx-v_prev)*self.unwrapped.config['policy_frequency']

   agent_observation = np.array([x,y,vx,acceleration,thetha,lane_id,lane_width,self_curvature], dtype=np.float32)

   return agent_observation, custom_reward, done, info

 def reset(self, **kwargs):
   obs, info = super().reset(**kwargs)
   self.current_obs = obs
   self.previous_obs = None
   vehicle = self.unwrapped.vehicle
   self.last_lane = None
   self.gap_controller_checked = False

   obs = obs[0]

   x = obs[0]
   y = obs[1]
   vx = obs[2]
   thetha = obs[4]
   lane_width = self.unwrapped.vehicle.lane.width

   lane_id = self.lane_check()

   self_curvature = vehicle.lane.heading_at(np.clip(
       vehicle.lane.local_coordinates(vehicle.position)[0],
       0, vehicle.lane.length))

   acceleration = 0

   agent_observation = np.array([x,y,vx,acceleration,thetha,lane_id,lane_width,self_curvature], dtype=np.float32)

   return agent_observation, info

Environment

In [None]:
# Initialization
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# IDM Parameters
desired_parameters = [20, 10.0, 1e-6]      # s0, v0
control_parameters = [0.1, 5, 4, 4]        # a, b, δ, T
reward_weights = [1,1,0.05]
following_gap_threshold = 10
parameters = [desired_parameters, control_parameters, reward_weights, following_gap_threshold]

# Configure Environment Conditions
config = {
    "lanes_count": 3,
    "lane_width": 3.75,
    "observation": {
        "type": "Kinematics",
        "features": ["x", "y", "vx", "vy", "heading", "lat_off"]
    },
    "action": {"type": "ContinuousAction"},"ego_spawn_random": True,
    "policy_frequency": 10,
}

environment = gymnasium.make('highway-v0', render_mode='rgb_array', config=config)
env = ENVwrapper(environment,parameters)

Training Loop (Stable Baselines)

In [23]:
from stable_baselines3 import PPO

model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=50000)

  0%|          | 0/1000 [00:00<?, ?it/s]


ValueError: too many values to unpack (expected 4)