In [44]:
import numpy as np
from numpy.linalg import inv

class MDP:
    
    '''Constructor for the MDP class

    Inputs:
    T -- Transition function: |A| x |S| x |S'| array
    R -- Reward function: |A| x |S| array
    discount -- discount factor: scalar in [0,1)

    The constructor verifies that the inputs are valid and sets
    corresponding variables in a MDP object'''
    def __init__(self,T,R,discount):
        assert T.ndim == 3, "Invalid transition function: it should have 3 dimensions"
        self.nActions = T.shape[0]
        self.nStates = T.shape[1]
        assert T.shape == (self.nActions,self.nStates,self.nStates), "Invalid transition function: it has dimensionality " + repr(T.shape) + ", but it should be (nActions,nStates,nStates)"
        assert (abs(T.sum(2)-1) < 1e-5).all(), "Invalid transition function: some transition probability does not equal 1"
        self.T = T
        assert R.ndim == 2, "Invalid reward function: it should have 2 dimensions" 
        assert R.shape == (self.nActions,self.nStates), "Invalid reward function: it has dimensionality " + repr(R.shape) + ", but it should be (nActions,nStates)"
        self.R = R
        assert 0 <= discount < 1, "Invalid discount factor: it should be in [0,1)"
        self.discount = discount
        

        
    '''Value iteration procedure
    V <-- max_a R^a + gamma T^a V

    Inputs:
    initialV -- Initial value function: array of |S| entries
    nIterations -- limit on the # of iterations: scalar (default: infinity)
    tolerance -- threshold on ||V^n-V^n+1||_inf: scalar (default: 0.01)

    Outputs: 
    V -- Value function: array of |S| entries
    iterId -- # of iterations performed: scalar
    epsilon -- ||V^n-V^n+1||_inf: scalar'''
    def valueIteration(self,initialV,nIterations=np.inf,tolerance=0.01,verbose=False):
        V = initialV
        last_V = V
        iterId = 0
        epsilon = 0

        for i in range(nIterations): # limit on # of iterations
            max_reward = None
            for action in range(self.nActions):
                reward = self.R[action] + self.discount * (self.T[action] @ V)
                if max_reward is None: max_reward = reward
                max_reward = np.maximum(max_reward, reward)
            last_V = V
            V = max_reward
            # print(V)
            epsilon =  np.max(np.abs(V - last_V))
            if (epsilon < tolerance): break
            iterId = iterId + 1 # increment

        return [V,iterId,epsilon]
    
    
    
    
    '''Procedure to extract a policy from a value function
    pi <-- argmax_a R^a + gamma T^a V

    Inputs:
    V -- Value function: array of |S| entries

    Output:
    policy -- Policy: array of |S| entries'''
    def extractPolicy(self,V):
        policy = np.array([0]*self.nStates) # default: first action
        max_reward = None
        for action in range(self.nActions):
            reward = self.R[action] + self.discount * (self.T[action] @ V)
            if max_reward is None: max_reward = reward
            for state in range(self.nStates):
                if reward[state] > max_reward[state]:
                    policy[state] = action
                    max_reward[state] = reward[state]
        return policy

In [45]:
# Transition function: |A| x |S| x |S'| array
T = np.array([[[0.5,0.5,0,0],[0,1,0,0],[0.5,0.5,0,0],[0,1,0,0]],[[1,0,0,0],[0.5,0,0,0.5],[0.5,0,0.5,0],[0,0,0.5,0.5]]])


# Reward function: |A| x |S| array
R = np.array([[0,0,10,10],[0,0,10,10]])


# Discount factor: scalar in [0,1)
discount = 0.9        


# MDP object
mdp = MDP(T,R,discount)


# Compute value function by value iteration
[V,nIterations,epsilon] = mdp.valueIteration(initialV=np.zeros(mdp.nStates), nIterations=6, verbose=True)



# extract policy
policy = mdp.extractPolicy(V)
print(f'Policy: {policy}')

Policy: [0 1 1 1]


In [46]:
# Transition function: |A| x |S| x |S'| array
T = np.zeros([4,17,17])
a = 0.7;  # intended move
b = 0.15;  # lateral move

# up (a = 0)

T[0,0,0] = a+b;
T[0,0,1] = b;

T[0,1,0] = b;
T[0,1,1] = a;
T[0,1,2] = b;

T[0,2,1] = b;
T[0,2,2] = a;
T[0,2,3] = b;

T[0,3,2] = b;
T[0,3,3] = a+b;

T[0,4,4] = b;
T[0,4,0] = a;
T[0,4,5] = b;

T[0,5,4] = b;
T[0,5,1] = a;
T[0,5,6] = b;

T[0,6,5] = b;
T[0,6,2] = a;
T[0,6,7] = b;

T[0,7,6] = b;
T[0,7,3] = a;
T[0,7,7] = b;

T[0,8,8] = b;
T[0,8,4] = a;
T[0,8,9] = b;

T[0,9,8] = b;
T[0,9,5] = a;
T[0,9,10] = b;

T[0,10,9] = b;
T[0,10,6] = a;
T[0,10,11] = b;

T[0,11,10] = b;
T[0,11,7] = a;
T[0,11,11] = b;

T[0,12,12] = b;
T[0,12,8] = a;
T[0,12,13] = b;

T[0,13,12] = b;
T[0,13,9] = a;
T[0,13,14] = b;

T[0,14,16] = 1;

T[0,15,11] = a;
T[0,15,14] = b;
T[0,15,15] = b;

