# Write up and code for Assignment 2 - Week 1

### Definitions

#### Markov Process
A Markov Process (MP) is a stochastic process where each state follow the Markov property, i.e. $\mathbb P[S_{t+1}=s'~|~S_1, S_2, ..., S_t] = \mathbb P[S_{t+1} = s' ~|~ S_t]$. This means that history beyond the current state offers no more information about the transition to the next state. The transition probabilities are governed by a transition probability matrix $\mathcal P$. The states in an MP consists of a finite set $\mathcal S$. An MP is then defined by its parameters $\mathcal S$ and $\mathcal P$.

#### Markov Reward Process
In addition to a finite set of states $\mathcal S$, and a transition probability matrix $\mathcal P$, a Markov Reward Process (MRP) also has a reward function $\mathcal R$, ($\mathcal R_s = \mathbb E[R_{t+1}~|~S_t = s]$), and a discount factor $\gamma \in [0,1]$. An MRP is then defined by the parameters $\mathcal S$, $\mathcal P$, $\mathcal R$ and $\gamma$. An MRP is thus an extension of an MP.

#### Value function
First we define the return $G_t$ as the total discounted rewards recieved starting at time $t$. $$G_t = R_{t} + \gamma R_{t+1} + \gamma^2 R_{t+2} + ...$$

The state value function $v(s)$ is the value of an MDP when starting in state $s$. It is given by:
$$v(s) = \mathbb E[G_t~|~S_t = s]$$
Using the definition of the return we can write the value function as $$v(s) = \mathbb E[R_t + \gamma R_{t+1} + ... ~|~S_t = s] = \mathbb E[R_t ~|~ S_t = s] + \gamma \mathbb E[G_{t+1}~|~S_t = s]$$


### Data Structures
Below is the code for how to represent the above processes in code as structs.

In [2]:
from typing import TypeVar

S = TypeVar('S')
A = TypeVar('A')

In [5]:
from typing import NamedTuple, Any, Dict, Tuple, Set
import numpy as np

class MP(NamedTuple):
    States: Set[S]
    P: Dict[S, Dict[S, float]]
        
        
class MRP(NamedTuple):
    # assumes that the reward is just a function of the current state
    mp: MP
    R: Dict[S, float]
    gamma: float
        
        
class MRP(NamedTuple):
    # assumes that the reward is a function of the transition
    mp: MP
    R: Dict[S, Dict[S, float]]
    gamma: float

