Write a program in Python to implement value iteration, policy iteration, and modified policy iteration
specifically for this simple MDP example.
For this matter, you should start by creating a simple MDP class using class MDP. This class should
include the following members:
▪ a constructor for the MDP class def __init()__ that has the following parameters: self,
T, R, discount. 
o T -- Transition function: |A| x |S| x |S'| array
o R -- Reward function: |A| x |S| array
o discount -- discount factor γ: scalar in [0,1)
The constructor should verify that the inputs are valid (using the assert command) and set
corresponding variables in an MDP object.
▪ a procedure for the value iteration def valueIteration() that has the following
parameters: self, initialV , nIterations, tolerance.
Set nIterations and tolerance to np.inf and 0.01 as default values, respectively.
o initialV -- Initial value function: array of |S| entries
o nIterations -- limit on the number of iterations: scalar (default: infinity)
o tolerance -- threshold on ‖𝑉𝑛 − 𝑉𝑛+1‖∞ that will be compared to a variable epsilon
(initialized to np.inf): scalar (default: 0.01)
This procedure should return a new value function V.
o newV – New value function: array of |S| entries.
o iteration – the number of iterations performed: scalar
o epsilon -- ‖𝑉𝑛 − 𝑉𝑛+1‖∞: scalar

In [2]:
#imports
import numpy as np
import pandas as pd
import sys


In [65]:
#rough
temp= np.array([[1,2,3],[4,5,6]])
isinstance(temp, np.ndarray)
print(temp.shape[0])

2


In [3]:

class MDP:
  def __init__(self,T,R,discount):
    assert isinstance(T, np.ndarray) and isinstance(R, np.ndarray), "The transition and reward array should be numpy arrays"
    assert discount >0 and discount <= 1, "The discount factor should be scalar in [0,1)"
    assert (R.shape[0] == T.shape[1] and T.shape[1] == T.shape[2]), "Invalid shape of input array"#dimension check for R and T
    self.T = T # Transition probability T is a 3 dimensional matrix which contains both TA and TS
    self.R = R
    self.discount = discount
  
  def valueFunction(self, oldV,getMax=False):
    newV = self.R + self.discount*np.dot(self.T,oldV) #Belmans equation
    policy=np.argmax(newV,axis=0)
    VA,VS= newV
    if(not getMax):
      return newV
    maxV= np.maximum(VA,VS)
    return newV,maxV

  #NOTE: The policy determines which transition matrix (for probability distribution) should be taken.
  #for each state we have action A and S. The transition probability for the policy will depend
  # on the action we take on that state. T=> for a state T[A] or T[S]?
  def getTransitionForPolicy(self,policy):
    newT = []
    for ind,item in enumerate(policy):
      newT.append(self.T[item[0]][ind]) #chosing transition matrix according to policy
    newT=np.array(newT)
    return newT;
  
  def valueIteration( self, initialV , nIterations = np.inf, tolerance = 0.01):
    assert isinstance(initialV, np.ndarray), "The intial value function array should be numpy arrays"
    assert self.R.shape[0] == initialV.shape[0], "Invalid shape of input array"

    n = 0
    epsilon = float('inf')
    oldV = initialV
    maxV= None
    newV= None
    while(n< nIterations and epsilon > tolerance):
      newV,maxV= self.valueFunction(oldV,True)
      epsilon = np.linalg.norm(maxV - oldV)
      oldV=maxV
      n+=1
      print(f'iteration {n} \n value iteration inside loop new value : {newV} \n, max value: {maxV} \n, epsilon: {epsilon}')
    return newV,n,epsilon
  
  def extractPolicy(self, V):
    policy=np.argmax(V,axis=0)
    #NOTE: 0 represents A and 1 represents S
    return policy 

  def evaluatePolicy(self, policy):
    #MAIN FORMULA : V = R + gamma*T*V, V= R*(I-gamma*T))^-1
    newT = self.getTransitionForPolicy(policy)
    I = np.identity(self.T[0].shape[0])  #The identity matrix should be the shape as V. The V will have same shape as R. But as we need to substract the matrix with T
    tempInv = np.linalg.inv((I-self.discount*newT))
    newV = np.dot(tempInv,self.R)
    return newV

  def improvePolicy(self, oldV):
    newV= self.valueFunction(oldV)
    policy = self.extractPolicy(newV)
    return policy




  #2 steps, 1) evaluate policy 2) improve policy    
  def policyIteration(self, initialPolicy , nIterations= np.inf):
    assert isinstance(initialPolicy, np.ndarray), "The initial policy should be numpy arrays"
    print(f'The initial policy is {initialPolicy}')
    n=0
    policy= initialPolicy
    newPolicy= None
    while n < nIterations:
      V = self.evaluatePolicy(policy)
      newPolicy = self.improvePolicy(V)
      print(f'iteration {n} \n policy iteration inside loop new value : {V} \n,new policy: {newPolicy} \n')
      if(np.array_equal(policy,newPolicy)):#Convergence Criteria
        break
      policy = newPolicy
      n+=1

    print(f'The final value function is {V}')
    return n,newPolicy


