In [1]:
# Rebecca Henry 18RJH8
# st. no. 20152682
# Oct 20, 2022
# CISC474, A1

In [2]:
import numpy as np
from enum import IntEnum
from copy import deepcopy
from numpy.linalg import inv

In [3]:
class Action(IntEnum):
    up = 0
    right = 1
    down = 2
    left = 3

action_to_str = {
    Action.up : "up",
    Action.right : "right",
    Action.down : "down",
    Action.left : "left",
}

action_to_offset = {
    Action.up : (-1, 0),
    Action.right : (0, 1),
    Action.down : (1, 0),
    Action.left : (0, -1),
}

In [4]:
class GridWorld:

    def __init__(self, height, width, a, b, a_prime, b_prime):
        """
        Creates a gridworld like MDP
         - height (int): Number of rows
         - width (int): Number of columns
         - a (int): the location of the transport cell A
         - b (int): the location of the transport cell B
         - a_prime (int): the location of the cell transported to A'
         - b_prime (int): the location of the cell transported to B'
        """
        self._width = height
        self._height = width
        self._grid_values = [0 for _ in range(height * width)]
        self._a = a
        self._b = b
        self._a_prime = a_prime
        self._b_prime = b_prime
        self.create_next_values()

    def reset(self):
        """
        Reset the state values
        """
        self._grid_values = [0 for _ in range(self._height * self._width)]
        self.create_next_values()

    def _inbounds(self, state):
        return state >= 0 and state < self._width * self._height

    def _inbounds_rc(self, state_r, state_c):
        return state_r >= 0 and state_r < self._height and state_c >= 0 and state_c < self._width

    def _state_to_rc(self, state):
        return state // self._width, state % self._width
    
    def _state_from_action(self, state, action):
        """
        Gets the state as a result of applying the given action
        """
        # check that the state is not a transport cell
        if state!= self._a and state!= self._b:
            state_r, state_c = self._state_to_rc(state)
            # if trying to leave board, stay in location
            if state_r == 0 and action_to_str[action] == "up":
                state = state
                state_r, state_c = state_r, state_c
            elif state_r == self._height-1 and action_to_str[action] == "down":
                state_r, state_c = state_r, state_c
            elif state_c == 0 and action_to_str[action] == "left":
                state_r, state_c = state_r, state_c
            elif state_c == self._width-1 and action_to_str[action] == "right":
                state_r, state_c = state_r, state_c
            # not trying to leave board, move to new state
            else:
                offset = action_to_offset[action]
                state_r += offset[0]
                state_c += offset[1]
        # if the state is transport cell A, transport to cell A'
        elif state == self._a:
            state = self._a_prime
            state_r,state_c = self._state_to_rc(self._a_prime)
        # if the state is transport cell B, transport to cell B'
        elif state == self._b:
            state = self._b_prime
            state_r,state_c = self._state_to_rc(self._b_prime)
        assert self._inbounds_rc(state_r, state_c)
        return state_r * self._width + state_c

    def get_states(self):
        """
        Gets all states in the environment
        """
        return [i for i, c in enumerate(self._grid_values)]
    
    def get_reward(self, state, action):
        """
        Get the reward for being in the current state
        """
        assert self._inbounds(state)
        state_r, state_c = self._state_to_rc(state)
        # Reward is 10 if at transport cell A, and 5 at transport cell B
        if state == self._a:
            return 10
        elif state == self._b:
            return 5
        # if tries to move out of board, reward = -1
        elif state_r == 0 and action==Action.up:
            return -1
        elif state_r == self._height-1 and action==Action.down:
            return -1
        elif state_c == 0 and action==Action.left:
            return -1
        elif state_c == self._width-1 and action==Action.right:
            return -1
        else:
            return 0.0
    
    def get_actions(self, state):
        return [Action.up, Action.down, Action.right, Action.left]
    
    def get_transitions(self, state, action):
        """
        Get a list of transitions as a result of attempting the action in the current state
        Each item in the list is a dictionary, containing the probability of reaching that state and the state itself
        """
        transitions = []
        valid_actions = [Action.up, Action.down, Action.right, Action.left]
        for a in valid_actions:
            # Action we are trying to do
            if a == action:
                transitions.append({
                    "p" : 1.0, 
                    "s": self._state_from_action(state, a)
                })
        return transitions

    def get_value(self, state):
        """
        Get the current value of the state
        """
        assert self._inbounds(state)
        return self._grid_values[state]

    def create_next_values(self):
        """
        Creates a temporary storage for state value updating
        If this is not used, then asynchronous updating may result in unexpected results
        To use properly, run this at the start of each iteration
        """
        self._grid_values_next = [0 for _ in self._grid_values]

    def set_next_values(self):
        """
        Set the state values from the temporary copied values
        To use properly, run this at the end of each iteration
        """
        self._grid_values = deepcopy(self._grid_values_next)

    def set_value(self, state, value):
        """
        Set the value of the state into the temporary copy
        This value will not update into main storage until self.set_next_values() is called.
        """
        assert self._inbounds(state)
        self._grid_values_next[state] = value

    def __str__(self):
        """
        Pretty print the state values
        """
        out_str = ""
        for r in range(self._height):
            for c in range(self._width):
                cell = r * self._width + c
                out_str += "{:>6.2f}".format(self._grid_values[cell])
                out_str += " "
            out_str += "\n"
        return out_str

