In [None]:
# Dependencies
!pip install tensorflow
!pip install gym
!pip install keras
!pip install keras-rl2

In [None]:
import pygame
import cv2
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import sys, os
from math import *
sys.path.append("../config/")
import config
from config import Car

In [None]:
# Parameters

# Specify which map
MAP_PATH = os.path.join('../Assets', 'map3.png')


# Init Car object.
# The instance deliberately reuses the name `Car`: the environment class below
# reads `Car.x`, `Car.vel`, ... from it. The guard keeps this cell re-runnable;
# once `Car` is bound to an instance, calling `Car()` again would raise TypeError.
if isinstance(Car, type):
    Car = Car()
# Initialize the window
WIN = pygame.display.set_mode((config.WIDTH, config.HEIGHT))
pygame.display.set_caption('SDC-RL')

# Initialize fonts for text
pygame.font.init()
REWARD_FONT = pygame.font.SysFont('comicsans', 30)
INPUT_FONT  = pygame.font.SysFont('comicsans', 15)

# Load in map of our choosing
# (convert_alpha needs the display created above, so this must come after set_mode)
MAP_IMAGE = pygame.image.load(MAP_PATH).convert_alpha()

# Initialize the car image
# NB: convert_alpha converts the surface to the display's pixel format, which speeds up blitting
CAR_IMAGE = pygame.image.load(os.path.join('../Assets', 'car.png')).convert_alpha()
# Car starts facing positive x-axis
CAR_IMAGE = pygame.transform.rotate(pygame.transform.scale(CAR_IMAGE, (Car.width, Car.height)), -90)

# Define all the event IDs
# Posted by the collision checks and consumed in SDCEnv.step
COLLISION   = pygame.USEREVENT+1
REWARD      = pygame.USEREVENT+2

