In [1]:
import sys 
sys.path.append('../')

import itertools
import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Mapping, Dict, Tuple
from rl.policy import FiniteDeterministicPolicy
from rl.markov_decision_process import FiniteMarkovDecisionProcess
from rl.distribution import Categorical, FiniteDistribution, Distribution
from rl.dynamic_programming import value_iteration, value_iteration_result

# Print numpy floats with exactly three decimal places.
np.set_printoptions(formatter={'float': '{:.3f}'.format})

# Default size (width, height) for every figure drawn in this notebook.
plt.rcParams.update({'figure.figsize': (15, 7)})

In [2]:
# Kinds of cell a maze grid can contain.
SPACE, GOAL, BLOCK = 'SPACE', 'GOAL', 'BLOCK'

# Labels of the four moves an agent can attempt.
DOWN, UP, LEFT, RIGHT = 'D', 'U', 'L', 'R'


@dataclass(frozen=True)
class Position:
    '''
    Immutable, hashable coordinate of one maze cell.

    x: first index of the grid key (the index that DOWN/UP moves change)
    y: second index of the grid key (the index that RIGHT/LEFT moves change)
    '''
    x: int
    y: int

# Transition table handed to FiniteMarkovDecisionProcess:
#   state -> action label -> distribution over (next state, reward) pairs.
ActionMapping = Mapping[
    Position,
    Mapping[str, Categorical[Tuple[Position, float]]],
]

class GridMaze1(FiniteMarkovDecisionProcess[Position, str]):
    '''
    Grid maze expressed as a finite MDP with a "cost per step" reward.

    Moving into an ordinary SPACE cell earns a reward of -1 and moving into
    the GOAL cell earns 0. GOAL and BLOCK cells get no entry of their own in
    the action mapping, so only SPACE cells are states with actions.

    parameters:
    grid: a dictionary from (x, y) coordinates to the kind of cell found
        there (SPACE, BLOCK or GOAL). DOWN/UP change the first coordinate
        by +1/-1 and RIGHT/LEFT change the second by +1/-1. Coordinates
        missing from the dictionary are treated as BLOCK.
    '''
    def __init__(self, grid : Dict[Tuple[int, int], str]) -> None:
        # The grid has to be stored first: GetActionMapping reads self.grid.
        self.grid = grid
        super().__init__(self.GetActionMapping())

    def GetActionMapping(self) -> ActionMapping:
        '''
        Build the state -> action -> outcome table for the stored grid.

        returns:
        --------
        a dictionary keyed by the Position of every SPACE cell that has at
        least one legal move; each value maps an action label to a
        Categorical holding the single (next Position, reward) outcome of
        that move
        '''
        # Reward for a move, keyed by the kind of cell it lands on. Kinds that
        # are absent here (BLOCK, or anything unrecognised) cannot be entered.
        reward_on_entering = {SPACE: -1.0, GOAL: 0.0}

        # (action label, change in x, change in y), listed in the order the
        # actions are inserted into each state's dictionary.
        moves = ((DOWN, 1, 0), (UP, -1, 0), (RIGHT, 0, 1), (LEFT, 0, -1))

        d : Dict[Position, Dict[str, Categorical[Tuple[Position, float]]]] = {}

        for (x, y), kind in self.grid.items():
            if kind != SPACE:
                continue

            sub_d : Dict[str, Categorical[Tuple[Position, float]]] = {}
            for action, dx, dy in moves:
                target = (x + dx, y + dy)
                reward = reward_on_entering.get(self.grid.get(target, BLOCK))
                if reward is None:
                    continue
                # Each move is deterministic, so its one outcome carries
                # probability 1.0 regardless of how many moves are legal.
                sub_d[action] = Categorical(
                    {(Position(x=target[0], y=target[1]), reward): 1.0}
                )

            # A SPACE cell with no open neighbour offers no action and cannot
            # be reached from any other cell, so it is left out of the MDP.
            if sub_d:
                d[Position(x=x, y=y)] = sub_d

        return d

def done(v1 : Mapping, v2 : Mapping, tol : float) -> bool:
    '''
    this function takes in two value functions stored as dictionaries and
    returns True if the largest absolute difference between the values
    they assign to the same state is smaller than the specified tolerance

    values are paired by key, so the two dictionaries may list their
    states in different orders

    parameters:
    v1: a dictionary with states as the keys and the elements
        in the value function as the values
    v2: a dictionary with the same states as the keys and the elements
        in the value function as the values
    tol: the specified tolerance

    returns:
    --------
    True if the maximum difference is strictly less than tol
    False otherwise

    raises:
    -------
    ValueError if v1 and v2 are not defined on the same set of states
    '''
    if v1.keys() != v2.keys():
        raise ValueError("value functions must be defined on the same set of states")

    # With no states there is nothing to differ; the maximum difference is
    # taken to be 0 (np.max would raise on an empty array).
    if not v1:
        return bool(0.0 < tol)

    # Look each state up in both dictionaries instead of relying on the two
    # dictionaries sharing an insertion order.
    differences = np.array([v1[state] - v2[state] for state in v1], dtype=float)

    return bool(np.max(np.abs(differences)) < tol)

