In [58]:
import numpy as np
import tensorflow as tf
from random import random, sample
from keras.layers import Dense
from keras.models import Sequential
from collections import deque
%run MazeEnv.ipynb

In [59]:
class Agent:
    """Deep Q-Network (DQN) agent that learns to walk the maze.

    The agent keeps two identically shaped Keras networks: ``model`` is the
    one that is trained and used to pick actions, ``target_model`` is a
    periodically refreshed copy used to compute the bootstrapped targets.
    Transitions are stored in a bounded replay memory and sampled in
    mini-batches for training.

    Relies on ``Environment`` and ``select_move_from_num`` being brought into
    the notebook namespace by ``%run MazeEnv.ipynb``.
    """

    def __init__(self):
        """Set the hyperparameters, build both networks and the replay memory."""
        # hyperparameters
        self.episodes = 1028
        self.time_allowed_in_game = 500
        self.epsilon = 1
        self.min_epsilon = 0.01
        self.epsilon_multiplier = 0.99
        self.discount_rate = 0.9

        # create 2 models here - one that is trained / used to act and one that
        # supplies the targets; the target model is set to the trained model
        # after a specific number of iterations
        self.model = self.create_model()
        self.target_model = self.create_model()
        # both models must start with the same weights
        self.target_model.set_weights(self.model.get_weights())

        # bounded replay memory of (old_state, action, reward, new_state, done)
        self.replay_memory = deque(maxlen=1028)

        # step counter / period for copying model weights into target_model
        self.t_target_current = 0
        self.t_target_threshold = 512
        # step counter / period for sampling a mini-batch and training
        self.t_train_current = 0
        self.t_train_threshold = 256
        # number of transitions sampled per training call
        self.mini_batch_size = 64

    # create the model structure to be used
    def create_model(self):
        """Build and compile the Q-network.

        Returns:
            A compiled ``Sequential`` model mapping a 2-value input (the x and
            y coordinates) to 4 linear outputs, one Q-value per action.
        """
        model = Sequential()

        # inputs will be the x and y coordinates of the maze - 2 inputs
        model.add(Dense(3, input_dim=2, activation="relu"))
        # one linear output per action
        model.add(Dense(4, activation="linear"))

        # mean squared error between predicted and target Q-values
        model.compile(optimizer="adam", loss="mean_squared_error")
        return model

    # choose an action to perform in the game
    def select_action(self, state):
        """Pick an action epsilon-greedily.

        Args:
            state: indexable pair ``(x, y)`` with the current position.

        Returns:
            With probability ``epsilon`` the result of
            ``select_move_from_num`` (from the maze notebook) applied to a
            fresh random number; otherwise the index of the largest predicted
            Q-value.
        """
        if random() < self.epsilon:
            # Draw a *new* number for the move itself. Reusing the value that
            # just passed the ``< epsilon`` test would confine it to
            # [0, epsilon), so exploration would get more and more skewed as
            # epsilon decays.
            # NOTE(review): assumes select_move_from_num expects a float in
            # [0, 1) - confirm against MazeEnv.ipynb.
            return select_move_from_num(random())

        inputs = np.array([state[0], state[1]]).reshape((1, 2))
        q_values = np.array(self.model.predict(inputs, verbose=0)[0])
        return np.argmax(q_values)

    # target for the action taken = reward + discount * best Q(next state)
    # train the model
    def train(self):
        """Run one fit on a mini-batch sampled from the replay memory.

        Does nothing while the memory holds fewer than ``mini_batch_size``
        transitions. For every sampled transition the regression target is
        the network's *current* Q-vector for the old state with only the
        entry of the action that was taken replaced by
        ``reward`` (terminal) or ``reward + discount_rate * max Q_target(new_state)``.
        Leaving the other three entries at their predicted value keeps their
        error at zero, so only the taken action is updated.
        """
        # training needs at least one full mini-batch
        if len(self.replay_memory) < self.mini_batch_size:
            return

        print("TRAINING")

        # get a sample of the replay memory with the minibatch
        mini_batch = sample(self.replay_memory, self.mini_batch_size)
        batch_len = len(mini_batch)

        old_states = np.array(
            [list(transition[0]) for transition in mini_batch], dtype=np.float32
        ).reshape((batch_len, 2))
        new_states = np.array(
            [list(transition[3]) for transition in mini_batch], dtype=np.float32
        ).reshape((batch_len, 2))
        # NOTE(review): assumes actions are integer indices 0..3 matching the
        # network outputs (select_action's greedy branch returns an argmax) -
        # confirm select_move_from_num returns the same encoding.
        actions = np.array([int(transition[1]) for transition in mini_batch])
        rewards = np.array(
            [transition[2] for transition in mini_batch], dtype=np.float32
        )
        finished = np.array([bool(transition[4]) for transition in mini_batch])

        # one batched forward pass per network instead of one per transition
        next_q = np.array(self.target_model.predict(new_states, verbose=0))
        bootstrapped = rewards + self.discount_rate * next_q.max(axis=1)
        # terminal transitions have no future value to bootstrap from
        action_targets = np.where(finished, rewards, bootstrapped)

        # start from the current predictions (copied, so the array is ours to
        # modify) and overwrite only the entry of the action that was taken
        target_y = np.array(
            self.model.predict(old_states, verbose=0), dtype=np.float32
        )
        target_y[np.arange(batch_len), actions] = action_targets

        self.model.fit(old_states, target_y, verbose=0)

    # run the game multiple times across the env
    def run_game(self):
        """Play ``episodes`` games, filling the replay memory and training.

        Each episode creates a fresh ``Environment`` and runs for at most
        ``time_allowed_in_game`` steps. ``train`` is called every
        ``t_train_threshold`` steps and the target network is refreshed every
        ``t_target_threshold`` steps (both counted across episodes). Epsilon
        is decayed once per episode, never below ``min_epsilon``.
        """
        for i in range(self.episodes):
            print(f"EPISODE {i}")
            env = Environment()

            for j in range(self.time_allowed_in_game):
                # get the values for the transition tuple to add to replay memory
                old_state = (env.current_point_x, env.current_point_y)
                action_direction = self.select_action(old_state)
                reward = env.move(action_direction)
                new_state = (env.current_point_x, env.current_point_y)
                done = env.is_done()
                self.replay_memory.append(
                    (old_state, action_direction, reward, new_state, done)
                )

                # increase the train and target step counters
                self.t_train_current += 1
                self.t_target_current += 1

                # update the weights of the trained model
                if self.t_train_current % self.t_train_threshold == 0:
                    self.train()

                # update the target model with the weights of the trained model
                if self.t_target_current % self.t_target_threshold == 0:
                    self.target_model.set_weights(self.model.get_weights())

                # the episode is finished - do not wait for the remaining timesteps
                if done:
                    break

            # modify the epsilon value after each episode
            # encourage exploration at the beginning then exploitation
            self.epsilon = max(self.epsilon * self.epsilon_multiplier, self.min_epsilon)

