## Transition model

In [11]:
import numpy as np
import numpy.linalg as lin

v = np.array([
    [0.5, 0.5]
])

T = np.array([
    [0.9, 0.1],
    [0.5, 0.5]
])

T_3 = lin.matrix_power(T, 3)
T_10 = lin.matrix_power(T, 10)
T_20 = lin.matrix_power(T, 20)
T_50 = lin.matrix_power(T, 50)
T_100 = lin.matrix_power(T, 100)

print("v:\n", v)
print("v_3:\n", str(np.dot(v, T_3)))
print("v_10:\n", str(np.dot(v, T_10)))
print("v_20:\n", str(np.dot(v, T_20)))
print("v_50:\n", str(np.dot(v, T_50)))
print("v_100:\n", str(np.dot(v, T_100)))

v:
 [[0.5 0.5]]
v_3:
 [[0.812 0.188]]
v_10:
 [[0.83329838 0.16670162]]
v_20:
 [[0.83333333 0.16666667]]
v_50:
 [[0.83333333 0.16666667]]
v_100:
 [[0.83333333 0.16666667]]


## Markov Decision Process

In [45]:
def utility(v, T, u, reward, gamma):
    """Return the state utility.

    @param v the state vector
    @param T transition matrix
    @param u utility vector
    @param reward for that state
    @param gamma discount factor
    @return the utility of the state
    """
    action_utility = np.zeros(4)
    for action in range(0,4):
        action_utility[action] = np.sum(np.multiply(u, np.dot(v, T[:,:,action])))
    future_utility = max(action_utility)
    return reward + gamma * future_utility

def run():
    #Starting state vector
    #The agent starts from (1, 1)
    v = np.array([[0.0, 0.0, 0.0, 0.0, 
                   0.0, 0.0, 0.0, 0.0, 
                   1.0, 0.0, 0.0, 0.0]])

    #Transition matrix loaded from file
    #(It is too big to write here)
    T = np.load("T.npy")

    #Utility vector
    u = np.array([[0.812, 0.868, 0.918,   1.0,
                   0.762,   0.0, 0.660,  -1.0,
                   0.705, 0.655, 0.611, 0.388]])

    #Defining the reward for state (1,1)
    reward = -0.04
    #Assuming that the discount factor is equal to 1.0
    gamma = 1.0

    #Use the Bellman equation to find the utility of state (1,1)
    utility_11 = utility(v, T, u, reward, gamma)
    print("Utility of state (1,1): " + str(utility_11))

run()

Utility of state (1,1): 0.7056


## Value Iteration Algorithm

In [37]:
def value_iteration():
    
    states_count = 12

    T = np.load("T.npy")

    u = np.array([0.0, 0.0, 0.0,  0.0,
                   0.0, 0.0, 0.0,  0.0,
                   0.0, 0.0, 0.0,  0.0])
    
    r = np.array([-0.04, -0.04, -0.04,  +1.0,
                  -0.04,   0.0, -0.04,  -1.0,
                  -0.04, -0.04, -0.04, -0.04])
    gamma = 0.9999
    epsilon = 0.0001

    iteration = 0
    
    while True:
        delta = 0
        u0 = u.copy()
        iteration += 1
        for s in range(states_count):
            reward = r[s]
            v = np.zeros((1, states_count))
            v[0,s] = 1.0
            u[s] = calculate_utility(v, T, u0, reward, gamma)
            delta = max(delta, np.abs(u[s] - u0[s]))
            
        if delta < epsilon * (1 - gamma) / gamma:
                print("=================== FINAL RESULT ==================")
                print("Iterations: " + str(iteration))
                print("Delta: " + str(delta))
                print("Gamma: " + str(gamma))
                print("Epsilon: " + str(epsilon))
                print("===================================================")
                print(u[0:4])
                print(u[4:8])
                print(u[8:12])
                print("===================================================")
                break    
                
value_iteration()