#NOTE: The goal of partial policy evaluation is to estimate the value function of a fixed policy, without changing the policy itself.(i.e the policy remains constant) 
  def evaluatePolicyPartially(self,policy,initialV,nIterations = np.inf,tolerance=0.01,isReturnEpsilon = False):
    newT = self.getTransitionForPolicy(policy)
    n = 0
    epsilon = float('inf')
    V=initialV
    while(n<nIterations and epsilon > tolerance):
      newV = self.R + self.discount*(np.dot(newT,V))
      epsilon = np.linalg.norm(newV - V)
      V= newV
      n+=1
    if( not isReturnEpsilon):
      return V
    return V,epsilon



  def modifiedPolicyIteration(self,initialPolicy,initialV,nEvalIterations=5,nIterations = np.inf,tolerance=0.01):
    #main point: performs approximate policy evaluation using value iteration and then performs approximate policy improvement to obtain a better policy based on the approximate value function. 
    #keyword APPROXIMATE
    policy=initialPolicy
    V= np.zeros(self.R.shape)
    n=0
    newV = None
    epsilon = float('inf')
    while(n<nIterations and epsilon>tolerance):
      print("Inside")
      #Question: If epsilon continues to be less than 0.01, but the policy are not same, the the value of V wont change?
      newV,epsilon = self.evaluatePolicyPartially(policy,V,nEvalIterations,tolerance,True)
      newPolicy = self.improvePolicy(V)
      epsilon = np.linalg.norm(newV - V) #Convergence Criteria
      #ask
      V = newV
      policy = newPolicy
      n+=1
    return policy,n,epsilon




In [5]:
#testing

#initialization
T_A = np.array([[0.5,0.5,0,0],[0,1,0,0],[0.5,0.5,0,0],[0,1,0,0]])
T_S = np.array([[1,0,0,0],[0.5,0,0,0.5],[0.5,0,0.5,0],[0,0,0.5,0.5]])
T = np.array([T_A, T_S])
discount=0.9
R = np.array([[0],[0],[10],[10]])
initalV = np.zeros(R.shape)
mdp = MDP(T,R,discount)

#for policy iteration initial policy pi(PU) = A pi(PF)= A pi(RU)=A pi(RF)=A
initialPolicy= np.array([[0],[0],[0],[0]])

In [6]:
#functions
# mdp.valueIteration(initalV,80,0.01)

mdp.policyIteration(initialPolicy,50)

# answers
# policyIteration #3
# partially #2

The initial policy is [[0]
 [0]
 [0]
 [0]]
iteration 0 
 policy iteration inside loop new value : [[ 0.]
 [ 0.]
 [10.]
 [10.]] 
,new policy: [[0]
 [1]
 [1]
 [1]] 

iteration 1 
 policy iteration inside loop new value : [[31.58510431]
 [38.60401638]
 [44.02417625]
 [54.20159875]] 
,new policy: [[0]
 [1]
 [1]
 [1]] 

The final value function is [[31.58510431]
 [38.60401638]
 [44.02417625]
 [54.20159875]]


(1, array([[0],
        [1],
        [1],
        [1]]))

In [8]:
#Questions
#1. Report the policy, value function, and the number of iterations needed by value iteration when
# using a tolerance of 0.01 and starting from a value function set to 0 for all states
initialV = np.zeros(R.shape)
newV,n,epsilon = mdp.valueIteration(initialV,80,0.01)
policy= mdp.extractPolicy(newV)
print(f'the value iteration final value function = {newV}\n number of iterations ={n} \n value of epsilon = {epsilon}\n the policy = {policy}')

#2. Report the policy, value function, and the number of iterations needed by policy iteration to find
# an optimal policy when starting from the policy that chooses action 0 in all states.
# Note: action 0 corresponds to “A: Advertising” whereas action 1 corresponds to “S: Saving
# money”
initialPolicy= np.array([[0],[0],[0],[0]])
n,newPolicy = mdp.policyIteration(initialPolicy,50)
print(f'the final policy iteration  number of iterations ={n} \n ,optimal policy: {newPolicy}  \n')

