In [1]:
2# install required system dependencies
!apt-get install -y xvfb x11-utils 
!apt-get install x11-utils > /dev/null 2>&1
!pip install PyVirtualDisplay==2.0.* \
PyOpenGL==3.1.* \
PyOpenGL-accelerate==3.1.* \
gym[box2d]==0.17.* 
!pip install pyglet

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following additional packages will be installed:
  libxxf86dga1
Suggested packages:
  mesa-utils
The following NEW packages will be installed:
  libxxf86dga1 x11-utils xvfb
0 upgraded, 3 newly installed, 0 to remove and 29 not upgraded.
Need to get 993 kB of archives.
After this operation, 2,981 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu bionic/main amd64 libxxf86dga1 amd64 2:1.1.4-1 [13.7 kB]
Get:2 http://archive.ubuntu.com/ubuntu bionic/main amd64 x11-utils amd64 7.7+3build1 [196 kB]
Get:3 http://archive.ubuntu.com/ubuntu bionic-updates/universe amd64 xvfb amd64 2:1.19.6-1ubuntu4.8 [784 kB]
Fetched 993 kB in 1s (866 kB/s)
Selecting previously unselected package libxxf86dga1:amd64.
(Reading database ... 160975 files and directories currently installed.)
Preparing to unpack .../libxxf86dga1_2%3a1.1.4-1_amd64.deb ...
Unpacking libxxf86dga1:amd64 (2:1.

In [2]:
!rm ./vid/*.*

rm: cannot remove './vid/*.*': No such file or directory


In [3]:
from gym import Env, spaces
import numpy as np
import random

In [79]:
class SteeringWheelEnv(Env):
  """
  Environment to replicate automated steering of the wheel in an environment (road).

  Action -- a ``(action_turn, action_speed)`` tuple:
    * action_turn  : steering input; -1 is a complete left turn, 0 keeps the wheel
                     centered and 1 is a complete right turn.
    * action_speed : 0 reduces the speed of the vehicle, 1 increases it.

  Observation -- a ``(road_turn, road_position)`` tuple of scalars:
    * road_turn     : turn of the road ahead in [-1, 1]; -1 is a left turn, 0 a
                      straight road and 1 a right turn.
    * road_position : position of the vehicle in its lane in [-1, 1]; -1 is the
                      extreme left, 0 the center and 1 the extreme right.

  Reward is +1 for a step in which the vehicle is close to the lane center and the
  action suits the road (low speed on a turning road, wheel kept straight on a
  straight road); every other step is rewarded with -1.
  """

  def __init__(self, drive_duration=100):
    """
    Build the action/observation spaces and draw a random initial state.

    drive_duration -- number of ``step`` calls after which an episode is reported
                      as done (defaults to 100).
    """
    # First action: degree of turn of the wheel (continuous, -1 .. 1).
    # Second action: change in speed, 0 = speed reduction, 1 = speed increase.
    # Only these two speed values are handled by the dynamics, hence Discrete(2).
    self.action_space = spaces.Tuple((
        spaces.Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32),
        spaces.Discrete(2)
    ))

    # First entry: turn required for the road ahead; second entry: position of the
    # vehicle with respect to the current road lane. Both are continuous in [-1, 1].
    self.observation_space = spaces.Tuple((
        spaces.Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32),
        spaces.Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)
    ))

    # amount of turn +/- from 0 that is still considered an (almost) straight road
    # or an (almost) centered wheel
    self.turn_margin_to_center = 0.2

    # margin of error accepted for the vehicle to be considered centered on the road
    self.vehicle_road_center_threshold = 0.4

    # episode length; kept separately so that reset() can restore the counter
    self.max_drive_duration = drive_duration

    # draws the initial road turn / vehicle position and starts the step counter
    self.reset()


  def step(self, action):
    """
    Apply ``action`` = (action_turn, action_speed) and advance the road by one step.

    Returns ``((road_turn, road_position), reward, task_done, {})`` where the
    observation describes the state *after* the road has advanced, and
    ``task_done`` becomes True once the drive duration has been used up.
    """
    # decrease time step
    self.drive_duration -= 1

    action_turn, action_speed = action
    # Actions sampled from the action space carry the turn as a 1-element array;
    # convert to plain scalars so the stored state and the returned observation
    # stay plain numbers no matter how the action was produced.
    action_turn = float(np.asarray(action_turn).reshape(-1)[0])
    action_speed = int(action_speed)

    # get updated road position of vehicle after action execution
    self.road_position = self.get_updated_road_position(
        self.road_position, self.road_turn, action_turn, action_speed)

    vehicle_centered = abs(self.road_position) <= self.vehicle_road_center_threshold
    if abs(self.road_turn) > self.turn_margin_to_center:
      # road is turning left or right: the vehicle needs to reduce speed
      suitable_action = action_speed == 0
    else:
      # straight road: the wheel needs to stay (almost) centered
      suitable_action = abs(action_turn) <= self.turn_margin_to_center
    reward = 1 if (suitable_action and vehicle_centered) else -1

    # get next state after action
    self.road_turn, self.road_position = self.get_next_state(self.road_turn, self.road_position)

    # check if task is done
    task_done = self.drive_duration <= 0

    return (self.road_turn, self.road_position), reward, task_done, {}


  def get_updated_road_position(self, road_position, road_turn, action_turn, action_speed):
    """
    Return the updated vehicle road position based on turning and speed change action of the agent.

    * Turning road: the road displaces the vehicle by ``-road_turn`` (a left turning
      road pushes the vehicle to the right and vice versa); a noticeable wheel turn
      adds ``action_turn`` on top. At higher speed the resulting shift is halved.
    * Straight road: only a noticeable wheel turn moves the vehicle, by
      ``action_turn - road_turn``.

    The result is not limited to [-1, 1] here; ``get_next_state`` does that.
    """
    margin = self.turn_margin_to_center
    # wheel turned left or right beyond what still counts as "straight"
    wheel_turned = abs(action_turn) > margin

    if abs(road_turn) > margin:
      # left or right turning road
      if wheel_turned:
        update_margin = action_turn - road_turn
      else:
        # vehicle going straight in a turning road
        update_margin = -road_turn
      if action_speed != 0:
        # For higher speed, the effect of turn is less. The shift is computed for
        # every speed first; otherwise this halving would only ever act on 0.0.
        update_margin /= 2
    elif wheel_turned:
      # straight road, vehicle moves towards the side the wheel is turned to
      update_margin = action_turn - road_turn
    else:
      update_margin = 0.0

    return road_position + update_margin


  def get_lower_and_upper_limit(self, val):
    """
    Get next road turn range: ``val`` +/- the turn margin, limited to [-1, 1].
    """
    lower_limit = max(-1, val - self.turn_margin_to_center)
    upper_limit = min(1, val + self.turn_margin_to_center)
    return lower_limit, upper_limit


  def get_next_state(self, road_turn, road_position):
    """
    Get next state of environment given the current state.

    The next road turn is drawn uniformly from the range around the current one.
    A turning road (left or right) shifts the vehicle by the new road turn, a
    straight road leaves it where it is. The position is limited to [-1, 1].
    """
    lower_limit, upper_limit = self.get_lower_and_upper_limit(road_turn)
    next_road_turn = random.uniform(lower_limit, upper_limit)

    if abs(next_road_turn) > self.turn_margin_to_center:
      road_position += next_road_turn

    # keep the vehicle inside the lane boundaries
    road_position = max(-1, min(1, road_position))
    return next_road_turn, road_position


  def reset(self):
    """
    Reset environment states and return the initial ``(road_turn, road_position)``.
    """
    # initial randomized turn of the road
    self.road_turn = random.uniform(-1, 1)
    # initial position of vehicle on the road
    self.road_position = random.uniform(-1, 1)

    # reset duration of drive
    self.drive_duration = self.max_drive_duration
    return self.road_turn, self.road_position


In [80]:
# Instantiate the custom environment; the cells below sample from it and step through it
env = SteeringWheelEnv()



In [81]:
print('Sample observation from environment:',env.observation_space.sample())
# The sample is a pair: the first element is the turn of the road and the second is the position of the vehicle on the road

Sample observation from environment: (array([-0.24665473], dtype=float32), array([0.5833879], dtype=float32))


In [82]:
# Baseline check: drive 10 episodes using nothing but randomly sampled actions
for epoch in range(10):
  obs, done, total_reward = env.reset(), False, 0
  while not done:
    random_action = env.action_space.sample()
    obs, reward, done, i = env.step(random_action)
    total_reward = total_reward + reward
  print('Test: {}, Total Reward Collected: {}'.format(epoch, total_reward))

Test: 0, Total Reward Collected: -86
Test: 1, Total Reward Collected: -84
Test: 2, Total Reward Collected: -82
Test: 3, Total Reward Collected: -82
Test: 4, Total Reward Collected: -74
Test: 5, Total Reward Collected: -80
Test: 6, Total Reward Collected: -84
Test: 7, Total Reward Collected: -82
Test: 8, Total Reward Collected: -86
Test: 9, Total Reward Collected: -64


In [83]:
# Now we aim to create an optimal policy based on the observation
def policy(obs):
  """
  Hand-written driving policy.

  obs is the (road turn, vehicle position on the road) pair.
  Returns the (turn, speed) action: on a perfectly straight road the wheel is
  kept centered at the higher speed (1); otherwise the vehicle slows down (0)
  and steers by the difference between the road turn and its own position.
  """
  road_turn, vehicle_road_pos = obs

  # going straight: neither a left (negative) nor a right (positive) turn
  if not (road_turn < 0 or road_turn > 0):
    return 0.0, 1

  # left or right turning road
  return road_turn - vehicle_road_pos, 0

In [84]:
# Evaluate the hand-written policy over one full episode
o, done, total_reward = env.reset(), False, 0
while not done:
  action = policy(o)
  o, reward, done, i = env.step(action)
  total_reward = total_reward + reward
print('Total Reward Collected: {}'.format(total_reward))

Total Reward Collected: 98
