<font size="10">Part 2</font>

In [1]:
import numpy as np
from environment import Environment_2 as ENV  # class object, see the file "environment.py"

0. Relevant functions

In [2]:
def exploring_starts(env, es=True):
    """
    Choose the start state of an episode.

    Input:
        env (class) - gridworld environment with the following parameters:
            env.n_row (int) - number of rows
            env.n_col (int) - number of columns
            env.map (2d array) - the map of the gridworld, indexed as env.map[row][col];
                                 cells equal to 'T' (terminal) are never chosen
        es (bool) - True:  draw the start uniformly among all non-'T' cells
                    False: use the fixed start state 23
    Output:
        if es = True
        (row * env.n_col + col) (int) - row-major state number of the selected cell
        if es = False
        23 (int) - fixed start state
    Raises:
        ValueError - if es = True and every cell of env.map is 'T'
    """
    if not es:
        return 23  # if don't explore starts, start from state 23

    # Enumerate the admissible cells once instead of rejection sampling, so a map
    # without any admissible cell raises an error instead of looping forever.
    candidates = [
        (row, col)
        for row in range(env.n_row)
        for col in range(env.n_col)
        if env.map[row][col] != 'T'
    ]
    if not candidates:
        raise ValueError("env.map contains no non-terminal cell to start from")

    row, col = candidates[np.random.randint(len(candidates))]

    # Row-major numbering: each row holds n_col states, so the stride is n_col
    # (using n_row here is only correct when the grid is square).
    return row * env.n_col + col

def argmax_random(input_array):
    """
    Return the index of the largest entry, breaking ties at random.

    Input:
        input_array (1d array) - values to search for the maximum
    Output:
        index of the maximum value; when several entries share the maximum,
        one of their indices is drawn with np.random.choice
    """
    best_value = np.max(input_array)
    # positions of every entry that equals the maximum
    tied_indices = np.flatnonzero(input_array == best_value)
    return np.random.choice(tied_indices)

def print_policy(policy, env):
    """
    Turn a policy into a list of printable symbols, one entry per state.

    Input:
        policy (list of lists) - policy[s] holds the action indices selected in state s
        env (class) - gridworld environment with the following parameters:
            env.n_state (int) - number of states
            env.action_text (1d array) - printable symbol for each action index; the
                                         original notes give them as
                                             left ←    down ↓    right →   up ↑
                                          ['\u2190', '\u2193', '\u2192', '\u2191']
    Output:
        policy_visual (list of str) - for each state, 'o' when policy[s] lists exactly
                                      4 actions, otherwise the symbols of the listed
                                      actions joined together
    """
    policy_visual = []

    for state in range(env.n_state):
        actions = policy[state]
        if len(actions) == 4:
            # 'o' means 4 directions are all available
            symbol = 'o'
        else:
            symbol = ''.join(env.action_text[action] for action in actions)
        policy_visual.append(symbol)

    return policy_visual

<font size="6">Part 2.1</font>

1. Monte Carlo method with exploring starts