In [None]:
'''
Environment for Q learning

Action Space:       0 = left, 1 = right, 2 = forward, 3 = backward, 4 = nothing
Observation Space:  [Car.x, Car.y, Car.ang, Car.vel, laserscan]
                    First array argument represents all lowerbound for each observation
                    Second is all upperbound for each observation
State:              [Car.x, Car.y, Car.ang, Car.vel, laserscan]
                    TODO: Ask if this is supposed to be same as observation space

'''
class SDCEnv(Env):
    def __init__(self):
        """Set up the gym spaces, the initial state and the static map geometry.

        Observations are laid out as [Car.x, Car.y, Car.ang, Car.vel, *laserscan].
        Constructing the environment also runs wall and waypoint detection on
        the map image (create_walls / create_waypoints).
        """
        # 0 = left, 1 = right, 2 = accelerate, 3 = decelerate, 4 = nothing
        self.action_space = Discrete(5)

        # A beam that hits nothing is reported as -1 (see ls_def below and
        # get_laserscan), so -1 has to be the lower bound; with a bound of 0
        # the observations we emit would fall outside the declared space.
        ls_low  = np.full(Car.num_laserscan, -1)                   # Laserscan low
        ls_high = np.full(Car.num_laserscan, Car.laserscan_dist)   # Laserscan high
        plow    = np.array([0, 0, -np.inf, -config.VEL_MAX])
        phigh   = np.array([config.WIDTH, config.HEIGHT, np.inf, config.VEL_MAX])

        # The bounds keep a leading axis of length 1, i.e. shape
        # (1, 4 + num_laserscan); the model cell builds its input from
        # observation_space.shape, so the shape is left unchanged.
        low  = np.array([np.append(plow, ls_low)], dtype=np.float32)
        high = np.array([np.append(phigh, ls_high)], dtype=np.float32)
        self.observation_space = Box(low=low, high=high)

        self.ls_def     = np.full(Car.num_laserscan, -1)  # Laserscan default (nothing in range)
        self.state      = [Car.x, Car.y, Car.ang, Car.vel]
        self.state.extend(list(self.ls_def))  # extend mutates in place and returns None
        self.game_car   = pygame.Rect(0, 0, Car.width, Car.height)
        self.WALLS      = self.create_walls()
        self.WAYPOINTS  = self.create_waypoints()
        self.laserscan  = []
        self.impactxy   = []
        # information needed for visualization
        self._action    = -1   # Store last action so that we can visualize inputs
        self._reward    = 0.   # Store total reward to display on screen

    '''
    Step function:      Executes action and calculates reward

    Params:
    -------
    action:             Action from action space

    Returns:
    --------
    state:              New state after action: [Car.x, Car.y, Car.ang, Car.vel, laserscan]
    reward:             Reward for taking that action
    done:               Whether the episode is over, if our car crashes
    info:               Debug information
    '''
    def step(self, action):
        """Apply one action, advance the car and score the result.

        Params:
        -------
        action:     Action from the action space (0..4, see handle_movement)

        Returns:
        --------
        state:      [Car.x, Car.y, Car.ang, Car.vel, *laserscan] after the move
        reward:     -.01 for nothing, 10 for reaching a waypoint, -50 for a collision
        done:       True when the car hit a wall
        info:       Empty dict, placeholder for debug information
        """
        self._action = action # Store for visualization
        # Apply the action
        self.handle_movement(action)
        self.check_velocity()
        Car.x  += Car.vel*cos(Car.ang)
        Car.y  += Car.vel*sin(Car.ang)
        self.game_car.centerx = Car.x
        self.game_car.centery = Car.y
        self.detect_wall_collision(self.WALLS)
        self.detect_waypoint_collision(self.game_car, self.WAYPOINTS)
        self.laserscan, self.impactxy = self.get_laserscan(self.WALLS)

        # The detectors report through the pygame event queue. Drain it fully
        # and only remember what happened: the wall check runs first, so a
        # COLLISION event is queued before a REWARD event, and letting the
        # last event decide would score a crash on a waypoint as +10.
        crashed  = False
        rewarded = False
        for event in pygame.event.get():
            if event.type == COLLISION:
                crashed = True
            elif event.type == REWARD:
                rewarded = True

        # A collision always wins over a waypoint reached in the same step
        done = crashed
        if crashed:
            reward = -50
        elif rewarded:
            reward = 10
        else:
            reward = -.01

        self._reward += reward  # For visualization
        # Set the new state
        self.state = [Car.x, Car.y, Car.ang, Car.vel]
        self.state.extend(self.laserscan)

        # Placeholder for information
        info = {}

        return self.state, reward, done, info

    def render(self, mode='human'):
        """Draw one frame: map, reward text, input indicators, hitbox, lasers and car.

        `mode` is accepted for gym compatibility and is not used.
        """
        # Rotate the sprite to the current heading (Car.ang is in radians)
        heading_deg = Car.ang*(180./pi)
        sprite = pygame.transform.rotate(CAR_IMAGE, -heading_deg)
        # The hitbox becomes the bounding rect of the rotated sprite, same centre
        self.game_car = sprite.get_rect(center=self.game_car.center)

        # Background first, everything else is layered on top of it
        WIN.blit(MAP_IMAGE, (0,0))

        # Accumulated reward, right-aligned with a 10px margin
        label = "Reward: "+str(round(self._reward,3))
        reward_text = REWARD_FONT.render(label, 1, config.GREEN)
        text_x = config.WIDTH - reward_text.get_width()-10
        WIN.blit(reward_text, (text_x, 10))

        # Key indicators for the last action taken
        self.draw_indicators(self._action)

        # Hitbox, then the laser beams, then the car itself on top
        pygame.draw.rect(WIN, config.SOFT_RED, self.game_car)
        self.draw_laserscan(self.laserscan, self.impactxy)
        sprite_rect = sprite.get_rect(center=(Car.x, Car.y))
        WIN.blit(sprite, sprite_rect)

        pygame.display.update()
    def reset(self):
        """Start a new episode and return the initial state.

        Returns:
        --------
        [Car.x, Car.y, Car.ang, Car.vel, *laserscan] with every beam at the
        default -1 (nothing in range).
        """
        self._reward = 0
        self._action = -1   # no input to highlight until the first step
        Car.reset()
        # Keep the hitbox on the car, the same way step() does. The previous
        # code set both x and y of the rect from config.STARTX.
        # NOTE(review): assumes Car.reset() restores Car.x / Car.y to the start pose - confirm.
        self.game_car.centerx = Car.x
        self.game_car.centery = Car.y
        # Forget the previous episode's scan so render() does not draw stale beams
        self.laserscan = []
        self.impactxy  = []
        self.state = [Car.x, Car.y, Car.ang, Car.vel]
        self.state.extend(list(self.ls_def))
        return self.state

    '''
    Function to draw the laserscan in draw_window
    '''
    def draw_laserscan(self, laserscan, impactxy):
        """Draw every laser beam and a small circle at each impact point.

        Params:
        -------
        laserscan:  Beam lengths, -1 meaning the beam hit nothing
        impactxy:   (x, y) points where beams hit a wall
        """
        # Nothing has been scanned yet, so there is nothing to draw
        if not laserscan:
            return

        beam_count = Car.num_laserscan
        heading = Car.ang
        origin = (Car.x, Car.y)
        for idx in range(beam_count):
            reading = laserscan[idx]
            # A reading of -1 is drawn at the full laser length
            length = Car.laserscan_dist if reading == -1 else reading
            # Beam tip from the car position using trig (similar to forward kinematics)
            tip = (Car.x+(length*cos(heading)), Car.y+(length*sin(heading)))
            pygame.draw.line(WIN, config.RED, origin, tip, 1)
            heading += ((2*pi) / beam_count)

        # draw small circle at impact point of laser and obstacle
        for x,y in impactxy:
            pygame.draw.circle(WIN, config.RED, (x,y), 3, 3)

    '''
    Function to draw indicators
    '''
    def draw_indicators(self, action):
        """Draw the A/D/W/S key indicators and highlight the one matching `action`.

        Params:
        -------
        action:     Last action taken; 0..3 highlight a key, anything else highlights none
        """
        space = 35
        mid_x = config.WIDTH/2
        mid_y = config.HEIGHT/2

        # (action id, key label, centre of the indicator)
        indicators = (
            (0, "A", (mid_x - space, mid_y)),   # LEFT
            (1, "D", (mid_x + space, mid_y)),   # RIGHT
            (2, "W", (mid_x, mid_y-space)),     # UP
            (3, "S", (mid_x, mid_y)),           # DOWN
        )

        # size
        pressed_radius = 14
        idle_radius    = 15

        pressed_color = config.TAN
        idle_color    = config.ORANGE

        for action_id, key, centre in indicators:
            glyph = INPUT_FONT.render(key, 1, config.BLACK)
            glyph_rect = glyph.get_rect(center=centre)  # center text
            if action == action_id:
                # Pressed key: filled disc
                pygame.draw.circle(WIN, pressed_color, list(centre), pressed_radius, 0)
            else:
                # Idle key: outlined ring
                pygame.draw.circle(WIN, idle_color, list(centre), idle_radius, 3)
            WIN.blit(glyph, glyph_rect)

    '''
    Function to find all the walls from the background image and
    store them in the WALLS list

    Using openCV line detection for black lines

    Params:
    -------
    None

    Returns:
    --------
    Coordinates of the begin/end points of each line segment that makes up each barrier
    '''
    def create_walls(self):
        """Detect the black wall lines in the map image with OpenCV.

        Returns:
        --------
        Array of detected line segments as returned by cv2.HoughLinesP (each
        entry holds one [x1, y1, x2, y2]); an empty (0, 1, 4) array when no
        line is found.

        Raises:
        -------
        FileNotFoundError if the map image cannot be read.
        """
        print('Generating Walls...')
        # Preprocessing
        img = cv2.imread(MAP_PATH, cv2.IMREAD_COLOR)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising
            raise FileNotFoundError('Could not read map image: ' + str(MAP_PATH))
        pure_black = np.array([0, 0, 0])
        black_mask = cv2.inRange(img, pure_black, pure_black) # Isolate all black pixels
        result = 255 - black_mask

        low_threshold = 50
        high_threshold = 150
        edges = cv2.Canny(result, low_threshold, high_threshold)
        dilated = cv2.dilate(edges, np.ones((3,3), dtype=np.uint8))

        rho = 1  # distance resolution in pixels of the Hough grid
        theta = np.pi / 180  # angular resolution in radians of the Hough grid
        threshold = 25  # minimum number of votes (intersections in Hough grid cell)
        min_line_length = 50  # minimum number of pixels making up a line
        max_line_gap = 20  # maximum gap in pixels between connectable line segments

        # Run Hough on edge detected image
        # Output "lines" is an array containing endpoints of detected line segments
        lines = cv2.HoughLinesP(dilated, rho, theta, threshold, np.array([]),
                            min_line_length, max_line_gap)
        if lines is None:
            # HoughLinesP returns None when nothing is detected; hand back an
            # empty array so callers can still iterate over the walls
            lines = np.empty((0, 1, 4), dtype=np.int32)

        # Draw the detected segments on a blank canvas for the preview window
        line_image = np.zeros_like(img)
        for line in lines:
            for x1,y1,x2,y2 in line:
                cv2.line(line_image, (int(x1), int(y1)), (int(x2), int(y2)), (255,0,0), 1)
        cv2.imshow('Detected Walls/Obstacles', line_image)
        cv2.waitKey(1)
        print(str(len(lines))+' lines detected')
        return lines

    '''
    Function to create reward waypoints

    Need to use Blue RGB (0,0, 255) circles in drawing to signify a waypoint


    Returns:
    --------
    Pygame rectangles that approximate the waypoint
    '''
    def create_waypoints(self):
        """Detect the pure-blue waypoint circles in the map image.

        Returns:
        --------
        List of pygame Rects, one per detected circle, each being the bounding
        square of that circle. Empty when no circle is found.

        Raises:
        -------
        FileNotFoundError if the map image cannot be read.
        """
        print('Generating Waypoints...')
        # Preprocessing
        img = cv2.imread(MAP_PATH, cv2.IMREAD_COLOR)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising
            raise FileNotFoundError('Could not read map image: ' + str(MAP_PATH))
        # OpenCV loads images as BGR, so pure blue is (255, 0, 0)
        pure_blue = np.array([255, 0, 0])
        blue_mask = cv2.inRange(img, pure_blue, pure_blue) # Isolate all blue pixels
        result = cv2.bitwise_and(img, img, mask=blue_mask)

        gray = cv2.cvtColor(result, cv2.COLOR_BGR2GRAY)
        gray = cv2.medianBlur(gray, 1)
        rows = gray.shape[0]
        circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, 1, rows/8,
                                    param1=100, param2=20, minRadius=0, maxRadius=200)

        WAYPOINTS = []
        if circles is not None:
            # Round to whole pixels as signed ints so the subtraction below cannot wrap around
            detected = np.rint(circles[0]).astype(int)
            print(str(len(detected))+' waypoints detected')
            for cx, cy, radius in detected:
                cx, cy, radius = int(cx), int(cy), int(radius)
                center = (cx, cy)
                # circle center
                cv2.circle(img, center, 1, (0, 100, 100), 3)
                # circle outline
                cv2.circle(img, center, radius, (255, 0, 255), 3)
                # Bounding square of the circle. Rect(center, (radius, radius))
                # would put the top-left corner on the circle centre and cover
                # only its lower-right quadrant.
                WAYPOINTS.append(pygame.Rect(cx - radius, cy - radius, 2*radius, 2*radius))
        else:
            print('0 waypoints detected')

        cv2.imshow('Detected Waypoints (in purple)', img)
        cv2.waitKey(1)
        return WAYPOINTS

    def handle_movement(self,action):
        """Translate an action id into a change of the car's heading or velocity.

        Params:
        -------
        action:     0 = left, 1 = right, 2 = up (accelerate), 3 = down (decelerate),
                    4 = nothing
        """
        if action == 0:         # LEFT
            Car.ang -= Car.w
        elif action == 1:       # RIGHT
            Car.ang += Car.w
        elif action == 2:       # UP
            Car.vel += Car.acc
        elif action == 3:       # DOWN
            Car.vel -= Car.acc
        # action == 4 (Nothing) and any other value leave the car untouched

    '''
    Function that checks velocity for max velocity and adds friction damping term

    Params:
    -------
    None

    Returns:
    --------
    Nothing
    '''
    def check_velocity(self):
        """Clamp the car's velocity to +/- config.VEL_MAX, then apply friction.

        Friction moves the velocity towards zero by config.FRICTION and never
        past it: a velocity that would change sign is set to 0 instead.
        """
        limit = config.VEL_MAX
        # Check if velocity exceeds max velocity
        if Car.vel > limit:
            Car.vel = limit
        elif Car.vel < -limit:
            Car.vel = -limit

        # Apply friction damping term
        drag = config.FRICTION
        if Car.vel > 0:
            Car.vel = 0 if Car.vel - drag < 0 else Car.vel - drag
        elif Car.vel < 0:
            Car.vel = 0 if Car.vel + drag > 0 else Car.vel + drag

    
    '''
    Function to detect collision with walls based on line intersection
    we break our car down into four line segments and check against walls

    Params:
    -------
    WALLS:  List of all our line segment walls

    Return:
    -------
    Nothing
    '''
    def detect_wall_collision(self, WALLS):
        """Post a COLLISION event if any edge of the car crosses a wall segment.

        The car is treated as a rectangle centred on (Car.x, Car.y) and rotated
        by Car.ang. Car.height is its extent along the heading and Car.width
        its extent across it (the sprite is scaled to (width, height) and then
        rotated to face the positive x-axis).

        Params:
        -------
        WALLS:  List of all our line segment walls

        Return:
        -------
        Nothing; at most one COLLISION event is posted per call.
        """
        cos_a = cos(Car.ang)
        sin_a = sin(Car.ang)
        # Half-extent vectors: forward along the heading, sideways perpendicular
        # to it. Using only cos for x and only sin for y (as before) does not
        # rotate the rectangle and collapses it to a line at heading 0.
        fwd_x, fwd_y   = cos_a*Car.height/2, sin_a*Car.height/2
        side_x, side_y = -sin_a*Car.width/2, cos_a*Car.width/2

        fl = (Car.x+fwd_x-side_x, Car.y+fwd_y-side_y) # front left point of car
        fr = (Car.x+fwd_x+side_x, Car.y+fwd_y+side_y) # front right point of car
        bl = (Car.x-fwd_x-side_x, Car.y-fwd_y-side_y) # back left point of car
        br = (Car.x-fwd_x+side_x, Car.y-fwd_y+side_y) # back right point of car

        # front, back, left side, right side
        car_seg = [(fl, fr), (bl, br), (fl, bl), (fr, br)]

        for wall in WALLS:
            for x3,y3,x4,y4 in wall:
                for (x1, y1), (x2, y2) in car_seg:
                    denom  = (x1-x2)*(y3-y4) - (y1-y2)*(x3-x4)
                    # if denom 0, lines parallel so never intersect
                    if denom == 0:
                        continue
                    # t: position along the car edge, u: position along the wall
                    t = ((x1-x3)*(y3-y4) - (y1-y3)*(x3-x4))/denom
                    u = ((x1-x3)*(y1-y2) - (y1-y3)*(x1-x2))/denom
                    # Both in [0, 1] means the two segments actually cross
                    if 0 <= t <= 1 and 0 <= u <= 1:
                        pygame.event.post(pygame.event.Event(COLLISION))
                        return # One hit is enough, stop checking
    
    '''
    Function to detect whether we have reached a waypoint
    We can hit a waypoint once. It will refresh after we hit every other waypoint
    TODO: This is messy but whatever
    
    Params:
    -------
    game_car:   pygame Rect for collision detection
    WAYPOINTS:  Rect objects representing our waypoints

    Return:
    -------
    Nothing
    '''
    def detect_waypoint_collision(self, game_car, WAYPOINTS):
        """Post a REWARD event for every fresh waypoint the car is touching.

        A waypoint pays out once; it is remembered in Car.expired_waypoints
        until every waypoint has been collected, then all of them refresh.

        Params:
        -------
        game_car:   pygame Rect for collision detection
        WAYPOINTS:  Rect objects representing our waypoints

        Return:
        -------
        Nothing
        """
        for waypoint in WAYPOINTS:
            if not game_car.colliderect(waypoint):
                continue
            # Skip only this waypoint if it was already collected. Breaking out
            # here would ignore every waypoint later in the list.
            if waypoint in Car.expired_waypoints:
                continue
            pygame.event.post(pygame.event.Event(REWARD))
            Car.expired_waypoints.append(waypoint)
        # reset if we have hit every waypoint
        if len(Car.expired_waypoints) == len(WAYPOINTS):
            Car.expired_waypoints = []

    '''
    Function to simulate laserscan
    first laser will point straight ahead of the car, then
    increments by 2pi/num_laserscan 
    -1 is out of range

    Params:
    -------
    WALLS:  List of all our line segment walls

    Return:
    -------
    List of laserscan measurements
    (x,y) position of the impact of laser to obstacle
    '''
    def get_laserscan(self, WALLS):
        """Simulate the laser scanner by intersecting each beam with the walls.

        The first beam points along Car.ang, each following one is rotated by
        2pi/num_laserscan. Every beam reports the distance to the NEAREST wall
        it crosses within Car.laserscan_dist, or -1 if it crosses none.

        Params:
        -------
        WALLS:  List of all our line segment walls

        Return:
        -------
        laserscan:  One measurement per beam (distance, or -1 when out of range)
        impactxy:   (x, y) of the impact point for every beam that hit a wall
        """
        num     = Car.num_laserscan
        angle   = Car.ang
        # Beam origin for the line intersection formula
        x1 = Car.x
        y1 = Car.y
        laserscan   = []
        impactxy    = []
        for i in range(0, num):
            # Beam end point at full laser range
            x2 = Car.x+(Car.laserscan_dist*cos(angle))
            y2 = Car.y+(Car.laserscan_dist*sin(angle))
            angle += ((2*pi) / num)

            # t is the fraction of the beam travelled before the crossing, so
            # the smallest valid t is the closest wall. Taking the first wall
            # that intersects (in list order) would let the beam see through
            # nearer walls.
            nearest_t = None
            for wall in WALLS:
                for x3,y3,x4,y4 in wall:
                    denom  = (x1-x2)*(y3-y4) - (y1-y2)*(x3-x4)
                    if denom == 0: # if denom 0, lines parallel so never intersect
                        continue
                    t = ((x1-x3)*(y3-y4) - (y1-y3)*(x3-x4))/denom
                    u = ((x1-x3)*(y1-y2) - (y1-y3)*(x1-x2))/denom
                    # Test to see if intersection exists on both segments
                    if 0 <= t <= 1 and 0 <= u <= 1:
                        if nearest_t is None or t < nearest_t:
                            nearest_t = t

            if nearest_t is None:
                laserscan.append(-1)
                continue

            Px = x1+(nearest_t*(x2-x1))
            Py = y1+(nearest_t*(y2-y1))
            impactxy.append((Px, Py))
            # Calculate distance between laserscan origin and intersection
            laserscan.append(sqrt((Px-Car.x)**2 + (Py-Car.y)**2))

        return laserscan, impactxy

