<a href="https://colab.research.google.com/github/shcho999/block-explorer-tutorials/blob/main/00_reinforcement_learning_03.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## https://mpatacchiola.github.io/blog/2017/01/15/dissecting-reinforcement-learning-2.html

In [None]:
env

In [None]:
import numpy as np
import sys
sys.path.append('/content/drive/MyDrive/Colab Notebooks/reinforcement')
from gridworld import GridWorld # Changed to absolute import



In [None]:
def update_utility(utility_matrix, observation, new_observation,
                   reward, alpha, gamma):
    '''Apply one TD(0) backup to the utility of the state that was just left.

    The matrix is modified in place and the same object is returned.

    @param utility_matrix 2-D array of utility estimates, indexed [row, col]
    @param observation (row, col) of the state at time t
    @param new_observation (row, col) of the state at time t+1
    @param reward the reward received for the transition
    @param alpha the step size (learning rate)
    @param gamma the discount factor
    @return the same utility matrix, after the update
    '''
    row, col = observation[0], observation[1]
    next_row, next_col = new_observation[0], new_observation[1]
    current = utility_matrix[row, col]
    successor = utility_matrix[next_row, next_col]
    # TD error: (one-step bootstrapped target) - (old estimate)
    td_error = reward + gamma * successor - current
    utility_matrix[row, col] += alpha * td_error
    return utility_matrix

In [None]:
action

### 주어진 policy matrix 기준으로 출발점을 고정하고, policy에 따라 action을 진행하면서 발생하는 reward를 이용해 Temporal Difference(TD) 방식으로 utility matrix를 업데이트한다.

In [None]:
def main():
    '''Evaluate a fixed policy on the 3x4 grid world with TD(0).

    Each episode resets the environment with exploring_starts=False
    (presumably a fixed start cell -- see GridWorld.reset), follows
    policy_matrix until the environment reports done (or 1000 steps pass)
    and backs up the utility matrix after every single step. The utility
    matrix is printed every print_epoch episodes and once at the end.
    '''
    env = GridWorld(3, 4)

    # State matrix: 1 marks the two terminal cells, -1 the obstacle
    state_matrix = np.array([[0.0,  0.0, 0.0, 1.0],
                             [0.0, -1.0, 0.0, 1.0],
                             [0.0,  0.0, 0.0, 0.0]])
    print("State Matrix:")
    print(state_matrix)

    # Reward matrix: -0.04 everywhere except +1 at (0,3) and -1 at (1,3)
    reward_matrix = np.array([[-0.04, -0.04, -0.04,  1.0],
                              [-0.04, -0.04, -0.04, -1.0],
                              [-0.04, -0.04, -0.04, -0.04]])
    print("Reward Matrix:")
    print(reward_matrix)

    # Transition matrix handed to the environment
    # (presumably row = intended action, column = executed action)
    transition_matrix = np.array([[0.8, 0.1, 0.0, 0.1],
                                  [0.1, 0.8, 0.1, 0.0],
                                  [0.0, 0.1, 0.8, 0.1],
                                  [0.1, 0.0, 0.1, 0.8]])

    # Policy under evaluation: the optimal policy for the reward=-0.04 world
    # (-1 at the terminal cells, NaN at the obstacle)
    policy_matrix = np.array([[1,      1,  1,  -1],
                              [0, np.nan,  0,  -1],
                              [0,      3,  3,   3]])
    print("Policy Matrix:")
    print(policy_matrix)

    env.setStateMatrix(state_matrix)
    env.setRewardMatrix(reward_matrix)
    env.setTransitionMatrix(transition_matrix)

    utility_matrix = np.zeros((3, 4))
    gamma = 0.999
    alpha = 0.1  # constant step size
    tot_epoch = 300000
    print_epoch = 1000

    for epoch in range(tot_epoch):
        # Start a new episode and get its first observation
        observation = env.reset(exploring_starts=False)
        for _ in range(1000):
            # Action prescribed by the policy for the current cell
            action = policy_matrix[observation[0], observation[1]]
            # One environment step, then one TD(0) backup
            new_observation, reward, done = env.step(action)
            utility_matrix = update_utility(utility_matrix, observation,
                                            new_observation, reward, alpha, gamma)
            observation = new_observation
            if done:
                break

        if not epoch % print_epoch:
            print("")
            print("Utility matrix after " + str(epoch+1) + " iterations:")
            print(utility_matrix)

    # Final estimate
    print("Utility matrix after " + str(tot_epoch) + " iterations:")
    print(utility_matrix)