In [3]:
def MCM_exploring_starts(max_ep, gamma, env):
    """
    First-visit Monte Carlo estimate of Q with exploring starts.

    Every episode starts in a state drawn by exploring_starts(env, es=True) and then
    follows the equiprobable random policy until a terminal transition is sampled.
    Q is the running average of the first-visit returns of each (state, action) pair;
    the returned policy is greedy with respect to that estimate.
    Pairs that are never visited keep the initial value 0.

    Input:
        max_ep (int) - maximum number of episodes
        gamma (float) - reward discount factor
        env (class) - gridworld environment with the following parameters:
            env.n_state (int) - number of states
            env.n_action (int)   - number of actions
            env.model (4d array) - (n_state) by (n_action) by (n) by (4) array, for state s and action a,
                                    there are n possibilities, each row is composed of (p, s_, r, t)
                                    p  - transition probability from (s,a) to (s_)   ### sum of the n p equals 1
                                    s_ - next state
                                    r  - reward of the transition from (s,a) to (s_)
                                    t  - terminal information, a bool value, True/False
    Output:
        Q_all (list of lists) - Estimated Q function, for each state and each action
        Q_opt (1d array) - Maximum of Q_all over the actions, for each state
        policy_opt (list of lists) - Greedy policy, the first dimension is for states, the second
                                     dimension lists every action that attains the maximum (ties kept)
    """
    n_state, n_action = env.n_state, env.n_action

    # Initialize
    Q_all = np.zeros((n_state, n_action))  # Q function
    visit_count = np.zeros((n_state, n_action), dtype=int)  # number of first-visit returns averaged so far

    for ep in range(max_ep):
        s = exploring_starts(env, es=True)  # randomly chosen start point
        Traces = []  # moving trajectory, one (s, a, r) per step

        # Generate an episode
        while True:
            a = np.random.randint(n_action)  # action under the equiprobable policy

            # determine the next state, reward, terminal
            temp = env.model[s][a]
            # if there are multiple outcomes, e.g., at green, select one according to p
            p_next = np.random.choice(len(temp), p=[outcome[0] for outcome in temp])
            _, s_, r, t = temp[p_next]

            Traces.append((s, a, r))

            s = s_  # update state

            if t:  # if terminal, break
                break

        # Earliest time step of every (s, a) pair in this episode; this replaces
        # rescanning the whole trajectory at every step of the backward pass.
        first_visit = {}
        for step, (s_k, a_k, _) in enumerate(Traces):
            first_visit.setdefault((s_k, a_k), step)

        # Loop backwards over the episode, accumulating the discounted return
        G = 0
        for step in range(len(Traces) - 1, -1, -1):
            s_k, a_k, r_k = Traces[step]
            G = gamma * G + r_k  # update G

            # first-visit check: only the earliest occurrence of (s, a) contributes
            if first_visit[(s_k, a_k)] == step:
                visit_count[s_k, a_k] += 1
                # incremental form of the sample mean of all returns seen so far
                Q_all[s_k, a_k] += (G - Q_all[s_k, a_k]) / visit_count[s_k, a_k]

    # greedy policy under the Q function, keeping every tied action
    policy_opt = [np.flatnonzero(Q_all[s] == Q_all[s].max()).tolist() for s in range(n_state)]

    Q_opt = np.max(Q_all, axis=1)  # best action value of each state

    return Q_all.tolist(), Q_opt, policy_opt

2. Monte Carlo method with ϵ-soft approach

