#### Dependencies

# Dynamic Programming (Reading Assignment)

## Move 37

###  Policy and Value Iteration using Dynamic Programming

#### Dependencies

In [2]:
# @title Dependencies
!pip install gym
import pprint



In [0]:
pp = pprint.PrettyPrinter(indent=2)

In [0]:
# @title grid world env

import numpy as np
import sys
from gym.envs.toy_text import discrete

UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3

class GridworldEnv(discrete.DiscreteEnv):
    """
    Grid World environment from Sutton's Reinforcement Learning book chapter 4.
    You are an agent on an MxN grid and your goal is to reach the terminal
    state at the top left or the bottom right corner.
    For example, a 4x4 grid looks as follows:
    T  o  o  o
    o  x  o  o
    o  o  o  o
    o  o  o  T
    x is your position and T are the two terminal states.
    You can take actions in each direction (UP=0, RIGHT=1, DOWN=2, LEFT=3).
    Actions going off the edge leave you in your current state.
    You receive a reward of -1 at each step until you reach a terminal state.
    """

    metadata = {'render.modes': ['human', 'ansi']}

    def __init__(self, shape=[4,4]):
        if not isinstance(shape, (list, tuple)) or not len(shape) == 2:
            raise ValueError('shape argument must be a list/tuple of length 2')

        self.shape = shape

        nS = np.prod(shape)
        nA = 4

        MAX_Y = shape[0]
        MAX_X = shape[1]

        P = {}
        grid = np.arange(nS).reshape(shape)
        it = np.nditer(grid, flags=['multi_index'])

        while not it.finished:
            s = it.iterindex
            y, x = it.multi_index

            P[s] = {a : [] for a in range(nA)}

            is_done = lambda s: s == 0 or s == (nS - 1)
            reward = 0.0 if is_done(s) else -1.0

            # We're stuck in a terminal state
            if is_done(s):
                P[s][UP] = [(1.0, s, reward, True)]
                P[s][RIGHT] = [(1.0, s, reward, True)]
                P[s][DOWN] = [(1.0, s, reward, True)]
                P[s][LEFT] = [(1.0, s, reward, True)]
            # Not a terminal state
            else:
                ns_up = s if y == 0 else s - MAX_X
                ns_right = s if x == (MAX_X - 1) else s + 1
                ns_down = s if y == (MAX_Y - 1) else s + MAX_X
                ns_left = s if x == 0 else s - 1
                P[s][UP] = [(1.0, ns_up, reward, is_done(ns_up))]
                P[s][RIGHT] = [(1.0, ns_right, reward, is_done(ns_right))]
                P[s][DOWN] = [(1.0, ns_down, reward, is_done(ns_down))]
                P[s][LEFT] = [(1.0, ns_left, reward, is_done(ns_left))]

            it.iternext()

        # Initial state distribution is uniform
        isd = np.ones(nS) / nS

        # We expose the model of the environment for educational purposes
        # This should not be used in any model-free learning algorithm
        self.P = P

        super(GridworldEnv, self).__init__(nS, nA, P, isd)

    def _render(self, mode='human', close=False):
        if close:
            return

        outfile = StringIO() if mode == 'ansi' else sys.stdout

        grid = np.arange(self.nS).reshape(self.shape)
        it = np.nditer(grid, flags=['multi_index'])
        while not it.finished:
            s = it.iterindex
            y, x = it.multi_index

            if self.s == s:
                output = " x "
            elif s == 0 or s == self.nS - 1:
                output = " T "
            else:
                output = " o "

            if x == 0:
                output = output.lstrip() 
            if x == self.shape[1] - 1:
                output = output.rstrip()

            outfile.write(output)

            if x == self.shape[1] - 1:
                outfile.write("\n")

            it.iternext()

#### Policy Evaluation in GridWorld

In [0]:
# @title implement policy evaluation
env = GridworldEnv()

def policy_eval(policy, env, discount_factor=1.0, theta=0.00001):
    """
    Evaluate a policy given an environment and a full description of the 
    environment's dynamics.
    
    Args:
        policy: [S, A] shaped matrix representing the policy.
        env: OpenAI env. env.P represents the transition probabilities of the environment.
            env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
            env.nS is a number of states in the environment. 
            env.nA is a number of actions in the environment.
        theta: We stop evaluation once our value function change is less than theta for all states.
        discount_factor: Gamma discount factor.
    
    Returns:
        Vector of length env.nS representing the value function.
    """
    # Start with a random (all 0) value function
    V = np.zeros(env.nS)
    while True:
        V_next = np.zeros(env.nS)
        for s in range(env.nS): # don't iterate over ending states
          for a, action_prob in enumerate(policy[s]):
            (prob, next_state, reward, _) = env.P[s][a][0]
            V_next[s] += action_prob * prob * (reward + discount_factor * V[next_state])
        delta = max(V - V_next)
        V = V_next # only assign after all states are calculated
        if delta <= theta:
          break
    return np.array(V)