if __name__ == "__main__":
    main()

### update with trace matrix

##### part 2의 MC(Monte Carlo) 방법은 하나의 에피소드가 완결된 이후에, 에피소드의 각 스텝에서 발생한 reward를 해당 스텝 기준으로 현재가치화하여 각 스테이트의 가치를 결정한다(action 기준도 동일).

##### 이와 다르게 현재의 방법론은 에피소드가 진행되는 각 스텝에서 reward를 계산하고, 이를 이용해 지나온 과거 스텝들의 value를 업데이트한다. 업데이트할 때는 현재 스텝과 과거 스텝 간의 거리를 기준으로 할인하여 현재 reward를 과거 스텝의 가치 계산에 반영한다.

##### 현재의 방법은 MC 방법에 비해 업데이트를 신속하게 하면서도, 하나의 에피소드에 속한 각 스텝이 에피소드의 최종 결과에 영향을 받아 업데이트되는 결과를 가져온다. 따라서 value 관점에서 업데이트가 빠르게 진행되고, 결과적으로 수렴 속도가 빨라진다.

In [None]:
def update_utility(utility_matrix, trace_matrix, alpha, delta):
    '''Apply a TD(lambda) backup to every state at once.

    Each state's utility moves by alpha * delta scaled by that state's
    eligibility trace. The utility matrix is modified in place.

    @param utility_matrix the matrix before the update
    @param trace_matrix the eligibility traces, same shape as utility_matrix
    @param alpha the step size (learning rate)
    @param delta the TD error (Target - OldEstimate)
    @return the same utility matrix, after the update
    '''
    scaled_error = alpha * delta
    utility_matrix += scaled_error * trace_matrix
    return utility_matrix

def update_eligibility(trace_matrix, gamma, lambda_):
    '''Decay all eligibility traces by gamma * lambda_.

    The input is left untouched; a new array is returned.

    @param trace_matrix the eligibility traces matrix
    @param gamma discount factor
    @param lambda_ the trace-decay parameter
    @return a new, decayed trace matrix
    '''
    return (trace_matrix * gamma) * lambda_


