In [90]:
from __future__ import annotations
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
# import graphviz
import numpy as np
from pprint import pprint
from typing import (Callable, Dict, Iterable, Generic, Sequence, Tuple,
                    Mapping, TypeVar, Set)

from rl.distribution import (Categorical, Distribution, FiniteDistribution,
                             SampledDistribution)

# S: type of the underlying value wrapped by a State.
S = TypeVar('S')
# X: result type of the callback / default used by State.on_non_terminal.
X = TypeVar('X')

class State(ABC, Generic[S]):
    '''Base class for a process state wrapping a value of type S.

    The subclasses defined in this file are `Terminal` and `NonTerminal`.
    '''
    state: S

    def on_non_terminal(
        self,
        f: Callable[[NonTerminal[S]], X],
        default: X
    ) -> X:
        '''Apply `f` to this state when it is a `NonTerminal`; otherwise
        hand back `default` unchanged.
        '''
        return f(self) if isinstance(self, NonTerminal) else default

    
@dataclass(frozen=True)
class Terminal(State[S]):
    '''A state marked as terminal.

    The simulation loops in this file only continue while the current
    state is a `NonTerminal`, so reaching a `Terminal` ends a trace.
    '''
    state: S


@dataclass(frozen=True)
class NonTerminal(State[S]):
    '''A state from which the process can still transition.

    Frozen dataclass, so instances are hashable and are used as keys of
    the transition maps in this file.
    '''
    state: S


In [170]:
class MarkovProcess(ABC, Generic[S]):
    '''Abstract Markov process whose underlying states have type S.
    '''
    @abstractmethod
    def transition(self, state: NonTerminal[S]) -> Distribution[State[S]]:
        '''Return the distribution over the states that can follow the
        given non-terminal state.
        '''

    def simulate(
        self,
        start_state_distribution: Distribution[NonTerminal[S]]
    ) -> Iterable[State[S]]:
        '''Generate one simulation trace of the process.

        The first state yielded is sampled from
        `start_state_distribution`.  After that, each yielded state is
        sampled from `transition` of the previous one.  The generator
        stops right after yielding a state that is not a `NonTerminal`;
        if that never happens it runs forever.
        '''
        current: State[S] = start_state_distribution.sample()
        while True:
            yield current
            if not isinstance(current, NonTerminal):
                # Nothing to transition from: the trace is complete.
                return
            current = self.transition(current).sample()

    def traces(
            self,
            start_state_distribution: Distribution[NonTerminal[S]]
    ) -> Iterable[Iterable[State[S]]]:
        '''Endlessly yield fresh traces (each one the output of
        `simulate'), every trace drawing its own start state from the
        given distribution.

        '''
        while True:
            trace = self.simulate(start_state_distribution)
            yield trace


# Maps each non-terminal state to the finite distribution over its
# possible next states (terminal or not).
Transition = Mapping[NonTerminal[S], FiniteDistribution[State[S]]]


