#  Case study 4.4. The Grid-World MDP problem. Practical implementation

Consider a simple grid-world problem, where the environment is a rectangular
grid. The state of the environment is given by the location of a robot in the
grid. Hence, the cells of the grid correspond to the possible states of the
environment. At each cell, four actions are possible: north, south, east and
west, which deterministically (i.e., with probability 1) cause the agent to
move one cell in the respective direction of the grid. Actions that would
take the robot off the grid leave its location unchanged, but also results in
a negative (undesired) reward of −1. Other actions result in a reward of 0,
except those that move the agent out of the special states A and B. From
state A, all four actions yield a positive (desired) reward of +10 and take the
agent to A 0 . From state B, all four actions yield a positive reward of +5 and
take the agent to B 0 .

In [1]:
import numpy as np
import itertools as IT
np.set_printoptions(precision=2, suppress=True)
from set_up_grid_world import Grid_World

env = Grid_World()

#initial parameters
gamma = env.gamma
R = env.R
P = env.P

N_states = env.N_states
N_actions = env.N_actions
pi_m = env.pi_m

*The main problem adapting the gridworld script was not that it was long or a different language, I changed it with a couple 'find and replace' actions from a good text editor in almost 5 minutes. The problem was that every index was shifted by 1 (in matlab index start at 1 and in python and the rest of serious programming languages at 0). So the options i tried were:*

1. *If the matlab script was adapted from another language script, maybe i could find the original one and shift back the indexes.*

1. *More 'find and replace' and change every index to add a '-1' at the end, so they would make sense in python.*



Helper Functions

In [2]:
from numpy.linalg import inv

# function to evaluate the state value(V) function of a certain policy
def eval_v(policy):
    P_pi = np.matmul(policy, P)
    R_pi = np.matmul(policy, R)

    return np.matmul(inv(np.identity(P_pi.shape[0]) - gamma * P_pi), R_pi)


# function to evaluate the state-action(Q) value function of a certain policy
def eval_q(policy):
    # product of the transition matrix with the policy
    P_aux = np.matmul(P, policy)

    return np.matmul(inv(np.identity(P_aux.shape[0]) - gamma * P_aux), R)


# rewards for a state
def R_state(state):
    return R[N_actions * state: N_actions * state + N_actions]


# transition probabilities for a state
def P_state(state):
    return P[N_actions * state: N_actions * state + N_actions, ]


def R_(state, action=None):
    if action == None:
        return R_state(state)
    else:
        return R_state(state)[action]


def P_(state, action=None, state_t1=None):
    if action == None:
        if state_t1 == None:
            return P_state(state)
        else:
            return P_state(state)[:, state_t1]
    else:
        if state_t1 == None:
            return P_state(state)[action,]
        else:
            return P_state(state)[action, state_t1]


def policy_(policy, state, action=None):
    if action is not None:
        return policy[state][state * N_actions + action]
    else:
        return [policy[state][state * N_actions + action_] for action_ in range(N_actions)]


# We define the algorithm for policy evaluation with the state value function
def policy_evaluation_v(policy):
    # 1
    v = np.zeros(N_states)
    epsilon = 0.001
    delta = float('inf')
    # 2, 8 stop condition is not stated, instead we do 400 iterations
    while delta>epsilon:
    #for i in range(N_steps):
        # 3
        delta = 0
        # 4
        for state in range(N_states):
            # 5
            v_old = v[state]
            # 6
            v_aux = 0
            for action in range(N_actions):
                v_aux += policy_(policy, state, action) * (R_(state, action) + gamma * sum(
                    [P_(state, action, state_t1) * v[state_t1] for state_t1 in range(N_states)]))
            v[state] = v_aux
            # 7
            delta = max(delta, np.abs(v_old - v[state]))

    # 8,9
    return v


# We define the algorithm for policy evaluation with the state-action
# value function
# We define the algorithm for policy evaluation with the state value function
def policy_evaluation_q(policy):
    # 1
    q = np.zeros(N_states * N_actions)
    epsilon = 0.001
    delta = float('inf')
    # 2, 8 stop condition is not stated, instead we do 400 iterations
    while delta>epsilon:
    #for i in range(N_steps):
        # 3
        delta = 0
        # 4 TO-DO
        for state, action in IT.product(range(N_states), range(N_actions)):
            # 5
            q_old = q[state * N_actions + action]
            # 6
            q_aux = [R_(state, action)]
            for state_t1 in range(N_states):
                q_aux += gamma * P_(state, action, state_t1) * sum(
                    [policy_(policy, state_t1, action_t1) * q[state_t1 * N_actions + action_t1] for action_t1 in
                     range(N_actions)])
            q[state * N_actions + action] = q_aux
            # 7
            delta = max(delta, np.abs(q_old - q[state* N_actions + action]))

    return q