In [4]:
def MCM_epsilon_soft(max_ep, gamma, epsilon, env):
    """
    On-policy first-visit Monte Carlo control with an ϵ-soft policy.

    Every episode starts from the fixed start state returned by
    exploring_starts(env, es=False) and follows the current ϵ-soft policy until a
    terminal transition is sampled. After each first visit of (s, a) the Q estimate
    is updated with the observed return and the policy of s is made ϵ-greedy with
    respect to Q (ties broken at random).

    Input:
        max_ep (int) - maximum number of episodes
        gamma (float) - reward discount factor
        epsilon (float) - exploration parameter ϵ; the greedy action gets probability
                          1 - ϵ + ϵ/n_action and every other action ϵ/n_action
        env (class) - gridworld environment with the following parameters:
            env.n_state (int) - number of states
            env.n_action (int)   - number of actions
            env.model (4d array) - (n_state) by (n_action) by (n) by (4) array, for state s and action a,
                                    there are n possibilities, each row is composed of (p, s_, r, t)
                                    p  - transition probability from (s,a) to (s_)   ### sum of the n p equals 1
                                    s_ - next state
                                    r  - reward of the transition from (s,a) to (s_)
                                    t  - terminal information, a bool value, True/False
    Output:
        Q_all (list of lists) - Estimated Q function, for each state and each action
        Q_opt (1d array) - Maximum of Q_all over the actions, for each state
        policy_opt (list of lists) - For each state, the actions with the highest probability under the
                                     final ϵ-soft policy (all actions for states that were never updated)
    """
    n_state, n_action = env.n_state, env.n_action

    # Initialize
    policy = np.full((n_state, n_action), 1.0 / n_action)  # equiprobable start, the sum of each row is 1
    Q_all = np.zeros((n_state, n_action))  # Q function
    visit_count = np.zeros((n_state, n_action), dtype=int)  # number of first-visit returns averaged so far

    for ep in range(max_ep):
        s = exploring_starts(env, es=False)  # fixed start point
        Traces = []  # moving trajectory, one (s, a, r) per step

        # Generate an episode
        while True:
            a = np.random.choice(n_action, p=policy[s])  # choose the action under the current policy

            # determine the next state, reward, terminal
            temp = env.model[s][a]
            # if there are multiple outcomes, e.g., at green, select one according to p
            p_next = np.random.choice(len(temp), p=[outcome[0] for outcome in temp])
            _, s_, r, t = temp[p_next]

            Traces.append((s, a, r))

            s = s_  # update state

            if t:  # if terminal, break
                break

        # Earliest time step of every (s, a) pair in this episode; this replaces
        # rescanning the whole trajectory at every step of the backward pass.
        first_visit = {}
        for step, (s_k, a_k, _) in enumerate(Traces):
            first_visit.setdefault((s_k, a_k), step)

        # Loop backwards over the episode, accumulating the discounted return
        G = 0
        for step in range(len(Traces) - 1, -1, -1):
            s_k, a_k, r_k = Traces[step]
            G = gamma * G + r_k  # update G

            # first-visit check: only the earliest occurrence of (s, a) contributes
            if first_visit[(s_k, a_k)] != step:
                continue

            visit_count[s_k, a_k] += 1
            # incremental form of the sample mean of all returns seen so far
            Q_all[s_k, a_k] += (G - Q_all[s_k, a_k]) / visit_count[s_k, a_k]

            A_star = argmax_random(Q_all[s_k])  # find A*, with ties broken arbitrarily

            # make the policy of this state ϵ-greedy with respect to Q
            policy[s_k, :] = epsilon / n_action
            policy[s_k, A_star] = 1 - epsilon + epsilon / n_action

    # most probable action(s) of the final policy, keeping every tied action
    policy_opt = [np.flatnonzero(policy[s] == policy[s].max()).tolist() for s in range(n_state)]

    Q_opt = np.max(Q_all, axis=1)  # best action value of each state

    return Q_all.tolist(), Q_opt, policy_opt

3. Results

In [9]:
# Initialize the environment, set up parameters
# NOTE: no random seed is set in this cell, so the printed policies can differ between runs.
max_ep = 5000
gam = 0.95
eps = 0.85  # ϵ of the ϵ-soft policy: each non-greedy action is taken with probability ϵ / n_action
Env = ENV()

# Find the optimal policy
# Both methods run on the same environment instance; the ϵ-soft run starts every episode from the fixed state
Q_1, Q_opt_1, pol_opt_1 = MCM_exploring_starts(max_ep, gam, Env)
Q_2, Q_opt_2, pol_opt_2 = MCM_epsilon_soft(max_ep, gam, eps, Env)

# Print the results
# reshape([5, -1]) lays the flat per-state list out as a grid with 5 rows;
# 'o' marks a state whose policy entry lists 4 actions (see print_policy)
print('Monte Carlo method with exploring starts:')
# print(np.array(Q_1), '\n')
print((np.array(print_policy(pol_opt_1, Env))).reshape([5, -1]), '\n')

print('Monte Carlo method with ϵ-soft approach:')
# print(np.array(Q_2), '\n')
print((np.array(print_policy(pol_opt_2, Env))).reshape([5, -1]), '\n')

Monte Carlo method with exploring starts:
[['→' '→' '←' '→' '←']
 ['↑' '↑' '↑' '→' '↑']
 ['↑' '↑' '↑' '→' 'o']
 ['↓' '↑' '↑' '↑' '↑']
 ['o' '←' '←' '↑' '↑']] 

Monte Carlo method with ϵ-soft approach:
[['→' '↑' '←' '←' '→']
 ['↑' '↑' '↑' '↑' '↑']
 ['↑' '↑' '↑' '↑' 'o']
 ['↓' '↑' '↑' '↑' '↑']
 ['o' '←' '←' '↑' '↑']] 



In [13]:
# Compare the ϵ-soft method for three values of ϵ.
# This cell reuses `gam` and `Env` from the previous results cell and overwrites the
# globals `max_ep` and `eps`; the episode budget is reduced as ϵ gets smaller.
print('Monte Carlo method with ϵ-soft approach: \n')
max_ep = 5000
eps = 0.90
Q_2_1, Q_opt_2_1, pol_opt_2_1 = MCM_epsilon_soft(max_ep, gam, eps, Env)
print('ϵ =', eps)
print((np.array(print_policy(pol_opt_2_1, Env))).reshape([5, -1]), '\n')

