In [3]:
import numpy as np
import random
import datetime

from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.layers.advanced_activations import PReLU
# Define constants for actions
LEFT = 0
UP = 1
RIGHT = 2
DOWN = 3

visited_mark = 0.8  # The visited cells are marked by an 80% gray shade.
pirate_mark = 0.5   # The current cell where the pirate is located is marked by a 50% gray shade.

class TreasureMaze(object):
    def __init__(self, maze, pirate=(0, 0)):
        self._maze = np.array(maze)
        nrows, ncols = self._maze.shape
        self.target = (nrows - 1, ncols - 1)   # target cell where the "treasure" is
        self.free_cells = [(r, c) for r in range(nrows) for c in range(ncols) if self._maze[r, c] == 1.0]
        self.free_cells.remove(self.target)
        if self._maze[self.target] == 0.0:
            raise Exception("Invalid maze: target cell cannot be blocked!")
        if not pirate in self.free_cells:
            raise Exception("Invalid Pirate Location: must sit on a free cell")
        self.reset(pirate)

    def reset(self, pirate):
        self.pirate = pirate
        self.maze = np.copy(self._maze)
        nrows, ncols = self.maze.shape
        row, col = pirate
        self.maze[row, col] = pirate_mark
        self.state = (row, col, 'start')
        self.min_reward = -0.5 * self.maze.size
        self.total_reward = 0
        self.visited = set()

    def update_state(self, action):
        nrows, ncols = self.maze.shape
        nrow, ncol, nmode = pirate_row, pirate_col, mode = self.state

        if self.maze[pirate_row, pirate_col] > 0.0:
            self.visited.add((pirate_row, pirate_col))

        valid_actions = self.valid_actions()

        if not valid_actions:
            nmode = 'blocked'
        elif action in valid_actions:
            nmode = 'valid'
            if action == LEFT:
                ncol -= 1
            elif action == UP:
                nrow -= 1
            if action == RIGHT:
                ncol += 1
            elif action == DOWN:
                nrow += 1
        else:
            mode = 'invalid'

        self.state = (nrow, ncol, nmode)

    def get_reward(self):
        pirate_row, pirate_col, mode = self.state
        nrows, ncols = self.maze.shape
        if pirate_row == nrows - 1 and pirate_col == ncols - 1:
            return 1.0
        if mode == 'blocked':
            return self.min_reward - 1
        if (pirate_row, pirate_col) in self.visited:
            return -0.25
        if mode == 'invalid':
            return -0.75
        if mode == 'valid':
            return -0.04

    def act(self, action):
        self.update_state(action)
        reward = self.get_reward()
        self.total_reward += reward
        status = self.game_status()
        envstate = self.observe()
        return envstate, reward, status

    def observe(self):
        canvas = self.draw_env()
        envstate = canvas.reshape((1, -1))
        return envstate

    def draw_env(self):
        canvas = np.copy(self.maze)
        nrows, ncols = self.maze.shape
        for r in range(nrows):
            for c in range(ncols):
                if canvas[r, c] > 0.0:
                    canvas[r, c] = 1.0
        row, col, valid = self.state
        canvas[row, col] = pirate_mark
        return canvas

    def game_status(self):
        if self.total_reward < self.min_reward:
            return 'lose'
        pirate_row, pirate_col, mode = self.state
        nrows, ncols = self.maze.shape
        if pirate_row == nrows - 1 and pirate_col == ncols - 1:
            return 'win'
        return 'not_over'

    def valid_actions(self, cell=None):
        if cell is None:
            row, col, mode = self.state
        else:
            row, col = cell
        actions = [0, 1, 2, 3]
        nrows, ncols = self.maze.shape
        if row == 0:
            actions.remove(1)
        elif row == nrows - 1:
            actions.remove(3)

        if col == 0:
            actions.remove(0)
        elif col == ncols - 1:
            actions.remove(2)

        if row > 0 and self.maze[row - 1, col] == 0.0:
            actions.remove(1)
        if row < nrows - 1 and self.maze[row + 1, col] == 0.0:
            actions.remove(3)

        if col > 0 and self.maze[row, col - 1] == 0.0:
            actions.remove(0)
        if col < ncols - 1 and self.maze[row, col + 1] == 0.0:
            actions.remove(2)

        return actions

