# In [1]:
'''Approximate dynamic programming algorithms are variations on
dynamic programming algorithms that can work with function
approximations rather than exact representations of the process's
state space.

'''

from typing import Iterator, Mapping, Tuple, TypeVar, Sequence, List
from operator import itemgetter
import numpy as np

from rl.distribution import Distribution, Constant, Choose
from rl.function_approx import FunctionApprox
from rl.iterate import iterate
from rl.markov_process import (FiniteMarkovRewardProcess, MarkovRewardProcess,
                               RewardTransition)
from rl.markov_decision_process import (FiniteMarkovDecisionProcess, Policy, FinitePolicy,
                                        MarkovDecisionProcess,
                                        StateActionMapping)

S = TypeVar('S')
A = TypeVar('A')

# A representation of a value function for a finite MDP with states of
# type S
V = Mapping[S, float]


def evaluate_finite_mrp(
        mrp: FiniteMarkovRewardProcess[S],
        γ: float,
        approx_0: FunctionApprox[S]
) -> Iterator[FunctionApprox[S]]:

    '''Iteratively calculate the value function for the given finite Markov
    Reward Process, using the given FunctionApprox to approximate the
    value function at each step.

    Each step evaluates the current approximation at the process's
    non-terminal states, computes
    ``reward_function_vec + γ * transition_matrix · values`` and feeds
    the resulting (state, value) pairs to the approximation's ``update``.

    Arguments:
      mrp -- the finite Markov Reward Process to evaluate
      γ -- discount factor applied to the expected next-state values
      approx_0 -- the initial function approximation

    Returns the result of ``iterate(update, approx_0)``: presumably an
    iterator over successive approximations starting from approx_0
    (see rl.iterate to confirm).

    '''
    def update(v: FunctionApprox[S]) -> FunctionApprox[S]:
        # Current estimates, one per non-terminal state.
        vs: np.ndarray = v.evaluate(mrp.non_terminal_states)
        updated: np.ndarray = (
            mrp.reward_function_vec
            + γ * mrp.get_transition_matrix().dot(vs)
        )
        # The backed-up values are derived from `vs`, so they are paired
        # with the same non-terminal states that produced `vs`. Zipping
        # against mrp.states() could misalign states and values whenever
        # that collection also contains terminal states.
        return v.update(zip(mrp.non_terminal_states, updated))

    return iterate(update, approx_0)


def policy_iteration_finite(
    mdp: FiniteMarkovDecisionProcess[S, A],
     γ: float,
    approx_0: FunctionApprox[S]
) -> Iterator[FunctionApprox[S]]:
    '''Perform approximate policy iteration using the given FunctionApprox
    to approximate the Optimal Value function at each step.

    Arguments:
      mdp -- the finite Markov Decision Process to solve
      γ -- discount factor passed on to policy evaluation
      approx_0 -- the function approximation used to start each policy
                  evaluation

    NOTE(review): the implementation visible here is unfinished. The
    nested ``update`` stops at a dangling ``poli`` statement and has no
    return, and no return statement for this function is visible, so
    the declared Iterator result is not produced by the code shown.

    '''
    # Policy evaluation stops once every compared pair of successive
    # values differs by less than this tolerance.
    tol:float = 1.e-2


    def update(v: FunctionApprox[S]) -> FunctionApprox[S]:
        # Build a policy that, for every state whose action mapping is not
        # None, is a Choose over that state's action keys.
        # NOTE(review): `for s,a in mdp.mapping` unpacks whatever iterating
        # `mdp.mapping` yields; if it is a dict-like mapping, iteration
        # yields keys only and `.items()` is probably intended -- confirm.
        policy: FinitePolicy[S,A] = FinitePolicy({s:Choose({k for k in a.keys()}) for s,a in mdp.mapping if (not a is None)})

        # perform policy evaluation on the process obtained by applying the
        # policy to the MDP.
        # NOTE(review): evaluation starts from approx_0; the argument `v`
        # is not used anywhere in the visible body -- confirm intent.
        eval_it = evaluate_finite_mrp(mdp.apply_finite_policy(policy), γ, approx_0)
        old_vf = eval_it.__next__()
        while(True):
            new_vf = eval_it.__next__()
            # Stop when successive iterates agree within `tol` at every index.
            # NOTE(review): this assumes the iterates support len() and
            # integer indexing -- verify that FunctionApprox provides both.
            if (all([np.abs(old_vf[i] - new_vf[i]) < tol for i in range(len(old_vf))])):
                break
            old_vf = new_vf

        # greedy policy improvement
        # NOTE(review): incomplete -- `poli` is a bare name not defined in
        # the visible code (NameError if reached, unless defined elsewhere);
        # the improvement step and the return value are still missing.
        poli
        
        


