In [7]:
from gym import Env
from gym.spaces import Discrete, Box, Dict
from collections import OrderedDict
from IPython import display
import random
import numpy as np
import math
import pygame
import torch
import time
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
rng = np.random.default_rng()

In [8]:
# parameters
car_width = 150
car_length = 400
max_loop = 5000
fps = 60
car_hypotenuse = math.sqrt((car_length/2)**2 + (car_width/2)**2)
hypotenuse_angle = math.acos(car_width/(2*car_hypotenuse))
car_wheelbase=250
parking_width = 240
parking_length = 450
road_width = 300
area_width = (2 * road_width) + (2 * parking_length) #1300
area_length = 8 * parking_width #1920
timestep=1
max_steering=30
max_velocity=30 #in m/s
speed_limit=10
max_acceleration=3.4 #around 12kmh/s
max_decceleration=6
natural_decceleration=1

car_image = pygame.image.load('car_sprite.png')
car_image = pygame.transform.scale(car_image, (car_width, car_length))
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
# device = torch.device("cpu")

In [9]:


class CarParking(Env):
  def __init__(self):
    # 1=right, 2=left, 3=stay
    # 1=accelerate, 2=deccelerate, 3=same speed and direction
    self.action_space=Box(low=np.array([1,1]),high=np.array([3,3]),dtype=int)
    self.observation_space=Dict({
        'velocity': Box(low=np.array([0]),high=np.array([150]))
        ,'acceleration': Box(low=np.array([0]),high=np.array([150]))
        ,'angle': Box(low=np.array([0]),high=np.array([359])) #angle of car
        ,'pos': Box(low=np.array([0, 0]), high=np.array([area_length-1, area_width-1])) #calculate 4 corners of car using trigonometry
        ,'steering': Discrete(61,start=-30)#angle of steering wheel
        ,'wheels_inside': Discrete(5,start=0)
        ,'distances': Box(low=np.array([0,0,0,0]),high=np.array([area_width]))
        ,'y_dist': Box(low=np.array([0]),high=np.array([area_length]))
        })

    self.screen = None #pygame.display.set_mode((area_length, area_width))
    self.clock = None #pygame.time.Clock()
    self.max_iterations = 1000
    self.set_vars()


  def check_if_inside(self):
    goal = True
    wheels_inside = 0
    reward = 0
    for wheel in ['lb','rb','lf','rf']:
      if self.wheels[wheel][0] < 0 or self.wheels[wheel][0] >= area_length or self.wheels[wheel][1] < 0 or self.wheels[wheel][1] >= area_width:
        return True,-10000
      if self.wheels[wheel][0] < self.goal[0][0] or self.wheels[wheel][0] > self.goal[1][0] or self.wheels[wheel][1] < self.goal[0][1] or self.wheels[wheel][1] > self.goal[2][1]:
        goal = False
      else:
        wheels_inside += 1
        reward += 1
    self.state['wheels_inside'] = wheels_inside
    if goal:
      return True, 5000
    return False, reward

  def close(self):
    if self.screen:

      pygame.quit()
      self.screen=None
      self.clock=None

  def process_new_position(self, reward):
      self.state['velocity'] += self.state['acceleration']
      if abs(self.state['velocity']) > speed_limit:
        reward -= 1
      self.state['velocity'] = max(-max_velocity, min(self.state['velocity'], max_velocity))

      if self.state['steering']:
        turning_radius = car_wheelbase / math.sin(math.radians(self.state['steering']))
        angular_velocity = self.state['velocity'] / turning_radius
      else:
        angular_velocity = 0

      self.state['pos'] += np.array([self.state['velocity'] * math.sin(math.radians(self.state['angle']))*timestep,-self.state['velocity'] * math.cos(math.radians(self.state['angle']))*timestep])
      self.state['angle'] = (self.state['angle'] + math.degrees(angular_velocity) * timestep)%360
      if self.state['angle'] < 0:
        self.state['angle']+= 360
      

  def calculate_four_corners(self):
    rb = self.state['pos'] + np.array([car_hypotenuse*math.cos(math.radians(self.state['angle'])+hypotenuse_angle),
                                                  car_hypotenuse*math.sin(math.radians(self.state['angle'])+hypotenuse_angle)])
    lb = self.state['pos'] + np.array([car_hypotenuse*math.cos(math.radians(self.state['angle'])+math.pi-hypotenuse_angle),
                                                  car_hypotenuse*math.sin(math.radians(self.state['angle'])+math.pi-hypotenuse_angle)])
    rf = self.state['pos'] + np.array([car_hypotenuse*math.cos(math.radians(self.state['angle'])+2*math.pi-hypotenuse_angle),
                                                  car_hypotenuse*math.sin(math.radians(self.state['angle'])+2*math.pi-hypotenuse_angle)])
    lf = self.state['pos'] + np.array([car_hypotenuse*math.cos(math.radians(self.state['angle'])+math.pi+hypotenuse_angle),
                                                  car_hypotenuse*math.sin(math.radians(self.state['angle'])+math.pi+hypotenuse_angle)])

    return rf, lf, rb, lb
  
  def distance_to_parking(self, car_pos, park_pos):
    return np.linalg.norm([car_pos[0]-park_pos[0], car_pos[1]-park_pos[1]])

  def step(self, action):
    if not self.clock:
      self.clock = pygame.time.Clock()
    reward = 0
    accelerate = True
    # timestep = self.clock.get_time() / 1000
    self.current_iterations += 1
    action = np.array([int((action-1)/3)+1,((action-1)%3)+1]) 
    if action[0]==1:
      
      self.state['steering']+=max_steering*timestep
    elif action[0]==2:
      self.state['steering']-=max_steering*timestep
    else:
      pass

    if self.state['steering'] != max(-max_steering, min(self.state['steering'], max_steering)):
      reward -= 1
      self.state['steering'] = max(-max_steering, min(self.state['steering'], max_steering))

    if action[1]==1:
      if self.state['velocity']>=0:
        
        self.state['acceleration'] += random.uniform(max_acceleration-0.05,max_acceleration+0.05) * timestep * 0.005
      else:
        accelerate=False
        if self.state['acceleration'] < 0:
          self.state['acceleration'] = 0
        self.state['acceleration'] += random.uniform(max_decceleration-0.1,max_decceleration+0.1) * timestep * 0.005
        if abs(self.state['velocity']) < abs(self.state['acceleration']):
          self.state['acceleration'] = -self.state['velocity']
    elif action[1]==2:
      if self.state['velocity']<=0:
        self.state['acceleration'] -= random.uniform(max_acceleration-0.05,max_acceleration+0.05) * timestep * 0.005
      else:
        accelerate=False
        if self.state['acceleration'] > 0:
          self.state['acceleration'] = 0
        self.state['acceleration'] -= random.uniform(max_decceleration-0.1,max_decceleration+0.1) * timestep * 0.005
        if abs(self.state['velocity']) < abs(self.state['acceleration']):
          self.state['acceleration'] = -self.state['velocity']
    else: #action[1]==3:
      self.state['acceleration']=0
      # accelerate=False
      # if abs(self.state['velocity']) > timestep * natural_decceleration:
      #     self.state['acceleration']= -math.copysign(natural_decceleration, self.state['acceleration'])
      # else:
      #     self.state['acceleration'] = -self.state['velocity']

    if accelerate:
      if self.state['acceleration'] != max(-max_acceleration, min(self.state['acceleration'], max_acceleration)):
        reward-=1 #unecessary action
      self.state['acceleration'] = max(-max_acceleration, min(self.state['acceleration'], max_acceleration))
    else:
      if self.state['acceleration'] != max(-max_decceleration, min(self.state['acceleration'], max_decceleration)):
        reward -= 1
      self.state['acceleration'] != max(-max_decceleration, min(self.state['acceleration'], max_decceleration))

    self.process_new_position(reward)
    self.wheels['rf'], self.wheels['lf'], self.wheels['rb'], self.wheels['lb'] = self.calculate_four_corners()
    self.state['distances'] = self.calculate_distances()
    self.state['angle_from_parking'] = self.calculate_angle_from_parking()
    # calculate reward
    reward -= 1
    reward += 500/self.distance_to_parking(self.state['pos'], np.array(self.goal_center))

    # check if done
    done, reward_gain = self.check_if_inside()
    if done and reward_gain < -1000:
      reward = reward_gain
    else:
      reward += reward_gain


    
    info = {}
    coords = None
    self.clock.tick(60)
    if self.current_iterations == self.max_iterations:
      done = True

    return self.state, reward, done, info, coords
  
  def render(self):
    if not self.screen:
      pygame.init()
      pygame.display.init()
      self.screen = pygame.display.set_mode((area_length, area_width), pygame.FULLSCREEN | pygame.SCALED)
      
    self.screen.fill((255, 255, 255))
    if self.goal_number < 8:
      goal = pygame.Rect(self.goal[0][0], self.goal[0][1], parking_width, parking_length)
    else:
      goal = pygame.Rect(self.goal[2][0], self.goal[2][1], parking_width, parking_length)
    pygame.draw.rect(self.screen, (0,0,255), goal)
    
    for i in range(7):
      rectangle_top = pygame.Rect(parking_width*(i+1)-1, 0, 3, parking_length)
      rectangle_bottom = pygame.Rect(parking_width*(i+1)-1, parking_length + 2 * road_width-1, 3, parking_length)
      pygame.draw.rect(self.screen, (0,0,0), rectangle_top)
      pygame.draw.rect(self.screen, (0,0,0), rectangle_bottom)

    
    rotated = pygame.transform.rotate(car_image, -self.state['angle'])
    rotated_rect=rotated.get_rect()
    rotated_rect.center = self.state['pos']
    self.screen.blit(rotated, rotated_rect)
    pygame.draw.circle(self.screen,(0,255,0),self.state['pos'],5)
    pygame.draw.circle(self.screen,(0,255,255),self.goal_center,10)
    for part in ['rf','lf','rb','lb']:
      pygame.draw.circle(self.screen, (255,0,0), self.wheels[part], 5)
    pygame.display.update()

  def set_vars(self):
    x_noise = random.uniform(-(road_width-car_width)/2,(road_width-car_width)/2)
    y_noise = random.uniform(0,50)
    self.state = OrderedDict({
        'velocity': 0
        ,'acceleration': 0
        ,'angle': 90
        ,'pos': np.array([(car_length/2),parking_length+(road_width/2)-1])
        ,'steering': 0
        ,'wheels_inside': 0}) 
    self.goal_number = random.randint(0,15)
    self.wheels = {}
    self.wheels['rf'], self.wheels['lf'], self.wheels['rb'], self.wheels['lb'] = self.calculate_four_corners()
    self.current_iterations = 0
    if self.goal_number < 8:
      self.goal = np.array([[(self.goal_number*parking_width),0], #top left
                            [((self.goal_number+1)*parking_width)-1,0], #top right
                            [(self.goal_number*parking_width),parking_length-1], #inside left
                            [((self.goal_number+1)*parking_width)-1,parking_length-1]]) #inside right
    else:
      self.goal = np.array([[((self.goal_number-8)*parking_width),area_width-1], #bottom left
                            [((self.goal_number-7)*parking_width)-1,area_width-1], #bottom right
                            [((self.goal_number-8)*parking_width),parking_length+(2*road_width)], #inside left
                            [((self.goal_number-7)*parking_width)-1,parking_length+(2*road_width)]]) #inside right
    # get distance of all wheels to the two outside parking lines
    self.state['distances'] = self.calculate_distances()
    #get angle
    self.goal_center = np.array(np.mean(self.goal, axis=0))
    self.state['angle_from_parking'] = self.calculate_angle_from_parking()
    self.normalisation_table = {
      'velocity': [max_velocity,max_velocity*2], #plus by min, divide by range
      'acceleration': [max_acceleration, max_acceleration*2],
      'angle': [0, 360],
      'pos_x': [0, area_length],
      'pos_y': [0, area_width],
      'steering': [max_steering, max_steering * 2],
      'wheels_inside': [0, 4],
      'distances': [0, np.linalg.norm([area_length, area_width])],
      'angle_from_parking': [180, 360]
    }

  def calculate_distances(self):
    distances = np.zeros(4)
    for i, wheel in enumerate(['rf','lf','rb','lb']):
      distances[i] = np.linalg.norm(self.wheels[wheel]-np.mean(self.goal[:2],axis=0))
    return np.array(distances)

  def calculate_angle_from_parking(self):
    return math.degrees(np.arctan2(self.goal_center[1] - self.state['pos'][1], abs(self.goal_center[0] - self.state['pos'][0])))


  def reset(self, seed=None, options=None):
    super().reset(seed=seed)
    self.set_vars()
    return self.state
  
  def convert_actions(self, action):
    return action[1] + ((action[0]-1)*3)
  
  def deconstruct_array(self,arr):
    llist = []
    for key in ['velocity','acceleration','angle','pos','steering','wheels_inside','distances','angle_from_parking']:
      if key =='pos':
        new_arr = arr[key].flatten()
        new_arr[0] = (new_arr[0]+self.normalisation_table['pos_x'][0])/self.normalisation_table['pos_x'][1]
        new_arr[1] = (new_arr[1]+self.normalisation_table['pos_y'][0])/self.normalisation_table['pos_y'][1]
        llist.extend(new_arr.tolist())
      elif key == 'distances':
        new_arr =(arr[key].flatten()+self.normalisation_table[key][0])/self.normalisation_table[key][1]
        llist.extend(new_arr.tolist())
      else:
        llist.append((arr[key]+self.normalisation_table[key][0])/self.normalisation_table[key][1])
        
    return np.array(llist)
  
