# Prototype 3

 <font size="3"> Prototype 3 introduces fixes that address the problem a two-player environment poses for the deep Q-learning algorithm. This requires revamping the indexing system: every record must be separated by which side's move produced the result. This ensures that a checkmate by white does not back-propagate high Q-values for black's mistakes that led to the checkmate.</font>

In [1]:
import numpy as np
import tensorflow as tf

# Hyperparameters for the epsilon-greedy deep Q-learning loop.
gamma = 0.99  # discount applied to the target network's best next-state value
epsilon = 1  # current exploration rate; decayed once per frame
epsilon_min = 0.1  # floor that epsilon is clamped to while decaying
epsilon_max = 1.0  # upper end of the exploration-rate range
epsilon_interval = epsilon_max - epsilon_min  # total amount epsilon may decay
batch_size = 32  # replay records sampled per side for each update
max_steps_per_episode = 200  # exclusive upper bound of the per-episode step range
num_actions = 4096  # 64 from-squares x 64 to-squares

In [3]:
from IPython.display import clear_output
from tensorflow import keras
# Optimizer and loss used by the gradient step in the training loop.
optimizer = keras.optimizers.Adam(
    learning_rate=0.00025,
    clipnorm=1.0,
)
loss_function = keras.losses.Huber()

# Bookkeeping counters.
running_reward = 0
episode_count = 0  # incremented after every finished episode
frame_count = 0  # incremented on every timestep across all episodes
len_episodes = 0  # incremented at the start of every episode

# Exploration schedule, measured in frames.
epsilon_random_frames = 50000  # while frame_count is below this, every move is exploratory
epsilon_greedy_frames = 1000000.0  # frames over which epsilon decays by epsilon_interval

# Replay memory and update cadence.
max_memory_length = 100000  # per-side replay length above which old records are dropped
update_after_actions = 4  # run a gradient update every this many frames
update_target_network = 100  # copy weights into the target network every this many frames

iterations = 1000  # number of episodes the training loop plays

In [4]:
import chess
def filter_legal_moves(board, logits):
    """Mask a score vector so that only legal moves can win an argmax.

    Args:
        board: A ``chess.Board``; its ``legal_moves`` decide which entries
            survive.
        logits: One score per (from_square, to_square) pair, indexed as in
            ``move2num``. NumPy arrays and anything ``np.asarray`` accepts
            are supported.

    Returns:
        A NumPy float array shaped like ``logits`` in which every legal move
        keeps its score and every other entry is ``-inf``.
    """
    scores = np.asarray(logits, dtype=float)
    legal_mask = np.zeros(scores.shape, dtype=bool)
    for legal_move in board.legal_moves:
        # Promotion variants share a from/to pair and therefore one index.
        key = chess.Move(legal_move.from_square, legal_move.to_square)
        legal_mask[move2num[key]] = True
    # Multiplying by a 0/1 mask would leave illegal entries at 0, which beats
    # any legal move with a negative score under argmax; -inf never wins.
    return np.where(legal_mask, scores, -np.inf)

# Two-way lookup between a flat action index and a bare (from, to) move.
# The index of a move is from_square * 64 + to_square, giving 64 * 64 entries.
num2move = {
    from_sq * 64 + to_sq: chess.Move(from_sq, to_sq)
    for from_sq in range(64)
    for to_sq in range(64)
}
move2num = {
    chess.Move(move.from_square, move.to_square): index
    for index, move in num2move.items()
}
counter = len(num2move)

def translate_board(board):
    """Encode the piece placement of ``board`` as an array of one-hot vectors.

    The placement field of ``board.epd()`` is split into ranks. Every piece
    letter is looked up in ``chess_dict`` and every digit expands to that many
    empty-square vectors.

    Args:
        board: Object exposing ``epd()``, e.g. a ``chess.Board``.

    Returns:
        ``np.array`` built from one list of square vectors per rank, in the
        order the ranks appear in the EPD string.
    """
    placement = board.epd().split(" ", 1)[0]
    encoded_ranks = []
    for rank in placement.split("/"):
        squares = []
        for symbol in rank:
            if symbol.isdigit():
                # A digit is a run of that many empty squares.
                squares.extend(chess_dict['.'] for _ in range(int(symbol)))
            else:
                squares.append(chess_dict[symbol])
        encoded_ranks.append(squares)
    return np.array(encoded_ranks)

