# Homework 8

## Imports and Utilities
**Note**: these imports and functions are available in catsoop. You do not need to copy them in.

In [None]:
from collections import defaultdict
from math import sqrt, log
import abc
import numpy as np


class MDP:
    """Base interface for a Markov Decision Process.

    Subclasses provide the state set, the action set, a reward function and
    a transition model. Discounting, horizon and terminal-state behavior
    come with overridable defaults.

    Note: the class does not use ``abc.ABCMeta``, so the ``abstractmethod``
    decorators below only mark intent; an un-overridden member fails with
    ``NotImplementedError`` when it is actually used.
    """

    @property
    @abc.abstractmethod
    def state_space(self):
        """The MDP's set of states (representation chosen by the subclass)."""
        raise NotImplementedError("Override me")

    @property
    @abc.abstractmethod
    def action_space(self):
        """The MDP's set of actions (representation chosen by the subclass)."""
        raise NotImplementedError("Override me")

    @property
    def temporal_discount_factor(self):
        """Discount factor gamma; 1 unless a subclass overrides it."""
        return 1.

    @property
    def horizon(self):
        """Horizon H; infinite unless a subclass overrides it."""
        return float("inf")

    def state_is_terminal(self, state):
        """Tell whether ``state`` is a terminal (done) state.

        The base implementation treats no state as terminal.

        Args:
            state: A state.

        Returns:
            state_is_terminal : A bool, always False here.
        """
        return False

    @abc.abstractmethod
    def get_reward(self, state, action, next_state):
        """Deterministic one-step reward for a transition.

        Args:
            state: The state the action was taken in.
            action: The action that was executed.
            next_state: The state that was reached.

        Returns:
            reward : Reward for this single time step.
        """
        raise NotImplementedError("Override me")

    @abc.abstractmethod
    def get_transition_distribution(self, state, action):
        """Distribution over successor states of ``(state, action)``.

        The concrete form is up to the subclass (e.g. discrete versus
        continuous state spaces). The default ``sample_next_state`` expects
        a mapping from next state to probability.

        Args:
            state: A current state.
            action: An action.

        Returns:
            next_state_distribution: Distribution over next states.
        """
        raise NotImplementedError("Override me")

    def sample_next_state(self, state, action, rng=np.random):
        """Draw one successor state from the transition distribution.

        Subclasses may override this when the explicit distribution is too
        large to enumerate.

        Args:
            state: A state from the state space.
            action: An action from the action space.
            rng: A random source exposing ``choice(n, p=...)``.

        Returns:
            next_state: The sampled successor state.
        """
        distribution = self.get_transition_distribution(state, action)
        # Split the mapping into parallel sequences so that an index can be
        # sampled (the states themselves may be tuples, which would confuse
        # a direct choice over the candidates).
        candidates, weights = zip(*distribution.items())
        picked = rng.choice(len(candidates), p=weights)
        return candidates[picked]


class SingleRowMDP(MDP):
    """A tiny 1x5 corridor MDP intended for debugging.

    The agent is meant to start in the middle cell. Reaching the rightmost
    cell yields +10, the leftmost cell -10, and every other step costs 1.
    The two actions are left (0) and right (1); with probability 0.1 the
    move goes in the opposite direction.
    """

    @property
    def state_space(self):
        # Cell index within the corridor.
        return set(range(5))

    @property
    def action_space(self):
        # 0 = left, 1 = right.
        return {0, 1}

    def get_transition_distribution(self, state, action):
        # Discrete distribution as a dict: next state -> probability.
        step = 1 if action == 1 else -1

        def clamp(cell):
            # Keep the position inside the 5-cell corridor.
            return min(max(cell, 0), 4)

        intended_effect = clamp(state + step)
        opposite_effect = clamp(state - step)
        assert (intended_effect != opposite_effect)
        return {intended_effect: 0.9, opposite_effect: 0.1}

    def get_reward(self, state, action, next_state):
        if next_state == 0:
            return -10
        # Anything but the right end only pays the living penalty.
        return 10 if next_state == 4 else -1

    def state_is_terminal(self, state):
        end_cells = {0, 4}
        return state in end_cells


