In [None]:
import os
import numpy as np
import random
import matplotlib.pyplot as plt
import torch

random.seed(2022)

In [None]:
flag_list = [0, 1, 2, 3, 5, 6, 7, 8]
#  [0, 1, 2
#   3,    5
#   6, 7, 8]

time_list = [0, 1, 2]

# the maze of size 201*201*2
maze_cells = np.zeros((201, 201, 2), dtype=int)

# load maze
def load_maze():
    if not os.path.exists("COMP6247Maze20212022.npy"):
        raise ValueError("Cannot find %s" % file_path)

    else:
        global maze_cells
        maze = np.load("COMP6247Maze20212022.npy", allow_pickle=False, fix_imports=True)
        maze_cells = np.zeros((maze.shape[0], maze.shape[1], 2), dtype=int)
        for i in range(maze.shape[0]):
            for j in range(maze.shape[1]):
                maze_cells[i][j][0] = maze[i][j]  
                # load the maze, with 1 denoting an empty location and 0 denoting a wall
                maze_cells[i][j][1] = 0  
                # initialized to 0 denoting no fire

# get local 3*3 information centered at (x,y).
def get_local_maze_information(x, y):
    global maze_cells
    random_location = random.choice(flag_list)
    around = np.zeros((3, 3, 2), dtype=int)
    for i in range(maze_cells.shape[0]):
        for j in range(maze_cells.shape[1]):
            if maze_cells[i][j][1] == 0:
                pass
            else:
                maze_cells[i][j][1] = maze_cells[i][j][1] - 1  # decrement the fire time

    for i in range(3):
        for j in range(3):
            if x - 1 + i < 0 or x - 1 + i >= maze_cells.shape[0] or y - 1 + j < 0 or y - 1 + j >= maze_cells.shape[1]:
                around[i][j][0] = 0  # this cell is outside the maze, and we set it to a wall
                around[i][j][1] = 0
                continue
            around[i][j][0] = maze_cells[x - 1 + i][y - 1 + j][0]
            around[i][j][1] = maze_cells[x - 1 + i][y - 1 + j][1]
            if i == random_location // 3 and j == random_location % 3:
                if around[i][j][0] == 0: # this cell is a wall
                    continue
                ran_time = random.choice(time_list)
                around[i][j][1] = ran_time + around[i][j][1]
                maze_cells[x - 1 + i][y - 1 + j][1] = around[i][j][1]
    return around

In [None]:
# returns the number of walls surrounding the agent

def check_surroundings(state, position):
    no_walls = 0
    
    for i in range(4):
        if state[state_directions[i]] == 0 or tuple((position[0],position[1]) + x_y_directions[1:][i]) in imperfect_positions:
            no_walls += 1
            
    return no_walls

In [None]:
# returns the position after taking an action

def take_action(position, action):
        return (position[0] + action[0], position[1] + action[1])

In [None]:
# returns true if the action results in hitting a wall or fire

def enter_wall_or_fire(state, action):
    new_pos = state[1 + action[0]][1 + action[1]]
    if new_pos[0] == 0 or new_pos[1] > 0:
        return True
    else:
        return False

In [None]:
# updates the q table using the Bellman optimality equation

def update_q_table(position, new_position, reward, idx):
    next_idx = torch.argmax(q_table[new_position[0], new_position[1], :])

    q_table[position[0], position[1], idx] = q_table[position[0], position[1], idx] + \
    lr * (reward + gamma * q_table[new_position[0], new_position[1], next_idx] \
             - q_table[position[0], position[1], idx])

In [None]:
# writes trace to output file

def write_to_output(steps, position, action, total_reward, state):
    with open('output.txt', 'a') as f:
        
        f.write("Step: " + str(steps) +
                "  x: " + str(position[0].item()) + "  y: " + str(position[1].item()) + 
                "  Action taken: " + str(tuple(action)) + "  Total Reward: " + str(total_reward) + "\n\n")
        
        f.write("State: " + "\n")
        
        f.write(str(state[0, 0]) + " " + str(state[0, 1]) + " " + str(state[0, 2]) + "\n")
        f.write(str(state[1, 0]) + " " + str(state[1, 1]) + " " + str(state[1, 2]) + "\n")
        f.write(str(state[2, 0]) + " " + str(state[2, 1]) + " " + str(state[2, 2]) + "\n\n")

In [None]:
# agent class which holds various useful methods