In [60]:
# Build a fresh agent (two networks + empty replay memory) and run the full
# training loop. run_game() prints "EPISODE <n>" per episode and "TRAINING"
# from each train() call, so the output below is a progress log.
agent = Agent()
agent.run_game()

EPISODE 0
EPISODE 1
EPISODE 2
TRAINING
EPISODE 3
TRAINING
EPISODE 4
TRAINING
TRAINING
EPISODE 5
EPISODE 6
EPISODE 7
TRAINING
TRAINING
EPISODE 8
TRAINING
TRAINING
EPISODE 9
TRAINING
EPISODE 10
TRAINING
TRAINING
EPISODE 11
TRAINING
TRAINING
EPISODE 12
TRAINING
TRAINING
EPISODE 13
TRAINING
EPISODE 14
TRAINING
EPISODE 15
TRAINING
TRAINING
EPISODE 16
TRAINING
TRAINING
EPISODE 17
TRAINING
TRAINING
EPISODE 18
TRAINING
TRAINING
EPISODE 19
TRAINING
TRAINING
EPISODE 20
TRAINING
TRAINING
EPISODE 21
TRAINING
TRAINING
EPISODE 22
TRAINING
EPISODE 23
TRAINING


KeyboardInterrupt: 

In [50]:
# Inspect the trained network's parameters: kernel and bias arrays for the
# Dense(3) layer followed by kernel and bias arrays for the Dense(4) layer.
agent.model.get_weights()

