#### Import Libraries

In [3]:
import sys
import numpy as np

#### Initialize Actions

In [4]:
# Action indices, listed clockwise starting from "up". They are used as keys
# into the per-state transition table and as column indices of policy arrays.
UP, RIGHT, DOWN, LEFT = range(4)

##### GridworldMDP Class

This class represents the gridworld environment. It initializes with a given grid shape (default is a 4x4 grid) and creates a dictionary P to store transition probabilities and rewards for each state-action pair.

- `__init__`: Initializes the MDP with the specified grid shape. It generates transition probabilities and rewards for each state-action pair and stores them in the `P` dictionary.
- `render`: Renders the gridworld by printing it to the terminal.

In [5]:
class GridworldMDP:
    """Deterministic gridworld MDP whose first and last states are terminal.

    States are numbered ``0 .. num_states - 1`` in row-major order, so state
    ``s`` sits at row ``s // width`` and column ``s % width``. State ``0`` and
    state ``num_states - 1`` are terminal.

    Attributes:
        shape: ``(rows, cols)`` tuple describing the grid.
        num_states: total number of cells (``rows * cols``).
        num_actions: always 4 (UP, RIGHT, DOWN, LEFT).
        P: transition table. ``P[s][a]`` is a list holding one
            ``(probability, next_state, reward, done)`` tuple.
    """

    def __init__(self, shape=(4, 4), *, verbose=True):
        """Build the transition table for a grid of the given shape.

        Args:
            shape: list or tuple ``(rows, cols)`` of positive integers.
            verbose: when true (the default), print the state-index grid and
                the finished transition table.

        Raises:
            ValueError: if ``shape`` is not a length-2 list/tuple of positive
                integers.
        """
        if not isinstance(shape, (list, tuple)) or len(shape) != 2:
            raise ValueError("shape argument must be a list/tuple of length 2")
        if not all(isinstance(d, (int, np.integer)) and d > 0 for d in shape):
            raise ValueError("shape dimensions must be positive integers")

        # Store an immutable copy so instances never alias the caller's (or
        # the default) sequence.
        self.shape = tuple(shape)
        # Plain int keeps the terminal checks (and so the `done` flags)
        # regular Python bools instead of numpy scalars.
        self.num_states = int(np.prod(self.shape))
        self.num_actions = 4  # UP, RIGHT, DOWN, LEFT
        max_y, max_x = self.shape

        if verbose:
            print(np.arange(self.num_states).reshape(self.shape))

        P = {}
        for s in range(self.num_states):
            y, x = divmod(s, max_x)

            if self._is_terminal(s):
                # Terminal states absorb: every action stays put, reward 0.
                P[s] = {a: [(1.0, s, 0.0, True)]
                        for a in range(self.num_actions)}
                continue

            # A move that would leave the grid keeps the agent where it is.
            next_states = {
                UP: s if y == 0 else s - max_x,
                RIGHT: s if x == max_x - 1 else s + 1,
                DOWN: s if y == max_y - 1 else s + max_x,
                LEFT: s if x == 0 else s - 1,
            }
            # Every step taken from a non-terminal state costs -1.
            P[s] = {a: [(1.0, ns, -1.0, self._is_terminal(ns))]
                    for a, ns in next_states.items()}

        if verbose:
            print(P)
        self.P = P

    def _is_terminal(self, s):
        """Return True when ``s`` is the first or the last state."""
        return s == 0 or s == self.num_states - 1

    def render(self):
        '''render/print the gridworld to the terminal'''
        last_col = self.shape[1] - 1
        for s in range(self.num_states):
            x = s % self.shape[1]
            output = " T " if self._is_terminal(s) else " o "

            # Trim the padding at the row edges so lines carry no stray
            # leading/trailing blanks.
            if x == 0:
                output = output.lstrip()
            if x == last_col:
                output = output.rstrip()

            sys.stdout.write(output)
            if x == last_col:
                sys.stdout.write("\n")