In [None]:
def main():
    '''Evaluate a fixed policy on the 3x4 grid world with TD(lambda).

    Each episode resets the environment with exploring_starts=True, follows
    policy_matrix until the environment reports done (or 1000 steps pass)
    and, at every step, spreads the TD error over all states in proportion
    to their (accumulating) eligibility trace. The traces are cleared at the
    start of every episode. The utility and trace matrices are printed every
    print_epoch episodes; the utility matrix is printed once more at the end.
    '''
    env = GridWorld(3, 4)

    #Define the state matrix (1 = terminal cells, -1 = obstacle)
    state_matrix = np.zeros((3,4))
    state_matrix[0, 3] = 1
    state_matrix[1, 3] = 1
    state_matrix[1, 1] = -1
    print("State Matrix:")
    print(state_matrix)

    #Define the reward matrix
    reward_matrix = np.full((3,4), -0.04)
    reward_matrix[0, 3] = 1
    reward_matrix[1, 3] = -1
    print("Reward Matrix:")
    print(reward_matrix)

    #Define the transition matrix
    #(presumably row = intended action, column = executed action)
    transition_matrix = np.array([[0.8, 0.1, 0.0, 0.1],
                                  [0.1, 0.8, 0.1, 0.0],
                                  [0.0, 0.1, 0.8, 0.1],
                                  [0.1, 0.0, 0.1, 0.8]])

    #Define the policy matrix
    #This is the optimal policy for world with reward=-0.04
    policy_matrix = np.array([[1,      1,  1,  -1],
                              [0, np.nan,  0,  -1],
                              [0,      3,  3,   3]])
    print("Policy Matrix:")
    print(policy_matrix)

    #Define and print the eligibility trace matrix
    trace_matrix = np.zeros((3,4))
    print("Trace Matrix:")
    print(trace_matrix)

    env.setStateMatrix(state_matrix)
    env.setRewardMatrix(reward_matrix)
    env.setTransitionMatrix(transition_matrix)

    utility_matrix = np.zeros((3,4))
    gamma = 0.999 #discount rate
    alpha = 0.1 #constant step size
    lambda_ = 0.5 #decaying factor
    tot_epoch = 300000
    print_epoch = 100

    for epoch in range(tot_epoch):
        #Reset and return the first observation
        observation = env.reset(exploring_starts=True)
        #Eligibility traces are per-episode credit. Clear them so the TD
        #errors of this episode are not applied to states that were visited
        #at the end of the previous (unrelated) episode.
        trace_matrix = np.zeros_like(trace_matrix)
        for step in range(1000):
            #Take the action from the action matrix
            action = policy_matrix[observation[0], observation[1]]
            #Move one step in the environment and get obs and reward
            new_observation, reward, done = env.step(action)
            #Estimate the error delta (Target - OldEstimate)
            delta = reward + gamma * utility_matrix[new_observation[0], new_observation[1]] - \
                                     utility_matrix[observation[0], observation[1]]
            #Adding +1 in the trace matrix for the state visited
            trace_matrix[observation[0], observation[1]] += 1
            #Update the utility matrix
            utility_matrix = update_utility(utility_matrix, trace_matrix, alpha, delta)
            #Update the trace matrix (decaying)
            trace_matrix = update_eligibility(trace_matrix, gamma, lambda_)
            observation = new_observation
            if done: break

        if(epoch % print_epoch == 0):
            print("")
            print("Utility matrix after " + str(epoch+1) + " iterations:")
            print(utility_matrix)
            #Traces as they stand at the end of the episode just finished
            print(trace_matrix)
    #Time to check the utility matrix obtained
    print("Utility matrix after " + str(tot_epoch) + " iterations:")
    print(utility_matrix)



if __name__ == "__main__":
    main()

### SARSA: Python and ε-greedy policy

##### greedy_prob = 1 - epsilon + non_greedy_prob
##### Most of the time a value of 0.1 is a good choice for epsilon. Choosing a value that is too high will cause the algorithm to converge slowly (too much exploration). On the other hand, a value which is too small does not guarantee to visit all the state-action pairs leading to sub-optimal policies.

### state_action_matrix에서 best_action을 policy matrix에 업데이트 -> 새로운 스테이트 -> 해당 스테이트에 해당하는 action을 policy_matrix에서 가져온다 -> 해당 action에 greedy 확률을 배정하고 다른 action에는 non-greedy 확률을 배정 -> 이렇게 배정한 확률 기준으로 random하게 action을 선택 -> 이제 스테이트와 action이 선택되었으므로 이를 기준으로 value를 계산하고 state_action_matrix에 업데이트 ---

