In [107]:
from typing import Sequence, Tuple, Mapping, Iterable, Iterator,Callable
from collections import defaultdict
import numpy as np
import itertools
import rl.iterate as iterate

S = str
DataType = Sequence[Sequence[Tuple[S, float]]]
ProbFunc = Mapping[S, Mapping[S, float]]
RewardFunc = Mapping[S, float]
ValueFunc = Mapping[S, float]


def get_state_return_samples(
    data: DataType
) -> Sequence[Tuple[S, float]]:
    """
    prepare sequence of (state, return) pairs.
    Note: (state, return) pairs is not same as (state, reward) pairs.
    """
    return [(s, sum(r for (_, r) in l[i:]))
            for l in data for i, (s, _) in enumerate(l)]


def get_mc_value_function(
    state_return_samples: Sequence[Tuple[S, float]]
) -> ValueFunc:
    """
    Implement tabular MC Value Function compatible with the interface defined above.
    """
    valuesmap_mc: Mapping[S, float] = {}
    countsmap_mc: Mapping[S, int] = {}

    for tuple in state_return_samples:
        state = tuple[0]
        state_return = tuple[1]

        countsmap_mc.setdefault(state, 0)
        countsmap_mc[state] += 1

        weight: float = 1 / countsmap_mc[state]
        valuesmap_mc.setdefault(state, 0.)
        valuesmap_mc[state] += weight * (state_return - valuesmap_mc[state])

    return valuesmap_mc

def get_state_reward_next_state_samples(
    data: DataType
) -> Sequence[Tuple[S, float, S]]:
    """
    prepare sequence of (state, reward, next_state) triples.
    """
    return [(s, r, l[i+1][0] if i < len(l) - 1 else 'T')
            for l in data for i, (s, r) in enumerate(l)]