# 3.Report the number of iterations needed by modified policy iteration to converge when varying
# the number of iterations in partial policy evaluation from 1 to 10. Use a tolerance of 0.01, start
# with the policy that chooses action 0 in all states and start with the value function that assigns 0
# to all states.

for value in range(1,11):
  policy,n,epsilon= mdp.modifiedPolicyIteration(initialPolicy,initialV,nEvalIterations=value,nIterations = 100,tolerance=0.01)
  print(f'For modified policy iteration with number_of_iteration_in_partial_policy = {value}\n  number of iterations required to converge ={n} \n optimal policy: {policy}  \n')


# 4.Discuss the impact of the number of iterations in partial policy evaluation on the results and relate the results to value iteration and policy iteration
In this case, the number of partial iteration 


iteration 1 
 value iteration inside loop new value : [[[ 0.]
  [ 0.]
  [10.]
  [10.]]

 [[ 0.]
  [ 0.]
  [10.]
  [10.]]] 
, max value: [[ 0.]
 [ 0.]
 [10.]
 [10.]] 
, epsilon: 14.142135623730951
iteration 2 
 value iteration inside loop new value : [[[ 0. ]
  [ 0. ]
  [10. ]
  [10. ]]

 [[ 0. ]
  [ 4.5]
  [14.5]
  [19. ]]] 
, max value: [[ 0. ]
 [ 4.5]
 [14.5]
 [19. ]] 
, epsilon: 11.022703842524301
iteration 3 
 value iteration inside loop new value : [[[ 2.025]
  [ 4.05 ]
  [12.025]
  [14.05 ]]

 [[ 0.   ]
  [ 8.55 ]
  [16.525]
  [25.075]]] 
, max value: [[ 2.025]
 [ 8.55 ]
 [16.525]
 [25.075]] 
, epsilon: 7.842791276070021
iteration 4 
 value iteration inside loop new value : [[[ 4.75875]
  [ 7.695  ]
  [14.75875]
  [17.695  ]]

 [[ 1.8225 ]
  [12.195  ]
  [18.3475 ]
  [28.72   ]]] 
, max value: [[ 4.75875]
 [12.195  ]
 [18.3475 ]
 [28.72   ]] 