In [None]:
def update_state_action(state_action_matrix, visit_counter_matrix, observation, new_observation,
                   action, new_action, reward, alpha, gamma):
    '''Apply one SARSA backup to Q(observation, action), in place.

    States are flattened row-major assuming 4 columns per grid row, so the
    state (row, col) lives in column col + 4*row of the tables.

    @param state_action_matrix the Q table, indexed [action, flat_state]
    @param visit_counter_matrix visit counts, indexed like the Q table
    @param observation (row, col) of the state at t
    @param new_observation (row, col) of the state at t+1
    @param action the action at t
    @param new_action the action at t+1 (converted with int())
    @param reward the reward observed after the action
    @param alpha the step size (learning rate)
    @param gamma the discount factor
    @return the same state-action matrix, after the update
    '''
    state_idx = observation[0] * 4 + observation[1]
    next_state_idx = new_observation[0] * 4 + new_observation[1]
    q_now = state_action_matrix[action, state_idx]
    q_next = state_action_matrix[int(new_action), next_state_idx]
    # Visit-count based step size. It is computed but deliberately not
    # applied: swap "alpha" for "alpha_counted" below to make the step
    # shrink as the pair is visited more often.
    alpha_counted = 1.0 / (1.0 + visit_counter_matrix[action, state_idx])
    td_target = reward + gamma * q_next
    state_action_matrix[action, state_idx] = q_now + alpha * (td_target - q_now)
    return state_action_matrix

def update_visit_counter(visit_counter_matrix, observation, action):
    '''Record one more visit of the (state, action) pair, in place.

    The state (row, col) is flattened row-major assuming 4 columns per
    grid row. The counts can be used to adapt the step size of the update.
    @param visit_counter_matrix counts, indexed [action, flat_state]
    @param observation (row, col) of the state observed
    @param action the action taken
    @return the same counter matrix, after the increment
    '''
    state_idx = observation[0] * 4 + observation[1]
    visit_counter_matrix[action, state_idx] += 1.0
    return visit_counter_matrix

def update_policy(policy_matrix, state_action_matrix, observation):
    '''Make the policy greedy with respect to Q at one state, in place.

    @param policy_matrix the policy before the update, indexed [row, col]
    @param state_action_matrix the Q table, indexed [action, flat_state]
    @param observation (row, col) of the state whose entry is refreshed
    @return the same policy matrix, after the update
    '''
    row, col = observation[0], observation[1]
    # Row-major flattening, 4 columns per grid row
    q_values = state_action_matrix[:, col + (row * 4)]
    # Index of the first action with the highest value
    policy_matrix[row, col] = q_values.argmax()
    return policy_matrix

def return_epsilon_greedy_action(policy_matrix, observation, epsilon=0.1, tot_actions=None):
    '''Sample an action epsilon-greedily around the policy's action.

    The action stored in policy_matrix for the observed state gets
    probability 1 - epsilon + epsilon/tot_actions; every other action gets
    epsilon/tot_actions.

    @param policy_matrix the current policy, indexed [row, col]
    @param observation (row, col) of the state observed at t
    @param epsilon the value used for computing the probabilities
    @param tot_actions size of the action set. When None (the default) it is
        inferred as nanmax(policy_matrix) + 1, which silently excludes any
        action id larger than the biggest one currently in the policy; pass
        the real number of actions to avoid that.
    @return a 1-element array holding the sampled action index
    '''
    if tot_actions is None:
        # Legacy behaviour: infer the action count from the policy contents
        tot_actions = int(np.nanmax(policy_matrix) + 1)
    # The policy matrix only stores the greedy action, so that action gets
    # the high probability and all the others share the base probability
    action = int(policy_matrix[observation[0], observation[1]])
    non_greedy_prob = epsilon / tot_actions
    greedy_prob = 1 - epsilon + non_greedy_prob
    weight_array = np.full((tot_actions), non_greedy_prob)
    weight_array[action] = greedy_prob
    return np.random.choice(tot_actions, 1, p=weight_array)