In [None]:
# Instantiate the environment; the constructor also runs wall and waypoint
# detection on the map image.
env = SDCEnv()

In [None]:
# Draw one random observation from the observation space to inspect its shape and bounds.
env.observation_space.sample()

In [None]:
#episodes = 10
#for episode in range(1, episodes+1):
    #state = env.reset()
    #done = False
    #score = 0 
    
    #while not done:
        #env.render()
        #action = env.action_space.sample()
        #n_state, reward, done, info = env.step(action)
        #score+=reward
    #print('Episode:{} Score:{}'.format(episode, score))

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import Adam

In [None]:
# Input shape for the network: the full shape tuple of the observation space
states  = env.observation_space.shape
#states  = env.observation_space.shape[0] # Use this if using flatten layer
# Number of discrete actions, i.e. the size of the network's output layer
actions = env.action_space.n
actions

In [None]:
def build_model(states, actions):
    """Build the Q-network: two 24-unit ReLU layers, a flatten, and a linear output.

    Params:
    -------
    states:     Input shape handed to the first Dense layer
    actions:    Number of output units (one Q-value per action)

    Returns:
    --------
    An uncompiled keras Sequential model
    """
    return Sequential([
        Dense(24, activation='relu', input_shape=(states)),
        Dense(24, activation='relu'),
        Flatten(),  # collapse the remaining axes before the output layer
        Dense(actions, activation='linear'),
    ])