### Reward Functions
The reward function for a particular movement can be defined in mulitple ways, it can either be a function of the state the Agent leaves and the new state the Agent enters, i.e. $R(s,s')$. Or the reward function can be a function of simply the current state, i.e. $R(s)$. The latter reward function is then simply an expected value of the reward for the next transition: $$R(s) = \mathbb E[R(s,s')~|~ S = s] = \sum_{s'\in \mathcal S}P(S_{t+1} = s'~|~S_t=s)\times R(s, s')$$

In [6]:
def convert_reward(R: Dict[S, Dict[S, float]], P: Dict[S, Dict[S, float]]) -> Dict[S, float]:
    # this function converts reward as a function of the transition to reward as a function of the current state
    new_R: {}
    for s, d in P.items():
        # assume that the first element in the key tuple is the current state
        if s not in new_R:
            new_R[s] = 0
        for sp, p in d.items():
            new_R[s] += p * R[s][sp]
        
    return new_R

### Stationary Distribution
Below is code to find the stationary distribution of a Markov Process. The stationary distirbution can be found analytically using eigenvalues and eigenvectors of the transition matrix. In this problem I've chosen to find the stationary distribution of a Markov Process using simulation. By simulating the process a large number of times, and to a sufficient depth and then count how many times we end up in every state we can estimate the stationary distribution. For a large set of states we need a large number of simulations to get a reasonable estimate.

In [7]:
import random

def get_stationary_dist(mp: MP, n_iter: int, depth: int) -> Dict[S, float]:
    # this function finds a stationary distribution for a Markov Process through simulation
    stat_dist = {}
    for i in range(n_iter):
        state = random.sample(mp.States, 1).pop()
        for j in range(depth):
            new_state = get_random_state(mp.P[state])
            state = new_state
        
        if state not in stat_dist:
            stat_dist[state] = 0
        stat_dist[state] += 1/n_iter
    return stat_dist


def get_random_state(d: Dict[S, float]) -> S:
    # helper function to return a random state based on a mapping from a state to a float
    p = np.random.rand()
    agg_prob = 0
    
    for sp, prob in d.items():
        agg_prob += prob
        if p <= agg_prob:
            return sp
    # degenerate distribution if it does not return a state before reaching this point
    return None

Below I've used the eigenvalues approach to finding the stationary distribution.

In [17]:
from scipy.linalg import eig

def get_stat_dist_eig(mp: MP) -> Dict[S, float]:
    # create a matrix to store the transition probabilities
    mat = np.zeros((len(mp.States), len(mp.States)))
    for i, s in enumerate(mp.States):
        for j, sp in enumerate(mp.States):
            if sp in mp.P[s]:
                # if we have a mapping from s -> {sp -> a probability} then the transition has non-zero probability
                mat[i,j] = mp.P[s][sp]
                
    v = get_valid_eig(mat)
    dist = {}
    for i, s in enumerate(mp.States):
        dist[s] = v[i]
        
    return dist


def get_valid_eig(mat: np.array) -> np.array:
    # helper function to return a normalized valid eigenvector (i.e. no negative numbers)
    eig_vals, eig_vecs = eig(mat.T)
    v = np.zeros((eig_vals.shape))
    # we use the eigenvector associated with an eigenvalue = 1
    for i, val in enumerate(eig_vals):
        if np.abs(val - 1.) < 10 ** (-5):
            v = eig_vecs[:, i]
            break
            
    return v / sum(v)

### Examples
Below is a simple example of a gridworld environment, we have a 4$\times$4 grid where in each cell you have probability of 0.25 of going in any direction (north, south, east, west), unless if you are at the border. If you are at the rightmost edge you will have 0.25 chance of going north, south, west and a probability of 0.25 of staying in the same cell.

In [18]:
def is_in_grid(state: Tuple[int, int], size: int) -> bool:
    # helper function to check whether a state is in the grid
    return  state[0] >= 0 and state[0] < size and state[1] >= 0 and state[1] < size


def get_neighbor_states(state: Tuple[int, int], size: int) -> Set[Tuple[int, int]]:
    # function to return a set of neighboring states in the grid
    nbr_states = set()
    
    up_state = s[0]-1, s[1]
    if is_in_grid(up_state, size):
        nbr_states.add(up_state)
        
    down_state = s[0]+1, s[1]
    if is_in_grid(down_state, size):
        nbr_states.add(down_state)
        
    left_state = s[0], s[1]-1
    if is_in_grid(left_state, size):
        nbr_states.add(left_state)
        
    right_state = s[0], s[1]+1
    if is_in_grid(right_state, size):
        nbr_states.add(right_state)
    
    return nbr_states

In [19]:
# define the gridworld parameters
States = set()
P = {}
for i in range(4):
    for j in range(4):
        state = (i, j)
        States.add(state)

for s in States:
    P[s] = {}
    nbrs = get_neighbor_states(s, 4)
    for sp in nbrs:
        P[s][sp] = 0.25
    if len(nbrs) < 4:
        P[s][s] = 0.25*(4 - len(nbrs))
        
mp = MP(States, P)

Below we can evaluate how the simulation approach of finding the stationary distribution compares to the analytical approach of using eigenvalues. We observe that they are fairly similar. We can expect the simulaiton method to approach the analytical solution as the number of simulaitons increases, in this case I chose 10,000 simulations to a depth of 100, i.e. every simulation was done for 100 time steps.

In [20]:
get_stationary_dist(mp, 10000, 100)

AttributeError: 'MP' object has no attribute 'S'

In [88]:
get_stat_dist_eig(mp)

{(0, 0): 0.06250000000000018,
 (0, 1): 0.06250000000000008,
 (0, 2): 0.06250000000000018,
 (0, 3): 0.06250000000000015,
 (1, 0): 0.06250000000000003,
 (1, 1): 0.06250000000000014,
 (1, 2): 0.06250000000000014,
 (1, 3): 0.06250000000000008,
 (2, 0): 0.06250000000000001,
 (2, 1): 0.06249999999999996,
 (2, 2): 0.06249999999999997,
 (2, 3): 0.06249999999999991,
 (3, 0): 0.06249999999999997,
 (3, 1): 0.06249999999999983,
 (3, 2): 0.062499999999999806,
 (3, 3): 0.06249999999999974}