In [3]:
SPACE, BLOCK, GOAL = 'SPACE', 'BLOCK', 'GOAL'

# The maze drawn one string per row: '0' is an open cell, '-' a wall and
# 'x' the goal. Row index is the first coordinate, column index the second.
_MAZE_ROWS = (
    '0-000000',
    '0--0----',
    '0-0000-0',
    '000--0-0',
    '0-0-0000',
    '--0-0-0-',
    '0---0-00',
    '00000--x',
)
_KIND_OF_SYMBOL = {'0': SPACE, '-': BLOCK, 'x': GOAL}

# (row, column) -> cell kind, filled in row by row.
maze_grid = {
    (row, col): _KIND_OF_SYMBOL[symbol]
    for row, line in enumerate(_MAZE_ROWS)
    for col, symbol in enumerate(line)
}

# Character picture of the maze: '0' open cell, '-' wall, 'x' anything else
# (in practice the goal).
symbol_of_kind = {SPACE: '0', BLOCK: '-'}

# Size the picture from the maze itself instead of assuming it is 8 x 8.
n_rows = 1 + max((row for row, _ in maze_grid), default=-1)
n_cols = 1 + max((col for _, col in maze_grid), default=-1)

# Start from all walls so a coordinate missing from maze_grid is drawn as a
# wall. '<U32' is the dtype the previous float-array-to-str conversion produced.
grid = np.full((n_rows, n_cols), '-', dtype='<U32')

for (row, col), kind in maze_grid.items():
    grid[row, col] = symbol_of_kind.get(kind, 'x')

print(grid)

[['0' '-' '0' '0' '0' '0' '0' '0']
 ['0' '-' '-' '0' '-' '-' '-' '-']
 ['0' '-' '0' '0' '0' '0' '-' '0']
 ['0' '0' '0' '-' '-' '0' '-' '0']
 ['0' '-' '0' '-' '0' '0' '0' '0']
 ['-' '-' '0' '-' '0' '-' '0' '-']
 ['0' '-' '-' '-' '0' '-' '0' '0']
 ['0' '0' '0' '0' '0' '-' '-' 'x']]


In [4]:
# Build the maze MDP under the first reward formulation
# (-1 for every step, 0 for the step that reaches the goal).
maze = GridMaze1(grid=maze_grid)

In [5]:
# Heading, a 20-dash rule, then the MDP's own text rendering, one per line.
print("Transition Map:", "--" * 10, maze, sep="\n")

Transition Map:
--------------------
From State Position(x=0, y=0):
  With Action D:
    To [State Position(x=1, y=0) and Reward -1.000] with Probability 1.000
From State Position(x=0, y=2):
  With Action R:
    To [State Position(x=0, y=3) and Reward -1.000] with Probability 1.000
From State Position(x=0, y=3):
  With Action D:
    To [State Position(x=1, y=3) and Reward -1.000] with Probability 1.000
  With Action R:
    To [State Position(x=0, y=4) and Reward -1.000] with Probability 1.000
  With Action L:
    To [State Position(x=0, y=2) and Reward -1.000] with Probability 1.000
From State Position(x=0, y=4):
  With Action R:
    To [State Position(x=0, y=5) and Reward -1.000] with Probability 1.000
  With Action L:
    To [State Position(x=0, y=3) and Reward -1.000] with Probability 1.000
From State Position(x=0, y=5):
  With Action R:
    To [State Position(x=0, y=6) and Reward -1.000] with Probability 1.000
  With Action L:
    To [State Position(x=0, y=4) and Reward -1.000] wit

In [6]:
TOLERANCE = 1.e-6

# Counts the sweeps that had not yet converged. Deliberately not called
# `iter`, which would hide the builtin iter() for the rest of the session.
num_iterations = 0

# Start from an all-zero estimate. The keys are the MDP's own non-terminal
# state objects (they wrap a Position), not bare Position instances.
old_vf = {s: 0.0 for s in maze.non_terminal_states}

vf_generator = value_iteration(mdp = maze, gamma = 1.0)

# Drop the generator's first item before comparing anything.
# NOTE(review): presumably that item is the initial estimate, which would
# trivially match old_vf -- confirm against rl.dynamic_programming.
new_vf = next(vf_generator)

