# CME 241 Assignment 15

## Shaan Patel

### Question 1

In [5]:
from typing import Sequence, Tuple, Mapping
import numpy as np
from collections import defaultdict

S = str
DataType = Sequence[Sequence[Tuple[S, float]]]
ProbFunc = Mapping[S, Mapping[S, float]]
RewardFunc = Mapping[S, float]
ValueFunc = Mapping[S, float]

In [6]:
def get_state_return_samples(
    data: DataType
) -> Sequence[Tuple[S, float]]:
    """
    prepare sequence of (state, return) pairs.
    Note: (state, return) pairs is not same as (state, reward) pairs.
    """
    return [(s, sum(r for (_, r) in l[i:]))
            for l in data for i, (s, _) in enumerate(l)]


def get_mc_value_function(
    state_return_samples: Sequence[Tuple[S, float]]
) -> ValueFunc:
    """
    Implement tabular MC Value Function compatible with the interface defined above.
    """
    valfunc: ValueFunc = defaultdict(lambda: 0)
    counts = defaultdict(lambda: 0)
    for entry in state_return_samples:
        state = entry[0]
        reward = entry[1]
        counts[state] += 1
        valfunc[state] += (1/counts[state])*(reward - valfunc[state])
    return valfunc

In [7]:
def get_state_reward_next_state_samples(
    data: DataType
) -> Sequence[Tuple[S, float, S]]:
    """
    prepare sequence of (state, reward, next_state) triples.
    """
    return [(s, r, l[i+1][0] if i < len(l) - 1 else 'T')
            for l in data for i, (s, r) in enumerate(l)]