class FiniteMarkovProcess(MarkovProcess[S]):
    '''A Markov Process with a finite state space.

    Having a finite state space lets us use tabular methods to work
    with the process (ie dynamic programming).

    '''

    non_terminal_states: Sequence[NonTerminal[S]]
    transition_map: Transition[S]

    def __init__(self, transition_map: Mapping[S, FiniteDistribution[S]]):
        '''Build the process from a map of state -> next-state distribution.

        Every key of `transition_map` becomes a `NonTerminal` state.
        Outcomes of the distributions may be given either as raw values
        or as already-wrapped `State` objects:

        * a `State` instance is kept exactly as supplied;
        * a raw value is wrapped in `NonTerminal` when it is also a key
          of `transition_map`, and in `Terminal` otherwise.
        '''
        non_terminals: Set[S] = set(transition_map.keys())

        def wrap(s1) -> State[S]:
            # Pre-wrapped outcomes must pass through untouched, otherwise
            # callers that build NonTerminal/Terminal themselves would end
            # up with doubly wrapped states.
            if isinstance(s1, State):
                return s1
            return NonTerminal(s1) if s1 in non_terminals else Terminal(s1)

        self.transition_map = {
            NonTerminal(s): Categorical(
                {wrap(s1): p for s1, p in v.table().items()}
            ) for s, v in transition_map.items()
        }
        self.non_terminal_states = list(self.transition_map.keys())

    def __repr__(self) -> str:
        '''Return a multi-line listing of every transition and its
        probability, grouped by source state.
        '''
        lines = []

        for s, d in self.transition_map.items():
            lines.append(f"From State {s.state}:\n")
            for s1, p in d:
                opt = "Terminal " if isinstance(s1, Terminal) else ""
                lines.append(
                    f"  To {opt}State {s1.state} with Probability {p:.3f}\n"
                )

        return "".join(lines)

    def get_transition_matrix(self) -> np.ndarray:
        '''Return the square matrix of transition probabilities between
        non-terminal states.

        Entry [i, j] is the probability of moving from
        `non_terminal_states[i]` to `non_terminal_states[j]`.  Terminal
        states have no row or column, so rows need not sum to 1.
        '''
        sz = len(self.non_terminal_states)
        mat = np.zeros((sz, sz))

        for i, s1 in enumerate(self.non_terminal_states):
            for j, s2 in enumerate(self.non_terminal_states):
                mat[i, j] = self.transition(s1).probability(s2)
        return mat

    def transition(self, state: NonTerminal[S])\
            -> FiniteDistribution[State[S]]:
        '''Look up the next-state distribution of `state`.

        Raises KeyError when `state` is not a key of `transition_map`.
        '''
        return self.transition_map[state]

    def get_stationary_distribution(self) -> FiniteDistribution[S]:
        '''Return a stationary distribution over the raw non-terminal
        states.

        Uses the first eigenvector of the transposed transition matrix
        whose eigenvalue is within 1e-8 of 1, rescaled so its entries
        sum to 1.  If no eigenvalue is that close to 1, the index lookup
        fails with an IndexError.
        '''
        eig_vals, eig_vecs = np.linalg.eig(self.get_transition_matrix().T)
        index_of_first_unit_eig_val = np.where(
            np.abs(eig_vals - 1) < 1e-8)[0][0]
        # eig may return complex values; only the real part is kept.
        eig_vec_of_unit_eig_val = np.real(
            eig_vecs[:, index_of_first_unit_eig_val])
        return Categorical({
            self.non_terminal_states[i].state: ev
            for i, ev in enumerate(eig_vec_of_unit_eig_val /
                                   sum(eig_vec_of_unit_eig_val))
        })

    def display_stationary_distribution(self):
        '''Pretty-print the stationary distribution with probabilities
        rounded to 3 decimals.
        '''
        pprint({
            s: round(p, 3)
            for s, p in self.get_stationary_distribution()
        })

    def generate_image(self) -> graphviz.Digraph:
        '''Return a graphviz digraph with one node per non-terminal state
        and one edge, labelled with its probability, per transition.

        Requires the optional `graphviz` package; an ImportError is
        raised when it is not installed.
        '''
        # Imported here because the module-level import is disabled, so
        # the rest of the class stays usable without graphviz installed.
        import graphviz

        d = graphviz.Digraph()

        for s in self.transition_map.keys():
            d.node(str(s))

        for s, v in self.transition_map.items():
            for s1, p in v:
                d.edge(str(s), str(s1), label=str(p))

        return d


In [179]:
N = 10000

# Squares 1..99 are in play, 100..105 are finishing squares (an overshoot
# past 100 is allowed), and square 80 is additionally made terminal.
States = {i: NonTerminal(i) for i in range(1, 100)}
States.update({i: Terminal(i) for i in range(100, 106)})
States[80] = Terminal(80)
# States[100] = Terminal(100)

# From square i a fair die moves to i+1 .. i+6, each with probability 1/6.
Dists: Dict[int, Categorical] = {}
for i in range(0, 100):
    temp_map: Dict[State, float] = {
        States[i + j]: 1/6 for j in range(1, 7)
    }
    Dists[i] = Categorical(temp_map)

