In [None]:
import numpy as np
from time import sleep
import tensorflow as tf
import sys
import os

In [None]:
# Number of training episodes run per game.
NUM_EPS = 10

# Size threshold of the transition buffer that triggers a model fit
# (see Board_Env.update), and the cap on steps per episode / per play.
MIN_DIM = 1000
STEP_LIM = 1000

# The four walks, indexed by action id: DX[a] is added to the row index
# and DY[a] to the column index.
DX = [-1, 1, 0, 0]
DY = [0, 0, -1, 1]

In [None]:
class Board_Env:
    """Grid-world game with a small neural network approximating the Q function.

    The board is read from a text file, one row per line. The code
    distinguishes these cells: 'A' (agent start), 'W' (obstacle),
    'T' (treasure, win) and 'P' (poison, lose); every other character is
    treated as an empty cell.

    The class reads the module-level constants NUM_EPS, DX, DY, STEP_LIM and
    MIN_DIM at call time.
    """

    def __init__(self, file_name, show_progress=True, random_play=False, gamma=1, epsilon=0.5):

        """
        file_name: path of the game's config file WITHOUT the '.txt' suffix,
        show_progress: either to display the board while playing or not,
        random_play: play purely random moves (train() then does nothing),
        gamma: discount rate,
        epsilon: the value of epsilon in the epsilon-greedy policy

        Raises ValueError if the board file contains no rows.
        """

        #read game configs
        self.board = self._read_board(file_name)

        self.n_rows = len(self.board)
        self.n_columns = len(self.board[0])
        self.poison_reward = -10
        self.treasure_reward = 10
        self.empty_reward = -1

        #buffer of collected samples, one row per sample: (i, j, action, target)
        self._reset_buffer()

        #starting position
        self.si = 0
        self.sj = 0

        self.random_play = random_play

        #discount rate
        self.gamma = gamma

        #epsilon-Greedy parameter
        self.epsilon = epsilon

        #to display the progress
        self.show_progress = show_progress

        #DL inits{
        # Input is (row, column, action); output is a single Q estimate.
        inputs = tf.keras.Input(shape=(3,))
        x = tf.keras.layers.Dense(3, activation='sigmoid')(inputs)
        x = tf.keras.layers.Dense(3, activation='sigmoid')(x)
        predictions = tf.keras.layers.Dense(1)(x)

        self.model = tf.keras.Model(inputs=inputs, outputs=predictions)

        # The compile step specifies the training configuration.
        # No 'accuracy' metric: it carries no meaning for an MSE regression.
        self.model.compile(optimizer=tf.keras.optimizers.RMSprop(0.001),
                           loss='mse')
        # }

    @staticmethod
    def _read_board(file_name):
        """Read '<file_name>.txt' and return its non-blank lines as board rows."""
        with open(file_name + '.txt') as f:
            rows = [line.rstrip('\n') for line in f]

        # A blank line would become a zero-width row and break board[i][j].
        rows = [row for row in rows if row]
        if not rows:
            raise ValueError(f"board file '{file_name}.txt' contains no rows")
        return rows

    def _reset_buffer(self):
        """Empty the sample buffer (no placeholder row, so no fake sample is trained on)."""
        self.q_table = np.empty((0, 4))

    def check_if_position_invalide(self, i, j):
        """Return True when (i, j) lies outside the board."""
        return not (0 <= i < self.n_rows and 0 <= j < self.n_columns)

    #as computing q function
    def predict(self, i, j, a):
        """Return the model's Q estimate for taking action `a` at cell (i, j)."""
        # verbose=0: otherwise Keras may print a progress bar for every call.
        pred = self.model.predict(np.array([[i, j, a]]), verbose=0)
        return pred[0][0]

    def _action_values(self, i, j):
        """Return the Q estimates of every action at (i, j) as a list of floats.

        All actions are evaluated in one batched model call instead of one
        call per action.
        """
        batch = np.array([[i, j, a] for a in range(len(DX))])
        preds = self.model.predict(batch, verbose=0)
        return [float(v) for v in preds[:, 0]]

    # update q_table
    def update(self, i, j, a, y):
        """Buffer the sample (i, j, a) -> y; fit the model once MIN_DIM samples are stored.

        After a fit the buffer is emptied again.
        """
        sample = np.array([i, j, a, y], dtype=float)
        self.q_table = np.vstack((self.q_table, sample))

        if self.q_table.shape[0] >= MIN_DIM:
            train_data = self.q_table[:, 0:3]
            train_labels = self.q_table[:, -1]

            # Silence anything written to stdout during the fit; always
            # restore stdout, even if fit() raises.
            old_stdout = sys.stdout
            with open(os.devnull, "w") as devnull:
                sys.stdout = devnull
                try:
                    self.model.fit(train_data, train_labels, batch_size=20, epochs=2, verbose=0)
                finally:
                    sys.stdout = old_stdout

            self._reset_buffer()

    # find the initial position of the agent
    def find_init_position(self):
        """Return (row, column) of the first 'A' cell, or (-1, -1) if there is none."""
        width = len(self.board[0]) if self.board else 0
        for i, row in enumerate(self.board):
            # only the first `width` columns are searched, as the board width
            # is taken from the first row
            j = row.find('A', 0, width)
            if j != -1:
                return i, j
        return -1, -1

    def board_print(self, _board):
        """Print the board row by row, then pause briefly for the animation."""
        print('------------------')
        for r in _board:
            print(*r, sep='')
        sleep(0.3) # animation delay

    def train(self):
        """Run NUM_EPS epsilon-greedy episodes and feed one-step targets to update().

        Does nothing when random_play is set. An episode ends on leaving the
        board, hitting an obstacle, reaching treasure or poison, or after
        STEP_LIM steps.

        Raises ValueError if the board has no 'A' cell.
        """

        if self.random_play:
            return

        self.si, self.sj = self.find_init_position()
        if self.si < 0:
            raise ValueError("board has no agent start cell 'A'")

        n_actions = len(DX)

        for _ in range(NUM_EPS):
            i, j = self.si, self.sj
            gameover = False
            steps_limit = STEP_LIM

            while not gameover and steps_limit > 0:
                steps_limit -= 1
                #choose an action
                if np.random.uniform(0, 1) <= self.epsilon:
                    a = np.random.randint(0, n_actions)
                else:
                    q_values = self._action_values(i, j)
                    a = q_values.index(max(q_values))

                prev_i, prev_j = i, j
                i += DX[a]
                j += DY[a]

                # Moves off the board or onto an obstacle end the episode and
                # are not stored as samples.
                record = True

                if self.check_if_position_invalide(i, j) or self.board[i][j] == 'W':
                    y = 0
                    gameover = True
                    record = False

                #terminal state (win)
                elif self.board[i][j] == 'T':
                    y = self.treasure_reward
                    gameover = True

                #terminal state (lose)
                elif self.board[i][j] == 'P':
                    y = self.poison_reward
                    gameover = True

                #empty
                else:
                    y = self.empty_reward

                # bootstrap from the best estimated value of the next state
                if not gameover:
                    y += self.gamma * max(self._action_values(i, j))

                if record:
                    self.update(prev_i, prev_j, a, y)

    def play(self):
        """Play one game from the 'A' cell and return 1 on a win, 0 otherwise.

        Each move is random with probability epsilon (always when random_play
        is set), otherwise greedy on the model's estimates. Moves that leave
        the board, hit an obstacle, or are random moves onto poison are
        rejected, masked out and retried. At most STEP_LIM move attempts are
        made.

        Raises ValueError if the board has no 'A' cell.
        """

        gameover = False
        win = False
        i, j = self.find_init_position()
        if i < 0:
            raise ValueError("board has no agent start cell 'A'")

        # make a copy of the board to use for display
        pb = [list(row) for row in self.board]

        n_actions = len(DX)
        steps_limit = STEP_LIM
        while not gameover and steps_limit > 0:

            # In pure random play the estimates are never consulted, so the
            # model is not queried at all.
            if self.random_play:
                q_values = [0.0] * n_actions
            else:
                q_values = self._action_values(i, j)
            pb[i][j] = 'A'

            if self.show_progress:
                self.board_print(pb)

            while steps_limit > 0:
                steps_limit -= 1

                is_random_choice = self.random_play or np.random.uniform(0, 1) <= self.epsilon
                if is_random_choice:
                    a = np.random.randint(0, n_actions)
                else:
                    a = q_values.index(max(q_values))

                pb[i][j] = '.'
                ni = i + DX[a]
                nj = j + DY[a]

                if (self.check_if_position_invalide(ni, nj)
                        or self.board[ni][nj] == 'W'
                        or (self.board[ni][nj] == 'P' and is_random_choice)):
                    # rejected move: stay put and never pick it greedily again
                    q_values[a] = -1e99
                    continue

                i, j = ni, nj
                break

            # (i, j) is always on the board and never an obstacle here, so
            # only the two terminal cell types need checking.
            cell = self.board[i][j]
            if cell == 'T':
                gameover = True
                win = True
            elif cell == 'P':
                gameover = True

        if self.show_progress:
            sleep(1)

        return 1 if win else 0
    