def get_probability_and_reward_functions(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> Tuple[ProbFunc, RewardFunc]:
    """
    Implement code that produces the probability transitions and the
    reward function compatible with the interface defined above.
    """
    probf: ProbFunc = defaultdict(lambda: defaultdict(lambda: 0))
    rewdf: RewardFunc = defaultdict(lambda: 0)

    statecounts = defaultdict(lambda: [])
    rewardlist = defaultdict(lambda: [])

    for seq in srs_samples:
        state = seq[0]
        reward = seq[1]
        n_state = seq[2]
        statecounts[state].append(n_state)
        rewardlist[state].append(reward)
        counts = statecounts[state].count(n_state)
        probf[state][n_state] = counts/len(statecounts[state])
        rewdf[state] = sum(rewardlist[state])/len(rewardlist[state])

    return (probf, rewdf)

In [17]:
def get_mrp_value_function(
    prob_func: ProbFunc,
    reward_func: RewardFunc
) -> ValueFunc:
    """
    Implement code that calculates the MRP Value Function from the probability
    transitions and reward function, compatible with the interface defined above.
    Hint: Use the MRP Bellman Equation and simple linear algebra
    """
    states = set()
    for s in prob_func:
        states.add(s)
    
    statelist = list(states)  

    rlist = np.ndarray = np.zeros(len(statelist))
    for i in range(len(statelist)):
        rlist[i] = reward_func[statelist[i]]

    igp: np.ndarray = np.eye(len(statelist))
    for i in range(igp.shape[0]):
        for j in range(igp.shape[1]):
            statei = statelist[i]
            statej = statelist[j]
            igp[i,j] = igp[i,j] - prob_func[statei][statej]
    
    inv: np.ndarray = np.linalg.inv(igp)
    v: np.ndarray = inv.dot(rlist)

    valfunc: ValueFunc = defaultdict(lambda: 0)
    for i in range(len(statelist)):
        valfunc[statelist[i]] = v[i]
    
    return valfunc

In [18]:
def get_td_value_function(
    srs_samples: Sequence[Tuple[S, float, S]],
    num_updates: int = 300000,
    learning_rate: float = 0.3,
    learning_rate_decay: int = 30
) -> ValueFunc:
    """
    Implement tabular TD(0) (with experience replay) Value Function compatible
    with the interface defined above. Let the step size (alpha) be:
    learning_rate * (updates / learning_rate_decay + 1) ** -0.5
    so that Robbins-Monro condition is satisfied for the sequence of step sizes.
    """
    f: ValueFunc = defaultdict(lambda: 0)

    t = 0
    for move in srs_samples:
        state = move[0]
        reward = move[1]
        n_state = move[2]
        t += 1
        alpha = learning_rate*(t/learning_rate_decay + 1) ** -0.5
        f[state] += alpha*(reward + f[n_state] - f[state])
    
    return f

In [25]:
def get_lstd_value_function(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> ValueFunc:
    """
    Implement LSTD Value Function compatible with the interface defined above.
    Hint: Tabular is a special case of linear function approx where each feature
    is an indicator variables for a corresponding state and each parameter is
    the value function for the corresponding state.
    """
    num_features = len(srs_samples)
    a_inv: np.ndarray = np.eye(num_features)
    b_vec: np.ndarray = np.zeros(num_features)

    statelist = []

    for entry in srs_samples:
        state = entry[0]
        statelist.append(state)

    for entry in srs_samples:
        state = entry[0]
        reward = entry[1]
        n_state = entry[2]
        phi1: np.ndarray = np.zeros(len(statelist))
        for i in range(len(statelist)):
            if statelist[i] is state:
                phi1[i] = 1
            else:
                phi1[i] = 0
        if n_state in statelist:
            phi2 = np.zeros(len(statelist))
            for i in range(len(statelist)):
                if statelist[i] is n_state:
                    phi2[i] = phi1[i] - 1
                else:
                    phi2[i] = phi1[i]
        else:
            phi2 = phi1
        temp: np.ndarray = a_inv.T.dot(phi2)
        a_inv = a_inv - np.outer(a_inv.dot(phi1), temp) / (1 + phi1.dot(temp))
        b_vec += phi1 * reward
    opt_wts: np.ndarray = a_inv.dot(b_vec)

    valfunc: ValueFunc = defaultdict(lambda: 0)
    for entry in srs_samples:
        state = entry[0]
        ind = statelist.index(state)
        valfunc[state] = opt_wts[ind]
    return valfunc


In [26]:
given_data: DataType = [
    [('A', 2.), ('A', 6.), ('B', 1.), ('B', 2.)],
    [('A', 3.), ('B', 2.), ('A', 4.), ('B', 2.), ('B', 0.)],
    [('B', 3.), ('B', 6.), ('A', 1.), ('B', 1.)],
    [('A', 0.), ('B', 2.), ('A', 4.), ('B', 4.), ('B', 2.), ('B', 3.)],
    [('B', 8.), ('B', 2.)]
]

sr_samps = get_state_return_samples(given_data)

print("------------- MONTE CARLO VALUE FUNCTION --------------")
print(get_mc_value_function(sr_samps))

srs_samps = get_state_reward_next_state_samples(given_data)

pfunc, rfunc = get_probability_and_reward_functions(srs_samps)
print("-------------- MRP VALUE FUNCTION ----------")
print(get_mrp_value_function(pfunc, rfunc))

print("------------- TD VALUE FUNCTION --------------")
print(get_td_value_function(srs_samps))

print("------------- LSTD VALUE FUNCTION --------------")
print(get_lstd_value_function(srs_samps))

------------- MONTE CARLO VALUE FUNCTION --------------
defaultdict(<function get_mc_value_function.<locals>.<lambda> at 0x000002353F74E1F0>, {'A': 9.571428571428571, 'B': 5.642857142857142})
-------------- MRP VALUE FUNCTION ----------
defaultdict(<function get_mrp_value_function.<locals>.<lambda> at 0x000002353F74E0D0>, {'A': -13.527472527472527, 'B': -3.3333333333333335})
------------- TD VALUE FUNCTION --------------
defaultdict(<function get_td_value_function.<locals>.<lambda> at 0x000002353F74E790>, {'A': 4.84698549089818, 'B': 5.663543957765118, 'T': 0})
------------- LSTD VALUE FUNCTION --------------
defaultdict(<function get_lstd_value_function.<locals>.<lambda> at 0x000002353F579790>, {'A': 1.7615508885298845, 'B': 0.6636510500807713})
