In [4]:
import sys, math
import numpy as np
from gym_chess_env import ChessBoard_gym
import Box2D
from Box2D.b2 import (edgeShape, circleShape, fixtureDef, polygonShape, revoluteJointDef, contactListener)
# new line
import gym
from gym import spaces
from gym.utils import seeding
# import skvideo.io
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense,BatchNormalization
from collections import deque
from tensorflow.keras.activations import relu, linear
from tensorflow.keras.losses import mean_squared_error
from tensorflow.keras.optimizers import Adam, SGD
import random
from datetime import datetime

In [5]:
class DQN:
    def __init__(self, env):

        self.env = env
        self.action_space = env.action_space
        self.observation_space = env.observation_space
        self.counter = 0

        #######################
        # Change these parameters to improve performance
        self.density_first_layer = 512
        self.density_second_layer = 512
        self.num_epochs = 1
        self.batch_size = 64
        self.epsilon_min = 0.01

        # epsilon will randomly choose the next action as either
        # a random action, or the highest scoring predicted action
        self.epsilon = 1
        self.epsilon_decay = 0.999
        self.gamma = 0.995

        # Learning rate
        self.lr = 0.003
        self.rewards_list = []

        self.replay_memory_buffer = deque(maxlen=500000)
        self.num_action_space = self.action_space.n
#         self.num_observation_space = env.observation_space.shape[0]
# HARD CODED FOR NOW
        self.num_observation_space = 64

        self.model = self.initialize_model()

    def initialize_model(self):
        model = Sequential()
        model.add(Dense(self.density_first_layer, input_dim=self.num_observation_space, activation=relu))
        model.add(BatchNormalization())
        model.add(Dense(self.density_second_layer, activation=relu))
        model.add(BatchNormalization())
        model.add(Dense(self.num_action_space, activation=relu))

        # Compile the model
        model.compile(loss=mean_squared_error,optimizer=SGD(lr=self.lr))
        print(model.summary())
        return model

    def get_action(self, state):

        # The epsilon parameter decides whether we are using the 
        # Q-function to determine our next action 
        # or take a random sample of the action space. 
        if np.random.rand() < self.epsilon:
            return random.randrange(self.num_action_space)

        # Get a list of predictions based on the current state
        predicted_actions = self.model.predict(state)

        # Return the maximum-reward action
        return np.argmax(predicted_actions[0])

    def add_to_replay_memory(self, state, action, reward, next_state, done):
        self.replay_memory_buffer.append((state, action, reward, next_state, done))

    def learn_and_update_weights_by_reply(self):

        # replay_memory_buffer size check
        # if we have fewer than 64 actions in the buffer, 
        # or the counter is not 0, return
        if len(self.replay_memory_buffer) < self.batch_size or self.counter != 0:
            return

        # Early Stopping
        if np.mean(self.rewards_list[-10:]) > 180:
            return

        # Choose batch of random samples from the replay stack 
        random_sample = self.get_random_sample_from_replay_mem()

        # Get the values (in numpy array form) from the random batch of samples
        states, actions, rewards, next_states, done_list = self.get_attribues_from_sample(random_sample)

        # Use the Keras "predict_on_batch" feature to predict the targets
        # based on the random batch of next states in our replay stack
#         print('next states: \n ', next_states, '1- done_list: \n', 1- done_list)
#         pred = self.model.predict_on_batch(next_states)
#         print( "shape of pred is: ", pred.shape )
#         print ('prediction')
#         print (pred)
#         print('amax')
#         print( np.amax(self.model.predict_on_batch(next_states), axis=1) )        
        targets = rewards + self.gamma * (np.amax(self.model.predict_on_batch(next_states), axis=1)) * (1 - done_list)
        
        # Run a prediction on the states in our random sample
        target_vec = self.model.predict_on_batch(states)

        # Create a numpy array sized to match the batch_size
        indexes = np.array([i for i in range(self.batch_size)])

        # The target vector is an array of 
        # state predictions 
        target_vec[[indexes], [actions]] = targets

        # build a model with the existing states and target scores in batches of 64