env = CarParking()

In [12]:
#dqn and agent

# Q-network for approximating action-value function
class QNetwork(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size, learning_rate):
        super().__init__()
        # create network layers
        layers = nn.ModuleList()
        
        # input layer
        layers.append(nn.Linear(input_size, hidden_sizes[0]))
        layers.append(nn.ReLU())
        
        # hidden layers
        for i in range(len(hidden_sizes)-1):
            layers.append(nn.Linear(hidden_sizes[i], hidden_sizes[i+1]))
            layers.append(nn.ReLU())
        
        # output layer
        layers.append(nn.Linear(hidden_sizes[-1], output_size))
    
        # combine layers into feed-forward network
        self.net = nn.Sequential(*layers)
        
        # select loss function and optimizer
        # note: original paper uses modified MSE loss and RMSprop
        self.criterion = nn.MSELoss()
        self.optimizer = torch.optim.AdamW(self.net.parameters(), lr=learning_rate)
    
    def forward(self, x):
        # return output of Q-network for the input x
        return self.net(x)
    
    def update(self, inputs, targets):
        # update network weights for a minibatch of inputs and targets:
        self.optimizer.zero_grad()
        outputs = self.net(inputs)
        loss = self.criterion(outputs, targets)
        loss.backward()
        self.optimizer.step()
    
    def copy_from(self, qnetwork):
        # copy weights from another Q-network
        self.net.load_state_dict(qnetwork.net.state_dict())
    

