In [14]:
from typing import Sequence, Tuple, Mapping
import numpy as np

S = str
DataType = Sequence[Sequence[Tuple[S, float]]]
ProbFunc = Mapping[S, Mapping[S, float]]
RewardFunc = Mapping[S, float]
ValueFunc = Mapping[S, float]


def get_state_return_samples(
    data: DataType
) -> Sequence[Tuple[S, float]]:
    """
    prepare sequence of (state, return) pairs.
    Note: (state, return) pairs is not same as (state, reward) pairs.
    """
    return [(s, sum(r for (_, r) in l[i:]))
            for l in data for i, (s, _) in enumerate(l)]


In [15]:
def get_mc_value_function(
    state_return_samples: Sequence[Tuple[S, float]]
) -> ValueFunc:
    """
    Implement tabular MC Value Function compatible with the interface defined above.
    """
    v: ValueFunc = {}
    count: Mapping[S,int] = {}
        
    for (state, return_) in state_return_samples:
        count[state] = count.get(state, 0) + 1
        v[state] = (1 - 1./count[state]) *  v.get(state, 0) + 1./count[state] * return_
        
    return v

In [41]:
def get_state_reward_next_state_samples(
    data: DataType
) -> Sequence[Tuple[S, float, S]]:
    """
    prepare sequence of (state, reward, next_state) triples.
    """
    return [(s, r, l[i+1][0] if i < len(l) - 1 else 'T')
            for l in data for i, (s, r) in enumerate(l)]


def get_probability_and_reward_functions(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> Tuple[ProbFunc, RewardFunc]:
    """
    Implement code that produces the probability transitions and the
    reward function compatible with the interface defined above.
    """
    count: Mapping[S,int] = {}
    pdict:ProbFunc = {}
    rdict:RewardFunc = {}
    for (state, reward, next_state) in srs_samples:
        count[state] = count.get(state, 0) + 1
        if state not in pdict:
            pdict[state] = {next_state: 1}
            rdict[state] = reward
        else:
            pdict[state][next_state] = pdict[state].get(next_state, 0) + 1.
            rdict[state] += reward
    for state in count:
        rdict[state] = rdict[state] / count[state]
        for next_state in pdict[state]:
            pdict[state][next_state] = pdict[state][next_state] / count[state]
    return (pdict, rdict)

In [42]:
def get_mrp_value_function(
    prob_func: ProbFunc,
    reward_func: RewardFunc
) -> ValueFunc:
    """
    Implement code that calculates the MRP Value Function from the probability
    transitions and reward function, compatible with the interface defined above.
    Hint: Use the MRP Bellman Equation and simple linear algebra
    """
    states = list(prob_func.keys())
    n = len(states)
    rvec = np.array([reward_func[s] for s in states])
    pmat = np.zeros((n, n))
    for i, s in enumerate(states):
        for j, t in enumerate(states):
            pmat[i][j] = prob_func[s][t]
    valuevec = np.linalg.inv(np.eye(n) - pmat).dot(rvec)
    vdict = {states[i]: valuevec[i] for i in range(n)}
    return vdict

In [59]:

def get_td_value_function(
    srs_samples: Sequence[Tuple[S, float, S]],
    num_updates: int = 300000,
    learning_rate: float = 0.3,
    learning_rate_decay: int = 30
) -> ValueFunc:
    """
    Implement tabular TD(0) (with experience replay) Value Function compatible
    with the interface defined above. Let the step size (alpha) be:
    learning_rate * (updates / learning_rate_decay + 1) ** -0.5
    so that Robbins-Monro condition is satisfied for the sequence of step sizes.
    """
    v: ValueFunc = {}
    srs:List[Tuple[S, float, S]] = list(srs_samples)
        
    for i in range(1,num_updates+1):
        alpha = learning_rate * (i / learning_rate_decay + 1.) ** -0.5
        pick = np.random.randint(0,len(srs)-1)
        (state, reward, next_state) = srs[pick]
        v[state] = v.get(state, 0) + alpha*(reward + v.get(next_state, 0)- v.get(state, 0))
    
    return v

In [60]:


def get_lstd_value_function(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> ValueFunc:
    """
    Implement LSTD Value Function compatible with the interface defined above.
    Hint: Tabular is a special case of linear function approx where each feature
    is an indicator variables for a corresponding state and each parameter is
    the value function for the corresponding state.
    """
    states = list(set(s for (s, q, k) in srs_samples))
    n = len(states)
    dic = {}
    v = {}
    for i in range(n):
        dic[states[i]] = i
    phi = np.eye(n)
    A = np.zeros((n, n))
    b = np.zeros(n)
    for (state, reward, next_state) in srs_samples:
        features_this = phi[dic[state]]
        features_next = phi[dic[next_state]] if next_state != 'T' else np.zeros(n)
        A = A + np.outer(features_this, features_this - features_next)
        b = b + features_this * reward
    w = np.dot(np.linalg.inv(A),b)
    for state in states:
        features = phi[dic[state]]
        v[state] = np.dot(features, w)
    return v




In [61]:
if __name__ == '__main__':
    given_data: DataType = [
        [('A', 2.), ('A', 6.), ('B', 1.), ('B', 2.)],
        [('A', 3.), ('B', 2.), ('A', 4.), ('B', 2.), ('B', 0.)],
        [('B', 3.), ('B', 6.), ('A', 1.), ('B', 1.)],
        [('A', 0.), ('B', 2.), ('A', 4.), ('B', 4.), ('B', 2.), ('B', 3.)],
        [('B', 8.), ('B', 2.)]
    ]

    sr_samps = get_state_return_samples(given_data)

    print("------------- MONTE CARLO VALUE FUNCTION --------------")
    print(get_mc_value_function(sr_samps))

    srs_samps = get_state_reward_next_state_samples(given_data)

    pfunc, rfunc = get_probability_and_reward_functions(srs_samps)
    print("-------------- MRP VALUE FUNCTION ----------")
    print(get_mrp_value_function(pfunc, rfunc))

    print("------------- TD VALUE FUNCTION --------------")
    print(get_td_value_function(srs_samps))

    print("------------- LSTD VALUE FUNCTION --------------")
    print(get_lstd_value_function(srs_samps))


------------- MONTE CARLO VALUE FUNCTION --------------
{'A': 9.571428571428573, 'B': 5.642857142857143}
-------------- MRP VALUE FUNCTION ----------
{'A': 12.933333333333334, 'B': 9.6}
------------- TD VALUE FUNCTION --------------
{'A': 14.718344546058416, 'B': 11.170275811214108}
------------- LSTD VALUE FUNCTION --------------
{'A': 12.933333333333334, 'B': 9.600000000000001}
