In [42]:
# Rebecca Henry 18RJH8
# st. no. 20152682
# Nov 28, 2022
# CISC474, A2

In [43]:
import numpy as np
from enum import IntEnum
from copy import deepcopy
from numpy.linalg import inv

In [44]:
class Action(IntEnum):
    up = 0
    right = 1
    down = 2
    left = 3

action_to_str = {
    Action.up : "up",
    Action.right : "right",
    Action.down : "down",
    Action.left : "left",
}

action_to_offset = {
    Action.up : (-1, 0),
    Action.right : (0, 1),
    Action.down : (1, 0),
    Action.left : (0, -1),
}

# KINGS MOVES HAS NE, NW, SE, SW TOO

In [424]:
class GridWorld:

    def __init__(self, start, terminal, height, width, windy, stochastic = False):
        """
        Creates a gridworld like MDP
         - height (int): Number of rows
         - width (int): Number of columns
         - start (int): the start cell
         - terminal (int): the terminal cell
         - windy (dict): each key is the state, each value is the value 
        """
        self._start = start
        self._terminal = terminal
        self._width = width
        self._height = height
        self._windy = windy
        self._grid_values = [0 for _ in range(height * width)]
        self._stochastic = stochastic
        if not self._stochastic:
            self._actions = ["N", "E", "S", "W"]
        elif self._stochastic:
            self._actions = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
        self.create_next_values()

    def reset(self):
        """
        Reset the state values
        """
        self._grid_values = [0 for _ in range(self._height * self._width)]
        self.create_next_values()

    def _inbounds(self, state):
        return state >= 0 and state < self._width * self._height

    def _inbounds_rc(self, state_r, state_c):
        return state_r >= 0 and state_r < self._height and state_c >= 0 and state_c < self._width

    def _state_to_rc(self, state):
        return state // self._width, state % self._width
    
    def _state_from_action(self, state, action):
        """
        Gets the state as a result of applying the given action
        """
        state_r,state_c = self._state_to_rc(state)
        
        north_req1 = False
        north_req2 = False
        south_req1 = False
        south_req2 = False
        east_req = False
        west_req = False
        if state_r - 1 >= 0:
            north_req1 = True
        if state_r - 2 >= 0:
            north_req2 = True
        if state_r + 1 < self._height -1:
            south_req1 = True
        if state_r + 2 < self._height - 1:
            south_req2 = True
        if state_c + 1 < self._width-1:
            east_req = True
        if state_c - 1 >= 0:
            west_req = True
            
        #get the wind value of the column if applicable
        if state in list(self._windy.keys()):
            wind = self._windy[state]
        else:
            wind = 0
            
        if self._stochastic:
            # figure out which way it'll move
            results = ['goal','up','down']
            a = np.random.choice(results)
            # move without wind effects first
            if action == 'N' and north_req1:
                state_r -= 1
            elif action == 'S' and south_req1:
                state_r += 1
            elif action == 'W' and west_req:
                state_c -= 1
            elif action == 'E' and east_req:
                state_c += 1
            elif action == 'NE' and north_req1 and east_req:
                state_c += 1
                state_r -= 1
            elif action == 'NW' and north_req1 and west_req:
                state_c -= 1
                state_r -= 1
            elif action == 'SE' and south_req1 and east_req:
                state_c += 1
                state_r += 1
            elif action == 'SW' and south_req1 and west_req:
                state_c -= 1
                state_r += 1
            
            # 1/3 of time move wind - 1 UPWARDS
            if a == 'up':
                state_r -= wind - 1
                if state_r < 0:
                    state_r = 0
            
            # 1/3 of time move wind + 1 DOWNWARDS
            elif a == 'down':
                state_r += wind + 1
                if state_r >= self._height:
                    state_r = self._height-1
                    
            # 1/3 of time move 1 * wind
            elif a == 'goal':
                state_r -= wind
                if state_r < 0:
                    state_r = 0

        # if in a deterministic game
        elif not self._stochastic:
            if action == 'N' and north_req1:
                state_r -= 1
            elif action == 'S' and south_req1:
                state_r += 1
            elif action == 'W' and west_req:
                state_c -= 1
            elif action == 'E' and east_req:
                state_c += 1
            # apply wind effect if neccessary
            if state in self._windy.keys():
                state_r -= wind
                if state_r < 0:
                    state_r = 0
                    
        print(state, action, state_r, state_c)
        assert self._inbounds_rc(state_r, state_c)
        return state_r * self._width + state_c

    def get_states(self):
        """
        Gets all states in the environment
        """
        return [i for i, c in enumerate(self._grid_values)]
    
    def get_reward(self, state):
        """
        Get the reward for being in the current state
        """
        # Reward is 0 at goal state
        if state == self._terminal:
            return 0
        # reward is -1 every other state
        else:
            return -1

    def get_value(self, state):
        """
        Get the current value of the state
        """
        assert self._inbounds(state)
        return self._grid_values[state]

    def create_next_values(self):
        """
        Creates a temporary storage for state value updating
        If this is not used, then asynchronous updating may result in unexpected results
        To use properly, run this at the start of each iteration
        """
        self._grid_values_next = [0 for _ in self._grid_values]

    def set_next_values(self):
        """
        Set the state values from the temporary copied values
        To use properly, run this at the end of each iteration
        """
        self._grid_values = deepcopy(self._grid_values_next)

    def set_value(self, state, value):
        """
        Set the value of the state into the temporary copy
        This value will not update into main storage until self.set_next_values() is called.
        """
        assert self._inbounds(state)
        self._grid_values_next[state] = value

    def __str__(self):
        """
        Pretty print the state values
        """
        out_str = ""
        for r in range(self._height):
            for c in range(self._width):
                cell = r * self._width + c
                out_str += "{:>6.2f}".format(self._grid_values[cell])
                out_str += " "
            out_str += "\n"
        return out_str

