In [2]:
from abc import ABC, abstractmethod
import numpy as np

### GridWorld using Value Iteration: in this example, we use value iteration to obtain the Q-function / value function for the GridWorld problem and then extract a policy from it.

In [8]:
# first, define an interface for an MDP
class MDP:
    """Interface listing the operations a Markov decision process exposes.

    Every method here is an empty stub that returns ``None``; a concrete
    problem is expected to override them.

    NOTE(review): the ``@abstractmethod`` markers are not enforced, because
    this class does not derive from ``abc.ABC``. It can therefore be
    instantiated directly, and a subclass that forgets to override a method
    silently inherits the ``None``-returning stub.
    """

    @abstractmethod
    def get_states(self):
        """Stub: intended to return the collection of all states."""

    @abstractmethod
    def get_initial_state(self):
        """Stub: intended to return the state an episode starts in."""

    @abstractmethod
    def get_terminal_states(self):
        """Stub: intended to return the collection of terminal states."""

    @abstractmethod
    def get_actions(self, state):
        """Stub: intended to return the actions available in ``state``."""

    @abstractmethod
    def get_transitions(self, state, action):
        """Stub: intended to return the outcomes of ``action`` in ``state``."""

    @abstractmethod
    def get_rewards(self, state, action, next_state):
        """Stub: intended to return the reward for one transition."""

    @abstractmethod
    def is_terminal(self, state):
        """Stub: intended to report whether ``state`` is terminal."""

    @abstractmethod
    def get_discount_factor(self):
        """Stub: intended to return the discount factor."""
    


# now, implement the gridworld problem MDP
class GridWorld(MDP):
    """GridWorld MDP on a 4x3 grid with one wall and two terminal cells.

    States are ``(x, y)`` tuples of the non-wall cells. From a non-terminal
    cell the agent can move up, down, left or right; with probability
    ``noise`` it slips to each of the two perpendicular directions instead.
    A move into a wall or off the grid leaves the agent where it is. From a
    terminal cell the only action is ``terminate``, which leads to the dummy
    ``exit`` state.
    """

    def __init__(self, discount_factor=0.9, noise=0.2):
        """Build the grid, the action encoding and the reward table.

        Args:
            discount_factor: Discount factor (gamma) returned by
                ``get_discount_factor`` and used to discount logged rewards.
            noise: Probability of slipping to *each* perpendicular direction.
                Must lie in ``[0, 0.5]`` so that the probability of moving
                straight, ``1 - 2 * noise``, is non-negative.

        Raises:
            ValueError: If ``noise`` is outside ``[0, 0.5]``.
        """
        if not 0.0 <= noise <= 0.5:
            raise ValueError(f"noise must be in [0, 0.5], got {noise}")

        # initialise the set of all possible states, in this case tuples (x,y)
        # of all grid cells, excluding walls
        # will use the example from lectures and tute week 7
        self.width = 4
        self.height = 3
        self.walls = [(1, 1)]
        self.states = [
            (x, y)
            for x in range(self.width)
            for y in range(self.height)
            if (x, y) not in self.walls
        ]

        print(f"States: {self.states}")

        # specify terminal states
        self.terminal_states = [(3, 1), (3, 2)]

        # create a dummy terminal state which is the successor to all terminal states
        self.exit = (-1, -1)

        # specify initial state
        self.initial_state = (0, 0)

        # specify probability of slipping (to each side)
        self.noise = noise

        # specify/enumerate the actions
        self.terminate = 0
        self.up = 1
        self.down = 2
        self.left = 3
        self.right = 4

        # set the discount factor
        self.gamma = discount_factor

        # specify rewards, keyed by the terminal cell being entered
        self.rewards = {(3, 1): -1, (3, 2): 1}

        # empty list for storing the discounted reward at each step of the episode
        self.episode_discounted_rewards = []

    def get_states(self):
        """Return the list of all non-wall grid cells."""
        return self.states

    def get_initial_state(self):
        """Return the start cell."""
        return self.initial_state

    def get_terminal_states(self):
        """Return the list of terminal cells."""
        return self.terminal_states

    def get_actions(self, state):
        """Return the actions that are valid in ``state``.

        Terminal cells only allow ``terminate``; every other state allows the
        four movement actions.
        """
        if state in self.terminal_states:
            # for terminal states, the only valid action is to 'terminate' the episode
            return [self.terminate]
        return [self.up, self.down, self.left, self.right]

    def get_transitions(self, state, action):
        """Return the outcome distribution of taking ``action`` in ``state``.

        Returns:
            A list of ``(next_state, probability)`` tuples with one entry per
            distinct successor. When several outcomes lead to the same cell
            (e.g. both slips are blocked), their probabilities are summed so
            the distribution still adds up to 1. The list is empty when
            ``action`` is not applicable in ``state``.
        """
        # a terminal cell can only transition into the 'exit' state, which
        # terminates the episode
        if state in self.terminal_states:
            if action == self.terminate:
                return [(self.exit, 1.0)]
            return []

        x, y = state
        # for each action: (intended cell, first slip cell, second slip cell)
        targets_by_action = {
            self.up: ((x, y + 1), (x - 1, y), (x + 1, y)),
            self.down: ((x, y - 1), (x - 1, y), (x + 1, y)),
            self.left: ((x - 1, y), (x, y - 1), (x, y + 1)),
            self.right: ((x + 1, y), (x, y - 1), (x, y + 1)),
        }
        if action not in targets_by_action:
            return []

        # probability of not slipping
        straight = 1 - 2 * self.noise
        intended, slip_a, slip_b = targets_by_action[action]
        outcomes = (
            self.valid_add(state, intended, straight),
            self.valid_add(state, slip_a, self.noise),
            self.valid_add(state, slip_b, self.noise),
        )

        # Merge outcomes that end in the same cell. De-duplicating with a set
        # would silently drop probability mass whenever two identical
        # (state, probability) pairs occur.
        merged = {}
        for next_state, probability in outcomes:
            if probability > 0:
                merged[next_state] = merged.get(next_state, 0.0) + probability
        return list(merged.items())

    def get_rewards(self, state, action, next_state):
        """Return the reward for moving from ``state`` to ``next_state``.

        The reward is the table entry of ``next_state`` when that is a
        terminal cell, and ``0.0`` otherwise.

        Side effect: every call appends ``reward * gamma**step`` to
        ``self.episode_discounted_rewards``, where ``step`` is the number of
        entries already logged.
        """
        if next_state in self.terminal_states:
            reward = self.rewards[next_state]
        else:
            reward = 0.0

        # store discounted reward for the step
        step = len(self.episode_discounted_rewards)
        self.episode_discounted_rewards.append(reward * (self.gamma ** step))

        return reward

    def is_terminal(self, state):
        """Return True when ``state`` is one of the terminal cells."""
        return state in self.terminal_states

    def is_exit(self, state):
        """Return True when ``state`` is the dummy exit state."""
        return state == self.exit

    def get_discount_factor(self):
        """Return the discount factor (gamma)."""
        return self.gamma

    def valid_add(self, state, next_state, probability):
        """Resolve a tentative move into a ``(resulting_state, probability)`` pair.

        The agent stays in ``state`` when ``next_state`` is a wall or lies
        outside the ``width`` x ``height`` grid; otherwise it reaches
        ``next_state``.
        """
        # check if next state is a wall
        if next_state in self.walls:
            return (state, probability)

        # check if next state is off grid
        x, y = next_state
        if 0 <= x < self.width and 0 <= y < self.height:
            return (next_state, probability)
        return (state, probability)