In [None]:
number_boards = 1 #number of different game's configs (files "1.txt", "2.txt", ...)
repeat_times = 1 #playing same game's configs several times


def evaluate_setting(gamma, epsilon):
    """Train and play a fresh Board_Env on every board, repeat_times each.

    gamma: discount rate passed to Board_Env,
    epsilon: epsilon-greedy parameter passed to Board_Env.

    Returns (win_rate, last_env): the fraction of played games that were won
    and the last Board_Env that was created.
    """
    wins = 0
    games = 0
    env = None
    for file_idx in range(number_boards):
        for _ in range(repeat_times):
            env = Board_Env(file_name=str(file_idx+1), show_progress=False,
                            random_play=False, gamma=gamma, epsilon=epsilon)
            env.train()
            wins += env.play()
            games += 1
    return wins/games, env


# NOTE(review): 3 and 10 lie outside the usual [0, 1] range for a discount
# rate -- confirm these candidates are intended.
gamma_vals = [0.01, 0.2, 0.9, 3, 10]
gamma_accuracies = []

best_accuracy = 0
best_gamma = 0

#searching for best discount rate (gamma) value
for gamma in gamma_vals:
    # `game` keeps pointing at the most recently created environment
    curr_acc, game = evaluate_setting(gamma=gamma, epsilon=0.5)

    print(f"Gamma = {gamma}, Accuracy = {curr_acc}")
    gamma_accuracies.append(curr_acc)

    # strict '>' keeps the first candidate on ties
    if curr_acc > best_accuracy:
        best_accuracy = curr_acc
        best_gamma = gamma

