In [1]:
# Note: you may need to update your version of future
# sudo pip install -U future
from __future__ import print_function, division
from builtins import range
import numpy as np

class Grid: # Environment
    """Grid-world environment that tracks the agent's position as (i, j).

    `i` is the row index and `j` the column index: 'U'/'D' change `i`,
    'L'/'R' change `j`. `width` and `height` are only stored on the
    instance; the printing helpers in this notebook loop rows over
    `width` and columns over `height`.

    `set` must be called before any method that reads `rewards` or
    `actions` (everything except `set_state` and `current_state`).
    """

    # (row delta, column delta) applied by each recognised action
    _STEP = {'U': (-1, 0), 'D': (1, 0), 'L': (0, -1), 'R': (0, 1)}

    def __init__(self, width, height, start):
        """Store the grid dimensions and place the agent at `start` = (i, j)."""
        self.width, self.height = width, height
        self.i, self.j = start[0], start[1]

    def set(self, rewards, actions):
        """Attach the reward table and the legal-action table.

        rewards: dict mapping (i, j) -> reward for arriving at that cell
        actions: dict mapping (i, j) -> collection of actions allowed there
        """
        self.rewards, self.actions = rewards, actions

    def set_state(self, s):
        """Move the agent directly to state `s` = (i, j)."""
        self.i, self.j = s[0], s[1]

    def current_state(self):
        """Return the agent's position as an (i, j) tuple."""
        return (self.i, self.j)

    def is_terminal(self, s):
        """A state is terminal when no actions are registered for it."""
        return s not in self.actions

    def move(self, action):
        """Apply `action` if it is legal here and return the arrival reward.

        An action that is not listed for the current state leaves the
        position unchanged. The returned value is the reward stored for
        the resulting position, or 0 when none is stored.
        """
        here = (self.i, self.j)
        if action in self.actions[here]:
            # Unrecognised (but listed) actions fall back to "no movement"
            d_row, d_col = self._STEP.get(action, (0, 0))
            self.i += d_row
            self.j += d_col
        return self.rewards.get((self.i, self.j), 0)

    def undo_move(self, action):
        """Step in the opposite direction of `action`.

        No legality check is made beforehand; afterwards an assertion
        verifies that the resulting position is one of `all_states()`.
        """
        d_row, d_col = self._STEP.get(action, (0, 0))
        self.i -= d_row
        self.j -= d_col
        # Raise an exception if we arrive somewhere we shouldn't be
        assert self.current_state() in self.all_states()

    def game_over(self):
        """Return True when the agent sits on a state with no actions."""
        return self.is_terminal(self.current_state())

    def all_states(self):
        """Return the set of every position that has actions or a reward."""
        return set(self.actions) | set(self.rewards)

# Define a grid that describes the reward for arriving at each state and possible actions at each state
def standard_grid():
    """Build the baseline grid: agent starts at (2, 0), two rewarded cells.

    Arriving at (0, 3) yields +1 and arriving at (1, 3) yields -1. Those
    two cells have no actions registered. Cell (1, 1) appears in neither
    table.
    """
    env = Grid(3, 4, (2, 0))
    # Reward table: state -> reward received on arrival
    arrival_rewards = {(0, 3): 1, (1, 3): -1}
    # Action table: state -> moves that are allowed from that state
    legal_moves = {
        (0, 0): ('D', 'R'),
        (0, 1): ('L', 'R'),
        (0, 2): ('L', 'D', 'R'),
        (1, 0): ('U', 'D'),
        (1, 2): ('U', 'D', 'R'),
        (2, 0): ('U', 'R'),
        (2, 1): ('L', 'R'),
        (2, 2): ('L', 'R', 'U'),
        (2, 3): ('L', 'U'),
    }
    env.set(arrival_rewards, legal_moves)
    return env

# Same as above but has a step cost / penalty for moving
def negative_grid(step_cost=-0.1):
    """Return the standard grid with `step_cost` as the reward of nine cells.

    The nine cells listed below are the same keys that `standard_grid`
    registers actions for; the rewards at (0, 3) and (1, 3) are left as
    `standard_grid` set them.
    """
    env = standard_grid()
    penalised_cells = [
        (0, 0), (0, 1), (0, 2),
        (1, 0), (1, 2),
        (2, 0), (2, 1), (2, 2), (2, 3),
    ]
    for cell in penalised_cells:
        env.rewards[cell] = step_cost
    return env

In [3]:
# Smoke-test the environment: construct a Grid by hand, then build the
# standard grid and display every state it knows about.
g = Grid(width=3, height=4, start=(2, 0))
grid = standard_grid()
grid.all_states()

{(0, 0),
 (0, 1),
 (0, 2),
 (0, 3),
 (1, 0),
 (1, 2),
 (1, 3),
 (2, 0),
 (2, 1),
 (2, 2),
 (2, 3)}