In [566]:
def sarsa(gw, alpha, gamma, e, max_eps=400):
    
    # initialize q table with arbitrary values, except terminal state is 0
    gw.reset()
    q = {}
    # initialize q table
    for state in gw.get_states():
        q[state] = {}
        if state != gw._terminal:
            for action in gw._actions:
                q[state][action] = -1
        elif state == gw._terminal:
            for action in gw._actions:
                q[state][action] = 0
    directions = gw._actions
    
    # set the gw
    gw.create_next_values()
    
    # run an episode
    for i in range(0, max_eps):
        # assign starting state
        s = gw._start
        # epsilon-greedy action selection
        rc = np.random.choice(['exploit','explore'], p=[1-e, e])
        if rc == 'exploit':
            n = np.argmax(list(q[s].values()))
            a = directions[n]
        elif rc == 'explore':
            a = np.random.choice(directions)

        # run until find terminal state and update q table
        path = []
        while s != gw._terminal:          
            # find next s, r
            s_prime = gw._state_from_action(s,a)
            r = gw.get_reward(s_prime)
            
            # find next action from s_prime using epsilon greedy
            rc = np.random.choice(['exploit','explore'], p=[1-e, e])
            if rc == 'exploit':
                n = np.argmax(list(q[s_prime].values()))
                a_prime = directions[n]
            elif rc == "explore":
                a_prime = np.random.choice(directions)
                
            # get q value
            q[s][a] += alpha*(r + (gamma * q[s_prime][a_prime]) - q[s][a])
            
            # update gw
            gw.set_value(s, s_prime)
            
            # set next s and a
            s = s_prime
            a = a_prime

        gw.set_next_values()

        # makes path
        state = gw._start
        while state != gw._terminal:
            path.append(state)
            next_state = gw.get_value(state)
            state = next_state
        gw.create_next_values()
        for state in path:
            gw.set_value(state, path.index(state)+1)
        gw.set_value(gw._terminal, len(path)+1)
        gw.set_next_values()    
        

In [567]:
def qlearning(gw, gamma):
    print("finish this")

In [568]:
windy = {3:1, 4:1, 5:1, 6:2, 7:2, 8:1,
        13:1, 14:1, 15:1, 16:2, 17:2, 18:1,
        23:1, 24:1, 25:1, 26:2, 27:2, 28:1,
        33:1, 34:1, 35:1, 36:2, 37:2, 38:1,
        43:1, 44:1, 45:1, 46:2, 47:2, 48:1,
        53:1, 54:1, 55:1, 56:2, 57:2, 58:1,
        63:1, 64:1, 65:1, 66:2, 67:2, 68:1}
gw = GridWorld(start=30, terminal=37, height=7, width=10, windy=windy)

In [569]:
print("sarsa: alpha/learning rate = , gamma/discount rate = , epsilon = ")

sarsa(gw, alpha=0.1, gamma=0.9, e=0.5,  tolerance = 5000)
print(gw)


sarsa: alpha/learning rate = , gamma/discount rate = , epsilon = 


TypeError: sarsa() got an unexpected keyword argument 'tolerance'