## Q-Learning

In this demo, let's employ Q-learning to find the optimal policy: we act according to an exploratory policy (ep) and use the experience it generates to update an initially random policy until it becomes optimal (pi).

1. Take an action according to the exploratory policy (ep)<br>
2. Observe the new state (s_t+1) and the reward (r)<br>
3. Update the state-action function<br>
4. Update the optimal policy (pi)<br>




In [1]:
import numpy as np
np.set_printoptions(precision=3,suppress=True)
class GridWorld:
    '''Rectangular grid world with a stochastic transition model.

    Actions are integers: 0 = UP, 1 = RIGHT, 2 = DOWN, 3 = LEFT.
    The state matrix marks each cell as walkable (0), terminal (+1)
    or obstacle (-1); the reward matrix gives the reward received
    when the robot ends a step on a cell.
    '''

    # (row, col) displacement for each executed action, indexed by action id
    _MOVE_DELTAS = ((-1, 0), (0, 1), (1, 0), (0, -1))

    def __init__(self, tot_row, tot_col):
        '''Create a world of tot_row x tot_col cells.

        The world starts with a uniform transition matrix, all-zero
        reward and state matrices and a random robot position.
        '''
        self.action_space_size = 4
        self.world_row = tot_row
        self.world_col = tot_col
        self.transition_matrix = np.ones((self.action_space_size, self.action_space_size)) / self.action_space_size
        self.reward_matrix = np.zeros((tot_row, tot_col))
        self.state_matrix = np.zeros((tot_row, tot_col))
        self.position = [np.random.randint(tot_row), np.random.randint(tot_col)]

    def setTransitionMatrix(self, transition_matrix):
        '''Set the transition model.

        Row i holds the probabilities of the action that is really
        executed when action i is requested (see step()).
        Raises ValueError if the shape differs from the current matrix.
        '''
        if transition_matrix.shape != self.transition_matrix.shape:
            raise ValueError('The shape of the two matrices must be the same.')
        self.transition_matrix = transition_matrix

    def setRewardMatrix(self, reward_matrix):
        '''Set the per-cell rewards.

        Raises ValueError if the shape does not match the world.
        '''
        if reward_matrix.shape != self.reward_matrix.shape:
            raise ValueError('The shape of the matrix does not match with the shape of the world.')
        self.reward_matrix = reward_matrix

    def setStateMatrix(self, state_matrix):
        '''Set the obstacles in the world.

        The input to the function is a matrix with the
        same size of the world
        -1 for states which are not walkable.
        +1 for terminal states
         0 for all the walkable states (non terminal)

        Raises ValueError if the shape does not match the world.
        '''
        if state_matrix.shape != self.state_matrix.shape:
            raise ValueError('The shape of the matrix does not match with the shape of the world.')
        self.state_matrix = state_matrix

    def setPosition(self, index_row=None, index_col=None):
        '''Place the robot at (index_row, index_col).

        If either index is None the robot is placed on a random cell
        of the grid (no check is made against obstacles).
        '''
        if index_row is None or index_col is None:
            # Use the stored world size: the constructor arguments are
            # not in scope here.
            self.position = [np.random.randint(self.world_row), np.random.randint(self.world_col)]
        else:
            self.position = [index_row, index_col]

    def render(self):
        ''' Print the current world in the terminal.

        O represents the robot position
        - respresent empty states.
        # represents obstacles
        * represents terminal states
        '''
        graph = ""
        for row in range(self.world_row):
            row_string = ""
            for col in range(self.world_col):
                if self.position == [row, col]:
                    row_string += u" \u25CB "
                else:
                    cell = self.state_matrix[row, col]
                    if cell == 0:
                        row_string += ' - '
                    elif cell == -1:
                        row_string += ' # '
                    elif cell == +1:
                        row_string += ' * '
            row_string += '\n'
            graph += row_string
        print(graph)

    def reset(self, exploring_starts=False):
        ''' Reposition the robot and return the first observation.

        With exploring_starts=True the robot is placed on a random
        walkable, non-terminal cell (state value 0); otherwise it is
        placed in the bottom left corner.
        '''
        if exploring_starts:
            # Rejection sampling: draw cells until a walkable one is found
            while True:
                row = np.random.randint(0, self.world_row)
                col = np.random.randint(0, self.world_col)
                if self.state_matrix[row, col] == 0:
                    break
            self.position = [row, col]
        else:
            self.position = [self.world_row - 1, 0]
        return self.position

    def step(self, action):
        ''' One step in the world.

        [observation, reward, done = env.step(action)]

        The requested action selects a row of the transition matrix,
        from which the action that is really executed is sampled.
        A move that would leave the grid or enter an obstacle leaves
        the position unchanged. done is True when the state matrix
        entry of the resulting cell is non-zero.

        action may be a plain number or a one-element array.
        Raises ValueError if action >= action_space_size.
        '''
        # Accept scalars and size-1 arrays alike; calling int() directly
        # on a 1-d array is deprecated in recent NumPy releases.
        action = int(np.asarray(action).item())
        if action >= self.action_space_size:
            raise ValueError('The action is not included in the action space.')

        # Based on the requested action and the probabilities of the
        # transition model, choose the action that is actually performed
        action = int(np.random.choice(self.action_space_size,
                                      p=self.transition_matrix[action, :]))

        # Generate the candidate position from the current one
        row_delta, col_delta = self._MOVE_DELTAS[action]
        new_position = [self.position[0] + row_delta, self.position[1] + col_delta]

        # Accept the move only if it stays on the grid and off obstacles
        inside_rows = 0 <= new_position[0] < self.world_row
        inside_cols = 0 <= new_position[1] < self.world_col
        if inside_rows and inside_cols:
            if self.state_matrix[new_position[0], new_position[1]] != -1:
                self.position = new_position

        reward = self.reward_matrix[self.position[0], self.position[1]]
        # Done is True if the state is a terminal state
        done = bool(self.state_matrix[self.position[0], self.position[1]])
        return self.position, reward, done



