In [2]:
#Import Modules:
import sys
sys.path.append("../../RL-book")
from dataclasses import dataclass
from typing import Tuple, Dict, Sequence, Callable
from typing import Iterable, Iterator, Tuple, TypeVar
from rl.markov_decision_process import FiniteMarkovDecisionProcess, MarkovDecisionProcess
from rl.markov_decision_process import FinitePolicy, StateActionMapping, policy_from_q
from rl.markov_process import FiniteMarkovProcess, FiniteMarkovRewardProcess
from rl.distribution import Categorical, Constant, Choose
from scipy.stats import poisson
import numpy as np
from more_itertools import distinct_permutations
import matplotlib.pyplot as plt
from rl.dynamic_programming import value_iteration, policy_iteration, policy_iteration_result
import rl.markov_process as mp
from matplotlib.pyplot import cm
from collections import defaultdict
from rl.function_approx import Tabular
from rl import markov_decision_process
import importlib
importlib.reload(markov_decision_process)
import numpy.linalg as LA

from IPython.core.display import display, HTML
display(HTML("<style>.container{width:90%!important;}"))

In [74]:
from typing import Sequence, Tuple, Mapping

S = str
DataType = Sequence[Sequence[Tuple[S, float]]]
ProbFunc = Mapping[S, Mapping[S, float]]
RewardFunc = Mapping[S, float]
ValueFunc = Mapping[S, float]


def get_state_return_samples(
    data: DataType
) -> Sequence[Tuple[S, float]]:
    """
    prepare sequence of (state, return) pairs.
    Note: (state, return) pairs is not same as (state, reward) pairs.
    """
    return [(s, sum(r for (_, r) in l[i:]))
            for l in data for i, (s, _) in enumerate(l)]

def X():
    return 0 

def get_mc_value_function(
    state_return_samples: Sequence[Tuple[S, float]]
) -> ValueFunc:
    """
    Implement tabular MC Value Function compatible with the interface defined above.
    """
    ValueFunc = defaultdict(X)
    NumVisits = defaultdict(X)
    
    for step in state_return_samples:
        state = step[0]
        return_ = step[1]
        
        NumVisits[state] += 1
        
        ValueFunc[state] += 1/NumVisits[state]*(return_ - ValueFunc[state])
    return ValueFunc
    

def get_state_reward_next_state_samples(
    data: DataType
) -> Sequence[Tuple[S, float, S]]:
    """
    prepare sequence of (state, reward, next_state) triples.
    """
    return [(s, r, l[i+1][0] if i < len(l) - 1 else 'T')
            for l in data for i, (s, r) in enumerate(l)]