print(f"Best accuracy = {best_accuracy}, Best gamma = {best_gamma}")


epsilon_vals = [0.01, 0.2, 0.5, 0.8, 1]
epsilon_accuracies = []

best_accuracy = 0
best_epsilon = 0

#searching for best epsilon value, using the best gamma found above
for epsilon in epsilon_vals:
    curr_acc, game = evaluate_setting(gamma=best_gamma, epsilon=epsilon)

    print(f"Epsilon = {epsilon}, Accuracy = {curr_acc}")
    epsilon_accuracies.append(curr_acc)

    if curr_acc > best_accuracy:
        best_accuracy = curr_acc
        best_epsilon = epsilon

print(f"Best accuracy {best_accuracy}, Best epsilon = {best_epsilon}")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Gamma = 10, Accuracy = 1.0
Best accuracy = 1.0, Best gamma = 0.01
Epsilon = 0.01, Accuracy = 1.0
Epsilon = 0.2, Accuracy = 1.0
Epsilon = 0.5, Accuracy = 1.0
Epsilon = 0.8, Accuracy = 1.0
Epsilon = 1, Accuracy = 1.0
Best accuracy 1.0, Best epsilon = 0.01


In [23]:
# Show each list of swept values next to the accuracies recorded for it.
print(gamma_vals, gamma_accuracies)
print(epsilon_vals, epsilon_accuracies)

[0.01, 0.2, 0.9, 3, 10] [1.0, 1.0, 1.0, 1.0, 1.0]
[0.01, 0.2, 0.5, 0.8, 1] [1.0, 1.0, 1.0, 1.0, 1.0]


In [24]:
# Layer / parameter summary of the model owned by `game`, i.e. whichever
# Board_Env was assigned to that name most recently.
game.model.summary()

Model: "model_14"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_15 (InputLayer)       [(None, 3)]               0         
                                                                 
 dense_42 (Dense)            (None, 3)                 12        
                                                                 
 dense_43 (Dense)            (None, 3)                 12        
                                                                 
 dense_44 (Dense)            (None, 1)                 4         
                                                                 
Total params: 28
Trainable params: 28
Non-trainable params: 0
_________________________________________________________________