T[0,16,16] = 1;

# down (a = 1)

T[1,0,0] = b;
T[1,0,4] = a;
T[1,0,1] = b;

T[1,1,0] = b;
T[1,1,5] = a;
T[1,1,2] = b;

T[1,2,1] = b;
T[1,2,6] = a;
T[1,2,3] = b;

T[1,3,2] = b;
T[1,3,7] = a;
T[1,3,3] = b;

T[1,4,4] = b;
T[1,4,8] = a;
T[1,4,5] = b;

T[1,5,4] = b;
T[1,5,9] = a;
T[1,5,6] = b;

T[1,6,5] = b;
T[1,6,10] = a;
T[1,6,7] = b;

T[1,7,6] = b;
T[1,7,11] = a;
T[1,7,7] = b;

T[1,8,8] = b;
T[1,8,12] = a;
T[1,8,9] = b;

T[1,9,8] = b;
T[1,9,13] = a;
T[1,9,10] = b;

T[1,10,9] = b;
T[1,10,14] = a;
T[1,10,11] = b;

T[1,11,10] = b;
T[1,11,15] = a;
T[1,11,11] = b;

T[1,12,12] = a+b;
T[1,12,13] = b;

T[1,13,12] = b;
T[1,13,13] = a;
T[1,13,14] = b;

T[1,14,16] = 1;

T[1,15,14] = b;
T[1,15,15] = a+b;

T[1,16,16] = 1;

# left (a = 2)

T[2,0,0] = a+b;
T[2,0,4] = b;

T[2,1,1] = b;
T[2,1,0] = a;
T[2,1,5] = b;

T[2,2,2] = b;
T[2,2,1] = a;
T[2,2,6] = b;

T[2,3,3] = b;
T[2,3,2] = a;
T[2,3,7] = b;

T[2,4,0] = b;
T[2,4,4] = a;
T[2,4,8] = b;

T[2,5,1] = b;
T[2,5,4] = a;
T[2,5,9] = b;

T[2,6,2] = b;
T[2,6,5] = a;
T[2,6,10] = b;

T[2,7,3] = b;
T[2,7,6] = a;
T[2,7,11] = b;

T[2,8,4] = b;
T[2,8,8] = a;
T[2,8,12] = b;

T[2,9,5] = b;
T[2,9,8] = a;
T[2,9,13] = b;

T[2,10,6] = b;
T[2,10,9] = a;
T[2,10,14] = b;

T[2,11,7] = b;
T[2,11,10] = a;
T[2,11,15] = b;

T[2,12,8] = b;
T[2,12,12] = a+b;

T[2,13,9] = b;
T[2,13,12] = a;
T[2,13,13] = b;

T[2,14,16] = 1;

T[2,15,11] = b;
T[2,15,14] = a;
T[2,15,15] = b;

T[2,16,16] = 1;

# right (a = 3)

T[3,0,0] = b;
T[3,0,1] = a;
T[3,0,4] = b;

T[3,1,1] = b;
T[3,1,2] = a;
T[3,1,5] = b;

T[3,2,2] = b;
T[3,2,3] = a;
T[3,2,6] = b;

T[3,3,3] = a+b;
T[3,3,7] = b;

T[3,4,0] = b;
T[3,4,5] = a;
T[3,4,8] = b;

T[3,5,1] = b;
T[3,5,6] = a;
T[3,5,9] = b;

T[3,6,2] = b;
T[3,6,7] = a;
T[3,6,10] = b;

T[3,7,3] = b;
T[3,7,7] = a;
T[3,7,11] = b;

T[3,8,4] = b;
T[3,8,9] = a;
T[3,8,12] = b;

T[3,9,5] = b;
T[3,9,10] = a;
T[3,9,13] = b;

T[3,10,6] = b;
T[3,10,11] = a;
T[3,10,14] = b;

T[3,11,7] = b;
T[3,11,11] = a;
T[3,11,15] = b;

T[3,12,8] = b;
T[3,12,13] = a;
T[3,12,12] = b;

T[3,13,9] = b;
T[3,13,14] = a;
T[3,13,13] = b;

T[3,14,16] = 1;

T[3,15,11] = b;
T[3,15,15] = a+b;

T[3,16,16] = 1;


# Reward function: |A| x |S| array
R = -1 * np.ones([4,17]);


# set rewards
R[:,14] = 100;  # goal state
R[:,9] = -70;   # bad state
R[:,16] = 0;    # end state


# Discount factor: scalar in [0,1)
discount = 0.95
        
# MDP object
mdp = MDP(T,R,discount)

In [47]:
#value iteration
[V,nIterations,epsilon] = mdp.valueIteration(initialV=np.zeros(mdp.nStates),nIterations=10000,tolerance=0.01)
print(f'Number of iterations: {nIterations}')
print('Value Function')

with np.printoptions(precision=2):
    print(np.reshape(V[:16],(4,4)))
print('Policy:')
actions = np.array(['U','D','L','R'])
policy = mdp.extractPolicy(V)
print(np.reshape(actions[policy[:16]],(4,4)))

Number of iterations: 25
Value Function
[[ 49.67  55.27  61.57  65.87]
 [ 48.01  52.3   68.14  73.25]
 [ 50.23  -0.42  77.07  81.36]
 [ 66.36  76.31 100.    89.91]]
Policy:
[['R' 'R' 'D' 'D']
 ['R' 'U' 'D' 'D']
 ['D' 'R' 'R' 'D']
 ['R' 'R' 'U' 'L']]
