# In [3]:
import numpy as np
from typing import TypeVar,Optional,Mapping,Sequence,Iterable, Iterator, Tuple, TypeVar, Dict, Callable,List
from rl.markov_decision_process import Policy
import math
from rl.distribution import (Bernoulli, Constant, Categorical, Choose,
                             Distribution, FiniteDistribution)
import numpy as np

from rl.distribution import (Bernoulli, Constant, Categorical, Choose,
                             Distribution, FiniteDistribution)
from dataclasses import dataclass, replace
from rl.markov_decision_process import FinitePolicy, TransitionStep
from rl.function_approx import FunctionApprox

# In [4]:
# Generic type variables used in the annotations of this file:
# S stands for the state type, A for the action type.
S = TypeVar('S')
A = TypeVar('A')

@dataclass(frozen=True)
class Linear_Approx_pi():
    """Linear function of (state, action): weight . feature_func(state, action).

    The dataclass is frozen, so instances are never modified in place;
    ``update`` returns a new instance and the caller must keep the result.
    """
    # Maps a (state, action) pair to its feature vector.
    feature_func: Callable[[S,A],Sequence[float]]
    # Coefficient vector that is dotted with the feature vector.
    weight: Sequence[float]

    def update(self, delta_weight:Sequence[float]):
        """Return a copy whose weight is ``weight + delta_weight`` element-wise.

        Both operands are converted to NumPy arrays before adding, because
        ``+`` on plain lists/tuples would concatenate them instead of
        adding them numerically.
        """
        new_weight = np.asarray(self.weight) + np.asarray(delta_weight)
        return replace(self, weight=new_weight)

    def evaluate(self,state:S, action:A)->float:
        """Return the dot product of the weight and the features of (state, action)."""
        return np.dot(self.weight,self.feature_func(state,action))

    def get_gradient(self,state:S,action:A)->Sequence[float]:
        """Return the gradient of ``evaluate`` with respect to the weight.

        The model is linear in the weight, so the gradient is simply the
        feature vector returned by ``feature_func``.
        """
        return self.feature_func(state,action)

@dataclass(frozen=True)
class Linear_Approx_V():
    """Linear function of a state: weight . feature_func(state).

    The dataclass is frozen, so instances are never modified in place;
    ``update`` returns a new instance and the caller must keep the result.
    """
    # Maps a state to its feature vector.
    feature_func: Callable[[S],Sequence[float]]
    # Coefficient vector that is dotted with the feature vector.
    weight: Sequence[float]

    def update(self, delta_weight:Sequence[float]):
        """Return a copy whose weight is ``weight + delta_weight`` element-wise.

        Both operands are converted to NumPy arrays before adding, because
        ``+`` on plain lists/tuples would concatenate them instead of
        adding them numerically.
        """
        new_weight = np.asarray(self.weight) + np.asarray(delta_weight)
        return replace(self, weight=new_weight)

    def evaluate(self,state:S)->float:
        """Return the dot product of the weight and the features of ``state``."""
        return np.dot(self.weight,self.feature_func(state))

    def get_gradient(self,state:S)->Sequence[float]:
        """Return the gradient of ``evaluate`` with respect to the weight.

        The model is linear in the weight, so the gradient is simply the
        feature vector returned by ``feature_func``.
        """
        return self.feature_func(state)


def sample_from_policy(
        state:S,
        pi: Linear_Approx_pi,
        actions: Mapping[S,Iterable[A]]
) -> A:
    """Draw one action for ``state`` using the policy approximation ``pi``.

    Every action listed in ``actions[state]`` is weighted by
    ``pi.evaluate(state, action)``; the resulting mapping is passed to
    ``Categorical`` and one sample from it is returned.
    """
    # Linear_Approx_pi defines no __call__, so the score has to come from
    # its ``evaluate`` method (calling ``pi(...)`` raises TypeError).
    # NOTE(review): a linear score can be negative; presumably Categorical
    # expects non-negative weights -- confirm against rl.distribution.
    weights = {
        action: pi.evaluate(state, action)
        for action in actions[state]
    }
    return Categorical(weights).sample()