max_ep = 1000
eps = 0.70
Q_2_2, Q_opt_2_2, pol_opt_2_2 = MCM_epsilon_soft(max_ep, gam, eps, Env)
print('ϵ =', eps)
print((np.array(print_policy(pol_opt_2_2, Env))).reshape([5, -1]), '\n')

# WARNING: this run is slow (the author reports around 15 min), even with only 500 episodes.
# Presumably the less exploratory policy makes episodes from the fixed start much longer - TODO confirm.
max_ep = 500
eps = 0.50
Q_2_3, Q_opt_2_3, pol_opt_2_3 = MCM_epsilon_soft(max_ep, gam, eps, Env)
print('ϵ =', eps)
print((np.array(print_policy(pol_opt_2_3, Env))).reshape([5, -1]), '\n')

Monte Carlo method with ϵ-soft approach: 

ϵ = 0.9
[['→' '↑' '←' '←' '↑']
 ['↑' '↑' '↑' '↑' '↑']
 ['↑' '↑' '↑' '→' 'o']
 ['↓' '↑' '↑' '↑' '↑']
 ['o' '←' '←' '↑' '↑']] 

ϵ = 0.7
[['→' '←' '←' '←' '→']
 ['↑' '↑' '↑' '←' '↑']
 ['↑' '↑' '↑' '↑' 'o']
 ['↑' '↑' '↑' '↑' '↑']
 ['o' '↑' '↑' '↑' '↑']] 

ϵ = 0.5
[['→' '←' '←' '←' '↑']
 ['↑' '↑' '↑' '←' '↑']
 ['→' '↑' '↑' '↑' 'o']
 ['→' '↑' '↑' '←' '←']
 ['o' '↑' '↑' '↑' '←']] 



<font size="6">Part 2.2</font>

1. Use a behaviour policy with equiprobable moves

In [3]:
def MCM_behaviour_policy(max_ep, gamma, env):
    """
    Off-policy Monte Carlo control with an equiprobable behaviour policy.

    Episodes are generated with exploring starts under the equiprobable behaviour
    policy b. Q is updated backwards along each episode with weighted importance
    sampling (cumulative weights C), the target policy pi is kept greedy with respect
    to Q, and the backward pass stops at the first step whose action differs from pi.

    Input:
        max_ep (int) - maximum number of episodes
        gamma (float) - reward discount factor
        env (class) - gridworld environment with the following parameters:
            env.n_state (int) - number of states
            env.n_action (int)   - number of actions
            env.model (4d array) - (n_state) by (n_action) by (n) by (4) array, for state s and action a,
                                    there are n possibilities, each row is composed of (p, s_, r, t)
                                    p  - transition probability from (s,a) to (s_)   ### sum of the n p equals 1
                                    s_ - next state
                                    r  - reward of the transition from (s,a) to (s_)
                                    t  - terminal information, a bool value, True/False
    Output:
        Q_all (list of lists) - Estimated Q function, for each state and each action
        Q_opt (1d array) - Maximum of Q_all over the actions, for each state
        policy_opt (list of lists) - Greedy policy, the first dimension is for states, the second
                                     dimension lists every action that attains the maximum (ties kept)
    """
    n_state = env.n_state
    n_action = env.n_action

    # Initialize: every Q entry starts at -0.21, every cumulative weight at 0
    Q_all = [[-0.21] * n_action for _ in range(n_state)]
    C = [[0.0] * n_action for _ in range(n_state)]

    # greedy target policy; np.argmax always selects the first largest term, so ties are broken consistently
    pi = np.argmax(Q_all, axis=1)
    # equiprobable behaviour policy
    b = np.ones([n_state, n_action]) / n_action
    action_ids = np.arange(n_action)

    for ep in range(max_ep):
        state = exploring_starts(env, es=True)  # exploring starts

        # Generate an episode under the behaviour policy
        Traces = []
        terminal = False
        while not terminal:
            action = np.random.choice(action_ids, p=b[state])

            # sample one of the possible outcomes of (state, action) according to its probability
            outcomes = env.model[state][action]
            outcome_probs = [outcome[0] for outcome in outcomes]
            pick = np.random.choice(list(range(len(outcomes))), p=outcome_probs)
            _, next_state, reward, terminal = outcomes[pick]

            Traces.append([state, action, reward])
            state = next_state

        # Walk the episode backwards
        G = 0  # discounted return
        W = 1  # importance-sampling weight
        for s_k, a_k, r_k in reversed(Traces):
            G = gamma * G + r_k
            C[s_k][a_k] += W
            Q_all[s_k][a_k] += (W / C[s_k][a_k]) * (G - Q_all[s_k][a_k])

            pi[s_k] = np.argmax(Q_all[s_k])  # keep the target policy greedy

            # earlier steps get no update once the taken action leaves the target policy
            if a_k != pi[s_k]:
                break

            W = W / b[s_k][a_k]

    # greedy policy under the Q function, keeping every tied action
    policy_opt = []
    for s in range(n_state):
        row = np.asarray(Q_all[s])
        policy_opt.append(np.flatnonzero(row == np.max(row)).tolist())

    Q_opt = np.max(Q_all, axis=1)  # best action value of each state

    return Q_all, Q_opt, policy_opt