#         print('shape of target_vec', target_vec.shape,' shape of state:', states.shape ,'\ntarget_vectors:', target_vec )
        self.model.fit(states, target_vec, epochs=self.num_epochs, verbose=0)

    def get_attribues_from_sample(self, random_sample):
        states = np.array([i[0] for i in random_sample])
        actions = np.array([i[1] for i in random_sample])
        rewards = np.array([i[2] for i in random_sample])
        next_states = np.array([i[3] for i in random_sample])
        done_list = np.array([i[4] for i in random_sample])
        states = np.squeeze(states)
        next_states = np.squeeze(next_states)
        return np.squeeze(states), actions, rewards, next_states, done_list

    # Get a batch_size sample of previous iterations
    def get_random_sample_from_replay_mem(self):
        random_sample = random.sample(self.replay_memory_buffer, self.batch_size)
        return random_sample

    # Run the keras predict using the current state as input.
    # This will choose the next step.
    def predict(self, current_state):
        return self.model.predict(current_state)

    def train(self, num_episodes=2000, can_stop=True):

        frames = []

        for episode in range(num_episodes):

            # state is a vector of 8 values:
            # x and y position
            # x and y velocity
            # lander angle and angular velocity
            # boolean for left leg contact with ground
            # boolean for right leg contact with ground
            state = env.reset()
            reward_for_episode = 0
            done = False
            state = np.reshape(state, [1, self.num_observation_space])
                
#             if episode > 0: state = env.reset()
            ctr = 0
            while not done:

                if episode % 50 == 0:
                    frame = env.get_FEN()
                    if ctr==100: env.print_board()

                if episode % 50 == 0:
                    if ctr % 200 == 0: print("rewards earned so far is: ", reward_for_episode)
                    frames.append(frame)      

                # use epsilon decay to choose the next state
                received_action = self.get_action(state)
                next_state, reward, done, info = env.step(received_action)

                # Reshape the next_state array to match the size of the observation space
                next_state = np.reshape(next_state, [1, self.num_observation_space])

                # Store the experience in replay memory
                self.add_to_replay_memory(state, received_action, reward, next_state, done)

                # add up rewards
                reward_for_episode += reward
                state = next_state
                self.update_counter()

                # update the model
                self.learn_and_update_weights_by_reply()
                ctr += 1
                
                #if done:
                #    break
            self.rewards_list.append(reward_for_episode)

            # Create a video from every 10th episode
#             if episode % 50 == 0:
#                 fname = "/tmp/videos/episode"+str(episode)+".mp4"
#                 skvideo.io.vwrite(fname, np.array(frames))
#                 del frames
#                 frames = []

            # Decay the epsilon after each experience completion
            if self.epsilon > self.epsilon_min:
                self.epsilon *= self.epsilon_decay
                #self.epsilon *= min(0.995,(self.epsilon_decay + counter*(0.000075)))

            # Check for breaking condition
            last_rewards_mean = np.mean(self.rewards_list[-100:])

            # Once the mean average of rewards is over 200, we can stop training
            if last_rewards_mean > 200 and can_stop:
                print("DQN Training Complete...")
                break
            print(episode, "\t: Episode || Reward: ",reward_for_episode, "\t|| Average Reward: ",last_rewards_mean, "\t epsilon: ", self.epsilon )

    def update_counter(self):
        self.counter += 1
        step_size = 5
        self.counter = self.counter % step_size

    def save(self, name):
        self.model.save(name)

