# Assignment 15

In [14]:
# import packages needed
import itertools
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from typing import (Iterable, Callable, Mapping, TypeVar, 
                    List, Tuple, Optional,Sequence, Iterator)
from rl.markov_decision_process import Policy,MarkovDecisionProcess
from rl.markov_decision_process import FiniteMarkovDecisionProcess,policy_from_q
from rl.markov_process import TransitionStep
from rl.returns import returns
from rl.function_approx import Tabular, FunctionApprox
from rl.function_approx import DNNSpec, AdamGradient, DNNApprox
from rl.dynamic_programming import policy_iteration_result
from rl.iterate import last
from collections import defaultdict
from copy import deepcopy
from dataclasses import replace

from rl.chapter2.simple_inventory_mrp import SimpleInventoryMRPFinite, InventoryState
from rl.chapter7.asset_alloc_discrete import AssetAllocDiscrete 
from rl.monte_carlo import mc_prediction, mc_control
from rl.td import td_prediction

from rl.distribution import Constant,Choose,Bernoulli,Distribution,Gaussian

## Problem 1

In [56]:
S = str
DataType = Sequence[Sequence[Tuple[S, float]]]
ProbFunc = Mapping[S, Mapping[S, float]]
RewardFunc = Mapping[S, float]
ValueFunc = Mapping[S, float]


def get_state_return_samples(
    data: DataType
) -> Sequence[Tuple[S, float]]:
    """
    prepare sequence of (state, return) pairs.
    Note: (state, return) pairs is not same as (state, reward) pairs.
    """
    return [(s, sum(r for (_, r) in l[i:]))
            for l in data for i, (s, _) in enumerate(l)]


def get_mc_value_function(
    state_return_samples: Sequence[Tuple[S, float]]
) -> ValueFunc:
    """
    Implement tabular MC Value Function compatible with the interface defined above.
    """
    
    v: ValueFunc = defaultdict(lambda:0)
    occurence: Mapping[S,int] = defaultdict(lambda:0)
        
    for st in state_return_samples:
        occurence[st[0]] += 1
        v[st[0]] += 1./(occurence[st[0]])*(st[1] - v[st[0]])
        
    return dict(v)


def get_state_reward_next_state_samples(
    data: DataType
) -> Sequence[Tuple[S, float, S]]:
    """
    prepare sequence of (state, reward, next_state) triples.
    """
    return [(s, r, l[i+1][0] if i < len(l) - 1 else 'T')
            for l in data for i, (s, r) in enumerate(l)]