, epsilon: 6.112850833490049
iteration 5 
 value iteration inside loop new value : [[[ 7.6291875]
  [10.9755   ]
  [17.6291875]
  [20.9755 

NOTES:
Policy Iteration vs Modified Policy Iteration
**bold text**

The main difference between normal policy iteration and modified policy iteration lies in how they perform policy evaluation and policy improvement.

In normal policy iteration, the algorithm performs exact policy evaluation to determine the value function for a given policy, and then performs exact policy improvement to obtain a better policy based on the value function. This process is repeated until the policy converges to an optimal policy. Exact policy evaluation involves solving a set of linear equations or matrix inversion, which can be computationally expensive, especially for large problems.

In modified policy iteration, the algorithm performs approximate policy evaluation using value iteration, and then performs approximate policy improvement to obtain a better policy based on the approximate value function. This process is repeated until the policy converges to an optimal policy. Value iteration involves iteratively updating the value function using the Bellman equation, which is less computationally expensive than exact policy evaluation.

The advantage of modified policy iteration over normal policy iteration is that it can converge to an optimal policy faster, especially for large problems, because it avoids the computational expense of exact policy evaluation. However, the policy obtained by modified policy iteration may not be as accurate as the one obtained by normal policy iteration because it relies on approximations.

Partial Policy Iteration
**bold text**
In reinforcement learning, partial policy evaluation refers to the process of estimating the value function of a policy based on incomplete information. Specifically, it involves estimating the value function of a policy for some states, without necessarily having information about the policy for all states.

One common approach to partial policy evaluation is to use the Bellman equation, which relates the value function of a state to the value function of its successor states. By recursively applying the Bellman equation, we can estimate the value function for a set of states, even if we do not have complete information about the policy.

THE POLICY DURING POLICY ITERATION DOES NOT CHANGE. The goal of partial policy evaluation is to estimate the value function of a fixed policy, without changing the policy itself.


Identity Matrix
**bold text**
An identity matrix is a special type of square matrix in linear algebra that has ones along the diagonal and zeros elsewhere. It is denoted by the symbol "I" and has the property that when it is multiplied by any square matrix of the same size, the result is the original matrix.

For example, the 2x2 identity matrix is:


I = [[1, 0],
     [0, 1]]
When this matrix is multiplied by any 2x2 matrix A, the result is simply A:


I * A = A * I = A

In [None]:
tt = np.array([[1],[2],[3]])
arr1 = np.array([[1, 2], [3, 4]])
arr2 = np.array([[10, 20], [30, 40]])
temp=np.zeros((arr2.shape))
print(f'test {arr2.shape}')
print(f'shape is {temp.shape}, temp value is {temp}')

I = np.zeros((4,1))
T_A = np.array([[0.5,0.5,0,0],[0,1,0,0],[0,1,0,0],[0.5,0.5,0,0]])
print(f'{I} \n {T_A} \n {I-T_A}')

# no axis provided, array elements will be flattened
arr_flat = np.append(arr1, arr2,axis=1)

test= np.ones(tt.shape)
# print(f'{test} and {test.shape}')  # [ 1  2  3  4 10 20 30 40]

test (2, 2)
shape is (2, 2), temp value is [[0. 0.]
 [0. 0.]]
[[0.]
 [0.]
 [0.]
 [0.]] 
 [[0.5 0.5 0.  0. ]
 [0.  1.  0.  0. ]
 [0.  1.  0.  0. ]
 [0.5 0.5 0.  0. ]] 
 [[-0.5 -0.5  0.   0. ]
 [ 0.  -1.   0.   0. ]
 [ 0.  -1.   0.   0. ]
 [-0.5 -0.5  0.   0. ]]


In [59]:
T_A = np.array([[0.5,0.5,0,0],[0,1,0,0],[0,1,0,0],[0.5,0.5,0,0]])
T_S = np.array([[1,1,0,0],[0.5,0,0.5,0],[0,0,0.5,0.5],[0.5,0,0,0.5]])
T = np.array([T_A, T_S])
temp=[]
print(f'Test {T[0][0]} \n')
temp.append(T[0][0])
print(f'temp is {temp}')
temp.append(T[0][0])
print(f'temp 2 is {temp}')
newTemp=np.array(temp)
print(f'{newTemp}')




print(f'flatten {T_A.flatten()} \n {T_A.flatten()} \n {T.flatten()} ')
print(f'Argmax is {np.argmax(T_A)}')


print(f'New transformation is {T}')
discount=0.9
R = np.array([[0],[0],[10],[10]])
initialV = np.array([[0],[0],[0],[0]])

newV = R + discount*np.dot(T,initialV)
A,S= newV
temp = np.maximum(A,S)
print(f' newV is {newV}')

print(f' value is {temp}')

policy=np.argmax(newV,axis=0)
print(f' policy is {policy}')


Test [0.5 0.5 0.  0. ] 

temp is [array([0.5, 0.5, 0. , 0. ])]
temp 2 is [array([0.5, 0.5, 0. , 0. ]), array([0.5, 0.5, 0. , 0. ])]
[[0.5 0.5 0.  0. ]
 [0.5 0.5 0.  0. ]]
flatten [0.5 0.5 0.  0.  0.  1.  0.  0.  0.  1.  0.  0.  0.5 0.5 0.  0. ] 
 [0.5 0.5 0.  0.  0.  1.  0.  0.  0.  1.  0.  0.  0.5 0.5 0.  0. ] 
 [0.5 0.5 0.  0.  0.  1.  0.  0.  0.  1.  0.  0.  0.5 0.5 0.  0.  1.  1.
 0.  0.  0.5 0.  0.5 0.  0.  0.  0.5 0.5 0.5 0.  0.  0.5] 
Argmax is 5
New transformation is [[[0.5 0.5 0.  0. ]
  [0.  1.  0.  0. ]
  [0.  1.  0.  0. ]
  [0.5 0.5 0.  0. ]]

 [[1.  1.  0.  0. ]
  [0.5 0.  0.5 0. ]
  [0.  0.  0.5 0.5]
  [0.5 0.  0.  0.5]]]
 newV is [[[ 0.]
  [ 0.]
  [10.]
  [10.]]

 [[ 0.]
  [ 0.]
  [10.]
  [10.]]]
 value is [[ 0.]
 [ 0.]
 [10.]
 [10.]]
 policy is [[0]
 [0]
 [0]
 [0]]


In [None]:
for i,p in enumerate(policy):
  print(f'{i} {p[0]}')

0 0
1 0
2 0
3 0