In [None]:
# Build a gridworld with the default shape; the constructor prints the
# state-index grid and the transition table as it runs.
gmdp = GridworldMDP()
gmdp  # bare expression: the notebook echoes the object's repr as cell output

In [2]:
def print_value(x, mdp, form='.2f'):
    """Print a value function array laid out like the grid.

    Args:
        x: array with one value per state; reshaped to ``mdp.shape``.
        mdp: object exposing a ``shape`` attribute.
        form: format spec applied to every cell (default ``'.2f'``).

    Non-negative cells get a leading blank so they line up with the minus
    sign of negative cells.
    """
    grid = x.reshape(mdp.shape)
    lines = []
    for row in grid:
        cells = []
        for cell in row:
            # Use the caller's format for every cell, negative ones included.
            text = format(cell, form)
            cells.append(' ' + text if cell >= 0 else text)
        lines.append(' '.join(cells))
    print('\n'.join(lines))

In [None]:
def print_deterministic_policy(policy, mdp):
    """Print the greedy action of each state as a grid of letters.

    For every row of ``policy`` the highest-probability action is shown as
    U/R/D/L. The first and last grid cells are printed as '-'.
    """
    letter_of = dict(enumerate('URDL'))
    greedy_actions = np.argmax(policy, axis=1)
    board = np.array([letter_of[a] for a in greedy_actions])
    board = board.reshape(mdp.shape)
    # Blank out the two corner cells.
    board[0, 0] = '-'
    board[-1, -1] = '-'
    rendered_rows = []
    for row in board:
        rendered_rows.append(' '.join(str(cell) for cell in row))
    print('\n'.join(rendered_rows))


In [None]:
def init_value(mdp):
    """Return an all-zero value function with one entry per state of `mdp`."""
    n_states = mdp.num_states
    return np.zeros(shape=n_states)

In [None]:
def random_policy(mdp):
    """Return the uniform random policy for `mdp`.

    The result has shape (num_states, num_actions); entry [s][a] is the
    probability of taking action a in state s, identical for all actions.
    """
    n_actions = mdp.num_actions
    table_shape = [mdp.num_states, n_actions]
    return np.full(table_shape, 1.0) / n_actions

#### Dynamic Programming

In [None]:
def policy_evaluation_one_step(mdp, V, policy, discount=0.99):
    """Apply one synchronous Bellman expectation backup to `V`.

    Args:
        mdp: object with ``num_states`` and a transition table ``P`` where
            ``P[s][a]`` is an iterable of
            ``(prob, next_state, reward, done)`` tuples.
        V: current value estimates, indexable by state; not modified.
        policy: ``policy[s][a]`` is the probability of action a in state s.
        discount: discount factor applied to the next state's value.

    Returns:
        A new float array holding the backed-up value of every state.
    """
    V_new = np.array(V, dtype=float)  # fresh copy; the input stays untouched
    for s in range(mdp.num_states):
        # Accumulate the expected return of state s in a scalar, reading only
        # from the old estimates so the sweep is synchronous.
        v = 0.0
        for a, action_prob in enumerate(policy[s]):
            for prob, next_state, reward, _done in mdp.P[s][a]:
                v += action_prob * prob * (reward + discount * V[next_state])
        V_new[s] = v

    return V_new


In [None]:
def policy_evaluation(mdp, policy, discount=0.99, theta=0.00001):
    """Iterate one-step policy evaluation until the values stop changing.

    Starts from the initial value function of `mdp` and repeats the one-step
    backup until the largest absolute change between successive estimates is
    no longer greater than `theta`. Returns the final value array.
    """
    values = init_value(mdp)

    while True:
        updated = policy_evaluation_one_step(mdp, values, policy, discount)
        largest_change = np.max(np.abs(values - updated))
        values = updated
        if not largest_change > theta:
            break

    return values