In [None]:
@dataclass(frozen=True)
class TransitionStop(Generic[S]):
    """Immutable record holding a state, a next state and a reward.

    NOTE(review): the name looks like a typo of ``TransitionStep``; it is
    left unchanged here so existing references keep resolving.
    """
    # Annotated as non-terminal: the state the record starts from.
    state: NonTerminal[S]
    # May be terminal or non-terminal (plain ``State``).
    next_state: State[S]
    # Numeric reward stored with the pair of states.
    reward: float


class MarkovRewardProcess(MarkovProcess[S]):
    """Markov process whose transitions also produce a numeric reward."""

    @abstractmethod
    def transition_reward(
        self,
        state: NonTerminal[S]
    ) -> Distribution[Tuple[State[S], float]]:
        """Return the distribution of ``(next state, reward)`` pairs from ``state``."""

    def simulate_reward(
        self,
        start_state_distribution: Distribution[NonTerminal[S]]
    ) -> Iterable[TransitionStop[S]]:
        """Lazily generate one trace of reward-carrying transitions.

        A start state is sampled from ``start_state_distribution``; then, for
        as long as the current state is a ``NonTerminal``, a
        ``(next state, reward)`` pair is sampled from ``transition_reward``
        and yielded together with the current state.

        Yields:
            One ``TransitionStop(state, next_state, reward)`` per transition.
            The generator ends once a sampled next state is not a
            ``NonTerminal``.
        """
        state: State[S] = start_state_distribution.sample()

        while isinstance(state, NonTerminal):
            next_state, reward = self.transition_reward(state).sample()

            # TransitionStop is the three-field (state, next_state, reward)
            # record defined next to this class; the TransitionStep declared
            # in this file additionally requires an action.
            yield TransitionStop(state, next_state, reward)
            state = next_state


# Create a concrete class that implements the interface of the abstract class MarkovRewardProcess.
# The abstract method `transition` of MarkovProcess also needs to be implemented to make the whole thing concrete.

def get_value_function_vec(self, gamma: float) -> np.ndarray:
    """Solve the linear system ``(I - gamma * P) v = R`` for ``v``.

    ``P`` is ``self.get_transition_matrix()``, ``R`` is
    ``self.reward_function_vec`` and ``I`` is the identity matrix sized by
    ``len(self.non_terminal_states)``.

    Args:
        gamma: Factor multiplying the transition matrix.

    Returns:
        The solution vector ``v`` as a NumPy array.

    Raises:
        numpy.linalg.LinAlgError: If ``I - gamma * P`` is singular or not
            square (raised by ``np.linalg.solve``).
    """
    num_states = len(self.non_terminal_states)
    return np.linalg.solve(
        np.eye(num_states) - gamma * self.get_transition_matrix(),
        self.reward_function_vec
    )



In [None]:
# Policy
A = TypeVar("A")  # type parameter used for actions
S = TypeVar("S")  # type parameter used for states


class Policy(ABC, Generic[S, A]):
    """Abstract interface mapping a non-terminal state to a distribution over actions."""

    @abstractmethod
    def act(self, state: NonTerminal[S]) -> Distribution[A]:
        """Return the distribution of actions to take from ``state``."""
        ...

@dataclass(frozen=True)
class DeterministicPolicy(Policy[S, A]):
    """Policy that always selects the single action ``action_for`` returns."""
    # Maps the raw state value (``NonTerminal.state``) to the action to take.
    action_for: Callable[[S], A]

    def act(self, state: NonTerminal[S]) -> Constant[A]:
        """Wrap the action chosen for ``state`` in a ``Constant``."""
        chosen_action = self.action_for(state.state)
        return Constant(chosen_action)

from rl.distribution import Choose

@dataclass(frozen=True)
class UniformPolicy(Policy[S, A]):
    """Policy that builds a ``Choose`` over the actions valid in a state."""
    # Lists the actions allowed for a raw state value (``NonTerminal.state``).
    valid_actions: Callable[[S], Iterable[A]]

    def act(self, state: NonTerminal[S]) -> Choose[A]:
        """Return a ``Choose`` over the actions ``valid_actions`` gives for ``state``."""
        candidate_actions = self.valid_actions(state.state)
        return Choose(candidate_actions)



In [None]:
import numpy as np
from rl.distribution import SampledDistribution

class SimpleInvertoryStochasticPolicy(Policy[InventoryState, int]):
    """Stochastic ordering policy driven by a Poisson-sampled reorder point."""

    def __init__(self, reorder_point_poisson_mean: float):
        """Store the mean of the Poisson distribution the reorder point is drawn from."""
        self.reorder_point_poisson_mean: float = reorder_point_poisson_mean

    def act(self, state: NonTerminal[InventoryState]) -> SampledDistribution[int]:
        """Return a sampled distribution over order quantities for ``state``.

        Each sample draws a reorder point from a Poisson distribution with
        mean ``reorder_point_poisson_mean`` and returns the shortfall between
        that point and the state's inventory position, floored at zero.
        """
        def action_func(state=state) -> int:
            # The sampled value must be bound to the same name that is read
            # below (the original assigned a misspelled name -> NameError).
            reorder_point_sample: int = np.random.poisson(self.reorder_point_poisson_mean)
            # NOTE(review): assumes InventoryState exposes
            # ``inventory_position()``; the original call was spelled
            # ``inventory_posiiton`` -- confirm against InventoryState.
            return max(reorder_point_sample - state.state.inventory_position(), 0)

        return SampledDistribution(action_func)


In [None]:
from rl.distribution import Distribution

