In [71]:
from typing import Sequence, Tuple, Mapping
import numpy as np
from random import sample
from collections import defaultdict

S = str
DataType = Sequence[Sequence[Tuple[S, float]]]
ProbFunc = Mapping[S, Mapping[S, float]]
RewardFunc = Mapping[S, float]
ValueFunc = Mapping[S, float]


def get_state_return_samples(
    data: DataType
) -> Sequence[Tuple[S, float]]:
    """
    prepare sequence of (state, return) pairs.
    Note: (state, return) pairs is not same as (state, reward) pairs.
    """
    return [(s, sum(r for (_, r) in l[i:]))
            for l in data for i, (s, _) in enumerate(l)]


def get_mc_value_function(
    state_return_samples: Sequence[Tuple[S, float]]
) -> ValueFunc:
    """
    Implement tabular MC Value Function compatible with the interface defined above.
    """
    V = defaultdict(lambda: 0)
    count = defaultdict(lambda: 0)
    for s, ret in state_return_samples:
        count[s] += 1
        V[s] += ret
        
    V = {key: val/count[key] for key, val in V.items()}
    return V.items()


def get_state_reward_next_state_samples(
    data: DataType
) -> Sequence[Tuple[S, float, S]]:
    """
    prepare sequence of (state, reward, next_state) triples.
    """
    return [(s, r, l[i+1][0] if i < len(l) - 1 else 'T')
            for l in data for i, (s, r) in enumerate(l)]


def get_probability_and_reward_functions(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> Tuple[ProbFunc, RewardFunc]:
    """
    Implement code that produces the probability transitions and the
    reward function compatible with the interface defined above.
    """
    num_transition = defaultdict(lambda: 0)
    reward = defaultdict(lambda: 0)
    state_freq = defaultdict(lambda: 0)
    
    
    for s, r, snew in srs_samples:
        num_transition[(s, snew)] += 1
        reward[(s, snew)] += r
        state_freq[s] += 1
            
    prob_transition = {key: (num_transition[key] / state_freq[key[0]]) 
                       for key, val in num_transition.items()} 
    
    
    reward_func = defaultdict(lambda: 0)
    for key, val in prob_transition.items():
        reward_func[key[0]] += val*(reward[key]/num_transition[key])
        
    
    return prob_transition, dict(reward_func)
    


def get_mrp_value_function(
    prob_func: ProbFunc,
    reward_func: RewardFunc
) -> ValueFunc:
    """
    Implement code that calculates the MRP Value Function from the probability
    transitions and reward function, compatible with the interface defined above.
    Hint: Use the MRP Bellman Equation and simple linear algebra
    """
    #First need to create P matrix and R vector
    states = set(s for s in reward_func.keys())
    m = len(states)
    state_to_idx = dict()
    for idx, state in enumerate(states):
        state_to_idx[state] = idx

    P = np.zeros((m,m))
    for key, val in prob_func.items():
        if key[1] != "T":
            P[state_to_idx[key[0]]][state_to_idx[key[1]]] = val
        
    R = np.zeros(m)
    for key, val in reward_func.items():
        R[state_to_idx[key]] = val
    
    V = np.linalg.solve(np.eye(m) - P, R)
    return V


def get_td_value_function(
    srs_samples: Sequence[Tuple[S, float, S]],
    num_updates: int = 300000,
    learning_rate: float = 0.3,
    learning_rate_decay: int = 30
) -> ValueFunc:
    """
    Implement tabular TD(0) (with experience replay) Value Function compatible
    with the interface defined above. Let the step size (alpha) be:
    learning_rate * (updates / learning_rate_decay + 1) ** -0.5
    so that Robbins-Monro condition is satisfied for the sequence of step sizes.
    """
    #Going to want to sample from srs_samples (how to sample from seq?)
    
    V = defaultdict(lambda: 0)
    count = defaultdict(lambda: 0)
    
    for updates in range(num_updates):
        s, r, snew = sample(srs_samples,1)[0]
        count[s] += 1
        alpha_n = learning_rate * (updates/learning_rate_decay + 1)** -0.5
        V[s] += alpha_n*(r + V[snew] - V[s])
        
    return V.items()


def get_lstd_value_function(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> ValueFunc:
    """
    Implement LSTD Value Function compatible with the interface defined above.
    Hint: Tabular is a special case of linear function approx where each feature
    is an indicator variables for a corresponding state and each parameter is
    the value function for the corresponding state.
    """
    #We'll be doing updates by one "batch"
    #First lets figure out the number of states, and create a dict that maps states to an "index" 
    states = set(s for s,r,snew in srs_samples)
    epsilon = 1e-4
    m = len(states)
    state_to_idx = dict()
    for idx, state in enumerate(states):
        state_to_idx[state] = idx
    
    A_inv = np.eye(m)/epsilon
    b = np.zeros(m)
    for s, r, snew in srs_samples:
        phi_i_s = np.zeros(m)
        phi_i_s[state_to_idx[s]] = 1
        if snew != "T":
            phi_i_snew = np.zeros(m)
            phi_i_snew[state_to_idx[snew]] = 1
            phi2 = phi_i_s - phi_i_snew
        else:
            phi2 = phi_i_s
            
        temp = A_inv.T @ (phi2)
        A_inv = A_inv  - np.outer(A_inv.dot(phi_i_s), temp) / (1+phi_i_s.dot(temp))
        b += phi_i_s*r

    V = A_inv @ b
    return V
    
    
    


if __name__ == '__main__':
    given_data: DataType = [
        [('A', 2.), ('A', 6.), ('B', 1.), ('B', 2.)],
        [('A', 3.), ('B', 2.), ('A', 4.), ('B', 2.), ('B', 0.)],
        [('B', 3.), ('B', 6.), ('A', 1.), ('B', 1.)],
        [('A', 0.), ('B', 2.), ('A', 4.), ('B', 4.), ('B', 2.), ('B', 3.)],
        [('B', 8.), ('B', 2.)]
    ]

    sr_samps = get_state_return_samples(given_data)

    print("------------- MONTE CARLO VALUE FUNCTION --------------")
    print(get_mc_value_function(sr_samps))

    srs_samps = get_state_reward_next_state_samples(given_data)

    pfunc, rfunc = get_probability_and_reward_functions(srs_samps)
    print("-------------- MRP VALUE FUNCTION ----------")
    print(get_mrp_value_function(pfunc, rfunc))

    print("------------- TD VALUE FUNCTION --------------")
    print(get_td_value_function(srs_samps))

    print("------------- LSTD VALUE FUNCTION --------------")
    print(get_lstd_value_function(srs_samps))

------------- MONTE CARLO VALUE FUNCTION --------------
dict_items([('A', 9.571428571428571), ('B', 5.642857142857143)])
-------------- MRP VALUE FUNCTION ----------
[12.93333333  9.6       ]
------------- TD VALUE FUNCTION --------------
dict_items([('A', 12.858823053430656), ('B', 8.788369448517361), ('T', 0)])
------------- LSTD VALUE FUNCTION --------------
[12.93279647  9.59967868]


As we can see, the Monte Carlo value function differs from the other prediction approximation functions, which is to be expected since the Monte Carlo Value Function is the only unbiased estimator, so with a small amount of available experiences, we expect this bias for the other algorithms to be quite large, and so the other 3 VF's have much different values than MC.