def print_policy(policy_matrix):
    '''Print the policy as a grid of symbols, one text line per row.

    * terminal state (-1)
    ^ > v < up, right, down, left (0, 1, 2, 3)
    # obstacle (NaN)
    Any other value contributes nothing to the output.
    '''
    symbols = ((-1, " *  "), (0, " ^  "), (1, " >  "), (2, " v  "), (3, " <  "))
    shape = policy_matrix.shape
    rendered_rows = []
    for r in range(shape[0]):
        cells = []
        for c in range(shape[1]):
            value = policy_matrix[r, c]
            for code, glyph in symbols:
                if value == code:
                    cells.append(glyph)
                    break
            else:
                # No action code matched: only NaN is drawn
                if np.isnan(value):
                    cells.append(" #  ")
        rendered_rows.append("".join(cells) + '\n')
    print("".join(rendered_rows))

def return_decayed_value(starting_value, global_step, decay_step, decay_rate=0.1):
    """Returns the exponentially decayed value.

    decayed_value = starting_value * decay_rate ^ (global_step / decay_step)
    @param starting_value the value before decaying
    @param global_step the global step to use for decay (positive integer)
    @param decay_step number of steps after which the value has been
        multiplied by decay_rate once
    @param decay_rate the multiplicative decay per decay_step steps
        (default 0.1, the constant previously hard-coded here)
    @return the decayed value
    """
    decayed_value = starting_value * np.power(decay_rate, (global_step / decay_step))
    return decayed_value

In [None]:
def main():
    '''Learn a policy for the 3x4 grid world with SARSA-style control.

    Starting from a random policy, every episode resets the environment with
    exploring_starts=True and takes a uniformly random first action. Later
    actions are sampled epsilon-greedily around the current policy, with an
    epsilon that decays exponentially over the epochs. After each step the
    Q table, the greedy policy at the state just left, and the visit counter
    are updated. Progress is printed every print_epoch episodes.
    '''
    env = GridWorld(3, 4)

    #Define the state matrix (1 = terminal cells, -1 = obstacle)
    state_matrix = np.zeros((3,4))
    state_matrix[0, 3] = 1
    state_matrix[1, 3] = 1
    state_matrix[1, 1] = -1
    print("State Matrix:")
    print(state_matrix)

    #Define the reward matrix
    reward_matrix = np.full((3,4), -0.04)
    reward_matrix[0, 3] = 1
    reward_matrix[1, 3] = -1
    print("Reward Matrix:")
    print(reward_matrix)

    #Define the transition matrix
    #(presumably row = intended action, column = executed action)
    transition_matrix = np.array([[0.8, 0.1, 0.0, 0.1],
                                  [0.1, 0.8, 0.1, 0.0],
                                  [0.0, 0.1, 0.8, 0.1],
                                  [0.1, 0.0, 0.1, 0.8]])

    #Random policy
    policy_matrix = np.random.randint(low=0, high=4, size=(3, 4)).astype(np.float32)
    policy_matrix[1,1] = np.nan #NaN for the obstacle at (1,1)
    policy_matrix[0,3] = policy_matrix[1,3] = -1 #No action for the terminal states
    print("Policy Matrix:")
    print(policy_matrix)

    env.setStateMatrix(state_matrix)
    env.setRewardMatrix(reward_matrix)
    env.setTransitionMatrix(transition_matrix)

    #Q table and visit counts: one row per action, one column per flattened cell
    state_action_matrix = np.zeros((4,12))
    visit_counter_matrix = np.zeros((4,12))
    gamma = 0.999
    alpha = 0.001 #constant step size
    tot_epoch = 5000000
    print_epoch = 1000


    for epoch in range(tot_epoch):

        #The greedy probability grows with the epoch: as learning proceeds
        #the best action is re-selected more often, which speeds up
        #convergence but may also trap the policy in a local optimum.
        epsilon = return_decayed_value(0.1, epoch, decay_step=100000)
        #Reset and return the first observation
        observation = env.reset(exploring_starts=True)
        is_starting = True
        for step in range(1000):
            #Take the action using epsilon-greedy. The decayed epsilon is
            #passed on (a literal 0.1 used to be passed here, so the decay
            #computed and printed above never had any effect).
            action = return_epsilon_greedy_action(policy_matrix, observation, epsilon=epsilon)
            if(is_starting):
                #Exploring starts: the first action of an episode is uniform
                action = np.random.randint(0, 4)
                is_starting = False
            #Move one step in the environment and get obs and reward
            new_observation, reward, done = env.step(action)
            #NOTE(review): the backup bootstraps from the policy's greedy
            #action at the new state, not from the action that will actually
            #be sampled on the next step -- confirm this is intended.
            new_action = policy_matrix[new_observation[0], new_observation[1]]
            #Updating the state-action matrix
            state_action_matrix = update_state_action(state_action_matrix, visit_counter_matrix, observation, new_observation,
                                                      action, new_action, reward, alpha, gamma)
            #Updating the policy
            policy_matrix = update_policy(policy_matrix, state_action_matrix, observation)
            #Increment the visit counter
            visit_counter_matrix = update_visit_counter(visit_counter_matrix, observation, action)
            observation = new_observation
            if done: break

        if(epoch % print_epoch == 0):
            print("")
            print("Epsilon: " + str(epsilon))
            print("State-Action matrix after " + str(epoch+1) + " iterations:")
            print(state_action_matrix)
            print("Policy matrix after " + str(epoch+1) + " iterations:")
            print_policy(policy_matrix)

    #Time to check the state-action matrix and policy obtained
    print("State-Action matrix after " + str(tot_epoch) + " iterations:")
    print(state_action_matrix)
    print("Policy matrix after " + str(tot_epoch) + " iterations:")
    print_policy(policy_matrix)