class MarshmallowMDP(MDP):
    """The Marshmallow MDP described in lecture.

    A state is a pair ``(hunger, has_marshmallow)`` with hunger in
    ``{0, 1, 2}`` and a boolean telling whether the marshmallow remains.
    The actions are ``"eat"`` and ``"wait"``; the horizon is 4 steps.
    """

    @property
    def state_space(self):
        # (hunger level, marshmallow remains)
        return {(h, m) for h in {0, 1, 2} for m in {True, False}}

    @property
    def action_space(self):
        return {"eat", "wait"}

    @property
    def horizon(self):
        return 4

    def get_reward(self, state, action, next_state):
        """Return minus the squared hunger level of ``next_state``."""
        next_hunger_level = next_state[0]
        return -(next_hunger_level**2)

    def get_transition_distribution(self, state, action):
        """Return a dict mapping next states to probabilities.

        Eating while the marshmallow remains resets hunger to 0 and uses
        the marshmallow up. In every other case (waiting, or "eating" when
        nothing is left) hunger stays put with probability 0.75 and grows
        by one, capped at 2, with probability 0.25.

        Args:
            state: A ``(hunger, has_marshmallow)`` pair.
            action: ``"eat"`` or ``"wait"``.

        Returns:
            A ``defaultdict(float)``; states not present have probability 0.
        """
        hunger, has_marshmallow = state

        # The marshmallow changes deterministically: eating removes it,
        # waiting leaves it as it was.
        next_m = False if action == "eat" else has_marshmallow

        # Any state not included is assumed to have 0 probability.
        dist = defaultdict(float)

        if action == "wait" or not has_marshmallow:
            # With 0.75 probability, hunger stays the same.
            dist[(hunger, next_m)] += 0.75
            # With 0.25 probability, hunger increases by 1 (capped at 2;
            # at the cap both updates land on the same key and sum to 1).
            dist[(min(hunger + 1, 2), next_m)] += 0.25
        else:
            assert action == "eat" and has_marshmallow
            # Hunger is deterministically reset to 0 after eating.
            dist[(0, next_m)] = 1.0

        return dist


class ZitsMDP(MDP):
    """The Zits MDP described in lecture.

    States are the integers 0..4 and the actions are ``"apply"`` and
    ``"sleep"``. Rewards are discounted with gamma = 0.9.
    """

    @property
    def state_space(self):
        return set(range(5))

    @property
    def action_space(self):
        return {"apply", "sleep"}

    @property
    def temporal_discount_factor(self):
        return 0.9

    def get_reward(self, state, action, next_state):
        # The reward is minus the next state; applying costs one extra unit.
        if action != "apply":
            assert action == "sleep"
            return -next_state
        return -1 - next_state

    def get_transition_distribution(self, state, action):
        if action != "apply":
            assert action == "sleep"
            # Sleeping moves the state one step up or down, clamped to 0..4.
            worse = min(state + 1, 4)
            better = max(state - 1, 0)
            return {worse: 0.4, better: 0.6}
        # Applying ignores the current state: it lands on 0 or on 4.
        return {0: 0.8, 4: 0.2}