# Policy iteration for state value function
def policy_iteration_v(policy):
    v = np.zeros(N_states)  # 1
    theta = False
    while not theta:  # 2,3
        v = policy_evaluation_v(policy)  # 4-9
        # print('\n v:',v)
        theta = True  # 10
        for s in range(N_states):  # 11
            # print('\n s:',s)
            a = policy_(policy, s)  # 12
            # print('a:',a)

            # 13
            arg_max = np.argmax([(R_(s, a_t1) + gamma *
                                  np.sum([np.dot(P_(s, a_t1, s_t1), v[s_t1]) for s_t1 in range(N_states)]))
                                 for a_t1 in range(N_actions)])
            # print('arg_max:',arg_max)
            policy[s] = [0] * len(policy[s])
            policy[s][N_actions * s + arg_max] = 1

            # print('pi[s]:',policy[s])
            # print('policy_(policy, s):',policy_(policy, s))
            if not (a == policy_(policy, s)): theta = False  # 14
            # print('a not equal to policy_(policy, s)')

            # print('theta:',theta)
        # print('pi:',policy)
    return policy  # 15


# Policy iteration for state-action value function
def policy_iteration_q(policy):
    q = np.zeros(N_states * N_actions)  # 1
    theta = False
    while not theta:  # 2,3
        q = policy_evaluation_q(policy)  # 4-9
        # print('\n q:',q)
        theta = True  # 10
        for s in range(N_states):  # 11
            # print('\n s:',s)
            a = policy_(policy, s)  # 12
            # print('a:',a)

            # 13
            arg_max = np.argmax([q[s * N_actions + a_t1] for a_t1 in range(N_actions)])
            # print('arg_max:',arg_max)
            policy[s] = [0] * len(policy[s])
            policy[s][N_actions * s + arg_max] = 1

            # print('pi[s]:',policy[s])
            # print('policy_(policy, s):',policy_(policy, s))
            if not (a == policy_(policy, s)): theta = False  # 14
            # print('a not equal to policy_(policy, s)')

            # print('theta:',theta)
        # print('pi:',policy)
    return policy  # 15


def value_iteration_v():
    policy = np.zeros([N_states, (N_states * N_actions)])
    v = np.zeros(N_states)  # 1
    epsilon = 0.01
    delta = float('inf')
    while delta > epsilon:  # 2
        delta = 0  # 3

        # print('\n v:',v)
        for s in range(N_states):  # 4
            # print('\n s:',s)

            v_old = v[s]  # 5
            # print('\n v_old:',v_old)
            # 6
            v[s] = max([(R_(s, a) + gamma *
                            np.sum([np.dot(P_(s, a, s_t1), v[s_t1]) for s_t1 in range(N_states)]))
                           for a in range(N_actions)])
            # print('\n v[s]:',v[s])
            delta = max(delta, np.abs(v_old - v[s]))  # 8
            # print('delta:',delta)

    # (out of the while)
    for s in range(N_states):  # 9
        # 10
        arg_max = np.argmax([(R_(s, a) + gamma *
                              np.sum([np.dot(P_(s, a, s_t1), v[s_t1]) for s_t1 in range(N_states)]))
                             for a in range(N_actions)])
        # print('arg_max:',arg_max)
        policy[s] = [0] * (N_states * N_actions)
        policy[s][N_actions * s + arg_max] = 1

        # print('pi[s]:',policy[s])

    # print('pi:',policy)
    return policy  # 11


def value_iteration_q():
    policy = np.zeros([N_states, (N_states * N_actions)])
    q = np.zeros(N_states * N_actions)  # 1
    epsilon = 0.01
    delta = float('inf')
    while delta > epsilon:  # 2, 8
        delta = 0  # 3

        # print('\n q:',q)
        for s, a in IT.product(range(N_states), range(N_actions)):  # 4
            # print('\n s , a:',s , a)

            q_old = q[s * N_actions + a]  # 5
            # print('\n q_old:',q_old)

            # 6
            q[s * N_actions + a] = R_(s, a) + gamma * np.sum([np.dot(P_(s, a, s_t1),
                                                                     max([q[s_t1 * N_actions + a_t1] for a_t1 in
                                                                             range(N_actions)]))
                                                              for s_t1 in range(N_states)])

            # print('\n q(s,a)',q[s * N_actions + a])
            delta = max(delta, np.abs(q_old - q[s * N_actions + a]))  # 7
            # print('delta:',delta)

    # (out of the while)
    for s in range(N_states):  # 9
        # 10
        arg_max = np.argmax([q[s * N_actions + a_t1] for a_t1 in range(N_actions)])
        # print('arg_max:',arg_max)
        policy[s] = [0] * (N_states * N_actions)
        policy[s][N_actions * s + arg_max] = 1

        # print('pi[s]:',policy[s])

    # print('pi:',policy)
    return policy  # 11