def get_probability_and_reward_functions(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> Tuple[ProbFunc, RewardFunc]:
    """
    Implement code that produces the probability transitions and the
    reward function compatible with the interface defined above.
    """
    Probability_Function = {}
    
    state_to_state_counter = defaultdict(X)
    state_1_counter = defaultdict(X)
    state_2_counter = defaultdict(X)
    
    rewards_dict_total = defaultdict(X)
    rewards_dict = defaultdict(X)
    
    for step in srs_samples:
        state_1 = step[0]
        state_2 = step[2]
        reward = step[1]
        
        state_to_state_counter[(state_1,state_2)] += 1
        state_1_counter[state_1] += 1
        state_2_counter[state_2] += 1
                               
        rewards_dict_total[state_2] +=  reward                     
                               
    for state in rewards_dict_total.keys():
        rewards_dict[state] = rewards_dict_total[state]/state_2_counter[state]
    
    for key in state_1_counter.keys():
        
        temp_dict = {}
        for key2 in state_to_state_counter.keys():
            if key2[0] == key:        
                prob_transition = state_to_state_counter[key2]/state_1_counter[key]
                temp_dict[key2[1]] = prob_transition
        
        
        Probability_Function[key] = temp_dict
            
                               
    return Probability_Function,rewards_dict


def get_mrp_value_function(
    prob_func: ProbFunc,
    reward_func: RewardFunc
) -> ValueFunc:
    """
    Implement code that calculates the MRP Value Function from the probability
    transitions and reward function, compatible with the interface defined above.
    Hint: Use the MRP Bellman Equation and simple linear algebra
    """
    #VF = (1-gamma*P)^-1*R
    
    #Get a list of all of the unique states:
    States = []
    States_NT = []
    
    
    #Determine number of unique states
    n = len(reward_func)

    P = np.zeros((n,n))
    R = np.zeros((n,1))
    
    for key in reward_func.keys():
        States.append(key)
        
    for key in prob_func.keys():
        States_NT.append(key)
        
        
    for i,state_1 in enumerate(States_NT):
        
        for j,state_2 in enumerate(States):
            R[j] = reward_func[state_2]
            if state_2 in prob_func[state_1].keys():
                P[i,j] = prob_func[state_1][state_2]
                
    print (R)
            
    VF = np.matmul(np.linalg.inv(np.identity(n)-P),R)
    return VF


def get_td_value_function(
    srs_samples: Sequence[Tuple[S, float, S]],
    num_updates: int = 300000,
    learning_rate: float = 0.3,
    learning_rate_decay: int = 30
) -> ValueFunc:
    """
    Implement tabular TD(0) (with experience replay) Value Function compatible
    with the interface defined above. Let the step size (alpha) be:
    learning_rate * (updates / learning_rate_decay + 1) ** -0.5
    so that Robbins-Monro condition is satisfied for the sequence of step sizes.
    """
    VF = defaultdict(X)
    np.random.seed(1)
    
    for i in range(num_updates):
        random_num = np.random.randint(0,len(srs_samples))
        transition = srs_samples[random_num]
        
        
        
        alpha = learning_rate * (i/learning_rate_decay + 1) ** (-.5)
        
        VF[transition[0]] += alpha*(transition[1] + VF[transition[2]] - VF[transition[0]])
        
    return VF


def get_lstd_value_function(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> ValueFunc:
    """
    Implement LSTD Value Function compatible with the interface defined above.
    Hint: Tabular is a special case of linear function approx where each feature
    is an indicator variables for a corresponding state and each parameter is
    the value function for the corresponding state.
    """
    def get_feature_vec(state):
        if state == "A":
            return np.array([1,0,0])
        if state == "B":
            return np.array([0,1,0])
        if state == "T":
            return np.array([0,0,1])
    
    n = 3
    
    #Step 1: Intial A and b Matricies:
    A = np.zeros((n,n))
    b = np.zeros((1,n))
    
    VF = defaultdict(X)
            
    #Step 3: Fill out what the x_values that go into the feature functions will be?
    
    #Step 2: Step through all of the single transitions and update A and b
    for i,transtion in enumerate(srs_samples):
        phi_1 = get_feature_vec(transtion[0])
        phi_2 = get_feature_vec(transtion[2])

        A += np.outer(phi_1,(phi_1 - phi_2))
        b += phi_1 * transtion[1]
        
    print (A)
    print (b)
    
    weights = np.dot(LA.inv(A),b.T)
    
    states = ["A","B","T"]
    for s in states:
        VF[s] = np.dot(get_feature_vec(s),weights)
    
    return VF

if __name__ == '__main__':
    given_data: DataType = [
        [('A', 2.), ('A', 6.), ('B', 1.), ('B', 2.)],
        [('A', 3.), ('B', 2.), ('A', 4.), ('B', 2.), ('B', 0.)],
        [('B', 3.), ('B', 6.), ('A', 1.), ('B', 1.)],
        [('A', 0.), ('B', 2.), ('A', 4.), ('B', 4.), ('B', 2.), ('B', 3.)],
        [('B', 8.), ('B', 2.)]
    ]

    sr_samps = get_state_return_samples(given_data)

    print("------------- MONTE CARLO VALUE FUNCTION --------------")
    print(get_mc_value_function(sr_samps))

    srs_samps = get_state_reward_next_state_samples(given_data)

    pfunc, rfunc = get_probability_and_reward_functions(srs_samps)
    print("-------------- MRP VALUE FUNCTION ----------")
    print(get_mrp_value_function(pfunc, rfunc))

    print("------------- TD VALUE FUNCTION --------------")
    print(get_td_value_function(srs_samps))

    print("------------- LSTD VALUE FUNCTION --------------")
    print(get_lstd_value_function(srs_samps))

------------- MONTE CARLO VALUE FUNCTION --------------
defaultdict(<function X at 0x00000132C36DCB88>, {'A': 9.571428571428571, 'B': 5.642857142857142})
-------------- MRP VALUE FUNCTION ----------
[[3.        ]
 [3.16666667]
 [1.6       ]]
[[16.06666667]
 [12.56666667]
 [ 1.6       ]]
------------- TD VALUE FUNCTION --------------
defaultdict(<function X at 0x00000132C36DCB88>, {'B': 9.54740868693202, 'A': 13.261688415027827, 'T': 0})
------------- LSTD VALUE FUNCTION --------------
[[ 6. -6.  0.]
 [-3.  8. -5.]
 [ 0.  0.  0.]]
[[20. 38.  0.]]


LinAlgError: Singular matrix