class ChaseMDP(MDP):
    """A 2D grid MDP in which an agent chases a randomly moving bunny.

    A state is a pair ``(agent_pos, goal_pos)`` of ``(row, col)`` cells,
    the second one being the bunny. The agent moves deterministically; the
    bunny stays put half of the time and otherwise tries one of the four
    directions uniformly at random. Moves that would leave the grid or hit
    an obstacle leave the mover where it is.
    """

    @property
    def obstacles(self):
        # Default layout: a 2x3 grid without any obstacles.
        return np.zeros((2, 3))

    @property
    def goal_reward(self):
        return 1

    @property
    def living_reward(self):
        return 0

    @property
    def height(self):
        # Number of grid rows.
        return self.obstacles.shape[0]

    @property
    def width(self):
        # Number of grid columns.
        return self.obstacles.shape[1]

    @property
    def state_space(self):
        cells = [
            (row, col)
            for row in range(self.height)
            for col in range(self.width)
        ]
        return {(agent, bunny) for agent in cells for bunny in cells}

    @property
    def action_space(self):
        return {'up', 'down', 'left', 'right'}

    @property
    def temporal_discount_factor(self):
        return 0.9

    def action_to_delta(self, action):
        """Map an action name to its ``(d_row, d_col)`` displacement."""
        deltas = {
            'up': (-1, 0),
            'down': (1, 0),
            'left': (0, -1),
            'right': (0, 1),
        }
        return deltas[action]

    def _attempt_move(self, position, delta):
        """Return the cell reached from ``position`` via ``delta``.

        The original cell is returned when the target lies outside the
        grid or is an obstacle.
        """
        row, col = position
        r, c = row + delta[0], col + delta[1]
        inside = 0 <= r < self.height and 0 <= c < self.width
        # The obstacle grid is only indexed once the target is in bounds.
        if not inside or self.obstacles[r, c]:
            return (row, col)
        return (r, c)

    def get_transition_distribution(self, state, action):
        """Return a dict mapping next states to probabilities."""
        agent_pos, goal_pos = state
        next_agent_pos = self._attempt_move(
            agent_pos, self.action_to_delta(action))

        dist = defaultdict(float)
        # The bunny stays in the same place with probability 0.5 ...
        dist[(next_agent_pos, goal_pos)] += 0.5
        # ... and otherwise attempts each direction with equal probability.
        # Blocked attempts fall back onto its current cell.
        for delta in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
            next_goal_pos = self._attempt_move(goal_pos, delta)
            dist[(next_agent_pos, next_goal_pos)] += 0.5*0.25

        return dist

    def get_reward(self, state, action, next_state):
        """Pay ``goal_reward`` on catching the bunny, else ``living_reward``."""
        agent_pos, goal_pos = next_state
        caught = agent_pos == goal_pos
        return self.goal_reward if caught else self.living_reward

    def state_is_terminal(self, state):
        """A state is terminal once agent and bunny share a cell."""
        agent_pos, goal_pos = state
        return agent_pos == goal_pos


class LargeChaseMDP(ChaseMDP):
    """A bunny chasing MDP on a larger 8x10 grid that contains obstacles."""

    @property
    def obstacles(self):
        # One string per grid row; "1" marks an obstacle, "0" a free cell.
        layout = (
            "0000000000",
            "0001000011",
            "0001000000",
            "0101101000",
            "0001001000",
            "0110000110",
            "0000000000",
            "0000000000",
        )
        return np.array([[int(cell) for cell in row] for row in layout])



## Problems

### Stochastic Rollout
Complete the implementation of a stochastic rollout function.

For reference, our solution is **15** lines of code.

In [None]:
def stochastic_rollout(mdp, state, nr_rollouts=10, max_depth=10):
    """Estimate the value of ``state`` with uniformly random rollouts.

    Every rollout repeatedly samples an action uniformly from
    ``sorted(mdp.action_space)``, samples a successor state from the MDP
    and accumulates the discounted rewards. A rollout ends, contributing 0
    for the remainder, when a terminal state is reached or when
    ``max_depth`` actions have been taken. The estimate is the mean over
    ``nr_rollouts`` independent rollouts.

    Actions are drawn with the global ``np.random`` generator, so seeding
    it makes the estimate reproducible.

    Args:
        mdp: an MDP.
        state: the state whose value is estimated.
        nr_rollouts: an integer, the number of rollouts to average over.
        max_depth: the max number of actions taken in one rollout.

    Return:
        v: the average discounted return observed from ``state``.
    """
    ordered_actions = sorted(list(mdp.action_space))

    def sampled_return(current, steps_left):
        # Out of budget, or nothing left to collect from a terminal state.
        if steps_left == 0 or mdp.state_is_terminal(current):
            return 0
        pick = np.random.choice(len(ordered_actions))
        action = ordered_actions[pick]
        successor = mdp.sample_next_state(current, action)
        immediate = mdp.get_reward(current, action, successor)
        discount = mdp.temporal_discount_factor
        return immediate + discount * sampled_return(successor, steps_left - 1)

    total = 0
    for _ in range(nr_rollouts):
        total += sampled_return(state, max_depth)
    return total / nr_rollouts

Tests

In [None]:
def test1_uct_rollout():
    """Check the seeded rollout estimate for the middle of the single-row MDP."""
    mdp = SingleRowMDP()
    import random; random.seed(0)
    import numpy.random as npr; npr.seed(0)
    # Compare with a tolerance, like the Zits check below, rather than
    # relying on exact floating-point equality of an averaged return.
    assert np.allclose(stochastic_rollout(mdp, 2), -6.1, atol=1e-5)