if __name__ == "__main__":
    main()

### Q-learning: off-policy control

In [None]:


def update_state_action(state_action_matrix, visit_counter_matrix, observation, new_observation,
                        action, reward, alpha, gamma):
    '''Apply one Q-learning backup to Q(observation, action), in place.

    The target bootstraps from the largest Q value of the new state,
    whatever action is taken there. States are flattened row-major
    assuming 4 columns per grid row.

    @param state_action_matrix the Q table, indexed [action, flat_state]
    @param visit_counter_matrix visit counts, indexed like the Q table
    @param observation (row, col) of the state at t
    @param new_observation (row, col) of the state at t+1
    @param action the action at t
    @param reward the reward observed after the action
    @param alpha the step size (learning rate)
    @param gamma the discount factor
    @return the same state-action matrix, after the update
    '''
    state_idx = observation[0] * 4 + observation[1]
    next_state_idx = new_observation[0] * 4 + new_observation[1]
    q_now = state_action_matrix[action, state_idx]
    best_next = state_action_matrix[:, next_state_idx].max()
    # Visit-count based step size. It is computed but deliberately not
    # applied: swap "alpha" for "alpha_counted" below to make the step
    # shrink as the pair is visited more often.
    alpha_counted = 1.0 / (1.0 + visit_counter_matrix[action, state_idx])
    td_target = reward + gamma * best_next
    state_action_matrix[action, state_idx] = q_now + alpha * (td_target - q_now)
    return state_action_matrix

def update_visit_counter(visit_counter_matrix, observation, action):
    '''Record one more visit of the (state, action) pair, in place.

    The state (row, col) is flattened row-major assuming 4 columns per
    grid row. The counts can be used to adapt the step size of the update.
    @param visit_counter_matrix counts, indexed [action, flat_state]
    @param observation (row, col) of the state observed
    @param action the action taken
    @return the same counter matrix, after the increment
    '''
    flat_state = observation[0] * 4 + observation[1]
    visit_counter_matrix[action, flat_state] += 1.0
    return visit_counter_matrix