In [None]:
# Drop a model left over from an earlier run of the cells below. A plain
# `del model` raises NameError on a fresh kernel, which breaks Restart & Run All.
globals().pop('model', None)

In [None]:
# Build the Q-network for this environment and print its layer summary
model = build_model(states, actions)
model.summary()

In [None]:
from rl.agents import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

In [None]:
def build_agent(model, actions, memory_limit=5000, window_length=1,
                nb_steps_warmup=100, target_model_update=1e-2):
    """Wrap a Q-network in a keras-rl DQN agent with a Boltzmann policy.

    The defaults reproduce the previously hard-coded settings, so existing
    calls `build_agent(model, actions)` behave as before.

    Params:
    -------
    model:                  Keras model producing one Q-value per action
    actions:                Number of discrete actions
    memory_limit:           Capacity of the replay memory
    window_length:          Number of observations stacked per sample
    nb_steps_warmup:        Value passed to DQNAgent as nb_steps_warmup
    target_model_update:    Value passed to DQNAgent as target_model_update

    Returns:
    --------
    An uncompiled DQNAgent
    """
    policy = BoltzmannQPolicy()
    memory = SequentialMemory(limit=memory_limit, window_length=window_length)
    dqn = DQNAgent(model=model, memory=memory, policy=policy,
                  nb_actions=actions, nb_steps_warmup=nb_steps_warmup,
                  target_model_update=target_model_update)
    return dqn