# One-hot encoding of a square's content over 12 planes.
# Planes 0-5 are the lowercase pieces p, n, b, r, q, k; planes 6-11 are the
# uppercase pieces P, N, B, R, Q, K. An empty square ('.') is all zeros.
chess_dict = {
    symbol: [int(plane == 'pnbrqkPNBRQK'.index(symbol)) for plane in range(12)]
    for symbol in 'pPnNbBrRqQkK'
}
chess_dict['.'] = [0] * 12

In [5]:
from keras.layers import Dense,Flatten,Reshape
from keras.layers.convolutional import Conv2D
from keras.models import Model, Input
class Q_model():
    """Convolutional Q-network scoring every (from_square, to_square) pair.

    Relies on the module-level ``filter_legal_moves`` and ``num2move`` to turn
    a score vector into a legal ``chess.Move``.
    """

    def __init__(self):
        # Keras model mapping an (8, 8, 12) board encoding to 4096 action values.
        self.model = self.create_q_model()

    def create_q_model(self):
        """Build the network: three stride-2 convolutions and a dense head.

        Returns:
            A Keras ``Model`` taking (batch, 8, 8, 12) inputs and producing
            (batch, 4096) action values.
        """
        input_layer = Input(shape=(8, 8, 12))

        # Each 2x2, stride-2 convolution halves the board: 8x8 -> 4x4 -> 2x2 -> 1x1.
        x = Conv2D(filters=64, kernel_size=2, strides=(2, 2))(input_layer)
        x = Conv2D(filters=128, kernel_size=2, strides=(2, 2))(x)
        x = Conv2D(filters=256, kernel_size=2, strides=(2, 2))(x)
        x = Flatten()(x)

        # Q-values are regression targets (reward + discounted future value),
        # so the head must be unbounded. A softmax would confine every output
        # to [0, 1] and make checkmate rewards of +/-100 unreachable.
        action = Dense(4096, activation='linear')(x)
        return Model(inputs=input_layer, outputs=action)

    @staticmethod
    def _best_legal_move(env, scores):
        """Return ``(move, action_index)`` of the best-scoring legal move."""
        # Shift so every score is >= 1. The ordering is unchanged, and entries
        # masked out by filter_legal_moves can never outrank a legal move,
        # even when the raw scores are negative.
        shifted = scores - scores.min() + 1.0
        action = int(np.argmax(filter_legal_moves(env.board, shifted), axis=None))
        return num2move[action], action

    def predict(self, env):
        """Pick the legal move with the highest predicted Q-value.

        Args:
            env: Environment exposing ``translate_board()`` and ``board``.

        Returns:
            ``(move, action)``: the chosen ``chess.Move`` (without promotion
            piece) and its flat action index.
        """
        # Cast explicitly: the board encoding is integer-valued, the network
        # computes in floating point.
        board_planes = np.asarray(env.translate_board(), dtype=np.float32)
        state_tensor = tf.expand_dims(tf.convert_to_tensor(board_planes), 0)
        q_values = self.model(state_tensor, training=False)[0]
        return self._best_legal_move(env, np.asarray(q_values, dtype=float))

    def explore(self, env):
        """Pick a legal move uniformly at random.

        Args:
            env: Environment exposing ``board``.

        Returns:
            ``(move, action)`` in the same format as ``predict``.
        """
        # Independent random scores make every legal move equally likely to
        # be the argmax.
        return self._best_legal_move(env, np.random.rand(4096))
        
    
# Online network (receives the gradient updates) and target network (supplies
# the future-value estimates and is periodically overwritten with the online
# network's weights).
model, model_target = Q_model(), Q_model()