def update_policy(policy_matrix, state_action_matrix, observation):
    '''Make the policy greedy with respect to Q at one state (q-learning).

    The policy matrix is modified in place.

    @param policy_matrix the policy before the update, indexed [row, col]
    @param state_action_matrix the Q table, indexed [action, flat_state]
    @param observation (row, col) of the state whose entry is refreshed
    @return the same policy matrix, after the update
    '''
    row, col = observation[0], observation[1]
    # Row-major flattening, 4 columns per grid row
    q_values = state_action_matrix[:, col + (row * 4)]
    # Index of the first action with the highest value
    policy_matrix[row, col] = q_values.argmax()
    return policy_matrix

def return_epsilon_greedy_action(policy_matrix, observation, epsilon=0.1, tot_actions=None):
    '''Sample an action epsilon-greedily around the policy's action.

    The action stored in policy_matrix for the observed state gets
    probability 1 - epsilon + epsilon/tot_actions; every other action gets
    epsilon/tot_actions.

    @param policy_matrix the policy to follow, indexed [row, col]
    @param observation (row, col) of the state observed at t
    @param epsilon the value used for computing the probabilities
    @param tot_actions size of the action set. When None (the default) it is
        inferred as nanmax(policy_matrix) + 1, which silently excludes any
        action id larger than the biggest one currently in the policy; pass
        the real number of actions to avoid that.
    @return a 1-element array holding the sampled action index
    '''
    if tot_actions is None:
        # Legacy behaviour: infer the action count from the policy contents
        tot_actions = int(np.nanmax(policy_matrix) + 1)
    action = int(policy_matrix[observation[0], observation[1]])
    non_greedy_prob = epsilon / tot_actions
    greedy_prob = 1 - epsilon + non_greedy_prob
    weight_array = np.full((tot_actions), non_greedy_prob)
    weight_array[action] = greedy_prob
    return np.random.choice(tot_actions, 1, p=weight_array)

def print_policy(policy_matrix):
    '''Print the policy as a grid of symbols, one text line per row.

    * terminal state (-1)
    ^ > v < up, right, down, left (0, 1, 2, 3)
    # obstacle (NaN)
    Any other value contributes nothing to the output.
    '''
    symbols = ((-1, " *  "), (0, " ^  "), (1, " >  "), (2, " v  "), (3, " <  "))
    shape = policy_matrix.shape
    rendered_rows = []
    for r in range(shape[0]):
        cells = []
        for c in range(shape[1]):
            value = policy_matrix[r, c]
            for code, glyph in symbols:
                if value == code:
                    cells.append(glyph)
                    break
            else:
                # No action code matched: only NaN is drawn
                if np.isnan(value):
                    cells.append(" #  ")
        rendered_rows.append("".join(cells) + '\n')
    print("".join(rendered_rows))

def return_decayed_value(starting_value, global_step, decay_step, decay_rate=0.1):
    """Returns the exponentially decayed value.

    decayed_value = starting_value * decay_rate ^ (global_step / decay_step)
    @param starting_value the value before decaying
    @param global_step the global step to use for decay (positive integer)
    @param decay_step number of steps after which the value has been
        multiplied by decay_rate once
    @param decay_rate the multiplicative decay per decay_step steps
        (default 0.1, the constant previously hard-coded here)
    @return the decayed value
    """
    decayed_value = starting_value * np.power(decay_rate, (global_step / decay_step))
    return decayed_value