for new_vf in vf_generator:
    if done(old_vf, new_vf, TOLERANCE):
        break
    old_vf = new_vf
    num_iterations += 1

print(f"Finished value iteration algorithm in {num_iterations} iterations")

Finished value iteration algorithm in 15 iterations


In [7]:
# old_vf is the last estimate stored before the convergence loop stopped;
# it differs from the final sweep by less than TOLERANCE in every state.
print(old_vf)

{NonTerminal(state=Position(x=0, y=0)): -15.0, NonTerminal(state=Position(x=0, y=2)): -11.0, NonTerminal(state=Position(x=0, y=3)): -10.0, NonTerminal(state=Position(x=0, y=4)): -11.0, NonTerminal(state=Position(x=0, y=5)): -12.0, NonTerminal(state=Position(x=0, y=6)): -13.0, NonTerminal(state=Position(x=0, y=7)): -14.0, NonTerminal(state=Position(x=1, y=0)): -14.0, NonTerminal(state=Position(x=1, y=3)): -9.0, NonTerminal(state=Position(x=2, y=0)): -13.0, NonTerminal(state=Position(x=2, y=2)): -9.0, NonTerminal(state=Position(x=2, y=3)): -8.0, NonTerminal(state=Position(x=2, y=4)): -7.0, NonTerminal(state=Position(x=2, y=5)): -6.0, NonTerminal(state=Position(x=2, y=7)): -6.0, NonTerminal(state=Position(x=3, y=0)): -12.0, NonTerminal(state=Position(x=3, y=1)): -11.0, NonTerminal(state=Position(x=3, y=2)): -10.0, NonTerminal(state=Position(x=3, y=5)): -5.0, NonTerminal(state=Position(x=3, y=7)): -5.0, NonTerminal(state=Position(x=4, y=0)): -13.0, NonTerminal(state=Position(x=4, y=2)): -1

# Using the second reward function: 0 for every step, 1 for the step that reaches the goal

In [8]:
# NOTE(review): these repeat, with the same values, the constants already
# defined in the first definitions cell of this notebook.

# Kinds of cell a maze grid can contain.
SPACE, GOAL, BLOCK = 'SPACE', 'GOAL', 'BLOCK'

# Labels of the four moves an agent can attempt.
DOWN, UP, LEFT, RIGHT = 'D', 'U', 'L', 'R'


# NOTE(review): Position is also declared earlier in this notebook. Running
# this cell creates a new class, so Position objects built before it ran
# compare unequal to ones built afterwards.
@dataclass(frozen=True)
class Position:
    '''
    Immutable, hashable coordinate of one maze cell.

    x: first index of the grid key (the index that DOWN/UP moves change)
    y: second index of the grid key (the index that RIGHT/LEFT moves change)
    '''
    x: int
    y: int

# Transition table handed to FiniteMarkovDecisionProcess:
#   state -> action label -> distribution over (next state, reward) pairs.
ActionMapping = Mapping[
    Position,
    Mapping[str, Categorical[Tuple[Position, float]]],
]

class GridMaze2(FiniteMarkovDecisionProcess[Position, str]):
    '''
    Grid maze expressed as a finite MDP with a "reward at the goal" scheme.

    Moving into an ordinary SPACE cell earns a reward of 0 and moving into
    the GOAL cell earns 1. GOAL and BLOCK cells get no entry of their own in
    the action mapping, so only SPACE cells are states with actions.

    parameters:
    grid: a dictionary from (x, y) coordinates to the kind of cell found
        there (SPACE, BLOCK or GOAL). DOWN/UP change the first coordinate
        by +1/-1 and RIGHT/LEFT change the second by +1/-1. Coordinates
        missing from the dictionary are treated as BLOCK.
    '''
    def __init__(self, grid : Dict[Tuple[int, int], str]) -> None:
        # The grid has to be stored first: GetActionMapping reads self.grid.
        self.grid = grid
        super().__init__(self.GetActionMapping())

    def GetActionMapping(self) -> ActionMapping:
        '''
        Build the state -> action -> outcome table for the stored grid.

        returns:
        --------
        a dictionary keyed by the Position of every SPACE cell that has at
        least one legal move; each value maps an action label to a
        Categorical holding the single (next Position, reward) outcome of
        that move
        '''
        # Reward for a move, keyed by the kind of cell it lands on. Kinds that
        # are absent here (BLOCK, or anything unrecognised) cannot be entered.
        reward_on_entering = {SPACE: 0.0, GOAL: 1.0}

        # (action label, change in x, change in y), listed in the order the
        # actions are inserted into each state's dictionary.
        moves = ((DOWN, 1, 0), (UP, -1, 0), (RIGHT, 0, 1), (LEFT, 0, -1))

        d : Dict[Position, Dict[str, Categorical[Tuple[Position, float]]]] = {}

        for (x, y), kind in self.grid.items():
            if kind != SPACE:
                continue

            sub_d : Dict[str, Categorical[Tuple[Position, float]]] = {}
            for action, dx, dy in moves:
                target = (x + dx, y + dy)
                reward = reward_on_entering.get(self.grid.get(target, BLOCK))
                if reward is None:
                    continue
                # Each move is deterministic, so its one outcome carries
                # probability 1.0 regardless of how many moves are legal.
                sub_d[action] = Categorical(
                    {(Position(x=target[0], y=target[1]), reward): 1.0}
                )

            # A SPACE cell with no open neighbour offers no action and cannot
            # be reached from any other cell, so it is left out of the MDP.
            if sub_d:
                d[Position(x=x, y=y)] = sub_d

        return d

def done(v1 : Mapping, v2 : Mapping, tol : float) -> bool:
    '''
    this function takes in two value functions stored as dictionaries and
    returns True if the largest absolute difference between the values
    they assign to the same state is smaller than the specified tolerance

    values are paired by key, so the two dictionaries may list their
    states in different orders

    parameters:
    v1: a dictionary with states as the keys and the elements
        in the value function as the values
    v2: a dictionary with the same states as the keys and the elements
        in the value function as the values
    tol: the specified tolerance

    returns:
    --------
    True if the maximum difference is strictly less than tol
    False otherwise

    raises:
    -------
    ValueError if v1 and v2 are not defined on the same set of states
    '''
    if v1.keys() != v2.keys():
        raise ValueError("value functions must be defined on the same set of states")

    # With no states there is nothing to differ; the maximum difference is
    # taken to be 0 (np.max would raise on an empty array).
    if not v1:
        return bool(0.0 < tol)

    # Look each state up in both dictionaries instead of relying on the two
    # dictionaries sharing an insertion order.
    differences = np.array([v1[state] - v2[state] for state in v1], dtype=float)

    return bool(np.max(np.abs(differences)) < tol)

In [11]:
# Rebuild the maze MDP under the second reward formulation
# (0 for every step, 1 for the step that reaches the goal).
maze = GridMaze2(grid=maze_grid)

TOLERANCE = 1.e-6

# Counts the sweeps that had not yet converged. Deliberately not called
# `iter`, which would hide the builtin iter() for the rest of the session.
num_iterations = 0

# Start from an all-zero estimate. The keys are the MDP's own non-terminal
# state objects (they wrap a Position), not bare Position instances.
old_vf = {s: 0.0 for s in maze.non_terminal_states}

vf_generator = value_iteration(mdp = maze, gamma = 0.9)

# Drop the generator's first item before comparing anything.
# NOTE(review): presumably that item is the initial estimate, which would
# trivially match old_vf -- confirm against rl.dynamic_programming.
new_vf = next(vf_generator)

for new_vf in vf_generator:
    if done(old_vf, new_vf, TOLERANCE):
        break
    old_vf = new_vf
    num_iterations += 1

print(f"Finished value iteration algorithm in {num_iterations} iterations")

Finished value iteration algorithm in 16 iterations


In [12]:
# old_vf is the last estimate stored before the convergence loop stopped;
# it differs from the final sweep by less than TOLERANCE in every state.
print(old_vf)

{NonTerminal(state=Position(x=0, y=0)): 0.2058911320946491, NonTerminal(state=Position(x=0, y=2)): 0.31381059609000017, NonTerminal(state=Position(x=0, y=3)): 0.34867844010000015, NonTerminal(state=Position(x=0, y=4)): 0.31381059609000017, NonTerminal(state=Position(x=0, y=5)): 0.28242953648100017, NonTerminal(state=Position(x=0, y=6)): 0.25418658283290013, NonTerminal(state=Position(x=0, y=7)): 0.22876792454961012, NonTerminal(state=Position(x=1, y=0)): 0.22876792454961012, NonTerminal(state=Position(x=1, y=3)): 0.38742048900000015, NonTerminal(state=Position(x=2, y=0)): 0.25418658283290013, NonTerminal(state=Position(x=2, y=2)): 0.38742048900000015, NonTerminal(state=Position(x=2, y=3)): 0.43046721000000016, NonTerminal(state=Position(x=2, y=4)): 0.47829690000000014, NonTerminal(state=Position(x=2, y=5)): 0.5314410000000002, NonTerminal(state=Position(x=2, y=7)): 0.5314410000000002, NonTerminal(state=Position(x=3, y=0)): 0.28242953648100017, NonTerminal(state=Position(x=3, y=1)): 0.3