In [9]:
!pip install gym==0.23.1 einops fancy-einsum

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting einops
  Downloading einops-0.6.0-py3-none-any.whl (41 kB)
[?25l[K     |███████▉                        | 10 kB 19.2 MB/s eta 0:00:01[K     |███████████████▊                | 20 kB 11.8 MB/s eta 0:00:01[K     |███████████████████████▋        | 30 kB 8.7 MB/s eta 0:00:01[K     |███████████████████████████████▌| 40 kB 4.5 MB/s eta 0:00:01[K     |████████████████████████████████| 41 kB 343 kB/s 
[?25hCollecting fancy-einsum
  Downloading fancy_einsum-0.0.3-py3-none-any.whl (6.2 kB)
Installing collected packages: fancy-einsum, einops
Successfully installed einops-0.6.0 fancy-einsum-0.0.3


In [6]:
!wget https://github.com/callummcdougall/arena-v1/raw/main/w6d2/utils.py
!wget https://github.com/callummcdougall/arena-v1/raw/main/w6d2/solutions.py

--2022-12-01 03:37:56--  https://github.com/callummcdougall/arena-v1/raw/main/w6d2/utils.py
Resolving github.com (github.com)... 140.82.112.4
Connecting to github.com (github.com)|140.82.112.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/callummcdougall/arena-v1/main/w6d2/utils.py [following]
--2022-12-01 03:37:56--  https://raw.githubusercontent.com/callummcdougall/arena-v1/main/w6d2/utils.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4242 (4.1K) [text/plain]
Saving to: ‘utils.py.1’


2022-12-01 03:37:56 (56.4 MB/s) - ‘utils.py.1’ saved [4242/4242]

--2022-12-01 03:37:56--  https://github.com/callummcdougall/arena-v1/raw/main/w6d2/solutions.py
Resolving github.com (github.com)... 140.8

In [22]:
from gettext import find
from typing import Optional, Union, Tuple
import numpy as np
import gym
import gym.spaces
import gym.envs.registration
from gym.utils import seeding
import numpy as np
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from PIL import Image, ImageDraw
import utils

# True when this module is executed as the top-level program (e.g. the notebook
# kernel or a script) rather than imported from elsewhere.
MAIN = __name__ == "__main__"
# Short alias for numpy arrays, used in the type annotations below.
Arr = np.ndarray

In [23]:
class Environment:
    '''
    Base class for a finite (tabular) Markov decision process.

    Subclasses implement `dynamics`; the constructor then tabulates it into
    the dense tensors `T` (transition probabilities) and `R` (rewards), both
    of shape (num_states, num_actions, num_states).
    '''

    def __init__(self, num_states: int, num_actions: int, start=0, terminal=None):
        '''
        Inputs:
            num_states  : int, number of states
            num_actions : int, number of actions
            start       : index of the starting state
            terminal    : array of terminal state indices (empty array if None)
        '''
        self.num_states = num_states
        self.num_actions = num_actions
        self.start = start
        self.terminal = np.array([], dtype=int) if terminal is None else terminal
        # build() calls self.dynamics for every (state, action) pair, so a subclass
        # must set any attribute that dynamics reads *before* calling super().__init__.
        (self.T, self.R) = self.build()

    def build(self):
        '''
        Constructs the T and R tensors from the dynamics of the environment.
        Outputs:
            T : (num_states, num_actions, num_states) State transition probabilities
            R : (num_states, num_actions, num_states) Reward function
        '''
        num_states = self.num_states
        num_actions = self.num_actions
        T = np.zeros((num_states, num_actions, num_states))
        R = np.zeros((num_states, num_actions, num_states))
        for s in range(num_states):
            for a in range(num_actions):
                (states, rewards, probs) = self.dynamics(s, a)
                # Expand the sparse outcome list to one entry per possible next state.
                (all_s, all_r, all_p) = self.out_pad(states, rewards, probs)
                T[s, a, all_s] = all_p
                R[s, a, all_s] = all_r
        return (T, R)

    def dynamics(self, state: int, action: int) -> Tuple[Arr, Arr, Arr]:
        '''
        Computes the distribution over possible outcomes for a given state
        and action. Must be overridden by subclasses.
        Inputs:
            state : int (index of state)
            action : int (index of action)
        Outputs:
            states  : (m,) all the possible next states
            rewards : (m,) rewards for each next state transition
            probs   : (m,) likelihood of each state-reward pair
        '''
        raise NotImplementedError

    def render(self, pi: Arr):
        '''
        Takes a policy pi, and draws an image of the behavior of that policy, if applicable.
        Subclasses that can be visualised override this method.
        Inputs:
            pi : (num_states,) a policy, one action index per state
        Outputs:
            None
        '''
        raise NotImplementedError

    def out_pad(self, states: Arr, rewards: Arr, probs: Arr):
        '''
        Expands a sparse list of outcomes into dense per-state arrays.
        Inputs:
            states  : (m,) all the possible next states
            rewards : (m,) rewards for each next state transition
            probs   : (m,) likelihood of each state-reward pair
        Outputs:
            states  : (num_states,) all the next states
            rewards : (num_states,) rewards for each next state transition
            probs   : (num_states,) likelihood of each state-reward pair (including zero-prob outcomes.)
        '''
        out_s = np.arange(self.num_states)
        out_r = np.zeros(self.num_states)
        out_p = np.zeros(self.num_states)
        for i in range(len(states)):
            idx = states[i]
            # NOTE(review): when the same next state appears more than once, its
            # rewards are *summed* (not probability-averaged) -- confirm this is intended.
            out_r[idx] += rewards[i]
            out_p[idx] += probs[i]
        return (out_s, out_r, out_p)

In [27]:
class Toy(Environment):
    '''
    Deterministic toy MDP with three states (S0, SL, SR) and two actions.
    '''

    def __init__(self):
        # 3 states, 2 actions; start/terminal keep the base-class defaults.
        super().__init__(3, 2)

    def dynamics(self, state: int, action: int):
        '''
        Returns the single, certain outcome of taking `action` in `state`.
        Outputs:
            states  : (1,) the next state
            rewards : (1,) the reward for that transition
            probs   : (1,) always 1
        '''
        assert 0 <= state < self.num_states and 0 <= action < self.num_actions
        (S0, SL, SR) = (0, 1, 2)
        LEFT = 0
        if state == S0:
            # From the centre: LEFT pays 1 immediately, any other action pays 0.
            outcome = (SL, 1) if action == LEFT else (SR, 0)
        elif state == SL:
            outcome = (S0, 0)
        elif state == SR:
            # Returning from the right state pays 2, whatever the action.
            outcome = (S0, 2)
        (next_state, reward) = outcome
        return (np.array([next_state]), np.array([reward]), np.array([1]))

# Build the toy environment and print its tabulated tensors:
# T (transition probabilities) first, then R (rewards).
toy = Toy()
print(toy.T)
print(toy.R)

[[[0. 1. 0.]
  [0. 0. 1.]]

 [[1. 0. 0.]
  [1. 0. 0.]]

 [[1. 0. 0.]
  [1. 0. 0.]]]
[[[0. 1. 0.]
  [0. 0. 0.]]

 [[0. 0. 0.]
  [0. 0. 0.]]

 [[2. 0. 0.]
  [2. 0. 0.]]]


In [29]:
class Norvig(Environment):
    '''
    Gridworld of width 4 and height 3, stored row-major (state = x + y * width).

    Two terminal cells carry the rewards in `goal_rewards`, one cell is a wall,
    and every other transition costs `penalty`. The chosen action is taken with
    probability 0.7; each of the other three actions happens with probability 0.1.
    '''

    def dynamics(self, state: int, action: int) -> Tuple[Arr, Arr, Arr]:
        '''
        Outcome distribution for taking `action` in `state`.
        Inputs:
            state  : int (index of state)
            action : int (index into self.actions)
        Outputs:
            states  : (num_actions,) next state for each direction actually moved
            rewards : (num_actions,) reward for each of those transitions
            probs   : (num_actions,) probability of each direction
            For terminal and wall states, a single self-transition with
            reward 0 and probability 1 is returned instead.
        '''
        def state_index(position):
            # Convert an (x, y) grid position into a flat state index.
            assert 0 <= position[0] < self.width and 0 <= position[1] < self.height, \
                f"position outside the grid: {position}"
            index = position[0] + position[1] * self.width
            assert 0 <= index < self.num_states, \
                f"position {position} maps to invalid state index {index}"
            return index

        pos = self.states[state]
        if state in self.terminal or state in self.walls:
            # Absorbing cells: stay put forever, with no further reward.
            return (np.array([state]), np.array([0]), np.array([1]))

        out_probs = np.zeros(self.num_actions) + 0.1
        out_probs[action] = 0.7
        # By default every direction leaves the agent where it is; entries are
        # overwritten below only when the move actually succeeds.
        out_states = np.full(self.num_actions, state, dtype=int)
        out_rewards = np.full(self.num_actions, self.penalty, dtype=float)
        for (i, step) in enumerate(self.actions):
            target = pos + step
            if not (0 <= target[0] < self.width and 0 <= target[1] < self.height):
                continue  # bumped into the outer border
            new_state = state_index(target)
            if new_state not in self.walls:
                out_states[i] = new_state
            # Entering a terminal cell replaces the step penalty with its goal reward.
            for idx in range(len(self.terminal)):
                if new_state == self.terminal[idx]:
                    out_rewards[i] = self.goal_rewards[idx]
        return (out_states, out_rewards, out_probs)

    def render(self, pi: Arr):
        '''
        Prints the policy as a grid of arrows, one printed list per grid row.
        Terminal cells are drawn green (positive goal reward) or red (otherwise)
        and wall cells black.
        Inputs:
            pi : (num_states,) action index for each state
        '''
        assert len(pi) == self.num_states
        emoji = ["⬆️", "➡️", "⬇️", "⬅️"]
        grid = [emoji[act] for act in pi]
        # Take cell positions from the environment rather than hard-coding them.
        for (idx, reward) in zip(self.terminal, self.goal_rewards):
            grid[idx] = "🟩" if reward > 0 else "🟥"
        for idx in self.walls:
            grid[idx] = "⬛"
        rows = [str(grid[row * self.width:(row + 1) * self.width]) for row in range(self.height)]
        print("\n".join(rows))

    def __init__(self, penalty=-0.04):
        '''
        Inputs:
            penalty : reward received for every transition that does not enter a terminal cell
        '''
        # Everything dynamics() reads must exist before super().__init__,
        # because the base constructor tabulates the dynamics immediately.
        self.height = 3
        self.width = 4
        self.penalty = penalty
        num_states = self.height * self.width
        num_actions = 4
        # Row-major list of (x, y) positions: index = x + y * width.
        self.states = np.array([[x, y] for y in range(self.height) for x in range(self.width)])
        # (dx, dy) per action, matching the arrow order used in render: up, right, down, left.
        self.actions = np.array([[0, -1], [1, 0], [0, 1], [-1, 0]])
        self.dim = (self.height, self.width)
        terminal = np.array([3, 7], dtype=int)
        self.walls = np.array([5], dtype=int)
        # goal_rewards[i] is the reward for entering terminal[i].
        self.goal_rewards = np.array([1.0, -1])
        super().__init__(num_states, num_actions, start=8, terminal=terminal)

In [69]:
def policy_eval_numerical(env: Environment, pi: Arr, gamma=0.99, eps=1e-08, max_iterations=10_000) -> Arr:
    '''
    Numerically evaluates the value of a given policy by iterating the Bellman equation
    Inputs:
        env: Environment
        pi : shape (num_states,) - The policy to evaluate (integer action per state)
        gamma: float - Discount factor
        eps  : float - Tolerance; iteration stops once the largest per-state change is below it
        max_iterations: int - Upper bound on the number of Bellman updates
    Outputs:
        value : float (num_states,) - The value function for policy pi
                (the most recent iterate, also when the iteration limit is hit)
    '''
    states = np.arange(env.num_states)
    # Select, for every state, the row belonging to the action the policy takes there.
    transitions = env.T[states, pi, :]
    rewards = env.R[states, pi, :]
    # Values are floats; do not inherit the integer dtype of the policy array.
    V = np.zeros(env.num_states, dtype=float)

    for iteration in range(1, max_iterations + 1):
        # Bellman backup: expectation over next states of (reward + discounted value).
        V_next = (transitions * (rewards + gamma * V)).sum(axis=1)
        converged = np.abs(V_next - V).max() < eps
        V = V_next
        if converged:
            print(f"Converged in {iteration} steps")
            break
    else:
        print(f"Failed to converge after {max_iterations} steps")

    return V


# Run the course-provided check on the iterative evaluator; exact=False presumably
# requests an approximate comparison -- see utils.py for the details.
utils.test_policy_eval(policy_eval_numerical, exact=False)

Converged in 107 steps.
Converged in 108 steps
Converged in 101 steps.
Converged in 102 steps
Converged in 143 steps.
Converged in 144 steps
Converged in 145 steps.
Converged in 146 steps
Converged in 115 steps.
Converged in 116 steps


In [87]:
def policy_eval_exact(env: Environment, pi: Arr, gamma=0.99) -> Arr:
    '''
    Evaluates a policy exactly by solving the linear Bellman system
    (I - gamma * P) v = r.
    Inputs:
        env: Environment
        pi : shape (num_states,) - The policy to evaluate (integer action per state)
        gamma: float - Discount factor
    Outputs:
        value : float (num_states,) - The value function for policy pi
    Raises:
        numpy.linalg.LinAlgError if (I - gamma * P) is singular.
    '''
    states = np.arange(env.num_states)
    # Transition matrix and per-transition rewards under the policy's chosen actions.
    P = env.T[states, pi, :]
    R = env.R[states, pi, :]

    I = np.eye(P.shape[0])
    # Expected immediate reward in each state.
    r = (P * R).sum(axis=1)

    # Solve the system directly instead of forming the explicit inverse.
    v = np.linalg.solve(I - gamma * P, r)
    return v


# Run the course-provided check on the closed-form evaluator; exact=True presumably
# requests a strict comparison -- see utils.py for the details.
utils.test_policy_eval(policy_eval_exact, exact=True)

In [109]:
def policy_improvement(env: Environment, V: Arr, gamma=0.99) -> Arr:
    '''
    Greedy policy improvement step.
    Inputs:
        env: Environment
        V  : (num_states,) value of each state under the current policy
        gamma: float - Discount factor
    Outputs:
        pi_better : (num_states,) for each state, the action with the highest
                    expected one-step return under V (first such action on ties)
    '''
    # Return of landing in each next state: immediate reward plus discounted value.
    discounted_next_value = gamma * V
    outcome_return = env.R + discounted_next_value
    # Expectation over next states gives the value of each (state, action) pair.
    q_values = np.sum(env.T * outcome_return, axis=2)
    return np.argmax(q_values, axis=1)


# Run the course-provided check on the greedy improvement step (see utils.py).
utils.test_policy_improvement(policy_improvement)

In [115]:
def find_optimal_policy(env: Environment, gamma=0.99, max_iterations=10_000):
    '''
    Policy iteration: alternate exact policy evaluation and greedy improvement
    until the policy stops changing.
    Inputs:
        env: environment
        gamma: float - Discount factor
        max_iterations: int - Upper bound on the number of improvement steps
    Outputs:
        pi : (num_states,) int, of actions representing an optimal policy
             (the last policy reached if the iteration limit is hit first)
    '''
    # Start from the policy that always takes action 0.
    pi = np.zeros(shape=env.num_states, dtype=int)
    for _ in range(max_iterations):
        V = policy_eval_exact(env, pi, gamma=gamma)
        pi_next = policy_improvement(env, V, gamma=gamma)

        # A policy that is greedy with respect to its own value function is a fixed point.
        if np.array_equal(pi_next, pi):
            break

        pi = pi_next
    else:
        # Loop ran out without reaching a fixed point (also covers max_iterations == 0).
        print(f"Did not converge after {max_iterations} steps.")

    return pi



# Run the course-provided check on policy iteration (see utils.py).
utils.test_find_optimal_policy(find_optimal_policy)
# Solve the gridworld with a per-step penalty of -0.04 and print the resulting policy.
penalty = -0.04
norvig = Norvig(penalty)
pi_opt = find_optimal_policy(norvig, gamma=0.99)
norvig.render(pi_opt)

['➡️', '➡️', '➡️', '🟩']
['⬆️', '⬛', '⬆️', '🟥']
['⬆️', '⬅️', '⬆️', '⬅️']