Iterations: 36
Delta: 4.681293019892507e-09
Gamma: 0.9999
Epsilon: 0.0001
[0.81119816 0.86756709 0.91768055 1.        ]
[ 0.76109801  0.          0.66008269 -1.        ]
[0.70474406 0.65465721 0.61074329 0.38727988]


## Policy Iteration Algorithm

In [43]:
def policy_evaluation(p, u, r, T, gamma):
    """
    Return the policy utility.
    @param p policy vector
    @param u utility vector
    @param r reward vector
    @param T transition matrix
    @param gamma discount factor
    @return the utility vector u
    """
    for s in range(12):
        if np.isnan(p[s]): 
            continue
        v = np.zeros((1,12))
        v[0,s] = 1.0
        action = int(p[s])
        u[s] = r[s] + gamma * np.sum(np.multiply(u, np.dot(v, T[:,:,action])))
    
    return u

def expected_action(u, T, v):
    """Return the expected action.

    It returns an action based on the
    expected utility of doing a in state s, 
    according to T and u. This action is
    the one that maximize the expected
    utility.
    @param u utility vector
    @param T transition matrix
    @param v starting vector
    @return expected action (int)
    """
    actions_utility = np.zeros(4)
    for action in range(4):
        actions_utility[action] = np.sum(np.multiply(u, np.dot(v, T[:,:,action])))
    
    return np.argmax(actions_utility)

def print_policy(p, shape):
    """Printing utility.

    Print the policy actions using symbols:
    ^, v, <, > up, down, left, right
    * terminal states
    # obstacles
    """
    state = 0
    policy_string = ""
    for row in range(shape[0]):
        for col in range(shape[1]):
            if(p[state] == -1): policy_string += " *  "            
            elif(p[state] == 0): policy_string += " ^  "
            elif(p[state] == 1): policy_string += " <  "
            elif(p[state] == 2): policy_string += " v  "           
            elif(p[state] == 3): policy_string += " >  "
            elif(np.isnan(p[state])): policy_string += " #  "
            state += 1
        policy_string += '\n'
    print(policy_string)

In [44]:
def policy_iteration():
    states_count = 12

    T = np.load("T.npy")

    u = np.array([0.0, 0.0, 0.0,  0.0,
                  0.0, 0.0, 0.0,  0.0,
                  0.0, 0.0, 0.0,  0.0])
    
    r = np.array([-0.04, -0.04, -0.04,  +1.0,
                  -0.04,   0.0, -0.04,  -1.0,
                  -0.04, -0.04, -0.04, -0.04])
    
    p = np.random.randint(0, 4, size=(12)).astype(np.float32)
    p[5] = np.NaN
    p[3] = p[7] = -1
    
    gamma = 0.9999
    epsilon = 0.0001
    iteration = 0
    
    while True:
        iteration += 1
        #1- Policy evaluation
        u0= u.copy()
        u = policy_evaluation(p, u, r, T, gamma)
        delta = np.absolute(u - u0).max()
        
        if delta < epsilon * (1 - gamma) / gamma: break
        
        for s in range(12):
            if not np.isnan(p[s]) and p[s]!=-1:
                v = np.zeros((1,12))
                v[0,s] = 1.0
                #2- Policy improvement    
                p[s] = return_expected_action(u, T, v)
                
    print("=================== FINAL RESULT ==================")
    print("Iterations: " + str(iteration))
    print("Delta: " + str(delta))
    print("Gamma: " + str(gamma))
    print("Epsilon: " + str(epsilon))
    print("===================================================")
    print(u[0:4])
    print(u[4:8])
    print(u[8:12])
    print("===================================================")
    print_policy(p, shape=(3,4))
    print("===================================================")
    
policy_iteration()

Iterations: 25
Delta: 3.878557808256744e-09
Gamma: 0.9999
Epsilon: 0.0001
[0.81119816 0.86756709 0.91768055 1.        ]
[ 0.76109801  0.          0.66008269 -1.        ]
[0.70474405 0.65465721 0.61074329 0.38727989]
 >   >   >   *  
 ^   #   ^   *  
 ^   <   <   <  

