There are two major intermidiate steps when solving chess problem:
1. Move Chess:

>> Objective: find the shortest path between 2 spots on a chess board

>> Motivation: Move Chess effectively

2. Capture Chess

>> Objective: capture as many pieces from the opponent within centain fullmoves

>> Motivation: Piece captures as one way of reward.


Today we focus on Move chess

# Load and Import Package

In [None]:
!pip install --upgrade git+https://github.com/arjangroen/RLC.git 
!pip install python-chess  
# Python-Chess is the Python Chess Package that handles the chess environment

Collecting git+https://github.com/arjangroen/RLC.git
  Cloning https://github.com/arjangroen/RLC.git to /tmp/pip-req-build-g9iqp115
  Running command git clone -q https://github.com/arjangroen/RLC.git /tmp/pip-req-build-g9iqp115
Building wheels for collected packages: RLC
  Building wheel for RLC (setup.py) ... [?25l[?25hdone
  Created wheel for RLC: filename=RLC-0.3-cp36-none-any.whl size=22566 sha256=1d0be4daf5c123f601567b83cf2c80ca800b1b298a491c3ab7d60308b08e1e2f
  Stored in directory: /tmp/pip-ephem-wheel-cache-putlw2_u/wheels/04/68/a5/cb835cd3d76a49de696a942739c71a56bfe66d0d8ea7b4b446
Successfully built RLC
Installing collected packages: RLC
Successfully installed RLC-0.3


In [None]:
from RLC.move_chess.environment import Board
from RLC.move_chess.agent import Piece
from RLC.move_chess.learn import Reinforce
import inspect
import numpy as np

# Load Chess

In [None]:
p = Piece(piece='king') # select a chess agent (knight, bishop or rook)
env = Board() # 8*8 chess board
r = Reinforce(p,env) 
#env.render()
# env.visual_board # S start position, F is terminate position

In [None]:
r.visualize_policy()
#r.agent.policy

[['↑', '↑', '↑', '↑', '↑', '↑', '↑', '↑'],
 ['↑', '↑', '↑', '↑', '↑', '↑', '↑', '↑'],
 ['↑', '↑', '↑', '↑', '↑', '↑', '↑', '↑'],
 ['↑', '↑', '↑', '↑', '↑', '↑', '↑', '↑'],
 ['↑', '↑', '↑', '↑', '↑', '↑', '↑', '↑'],
 ['↑', '↑', '↑', '↑', '↑', '↑', '↑', '↑'],
 ['↑', '↑', '↑', '↑', '↑', '↑', '↑', '↑'],
 ['↑', '↑', '↑', '↑', '↑', 'F', '↑', '↑']]


In [None]:
r.agent.value_function

array([[0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0.]])

# 1.1 State Evaluation

If we want our agent to optimize its rewards, we want its policy to guide behavior towards the states with the highest value. This value can be estimated using bootstrapping:

A state (s) is as valuable (V) as the successor state (s') plus the reward (R) for going from s to s'.
Since there can be mulitple actions (a) and multiple successor states they are summed and weighted by their probability (pi).
In a non-deterministic environment, a given action could result in multiple successor states. We don't have to take this into account for this problem because move chess is a deterministic game.
Successor state values are discounted with discount factor (gamma) that varies between 0 and 1.

In [None]:
def evaluate_state(self, state, gamma=0.9, synchronous=True):
        """
        Calculates the value of a state based on the successor states and the immediate rewards.
        Args:
            state: tuple of 2 integers 0-7 representing the state
            gamma: float, discount factor
            synchronous: Boolean
        Returns: The expected value of the state under the current policy.
        """
        greedy_action_value = np.max(self.agent.policy[state[0], state[1], :]) # get max action value at give state
        greedy_indices = [i for i, a in enumerate(self.agent.policy[state[0], state[1], :]) if
                          a == greedy_action_value]  # list the index of all actions with max action value
        prob = 1 / len(greedy_indices)  # probability of an action occuring
        state_value = 0 # set V(S) = 0
        for i in greedy_indices: # for all actions with max action value
            self.env.state = state  # reset state to the one being evaluated
            reward, episode_end = self.env.step(self.agent.action_space[i]) # get reward
            if synchronous: # get successor state value
                successor_state_value = self.agent.value_function_prev[self.env.state] 
                # if synchronous, successor state value is in the same iteration of policy evaluation
            else:
                # if not,  successor state value could be previous or the current value funtion,or combined
                successor_state_value = self.agent.value_function[self.env.state] # otherwise 
            state_value += (prob * (
                    reward + gamma * successor_state_value))  # sum up rewards and discounted successor state value in equation(*)
        return state_value

In [None]:
state = (0,0)
r.agent.value_function[0,0] = r.evaluate_state(state,gamma=1)
r.agent.value_function.astype(int)

array([[-1,  0,  0,  0,  0,  0,  0,  0],
       [ 0,  0,  0,  0,  0,  0,  0,  0],
       [ 0,  0,  0,  0,  0,  0,  0,  0],
       [ 0,  0,  0,  0,  0,  0,  0,  0],
       [ 0,  0,  0,  0,  0,  0,  0,  0],
       [ 0,  0,  0,  0,  0,  0,  0,  0],
       [ 0,  0,  0,  0,  0,  0,  0,  0],
       [ 0,  0,  0,  0,  0,  0,  0,  0]])

# 1.2 Policy Evaluation

Policy evaluation is the act of doe state evaluation for each state in the statespace
As you can see in my implementatin I simply iterate over all state and update the value function
This is the algorithm provided by Sutton and Barto:

In [None]:
print(inspect.getsource(r.evaluate_policy))

    def evaluate_policy(self, gamma=0.9, synchronous=True):
        self.agent.value_function_prev = self.agent.value_function.copy()  # For synchronous updates
        for row in range(self.agent.value_function.shape[0]):
            for col in range(self.agent.value_function.shape[1]):
                self.agent.value_function[row, col] = self.evaluate_state((row, col), gamma=gamma,
                                                                          synchronous=synchronous)



In [None]:
def evaluate_policy(self, gamma=0.9, synchronous=True):
        self.agent.value_function_prev = self.agent.value_function.copy()  # For synchronous updates
        for row in range(self.agent.value_function.shape[0]): # in each row
            for col in range(self.agent.value_function.shape[1]): # in each column
                self.agent.value_function[row, col] = self.evaluate_state((row, col), gamma=gamma, # in each action, update value function
                                                                          synchronous=synchronous)

In [None]:
r.evaluate_policy(gamma=1)
r.agent.value_function.astype(int)

array([[-1, -1, -1, -1, -1, -1, -1, -1],
       [-1, -1, -1, -1, -1, -1, -1, -1],
       [-1, -1, -1, -1, -1, -1, -1, -1],
       [-1, -1, -1, -1, -1, -1, -1, -1],
       [-1, -1, -1, -1, -1, -1, -1, -1],
       [-1, -1, -1, -1, -1, -1, -1, -1],
       [-1, -1, -1, -1, -1, -1, -1, -1],
       [-1, -1, -1, -1, -1,  0, -1, -1]])

We can iterate this until the value function is stable:

In [None]:
eps=0.1
k_max = 1000
value_delta_max = 0
gamma = 1
synchronous=True
#value_delta_max = 0
for k in range(k_max):
    r.evaluate_policy(gamma=gamma,synchronous=synchronous)
    value_delta = np.max(np.abs(r.agent.value_function_prev - r.agent.value_function))
    value_delta_max = value_delta
    if value_delta_max < eps:
        print('converged at iter',k)
        break

converged at iter 428


In [None]:
r.agent.value_function.astype(int)

array([[-185, -183, -181, -179, -177, -175, -174, -175],
       [-182, -181, -179, -177, -174, -172, -171, -171],
       [-179, -177, -175, -172, -168, -166, -165, -164],
       [-174, -172, -169, -164, -160, -156, -154, -154],
       [-169, -167, -161, -154, -147, -142, -139, -139],
       [-164, -161, -154, -142, -131, -122, -119, -120],
       [-161, -157, -147, -131, -106,  -93,  -92, -102],
       [-160, -156, -145, -126,  -93,    0,  -77,  -93]])

# 1.3 Policy Improvement

Now that we know what the values of the states are, we want to improve our Policy so that we the behavior is guided towards the state with the highest value. Policy Improvement is simply the act of making the policy greedy with respect to the value function.

In my implementation, we do this by setting the value of the action that leads to the most valuable state to 1 (while the rest remains 0)

In [None]:
print(inspect.getsource(r.improve_policy))

    def improve_policy(self):
        """
        Finds the greedy policy w.r.t. the current value function
        """

        self.agent.policy_prev = self.agent.policy.copy()
        for row in range(self.agent.action_function.shape[0]):
            for col in range(self.agent.action_function.shape[1]):
                for action in range(self.agent.action_function.shape[2]):
                    self.env.state = (row, col)  # reset state to the one being evaluated
                    reward, episode_end = self.env.step(self.agent.action_space[action])
                    successor_state_value = 0 if episode_end else self.agent.value_function[self.env.state]
                    self.agent.policy[row, col, action] = reward + successor_state_value

                max_policy_value = np.max(self.agent.policy[row, col, :])
                max_indices = [i for i, a in enumerate(self.agent.policy[row, col, :]) if a == max_policy_value]
                for idx in max_indices:
             

In [None]:
def improve_policy(self):
        """
        Finds the greedy policy w.r.t. the current value function
        """

        self.agent.policy_prev = self.agent.policy.copy() # get pi(i)
        for row in range(self.agent.action_function.shape[0]): # for each row
            for col in range(self.agent.action_function.shape[1]): # for each column
                for action in range(self.agent.action_function.shape[2]): # for each action
                    self.env.state = (row, col)  # reset state to the one being evaluated
                    reward, episode_end = self.env.step(self.agent.action_space[action]) # make step based on action
                    successor_state_value = 0 if episode_end else self.agent.value_function[self.env.state] # update successor state value
                    self.agent.policy[row, col, action] = reward + successor_state_value # get pi(i+1) using (**)

                max_policy_value = np.max(self.agent.policy[row, col, :]) # get max policy value at given state
                max_indices = [i for i, a in enumerate(self.agent.policy[row, col, :]) if a == max_policy_value] # get the index of policy with max policy value
                for idx in max_indices:
                    self.agent.policy[row, col, idx] = 1 # 1 if the policy value is max, otherwise 0

In [None]:
r.improve_policy()
r.visualize_policy()

[['↘', '↘', '↘', '↘', '↘', '↘', '↓', '↙'],
 ['↘', '↘', '↘', '↘', '↘', '↘', '↘', '↓'],
 ['↘', '↘', '↘', '↘', '↘', '↘', '↘', '↓'],
 ['↘', '↘', '↘', '↘', '↘', '↘', '↘', '↓'],
 ['↘', '↘', '↘', '↘', '↘', '↘', '↓', '↙'],
 ['↘', '↘', '↘', '↘', '↘', '↘', '↓', '↙'],
 ['↘', '↘', '↘', '↘', '↘', '↓', '↙', '↙'],
 ['→', '→', '→', '→', '→', 'F', '←', '←']]


# 1.4 Policy Iteration

We can now find the optimal policy by doing policy evaluation and policy improvement untill the policy is stable

In [None]:
print(inspect.getsource(r.policy_iteration))

In [None]:
def policy_iteration(self, eps=0.1, gamma=0.9, iteration=1, k=32, synchronous=True):
        """
        Finds the optimal policy
        Args:
            eps: float, exploration rate
            gamma: float, discount factor
            iteration: the iteration number
            k: (int) maximum amount of policy evaluation iterations
            synchronous: (Boolean) whether to use synchronous are asynchronous back-ups 

        Returns:

        """
        policy_stable = True
        print("\n\n______iteration:", iteration, "______")
        print("\n policy:")
        self.visualize_policy() 

        # Evaluate Policy
        print("")
        value_delta_max = 0
        for _ in range(k): # max number of iteration for policy evaluation
            self.evaluate_policy(gamma=gamma, synchronous=synchronous) # update V(s)
            value_delta = np.max(np.abs(self.agent.value_function_prev - self.agent.value_function)) # get delta
            value_delta_max = value_delta
            if value_delta_max < eps: # terminate the loop if delta is small 
                break
        print("Value function for this policy:")
        print(self.agent.value_function.round().astype(int))
        action_function_prev = self.agent.action_function.copy()

        # Improve Policy
        print("\n Improving policy:")
        self.improve_policy()

        # Check if Stable
        policy_stable = self.agent.compare_policies() < 1 #check if policy function is similar with previous iteration
        print("policy diff:", policy_stable)

        if not policy_stable and iteration < 1000: # if policy is not stable, restart the whole function
            iteration += 1
            self.policy_iteration(iteration=iteration)
        elif policy_stable:
            print("Optimal policy found in", iteration, "steps of policy evaluation")
        else:
            print("failed to converge.")

In [None]:
r.policy_iteration()



______iteration: 1 ______

 policy:
[['↘', '↘', '↘', '↘', '↘', '↘', '↓', '↙'],
 ['↘', '↘', '↘', '↘', '↘', '↘', '↘', '↓'],
 ['↘', '↘', '↘', '↘', '↘', '↘', '↘', '↓'],
 ['↘', '↘', '↘', '↘', '↘', '↘', '↘', '↓'],
 ['↘', '↘', '↘', '↘', '↘', '↘', '↓', '↙'],
 ['↘', '↘', '↘', '↘', '↘', '↘', '↓', '↙'],
 ['↘', '↘', '↘', '↘', '↘', '↓', '↙', '↙'],
 ['→', '→', '→', '→', '→', 'F', '←', '←']]

Value function for this policy:
[[-5 -5 -5 -5 -5 -5 -5 -5]
 [-5 -5 -5 -5 -5 -5 -5 -5]
 [-4 -4 -4 -4 -4 -4 -4 -4]
 [-4 -3 -3 -3 -3 -3 -3 -3]
 [-4 -3 -3 -3 -3 -3 -3 -3]
 [-4 -3 -3 -2 -2 -2 -2 -2]
 [-4 -3 -3 -2 -1 -1 -1 -2]
 [-4 -3 -3 -2 -1  0 -1 -2]]

 Improving policy:
policy diff: False


______iteration: 2 ______

 policy:
[['↘', '↘', '↘', '↘', '↘', '↘', '↘', '↓'],
 ['↘', '↘', '↘', '↘', '↘', '↘', '↘', '↓'],
 ['↘', '↘', '↘', '↘', '↘', '↘', '↘', '↓'],
 ['→', '↘', '↘', '↘', '↘', '↘', '↘', '↓'],
 ['↗', '→', '↘', '↘', '↘', '↘', '↘', '↓'],
 ['↗', '↗', '→', '↘', '↘', '↘', '↓', '↙'],
 ['↗', '↗', '↗', '→', '↘', '↓', '

# 1.5 Asynchronous Policy Iteration

With policy evaluation, we bootstrap: we make an estimate based on another estimate. So which estimate do we take? We have to options:

We bootstrap from the previous policy evaluation. This means each state value estimate update is based on the same iteration of policy evaluation. This is called synchronous policy iteration

We bootstrap from the freshest estimate. This means a estimate update can be based on the previous or the current value funtion, or a combination of the two. This is called asynchronous policy iteration

# 1.6 Value Iteration
Theory
Value iteration is nothing more than a simple parameter modification to policy iteration. Remember that policy iteration consists of policy evaluation and policy improvement. The policy evaluation step does not necessarily have to be repeated until convergence before we improve our policy. Recall that the policy iteration above took over 400 iterations to converge. If we use ony 1 iteration instead we call it value iteration.