In [2]:
def update_state_action(state_action_matrix,  observation, new_observation,
                        action, reward, alpha, gamma):
    '''Apply one Q-learning update in place and return the matrix.

    The matrix is indexed as [action, state], where the state index of
    a (row, col) observation is row*4 + col (a grid with 4 columns).
    The entry for (action, observation) moves towards
    reward + gamma * max_a Q(a, new_observation) with step size alpha.
    '''
    # Flatten the (row, col) observations into column indices
    state_index = observation[0] * 4 + observation[1]
    next_state_index = new_observation[0] * 4 + new_observation[1]

    # Current estimate and the best estimate available from the next state
    current_q = state_action_matrix[action, state_index]
    best_next_q = np.max(state_action_matrix[:, next_state_index])

    # Temporal-difference error of the greedy (off-policy) target.
    # A visit-count based step size could be used here instead of the
    # constant alpha.
    td_error = reward + gamma * best_next_q - current_q
    state_action_matrix[action, state_index] = current_q + alpha * td_error
    return state_action_matrix


In [3]:
def update_policy(policy_matrix, state_action_matrix, observation):
    '''Make the policy greedy for the observed state (q-learning).

    The policy entry at the (row, col) observation is overwritten, in
    place, with the index of the highest-valued action in the matching
    column (row*4 + col) of the state-action matrix. The updated policy
    matrix is returned.
    '''
    row = observation[0]
    col = observation[1]
    # Column of the state-action matrix that belongs to this cell
    state_index = col + (row * 4)
    # Greedy choice: first action with the highest value
    policy_matrix[row, col] = np.argmax(state_action_matrix[:, state_index])
    return policy_matrix