2. Results

In [5]:
# Initialize the environment, set up parameters
# NOTE: no random seed is set in this cell, so the printed policy can differ between runs.
max_ep = 200000
gam = 0.95
Env = ENV()

# Find the optimal policy
Q_3, Q_opt_3, pol_opt_3 = MCM_behaviour_policy(max_ep, gam, Env)

# Print the results
# Cells with several arrows are states where more than one action attains the maximum of Q;
# 'o' marks a state whose policy entry lists 4 actions (see print_policy)
print('Monte Carlo method with behaviour policy with equiprobable moves:')
# print(np.array(Q_3), '\n')
print((np.array(print_policy(pol_opt_3, Env))).reshape([5, -1]), '\n')

Monte Carlo method with behaviour policy with equiprobable moves:
[['→' 'o' '←' '→' 'o']
 ['→↑' '↑' '←↑' '↑' '↑']
 ['↓' '↑' '←↑' '←' 'o']
 ['↓' '→' '↑' '→↑' '↑']
 ['o' '→' '↑' '↓' '↑']] 



<font size="6">Part 2.3</font>

1. Policy iteration (permute the locations of the green and blue squares with probability 0.1)

In [11]:
def MCM_policy_iteration(max_ep, gamma, env, permute=False, epsilon=0.85):
    """
    Off-policy Monte Carlo control with an ϵ-greedy target policy
    (optionally permuting the locations of the green and blue squares with probability 0.1).

    Episodes are generated with exploring starts under an equiprobable behaviour
    policy. Q is updated backwards along each episode with weighted importance
    sampling, and after every update the target policy of the visited state is made
    ϵ-greedy with respect to Q (ties broken at random).

    Side effect: when permute is True, env.permute_G_B() is called on the environment
    that was passed in, and this function does not undo those calls before returning.

    Input:
        max_ep (int) - maximum number of episodes
        gamma (float) - reward discount factor
        env (class) - gridworld environment with the following parameters:
            env.n_state (int) - number of states
            env.n_action (int)   - number of actions
            env.model (4d array) - (n_state) by (n_action) by (n) by (4) array, for state s and action a,
                                    there are n possibilities, each row is composed of (p, s_, r, t)
                                    p  - transition probability from (s,a) to (s_)   ### sum of the n p equals 1
                                    s_ - next state
                                    r  - reward of the transition from (s,a) to (s_)
                                    t  - terminal information, a bool value, True/False
            env.permute_G_B() - called after a step with probability 0.1 when permute is True
        permute (bool) - whether the green and blue squares will permute, default as False
        epsilon (float) - ϵ of the ϵ-greedy target policy, default 0.85 (the value that
                          used to be hard-coded)
    Output:
        Q_all (list of lists) - Estimated Q function, for each state and each action
        Q_opt (1d array) - Maximum of Q_all over the actions, for each state
        policy_opt (list of lists) - For each state, the actions with the highest probability under the
                                     final target policy (all actions for states that were never updated)
    """
    n_state, n_action = env.n_state, env.n_action

    # Initialize
    policy = np.full((n_state, n_action), 1.0 / n_action)  # target policy, the sum of each row is 1
    b = np.full((n_state, n_action), 1.0 / n_action)  # equiprobable behaviour policy
    Q_all = np.zeros((n_state, n_action))  # Q function
    C = np.zeros((n_state, n_action))  # cumulative sum of importance weights

    for ep in range(max_ep):
        s = exploring_starts(env, es=True)  # exploring starts
        Traces = []  # moving trajectory, one (s, a, r) per step

        # Generate an episode
        while True:
            a = np.random.choice(n_action, p=b[s])  # choose the action under the equiprobable behaviour policy

            # determine the next state, reward, terminal
            temp = env.model[s][a]
            # if there are multiple outcomes, e.g., at green, select one according to p
            p_next = np.random.choice(len(temp), p=[outcome[0] for outcome in temp])
            _, s_, r, t = temp[p_next]

            Traces.append((s, a, r))

            s = s_  # update state

            # permute with probability 0.1 if permute = True
            if permute and (np.random.randint(10) == 0):
                env.permute_G_B()

            if t:  # if terminal, break
                break

        # Loop backwards over the episode
        G = 0  # discounted return
        W = 1  # importance-sampling weight
        for s_k, a_k, r_k in reversed(Traces):
            G = gamma * G + r_k  # update G
            C[s_k, a_k] += W  # update C
            Q_all[s_k, a_k] += W / C[s_k, a_k] * (G - Q_all[s_k, a_k])  # update Q

            # use ϵ-soft approach to update the policy of the visited state
            A_star = argmax_random(Q_all[s_k])  # find A*, with ties broken arbitrarily
            policy[s_k, :] = epsilon / n_action
            policy[s_k, A_star] = 1 - epsilon + epsilon / n_action

            W = W * (policy[s_k, a_k] / b[s_k, a_k])  # update W

            # A zero weight (possible when epsilon is 0) makes every earlier update a
            # no-op and would divide 0 by 0 for pairs whose C is still 0, so stop here.
            if W == 0:
                break

    # most probable action(s) of the final target policy, keeping every tied action
    policy_opt = [np.flatnonzero(policy[s] == policy[s].max()).tolist() for s in range(n_state)]

    Q_opt = np.max(Q_all, axis=1)  # best action value of each state

    return Q_all.tolist(), Q_opt, policy_opt