test1_uct_rollout()
def test2_uct_rollout():
    """Check single seeded 3-step rollouts from every Zits state."""
    mdp = ZitsMDP()
    import random; random.seed(0)
    import numpy.random as npr; npr.seed(0)
    rets = [stochastic_rollout(mdp, i, 1, 3) for i in range(5)]
    gt = [-1.9, -4.95, -1.81, -10.75, -9.55]
    assert np.allclose(rets, gt, atol=1e-5)

test2_uct_rollout()
print('Tests passed.')

Tests passed.


### UCT Exploration
Complete the implementation of the UCT exploration policy.

For reference, our solution is **12** lines of code.

In [None]:
def explore(mdp, depth, s, Q, N, exploration_factor=1.0):
    """Pick the action maximizing the UCB score at ``(depth, s)``.

    ``Q`` and ``N`` are dictionaries keyed by ``(depth, s, a)`` holding the
    current value estimates and visit counts. The score of an action is

        UCB(depth, s, a) = Q(depth, s, a) + exploration_factor * sqrt(log(sn) / n)

    where n = N(depth, s, a) and sn = sum_a N(depth, s, a). An action that
    has not been tried yet (n = 0, or no entry in ``N`` at all) scores
    ``inf``, so it is always preferred over actions that were tried. A
    missing ``Q`` entry counts as 0.

    Ties, in particular several untried actions, are resolved in favor of
    the last action in ``sorted(mdp.action_space)``.

    Args:
        mdp: an MDP.
        depth: current depth of the search.
        s: the current state.
        Q: a dictionary mapping (depth, s, a) to the corresponding Q value.
        N: a dictionary mapping (depth, s, a) to the corresponding number of visits.
        exploration_factor: a floating-point number. It is the scalar
            hyperparameter applied to the UCB term when choosing which
            action to expand. In the lecture notes, this is referred to as c.

    Return:
        a: the action to be taken at state s. This is argmax_a UCB(depth, s, a).
    """
    actions = sorted(mdp.action_space)
    visits = {a: N.get((depth, s, a), 0) for a in actions}
    total_visits = sum(visits.values())

    def ucb(a):
        n = visits[a]
        if n <= 0:
            return np.inf
        # n > 0 implies total_visits >= 1, so the logarithm is defined.
        bonus = exploration_factor * sqrt(log(total_visits) / n)
        return Q.get((depth, s, a), 0.0) + bonus

    # max() returns the first maximal element, so scanning the sorted
    # actions backwards makes the last sorted action win any tie.
    return max(reversed(actions), key=ucb)

Tests

In [None]:
def test1_uct_explore():
    """Both actions were tried; action 1 is expected to be selected."""
    q_values = {(0, 3, 0): 0.5, (0, 3, 1): 0.3}
    visit_counts = {(0, 3, 0): 5, (0, 3, 1): 1}
    chosen = explore(SingleRowMDP(), 0, 3, q_values, visit_counts, 1.0)
    assert chosen == 1

test1_uct_explore()
def test2_uct_explore():
    """No action was tried yet; the last sorted action is expected."""
    q_values = {(0, 3, 0): 0.5, (0, 3, 1): 0.3}
    visit_counts = {(0, 3, 0): 0, (0, 3, 1): 0}
    chosen = explore(SingleRowMDP(), 0, 3, q_values, visit_counts, 1.0)
    assert chosen == 1

test2_uct_explore()
def test3_uct_explore():
    """Only action 0 was tried; the untried action 1 is expected."""
    q_values = {(0, 3, 0): 0.5, (0, 3, 1): 0.3}
    visit_counts = {(0, 3, 0): 2, (0, 3, 1): 0}
    chosen = explore(SingleRowMDP(), 0, 3, q_values, visit_counts, 1.0)
    assert chosen == 1

test3_uct_explore()
print('Tests passed.')

Tests passed.


### UCT
Complete the implementation of UCT (Monte Carlo tree search with UCB exploration) for an MDP.

For reference, our solution is **28** lines of code.