In [5]:
def value_iteration(gw, discount, tolerance=0.1):
    tol = np.Inf
    gw.reset()
    while tol > tolerance:
        gw.create_next_values()
        tol = 0
        # go through each state, to find the best move
        for s in range(0, (gw._height*gw._width)):
            max_val = float(np.NINF)
            v_prev = gw.get_value(s)
            # go through each possible action at this state
            for a in [Action.up, Action.down, Action.left, Action.right]:
                action_value = 0
                # find action values for each action
                for transition in gw.get_transitions(s, a):
                    action_value += 1 * (gw.get_reward(s,a)  + discount * gw.get_value(gw._state_from_action(s, a)))
                # find highest value
                max_val = max(max_val, action_value)
            tol = max(tol, np.abs(v_prev - max_val))
            # set the highest value to the corresponding state
            gw.set_value(s, max_val)
        gw.set_next_values()


In [6]:
def linear_solver(gw, discount):
    """
    Solve the gridworld using a linear solution, gw is the problem space and discount is the 
    adjustment rate
    This uses the bellman equation to find a vector x, which is then applied to the gridworld
    """
    # intialize matrix A, and vectors b and x
    A = np.zeros((gw._height*gw._width, gw._height*gw._width))
    b = np.zeros(gw._height*gw._width)
    x = np.zeros(gw._height*gw._width)
    # fill in matrix A and vector b
    for v in range(0, (gw._height*gw._width)):
        v_val = 1
        v_pos = gw._state_to_rc(v)
        b_val = (0.25*gw.get_reward(v, Action.up)) + (0.25*gw.get_reward(v, Action.down)) + (0.25*gw.get_reward(v, Action.right)) + (0.25*gw.get_reward(v, Action.left))
        up_pos = gw._state_from_action(v, Action.up)
        down_pos = gw._state_from_action(v, Action.down)
        right_pos = gw._state_from_action(v, Action.right)
        left_pos = gw._state_from_action(v, Action.left)
        # fill in the matrix A
        # if out of bounds, the state doesn't change
        if up_pos == v:
            v_val -= 0.25*discount
        if down_pos == v:
            v_val -= 0.25*discount
        if right_pos == v:
            v_val -= 0.25*discount
        if left_pos == v:
            v_val -= 0.25*discount
        A[v][v] = v_val
        # if transported
        if up_pos == down_pos and up_pos == left_pos and up_pos == right_pos and up_pos!= v:
            A[v][up_pos] = 0 - discount
        # if not transported, and not out of bounds
        if up_pos != v and up_pos!=down_pos:
            A[v][up_pos] = 0 - 0.25*discount
        if down_pos != v and down_pos != up_pos:
            A[v][down_pos] = 0 - 0.25*discount
        if right_pos != v and right_pos!= left_pos:
            A[v][right_pos] = 0 - 0.25*discount
        if left_pos != v and left_pos!= right_pos:
            A[v][left_pos] = 0 - 0.25*discount
        b[v] = b_val
    # find vector x
    ainv = inv(A)
    x = np.matmul(ainv,b)
    x = x.reshape(gw._height,gw._width)
    # apply to gridworld
    gw.reset()
    gw.create_next_values()
    for s in range(0,gw._height*gw._width):
        s_pos = gw._state_to_rc(s)
        val = x[s_pos[0]][s_pos[1]]
        gw.set_value(s, val)
    gw.set_next_values()

In [7]:
gw_5x5 = GridWorld(height=5, width=5, a=1, b=3, a_prime=21, b_prime=13)
gw_7x7 = GridWorld(height=7, width=7, a=15, b=5, a_prime=43, b_prime=26)

In [8]:
print("linear solver: gridworld 5x5 discount 0.75:")
linear_solver(gw_5x5, 0.75)
print(gw_5x5)
print("linear solver: gridworld 5x5 discount 0.85:")
linear_solver(gw_5x5, 0.85)
print(gw_5x5)