In [6]:
import chess
class ChessEnv():
    """Self-play chess environment with a separate replay memory per side.

    Every move is recorded under the side that played it, so that the reward
    for delivering checkmate and the penalty for being checkmated end up on
    different records.

    Relies on the module-level names ``translate_board``, ``move2num``,
    ``max_memory_length``, ``batch_size``, ``gamma``, ``num_actions`` and
    ``model_target``.
    """

    def __init__(self):
        self.board = chess.Board()
        # Replay memory. Each dict maps a side to a list; index i of all five
        # lists of one side describes the same move by that side.
        self.action_history = {'white': [], 'black': []}
        self.state_history = {'white': [], 'black': []}
        self.state_next_history = {'white': [], 'black': []}
        self.rewards_history = {'white': [], 'black': []}
        self.done_history = {'white': [], 'black': []}
        self.episode_reward_history = []
        # True once the current game is over; refreshed by every step().
        self.done = False
        # Full-move number and ply count used to format the running game text.
        self.move_counter = 1
        self.fast_counter = 0
        self.pgn = ''
        # Move texts of finished games, oldest first.
        self.pgns = []

    def translate_board(self):
        """Return the array encoding of the current position."""
        return translate_board(self.board)

    def reset(self):
        """Archive the finished game, trim the memories and start a new game.

        Returns:
            The array encoding of the starting position.
        """
        if self.pgn:
            self.pgns.append(self.pgn)
        # Keep the 1000 most recent games; the oldest sit at the front.
        if len(self.pgns) > 1000:
            del self.pgns[:len(self.pgns) - 1000]

        self.board = chess.Board()
        self.done = False
        self.move_counter = 1
        self.fast_counter = 0
        self.pgn = ''

        histories = (
            self.rewards_history,
            self.state_history,
            self.state_next_history,
            self.action_history,
            self.done_history,
        )
        for turn in self.done_history:
            # Cut the same number of oldest records from every list so the
            # lists of one side stay index-aligned.
            excess = len(self.done_history[turn]) - max_memory_length
            if excess > 0:
                for history in histories:
                    del history[turn][:excess]
        return translate_board(self.board)

    def update_pgn(self, move):
        """Append ``move`` in SAN to the running game text.

        Must be called before ``move`` is pushed, because the SAN is derived
        from the current position.
        """
        if self.fast_counter % 2 == 0:
            # An even ply count means a new full move starts: emit its number.
            self.pgn += str(self.move_counter) + '.'
            self.move_counter += 1
        self.fast_counter += 1
        self.pgn += str(self.board.san(move)) + ' '

    def step(self, action):
        """Play ``action`` for the side to move and record the transition.

        Args:
            action: The ``chess.Move`` to play. A move without promotion piece
                that is only legal as a promotion is played as a queen
                promotion.
        """
        if self.board.turn:
            turn, opp = 'white', 'black'
        else:
            turn, opp = 'black', 'white'

        # Action indices encode only from/to squares. Without this a pawn
        # reaching the last rank would be pushed unpromoted.
        if action.promotion is None:
            queening = chess.Move(
                action.from_square, action.to_square, promotion=chess.QUEEN
            )
            if queening in self.board.legal_moves:
                action = queening

        state = self.translate_board()
        self.update_pgn(action)
        self.board.push(action)
        state_next = self.translate_board()

        reward = 100 if self.board.is_checkmate() else 0
        self.done = self.board.is_game_over()

        action_index = move2num[chess.Move(action.from_square, action.to_square)]
        self.action_history[turn].append(action_index)
        self.state_history[turn].append(state)
        self.state_next_history[turn].append(state_next)
        self.done_history[turn].append(self.done)
        self.rewards_history[turn].append(reward)

        # The game also ended for the opponent: charge the outcome to their
        # most recent move instead of appending an extra reward, which would
        # shift their rewards against their states and actions.
        if self.done and self.done_history[opp]:
            self.done_history[opp][-1] = True
            self.rewards_history[opp][-1] = -reward

    def update_q_values(self):
        """Sample one replay batch per side and compute its Q-learning targets.

        Returns:
            ``(state_samples, masks, updated_q_values)``: three parallel lists
            with one entry per side that has recorded moves. Each entry holds
            the sampled states, the one-hot masks of the actions taken and the
            target Q-values for those actions.
        """
        state_samples = []
        masks = []
        updated_q_values = []
        for turn in ('white', 'black'):
            memory_size = len(self.done_history[turn])
            if memory_size == 0:
                # Nothing to sample for this side yet.
                continue
            indices = np.random.choice(memory_size, size=batch_size)

            state_sample = np.array(
                [self.state_history[turn][i] for i in indices], dtype=np.float32
            )
            state_next_sample = np.array(
                [self.state_next_history[turn][i] for i in indices], dtype=np.float32
            )
            rewards_sample = tf.convert_to_tensor(
                [float(self.rewards_history[turn][i]) for i in indices]
            )
            action_sample = [self.action_history[turn][i] for i in indices]
            done_sample = tf.convert_to_tensor(
                [float(self.done_history[turn][i]) for i in indices]
            )

            future_rewards = model_target.model.predict(state_next_sample, verbose=0)

            # Terminal transitions have no future, so their target is the
            # recorded reward alone.
            targets = rewards_sample + gamma * tf.reduce_max(
                future_rewards, axis=1
            ) * (1 - done_sample)

            state_samples.append(state_sample)
            masks.append(tf.one_hot(action_sample, num_actions))
            updated_q_values.append(targets)
        return state_samples, masks, updated_q_values
    
