<a href="https://colab.research.google.com/github/madsbibow/RL-exercises/blob/master/ValueIterStuff.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import numpy as np

In [49]:
class MDP:
    '''A simple MDP class.  It includes the following members'''

    def __init__(self,T,R,discount):
        '''Constructor for the MDP class

        Inputs:
        T -- Transition function: |A| x |S| x |S'| array
        R -- Reward function: |A| x |S| array
        discount -- discount factor: scalar in [0,1)

        The constructor verifies that the inputs are valid and sets
        corresponding variables in a MDP object'''

        assert T.ndim == 3, "Invalid transition function: it should have 3 dimensions"
        self.nActions = T.shape[0]
        self.nStates = T.shape[1]
        assert T.shape == (self.nActions,self.nStates,self.nStates), "Invalid transition function: it has dimensionality " + repr(T.shape) + ", but it should be (nActions,nStates,nStates)"
        assert (abs(T.sum(2)-1) < 1e-5).all(), "Invalid transition function: some transition probability does not equal 1"
        self.T = T
        assert R.ndim == 2, "Invalid reward function: it should have 2 dimensions" 
        assert R.shape == (self.nActions,self.nStates), "Invalid reward function: it has dimensionality " + repr(R.shape) + ", but it should be (nActions,nStates)"
        self.R = R
        assert 0 <= discount < 1, "Invalid discount factor: it should be in [0,1)"
        self.discount = discount
        self.state_range = range(self.nStates)

    def valueIteration(self,initialV,nIterations=np.inf,tolerance=0.01):
        '''Value iteration procedure
        V <-- max_a R^a + gamma T^a V

        Inputs:
        initialV -- Initial value function: array of |S| entries
        nIterations -- limit on the # of iterations: scalar (default: infinity)
        tolerance -- threshold on ||V^n-V^n+1||_inf: scalar (default: 0.01)

        Outputs: 
        V -- Value function: array of |S| entries
        iterId -- # of iterations performed: scalar
        epsilon -- ||V^n-V^n+1||_inf: scalar'''
        
        # temporary values to ensure that the code compiles until this
        # function is coded
        assert initialV.shape == (self.nStates,), "Invalid initial value: it has dimensionality {}, but is should be {}".format(initialV.shape,(self.nStates,))   
        V = initialV 
        iterId = 0
        epsilon = 0

        while iterId <= nIterations:
          V_old = V.copy()
          V     = (self.R + self.discount*np.dot(self.T, V)).max(0)
          
          epsilon = np.abs(V - V_old).max()
          if epsilon < tolerance:
            break
          iterId = iterId + 1
        
        return [V,iterId,epsilon]

    def extractPolicy(self,V):
        '''Procedure to extract a policy from a value function
        pi <-- argmax_a R^a + gamma T^a V

        Inputs:
        V -- Value function: array of |S| entries

        Output:
        policy -- Policy: array of |S| entries'''

        # temporary values to ensure that the code compiles until this
        # function is coded
        assert V.shape == (self.nStates,), "Invalid initial value: it has dimensionality {}, but is should be {}".format(V.shape,(self.nStates,))
        policy = (self.R + self.discount*np.dot(self.T, V)).argmax(0)

        return policy 

    def evaluatePolicy(self,policy):
        '''Evaluate a policy by solving a system of linear equations
        V^pi = R^pi + gamma T^pi V^pi

        Input:
        policy -- Policy: array of |S| entries

        Ouput:
        V -- Value function: array of |S| entries'''

        # temporary values to ensure that the code compiles until this
        # function is coded

        assert policy.shape == (self.nStates,), "Invalid initial value: it has dimensionality {}, but is should be {}".format(policy.shape,(self.nStates,))

        R_pi = self.R[policy,self.state_range]
        T_pi = self.T[policy,self.state_range,:]
        V = np.linalg.solve(np.eye(self.nStates)-self.discount*T_pi,R_pi) 
        
        return V
        
    def policyIteration(self,initialPolicy,nIterations=np.inf):
        '''Policy iteration procedure: alternate between policy
        evaluation (solve V^pi = R^pi + gamma T^pi V^pi) and policy
        improvement (pi <-- argmax_a R^a + gamma T^a V^pi).

        Inputs:
        initialPolicy -- Initial policy: array of |S| entries
        nIterations -- limit on # of iterations: scalar (default: inf)

        Outputs: 
        policy -- Policy: array of |S| entries
        V -- Value function: array of |S| entries
        iterId -- # of iterations peformed by modified policy iteration: scalar'''

        # temporary values to ensure that the code compiles until this
        # function is coded
        assert initialPolicy.shape == (self.nStates,), "Invalid initial value: it has dimensionality {}, but is should be {}".format(initialPolicy.shape,(self.nStates,))

        policy = initialPolicy
        V = np.zeros(self.nStates)
        iterId = 0

        while iterId <= nIterations:
            policy_old = policy.copy()
            V = self.evaluatePolicy(policy)
            
            policy = self.extractPolicy(V)

            iterId = iterId + 1
            
            if np.array_equal(policy,policy_old):
              break

        return [policy,V,iterId]
            
    def evaluatePolicyPartially(self,policy,initialV,nIterations=np.inf,tolerance=0.01):
        '''Partial policy evaluation:
        Repeat V^pi <-- R^pi + gamma T^pi V^pi

        Inputs:
        policy -- Policy: array of |S| entries
        initialV -- Initial value function: array of |S| entries
        nIterations -- limit on the # of iterations: scalar (default: infinity)
        tolerance -- threshold on ||V^n-V^n+1||_inf: scalar (default: 0.01)

        Outputs: 
        V -- Value function: array of |S| entries
        iterId -- # of iterations performed: scalar
        epsilon -- ||V^n-V^n+1||_inf: scalar'''

        # temporary values to ensure that the code compiles until this
        # function is coded
        assert policy.shape == (self.nStates,), "Invalid initial value: it has dimensionality {}, but is should be {}".format(policy.shape,(self.nStates,))
        assert initialV.shape == (self.nStates,), "Invalid initial value: it has dimensionality {}, but is should be {}".format(initialV.shape,(self.nStates,))
        
        V = initialV
        iterId = 0

        R_pi = self.R[policy,self.state_range]
        T_pi = self.T[policy,self.state_range,:]


        while iterId <= nIterations:
            V_old = V.copy()
            V = R_pi + self.discount*np.dot(T_pi,V)
            iterId = iterId + 1  
            epsilon = np.max(np.abs(V-V_old))
            if epsilon < tolerance:
              break  

        return [V,iterId,epsilon]

    def modifiedPolicyIteration(self,initialPolicy,initialV,nEvalIterations=5,nIterations=np.inf,tolerance=0.01):
        '''Modified policy iteration procedure: alternate between
        partial policy evaluation (repeat a few times V^pi <-- R^pi + gamma T^pi V^pi)
        and policy improvement (pi <-- argmax_a R^a + gamma T^a V^pi)

        Inputs:
        initialPolicy -- Initial policy: array of |S| entries
        initialV -- Initial value function: array of |S| entries
        nEvalIterations -- limit on # of iterations to be performed in each partial policy evaluation: scalar (default: 5)
        nIterations -- limit on # of iterations to be performed in modified policy iteration: scalar (default: inf)
        tolerance -- threshold on ||V^n-V^n+1||_inf: scalar (default: 0.01)

        Outputs: 
        policy -- Policy: array of |S| entries
        V -- Value function: array of |S| entries
        iterId -- # of iterations peformed by modified policy iteration: scalar
        epsilon -- ||V^n-V^n+1||_inf: scalar'''

        # temporary values to ensure that the code compiles until this
        # function is coded
        assert initialPolicy.shape == (self.nStates,), "Invalid initial value: it has dimensionality {}, but is should be {}".format(initialPolicy.shape,(self.nStates,))
        assert initialV.shape == (self.nStates,), "Invalid initial value: it has dimensionality {}, but is should be {}".format(initialV.shape,(self.nStates,))        
        policy = initialPolicy
        V = initialV
        iterId = 0
        epsilon = 0

        while iterId <= nIterations:
            V_old = V.copy()
            #policy_old = policy.copy()
            V, _ , _ = self.evaluatePolicyPartially(policy,V,nEvalIterations,tolerance)
            policy = self.extractPolicy(V)
            iterId = iterId + 1  
            epsilon = np.max(np.abs(V-V_old))
            if  epsilon < tolerance:
              break


        return [policy,V,iterId,epsilon]