# Each entry `a: b` makes square `a` behave exactly like square `b`
# (same outgoing distribution).  No destination is itself a source, so
# the order of these assignments does not matter.
SNAKES_AND_LADDERS = {
    1: 38, 4: 14, 9: 31, 16: 6, 21: 42, 28: 84,
    36: 44, 47: 26, 49: 11, 51: 67, 56: 53, 62: 19,
    64: 60, 71: 91, 87: 24, 93: 73, 95: 75, 98: 78,
}
for src, dst in SNAKES_AND_LADDERS.items():
    Dists[src] = Dists[dst]


process = FiniteMarkovProcess(Dists)
# The start state is sampled from the square-0 distribution, so the first
# printed state is already the result of the first die roll.
for state in process.simulate(Dists[0]):
    print(state)


NonTerminal(state=3)
NonTerminal(state=7)
NonTerminal(state=10)
NonTerminal(state=12)
NonTerminal(state=14)
NonTerminal(state=16)
NonTerminal(state=10)
NonTerminal(state=14)
NonTerminal(state=18)
NonTerminal(state=20)
NonTerminal(state=24)
NonTerminal(state=26)
NonTerminal(state=32)
NonTerminal(state=36)
NonTerminal(state=49)
NonTerminal(state=17)
NonTerminal(state=19)
NonTerminal(state=21)
NonTerminal(state=48)
NonTerminal(state=51)
NonTerminal(state=72)
NonTerminal(state=73)
NonTerminal(state=78)
Terminal(state=80)


In [212]:
######FROG GAME###########
N = 1000
States = {i: NonTerminal(i) for i in range(1, N)}
States[0] = Terminal(0)

# From position i the next position is uniform over 0 .. i-1; position 0
# is terminal.
Dists: Dict[int, Categorical] = {}
for i in range(1, N):
    temp_map: Dict[State, float] = {
        States[j]: 1/(i) for j in range(0, i)
    }
    Dists[i] = Categorical(temp_map)
process = FiniteMarkovProcess(Dists)

avg = 0
for _ in range(15000):
    # Number of states yielded by one trace whose start state is sampled
    # from the position-5 distribution.
    inc = sum(1 for _visited in process.simulate(Dists[5]))
    avg += inc/15000
# NOTE(review): the factor 60 is not explained in this cell — presumably
# a unit conversion; confirm.
print(avg*60)

137.42799999998743


In [218]:
# Reward processes
@dataclass(frozen=True)
class TransitionStep(Generic[S]):
    '''One sampled transition: the source state, the state reached and
    the reward received on the way.
    '''
    state: NonTerminal[S]
    next_state: State[S]
    reward: float

    def add_return(self, γ: float, return_: float) -> ReturnStep[S]:
        '''Build the `ReturnStep` for this transition.

        `return_` is the return from 'next_state' and γ the discount
        factor; the return attached to 'state' is
        `reward + γ * return_`.

        '''
        discounted_return = self.reward + γ * return_
        return ReturnStep(
            state=self.state,
            next_state=self.next_state,
            reward=self.reward,
            return_=discounted_return
        )


@dataclass(frozen=True)
class ReturnStep(TransitionStep[S]):
    '''A `TransitionStep` that additionally carries `return_`, the return
    associated with its `state` (see `TransitionStep.add_return`).
    '''
    return_: float


class MarkovRewardProcess(MarkovProcess[S]):
    '''A Markov process whose transitions also produce a reward.'''

    def transition(self, state: NonTerminal[S]) -> Distribution[State[S]]:
        '''Return the next-state distribution of the reward process.

        Each sample draws a (next state, reward) pair from
        `transition_reward` and discards the reward, so the object
        behaves like a plain Markov process.

        '''
        reward_distribution = self.transition_reward(state)

        def sample_next_state(distribution=reward_distribution):
            successor, _reward = distribution.sample()
            return successor

        return SampledDistribution(sample_next_state)

    @abstractmethod
    def transition_reward(self, state: NonTerminal[S])\
            -> Distribution[Tuple[State[S], float]]:
        '''Return the joint distribution of (next state, reward) for a
        transition out of `state`.

        '''

    def simulate_reward(
        self,
        start_state_distribution: Distribution[NonTerminal[S]]
    ) -> Iterable[TransitionStep[S]]:
        '''Generate one trace of the reward process as `TransitionStep`
        objects (state, next state, reward).

        The start state is sampled from `start_state_distribution`; the
        generator ends once the current state is no longer a
        `NonTerminal`.
        '''
        current: State[S] = start_state_distribution.sample()

        while isinstance(current, NonTerminal):
            successor, reward = self.transition_reward(current).sample()
            yield TransitionStep(current, successor, reward)
            current = successor

    def reward_traces(
            self,
            start_state_distribution: Distribution[NonTerminal[S]]
    ) -> Iterable[Iterable[TransitionStep[S]]]:
        '''Endlessly yield fresh reward traces (each one the output of
        `simulate_reward'), every trace drawing its own start state from
        the given distribution.

        '''
        while True:
            trace = self.simulate_reward(start_state_distribution)
            yield trace


