# Install and Import Dependencies

In [None]:
# pygame used for rendering our maze environment & agent's actions
!pip install pygame

In [None]:
# stable-baselines is where we pull the Deep Q-Network from
!pip install stable-baselines3[extra] 

In [None]:
# DQN = Deep-Q-Network - maximize bellman equation, MLP Policy for model
import gym 
from gym import Env
from gym.spaces import Discrete, Box, Tuple
import numpy as np
import os
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import EvalCallback, StopTrainingOnRewardThreshold
import pygame

# Specify the Rendering for the Pygame Display

In [None]:
# Size of the pygame window, in pixels
WIDTH, HEIGHT = 800, 800

# RGB colour constants used by the renderer
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
RED = (255, 0, 0)
GREEN = (50, 205, 50)
BLUE = (176, 224, 230)

# Frames-per-second value handed to the pygame clock in MazeEnv.render.
# The original author suggests roughly 20-45 when actually watching a test run.
FPS = 5000

# Create the window and set its title-bar caption
WIN = pygame.display.set_mode((WIDTH, HEIGHT))
pygame.display.set_caption("Maze Render!")
# Paint the whole window white before anything is drawn on it
WIN.fill(WHITE)

# Tell the user a display window should now be open; if no video device is
# available the rendering code is not needed
print("Video display should exist in pop up window as of now")