In [None]:
def main():
    '''Learn a policy for the 3x4 grid world with off-policy Q-learning.

    Behaviour comes from a fixed exploratory policy (sampled epsilon-greedily
    with epsilon=0.001, after a uniformly random first action per episode),
    while the target policy is kept greedy with respect to the learned Q
    table. Progress is printed every print_epoch episodes.
    '''
    env = GridWorld(3, 4)

    # State matrix: 1 marks the two terminal cells, -1 the obstacle
    state_matrix = np.array([[0.0,  0.0, 0.0, 1.0],
                             [0.0, -1.0, 0.0, 1.0],
                             [0.0,  0.0, 0.0, 0.0]])
    print("State Matrix:")
    print(state_matrix)

    # Reward matrix: -0.04 everywhere except +1 at (0,3) and -1 at (1,3)
    reward_matrix = np.array([[-0.04, -0.04, -0.04,  1.0],
                              [-0.04, -0.04, -0.04, -1.0],
                              [-0.04, -0.04, -0.04, -0.04]])
    print("Reward Matrix:")
    print(reward_matrix)

    # Transition matrix handed to the environment
    # (presumably row = intended action, column = executed action)
    transition_matrix = np.array([[0.8, 0.1, 0.0, 0.1],
                                  [0.1, 0.8, 0.1, 0.0],
                                  [0.0, 0.1, 0.8, 0.1],
                                  [0.1, 0.0, 0.1, 0.8]])

    # Target policy: random to begin with
    policy_matrix = np.random.randint(low=0, high=4, size=(3, 4)).astype(np.float32)
    policy_matrix[1, 1] = np.nan  # NaN for the obstacle at (1,1)
    policy_matrix[0, 3] = -1      # no action for the terminal states
    policy_matrix[1, 3] = -1
    print("Policy Matrix:")
    print(policy_matrix)
    print_policy(policy_matrix)

    # Behaviour (exploration) policy. An adversarial alternative would be
    # [[2, 3, 2, -1], [2, nan, 1, -1], [1, 1, 1, 0]].
    exploratory_policy_matrix = np.array([[1,      1, 1, -1],
                                          [0, np.nan, 0, -1],
                                          [0,      1, 0,  3]])

    print("Exploratory Policy Matrix:")
    print(exploratory_policy_matrix)
    print_policy(exploratory_policy_matrix)

    env.setStateMatrix(state_matrix)
    env.setRewardMatrix(reward_matrix)
    env.setTransitionMatrix(transition_matrix)

    # Q table and visit counts: one row per action, one column per flattened cell
    state_action_matrix = np.zeros((4, 12))
    visit_counter_matrix = np.zeros((4, 12))
    gamma = 0.999
    alpha = 0.001  # constant step size
    tot_epoch = 5000000
    print_epoch = 1000

    for epoch in range(tot_epoch):
        # Start a new episode and get its first observation
        observation = env.reset(exploring_starts=True)
        # NOTE(review): this decayed epsilon is only printed below; action
        # selection uses the fixed epsilon=0.001 -- confirm that is intended.
        epsilon = return_decayed_value(0.1, epoch, decay_step=50000)
        first_step = True
        for _ in range(1000):
            # Sample around the exploratory policy ...
            action = return_epsilon_greedy_action(exploratory_policy_matrix, observation, epsilon=0.001)
            if first_step:
                # ... except on the first step, which is uniformly random
                action = np.random.randint(0, 4)
                first_step = False
            # One environment step, then the Q-learning backup
            new_observation, reward, done = env.step(action)
            state_action_matrix = update_state_action(state_action_matrix, visit_counter_matrix, observation, new_observation,
                                                      action, reward, alpha, gamma)
            # Keep the target policy greedy at the state just left
            policy_matrix = update_policy(policy_matrix, state_action_matrix, observation)
            visit_counter_matrix = update_visit_counter(visit_counter_matrix, observation, action)
            observation = new_observation
            if done:
                break

        if not epoch % print_epoch:
            print("")
            print("Epsilon: " + str(epsilon))
            print("State-Action matrix after " + str(epoch+1) + " iterations:")
            print(state_action_matrix)
            print("Policy matrix after " + str(epoch+1) + " iterations:")
            print_policy(policy_matrix)

    # Final Q table and policy
    print("State-Action matrix after " + str(tot_epoch) + " iterations:")
    print(state_action_matrix)
    print("Policy matrix after " + str(tot_epoch) + " iterations:")
    print_policy(policy_matrix)


if __name__ == "__main__":
    main()