# Deep Q-network (DQN)
class AgentDQN():
    def __init__(self, env, gamma,
                 hidden_sizes=(32, 32),
                 learning_rate=0.001,
                 epsilon=0.1,
                 replay_size=10000,
                 minibatch_size=32,
                 target_update=20):

        self.state_dims = env.observation_space.shape[0]

        # check if the action space has correct type

        self.num_actions = env.action_space.n

        # create Q-networks for action-value function
        self.qnet = QNetwork(self.state_dims, hidden_sizes, self.num_actions, learning_rate)
        self.target_qnet = QNetwork(self.state_dims, hidden_sizes, self.num_actions, learning_rate)

        # copy weights from Q-network to target Q-network
        self.target_qnet.copy_from(self.qnet)

        # initialise replay buffer
        self.replay_buffer = []
        self.replay_size = replay_size
        

        self.env = env
        self.gamma = gamma
        self.epsilon = epsilon
        self.minibatch_size = minibatch_size
        self.target_update = target_update
        self.target_update_idx = 0

    def behaviour(self, state):
        # exploratory behaviour policy
        if rng.uniform() >= self.epsilon:
            # convert state to torch format
            if not torch.is_tensor(state):
                state = torch.tensor(state, dtype=torch.float)

            # exploitation with probability 1-epsilon; break ties randomly
            q = self.qnet(state).detach()
            j = rng.permutation(self.num_actions)
            return j[q[j].argmax().item()]
        else:
            # exploration with probability epsilon
            return self.env.action_space.sample()

    def policy(self, state):
        # convert state to torch format
        if not torch.is_tensor(state):
            state = torch.tensor(state, dtype=torch.float)

        # greedy policy
        q = self.qnet(state).detach()
        return q.argmax().item()

    def update(self):
        # update Q-network if there is enough experience
        if len(self.replay_buffer) >= self.minibatch_size:
            # select mini-batch of experiences uniformly at random without replacement
            batch = rng.choice(len(self.replay_buffer), size=self.minibatch_size, replace=False)

            # calculate inputs and targets for the transitions in the mini-batch
            inputs = torch.zeros((self.minibatch_size, self.state_dims))
            targets = torch.zeros((self.minibatch_size, self.num_actions))

            for n, index in enumerate(batch):
                state, action, reward, next_state, terminated = self.replay_buffer[index]
                # inputs are states
                inputs[n, :] = state

                # targets are TD targets
                targets[n, :] = self.target_qnet(state).detach()

                if terminated:
                    targets[n, action] = reward
                else:
                    targets[n, action] = reward + self.gamma*self.target_qnet(next_state).detach().max()

            # train Q-network on the mini-batch
            self.qnet.update(inputs, targets)

        # periodically copy weights from Q-network to target Q-network
        self.target_update_idx += 1
        if self.target_update_idx % self.target_update == 0:
            self.target_qnet.copy_from(self.qnet)

    def train(self, max_episodes, stop_criterion, criterion_episodes):
        # train the agent for a number of episodes
        rewards = []
        num_steps = 0
        for episode in range(max_episodes):
            state, _ = env.reset()
            # convert state to torch format
            state = torch.tensor(state, dtype=torch.float)
            terminated = False
            truncated = False
            rewards.append(0)
            while not (terminated or truncated):
                # select action by following behaviour policy
                action = self.behaviour(state)

                # send the action to the environment
                next_state, reward, terminated, truncated, _ = env.step(action)

                # convert next state to torch format and add experience to replay buffer
                next_state = torch.tensor(next_state, dtype=torch.float)
                
                self.replay_buffer.append((state, action, reward, next_state, terminated))
                if len(self.replay_buffer)>self.replay_size:
                    self.replay_buffer.pop(0)

                # update Q-network
                self.update()

                state = next_state
                rewards[-1] += reward
                num_steps += 1

            print(f'\rEpisode {episode+1} done: steps = {num_steps}, rewards = {rewards[episode]}     ', end='')

            if episode >= criterion_episodes-1 and stop_criterion(rewards[-criterion_episodes:]):
                print(f'\nStopping criterion satisfied after {episode} episodes')
                break

        # plot rewards received during training
        plt.figure(dpi=100)
        plt.plot(range(1, len(rewards)+1), rewards, label=f'Rewards')

        plt.xlabel('Episodes')
        plt.ylabel('Rewards per episode')
        plt.legend(loc='lower right')
        plt.grid()
        plt.show()

    def save(self, path):
        # save network weights to a file
        torch.save(self.qnet.state_dict(), path)

    def load(self, path):
        # load network weights from a file
        self.qnet.load_state_dict(torch.load(path))
        self.target_qnet.copy_from(self.qnet)
 