In [None]:
# Draws the maze walls and the agent / previous-cell / goal markers.
# The wall loop is repeated on every call even though the walls never change;
# this is harmless but redundant.
def draw_window(state, past_state):
    """Render the maze and the agent onto the module-level ``WIN`` surface.

    Reads the module-level ``WALLS`` grid and ``DIM_COL_ROW`` to place wall
    segments, then draws three circles: the previous cell (blue), the current
    cell (red) and the goal cell in the top-right corner (green). Finishes by
    flushing the drawing to the screen.

    Args:
        state: indexable ``(x, y)`` cell the agent currently occupies.
        past_state: indexable ``(x, y)`` cell the agent occupied before its
            last successful move.
    """
    # Go through every index of WALLS
    for x in range(2 * DIM_COL_ROW + 1):
        for y in range(DIM_COL_ROW + 1):
            # Only entries equal to 1 are walls (0 = open, 2 = filler)
            if WALLS[x][y] != 1:
                continue
            # Integer pixel column shared by both wall orientations
            left = 100 + (x // 2) * 100
            # pygame.draw.rect takes (surface, colour, (left, top, width, height))
            if x % 2 == 0:
                # Even x: vertical segment. Uses DIM_COL_ROW (not a literal 6)
                # so the walls stay aligned for any maze size.
                pygame.draw.rect(WIN, BLACK, (left, (DIM_COL_ROW - y) * 100, 10, 110))
            else:
                # Odd x: horizontal segment
                pygame.draw.rect(WIN, BLACK, (left, 100 + (DIM_COL_ROW - y) * 100, 110, 10))
    # pygame.draw.circle takes (surface, colour, centre, radius)
    pygame.draw.circle(WIN, BLUE, (100 * past_state[0] + 155, 55 + (DIM_COL_ROW - past_state[1]) * 100), 25)
    pygame.draw.circle(WIN, RED, (100 * state[0] + 155, 55 + (DIM_COL_ROW - state[1]) * 100), 25)
    pygame.draw.circle(WIN, GREEN, (DIM_COL_ROW * 100 + 55, 155), 25)
    # Push everything drawn so far to the visible window
    pygame.display.update()

# Build the Environment

In [None]:
# class for maze env where agent will learn
class MazeEnv(Env):
    """Square grid maze exposed through the gym ``Env`` interface.

    The agent occupies one cell, addressed as ``(x, y)``, and tries to reach
    the far-corner cell ``(DIM_COL_ROW-1, DIM_COL_ROW-1)``. A move that is
    blocked by a wall leaves the agent where it is.

    Rewards: ``100`` on the step that lands on (or stays on) the end cell,
    otherwise ``-1 / DIM_COL_ROW**2`` per step.
    """

    # maze member_variables & action / observation space
    def __init__(self, DIM_COL_ROW, STARTING_CELL, WALLS):
        """Store the maze layout and declare the action/observation spaces.

        Args:
            DIM_COL_ROW: side length of the square maze, in cells.
            STARTING_CELL: ``(x, y)`` cell the agent is placed on by ``reset``.
            WALLS: 2-D array indexed ``[i, j]``. ``step`` reads row ``2*x + 1``
                for down/up moves and row ``2*x`` / ``2*(x+1)`` for left/right
                moves; an entry of 0 allows the move, any other value blocks it.
        """
        # declare num_col & num_row; boards are square
        self.dim_row_col = DIM_COL_ROW
        # declare starting cell
        self.starting_cell = STARTING_CELL
        # four discrete actions: 0 = down, 1 = left, 2 = up, 3 = right
        self.action_space = Discrete(4)
        # observation is the integer (x, y) cell, each in [0, DIM_COL_ROW-1]
        self.observation_space = Box(np.array((0,0), dtype=int), np.array((DIM_COL_ROW-1,DIM_COL_ROW-1)), dtype=np.int64)
        # current cell; stays None until reset() (or the first step()) sets it
        self.state = None
        # cell occupied before the last successful move
        self.past_state = self.starting_cell
        # cap on step() calls per episode so an episode cannot run forever
        self.max_steps = 100000 # 1000 baseline, will change
        # number of step() calls made in the current episode
        self.current_step = 0
        # cell that ends the episode with the goal reward
        self.end_cell = np.array((DIM_COL_ROW-1,DIM_COL_ROW-1),dtype=np.int64)
        # becomes True once the goal is reached or the step cap is hit
        self.episode_terminated = False
        # wall layout consulted by step()
        self.Walls = WALLS

    # moves agent around env; how actions change states
    def step(self, action):
        """Apply one action and report the outcome.

        Args:
            action: 0 = down, 1 = left, 2 = up, 3 = right.

        Returns:
            A 4-tuple ``(observation, reward, done, info)`` where
            ``observation`` is the ``(x, y)`` cell as an int64 array, ``done``
            is True once the end cell is reached or ``max_steps`` step calls
            have been made, and ``info`` is an empty dict.
        """
        # Fall back to the starting cell when reset() has not been called yet.
        # (Indexing None raises TypeError, so catching ValueError never worked.)
        if self.state is None:
            self.state = self.starting_cell

        # Count every call, including moves blocked by a wall; otherwise an
        # agent that keeps walking into a wall would never hit max_steps.
        self.current_step += 1

        if (action == 0 or action == 2):
            # down/up: action//2 selects the wall below (0) or above (1) the cell
            if (self.Walls[2*self.state[0]+1, self.state[1] + (action//2)] == 0):
                self.past_state = self.state
                # action - 1 is -1 for down and +1 for up
                self.state = (self.state[0], self.state[1] + (action - 1))
        else:
            # left/right: action//2 selects the wall left (0) or right (1) of the cell
            if (self.Walls[2*(self.state[0] + (action//2)), self.state[1]] == 0):
                self.past_state = self.state
                # action - 2 is -1 for left and +1 for right
                self.state = (self.state[0] + (action - 2), self.state[1])

        # calculate reward & check if at end-condition
        if (np.array_equal(self.state, self.end_cell)):
            self.episode_terminated = True
            reward = 100
        else:
            # small per-step penalty: incentive to reach the end quickly
            reward = -1/(self.dim_row_col*self.dim_row_col)

        # end-condition without the goal reward: step budget exhausted
        if (self.current_step >= self.max_steps):
            self.episode_terminated = True

        # the gym step() contract expects an info dict; nothing to report here
        info = {}

        return np.array(self.state, dtype=np.int64), reward, self.episode_terminated, info

    # implements visuals of learning process
    def render(self, mode="human"):
        """Draw the current maze state in the pygame window.

        Args:
            mode: accepted for gym compatibility; not used by this method.
        """
        clock = pygame.time.Clock()
        clock.tick(FPS)
        draw_window(self.state, self.past_state)

        for event in pygame.event.get():
            # shut pygame down when the window's close button is pressed
            if event.type == pygame.QUIT:
                pygame.quit()

    # reset sets env's params to starting values
    def reset(self):
        """Start a new episode and return the initial observation.

        Returns:
            The starting cell as an int64 array.
        """
        # current and previous cell both start at the starting cell
        self.state = np.array(self.starting_cell, dtype=np.int64)
        self.past_state = np.array(self.starting_cell, dtype=np.int64)
        # episode is running again with no steps taken
        self.episode_terminated = False
        self.current_step = 0
        # wipe the window so markers from the last episode disappear
        WIN.fill(WHITE)
        return self.state
        

In [None]:
# Maze configuration: side length of the square grid and the agent's start cell
DIM_COL_ROW = 6
STARTING_CELL = (0, 0)

# Wall layout, indexed WALLS[x][y]. It is hard-coded for the DIM_COL_ROW above
# and has to be rewritten by hand if the dimension changes.
#   * 2*DIM_COL_ROW + 1 rows, each with DIM_COL_ROW + 1 entries
#   * even rows hold vertical wall segments (MazeEnv.step reads them for
#     left/right moves); the first and last row are the outer edges of the grid
#   * odd rows hold horizontal wall segments (read for down/up moves)
#   * 1 = wall present, 0 = open
#   * 2 = filler in the last slot of each even row, which has no wall to describe
WALLS = np.array(
    [
        [1, 1, 1, 1, 1, 1, 2],
        [1, 0, 0, 0, 0, 0, 1],
        [1, 0, 1, 0, 1, 0, 2],
        [1, 1, 1, 1, 0, 1, 1],
        [0, 0, 0, 0, 0, 0, 2],
        [1, 0, 1, 1, 1, 1, 1],
        [1, 1, 0, 0, 1, 0, 2],
        [1, 0, 0, 0, 0, 0, 1],
        [1, 1, 1, 0, 1, 1, 2],
        [1, 0, 0, 0, 1, 1, 1],
        [0, 1, 1, 1, 0, 0, 2],
        [1, 0, 0, 0, 0, 0, 1],
        [1, 1, 1, 1, 1, 1, 2],
    ]
)

# Build the environment from the configuration above
env = MazeEnv(DIM_COL_ROW, STARTING_CELL, WALLS)
# Let stable_baselines3 check that the env follows the interface it expects
check_env(env, warn=True)

# Test the Environment - Random Actions

In [None]:
# Roll out a number of episodes with uniformly random actions and report the
# score and step count of each one, followed by the averages.
episodes = 20
average_score = 0
average_steps = 0
for episode in range(1, episodes+1):
    # put the agent back on the starting cell
    state = env.reset()
    done = False
    # running sum of rewards for this episode
    score = 0
    # Count steps directly instead of deriving them from the float score:
    # that estimate misses the final step, is wrong when the episode ends on
    # the step limit, and can be truncated by float rounding.
    steps = 0
    while not done:
        # render env
        env.render(True)
        # random action, hence .sample()
        action = env.action_space.sample()
        # step from state with action & capture step info
        n_state, reward, done, info = env.step(action)
        score += reward
        steps += 1
    # display simulation results
    print('Episode:{} Score:{} Steps:{}'.format(episode, score, steps))
    average_score += score
    average_steps += steps
# display simulation average score & steps
print("--------------------------------------------------")
print("Average Score:{} Average Steps:{}".format(average_score/episodes, average_steps/episodes))
# env.close() - needed when render implemented

## Train an RL Model ##

In [None]:
# Output locations, all under the local 'Training' directory:
# saved models (and the DQN model file inside that folder) ...
save_path = os.path.join('Training', 'Saved Models')
dqn_path = os.path.join(save_path, 'DQN_model')
# ... and training logs (with a sub-folder for this DQN run)
log_path = os.path.join('Training', 'Logs')
training_log_path = os.path.join(log_path, 'DQN_7')

In [None]:
# Explain the choice to the user: train a new DQN or reuse the saved one
print(
    "Please explicitly type Learn or Load to indicate if you'd like to Learn a new model or Load in the existing Learned model",
    "Note learning a model can take from 5 to 20 minutes. We HIGHLY suggest loading the model instead of re-learning.",
    "-- As this is a .ipynb file, search for where to input your answer if need be  --",
    "-- in vscode the input is at the top of your screen; jupyter notebooks the input is in the cell-block's output --",
    sep="\n",
)
userInput = input("Learn or Load: ")
# keep asking until the answer is one of the two accepted words (any casing)
while userInput.lower() not in ("learn", "load"):
    userInput = input("Invalid entry. Please type learn or load to indicate your requested action: ")

if userInput.lower() == "load":
    # reuse the model stored at dqn_path and attach the existing env to it
    model = DQN.load(dqn_path, env=env)
    print("Successfully loaded the prexisting DQN model")
else:
    # build a DQN with an MLP policy, train it, then store it at dqn_path
    model = DQN('MlpPolicy', env, verbose=1, tensorboard_log=log_path)
    model.learn(total_timesteps=500000)
    model.save(dqn_path)
    print("Successfully learned upon a new DQN model")

## Test DQN_Model on MazeEnv - See Predictions

In [None]:
# Announce the route being shown. The goal is the far-corner cell
# (DIM_COL_ROW-1, DIM_COL_ROW-1), matching MazeEnv.end_cell; cell indices are
# zero-based, so (DIM_COL_ROW, DIM_COL_ROW) would lie outside the maze.
print("DQN Model's predicted path from starting state from {} to {} below".format(STARTING_CELL, (DIM_COL_ROW - 1, DIM_COL_ROW - 1)))
# reset env, then follow the model's predictions until step() reports the episode is over
obs = env.reset()
done = False
while not done:
    action, _states = model.predict(obs)
    # show the cell the action was chosen from
    print("obs: ", obs)
    obs, rewards, done, info = env.step(action)
    env.render(True)
# show the cell the episode ended on
print('obs: ', obs)

In [None]:
# Evaluate the model on the env over n_eval_episodes episodes, rendering as it goes
evaluate_policy(
    model,
    env,
    n_eval_episodes=1000,
    render=True,
    reward_threshold=70,
)


In [None]:
# Run this cell to close the pygame pop-up window by shutting pygame down,
# rather than closing it in a way that crashes the notebook kernel
pygame.quit()