# MIT 6.036 Spring 2019: Homework 9

This colab notebook provides code and a framework for question 1 and 5 of the [the homework](https://openlearninglibrary.mit.edu/courses/course-v1:MITx+6.036+1T2019/courseware/Week9/week9_homework).  You can work out your solutions here, then submit your results back on the homework page when ready.


## Setup

First, download the code distribution for this homework that contains test cases and helper functions.

Run the next code block to download and import the code for this lab.

In [2]:
!rm -rf code_for_hw9*
!wget --no-check-certificate https://introml_oll.odl.mit.edu/cat-soop/_static/6.036/homework/hw09/code_for_hw9.zip
!unzip code_for_hw9.zip
!mv code_for_hw9/* .

from dist import *
from sm import *
from util import *
from mdp import *

import mdp
import numpy as np

zsh:1: no matches found: code_for_hw9*
--2023-04-04 15:13:28--  https://introml_oll.odl.mit.edu/cat-soop/_static/6.036/homework/hw09/code_for_hw9.zip
Resolving introml_oll.odl.mit.edu (introml_oll.odl.mit.edu)... 3.226.240.108
Connecting to introml_oll.odl.mit.edu (introml_oll.odl.mit.edu)|3.226.240.108|:443... connected.
  Unable to locally verify the issuer's authority.
HTTP request sent, awaiting response... 200 OK
Length: 8058 (7.9K) [application/zip]
Saving to: ‘code_for_hw9.zip’


2023-04-04 15:13:29 (549 MB/s) - ‘code_for_hw9.zip’ saved [8058/8058]

Archive:  code_for_hw9.zip
   creating: code_for_hw9/
  inflating: code_for_hw9/util.py    
   creating: __MACOSX/
   creating: __MACOSX/code_for_hw9/
  inflating: __MACOSX/code_for_hw9/._util.py  
  inflating: code_for_hw9/sm.py      
  inflating: __MACOSX/code_for_hw9/._sm.py  
  inflating: code_for_hw9/mdp.py     
  inflating: __MACOSX/code_for_hw9/._mdp.py  
  inflating: code_for_hw9/tests.py   
  inflating: __MACOSX/code_for_hw9

## 1) State Machines

We will implement state machines as sub-classes of the `SM` class, which specifies the `start_state`, `transition_fn` and `output_fn`.

```
class SM:
    start_state = None  # default start state
    def transition_fn(self, s, i):
        '''s:       the current state
           i:       the given input
           returns: the next state'''
        raise NotImplementedError
    def output_fn(self, s):
        '''s:       the current state
           returns: the corresponding output'''
        raise NotImplementedError
```

An example of a sub-class is the `Accumulator` state machine, which adds up (accumulates) its input and outputs the sum. Convince yourself that the implementation works as expected before moving on.

```
class Accumulator(SM):
    start_state = 0
    def transition_fn(self, s, i):
        return s + i
    def output_fn(self, s):
        return s
```

### 1.1 Transduce
Implement the `transduce` method for the `SM` class. It is given an input sequence (a list) and returns an output sequence (a list) of the outputs of the state machine on the input sequence. Assume `self.transition_fn` and `self.output_fn` are defined.

In [5]:
class SM:
    start_state = None

    def transduce(self, input_seq):
        '''input_seq: a list of inputs to feed into SM
           returns:   a list of outputs of SM'''
        output_seq = []
        s = self.start_state
        for x in input_seq:
            s = self.transition_fn(s, x)
            output_seq += [self.output_fn(s)]
        return output_seq

Below is the `Accumulator` state machine implementation that you saw above as well as an unit test to help test your `SM` class.

In [111]:
class Accumulator(SM):
    start_state = 0

    def transition_fn(self, s, i):
        print('trans s', s, 's_t_1', i)
        return s + i

    def output_fn(self, s):
        print('out s_t', s)
        return s
    
def test_accumulator_sm():
    res = Accumulator().transduce([-1, 2, 3, -2, 5, 6])
    print('res', res)
    assert(res == [-1, 1, 4, 2, 7, 13])
    print("Test passed!")

# Unit test
test_accumulator_sm()

trans s 0 s_t_1 -1
out s_t -1
trans s -1 s_t_1 2
out s_t 1
trans s 1 s_t_1 3
out s_t 4
trans s 4 s_t_1 -2
out s_t 2
trans s 2 s_t_1 5
out s_t 7
trans s 7 s_t_1 6
out s_t 13
res [-1, 1, 4, 2, 7, 13]
Test passed!


### 1.2 Binary Addition
Implement a `Binary_Addition` state machine that takes in a sequence of pairs of binary digits (0,1) representing two reversed binary numbers and returns a sequence of digits representing the reversed sum. For instance, to sum two binary numbers `100` and `011`, the input sequence will be `[(0, 1), (0, 1), (1, 0)]`. You will need to define `start_state`, `transition_fn` and `output_fn`. Note that when transduced, the input sequence may need to be extended with an extra (0,0) to output the final carry.

In [69]:
class Binary_Addition(SM):
    start_state = (0, 0) # Change

    def transition_fn(self, s, x):
        # Your code here
        carry_bit = s[1]
        a_bit, b_bit = x
        res = (a_bit + b_bit + carry_bit) % 2
        carry_bit = ((a_bit and carry_bit) or (b_bit and carry_bit) or (a_bit and b_bit))
        return (res, carry_bit)
        
    def output_fn(self, s):
        # Your code here
        bit, carry = s
        return bit

In [70]:
def test_binary_addition_sm():
    res = Binary_Addition().transduce([(1, 1), (1, 0), (0, 0)])
    print(res)
    assert(res == [0, 0, 1])
    print("Test passed!")

# Unit test
test_binary_addition_sm()

[0, 0, 1]
Test passed!


### 1.3 Reverser
Implement a state machine that reverses a sequence. The input is a list of the form:

```
 sequence1 + ['end'] + sequence2
 ```
 
`+` refers to concatenation. `sequence1` is a list of strings, the `'end'` string indicates termination, and `sequence2` is arbitrary. The machine reverses `sequence1`: for each entry in the `sequence1`, the machine outputs `None`. For the `'end'` input and each entry in the second sequence, an item from the reversed `sequence1` is output, or `None` if no characters remain.

In [87]:
class Reverser(SM):
    start_state = (False, [])

    def transition_fn(self, s, x):
        # Your code here
        reverse, strings = s
        if x == 'end':
            return (True, strings)
        elif reverse:
            return (True, strings[1:])
        else:
            return (False, [x] + strings)

    def output_fn(self, s):
        # Your code here
        reverse, strings = s
        if reverse and len(strings) > 0:
            return strings[0]
        else:
            return None

In [88]:
def test_reverser_sm():
    res = Reverser().transduce(['foo', ' ', 'bar'] + ['end'] + list(range(5)))
    print('res', res)
    assert(res == [None, None, None, 'bar', ' ', 'foo', None, None, None])
    print("Test passed!")

# Unit test
test_reverser_sm()

res [None, None, None, 'bar', ' ', 'foo', None, None, None]
Test passed!


### 1.4 RNN
An RNN has a transition function and an output function, each of which is defined in terms of weight matrices, offset vectors and activation functions, analogously to standard neural networks.

* The inputs $x$ are $l\times1$ vectors
* The states $s$ are $m\times1$ vectors
* The outputs $y$ are $n\times1$ vectors

The behavior is defined as follows:
$$\begin{align*} s_{t} & = f_1(W^{ss} s_{{t-1}} + W^{sx} x_{t} + W^{ss}_0) \\ y_{t} & = f_2(W^o s_{t} + W^o_0) \end{align*}$$

where $f_1$ and $f_2$ are two activation functions, such as linear, softmax or tanh.


Note that each input `i` below has dimension `l x 1`. Implement the corresponding state machine, where the weights are given in `__init__`. Make sure to set an appropriate `start_state`.

In [176]:
class RNN(SM):
    def __init__(self, Wsx, Wss, Wo, Wss_0, Wo_0, f1, f2):
        # Your code here
        self.Wsx = Wsx
        self.Wss = Wss
        self.Wo = Wo
        self.Wss_0 = Wss_0
        self.Wo_0 = Wo_0
        self.f1 = f1
        self.f2 = f2
        self.m, _ = Wss.shape
        self.start_state = np.zeros((self.m, 1))

    def transition_fn(self, s, i):
        # Your code here
        return self.f1(self.Wss @ s + self.Wsx @ i + self.Wss_0)

    def output_fn(self, s):
        # Your code here
        return self.f2(self.Wo @ s + self.Wo_0)

In [153]:
def softmax(z):
    v = np.exp(z)
    return v / np.sum(v, axis = 0)

def test_rnn():
    Wsx1 = np.array([[0.1],
                     [0.3],
                     [0.5]])
    Wss1 = np.array([[0.1,0.2,0.3],
                     [0.4,0.5,0.6],
                     [0.7,0.8,0.9]])
    Wo1 = np.array([[0.1,0.2,0.3],
                    [0.4,0.5,0.6]])
    Wss1_0 = np.array([[0.01],
                       [0.02],
                       [0.03]])
    Wo1_0 = np.array([[0.1],
                      [0.2]])
    in1 = [np.array([[0.1]]),
           np.array([[0.3]]),
           np.array([[0.5]])]
    
    rnn = RNN(Wsx1, Wss1, Wo1, Wss1_0, Wo1_0, np.tanh, softmax)
    expected = np.array([[[0.4638293846951024], [0.5361706153048975]],
                        [[0.4333239107898491], [0.566676089210151]],
                        [[0.3821688606165438], [0.6178311393834561]]])

    assert(np.allclose(expected, rnn.transduce(in1)))
    print("Test passed!")

# Unit test
test_rnn()

Test passed!


### 1.5 Accumulator Sign RNN
Enter the parameter matrices and vectors for an instance of the `RNN` class such that the output is `1` if the cumulative sum of the inputs is positive, `-1` if the cumulative sum is negative and `0` if otherwise. Make sure that you scale the outputs so that the output activation values are very close to `1`, `0` and `-1`. Note that both the inputs and outputs are `1 x 1`.

Hint: `np.tanh` may be useful. Remember to convert your Python lists to `np.array`.

In [154]:
Wsx = np.ones((1, 1))
Wss = np.ones((1, 1))
Wo = np.ones((1, 1))
Wss_0 = np.zeros((1, 1))
Wo_0 = np.zeros((1, 1))
f1 = lambda x: x
f2 = np.sign
acc_sign = RNN(Wsx, Wss, Wo, Wss_0, Wo_0, f1, f2)

In [155]:
def test_acc_sign_rnn(acc_sign_rnn):
    res = acc_sign_rnn.transduce(list(map(lambda x: np.array([[x]]), [-1, -2, 2, 3, -3, 1])))
    print('res', res)
    expected = np.array([[[-1.0]], [[-1.0]], [[-1.0]], [[1.0]], [[-1.0]], [[0.0]]])
    assert(np.allclose(expected, res))
    print("Test passed!")

# Unit test
test_acc_sign_rnn(acc_sign)

res [array([[-1.]]), array([[-1.]]), array([[-1.]]), array([[1.]]), array([[-1.]]), array([[0.]])]
Test passed!


### 1.6 Autoregression RNN

Enter the parameter matrices and vectors for an instance of the `RNN` class such that it implements the following autoregressive model:
$$y_t=y_{t-1} - 2y_{t-2} + 3y_{t-3}$$
when $x_t = y_{t-1}$. Note that both the inputs and outputs are `1 x 1`.


In [177]:
Wsx = np.array([[1, 0, 0]]).reshape((3, 1))
Wss = np.array([[0, 0, 0], [1, 0, 0], [0, 1, 0]])
Wo = np.array([[1, -2, 3]])
Wss_0 = np.zeros((3, 1))
Wo_0 = np.zeros((1, 1))
f1 = lambda x: x
f2 = lambda x: x
auto = RNN(Wsx, Wss, Wo, Wss_0, Wo_0, f1, f2)

In [179]:
def test_auto_rnn(auto_rnn):
    res = auto_rnn.transduce([np.array([[x]]) for x in range(-2,5)])
    expected = np.array([[[-2.0]], [[3.0]], [[-4.0]], [[-2.0]], [[0.0]], [[2.0]], [[4.0]]])
    assert(np.allclose(expected, res))
    print("Test passed!")
    
# Unit test
test_auto_rnn(auto)

Test passed!


# 2) MDP
We consider the same tiny MDP as in the exercises, with states (0, 1, 2, 3) and actions ('b', 'c'). The reward function is:

$$
R(s,a) = \begin{cases}
        1, & \text{if } s = 1 \\
        2, & \text{if } s = 3 \\
        0, & \text{otherwise}
       \end{cases}
$$

You get the reward associated with a state on the step when you exit that state. The transition function for each action is below, where $T[i,x,j]$ is the $P(s_{t+1} = j | a = x, s_t = i)$ (note rows correspond to the input states, and columns correspond to the output states).

$$
\begin{equation}
T(s_t, \text{'b'}, s_{t+1}) =
    \begin{bmatrix}
        0.0 & 0.9 & 0.1 & 0.0 \\
        0.9 & 0.1 & 0.0 & 0.0 \\
        0.0 & 0.0 & 0.1 & 0.9 \\
        0.9 & 0.0 & 0.0 & 0.1 \\
    \end{bmatrix}
\end{equation}
$$

$$
\begin{equation}
T(s_t, \text{'c'}, s_{t+1}) = 
    \begin{bmatrix}
        0.0 & 0.1 & 0.9 & 0.0 \\
        0.9 & 0.1 & 0.0 & 0.0 \\
        0.0 & 0.0 & 0.1 & 0.9 \\
        0.9 & 0.0 & 0.0 & 0.1 \\
    \end{bmatrix}
\end{equation}
$$
 
Note that the only effect of the action is to change the transition probability from state 0.

# 2.1) Limited horizons
Consider two policies: one that always takes action 'b' in state 0 and one that always takes action 'c'.

## 2.1.A)
Which policy is best if you're starting from state 0 with horizon 2?

Horizon 2 values from taking action 'b':
```
[[0.9],
[1.1],
[1.8],
[2.2]]
```

Horizon 2 values from taking action 'c':
```
[[0.1],
[1.1],
[1.8],
[2.2]]
```
Since $0.9 \gt 0.1$, taking action 'b' is the better policy.

## 2.1.B)
Which policy is best if you're starting from state 0 with horizon 3?

Horizon 3 values from taking action 'b':
```
[[1.17],
[1.92],
[2.16],
[3.03]]
```

Horizon 3 values from taking action 'c':
```
[[1.73],
[1.2 ],
[2.16],
[2.31]]
```
Since $1.73 \gt 1.17$, taking action 'c' is the better policy.

## 2.1.C)
What if we start in state 0 with horizon 5, take action 'c', land in state 2 with horizon 4, land in state 3 with horizon 3 (and get reward = 2!), and then land in state 0 with horizon 2? What action should we take now?

Same as 2.1.A, action 'b'.

# 2.2) At a discount
Now, in the same tiny MDP, we are interested in the infinite-horizon discounted value, with discount factor 0.9. For a given policy, we can write down a set of linear equations characterizing these values, in the form:

$$
\begin{align}
    v_0 &= r_0 + c_{00}v_0 + c_{01}v_1 + c_{02}v_2 + c_{03}v_3 \\
    v_1 &= r_1 + c_{10}v_0 + c_{11}v_1 + c_{12}v_2 + c_{13}v_3 \\
    v_2 &= r_2 + c_{20}v_0 + c_{21}v_1 + c_{22}v_2 + c_{23}v_3 \\
    v_3 &= r_3 + c_{30}v_0 + c_{31}v_1 + c_{32}v_2 + c_{33}v_3 \\
\end{align}
$$

## 2.2.A)
Enter the vector of $r_i$ values, in the order shown above. Enter the matrix as a list of 4 numbers:

$[0, 1, 0, 2]$

## 2.2.B)
Enter the matrix of $c_{ij}$ values, in the order shown above, for the policy "always take action 'c'".

$$
\begin{bmatrix}
    0.0 & 0.09 & 0.81 & 0.0 \\
    0.81 & 0.09 & 0.0 & 0.0 \\
    0.0 & 0.0 & 0.09 & 0.81 \\
    0.81 & 0.0 & 0.0 & 0.09 \\
\end{bmatrix}
$$

## 2.2.C)
Solve for the value function under this policy by using numpy.linalg.solve to solve these equations. For example, if you have a set of linear equations of the form:

```
1 v0 + 2  v1 + 3  v2 = 4
5 v0 + 6  v1 + 7  v2 = 8
9 v0 + 10 v1 + 11 v2 = 12
```
Then you can solve as follows:
```
A = np.matrix([[1, 2, 3], [5, 6, 7], [9, 10, 11]])
b = np.matrix([[4], [8], [12]])
v = np.linalg.solve(A,b)  # A v = b
```

Since we know what the rewards are,$[0, 1, 0, 2]$, we will rearrange these equations and solve for $v$:

$$
\begin{equation}
\begin{aligned}
    v_0 &= r_0 + c_{00}v_0 + c_{01}v_1 + c_{02}v_2 + c_{03}v_3 \\
    v_1 &= r_1 + c_{10}v_0 + c_{11}v_1 + c_{12}v_2 + c_{13}v_3 \\
    v_2 &= r_2 + c_{20}v_0 + c_{21}v_1 + c_{22}v_2 + c_{23}v_3 \\
    v_3 &= r_3 + c_{30}v_0 + c_{31}v_1 + c_{32}v_2 + c_{33}v_3 \\
\end{aligned}
\qquad\Rightarrow\qquad
\begin{aligned}
    r_0 &= v_0 - c_{00}v_0 - c_{01}v_1 - c_{02}v_2 - c_{03}v_3 \\
    r_1 &= v_1 - c_{10}v_0 - c_{11}v_1 - c_{12}v_2 - c_{13}v_3 \\
    r_2 &= v_2 - c_{20}v_0 - c_{21}v_1 - c_{22}v_2 - c_{23}v_3 \\
    r_3 &= v_3 - c_{30}v_0 - c_{31}v_1 - c_{32}v_2 - c_{33}v_3 \\
\end{aligned}
\qquad\Rightarrow\qquad
\begin{aligned}
    r_0 &= v_0(1 - c_{00}) - c_{01}v_1 - c_{02}v_2 - c_{03}v_3 \\
    r_1 &= - c_{10}v_0 + v_1(1 - c_{11}) - c_{12}v_2 - c_{13}v_3 \\
    r_2 &= - c_{20}v_0 - c_{21}v_1 + v_2(1 - c_{22}) - c_{23}v_3 \\
    r_3 &= - c_{30}v_0 - c_{31}v_1 - c_{32}v_2 + v_3(1 - c_{33}) \\
\end{aligned}
\end{equation}
$$

In [309]:
b_transition_matrix = np.array([[0.0, 0.9, 0.1, 0.0], [0.9, 0.1, 0.0, 0.0], [0.0, 0.0, 0.1, 0.9], [0.9, 0.0, 0.0, 0.1]])
c_transition_matrix = np.array([[0.0, 0.1, 0.9, 0.0], [0.9, 0.1, 0.0, 0.0], [0.0, 0.0, 0.1, 0.9], [0.9, 0.0, 0.0, 0.1]])
# print('b_transition_matrix', b_transition_matrix)
# print('c_transition_matrix', c_transition_matrix)
reward = np.array([[0, 1, 0, 2]]).reshape((4, 1))
# horizon -> horizon values vector
memo = {0 : np.zeros((4, 1))}

def compute_horizon_values(horizon, transition_matrix, reward, memo):
    for h in range(1, horizon+1):
        v_h = reward + transition_matrix @ memo[h-1]
        memo[h] = v_h
    return memo

#2.1.A)
# print(compute_horizon_values(2, b_transition_matrix, reward, memo.copy()))
# print(compute_horizon_values(2, c_transition_matrix, reward, memo.copy()))

#2.1.B)
# print(compute_horizon_values(3, b_transition_matrix, reward, memo.copy()))
# print(compute_horizon_values(3, c_transition_matrix, reward, memo.copy()))

#2.1.C)
# print(compute_horizon_values(5, b_transition_matrix, reward, memo.copy()))
# print(compute_horizon_values(5, c_transition_matrix, reward, memo.copy()))

#2.2.C
discount = 0.9
A = np.eye(c_transition_matrix.shape[0]) - discount*c_transition_matrix
# print(A)
b = reward
v = np.linalg.solve(A, b)
np.round(v, 3).T

array([[6.053, 6.487, 6.752, 7.586]])

# 3) Tiny Q-value iteration
__In the same example as above__, with an infinite horizon and a discount factor of 0.9, compute three iterations of value iteration. Don't assume a particular policy. Assume that:

- All the value estimates start at 0 (meaning, at iteration 0, $Q(s,a)=0$ for all $s,a$ pairs), and
- You operate synchronously (that is, on iteration $t$ of value iteration, you only use values that were computed on iteration $t−1$).
We recommend you compute the Q-value iteration by hand to get a better understanding of the algorithm.
For each iteration enter 8 numbers corresponding to:

$$[Q(0,b),Q(0,c),Q(1,b),Q(1,c),Q(2,b),Q(2,c),Q(3,b),Q(3,c)]$$

at that iteration, accurate to at least 3 decimal places.

## 3.A) Iteration 1

$[0, 0, 1, 1, 0, 0, 2, 2]$

## 3.B) Iteration 2

I got different answers than the online grader. I strongly believe the solutions of the grader are incorrect. Online grader solution at time of writing: $[0.81, 0.09, 1.09, 1.09, 1.62, 1.62, 2.18, 2.18]$. My solution $[0.81, 0.81 ,1.09, 1.09, 1.62, 1.62, 2.18, 2.18]$

## 3.C) Iteration 3

I got different answers than the online grader. I strongly believe the solutions of the grader are incorrect. Online grader solution at time of writing: $[1.0287, 1.4103, 1.7542, 1.7542, 1.9116, 1.9116, 2.8523, 2.8523]$. My solution $[1.4103, 1.4103, 1.7542, 1.7542, 1.9116, 1.9116, 2.8523, 2.8523]$

## 3.D)
After the third iteration, what action would you select in state 0?

'c'

In [310]:
b_transition_matrix = np.array([[0.0, 0.9, 0.1, 0.0], [0.9, 0.1, 0.0, 0.0], [0.0, 0.0, 0.1, 0.9], [0.9, 0.0, 0.0, 0.1]])
c_transition_matrix = np.array([[0.0, 0.1, 0.9, 0.0], [0.9, 0.1, 0.0, 0.0], [0.0, 0.0, 0.1, 0.9], [0.9, 0.0, 0.0, 0.1]])
reward = np.array([[0, 1, 0, 2]]).reshape((4, 1))
discount = 0.9

# print(discount*b_transition_matrix)
# print(discount*c_transition_matrix)

def value_iteration(horizon, b_transition_matrix, c_transition_matrix, reward, memo):
    for h in range(1, horizon+1):
        v_h_b = reward + (b_transition_matrix @ memo[h-1]['b'])
        v_h_c = reward + (c_transition_matrix @ memo[h-1]['c'])
        all_actions_v = np.concatenate((v_h_b, v_h_c), axis=1)
        best_action_v = np.max(all_actions_v, axis=1, keepdims=True)
        memo[h] = {}
        memo[h]['b'] = best_action_v
        memo[h]['c'] = best_action_v
    return memo
# 3.A-3.D
print(compute_horizon_values(3, discount*b_transition_matrix, reward, memo.copy()))
print(compute_horizon_values(3, discount*c_transition_matrix, reward, memo.copy()))
# horizon -> action -> horizon value vectors
memo = {0 : {'b' : np.zeros((4, 1)), 'c' : np.zeros((4, 1))}}
print(value_iteration(3, discount*b_transition_matrix, discount*c_transition_matrix, reward, memo.copy()))

{0: array([[0.],
       [0.],
       [0.],
       [0.]]), 1: array([[0.],
       [1.],
       [0.],
       [2.]]), 2: array([[0.81],
       [1.09],
       [1.62],
       [2.18]]), 3: array([[1.0287],
       [1.7542],
       [1.9116],
       [2.8523]])}
{0: array([[0.],
       [0.],
       [0.],
       [0.]]), 1: array([[0.],
       [1.],
       [0.],
       [2.]]), 2: array([[0.09],
       [1.09],
       [1.62],
       [2.18]]), 3: array([[1.4103],
       [1.171 ],
       [1.9116],
       [2.2691]])}
{0: {'b': array([[0.],
       [0.],
       [0.],
       [0.]]), 'c': array([[0.],
       [0.],
       [0.],
       [0.]])}, 1: {'b': array([[0.],
       [1.],
       [0.],
       [2.]]), 'c': array([[0.],
       [1.],
       [0.],
       [2.]])}, 2: {'b': array([[0.81],
       [1.09],
       [1.62],
       [2.18]]), 'c': array([[0.81],
       [1.09],
       [1.62],
       [2.18]])}, 3: {'b': array([[1.4103],
       [1.7542],
       [1.9116],
       [2.8523]]), 'c': array([[1.4103],
       

## 5) MDP Implementations

We'll be using a couple of simple classes to represent MDPs and probability distributions.

### 5.1 Working with MDPs

Recall that given a $Q_\pi$ for any policy $\pi$, then $V_\pi(s)$ = $\max_a Q_\pi(s, a)$.

1. Write the `value` method, which takes a $Q$ function (an instance of `TabularQ`) and a state and returns the value `V` of an action that maximizes $Q$ function stored in `q`.



In [318]:
def value(q, s):
    """ Return Q*(s,a) based on current Q

    >>> q = TabularQ([0,1,2,3],['b','c'])
    >>> q.set(0, 'b', 5)
    >>> q.set(0, 'c', 10)
    >>> q_star = value(q,0)
    >>> q_star
    10
    """
    # Your code here
    return max(q.get(s, a) for a in q.actions)

In [319]:
def test_value():
    q = TabularQ([0,1,2,3], ['b','c'])
    q.set(0, 'b', 5)
    q.set(0, 'c', 10)
    assert(value(q, 0) == 10)
    print("Test passed!")
    
test_value()

Test passed!


2. Write the `greedy` method, which takes a $Q$ function (an instance of `TabularQ`) and a state and returns the action `a` determined by the policy that acts greedily with respect to the current value of `q`.

In [328]:
def greedy(q, s):
    """ Return pi*(s) based on a greedy strategy.

    >>> q = TabularQ([0,1,2,3],['b','c'])
    >>> q.set(0, 'b', 5)
    >>> q.set(0, 'c', 10)
    >>> q.set(1, 'b', 2)
    >>> greedy(q, 0)
    'c'
    >>> greedy(q, 1)
    'b'
    """
    # Your code here
    return argmax(q.actions, lambda a: q.get(s, a))

In [329]:
def test_greedy():
    q = TabularQ([0, 1, 2, 3],['b', 'c'])
    q.set(0, 'b', 5)
    q.set(0, 'c', 10)
    q.set(1, 'b', 2)
    assert(greedy(q, 0) == 'c')
    assert(greedy(q, 1) == 'b')
    print("Test passed!")

test_greedy()

Test passed!


3. Write the `epsilon_greedy` method, which takes a state `s` and a parameter `epsilon`, and returns an action. With probability `1 - epsilon` it should select the greedy action and with probability `epsilon` it should select an action uniformly from the set of possible actions.

    - You should use `random.random()` to generate a random number to test againts eps.
    - You should use the `draw` method of `uniform_dist` to generate a random action.
    - You can use the `greedy` function defined earlier.

In [335]:
def epsilon_greedy(q, s, eps = 0.5):
    """ Returns an action.

    >>> q = TabularQ([0,1,2,3],['b','c'])
    >>> q.set(0, 'b', 5)
    >>> q.set(0, 'c', 10)
    >>> q.set(1, 'b', 2)
    >>> eps = 0.
    >>> epsilon_greedy(q, 0, eps) #greedy
    'c'
    >>> epsilon_greedy(q, 1, eps) #greedy
    'b'
    """
    if random.random() < eps:  # True with prob eps, random action
        # Your code here
        return uniform_dist(q.actions).draw()
    else:
        # Your code here
        return greedy(q, s)

In [336]:
def test_epsilon_greedy():
    q = TabularQ([0, 1, 2, 3],['b', 'c'])
    q.set(0, 'b', 5)
    q.set(0, 'c', 10)
    q.set(1, 'b', 2)
    eps = 0.0
    assert(epsilon_greedy(q, 0, eps) == 'c')
    assert(epsilon_greedy(q, 1, eps) == 'b')
    print("Test passed!")
    
test_epsilon_greedy()

Test passed!


### 5.2 Implement Q-Value Iteration
Provide the definition of the `value_iteration` function. It takes an MDP instance and a `TabularQ` instance. It should terminate when

$$\max_{(s, a)}\left|Q_t(s, a) - Q_{t-1}(s, a)\right| < \epsilon$$

that is, the biggest difference between the value functions on successive iterations is less than input parameter `eps`. This function should return the final `TabularQ` instance. It should do no more that `max_iters` iterations.

* Make sure to copy the Q function between iterations, e.g. `new_q = q.copy()`.
* The `q` parameter contains the initialization of the Q function.
* The `value` function is already defined.

In [338]:
def value_iteration(mdp, q, eps=0.01, max_iters=1000):
    # Your code here
    Q_old = q.copy()
    for i in range(max_iters):
        Q_new = Q_old.copy()
        diff = 0.0
        for state in mdp.states:
            for action in mdp.actions:
                transition_dist = mdp.transition_model(state, action)
                expected_val = transition_dist.expectation(lambda s: value(Q_old, s))
                Q_val = mdp.reward_fn(state, action) + mdp.discount_factor*expected_val
                Q_new.set(state, action, Q_val)
                diff = max(diff, abs(Q_old.get(state, action) - Q_new.get(state, action)))
        if diff < eps:
            return Q_new
        Q_old = Q_new.copy()

Below is the implementation of the "tiny" MDP detailed in Problem 2 and Problem 5.3. We will be using it to test `value_iteration`.

In [339]:
def tiny_reward(s, a):
    # Reward function
    if s == 1: return 1
    elif s == 3: return 2
    else: return 0

def tiny_transition(s, a):
    # Transition function
    if s == 0:
        if a == 'b':
            return DDist({1 : 0.9, 2 : 0.1})
        else:
            return DDist({1 : 0.1, 2 : 0.9})
    elif s == 1:
        return DDist({1 : 0.1, 0 : 0.9})
    elif s == 2:
        return DDist({2 : 0.1, 3 : 0.9})
    elif s == 3:
        return DDist({3 : 0.1, 0 : 0.9})
    
def test_value_iteration():
    tiny = MDP([0, 1, 2, 3], ['b', 'c'], tiny_transition, tiny_reward, 0.9)
    q = TabularQ(tiny.states, tiny.actions)
    qvi = value_iteration(tiny, q, eps=0.1, max_iters=100)
    expected = dict([((2, 'b'), 5.962924188028282),
                     ((1, 'c'), 5.6957634856549095),
                     ((1, 'b'), 5.6957634856549095),
                     ((0, 'b'), 5.072814297918393),
                     ((0, 'c'), 5.262109602844769),
                     ((3, 'b'), 6.794664584556008),
                     ((3, 'c'), 6.794664584556008),
                     ((2, 'c'), 5.962924188028282)])
    for k in qvi.q:
        print("k=%s, expected=%s, got=%s" % (k, expected[k], qvi.q[k]))      
        assert(abs(qvi.q[k] - expected[k]) < 1.0e-5)
    print("Test passed!")

test_value_iteration()

k=(0, 'b'), expected=5.072814297918393, got=5.072814297918393
k=(0, 'c'), expected=5.262109602844769, got=5.262109602844769
k=(1, 'b'), expected=5.6957634856549095, got=5.6957634856549095
k=(1, 'c'), expected=5.6957634856549095, got=5.6957634856549095
k=(2, 'b'), expected=5.962924188028282, got=5.962924188028282
k=(2, 'c'), expected=5.962924188028282, got=5.962924188028282
k=(3, 'b'), expected=6.794664584556008, got=6.794664584556008
k=(3, 'c'), expected=6.794664584556008, got=6.794664584556008
Test passed!


### 5.3 Receding-horizon control and online search
Value iteration depends on the dynamic programming principle to efficiently solve for a value function over all the states in the domain, which allows very fast determination of the optimal action for each state. In some cases, the state space is huge, but the set of states that are reachable in a reasonable number of steps from the current state is relatively small. In that case, it's actually more efficient to select actions by performing an "expectimax" tree search from the current state to find the best action, taking the action, seeing what state results, and performing another tree search.

We will need to fix the horizon for the tree search; it is possible to choose a finite horizon that will guarantee bounded "regret" (optimal value minus value actually obtained) based on the discount factor, but for this problem we will just assume we are given horizon h.

Consider a function $q_{em}(s,a,h)$ that computes the horizon-$h$ $Q$ value for state $s$ and action $a$ by using the definition of the finite-horizon Q function in the notes (__but including a discount factor__). This is sometimes called "expectimax" search, because you can think of it as building a search tree, in which some nodes are evaluated by taking an expectation (we take an expectation over possible outcomes of an action in a state) and some nodes are evaluated by taking a maximum (we take a maximum over the Q values for the possible actions in a state).

Consider again our "tiny" MDP as in Section 2 with states (0, 1, 2, 3) and actions ('b', 'c'). Assume no discounting (i.e., discount factor = 1.0). The reward function is:
$$
R(s,a) = \begin{cases}
        1, & \text{if } s = 1 \\
        2, & \text{if } s = 3 \\
        0, & \text{otherwise}
       \end{cases}
$$

You get the reward associated with a state on the step when you exit that state. The transition function for each action is below, where $T[i,x,j]$ is the $P(s_{t+1} = j | a = x, s_t = i)$ (note rows correspond to the input states, and columns correspond to the output states).

$$
\begin{equation}
T(s_t, \text{'b'}, s_{t+1}) =
    \begin{bmatrix}
        0.0 & 0.9 & 0.1 & 0.0 \\
        0.9 & 0.1 & 0.0 & 0.0 \\
        0.0 & 0.0 & 0.1 & 0.9 \\
        0.9 & 0.0 & 0.0 & 0.1 \\
    \end{bmatrix}
\end{equation}
$$

$$
\begin{equation}
T(s_t, \text{'c'}, s_{t+1}) = 
    \begin{bmatrix}
        0.0 & 0.1 & 0.9 & 0.0 \\
        0.9 & 0.1 & 0.0 & 0.0 \\
        0.0 & 0.0 & 0.1 & 0.9 \\
        0.9 & 0.0 & 0.0 & 0.1 \\
    \end{bmatrix}
\end{equation}
$$
 
Note that the only effect of the action is to change the transition probability from state 0.

Write a procedure `q_em(mdp, s, a, h)` that computes the horizon-h Q value for state `s` and action `a` by using the definition of the finite-horizon Q function in the notes (but including a discount factor). 

This can be written as a relatively simple recursive procedure with a base case (what is the Q value when horizon is 0?) and a recursive case that computes the horizon `h` values assuming we can (recursively) get horizon `h-1` values.

## 5.3.A) 
What is $q_{em}(0, \text{'b'}, 0)$?

0.0

## 5.3.B) 
What is $q_{em}(0, \text{'b'}, 1)$?

0.0

## 5.3.C) 
What is $q_{em}(0, \text{'b'}, 2)$?

0.9

In [346]:
#5.3.D
def q_em(mdp, s, a, h):
    # Your code here
    Q_old = TabularQ(mdp.states, mdp.actions)
    Q_new = Q_old.copy()
    for horizon in range(1, h+1):
        Q_new = Q_old.copy()
        for state in mdp.states:
            for action in mdp.actions:
                transition_dist = mdp.transition_model(state, action)
                expected_val = transition_dist.expectation(lambda s: value(Q_old, s))
                Q_val = mdp.reward_fn(state, action) + mdp.discount_factor*expected_val
                Q_new.set(state, action, Q_val)
        Q_old = Q_new.copy()
    return Q_new.get(s, a)

tiny = MDP([0, 1, 2, 3], ['b', 'c'], tiny_transition, tiny_reward, 1.0)

#5.3.A
print(q_em(tiny, 0, 'b', 0))
#5.3.B
print(q_em(tiny, 0, 'b', 1))
#5.3.C
print(q_em(tiny, 0, 'b', 2))

0.0
0.0
0.9


We will be using the "tiny" MDP again to test `q_em`.

In [345]:
def test_q_em():
    tiny = MDP([0, 1, 2, 3], ['b', 'c'], tiny_transition, tiny_reward, 0.9)
    assert(np.allclose([q_em(tiny, 0, 'b', 1)], [0.0]))
    assert(np.allclose([q_em(tiny, 0, 'b', 2)], [0.81]))
    assert(np.allclose([q_em(tiny, 0, 'b', 3)], [1.0287000000000002]))
    assert(np.allclose([q_em(tiny, 0, 'c', 3)], [1.4103]))
    assert(np.allclose([q_em(tiny, 2, 'b', 3)], [1.9116000000000002]))
    print("Tests passed!")

test_q_em()

Tests passed!