def get_probability_and_reward_functions(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> Tuple[ProbFunc, RewardFunc]:
    """
    Implement code that produces the probability transitions and the
    reward function compatible with the interface defined above.
    """
    counts_transitions: Mapping[S, Sequence[Mapping[Tuple[float, S], float], int]] = {}
    for srs in srs_samples:
        state = srs[0]
        reward = srs[1]
        next_state = srs[2]

        counts_transitions.setdefault(state, [{}, 0])
        counts_transitions[state][0].setdefault((reward, next_state), 0.)
        counts_transitions[state][0][(reward, next_state)] += 1
        counts_transitions[state][1] += 1

    prob_transitions: Mapping[S, Mapping[S, float]] = {}
    reward_function: Mapping[S, float] = {}

    for state in counts_transitions.keys():
        prob_transitions.setdefault(state, {})
        reward_function.setdefault(state, 0.)
        for response in counts_transitions[state][0].keys():
            counts_transitions[state][0][response] /= counts_transitions[state][1] #becomes transition probability

            next_state = response[1]
            prob_transitions[state].setdefault(next_state, 0.)
            prob_transitions[state][next_state] += counts_transitions[state][0][response]

            reward = response[0]
            reward_function[state] += counts_transitions[state][0][response] * reward

    return (prob_transitions, reward_function)

def get_mrp_value_function(
    prob_func: ProbFunc,
    reward_func: RewardFunc
) -> ValueFunc:
    """
    Implement code that calculates the MRP Value Function from the probability
    transitions and reward function, compatible with the interface defined above.
    Hint: Use the MRP Bellman Equation and simple linear algebra
    """
    value_func: Mapping[S, float] = {}
    m: int = len(prob_func.keys())
    P: np.ndarray = np.vstack(
        [
            prob_func[state][next_state] for next_state in prob_func.keys() #Note here is not prob_func[state].keys()!
        ] for state in prob_func.keys()
    )
    R: np.ndarray = np.array([
        reward_func[state] for state in prob_func.keys()
    ])
    value_vec: ndarray = np.linalg.inv(np.eye(m) - P).dot(R)
    value_func = {state: value_vec[i] for (i, state) in enumerate(prob_func.keys())}

    return value_func

def get_td_value_function(
    srs_samples: Iterable[Tuple[S, float, S]],
    num_updates: int = 300000,
    learning_rate: float = 0.3,
    learning_rate_decay: int = 30
) -> ValueFunc:
    """
    Implement tabular TD(0) (with experience replay) Value Function compatible
    with the interface defined above. Let the step size (alpha) be:
    learning_rate * (updates / learning_rate_decay + 1) ** -0.5
    so that Robbins-Monro condition is satisfied for the sequence of step sizes.
    """
    
    def td_pred(
        tr_seq: Iterable[Tuple[S, float, S]],
        updates: int = 0,
        count_to_weight_func: Callable[[int], float] = lambda n: learning_rate * (n / learning_rate_decay + 1) ** -0.5
    ) -> Mapping[S, float]:
        values_map: Mapping[S, float] = {}

        for transition in tr_seq:
            weight: float = count_to_weight_func(updates)
            values_map.setdefault(transition[0], 0.)
            values_map[transition[0]] += weight * (transition[1] + values_map.get(transition[2], 0.) - values_map[transition[0]])
            updates += 1

        yield values_map

    Iter_srs_samples: Iterable[S] = itertools.chain.from_iterable(
        list(itertools.repeat(srs_samples, (num_updates // len(srs_samples) + 1)))
    )
    values_map_td = td_pred(Iter_srs_samples)
    values_map_final: Mapping[S, float] = iterate.last(itertools.islice(values_map_td, num_updates))

    return values_map_final


def get_lstd_value_function(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> ValueFunc:
    """
    Implement LSTD Value Function compatible with the interface defined above.
    Hint: Tabular is a special case of linear function approx where each feature
    is an indicator variables for a corresponding state and each parameter is
    the value function for the corresponding state.
    """
    feature_func: Mapping[S, Callable[S, float]] = {}
    #state_rec: Mapping[S, int] = {}
    #for sample in srs_samples:
    #    state_rec[sample[0]] = 1

    #for key in state_rec.keys():
    #    feature_func[key] = lambda s: 1. if (s == key) else 0.

    #print([feature_func[s]('A') for s in feature_func.keys()])

    feature_func['A'] = lambda s: 1. if (s == 'A') else 0.
    feature_func['B'] = lambda s: 1. if (s == 'B') else 0.

    feature_functions: Sequence[Callable[S, float]] = [feature_func[key] for key in feature_func.keys()]

    epsilon: float = 1e-4

    num_features: int = len(feature_functions)
    a_inv: np.ndarray = np.eye(num_features) / epsilon
    b_vec: np.ndarray = np.zeros(num_features)
    for tr in srs_samples:
        phi1: np.ndarray = np.array([f(tr[0]) for f in feature_functions])
        if tr[2] in feature_func.keys():
            phi2 = phi1 - np.array([f(tr[2])
                                        for f in feature_functions])
        else:
            phi2 = phi1
        temp: np.ndarray = a_inv.T.dot(phi2)
        a_inv = a_inv - np.outer(a_inv.dot(phi1), temp) / (1 + phi1.dot(temp))
        b_vec += phi1 * tr[1]

    opt_wts: np.ndarray = a_inv.dot(b_vec)
    return { s: np.dot(np.array(
            [f(s) for f in feature_functions]
            ), opt_wts) for s in feature_func.keys()}

In [108]:
if __name__ == '__main__':
    given_data: DataType = [
        [('A', 2.), ('A', 6.), ('B', 1.), ('B', 2.)],
        [('A', 3.), ('B', 2.), ('A', 4.), ('B', 2.), ('B', 0.)],
        [('B', 3.), ('B', 6.), ('A', 1.), ('B', 1.)],
        [('A', 0.), ('B', 2.), ('A', 4.), ('B', 4.), ('B', 2.), ('B', 3.)],
        [('B', 8.), ('B', 2.)]
    ]

    sr_samps = get_state_return_samples(given_data)

    print("------------- MONTE CARLO VALUE FUNCTION --------------")
    print(get_mc_value_function(sr_samps))

    srs_samps = get_state_reward_next_state_samples(given_data)

    pfunc, rfunc = get_probability_and_reward_functions(srs_samps)
    print("-------------- MRP VALUE FUNCTION ----------")
    print(get_mrp_value_function(pfunc, rfunc))

    print("------------- TD VALUE FUNCTION --------------")
    print(get_td_value_function(srs_samps))

    print("------------- LSTD VALUE FUNCTION --------------")
    print(get_lstd_value_function(srs_samps))

------------- MONTE CARLO VALUE FUNCTION --------------
{'A': 9.571428571428571, 'B': 5.642857142857142}
-------------- MRP VALUE FUNCTION ----------
{'A': 12.933333333333325, 'B': 9.599999999999994}
------------- TD VALUE FUNCTION --------------
{'A': 12.915792570300198, 'B': 9.59605795480753}
------------- LSTD VALUE FUNCTION --------------
{'A': 12.93279646518894, 'B': 9.599678678463201}


  P: np.ndarray = np.vstack(