# Finite distribution over (next state, reward) pairs.
StateReward = FiniteDistribution[Tuple[State[S], float]]
# Maps each non-terminal state to its (next state, reward) distribution.
RewardTransition = Mapping[NonTerminal[S], StateReward[S]]


class FiniteMarkovRewardProcess(FiniteMarkovProcess[S],
                                MarkovRewardProcess[S]):
    '''A Markov reward process with a finite state space, stored as an
    explicit table of (next state, reward) distributions.
    '''

    transition_reward_map: RewardTransition[S]
    reward_function_vec: np.ndarray

    def __init__(
        self,
        transition_reward_map: Mapping[S, FiniteDistribution[Tuple[S, float]]]
    ):
        '''Build the process from a map of
        state -> distribution over (next state, reward).

        Every key becomes a `NonTerminal` state.  Next states may be raw
        values or already-wrapped `State` objects: a `State` instance is
        kept as supplied, a raw value is wrapped in `NonTerminal` when it
        is also a key of the map and in `Terminal` otherwise.
        '''
        nt: Set[S] = set(transition_reward_map.keys())

        def wrap(s1) -> State[S]:
            # Pre-wrapped next states pass through untouched so callers
            # that build NonTerminal/Terminal themselves keep working.
            if isinstance(s1, State):
                return s1
            return NonTerminal(s1) if s1 in nt else Terminal(s1)

        # Wrap once, up front, so the reward map and the marginal
        # transition map handed to the parent hold identical State objects.
        wrapped = {
            s: Categorical(
                {(wrap(s1), r): p for (s1, r), p in v.table().items()}
            ) for s, v in transition_reward_map.items()
        }

        # Marginalise the reward out to obtain plain state transitions.
        transition_map: Dict[S, FiniteDistribution[State[S]]] = {}
        for state, trans in wrapped.items():
            probabilities: Dict[State[S], float] = defaultdict(float)
            for (next_state, _), probability in trans:
                probabilities[next_state] += probability

            transition_map[state] = Categorical(probabilities)

        super().__init__(transition_map)

        self.transition_reward_map = {
            NonTerminal(s): d for s, d in wrapped.items()
        }

        # Expected one-step reward of each non-terminal state, in the
        # same order as `non_terminal_states`.
        self.reward_function_vec = np.array([
            sum(probability * reward for (_, reward), probability in
                self.transition_reward_map[state])
            for state in self.non_terminal_states
        ])

    def __repr__(self) -> str:
        '''Return a multi-line listing of every (next state, reward)
        outcome and its probability, grouped by source state.
        '''
        lines = []
        for s, d in self.transition_reward_map.items():
            lines.append(f"From State {s.state}:\n")
            for (s1, r), p in d:
                opt = "Terminal " if isinstance(s1, Terminal) else ""
                lines.append(
                    f"  To [{opt}State {s1.state} and Reward {r:.3f}]"
                    + f" with Probability {p:.3f}\n"
                )
        return "".join(lines)

    def transition_reward(self, state: NonTerminal[S]) -> StateReward[S]:
        '''Look up the (next state, reward) distribution of `state`.

        Raises KeyError when `state` is not a key of
        `transition_reward_map`.
        '''
        return self.transition_reward_map[state]

    def get_value_function_vec(self, gamma: float) -> np.ndarray:
        '''Return the value of every non-terminal state for discount
        factor `gamma`.

        Solves the linear system (I - gamma * P) v = R, where P is the
        transition matrix between non-terminal states and R is
        `reward_function_vec`.
        '''
        return np.linalg.solve(
            np.eye(len(self.non_terminal_states)) -
            gamma * self.get_transition_matrix(),
            self.reward_function_vec
        )

    def display_reward_function(self):
        '''Pretty-print the expected one-step reward of each non-terminal
        state, rounded to 3 decimals.
        '''
        pprint({
            self.non_terminal_states[i]: round(r, 3)
            for i, r in enumerate(self.reward_function_vec)
        })

    def display_value_function(self, gamma: float):
        '''Pretty-print the value of each non-terminal state for discount
        factor `gamma`, rounded to 3 decimals.
        '''
        pprint({
            self.non_terminal_states[i]: round(v, 3)
            for i, v in enumerate(self.get_value_function_vec(gamma))
        })