In [2]:
# Prints a representation of the map with the values given 
def print_values(V, g):
    """Print the value table `V` as a grid, one text line per row.

    V: dict mapping (i, j) -> value; missing cells are shown as 0
    g: object whose `width` gives the number of rows printed and whose
       `height` gives the number of columns printed
    """
    for row in range(g.width):
        print("---------------------------")
        cells = []
        for col in range(g.height):
            value = V.get((row, col), 0)
            # Non-negative values get a leading space so they line up
            # with negative ones, whose minus sign fills that column.
            template = " %.2f|" if value >= 0 else "%.2f|"
            cells.append(template % value)
        print("".join(cells))

# Prints out map with policy actions given each state
def print_policy(P, g):
    """Print the policy `P` as a grid, one text line per row.

    P: dict mapping (i, j) -> action; cells without an entry print blank
    g: object whose `width` gives the number of rows printed and whose
       `height` gives the number of columns printed
    """
    for row in range(g.width):
        print("---------------------------")
        row_text = "".join(
            "  %s  |" % P.get((row, col), ' ') for col in range(g.height)
        )
        print(row_text)

### Iterative Policy Evaluation
Given a policy, let's find its value function V(s). We will do this for both a uniform random policy and a fixed policy.

NOTE:
There are 2 sources of randomness:

1) p(a|s) - deciding what action to take given the state

2) p(s',r|s,a) - the next state and reward given your action-state pair

We are only modeling p(a|s) = uniform. How would the code change if p(s',r|s,a) is not deterministic?

In [4]:
SMALL_ENOUGH = 1e-3 # threshold for convergence


def evaluate_policy(grid, states, action_probs, gamma, threshold=SMALL_ENOUGH):
    """Iterative policy evaluation with in-place value updates.

    Repeatedly sweeps over `states`, replacing V(s) by the expected value of
    r + gamma * V(s') under the given action probabilities, until the
    largest change seen in a sweep drops below `threshold`.

    grid: environment providing set_state(s), move(a) -> reward and
        current_state(); each (state, action) pair is treated as leading
        to exactly one next state and reward, i.e. p(s',r|s,a) is not
        modelled as a distribution.
    states: iterable of all states; it is iterated once per sweep, so it
        must be re-iterable (e.g. a set or list).
    action_probs: dict mapping state -> list of (action, probability)
        pairs. States without an entry keep the value 0.
    gamma: discount factor.
    threshold: convergence tolerance on the biggest per-sweep change.

    Returns a dict mapping every state in `states` to its estimated value.
    """
    # Initialize the value of each state to 0
    V = {}
    for s in states:
        V[s] = 0

    while True:
        biggest_change = 0
        for s in states:
            # V(s) only has value if the policy acts in that state
            if s not in action_probs:
                continue
            old_v = V[s]
            new_v = 0 # we will accumulate the answer
            for a, p_a in action_probs[s]:
                # The grid is stateful, so reset it before trying each action
                grid.set_state(s)
                r = grid.move(a)
                new_v += p_a * (r + gamma * V[grid.current_state()])
            V[s] = new_v
            # Plain scalars: the builtin abs is enough, no numpy needed
            biggest_change = max(biggest_change, abs(old_v - new_v))
        if biggest_change < threshold:
            return V


if __name__ == '__main__':
    grid = standard_grid()
    # States will be positions (i,j)
    states = grid.all_states()

    ### Uniformly random actions ###
    # Every action allowed in a state is taken with equal probability
    uniform_policy = {
        s: [(a, 1.0 / len(moves)) for a in moves]
        for s, moves in grid.actions.items()
    }
    gamma = 1.0 # discount factor
    V = evaluate_policy(grid, states, uniform_policy, gamma)

    print("Values for uniformly random actions:")
    print_values(V, grid)
    print("\n\n")

    ### Fixed policy ###
    policy = {
    (2, 0): 'U',
    (1, 0): 'U',
    (0, 0): 'R',
    (0, 1): 'R',
    (0, 2): 'R',
    (1, 2): 'R',
    (2, 1): 'R',
    (2, 2): 'R',
    (2, 3): 'U',
    }
    print_policy(policy, grid)

    # Let's see how V(s) changes as we get further away from the reward
    gamma = 0.9 # discount factor
    # A deterministic policy is the special case of one action with probability 1
    fixed_policy = {s: [(a, 1.0)] for s, a in policy.items()}
    V = evaluate_policy(grid, states, fixed_policy, gamma)

    print("Values for fixed policy:")
    print_values(V, grid)

values for uniformly random actions:
---------------------------
-0.03| 0.09| 0.22| 0.00|
---------------------------
-0.16| 0.00|-0.44| 0.00|
---------------------------
-0.29|-0.41|-0.54|-0.77|



---------------------------
  R  |  R  |  R  |     |
---------------------------
  U  |     |  R  |     |
---------------------------
  U  |  R  |  R  |  U  |
Values for fixed policy:
---------------------------
 0.81| 0.90| 1.00| 0.00|
---------------------------
 0.73| 0.00|-1.00| 0.00|
---------------------------
 0.66|-0.81|-0.90|-1.00|


Sources:
    
https://deeplearningcourses.com/c/artificial-intelligence-reinforcement-learning-in-python
    
https://www.udemy.com/artificial-intelligence-reinforcement-learning-in-python