In [6]:
    rewards_list = []

    # Run 100 episodes to generate the initial training data
    #num_test_episode = 100

    # Create the OpenAI Gym Enironment with LunarLander-v2
    env = ChessBoard_gym()

    # set the numpy random number generatorseeds
    env.seed(21)
    np.random.seed(21)

    # max number of training episodes
    training_episodes = 200

    # number of test runs with a satisfactory number of good landings
    #high_score = 0
 
    # initialize the Deep-Q Network model
    model = DQN(env)

    # Train the model
    model.train(training_episodes, True)

    now = datetime.now()
    date_time = now.strftime("%Y%m%d-%H%M%S")
    model.save('model/chess-reinf_model' + date_time + '.h5')

    

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_3 (Dense)              (None, 512)               33280     
_________________________________________________________________
batch_normalization_2 (Batch (None, 512)               2048      
_________________________________________________________________
dense_4 (Dense)              (None, 512)               262656    
_________________________________________________________________
batch_normalization_3 (Batch (None, 512)               2048      
_________________________________________________________________
dense_5 (Dense)              (None, 13)                6669      
Total params: 306,701
Trainable params: 304,653
Non-trainable params: 2,048
_________________________________________________________________
None
rewards earned so far is:  0


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)





BL - BL - BL - BL - BL - BL - BL - BL -  

bp1 - BL - BL - bp4 - bp5 - bp6 - bp7 - bp8 -  

BL - bp2 - bp3 - BL - BL - BL - BL - BL -  

BL - BL - BL - BL - BL - BL - BL - BL -  

BL - BL - BL - BL - BL - BL - BL - BL -  

BL - BL - BL - wp4 - BL - wp6 - BL - BL -  

wp1 - wp2 - wp3 - BL - wp5 - BL - wp7 - wp8 -  

BL - BL - BL - BL - BL - BL - BL - BL -  




rewards earned so far is:  -172
rewards earned so far is:  -322
episode completed, board position is
0 	: Episode || Reward:  -396 	|| Average Reward:  -396.0 	 epsilon:  0.999
episode completed, board position is
1 	: Episode || Reward:  -396 	|| Average Reward:  -396.0 	 epsilon:  0.998001
episode completed, board position is
2 	: Episode || Reward:  -414 	|| Average Reward:  -402.0 	 epsilon:  0.997002999
episode completed, board position is
3 	: Episode || Reward:  -416 	|| Average Reward:  -405.5 	 epsilon:  0.996005996001
episode completed, board position is
4 	: Episode || Reward:  -406 	|| Average Reward:  -405.6 	 eps

### Testing

In [7]:

print("Starting Testing of the trained model...")

done = False
frames = []
num_test_episode = 5
num_observation_space = 64
high_score = 0

# Run some test episodes to see how well our model performs
for test_episode in range(num_test_episode):
    current_state = env.reset()
#     num_observation_space = env.observation_space.shape[0]
    current_state = np.reshape(current_state, [1, num_observation_space])
    reward_for_episode = 0
    done = False
    while not done:

        frame = env.get_FEN()
        frames.append(frame)

        selected_action = np.argmax(model.predict(current_state)[0])
        new_state, reward, done, info = env.step(selected_action)
        new_state = np.reshape(new_state, [1, num_observation_space])
        current_state = new_state
        reward_for_episode += reward
    rewards_list.append(reward_for_episode)
    print(test_episode, "\t: Episode || Reward: ", reward_for_episode)
    if reward_for_episode >= 200:
        high_score += 1
    if test_episode % 100 == 0:
        env.print_board()


now = datetime.now() # current date and time
rewards_mean = np.mean(rewards_list[-100:])
print("Average Reward: ", rewards_mean )
print("Total tests above 200: ", high_score)

Starting Testing of the trained model...
episode completed, board position is
0 	: Episode || Reward:  -500



BL - BL - BL - BL - BL - BL - BL - BL -  

bp1 - bp2 - bp3 - bp4 - bp5 - bp6 - bp7 - bp8 -  

BL - BL - BL - BL - BL - BL - BL - BL -  

BL - BL - BL - BL - BL - BL - BL - BL -  

BL - BL - BL - BL - BL - BL - BL - BL -  

BL - BL - BL - BL - BL - BL - BL - BL -  

wp1 - wp2 - wp3 - wp4 - wp5 - wp6 - wp7 - wp8 -  

BL - BL - BL - BL - BL - BL - BL - BL -  




episode completed, board position is
1 	: Episode || Reward:  -500
episode completed, board position is
2 	: Episode || Reward:  -500
episode completed, board position is
3 	: Episode || Reward:  -500
episode completed, board position is
4 	: Episode || Reward:  -500
Average Reward:  -500.0


NameError: name 'high_score' is not defined

In [None]:
num_observation_space, current_state