In [222]:
#######REWARD VERSION OF SNAKES AND LADDERS#####
# Squares 1..99 are in play, 100..105 are finishing squares, and square
# 80 is additionally made terminal.
States = {i: NonTerminal(i) for i in range(1, 100)}
States.update({i: Terminal(i) for i in range(100, 106)})
States[80] = Terminal(80)
# States[100] = Terminal(100)

# Plain die-roll distributions; only Dists[0] is used below, as the
# start-state distribution.
Dists: Dict[int, Categorical] = {}
for i in range(0, 100):
    temp_map: Dict[State, float] = {
        States[i + j]: 1/6 for j in range(1, 7)
    }
    Dists[i] = Categorical(temp_map)

# Same moves, but paired with a reward: 1 for a roll of 1..5, 0 for a 6.
Reward_map: Dict[int, Categorical] = {}
for i in range(0, 100):
    temp_map: Dict[tuple, float] = {
        (States[i + j], 1 if j < 6 else 0): 1/6 for j in range(1, 7)
    }
    Reward_map[i] = Categorical(temp_map)

# Each entry `a: b` makes square `a` behave exactly like square `b`.
# No destination is itself a source, so assignment order does not matter.
SNAKES_AND_LADDERS = {
    1: 38, 4: 14, 9: 31, 16: 6, 21: 42, 28: 84,
    36: 44, 47: 26, 49: 11, 51: 67, 56: 53, 62: 19,
    64: 60, 71: 91, 87: 24, 93: 73, 95: 75, 98: 78,
}
for src, dst in SNAKES_AND_LADDERS.items():
    Reward_map[src] = Reward_map[dst]


process = FiniteMarkovRewardProcess(Reward_map)
# Print every sampled transition and accumulate the rewards of the trace.
reward = 0
for transition_step in process.simulate_reward(Dists[0]):
    print(transition_step)
    reward += transition_step.reward
print(reward)

TransitionStep(state=NonTerminal(state=3), next_state=NonTerminal(state=9), reward=0)
TransitionStep(state=NonTerminal(state=9), next_state=NonTerminal(state=35), reward=1)
TransitionStep(state=NonTerminal(state=35), next_state=NonTerminal(state=37), reward=1)
TransitionStep(state=NonTerminal(state=37), next_state=NonTerminal(state=39), reward=1)
TransitionStep(state=NonTerminal(state=39), next_state=NonTerminal(state=44), reward=1)
TransitionStep(state=NonTerminal(state=44), next_state=NonTerminal(state=45), reward=1)
TransitionStep(state=NonTerminal(state=45), next_state=NonTerminal(state=47), reward=1)
TransitionStep(state=NonTerminal(state=47), next_state=NonTerminal(state=29), reward=1)
TransitionStep(state=NonTerminal(state=29), next_state=NonTerminal(state=33), reward=1)
TransitionStep(state=NonTerminal(state=33), next_state=NonTerminal(state=36), reward=1)
TransitionStep(state=NonTerminal(state=36), next_state=NonTerminal(state=50), reward=0)
TransitionStep(state=NonTerminal(st