2. Results

In [13]:
# Set up parameters
# NOTE: no random seed is set in this cell, so the printed policies can differ between runs.
max_ep = 50000
gam = 0.95

# Find the optimal policy
# The permuting run calls env.permute_G_B() on the environment it is given and does not
# undo it, so each run gets its own freshly constructed environment.
Env = ENV()
Q_4, Q_opt_4, pol_opt_4 = MCM_policy_iteration(max_ep, gam, Env, permute=True)

# Fresh environment for the baseline, so it does not inherit the state left by the permuting run
Env = ENV()
Q_5, Q_opt_5, pol_opt_5 = MCM_policy_iteration(max_ep, gam, Env)

# Print the results
print('Monte Carlo method with policy iteration (permute the blue and green squares):')
# print(np.array(Q_4), '\n')
print((np.array(print_policy(pol_opt_4, Env))).reshape([5, -1]), '\n')

print('Monte Carlo method with policy iteration:')
# print(np.array(Q_5), '\n')
print((np.array(print_policy(pol_opt_5, Env))).reshape([5, -1]), '\n')

Monte Carlo method with policy iteration (permute the blue and green squares):
[['→' '↑' '←' '→' '←']
 ['↑' '↑' '↑' '↑' '↑']
 ['↑' '↑' '↑' '↑' 'o']
 ['↓' '↑' '↑' '↑' '↑']
 ['o' '←' '←' '↑' '↑']] 

Monte Carlo method with policy iteration:
[['→' '←' '←' '←' '→']
 ['↑' '↑' '↑' '↑' '↑']
 ['↑' '↑' '↑' '↑' 'o']
 ['↓' '↑' '↑' '↑' '↑']
 ['o' '←' '←' '↑' '↑']] 