# Environment instance that the training loop steps and samples from.
env = ChessEnv()

In [7]:
from tensorflow import keras
# Self-play training loop: each outer iteration plays one game.
for _ in range(iterations):
    state = np.array(env.reset())
    episode_reward = 0
    len_episodes += 1
    for timestep in range(1, max_steps_per_episode):
        frame_count += 1

        # Explore on every frame until epsilon_random_frames is reached, and
        # with probability epsilon afterwards; otherwise act greedily.
        if frame_count < epsilon_random_frames or epsilon > np.random.rand(1)[0]:
            move, action = model.explore(env)
        else:
            move, action = model.predict(env)

        # Linear decay of the exploration rate, clamped at epsilon_min.
        epsilon -= epsilon_interval / epsilon_greedy_frames
        epsilon = max(epsilon, epsilon_min)

        env.step(move)

        # done_history is a dict keyed by side, so len() of the dict itself is
        # always 2. Compare the smaller per-side memory with batch_size.
        smallest_memory = min(len(records) for records in env.done_history.values())
        if frame_count % update_after_actions == 0 and smallest_memory > batch_size:
            state_samples, masks, updated_q_values = env.update_q_values()

            # One gradient step per sampled batch.
            for states, mask, targets in zip(state_samples, masks, updated_q_values):
                with tf.GradientTape() as tape:
                    q_values = model.model(states)
                    # Keep only the Q-value of the action that was played.
                    q_action = tf.reduce_sum(tf.multiply(q_values, mask), axis=1)
                    loss = loss_function(targets, q_action)

                grads = tape.gradient(loss, model.model.trainable_variables)
                optimizer.apply_gradients(zip(grads, model.model.trainable_variables))

        if frame_count % update_target_network == 0:
            # Refresh the target network from the online network.
            model_target.model.set_weights(model.model.get_weights())
            template = "episode {}, frame count {}"
            print(template.format(episode_count, frame_count))

        if env.done:
            break

    # One entry per episode, not one per frame.
    env.episode_reward_history.append(episode_reward)
    episode_count += 1

episode 0, frame count 100
episode 1, frame count 200
episode 1, frame count 300
episode 2, frame count 400
episode 2, frame count 500
episode 3, frame count 600
episode 3, frame count 700
episode 4, frame count 800
episode 4, frame count 900
episode 5, frame count 1000
episode 5, frame count 1100


KeyboardInterrupt: 