In [51]:
''' Construct simple MDP as described in Lecture 2a Slides 13-14'''
# Transition function: |A| x |S| x |S'| array
T = np.array([[[0.5,0.5,0,0],[0,1,0,0],[0.5,0.5,0,0],[0,1,0,0]],[[1,0,0,0],[0.5,0,0,0.5],[0.5,0,0.5,0],[0,0,0.5,0.5]]])
# Reward function: |A| x |S| array
R = np.array([[0,0,10,10],[0,0,10,10]])
# Discount factor: scalar in [0,1)
discount = 0.9        
# MDP object
mdp = MDP(T,R,discount)

In [40]:
[V,nIterations,epsilon] = mdp.valueIteration(initialV=np.zeros(mdp.nStates))
policy = mdp.extractPolicy(V)

print('Iterations {}, epsilon{}'.format(nIterations,epsilon))
print('V',V)
print('policy',policy)

Iterations 57, epsilon0.009860138819838937
V [31.49636306 38.51527513 43.935435   54.1128575 ]
policy [0 1 1 1]


In [41]:
V_CHECK = mdp.evaluatePolicy(policy)
print(V_CHECK)
V = mdp.evaluatePolicy(np.array([1,0,1,0]))
print(V)

[31.58510431 38.60401638 44.02417625 54.20159875]
[-0.         -0.         18.18181818 10.        ]