In [4]:
def return_epsilon_greedy_action(policy_matrix, observation, epsilon=0.1):
    '''Sample an action epsilon-greedily around the policy's choice.

    The number of actions is inferred as the largest non-NaN entry of
    the policy matrix plus one. The action stored in the policy for the
    observed (row, col) gets probability 1 - epsilon + epsilon/n, every
    other action gets epsilon/n. Returns a one-element array holding
    the sampled action.
    '''
    n_actions = int(np.nanmax(policy_matrix) + 1)
    policy_action = int(policy_matrix[observation[0], observation[1]])

    # Spread epsilon uniformly, then give the remainder to the policy action
    exploration_share = epsilon / n_actions
    probabilities = np.full((n_actions), exploration_share)
    probabilities[policy_action] = 1 - epsilon + exploration_share
    return np.random.choice(n_actions, 1, p=probabilities)

In [5]:
def return_decayed_value(starting_value, global_step, decay_step):
    """Returns the exponentially decayed value.

    decayed_value = starting_value * 0.1 ^ (global_step / decay_step)

    The decay rate is fixed at 0.1, i.e. the value shrinks by a factor
    of ten every decay_step steps.
    @param starting_value the value before decaying
    @param global_step the global step to use for decay (positive integer)
    @param decay_step the number of steps over which the value drops tenfold
    """
    exponent = global_step / decay_step
    return starting_value * np.power(0.1, exponent)


In [6]:
# A grid world with 3 rows and 4 columns
env = GridWorld(3, 4)

# State matrix: 0 = walkable, +1 = terminal, -1 = obstacle
state_matrix = np.zeros((3, 4))
state_matrix[:2, 3] = 1   # the two terminal cells in the last column
state_matrix[1, 1] = -1   # the obstacle
print("State Matrix:")
print(state_matrix)

State Matrix:
[[ 0.  0.  0.  1.]
 [ 0. -1.  0.  1.]
 [ 0.  0.  0.  0.]]


In [7]:
# Reward matrix: a small cost of -0.04 on every cell, except the two
# cells in the last column that carry +1 and -1
reward_matrix = np.full((3, 4), -0.04)
reward_matrix[0, 3], reward_matrix[1, 3] = 1, -1
print("Reward Matrix:")
print(reward_matrix)

Reward Matrix:
[[-0.04 -0.04 -0.04  1.  ]
 [-0.04 -0.04 -0.04 -1.  ]
 [-0.04 -0.04 -0.04 -0.04]]


In [8]:
# Transition matrix: row i is the distribution of the executed action
# when action i is requested. The requested action has probability 0.8,
# each of its two neighbours in the action ordering 0.1 and the opposite
# one 0.0, so every row is the first row rotated by its index.
transition_matrix = np.array([np.roll([0.8, 0.1, 0.0, 0.1], shift)
                              for shift in range(4)])

In [9]:
#Random policy: one action index (0..3) per cell, stored as float32 so
#that the obstacle can be marked with NaN
policy_matrix = np.random.randint(low=0, high=4, size=(3, 4)).astype(np.float32)
#np.nan is the supported spelling; the np.NaN alias no longer exists in NumPy 2.0
policy_matrix[1,1] = np.nan #NaN for the obstacle at (1,1)
policy_matrix[0,3] = policy_matrix[1,3] = -1 #No action for the terminal states
print("Policy Matrix:")
print(policy_matrix)
 

Policy Matrix:
[[ 1.  1.  1. -1.]
 [ 2. nan  1. -1.]
 [ 1.  3.  3.  2.]]


In [10]:

#Fixed exploratory policy: one action index per cell, NaN for the
#obstacle at (1,1) and -1 for the two terminal cells.
#np.nan is the supported spelling; the np.NaN alias no longer exists in NumPy 2.0
exploratory_policy_matrix = np.array([[1,      1, 1, -1],
                                      [0, np.nan, 0, -1],
                                      [0,      1, 0,  3]])

print("Exploratory Policy Matrix:")
print(exploratory_policy_matrix)
  