def Actor_Critic_Eligibility_Trace(
         feature_func_pi: Callable[[S,A],Sequence[float]],     # feature functions
         feature_func_V:Callable[[S],Sequence[float]],
         simulator: Callable[[S,A],Tuple[S,float]],
         theta0: Sequence[float],
         v0: Sequence[float],
         actions: Mapping[S,Iterable[A]],

         gamma: float,

         learning_rate_v: Callable[[int],float],
         learning_rate_theta: Callable[[int],float],

         lambda_theta: float,
         lambda_v: float,

         d: int,

         state_distribution: Distribution[S],
         tolerance: float = 1e-6,
         nstop: Optional[int] = None
         )->Iterator[Tuple["Linear_Approx_pi", "Linear_Approx_V"]]:
    """
    Actor-critic with eligibility traces, using linear approximations for
    both the policy (actor) and the state-value function (critic).

    feature_func_pi: (S, A) -> R^d, features for the policy
    feature_func_V: S -> R^d, features for the value function.
        feature_func(terminal) = 0 is expected; episodes are not cut short
        at terminal states, they always run for the full step budget.
    simulator: Take input state and action, output next state and reward
    theta0: The initial parameters for policy
    v0: The initial parameters for V
    actions: allowed actions for each state
    gamma: discount factor; must be > 0 because log(gamma) is taken when
        gamma < 1
    learning_rate_v: learning rate for v as a function of the number of
        appearances of a (state, action) pair
    learning_rate_theta: learning rate for theta as a function of the number
        of appearances of a (state, action) pair
    lambda_theta: lambda for theta trace eligibility
    lambda_v: lambda for v trace eligibility
    d: dimension of features (both traces are created with this length)
    state_distribution: its ``sample()`` gives the start state of each episode
    tolerance: when gamma < 1, an episode lasts
        round(log(tolerance) / log(gamma)) steps
    nstop: number of steps per episode when gamma >= 1 (ignored otherwise)

    return: Iterator of (pi, v), yielded once after every episode

    Raises ValueError (when iteration starts) if gamma >= 1 and nstop is
    not given.
    """

    # Episode length: the horizon after which gamma**t drops below tolerance,
    # or the explicit step budget when there is no discounting.
    if gamma < 1:
        max_steps = round(math.log(tolerance) / math.log(gamma))
    elif nstop is not None:
        max_steps = nstop
    else:
        raise ValueError("nstop must be provided when gamma >= 1")

    # Arrays (not lists) so the weight arithmetic below is element-wise.
    pi = Linear_Approx_pi(feature_func = feature_func_pi,
                          weight = np.asarray(theta0))
    value = Linear_Approx_V(feature_func = feature_func_V,
                            weight = np.asarray(v0))

    # Visit counts per (state, action); persist across episodes.
    state_action_count = {}
    while True:
        state = state_distribution.sample()
        # Eligibility traces and the running discount restart every episode.
        z_theta = np.zeros(d)
        z_v = np.zeros(d)
        P = 1.0

        # for each step in an episode
        step_count = 0
        while step_count < max_steps:
            step_count += 1

            action = sample_from_policy(state,pi,actions)

            visits = state_action_count.get((state,action),0) + 1
            state_action_count[(state,action)] = visits

            next_state,reward = simulator(state,action)

            # TD error under the current critic.
            delta = reward + gamma*value.evaluate(next_state) - value.evaluate(state)

            # Feature vectors may be plain sequences; convert before doing math.
            grad_v = np.asarray(value.get_gradient(state))
            # Gradient of log(pi) for a linear pi: features / pi(state, action).
            score = np.asarray(pi.get_gradient(state,action))/pi.evaluate(state,action)

            z_v = gamma*lambda_v*z_v + grad_v
            z_theta = gamma*lambda_theta*z_theta + P*score

            alpha_v = learning_rate_v(visits)
            alpha_theta = learning_rate_theta(visits)

            # Both approximations are frozen dataclasses: ``update`` returns a
            # new object, so the names must be rebound or nothing is learned.
            pi = pi.update(alpha_theta*delta*z_theta)
            value = value.update(alpha_v*delta*z_v)

            P = gamma*P
            state = next_state

        yield pi,value