In [6]:
# @title Test policy Eval

random_policy = np.ones([env.nS, env.nA]) / env.nA
v = policy_eval(random_policy, env)
print("Value func:")
print(v.reshape(env.shape))

# Test: Make sure the evaluated policy is what we expected
expected_v = np.array([0, -14, -20, -22, -14, -18, -20, -20, -20, -20, -18, -14, -22, -20, -14, 0])
np.testing.assert_array_almost_equal(v, expected_v, decimal=2)

Value func:
[[  0.         -13.99989315 -19.99984167 -21.99982282]
 [-13.99989315 -17.99986052 -19.99984273 -19.99984167]
 [-19.99984167 -19.99984273 -17.99986052 -13.99989315]
 [-21.99982282 -19.99984167 -13.99989315   0.        ]]


#### Value Iteration

In [0]:
# @title implement policy improvement

def policy_improvement(env, policy_eval_fn=policy_eval, discount_factor=1.0):
    """
    Policy Improvement Algorithm. Iteratively evaluates and improves a policy
    until an optimal policy is found.
    
    Args:
        env: The OpenAI envrionment.
        policy_eval_fn: Policy Evaluation function that takes 3 arguments:
            policy, env, discount_factor.
        discount_factor: gamma discount factor.
        
    Returns:
        A tuple (policy, V). 
        policy is the optimal policy, a matrix of shape [S, A] where each state s
        contains a valid probability distribution over actions.
        V is the value function for the optimal policy.
        
    """
    # Start with a random policy
    policy = np.ones([env.nS, env.nA]) / env.nA
    
    V = policy_eval_fn(random_policy, env, discount_factor)
    
    while True:
      for s in range(1, env.nS-1):
        for a, action_prob in enumerate(policy[s]):
          print(env.P[s][a])
        # Implement this!
        break
    
    return policy, V


In [8]:
policy, v = policy_improvement(env)
print("Policy Probability Distribution:")
print(policy)
print("")

print("Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):")
print(np.reshape(np.argmax(policy, axis=1), env.shape))
print("")

print("Value Function:")
print(v)
print("")

print("Reshaped Grid Value Function:")
print(v.reshape(env.shape))
print("")

[(1.0, 1, -1.0, False)]
[(1.0, 2, -1.0, False)]
[(1.0, 5, -1.0, False)]
[(1.0, 0, -1.0, True)]
[(1.0, 1, -1.0, False)]
[(1.0, 2, -1.0, False)]
[(1.0, 5, -1.0, False)]
[(1.0, 0, -1.0, True)]
[(1.0, 1, -1.0, False)]
[(1.0, 2, -1.0, False)]
[(1.0, 5, -1.0, False)]
[(1.0, 0, -1.0, True)]
[(1.0, 1, -1.0, False)]
[(1.0, 2, -1.0, False)]
[(1.0, 5, -1.0, False)]
[(1.0, 0, -1.0, True)]
[(1.0, 1, -1.0, False)]
[(1.0, 2, -1.0, False)]
[(1.0, 5, -1.0, False)]
[(1.0, 0, -1.0, True)]
[(1.0, 1, -1.0, False)]
[(1.0, 2, -1.0, False)]
[(1.0, 5, -1.0, False)]
[(1.0, 0, -1.0, True)]
[(1.0, 1, -1.0, False)]
[(1.0, 2, -1.0, False)]
[(1.0, 5, -1.0, False)]
[(1.0, 0, -1.0, True)]
[(1.0, 1, -1.0, False)]
[(1.0, 2, -1.0, False)]
[(1.0, 5, -1.0, False)]
[(1.0, 0, -1.0, True)]
[(1.0, 1, -1.0, False)]
[(1.0, 2, -1.0, False)]
[(1.0, 5, -1.0, False)]
[(1.0, 0, -1.0, True)]
[(1.0, 1, -1.0, False)]
[(1.0, 2, -1.0, False)]
[(1.0, 5, -1.0, False)]
[(1.0, 0, -1.0, True)]
[(1.0, 1, -1.0, False)]
[(1.0, 2, -1.0, False)]
[(

KeyboardInterrupt: ignored

In [79]:
# Test the value function
expected_v = np.array([ 0, -1, -2, -3, -1, -2, -3, -2, -2, -3, -2, -1, -3, -2, -1,  0])
np.testing.assert_array_almost_equal(v, expected_v, decimal=2)

AssertionError: ignored