In [1]:
import numpy as np

In [10]:
from typing import Callable, Mapping, Tuple, TypeVar, Set

In [12]:
# Generic placeholders for the state type and the action type of an MDP.
S = TypeVar("S")
A = TypeVar("A")

# transitions[state][action][next_state] -> probability of that successor.
MDPTransitions = Mapping[
    S,
    Mapping[A, Mapping[S, float]],
]

# actions[state] -> the set of actions that can be taken in that state.
MDPActions = Mapping[
    S,
    Set[A],
]

# rewards[state][action] -> immediate scalar attached to taking the action.
MDPRewards = Mapping[
    S,
    Mapping[A, float],
]



In [35]:
def value_iteration(actions: MDPActions, transitions: MDPTransitions, rewards: MDPRewards,
                    discount: float) -> Mapping[S, float]:
    """Repeat backup sweeps from an all-zero value function until two sweeps agree.

    Each sweep is delegated to ``iterate_on_value_function``; convergence is
    decided by ``check_value_fuction_equivalence`` with its default tolerance.

    Args:
        actions: available actions per state; its keys define the state space.
        transitions: ``transitions[s][a][s']`` successor probabilities.
        rewards: ``rewards[s][a]`` immediate values.
        discount: factor applied to the expected successor value in each backup.

    Returns:
        The most recent sweep's value function (state -> value).

    Note:
        There is no iteration cap: the loop runs until consecutive sweeps are
        within tolerance, so inputs whose sweeps never settle will not return.
    """
    current_vf = {s: 0 for s in actions.keys()}
    while True:
        next_vf = iterate_on_value_function(actions, transitions, rewards, current_vf, discount)
        if check_value_fuction_equivalence(current_vf, next_vf):
            # Hand back the newest sweep rather than the one it was compared
            # against: it has had one more backup applied and costs nothing extra.
            return next_vf
        current_vf = next_vf


def iterate_on_value_function(actions: MDPActions, transitions: MDPTransitions, rewards: MDPRewards,
                              base_vf: Mapping[S, float], discount: float) -> Mapping[S, float]:
    """Perform one synchronous backup sweep over every state.

    For each state in ``actions`` the value of every available action is
    computed against ``base_vf`` via ``extract_value_of_action``, and the
    smallest of those values becomes the state's new value.

    Args:
        actions: available actions per state; its keys define which states are updated.
        transitions: ``transitions[s][a][s']`` successor probabilities.
        rewards: ``rewards[s][a]`` immediate values.
        base_vf: value function the sweep reads from (left unmodified).
        discount: factor applied to the expected successor value.

    Returns:
        A new dict mapping each state to its backed-up value.

    Raises:
        ValueError: if some state has an empty action collection (``min`` of nothing).
    """
    # NOTE(review): the backup selects the *lowest* action value. That is what a
    # cost-minimising formulation needs; if `rewards` are meant to be maximised
    # this should be `max` — confirm the intended objective.
    return {
        state: min([
            extract_value_of_action(actions, transitions, rewards,
                                    candidate, state, base_vf, discount)
            for candidate in available
        ])
        for state, available in actions.items()
    }


def extract_value_of_action(actions: MDPActions, transitions: MDPTransitions, rewards: MDPRewards,
                            action: A, state: S, value_function, discount: float):
    """Return the one-step lookahead value of taking ``action`` in ``state``.

    Computed as ``rewards[state][action]`` plus ``discount`` times the
    probability-weighted sum of ``value_function`` over the successors listed
    in ``transitions[state][action]``.

    Args:
        actions: accepted for signature symmetry with the other helpers; not read here.
        transitions: ``transitions[s][a][s']`` successor probabilities.
        rewards: ``rewards[s][a]`` immediate values.
        action: the action being evaluated.
        state: the state the action is taken from.
        value_function: mapping from state to its current value estimate.
        discount: factor applied to the expected successor value.

    Raises:
        KeyError: if ``state``/``action`` is missing from ``rewards`` or
            ``transitions``, or a successor is missing from ``value_function``.
    """
    immediate = rewards[state][action]
    expected_successor_value = sum(
        probability * value_function[successor]
        for successor, probability in transitions[state][action].items()
    )
    return immediate + discount * expected_successor_value


def check_value_fuction_equivalence(v1, v2, epsilon=1e-8) -> bool:
    """Return True when two value functions agree within ``epsilon`` at every state.

    Args:
        v1: mapping from state to value.
        v2: mapping from state to value; must have exactly the same keys as ``v1``.
        epsilon: largest absolute per-state difference still treated as equal.

    Returns:
        True if ``abs(v1[s] - v2[s]) <= epsilon`` for every state, else False.
        Two empty mappings compare as equivalent.

    Raises:
        AssertionError: if the two mappings do not share the same key set.
    """
    assert v1.keys() == v2.keys(), "comparing value functions with different state spaces"
    # Testing `<= epsilon` (instead of negating `> epsilon`) means a NaN
    # difference fails the comparison and is reported as a mismatch.
    return all(abs(v1[state] - v2[state]) <= epsilon for state in v1)