a) Suppose the agent selects all four actions with equal probability in all
states (uniform random policy). Calculate analytically the value function for
the random policy when γ = 0.9. First of all, analyze the code provided in
grid_world_set_up.m where all involved matrices and vectors are provided as inputs to requested scripts. You should use the script ana_grid_world.m
as the guide for all questions in this case study.
Calculate analytically the value function for the random policy when
γ = 0.9. For that purpose create function bellman_linear.m.
Create function policy_evaluation.m and check that results coincide with
those provided by bellman_linear.m and also with numbers in the figure

In [3]:
# Get the value function for policy pi_m with 2 different methods

v_equations = eval_v(pi_m)
v_iteration = policy_evaluation_v(pi_m)

print('\n Value function calculated using the inverse: \n', np.matrix(v_equations).reshape((5,5)).T)
print('\n Value function calculated iteratively:       \n', np.matrix(v_iteration).reshape((5,5)).T)
print('\n Differences between the 2 matrices:')
print (np.matrix(v_equations).reshape((5,5))-np.matrix(v_iteration).reshape((5,5)))


 Value function calculated using the inverse: 
 [[ 3.31  8.79  4.43  5.32  1.49]
 [ 1.52  2.99  2.25  1.91  0.55]
 [ 0.05  0.74  0.67  0.36 -0.4 ]
 [-0.97 -0.44 -0.35 -0.59 -1.18]
 [-1.86 -1.35 -1.23 -1.42 -1.98]]

 Value function calculated iteratively:       
 [[ 3.31  8.79  4.43  5.33  1.5 ]
 [ 1.53  3.    2.25  1.91  0.55]
 [ 0.05  0.74  0.68  0.36 -0.4 ]
 [-0.97 -0.43 -0.35 -0.58 -1.18]
 [-1.85 -1.34 -1.23 -1.42 -1.97]]

 Differences between the 2 matrices:
[[-0. -0. -0. -0. -0.]
 [-0. -0. -0. -0. -0.]
 [-0. -0. -0. -0. -0.]
 [-0. -0. -0. -0. -0.]
 [-0. -0. -0. -0. -0.]]


![img](imgs/4.4.1.png)

b) Use your Matlab implementations of PI and VI to compute the optimal
value function and the associated optimal policy. Implement value_iteration.m
and policy_iteration.m to compute the optimal value function and the asso-
ciated optimal policy. Both results should coincide and also cross-check with
those presented in the next figure.

*I created a fancy method for visualizing the policies with arrows (Python supports emojis 👌)*

In [4]:
def plot_policy(policy):
    symbols = ['⬆️','➡️','⬇️','⬅️']
    plot = [''] * 25
    for s in range(N_states):
        plot[s] = symbols[np.argmax(policy[s,s*N_actions:s*N_actions+4])]
        
    print(np.reshape(plot,(5,5)).T)

In [5]:
# Get the optimal policy with 2 different methods

policy_PI = policy_iteration_q(pi_m)
policy_VI = value_iteration_q()

print('\n Policy calculated using the Policy Iteration: \n')
plot_policy(policy_PI)
print('\n Policy calculated using the Value Iteration:  \n')
plot_policy(policy_PI)
print('\n MSE Differences between the 2 policies:', np.sum(np.square(np.matrix(policy_PI)-np.matrix(policy_PI))))


v_equations = eval_v(policy_PI)

print('\n \n Value function of the optimal policy: \n', np.matrix(v_equations).reshape((5,5)).T)


 Policy calculated using the Policy Iteration: 

[['➡️' '⬆️' '⬅️' '⬆️' '⬅️']
 ['⬆️' '⬆️' '⬆️' '⬅️' '⬅️']
 ['⬆️' '⬆️' '⬆️' '⬆️' '⬆️']
 ['⬆️' '⬆️' '⬆️' '⬆️' '⬆️']
 ['⬆️' '⬆️' '⬆️' '⬆️' '⬆️']]

 Policy calculated using the Value Iteration:  

[['➡️' '⬆️' '⬅️' '⬆️' '⬅️']
 ['⬆️' '⬆️' '⬆️' '⬅️' '⬅️']
 ['⬆️' '⬆️' '⬆️' '⬆️' '⬆️']
 ['⬆️' '⬆️' '⬆️' '⬆️' '⬆️']
 ['⬆️' '⬆️' '⬆️' '⬆️' '⬆️']]

 MSE Differences between the 2 policies: 0.0

 
 Value function of the optimal policy: 
 [[21.98 24.42 21.98 19.42 17.48]
 [19.78 21.98 19.78 17.8  16.02]
 [17.8  19.78 17.8  16.02 14.42]
 [16.02 17.8  16.02 14.42 12.98]
 [14.42 16.02 14.42 12.98 11.68]]


![img](imgs/4.4.2.png)