def get_probability_and_reward_functions(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> Tuple[ProbFunc, RewardFunc]:
    """
    Implement code that produces the probability transitions and the
    reward function compatible with the interface defined above.
    """
    
    p:ProbFunc = {}
    r:RewardFunc = {}
        
    occurence: Mapping[S,float] = defaultdict(lambda:0)
        
    for st in srs_samples:
        if st[0] not in p or st[0] not in occurence or st[0] not in r:
            occurence[st[0]] = 1.
            p[st[0]] = {}
            p[st[0]][st[2]] = 1.
            r[st[0]] = st[1]
        else:
            occurence[st[0]] += 1.
            if st[2] in p[st[0]]:
                p[st[0]][st[2]] += 1.
            else:
                p[st[0]][st[2]] = 1.
            r[st[0]] += st[1]
        
    for k in p:
        r[k] /= occurence[k]
        for next_k in p[k]:
            p[k][next_k] = p[k][next_k]/occurence[k]
            
    return p,r     


def get_mrp_value_function(
    prob_func: ProbFunc,
    reward_func: RewardFunc
) -> ValueFunc:
    """
    Implement code that calculates the MRP Value Function from the probability
    transitions and reward function, compatible with the interface defined above.
    Hint: Use the MRP Bellman Equation and simple linear algebra
    """
    sz = len(prob_func.keys())
    mat = np.zeros((sz, sz))

    for i, s1 in enumerate(prob_func.keys()):
        for j, s2 in enumerate(prob_func.keys()):
            mat[i, j] = prob_func[s1][s2]
            
    r = np.zeros((sz, 1))
    
    for i, s1 in enumerate(reward_func.keys()):
        r[i][0] = reward_func[s1]
    
    
    res = np.linalg.inv(np.eye(len(prob_func.keys())) - mat).dot(r)
    
    for i in range(res.shape[0]):
        res[i][0]


def get_td_value_function(
    srs_samples: Sequence[Tuple[S, float, S]],
    num_updates: int = 300000,
    learning_rate: float = 0.3,
    learning_rate_decay: int = 30
) -> ValueFunc:
    """
    Implement tabular TD(0) (with experience replay) Value Function compatible
    with the interface defined above. Let the step size (alpha) be:
    learning_rate * (updates / learning_rate_decay + 1) ** -0.5
    so that Robbins-Monro condition is satisfied for the sequence of step sizes.
    """
    # for each iteration, check i%len, and shuffle
    
    v: ValueFunc = defaultdict(lambda:0)
    srs:List[Tuple[S, float, S]] = list(srs_samples)
        
    for i in range(1,num_updates+1):
        alpha = learning_rate * (i / learning_rate_decay + 1.) ** -0.5
        pick = np.random.randint(0,len(srs)-1)
        st = srs[pick]
        v[st[0]] += alpha*(st[1] + v[st[2]]- v[st[0]])
    
    return dict(v)

# TODO: fix this equation
def get_lstd_value_function(
    srs_samples: Sequence[Tuple[S, float, S]]
) -> ValueFunc:
    """
    Implement LSTD Value Function compatible with the interface defined above.
    Hint: Tabular is a special case of linear function approx where each feature
    is an indicator variables for a corresponding state and each parameter is
    the value function for the corresponding state.
    """
    feature_functions
    m:int = len(feature_functions)
    A:np.ndarray = np.zeros((m,m))
    b:np.ndarray = np.zeros((m,1))
    v:Mapping[S,float] = {}
    
    for s in trans:
        if s.state not in v:
            v[s.state] = 0.0
        feature_val_s:np.ndarray = np.reshape(np.array([f(s.state) for f in feature_functions]),(m,1))
        feature_val_ns:np.ndarray = np.reshape(np.array([f(s.next_state) for f in feature_functions]),(m,1))
        A = A+np.dot(feature_val_s,(feature_val_s-gamma*feature_val_ns).T)
        b = b+feature_val_s*s.reward
        
    w_star:np.ndarray = np.dot(np.linalg.inv(A),b)
    for k in v:
        feature_val_s:np.ndarray = np.reshape(np.array([f(k) for f in feature_functions]),(m,1))
        v[s] = np.dot(feature_val_s.T,w_star)


In [57]:
given_data: DataType = [
    [('A', 2.), ('A', 6.), ('B', 1.), ('B', 2.)],
    [('A', 3.), ('B', 2.), ('A', 4.), ('B', 2.), ('B', 0.)],
    [('B', 3.), ('B', 6.), ('A', 1.), ('B', 1.)],
    [('A', 0.), ('B', 2.), ('A', 4.), ('B', 4.), ('B', 2.), ('B', 3.)],
    [('B', 8.), ('B', 2.)]
]

sr_samps = get_state_return_samples(given_data)

print("------------- MONTE CARLO VALUE FUNCTION --------------")
print(get_mc_value_function(sr_samps))

srs_samps = get_state_reward_next_state_samples(given_data)

pfunc, rfunc = get_probability_and_reward_functions(srs_samps)
print("-------------- MRP VALUE FUNCTION ----------")
print(get_mrp_value_function(pfunc, rfunc))

print("------------- TD VALUE FUNCTION --------------")
print(get_td_value_function(srs_samps))

#print("------------- LSTD VALUE FUNCTION --------------")
#print(get_lstd_value_function(srs_samps))

------------- MONTE CARLO VALUE FUNCTION --------------
{'A': 9.571428571428571, 'B': 5.642857142857142}
-------------- MRP VALUE FUNCTION ----------
None
------------- TD VALUE FUNCTION --------------
{'B': 11.707922483874848, 'T': 0, 'A': 14.811825638337334}


Question:
1. How is it possible to build a LSTD without a set of feature function?
2. How to interpret the results above?

In [53]:
# consulted implementation in RL-book - only for linear cases
# TODO: Implementation need to change!!!
def tdc_prediction(transitions: Iterable[TransitionStep[S]],
                   feature_functions: Sequence[Callable[[S], float]],
                   gamma: float,
                   alpha: float,
                   beta: float) -> Iterator[FunctionApprox[S]]:
    
    class custom_LFA(Generic[S]):
        
        self.weights: ndarray
        self.feature_functions: Sequence[Callable[[S], float]]
        
        def __init__(self, feature_functions: Sequence[Callable[[S], float]],
                     beta: float, alpha: float, gamma: float,
                     weights: ndarray = None):
            self.feature_functions = approx.feature_functions
            self.alpha = alpha
            self.beta = beta
            self.gamma = gamma
            if weights is not None:
                self.weights = weights
            else:
                self.weights = np.zeros((len(feature_functions),1))
            self.theta: ndarray = np.zeros((len(feature_functions),1))
            self.values_map = Mapping[S,float] = defaultdict(lambda:0)
                
        def get_feature_values(self,x:S):
            return np.reshape(
                np.array([f(x) for f in self.feature_functions]),
                (len(feature_functions),1))
        
        # Assume only one (x,y) pair will be passed in
        def update(self,xy_vals: Tuple[S,S,float]) -> custom_LFA[S]:
            st,next_st,y = xy_vals
            feature_st: np.ndarray = self.get_feature_values(st)
            feature_next_st: np.ndarray = self.get_feature_values(next_st)
            delta: float = y - np.dot(feature_st.T, self.weights)
            thetaTfeature:float = np.dot(feature_st.T,theta)   
                
            new_theta: np.ndarray = self.theta+self.beta*(delta-thetaTfeature)*feature_st
            new_weights: np.ndarray = self.weights+\
                self.alpha*(delta*feature_st-self.gamma*thetaTfeature*feature_next_st)
                
            self.values_map[st] = np.dot(feature_st.T,new_weights)
                
            return replace(
                self,
                weights = new_weights,
                theta = new_theta
            )
    
    def step(v, transition):
        return v.update((transition.state,
                          transition.next_state,
                          transition.reward + gamma * v(transition.next_state)))

    return iterate.accumulate(transitions, step, initial=custom_LFA(feature_functions,
                                                                    beta,
                                                                    alpha,
                                                                    gamma))

In [17]:
# test for correctness
user_capacity = 2
user_poisson_lambda = 1.0
user_holding_cost = 1.0
user_stockout_cost = 10.0

user_gamma = 0.9

si_mrp = SimpleInventoryMRPFinite(
    capacity=user_capacity,
    poisson_lambda=user_poisson_lambda,
    holding_cost=user_holding_cost,
    stockout_cost=user_stockout_cost
)

start = InventoryState(on_hand = 0, on_order = 0)
sample = list(itertools.islice(si_mrp.simulate_reward(Constant(start)),100000))