In [1]:
import numpy as np

In [4]:
def policy_evaluation(pi, P, gamma=1.0, thetha=1e-10):
    """Iteratively compute the state-value function of a fixed policy.

    Parameters
    ----------
    pi : callable
        Called as ``pi(s)`` with a state index; its result is used to index
        ``P[s]`` and select the transitions to back up.
    P : indexable
        Transition model. ``P[s][a]`` must be an iterable of
        ``(prob, next_state, reward, done)`` tuples, and states are visited as
        ``0 .. len(P) - 1``.
    gamma : float, optional
        Discount factor applied to the value of the next state.
    thetha : float, optional
        Convergence threshold: sweeps stop once the largest absolute change
        between two consecutive value estimates is below this number. (The
        spelling is kept so existing keyword callers continue to work.)

    Returns
    -------
    numpy.ndarray
        Array of length ``len(P)`` holding the estimated value of each state.
    """
    n_states = len(P)
    prev_V = np.zeros(n_states)

    while True:
        V = np.zeros(n_states)

        for s in range(n_states):
            for prob, next_state, reward, done in P[s][pi(s)]:
                # `(not done)` zeroes the bootstrap term when the transition
                # ends the episode, so nothing is accumulated past a terminal
                # state.
                V[s] += prob * (reward + gamma * prev_V[next_state] * (not done))

        # Use the function's own threshold parameter; the previous code read an
        # undefined name `theta` here, which raised NameError (or silently
        # picked up an unrelated global).
        if np.max(np.abs(prev_V - V)) < thetha:
            break

        prev_V = V.copy()

    return V
                  

In [1]:
def policy_improvement(V, P, gamma=1.0):
    """Build the policy that is greedy with respect to a value function.

    Parameters
    ----------
    V : indexable
        State values, indexed by ``next_state``.
    P : indexable
        Transition model. ``P[s][a]`` must be an iterable of
        ``(prob, next_state, reward, done)`` tuples. The number of actions is
        taken from ``len(P[0])`` and actions are visited as
        ``0 .. len(P[0]) - 1`` for every state.
    gamma : float, optional
        Discount factor applied to the value of the next state.

    Returns
    -------
    callable
        ``new_pi(s)`` returns the action with the highest action-value in
        state ``s`` (first one on ties, per ``np.argmax``). It raises
        ``KeyError`` for a state outside ``0 .. len(P) - 1``.
    """
    n_states = len(P)
    n_actions = len(P[0])
    Q = np.zeros((n_states, n_actions), dtype=np.float64)

    for s in range(n_states):
        for a in range(n_actions):
            for prob, next_state, reward, done in P[s][a]:
                # `(not done)` drops the bootstrap term for transitions that
                # end the episode.
                Q[s][a] += prob * (reward + gamma * V[next_state] * (not done))

    # Compute the greedy action table once. Previously the lambda re-ran
    # argmax over the whole Q table and rebuilt the dict on every single call.
    greedy_actions = dict(enumerate(np.argmax(Q, axis=1)))
    new_pi = lambda s: greedy_actions[s]

    return new_pi

In [2]:
def policy_iteration(P, gamma=1.0, thetha=1e-10):
    """Alternate policy evaluation and greedy improvement until the policy is stable.

    Parameters
    ----------
    P : indexable
        Transition model passed through to ``policy_evaluation`` and
        ``policy_improvement``. ``P[0].keys()`` supplies the action set used
        to draw the random starting policy, and states are visited as
        ``0 .. len(P) - 1``.
    gamma : float, optional
        Discount factor forwarded to evaluation and improvement.
    thetha : float, optional
        Convergence threshold forwarded to ``policy_evaluation``. (The
        spelling is kept so existing keyword callers continue to work.)

    Returns
    -------
    tuple
        ``(V, pi)``: the value array from the last evaluation and the policy
        callable produced by the last improvement step.

    Notes
    -----
    The starting policy is drawn with ``np.random.choice``; seed NumPy's
    global RNG beforehand if reproducible runs are needed.
    """
    n_states = len(P)

    random_actions = np.random.choice(tuple(P[0].keys()), n_states)

    # Build the lookup table once rather than on every call of the policy.
    initial_actions = dict(enumerate(random_actions))
    pi = lambda s: initial_actions[s]

    while True:
        # Snapshot the current policy so it can be compared after improvement.
        old_pi = {s: pi(s) for s in range(n_states)}

        # Pass the function's own threshold parameter; the previous code
        # referenced an undefined name `theta` here and raised NameError.
        V = policy_evaluation(pi, P, gamma, thetha)

        pi = policy_improvement(V, P, gamma)

        # Stop once improvement leaves every state's action unchanged.
        if old_pi == {s: pi(s) for s in range(n_states)}:
            break

    return V, pi

In [3]:
def value_iteration(P, gamma=1.0, theta=1e-10):
    """Run value iteration to convergence and extract the greedy policy.

    Parameters
    ----------
    P : indexable
        Transition model. ``P[s][a]`` must be an iterable of
        ``(prob, next_state, reward, done)`` tuples. States are visited as
        ``0 .. len(P) - 1`` and actions as ``0 .. len(P[s]) - 1``; the
        action-value table is sized with ``len(P[0])`` columns.
    gamma : float, optional
        Discount factor applied to the value of the next state.
    theta : float, optional
        Convergence threshold: sweeps stop once the largest absolute change
        in any state's value is below this number.

    Returns
    -------
    tuple
        ``(V, pi)``: ``V`` is an array of length ``len(P)`` with the converged
        state values; ``pi(s)`` returns the action with the highest
        action-value in state ``s`` (first one on ties, per ``np.argmax``)
        and raises ``KeyError`` for a state outside ``0 .. len(P) - 1``.
    """
    n_states = len(P)
    V = np.zeros(n_states, dtype=np.float64)

    while True:
        Q = np.zeros((n_states, len(P[0])), dtype=np.float64)

        for s in range(n_states):
            for a in range(len(P[s])):
                for prob, next_state, reward, done in P[s][a]:
                    # `(not done)` drops the bootstrap term for transitions
                    # that end the episode.
                    Q[s][a] += prob * (reward + gamma * V[next_state] * (not done))

        best_values = np.max(Q, axis=1)
        if np.max(np.abs(V - best_values)) < theta:
            break

        V = best_values

    # Policy extraction and the return must happen after the loop. They were
    # previously indented inside it, so the function returned after a single
    # sweep, or returned None when the loop exited through `break`.
    greedy_actions = dict(enumerate(np.argmax(Q, axis=1)))
    pi = lambda s: greedy_actions[s]

    return V, pi