In [42]:
[policy,V,iterId] = mdp.policyIteration(np.array([0,0,0,0]))

print(policy,V,iterId)

[0 1 1 1] [31.58510431 38.60401638 44.02417625 54.20159875] 2


In [43]:
[V,iterId,epsilon] = mdp.evaluatePolicyPartially(np.array([1,0,1,0]),np.array([0,10,0,13]))
print(V,iterId,epsilon)

[V_CHECK,_,_] = mdp.evaluatePolicyPartially(policy,np.array([0,10,0,13]))
print(V_CHECK)


[ 0.          0.08727964 18.18181818 10.08727964] 45 0.009697737297875264
[31.49784208 38.51675415 43.93691402 54.11433652]


In [52]:
[policy,V,iterId,epsilon] = mdp.modifiedPolicyIteration(np.array([1,0,1,0]),np.array([0,10,0,13]))

print(policy,V,iterId,epsilon)

[0 1 1 1] [31.5055624  38.52447447 43.94463435 54.12205685] 12 0.008837989685982706


All Tests


In [53]:
'''Test each procedure'''
[V,nIterations,epsilon] = mdp.valueIteration(initialV=np.zeros(mdp.nStates))
policy = mdp.extractPolicy(V)
V = mdp.evaluatePolicy(np.array([1,0,1,0]))
[policy,V,iterId] = mdp.policyIteration(np.array([0,0,0,0]))
[V,iterId,epsilon] = mdp.evaluatePolicyPartially(np.array([1,0,1,0]),np.array([0,10,0,13]))
[policy,V,iterId,epsilon] = mdp.modifiedPolicyIteration(np.array([1,0,1,0]),np.array([0,10,0,13]))