@dataclass(frozen=True)
class TransitionStep(Generic[S, A]):
    """Immutable record of one step: a state, the action taken, the next state and a reward."""
    # Annotated as non-terminal: the state the action is taken from.
    state: NonTerminal[S]
    # Action taken from ``state``.
    action: A
    # Parameterised by the state type S (the original used the action type A).
    next_state: State[S]
    # Numeric reward stored with the step.
    reward: float

class MarkovDecisionProcess(ABC, Generic[S, A]):
    """Abstract decision process over states ``S`` and actions ``A``."""

    @abstractmethod
    def actions(self, state: NonTerminal[S]) -> Iterable[A]:
        """Return the actions that may be taken from ``state``."""

    @abstractmethod
    def step(
        self,
        state: NonTerminal[S],
        action: A
    ) -> Distribution[Tuple[State[S], float]]:
        """Return the distribution of ``(next state, reward)`` pairs for taking ``action`` in ``state``."""

    def apply_policy(self, policy: Policy[S, A]) -> MarkovRewardProcess[S]:
        """Combine this process with ``policy`` into a ``MarkovRewardProcess``.

        The returned process answers ``transition_reward(state)`` by asking
        ``policy`` for an action distribution and applying this process's
        ``step`` to each action.

        NOTE(review): if ``MarkovRewardProcess`` still has other abstract
        methods left unimplemented (e.g. one inherited from
        ``MarkovProcess``), instantiating ``RewardProcess`` raises
        ``TypeError`` -- confirm against the base classes.
        """
        # Bind under a different name: inside RewardProcess ``self`` refers
        # to the reward process, not to this decision process.
        mdp = self

        class RewardProcess(MarkovRewardProcess[S]):
            def transition_reward(
                self,
                state: NonTerminal[S]
            ) -> Distribution[Tuple[State[S], float]]:
                actions: Distribution[A] = policy.act(state)
                return actions.apply(lambda a: mdp.step(state, a))

        # Must sit at method level; inside the nested class body a ``return``
        # is a SyntaxError.
        return RewardProcess()

    def simulate_actions(
        self,
        start_states: Distribution[NonTerminal[S]],
        policy: Policy[S, A]
    ) -> Iterable[TransitionStep[S, A]]:
        """Lazily generate one trace of steps taken by following ``policy``.

        A start state is sampled from ``start_states``; then, for as long as
        the current state is a ``NonTerminal``, an action is sampled from
        ``policy.act`` and a ``(next state, reward)`` pair from ``step``.

        Yields:
            One ``TransitionStep(state, action, next_state, reward)`` per
            step. The generator ends once a sampled next state is not a
            ``NonTerminal``.
        """
        state: State[S] = start_states.sample()

        while isinstance(state, NonTerminal):
            action = policy.act(state).sample()
            next_state, reward = self.step(state, action).sample()

            yield TransitionStep(state, action, next_state, reward)
            state = next_state







In [None]:
# Dynamic programming
X = TypeVar("X")


def iterate(step: Callable[[X], X], start: X) -> Iterator[X]:
    """Yield ``start``, ``step(start)``, ``step(step(start))``, ... without end.

    ``step`` is only invoked when the next value is requested, so the
    stream is lazy; callers are responsible for truncating it.
    """
    current = start
    yield current

    while True:
        current = step(current)
        yield current

In [None]:
def converge(values: Iterator[X], done: Callable[[X, X], bool]) -> Iterator[X]:
    """Yield items from ``values`` until two consecutive items satisfy ``done``.

    Args:
        values: Iterator to read from; it is consumed lazily.
        done: Called as ``done(previous, current)`` on each consecutive
            pair; a truthy result ends the stream.

    Yields:
        Every item read from ``values``, including the item that made
        ``done`` return true. Nothing is yielded if ``values`` is empty.
    """
    # Catch exhaustion explicitly rather than using ``next(values, None)``,
    # so that a stream whose first element is ``None`` is not mistaken for
    # an empty one.
    try:
        previous = next(values)
    except StopIteration:
        return
    yield previous

    for current in values:
        yield current
        if done(previous, current):
            return
        previous = current

In [None]:
# Policy evaluation
# NOTE(review): presumably the default threshold for the convergence checks
# that follow; no use of it is visible in this part of the file -- confirm.
DEFAULT_TOLERANCE = 1e-5

# Type alias: a mapping from each non-terminal state to a float.
V= Mapping[NonTerminal[S], float]

def evaluate_mrp(
        mrp: FiniteMarkovRewardProcess[S],
        gamma: float
) -> Iterator[np.ndarray]:
    """Return the endless stream of iterates ``v <- R + gamma * P.dot(v)``.

    ``R`` is ``mrp.reward_function_vec`` and ``P`` is
    ``mrp.get_transition_matrix()``. The first vector in the stream is all
    zeros, with one entry per element of ``mrp.non_terminal_states``.

    Args:
        mrp: Process supplying the reward vector, the transition matrix and
            the non-terminal states.
        gamma: Factor multiplying the transition term on every update.

    Returns:
        An iterator of vectors produced by ``iterate``; it never terminates
        on its own, so callers must truncate it.
    """
    # The matrix is the same on every update, so fetch it once instead of on
    # each iteration.
    transition_matrix = mrp.get_transition_matrix()

    def update(v: np.ndarray) -> np.ndarray:
        return mrp.reward_function_vec + gamma * transition_matrix.dot(v)

    v_0: np.ndarray = np.zeros(len(mrp.non_terminal_states))
    return iterate(update, v_0)

def almost_equal_np_arrays(
        v1: np.ndarray
)