linear solver: gridworld 5x5 discount 0.75:
  2.21   9.38   3.34   5.11   0.75 
  0.66   2.20   1.33   1.25   0.05 
 -0.23   0.38   0.32   0.15  -0.44 
 -0.71  -0.25  -0.18  -0.31  -0.78 
 -1.26  -0.83  -0.74  -0.85  -1.29 

linear solver: gridworld 5x5 discount 0.85:
  2.91   9.05   4.05   5.23   1.19 
  1.18   2.70   1.90   1.64   0.33 
 -0.07   0.60   0.53   0.27  -0.43 
 -0.86  -0.35  -0.27  -0.46  -1.01 
 -1.60  -1.12  -1.01  -1.17  -1.67 



In [40]:
print("linear solver: gridworld 7x7 discount 0.75:")
linear_solver(gw_7x7, 0.75)
print(gw_7x7)
print("linear solver: gridworld 7x7 discount 0.85:")
linear_solver(gw_7x7, 0.85)
print(gw_7x7)

linear solver: gridworld 7x7 discount 0.75:
 -0.66  -0.03  -0.14  -0.05   0.92   4.91   0.67 
  0.49   2.00   0.82   0.34   0.48   1.03  -0.01 
  2.11   9.39   2.19   0.57   0.23   0.14  -0.42 
  0.61   2.11   0.90   0.30   0.06  -0.12  -0.59 
 -0.26   0.34   0.21   0.04  -0.08  -0.25  -0.70 
 -0.72  -0.26  -0.18  -0.20  -0.27  -0.43  -0.87 
 -1.26  -0.82  -0.70  -0.68  -0.73  -0.88  -1.33 

linear solver: gridworld 7x7 discount 0.85:
 -0.41   0.28   0.17   0.26   1.28   4.86   1.00 
  0.95   2.46   1.27   0.69   0.79   1.29   0.19 
  2.66   9.05   2.66   0.91   0.44   0.25  -0.39 
  1.04   2.52   1.28   0.51   0.14  -0.17  -0.73 
 -0.14   0.49   0.33   0.07  -0.15  -0.43  -0.97 
 -0.89  -0.38  -0.29  -0.35  -0.49  -0.74  -1.27 
 -1.61  -1.11  -0.95  -0.95  -1.06  -1.29  -1.82 



In [43]:
value_iteration(gw_5x5, 0.75, 0.0)
print("value iteration: gridworld 5x5 discount 0.75:")
print(gw_5x5)
value_iteration(gw_5x5, 0.85, 0.0)
print("value iteration: gridworld 5x5 discount 0.85:")
print(gw_5x5)

value iteration: gridworld 5x5 discount 0.75:
  9.83  13.11   9.83   8.65   6.49 
  7.38   9.83   7.38   6.49   4.86 
  5.53   7.38   5.53   4.86   3.65 
  4.15   5.53   4.15   3.65   2.74 
  3.11   4.15   3.11   2.74   2.05 

value iteration: gridworld 5x5 discount 0.85:
 15.28  17.98  15.28  12.98  11.03 
 12.99  15.28  12.99  11.04   9.38 
 11.04  12.99  11.04   9.38   7.98 
  9.38  11.04   9.38   7.98   6.78 
  7.98   9.38   7.98   6.78   5.76 



In [15]:
value_iteration(gw_7x7, 0.75, 0.0)
print("value iteration: gridworld 7x7 discount 0.75:")
print(gw_7x7)
value_iteration(gw_7x7, 0.85, 0.0)
print("value iteration: gridworld 7x7 discount 0.85:")
print(gw_7x7)

value iteration: gridworld 7x7 discount 0.75:
  5.53   7.38   5.53   4.15   5.50   7.33   5.50 
  7.38   9.83   7.38   5.53   4.15   5.50   4.13 
  9.83  13.11   9.83   7.38   5.53   4.15   3.11 
  7.38   9.83   7.38   5.53   4.15   3.11   2.33 
  5.53   7.38   5.53   4.15   3.11   2.33   1.75 
  4.15   5.53   4.15   3.11   2.33   1.75   1.31 
  3.11   4.15   3.11   2.33   1.75   1.31   0.98 

value iteration: gridworld 7x7 discount 0.85:
 11.04  12.99  11.04   9.38  10.01  11.78  10.01 
 12.99  15.28  12.99  11.04   9.38  10.01   8.51 
 15.28  17.98  15.28  12.99  11.04   9.38   7.98 
 12.99  15.28  12.99  11.04   9.38   7.98   6.78 
 11.04  12.99  11.04   9.38   7.98   6.78   5.76 
  9.38  11.04   9.38   7.98   6.78   5.76   4.90 
  7.98   9.38   7.98   6.78   5.76   4.90   4.16 