In addition to all of the utilities defined at the top of the colab notebook, the following functions are available in this question environment: `explore`, `stochastic_rollout`. You may not need to use all of them.

In [None]:
def uct(mdp, initial_state, exploration_factor=1.0, iterations=100, max_depth=10, nr_rollouts=10, rollout_max_depth=10):
    """UCT (Monte Carlo tree search with UCB exploration) for an MDP.

    Runs ``iterations`` simulations from ``initial_state``. A simulation
    walks down the search tree choosing actions with ``explore``. The first
    time a ``(depth, state)`` pair is met, the statistics of all its
    actions are initialized and the state's value is estimated with
    ``stochastic_rollout``. On the way back up, the visited
    ``(depth, state, action)`` entries are updated with the sampled
    discounted return.

    A simulation contributes 0 from a state that is terminal or that sits
    at depth ``max_depth``, which also bounds the recursion.

    Note that two different parameters control the depth: ``max_depth``
    for the search tree and ``rollout_max_depth`` for the rollouts.

    Args:
        mdp: an MDP.
        initial_state: the initial state (i.e., the root of the search tree).
        exploration_factor: a floating-point number. It is the scalar
            hyperparameter applied to the UCB term when choosing which
            action to expand. In the lecture notes, this is referred to as c.
        iterations: the number of simulations to run from the root.
        max_depth: the maximum depth of the search tree.
        nr_rollouts: number of rollouts averaged per value estimate.
        rollout_max_depth: max number of actions within a single rollout.

    Return:
        a: the action with the highest estimated Q value at the initial
            state, preferring actions that were tried at least once. None
            when no statistics were gathered for the root (for example
            ``iterations == 0`` or a terminal ``initial_state``).
    """
    # A fixed action order keeps the search reproducible under a fixed seed.
    actions = sorted(mdp.action_space)
    gamma = mdp.temporal_discount_factor
    Q = {}
    N = {}

    def simulate(s, depth):
        """Run one simulation from ``s``; return its sampled return."""
        if depth >= max_depth or mdp.state_is_terminal(s):
            return 0
        if (depth, s, actions[0]) not in N:
            # Expansion: register every action of the new node at once,
            # then estimate the node's value with random rollouts.
            for a in actions:
                N[depth, s, a] = 0
                Q[depth, s, a] = 0
            return stochastic_rollout(mdp, s, nr_rollouts, rollout_max_depth)
        a = explore(mdp, depth, s, Q, N, exploration_factor)
        ns = mdp.sample_next_state(s, a)
        q = mdp.get_reward(s, a, ns) + gamma * simulate(ns, depth + 1)
        # Incremental mean over the returns sampled for the CHOSEN action.
        N[depth, s, a] += 1
        Q[depth, s, a] += (q - Q[depth, s, a]) / N[depth, s, a]
        # Propagate the sample itself, not the running average.
        return q

    for _ in range(iterations):
        simulate(initial_state, 0)

    expanded = [a for a in actions if (0, initial_state, a) in Q]
    if not expanded:
        return None
    # An untried action still carries its placeholder Q of 0; do not let
    # it beat actions whose estimates come from actual samples.
    tried = [a for a in expanded if N[0, initial_state, a] > 0]
    return max(tried or expanded, key=lambda a: Q[0, initial_state, a])



Tests

In [None]:
def test1_uct():
    """UCT should head right from every interior cell of the single-row MDP."""
    import random; random.seed(0)
    import numpy.random as npr; npr.seed(0)
    mdp = SingleRowMDP()
    rets = [uct(mdp, i) for i in [1, 2, 3]]
    print(rets)
    assert all(r == 1 for r in rets)

test1_uct()
def test2_uct():
    """Compare the seeded UCT action for every Zits state with the expected one."""
    import random; random.seed(0)
    import numpy.random as npr; npr.seed(0)
    mdp = ZitsMDP()
    rets = [uct(mdp, i) for i in [0, 1, 2, 3, 4]]
    gt = ['sleep', 'sleep', 'sleep', 'apply', 'apply']
    # Compare the two lists directly: asserting a bare generator
    # expression is always true and would never look at `rets`.
    assert rets == gt, f"expected {gt}, got {rets}"

test2_uct()
print('Tests passed.')

[1, 1, 1]
Tests passed.