Exploratory Policy Matrix:
[[ 1.  1.  1. -1.]
 [ 0. nan  0. -1.]
 [ 0.  1.  0.  3.]]


In [11]:
 

# Hyper-parameters of the run
gamma = 0.999
alpha = 0.001 #constant step size
tot_epoch = 5000
print_epoch = 1000

# Q-table and visit counters: one row per action, one column per grid cell
state_action_matrix = np.zeros((4, 12))
visit_counter_matrix = np.zeros_like(state_action_matrix)

# Install the world description defined in the previous cells
env.setRewardMatrix(reward_matrix)
env.setStateMatrix(state_matrix)
env.setTransitionMatrix(transition_matrix)

In [12]:
# Q-learning training loop: each epoch is one episode of at most 1000 steps.
for epoch in range(tot_epoch):
    #Reset and return the first observation
    observation = env.reset(exploring_starts=True)
    # NOTE(review): this decayed epsilon is only printed below; action
    # selection uses the fixed epsilon=0.001 -- confirm this is intended.
    epsilon = return_decayed_value(0.1, epoch, decay_step=50000)
    is_starting = True

    for step in range(1000):
        #Take the action using epsilon-greedy on the exploratory policy.
        #The sampler returns a one-element array; convert it to a plain
        #int so the environment and the Q-table are indexed with a scalar.
        action = int(np.asarray(return_epsilon_greedy_action(
            exploratory_policy_matrix, observation, epsilon=0.001)).item())
        if(is_starting):
            #The first action of every episode is uniformly random
            action = np.random.randint(0, 4)
            is_starting = False
        #Move one step in the environment and get obs and reward
        new_observation, reward, done = env.step(action)
        #Updating the state-action matrix
        state_action_matrix = update_state_action(state_action_matrix,observation, new_observation,
                                                  action, reward, alpha, gamma)
        #Updating the policy
        policy_matrix = update_policy(policy_matrix, state_action_matrix, observation)
        observation = new_observation
        if done: break

    #Periodic progress report
    if(epoch % print_epoch == 0):
        print("")
        print("Epsilon: " + str(epsilon))
        print("State-Action matrix after " + str(epoch+1) + " iterations:")
        print(state_action_matrix)
        print("Policy matrix after " + str(epoch+1) + " iterations:")
        print(policy_matrix)

#Time to check the utility matrix obtained
print("State-Action matrix after " + str(tot_epoch) + " iterations:")
print(state_action_matrix)
print("Policy matrix after " + str(tot_epoch) + " iterations:")
print(policy_matrix)


Epsilon: 0.1
State-Action matrix after 1 iterations:
[[ 0.     0.     0.     0.     0.     0.     0.     0.     0.     0.
   0.     0.   ]
 [ 0.     0.     0.001  0.     0.     0.     0.     0.     0.     0.
   0.     0.   ]
 [ 0.     0.     0.     0.     0.     0.     0.     0.     0.     0.
   0.     0.   ]
 [ 0.     0.    -0.     0.     0.     0.     0.     0.     0.     0.
   0.     0.   ]]
Policy matrix after 1 iterations:
[[ 1.  1.  1. -1.]
 [ 2. nan  1. -1.]
 [ 1.  3.  3.  2.]]

Epsilon: 0.0954992586021436
State-Action matrix after 1001 iterations:
[[-0.001 -0.     0.007  0.    -0.013  0.     0.05   0.    -0.006 -0.001
  -0.009 -0.022]
 [-0.002  0.116  0.553  0.    -0.001  0.    -0.026  0.    -0.001 -0.006
  -0.001 -0.001]
 [-0.001  0.001  0.     0.    -0.001  0.    -0.005  0.    -0.001 -0.001
  -0.001 -0.001]
 [-0.001 -0.001  0.001  0.    -0.001  0.     0.     0.    -0.001 -0.001
  -0.001 -0.021]]
Policy matrix after 1001 iterations:
[[ 2.  1.  1. -1.]
 [ 2. nan  0. -1.]
 [ 1.