[array([[-0.9204836 , -0.647189  , -0.31126982],
        [ 1.0514965 ,  0.6012424 , -0.46006852]], dtype=float32),
 array([0., 0., 0.], dtype=float32),
 array([[ 0.10577381, -0.7944095 ,  0.8722613 ,  0.13030863],
        [-0.7834987 ,  0.04127634,  0.47940183,  0.37097836],
        [ 0.2646073 , -0.15513116, -0.28247386, -0.06056374]],
       dtype=float32),
 array([0., 0., 0., 0.], dtype=float32)]

In [51]:
# Greedy rollout: starting from cell (1, 1), repeatedly take the action with
# the highest predicted Q-value (no exploration) for at most 100 steps,
# marking every visited cell in the maze array.
env_test = Environment()
env_test.current_point_x, env_test.current_point_y = 1, 1
# flag the starting cell with -1 so it is distinguishable in the printed maze
env_test.maze[env_test.current_point_y, env_test.current_point_x] = -1

#env_test.end_point_y = 9
state = (env_test.current_point_x, env_test.current_point_y)
#state = (env_test.end_point_x, env_test.end_point_y)

for j in range(100):
    # the network expects a single row holding (x, y)
    inputs = np.reshape(state, (1, 2))
    move_prediction = np.array(agent.model.predict(inputs)[0])
    print(move_prediction)

    prediction_action = np.argmax(move_prediction)
    env_test.move(prediction_action)

    # re-read the position after the move and mark the cell as visited
    state = (env_test.current_point_x, env_test.current_point_y)
    print(state, prediction_action)
    env_test.maze[env_test.current_point_y, env_test.current_point_x] = 1

    done = env_test.is_done()
    if done:
        print("DONE")
        break

print(env_test.maze)

[ 0.01385774 -0.10407791  0.1142775   0.01707211]
(1, 0) 2
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 0
[0. 0. 0. 0.]
(0, 0) 

In [47]:
# Print the greedy policy over a 10x10 grid: for every (col, row) position
# show a glyph for the action with the highest predicted Q-value.
env_test = Environment()

# action index -> glyph; any index not listed here is shown as "v"
arrow_for_action = {0: "<", 1: ">", 2: "^"}

for row in range(10):
    arr = []
    for col in range(10):
        # the network input is (x, y), i.e. (column, row)
        inputs = np.array([col, row]).reshape((1, 2))
        move_prediction = np.array(agent.model.predict(inputs)[0])
        prediction_action = np.argmax(move_prediction)
        arr.append(arrow_for_action.get(int(prediction_action), "v"))
    print(arr)

['v', '<', '<', '<', '<', '<', '<', '<', '<', '<']
['v', '<', '<', '<', '<', '<', '<', '<', '<', '<']
['v', '<', '<', '<', '<', '<', '<', '<', '<', '<']
['v', '<', '<', '<', '<', '<', '<', '<', '<', '<']
['v', 'v', '<', '<', '<', '<', '<', '<', '<', '<']
['v', 'v', '<', '<', '<', '<', '<', '<', '<', '<']
['v', 'v', '<', '<', '<', '<', '<', '<', '<', '<']
['v', 'v', '<', '<', '<', '<', '<', '<', '<', '<']
['v', 'v', '<', '<', '<', '<', '<', '<', '<', '<']
['v', 'v', 'v', '<', '<', '<', '<', '<', '<', '<']