In [13]:
env = CarParking()
state = env.reset()
env.deconstruct_array(state)


array([0.5       , 0.5       , 0.25      , 0.10416667, 0.39933333,
       0.5       , 0.        , 0.63753781, 0.61333807, 0.78867355,
       0.7692432 , 0.46339552])

In [14]:
gamma = 0.99
hidden_sizes = (128, 128)
learning_rate = 0.001
epsilon = 0.2
replay_size = 10000
minibatch_size = 64
target_update = 20
max_episodes = 100
max_steps = 1000
criterion_episodes = 5

agent = AgentDQN(env,
                 gamma=gamma,
                 hidden_sizes=hidden_sizes,
                 learning_rate=learning_rate,
                 epsilon=epsilon,
                 replay_size=replay_size,
                 minibatch_size=minibatch_size,
                 target_update=target_update)

AttributeError: 'CarParking' object has no attribute 'observation_space'

In [143]:
env = CarParking()
state = env.reset()

n_observations = len(env.deconstruct_array(state))
agent = Agent(gamma=0.99, epsilon=1.0, batch_size=64, n_actions=9, eps_end = 0.01, input_dims=[n_observations], lr=0.03)
scores, eps_history = [], []
n_games=10000

for i in range(n_games):
    score = 0
    turn = 0
    observation = env.reset()
    observation_deconstructed = env.deconstruct_array(observation)
    done = False
    while not done:
        action = agent.choose_action(observation_deconstructed)
        observation_,reward, done, info, coords = env.step(action)
        turn += 1
        if turn == max_loop:
            done = True
        if reward <= -1000:

            score = reward
        else:
            score += reward
        agent.store_transition(observation_deconstructed, action, reward, env.deconstruct_array(observation_), done)
        agent.learn()
        observation = observation_
    scores.append(score)
    eps_history.append(agent.epsilon)
    avg_score = np.mean(scores)
    print(f'latest score: {score}: avg score: {avg_score}')




latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -10000.0
latest score: -10000: avg score: -

KeyboardInterrupt: 

In [43]:
env = CarParking()
state = env.reset()
score = 0
start = time.time()
for i in range(10000):
    action = env.action_space.sample()
    action = np.array([3,1])
    if i <1000:
        action = np.array([3,1])
    # if i > 20 and i < 60:
    #     action = np.array([3,3])
        
    
    
    state, reward, done, info, coords = env.step(env.convert_actions(action))
    # print(action)
    env.render()
    score += reward
    if done:
        env.close()
        print('CAR CRASH')
        break
    # print(state, score, done, info, coords)
    # for wheel in ['rf','lf','rb','lb']:
    #   coord = coords[wheel]
    #   plt.scatter(coord[0], area_length-coord[1], label=wheel)
    # plt.scatter(env.state['pos'][0],area_length-env.state['pos'][1])
    # # Add labels and a legend
    # plt.xlabel('X-axis')
    # plt.ylabel('Y-axis')
    # plt.xlim(0,2500)
    # plt.ylim(0,2500)
    # plt.title('Car Position')
end = time.time()
print(end-start)



CAR CRASH
2.3230268955230713


In [None]:
env.observation_space.low

AttributeError: 'Dict' object has no attribute 'low'

In [61]:
# np.linalg.norm(env.state['pos'][0]-env.goal_center[0], env.state['pos'][1]-env.goal_center[1])
env.state['pos'][1]-env.goal_center[1]


374.5