In [None]:
nb_steps            = 50000                     # Training Steps
action_repetition   = 1                         # Set >1 to go this many steps before observing env again
nb_max_start_steps  = 40                        # Upper bound on the start steps taken with start_step_policy
start_step_policy   = lambda observation: 2     # Action forced during the start steps (2 = accelerate)
nb_max_episode_steps= None                      # If not None, will reset env after this many steps

dqn = build_agent(model, actions)
dqn.compile(Adam(learning_rate=1e-3), metrics=['mae'])
# Pass the policy defined above; with start_step_policy=None the agent takes
# random actions during the start steps and the lambda is never used.
dqn.fit(env, nb_steps=nb_steps, visualize=True, verbose=1,
        action_repetition=action_repetition, nb_max_start_steps=nb_max_start_steps,
        start_step_policy=start_step_policy, nb_max_episode_steps=nb_max_episode_steps)

In [None]:
# Evaluate the trained agent without rendering and report the mean episode reward
episodes = 100
scores = dqn.test(env, nb_episodes=episodes, visualize=False)
print(np.mean(scores.history['episode_reward']))

In [None]:
# See the result of the trained model: run 15 episodes with rendering enabled.
# The returned history is not needed here, hence the throwaway name.
_ = dqn.test(env, nb_episodes=15, visualize=True)

In [None]:
# Shut down the pygame display and pygame itself, then close the OpenCV preview windows
pygame.display.quit()
pygame.quit()
cv2.destroyAllWindows()

# Save the weights

In [None]:
# Make sure the output folder exists before writing; the file name carries the
# number of training steps so runs of different lengths do not overwrite each other.
os.makedirs('models', exist_ok=True)
dqn.save_weights('models/dqn_weights_'+str(nb_steps)+'.h5f', overwrite=True)

# How to reload a trained agent

In [None]:
# Drop the trained objects so the cells below rebuild everything from scratch
del model
del dqn
del env

In [None]:
# Rebuild the environment, network and agent the same way as for training
env = SDCEnv()
actions = env.action_space.n
states = env.observation_space.shape
model = build_model(states, actions)
dqn = build_agent(model, actions)
# The agent is compiled here, before the weights are loaded in the next cell
dqn.compile(Adam(learning_rate=1e-3), metrics=['mae'])

In [None]:
# Weight file as written by the save cell when nb_steps is 50000
filename = 'models/dqn_weights_50000.h5f'
dqn.load_weights(filename)

In [None]:
# Run the reloaded agent for 5 rendered episodes; the returned history is discarded
_ = dqn.test(env, nb_episodes=5, visualize=True)