class GameExperience(object):
    def __init__(self, model, max_memory=100, discount=0.95):
        self.model = model
        self.max_memory = max_memory
        self.discount = discount
        self.memory = list()
        self.num_actions = model.output_shape[-1]

    def remember(self, episode):
        self.memory.append(episode)
        if len(self.memory) > self.max_memory:
            del self.memory[0]

    def predict(self, envstate):
        return self.model.predict(envstate)[0]

    def get_data(self, data_size=10):
        env_size = self.memory[0][0].shape[1]
        mem_size = len(self.memory)
        data_size = min(mem_size, data_size)
        inputs = np.zeros((data_size, env_size))
        targets = np.zeros((data_size, self.num_actions))
        for i, j in enumerate(np.random.choice(range(mem_size), data_size, replace=False)):
            envstate, action, reward, envstate_next, game_over = self.memory[j]
            inputs[i] = envstate
            targets[i] = self.predict(envstate)
            Q_sa = np.max(self.predict(envstate_next))
            if game_over:
                targets[i, action] = reward
            else:
                targets[i, action] = reward + self.discount * Q_sa
        return inputs, targets
def format_time(seconds):
    if seconds < 400:
        s = float(seconds)
        return "%.1f seconds" % (s,)
    elif seconds < 4000:
        m = seconds / 60.0
        return "%.2f minutes" % (m,)
    else:
        h = seconds / 3600.0
        return "%.2f hours" % (h,)


# Define a function to build the neural network model
def build_model(maze):
    model = Sequential()
    model.add(Dense(maze.size, input_shape=(maze.size,)))
    model.add(PReLU())
    model.add(Dense(maze.size))
    model.add(PReLU())
    model.add(Dense(num_actions))
    model.compile(optimizer='adam', loss='mse')
    return model
def completion_check(model, qmaze):
    # Get the current state of the agent in the maze
    current_state = qmaze.observe()
    
    # Predict the Q-values for the current state using the model
    q_values = model.predict(current_state)
    
    # Get the action with the highest Q-value
    action = np.argmax(q_values)
    
    # Check if the action leads the agent to the goal cell
    if action in qmaze.valid_actions():
        return True
    
    return False
def play_game(model, qmaze, start):
    qmaze.reset(start)
    envstate = qmaze.observe()
    while True:
        prev_envstate = envstate
        q = model.predict(prev_envstate)
        action = np.argmax(q[0])
        envstate, reward, game_status = qmaze.act(action)
        if game_status == 'win':
            print("Pirate found the treasure!")
            break
        elif game_status == 'lose':
            print("Pirate got lost!")
            break

def qtrain(model, maze, **opt):
    global epsilon
    
    n_epoch = opt.get('n_epoch', 15000)
    max_memory = opt.get('max_memory', 1000)
    data_size = opt.get('data_size', 50)
    
    start_time = datetime.datetime.now()
    
    qmaze = TreasureMaze(maze)
    experience = GameExperience(model, max_memory=max_memory)
    
    win_history = []
    hsize = qmaze.maze.size // 2
    win_rate = 0.0
    
    for epoch in range(n_epoch):
        agent_cell = random.choice(qmaze.free_cells)
        qmaze.reset(agent_cell)
        envstate = qmaze.observe()
        n_episodes = 0
        
        while True:
            prev_envstate = envstate
            if np.random.rand() < epsilon:
                action = np.random.randint(0, num_actions)
            else:
                q = model.predict(prev_envstate)
                action = np.argmax(q[0])
            
            envstate, reward, game_status = qmaze.act(action)
            episode = [prev_envstate, action, reward, envstate, game_status]
            experience.remember(episode)
            
            inputs, targets = experience.get_data(data_size=data_size)
            loss = model.train_on_batch(inputs, targets)
            
            n_episodes += 1
            
            if game_status == 'win':
                win_history.append(1)
                break
            elif game_status == 'lose':
                win_history.append(0)
                break
        
        if epoch % hsize == 0:
            if len(win_history) > hsize and np.mean(win_history[-hsize:]) > 0.9:
                epsilon = 0.05
        
        dt = datetime.datetime.now() - start_time
        t = format_time(dt.total_seconds())
        template = "Epoch: {:03d}/{:d} | Loss: {:.4f} | Episodes: {:d} | Win count: {:d} | Win rate: {:.3f} | Time: {}"
        print(template.format(epoch, n_epoch-1, loss, n_episodes, sum(win_history), win_rate, t))
        
        if sum(win_history[-hsize:]) == hsize:
            if completion_check(model, qmaze):
                print("Reached 100%% win rate at epoch: %d" % int(epoch))
                break
    
    dt = datetime.datetime.now() - start_time
    seconds = dt.total_seconds()
    t = format_time(seconds)
    
    print("n_epoch: %d, max_mem: %d, data: %d, time: %s" % (epoch, max_memory, data_size, t))
    return seconds