class Agent:
    def __init__(self, position):
        self.position = position
        self.state = None
    
    # returns the local maze information (observation) of agent
    def retrieve_state(self):
        self.state = get_local_maze_information(self.position[0], self.position[1])
        return self.state
    
    # checks if agent is in position [1, 1]
    def in_initial_position(self):
        if self.position[0] == 1 and self.position[1] == 1:
            return True
        else:
            return False
        
    # select action via epsilon decay policy
    def choose_action(self, epsilon, train):
        random_num = random.uniform(0, 1)
        if random_num < epsilon and train:
            action_idx = random.randint(0, no_actions - 1)
        else:
            action_idx = torch.argmax(q_table[self.position[0], self.position[1], :])

        action = x_y_directions[action_idx]
        
        return action_idx, action
    
    # set position of agent when resetting from dead end
    def set_position(self, position):
        self.position = position
        
    # get position of agent
    def get_position(self):
        return self.position
    
    # check if agent is in position [199, 199]
    def at_goal(self, new_position):
        if new_position[0] == 199 and new_position[1] == 199:
            return True
        else:
            return False

In [None]:
# training agent method

def train_agent(train=True):
    
    # initialising the agent in position [1,1]
    visited = [(1, 1)]
    position = torch.tensor([1, 1])
    agent = Agent(position)
    
    # keep track of reward
    total_reward = 0
    
    # boolean to check when agent terminated at position [199, 199]
    terminated = False

    # stacks for intersections and intersection actions
    intersections = []
    intersection_actions = []

    # number of steps
    steps = 0

    # take 5000 steps
    while steps < 5000:
        
        # get position of agent and select action
        position = agent.get_position()
        state = agent.retrieve_state()
        idx, action = agent.choose_action(epsilon, train)

        # get no. of walls surrounding agent
        no_walls = check_surroundings(state, position)

        # no. of walls == 3 means dead end, reset agent position to last
        # intersection and close off imperfect position leading to dead end
        if no_walls == 3 and not agent.in_initial_position():
            position = torch.tensor(intersections.pop())
            intersection_action = intersection_actions.pop()
            agent.set_position(position)
            imperfect_pos = take_action(position, intersection_action)
            imperfect_positions.append(imperfect_pos)
            continue
        elif no_walls == 1 or agent.in_initial_position(): # record all intersections and actions taken at them
            intersections.append((position[0], position[1]))
            intersection_actions.append(action)

        # compute the next potential position
        possible_position = position + action

        # check if potential position is a bad state to be in
        if (possible_position[0], possible_position[1]) in imperfect_positions or enter_wall_or_fire(state, action): # if possible position leads to dead end, fire or wall
            new_position = position
            reward = -20
        elif (possible_position[0], possible_position[1]) in visited: # if possible position leads to already visited area
            new_position = position
            reward = -5
        else:
            new_position = possible_position # if possible position leads to new state
            visited.append((possible_position[0], possible_position[1]))
            reward = -1

        # if agent reaches goal, terminate and reward 1000
        if agent.at_goal(new_position):
            reward = 1000
            terminated = True

        # if training, update the q_table, otherwise evaluate and write trace to output
        if train:
            update_q_table(position, new_position, reward, idx)
        else:
            write_to_output(steps, position, action, total_reward, state)

        # set position of agent to new position
        agent.set_position(new_position)

        # accumulate the reward for this epoch and increment steps
        total_reward += reward
        steps += 1

        # break if terminated was set to true before
        if terminated:
            break

    return position, total_reward, terminated

In [None]:
# load maze
load_maze()

In [None]:
# helper variables and hyperparameters are set here
no_rows = 199
no_cols = 199

no_actions = 5

imperfect_positions = []
q_table = torch.zeros((no_rows+1, no_cols+1, no_actions))

# gamma discount factor
gamma = 0.95

# learning rate
lr = 1

# epsilon
epsilon=0.5

# epsilon decay
epsilon_decay = 0.99

state_directions = [(0, 1, 0), (1, 0, 0), (1, 2, 0), (2, 1, 0)]
x_y_directions = np.array([(0, 0), (-1, 0), (0, -1), (0, 1), (1, 0)])

epoch_rewards = []

In [None]:
# run the training 300 times, recording positions, total epoch reward, etc.
# also decay epsilon each epoch
for epoch in range(300):
    position, total_reward, terminated = train_agent(train=True)
    epoch_rewards.append(total_reward)
    print("Epoch: ", epoch, " Epoch Reward: ", total_reward,
          " Last Position Reached: ", position, " Goal Reached: ", terminated)
    epsilon = epsilon * epsilon_decay

In [None]:
# plot total rewards across epochs
plt.xlabel("Total Reward")
plt.ylabel("Epoch")
plt.ylim([-2100,0])
plt.plot(epoch_rewards, 'c', alpha=1)
plt.title('Training Agent')

plt.legend()

In [None]:
# save our q-table
torch.save(q_table, 'q_table.pt')

In [None]:
# load our q-table
q_table = torch.load('q_table.pt')

In [None]:
# evaluate our q-table
f = open('output.txt', 'w')
f.write('Reinforcement Learning for Dynamic Maze Solving - Trace Output File \n\n')
f.write('Action Key: \n' + '(0, 0)  = STAY \n' + '(1, 0)  = LEFT \n' + '(-1, 0) = RIGHT \n' + '(0, -1) = UP \n' + '(0, 1)  = DOWN \n\n')
f.close()

train_agent(train=False)