def check_policy_equivalence(p1, p2) -> bool:
    """Return True when two policies prescribe the same entry for every state.

    Args:
        p1: mapping from state to that state's policy entry.
        p2: mapping with exactly the same keys as ``p1``.

    Returns:
        False as soon as some state's entries compare unequal, True otherwise
        (including when both mappings are empty).

    Raises:
        AssertionError: if the two mappings do not share the same key set.
    """
    same_states = p1.keys() == p2.keys()
    assert same_states, "comparing policies with different state spaces"
    # Keep the `!=` comparison so the per-state check is the one callers already rely on.
    return not any(p1[state] != p2[state] for state in p1)


def get_greedy_policy(actions: MDPActions, transitions: MDPTransitions, rewards: MDPRewards,
                      value_function: Mapping[S, float], terminal_states: Set[S],
                      discount: float = 1.0) -> Mapping[S, Set[Tuple[A, int]]]:
    """Derive a deterministic policy by one-step lookahead on ``value_function``.

    For every non-terminal state each available action is scored with
    ``extract_value_of_action`` and the lowest-scoring action is selected.
    Terminal states are assigned an arbitrary one of their listed actions.

    Args:
        actions: available actions per state; its keys define the state space.
        transitions: ``transitions[s][a][s']`` successor probabilities.
        rewards: ``rewards[s][a]`` immediate values.
        value_function: state -> value used for the lookahead.
        terminal_states: states that skip the lookahead. Each must be a key of
            ``actions`` with at least one action.
        discount: factor applied to the expected successor value. Defaults to
            1.0 (no discounting); pass the discount used to compute
            ``value_function`` to keep the policy consistent with it.

    Returns:
        Mapping from state to a one-element set ``{(action, 1)}``, i.e. the
        chosen action paired with probability 1.

    Raises:
        KeyError: if a terminal state is not a key of ``actions``.
    """
    terminal = set(terminal_states)
    policy = {}
    for s in set(actions.keys()) - terminal:
        action_values = {
            action: extract_value_of_action(actions, transitions, rewards, action, s,
                                            value_function, discount)
            for action in actions[s]
        }
        # NOTE(review): selects the *lowest* action value, matching the `min`
        # backup in iterate_on_value_function; if `rewards` are meant to be
        # maximised both places should use `max` — confirm the objective.
        policy[s] = {(min(action_values, key=action_values.get), 1)}
    for s in terminal:
        # Action collections are sets (not indexable), so take whichever
        # element iteration yields first; the choice is arbitrary by design.
        policy[s] = {(next(iter(actions[s])), 1)}
    return policy


In [36]:
'''
Maze Runner Problem
'''

# Six states, 0 through 5. Action 's' moves deterministically to the
# next-numbered state; action 'j' spreads probability over later states.
# State 5 only offers 'stay', which loops back to itself.
maze_runner_actions = {
    0: {'s', 'j'},
    1: {'s', 'j'},
    2: {'s', 'j'},
    3: {'s', 'j'},
    4: {'s'},
    5: {'stay'},
}

# maze_runner_transitions[state][action][next_state] -> probability.
maze_runner_transitions = {
    0: {
        's': {1: 1},
        'j': {2: 0.5, 3: 0.25, 4: 0.125, 5: 0.125},
    },
    1: {
        's': {2: 1},
        'j': {3: 0.5, 4: 0.25, 5: 0.25},
    },
    2: {
        's': {3: 1},
        'j': {4: 0.5, 5: 0.5},
    },
    # NOTE(review): state 3's 'j' row is identical to state 2's — confirm this
    # is intended and not a copy-paste slip.
    3: {
        's': {4: 1},
        'j': {4: 0.5, 5: 0.5},
    },
    4: {
        's': {5: 1},
    },
    5: {
        'stay': {5: 1},
    },
}

# The only non-zero entry is taking 's' in state 4.
maze_runner_rewards = {
    0: {'s': 0, 'j': 0},
    1: {'s': 0, 'j': 0},
    2: {'s': 0, 'j': 0},
    3: {'s': 0, 'j': 0},
    4: {'s': 1},
    5: {'stay': 0},
}


In [37]:
# Solve the maze-runner MDP; as the cell's last expression the resulting
# state -> value mapping is displayed below.
value_iteration(
    actions=maze_runner_actions,
    transitions=maze_runner_transitions,
    rewards=maze_runner_rewards,
    discount=0.9,
)

{0: 0.32805000000000006,
 1: 0.36450000000000005,
 2: 0.405,
 3: 0.45,
 4: 1.0,
 5: 0.0}