# Set exploration factor
epsilon = 0.1

# Actions dictionary
actions_dict = {
    LEFT: 'left',
    UP: 'up',
    RIGHT: 'right',
    DOWN: 'down',
}

num_actions = len(actions_dict)

# Define your maze as a 2D numpy array
maze = np.array([
    [1., 0., 1., 1., 1., 1., 1., 1.],
    [1., 0., 1., 1., 1., 0., 1., 1.],
    [1., 1., 1., 1., 0., 1., 0., 1.],
    [1., 1., 1., 0., 1., 1., 1., 1.],
    [1., 1., 0., 1., 1., 1., 1., 1.],
    [1., 1., 1., 0., 1., 0., 0., 0.],
    [1., 1., 1., 0., 1., 1., 1., 1.],
    [1., 1., 1., 1., 0., 1., 1., 1.]
])

# Create and build the neural network model
model = build_model(maze)
# Define the starting position for the pirate
pirate_start = (0, 0)
# Train the model using deep Q-learning
qtrain(model, maze, n_epoch=10, max_memory=8*maze.size, data_size=32)
# Create the TreasureMaze environment
qmaze = TreasureMaze(maze, pirate_start)
# Test the model for one game
pirate_start = (0, 0)
play_game(model, qmaze, pirate_start)

Epoch: 000/9 | Loss: 0.0000 | Episodes: 132 | Win count: 0 | Win rate: 0.000 | Time: 26.5 seconds
Epoch: 001/9 | Loss: 0.0001 | Episodes: 134 | Win count: 0 | Win rate: 0.000 | Time: 59.3 seconds
Epoch: 002/9 | Loss: 0.0007 | Episodes: 139 | Win count: 0 | Win rate: 0.000 | Time: 94.8 seconds
Epoch: 003/9 | Loss: 0.0007 | Episodes: 138 | Win count: 0 | Win rate: 0.000 | Time: 130.4 seconds
Epoch: 004/9 | Loss: 0.0010 | Episodes: 64 | Win count: 1 | Win rate: 0.000 | Time: 147.0 seconds
Epoch: 005/9 | Loss: 0.0013 | Episodes: 138 | Win count: 1 | Win rate: 0.000 | Time: 181.5 seconds
Epoch: 006/9 | Loss: 0.0014 | Episodes: 150 | Win count: 1 | Win rate: 0.000 | Time: 219.7 seconds
Epoch: 007/9 | Loss: 0.0004 | Episodes: 133 | Win count: 1 | Win rate: 0.000 | Time: 254.5 seconds
Epoch: 008/9 | Loss: 0.0011 | Episodes: 94 | Win count: 2 | Win rate: 0.000 | Time: 280.0 seconds
Epoch: 009/9 | Loss: 0.0014 | Episodes: 150 | Win count: 2 | Win rate: 0.000 | Time: 320.1 seconds
n_epoch: 9, max