# Maze (Level 1 Assignment)

We are faced with a problem of finding an '*escape route*' for an agent to take to get to the terminal state - basically another reinforcement learning problem. For such a problem, an agents' state is **its' position**, or so called **cell**, and based by on what cell is agent currently at, we are going to try to compute an **optimal action** for it to take, and finally *finish the game* or as we're going to call it - end up stepping on a **terminal cell**.

A problem we're going to tackle for Level 1 assignment is constructed in such a way that its' core is **deterministic environment** - *for every action, agent has a probability of 1 of ending up in a particular cell, and probability of 0 to end up on some other cell, based on what action it took.* Of course, taking an action is **deterministic** as well, meaning *it does not depend on agents' state* (on what cell is agent standing on). 

## For the start, needed utilities

Firstly, we define an interface for cells.

In [1]:
from abc import ABC, abstractmethod

class Position(ABC):
    """
    Interface class for kinds of positions (1D, 2D etc.)
    """
    @property
    @abstractmethod
    def value(self):
        pass

class Cell(ABC):
    """
    Interface class for all maze cells.
    """

    @property
    @abstractmethod
    def position(self) -> Position:
        pass

    @position.setter
    @abstractmethod
    def position(self, position: Position):
        pass

    @property
    @abstractmethod
    def reward(self) -> float:
        pass

    @property
    @abstractmethod
    def color(self) -> tuple[int, int, int]:
        pass

    @property
    def is_steppable(self) -> bool:
        return True

    @property
    def is_terminal(self) -> bool:
        return False

    @property
    def has_value(self) -> bool:
        return True

Every cell has a few properties - its' position (read below about it), is it steppable and if it is, reward it returns to an agent for stepping on it, as well as if its' terminal.

As you might've noticed, we've defined a ***Position*** class, which will represent how positions are interpreted based on what type of maze are we using (it could be board, graph etc., and all of those types require different interpretation of position.)

Now's the time for descendant classes of ***Cell*** class. Firstly, we're defining a ***RegularCell*** class, which represents a steppable, non-terminal cell - just an ordinary cell.

In [2]:
class RegularCell(Cell):
    """
    A regular cell class.

    A non-terminal, steppable cell.
    """

    @property
    def position(self) -> Position:
        return self.__position

    @position.setter
    def position(self, position: Position):
        self.__position = position

    @property
    def reward(self) -> float:
        return self.__reward

    @property
    def color(self) -> tuple[int, int, int]:
        return (255, 255, 255) if self.reward == -1 else (255, 0, 0)

    def __init__(self, reward: float):
        self.__position: Position = None
        self.__reward: float = reward

***TerminalCell*** class - *where it all ends*. Agent finishes the *maze* by stepping onto this type of cell.

In [3]:
class TerminalCell(Cell):
    """
    A terminal cell class.

    When an agent steps onto it, a game finishes.
    """

    @property
    def position(self) -> Position:
        return self.__position

    @position.setter
    def position(self, position: Position):
        self.__position = position

    @property
    def reward(self) -> float:
        return self.__reward

    @property
    def color(self) -> tuple[int, int, int]:
        return 0, 0, 255

    @property
    def is_terminal(self) -> bool:
        return True

    def __init__(self, reward: float):
        self.__position: Position = None
        self.__reward: float = reward

***TeleportCell*** class - special type of cell used for teleporting an agent onto some other steppable cell, ***but not on other teleport cell***. This class is implemented in such a way that inherits all properties of cell it points to.

In [4]:
class TeleportCell(Cell):
    """
    A teleport cell class.

    When stepped onto it, agent can teleport only on
    regular or terminal cells, but not on other
    teleports nor wall cells.
    """

    @property
    def position(self) -> Position:
        return self.__position

    @position.setter
    def position(self, position: Position):
        self.__position = position

    @property
    def reward(self) -> float:
        return self.__to_teleport_to.reward

    @property
    def color(self) -> tuple[int, int, int]:
        return 0, 255, 0

    @property
    def to_teleport_to(self) -> Cell:
        return self.__to_teleport_to

    @to_teleport_to.setter
    def to_teleport_to(self, to_teleport_to: Cell):
        self.__to_teleport_to = to_teleport_to

    @property
    def is_steppable(self) -> bool:
        return self.to_teleport_to.is_steppable

    @property
    def is_terminal(self) -> bool:
        return self.to_teleport_to.is_terminal

    @property
    def has_value(self) -> bool:
        return self.to_teleport_to.has_value

    def __init__(self):
        self.__position: Position = None
        self.__to_teleport_to: Cell = None

***WallCell*** class - represents a wall. It is, of course, not steppable.

In [5]:
class WallCell(Cell):
    """
    A wall cell class.

    A non steppable, wall cell.
    """

    @property
    def position(self) -> Position:
        return self.__position

    @position.setter
    def position(self, position: Position):
        self.__position = position

    @property
    def reward(self) -> float:
        return self.__reward

    @property
    def color(self) -> tuple[int, int, int]:
        return 128, 128, 128

    @property
    def is_steppable(self) -> bool:
        return False

    @property
    def has_value(self) -> bool:
        return False

    def __init__(self):
        self.__position: Position = None
        self.__reward: float = 0

Lastly, we're defining a CellGenerator, which will be used to generate cells.

In [6]:
from typing import Callable

CellGenerator = Callable[[], Cell]

Now that we've defined all types of cells (from now on, we're going to call them **states**), it is time to define **actions**. But, for the purpose of *Level 2* and *Level 3 Assignments*, we're firstly going to define something called **directions**, which basically represent a direction for an agent to follow. By defining directions, we're distincting actions from agent movements' directions. For example, if the ***action*** is defined as '*Go RIGHT*', ***direction*** is '*RIGHT*'.

Practical usage of directions will be shown in *Level 2 Assignment*.

In [7]:
from enum import Enum, auto

class Direction(Enum):
    RIGHT = auto()
    LEFT = auto()
    UP = auto()
    DOWN = auto()

    @staticmethod
    def get_all_directions():
        return [Direction.RIGHT, Direction.LEFT, Direction.UP, Direction.DOWN]


class Action(Enum):
    ACTION_A1 = auto()  # On Level 1 assignment, this would collapse to direction RIGHT
    ACTION_A2 = auto()  # On Level 1 assignment, this would collapse to direction LEFT
    ACTION_A3 = auto()  # On Level 1 assignment, this would collapse to direction UP
    ACTION_A4 = auto()  # On Level 1 assignment, this would collapse to direction DOWN

    @staticmethod
    def get_all_actions():
        return [Action.ACTION_A1, Action.ACTION_A2, Action.ACTION_A4, Action.ACTION_A3]

## Environment's 'Base'

It is time for defining so called '**Base**'. Basically, a ***base*** can be interpreted as a representational structure of a particular environment, be it a **board**, a **graph** or some other structure. It is tightly coupled with *actions* and especially *directions*, because it gives a meaningful sense to the latter.

Like we've said, bases can be of several types - perfect opportunity for making an interface and grouping types of bases together.

***Important disclaimer*** - we're going to interchangeably use the terms ***state***, ***node***, ***position*** and ***cell***, because, even though they're used in several different contexts, basically mean the same thing. Don't get confused.

In [8]:
from typing import Dict

class MazeBase(ABC):
    """
    Base class for maze.
    """

    @property
    @abstractmethod
    def nodes(self) -> Dict[Position, Cell]:
        pass

    @property
    @abstractmethod
    def connections(self) -> Dict[Position, Dict[Direction, Position]]:
        pass

    @abstractmethod
    def __getitem__(self, **kwargs) -> Cell:
        pass

    @abstractmethod
    def get_directions(self, s: Position) -> list[Direction]:
        pass

Here we can observe several properties:
- '***Nodes***' *property* describes a group of ***nodes***, which represent a single position, a single cell, a single ***state*** in base structure.
- '***Connections***' *property* describes what nodes are connected and how they're connected.

Since for the purpose of *Level 1 Assignment*, we're considering only a 'board' type of base, so we're going to define a representation of position on a board (virtually, it is a $n \times m$ matrix, so each position can be represented by corresponding indicies $(i, j)$ of corresponding matrix), as well as the board itself. 

In [9]:
class BoardPosition(Position):
    """
    Inherited from Position class - models a board square.
    """

    @property
    def value(self) -> tuple[int, int]:
        return self.__value

    def __init__(self, value: tuple[int, int]):
        self.__value = value

    def __eq__(self, other):
        if isinstance(other, tuple):
            return self.__value == other
        elif isinstance(other, BoardPosition):
            return self.__value == other.__value

        raise Exception(
            f"Cannot use '==' in context of {other.__class__.__name__}"
        )

    def __hash__(self):
        return hash(self.__value)

    def __getitem__(self, key: int):
        if key in [0, 1]:
            return self.__value[key]

        raise Exception(
            f"No position {key} in this base!"
        )

In [10]:
from typing import Iterable

class MazeBoard(MazeBase):
    """
    Inherited from MazeBase class - models a board.
    """

    @property
    def nodes(self) -> Dict[Position, Cell]:
        return self.__nodes

    @property
    def connections(self) -> Dict[Position, Dict[Direction, Position]]:
        return self.__connections

    @property
    def rows_no(self) -> int:
        return self.__rows_no

    @property
    def cols_no(self) -> int:
        return self.__cols_no

    def __init__(self, size: tuple[int, int], specs: list[tuple[float, CellGenerator]]):
        width, height = size
        weights = [w for w, _ in specs]
        generators = [g for _, g in specs]

        def random_cell():
            return choices(generators, weights, k=1)[0]()

        cells = [[random_cell() for _ in range(width)] for _ in range(height)]

        self.__rows_no, self.__cols_no, cells = MazeBoard.__validate_cells(cells)

        self.__nodes: Dict[Position, Cell] = \
            {
                BoardPosition((i, j)): cells[i][j]
                for i in range(self.__rows_no)
                for j in range(self.cols_no)
            }

        self.__connections: Dict[Position, Dict[Direction, Position]] = \
            {
                node: {}
                for node in self.__nodes
            }

        self.__set_maze()

    def __getitem__(self, key: tuple[int, int]) -> Cell:
        for node in self.__nodes:
            if node == key:
                return self.__nodes[node]

        raise Exception(
            f"No item {key} in this base!"
        )

    @staticmethod
    def __validate_cells(cells: Iterable[Iterable[Cell]]) -> tuple[int, int, list[list[Cell]]]:
        """
        Utility function used to validate the given double-iterable of cells.

        If checks are successful, it will return number of board rows and
        columns, as well as cells themselves.
        """
        cells = [list(row) for row in cells] if cells else []

        if not cells:
            raise Exception("Number of rows in a board must be at least 1.")
        if not cells[0]:
            raise Exception("There has to be at least one column.")

        rows_no = len(cells)
        cols_no = len(cells[0])

        for row in cells:
            if not row or len(row) != cols_no:
                raise Exception(
                    "Each row in a board must have the same number of columns."
                )

        return rows_no, cols_no, cells

    def __right(self, position: tuple[int, int]):
        row, col = position
        if col != self.__cols_no - 1:
            if self[row, col + 1].is_steppable:
                return row, col + 1
        return row, col

    def __left(self, position: tuple[int, int]):
        row, col = position
        if col != 0:
            if self[row, col - 1].is_steppable:
                return row, col - 1
        return row, col

    def __up(self, position: tuple[int, int]):
        row, col = position
        if row != 0:
            if self[row - 1, col].is_steppable:
                return row - 1, col
        return row, col

    def __down(self, position: tuple[int, int]):
        row, col = position
        if row != self.__rows_no - 1:
            if self[row + 1, col].is_steppable:
                return row + 1, col
        return row, col

    def __set_teleport(self):
        """
        Private method for configuring teleport cells - to what
        cells will agent teleport when stepped onto teleport cell.
        """

        for node in self.__nodes:
            cell = self.__nodes[node]
            if isinstance(cell, TeleportCell):
                while True:
                    tp = choice(list(self.__nodes.keys()))
                    tn = self.__nodes[tp]
                    if not isinstance(tn, WallCell) and not isinstance(tn, TeleportCell):
                        cell.position = tp
                        cell.to_teleport_to = tn
                        break

    def __set_maze(self):
        """
        Private method for creating board -
        making board squares (here named nodes).
        """

        self.__set_teleport()

        for node in self.__nodes:
            self.__nodes[node].position = node
            directions = Direction.get_all_directions()

            for direction in directions:
                if direction == Direction.RIGHT:
                    di, dj = self.__right(node.value)
                elif direction == Direction.LEFT:
                    di, dj = self.__left(node.value)
                elif direction == Direction.UP:
                    di, dj = self.__up(node.value)
                elif direction == Direction.DOWN:
                    di, dj = self.__down(node.value)
                else:
                    raise Exception(
                        f"Board doesn't support {direction.name}!"
                    )
                for dnode in self.__nodes:
                    if dnode == (di, dj):
                        self.__connections[node][direction] = dnode

    def compute_direction(self, node: Position, direction: Direction) -> Position:
        """
        Returns a node that is in direction from a given node. If that is a wall cell,
        it returns given node.
        """
        return self.__connections[node][direction]

    def compute_position(self, position: tuple[int, int]) -> Position:
        for node in self.__nodes:
            if node == position:
                return node

        raise Exception(
            f"No position {position} in this base!"
        )

    def get_directions(self, node: Position) -> list[Direction]:
        return list(self.__connections[node].keys())

A lot to consider, so we're going to break it all into pieces.
- A few added *properties*, such as `rows_no` (number of rows) and `cols_no` (number of columns) a board has.
- A board initializer method `__init__` is an initializer for all of the nodes and connections. As we can see, `nodes` is of type `Dict[Position, Cell]`, meaning that it represents a mapping from positions to cells (a *conversion* mechanism). `connections` *property* gives us the overview of internal connections between nodes, therefore it maps a certain position (node) to another position (node) using directions and is the type of `Dict[Position, Dict[Direction, Position]]`.
- A few private '*directional*' methods called `__right`, `__left`, `__up` and `__down`, used for computing the corresponding directions. They return the indicies of a cell that is to right, left, up or down of a given cell, and as such only used when constructing a maze.
- `__set_teleport` private method is used for randomly selecting one cell from the set of all steppable cell and assigning it to the teleport cell, for all of teleport cells.
- `__set_maze` private method 'sets' a maze base, meaning that it will populate `connections` *property* and determine what node is connected to another node in what direction.
- `compute_direction` will compute which node is reached after taking given direction from a given node.
- `compute_position` is used for converting from tuple of two integers to `Position` class.
- `get_directions` will return what directions are possible to follow from the given node. This is particularly importart when dealing with graph bases.

## The environment itself

The time has come to make an ***environment*** itself. We will model the environment as *Markov decision process*, meaning that for a given state and action, it returns the reward signal and the next agent's state. Of course, it is possible to calculate $Q$ values, as well as determining $V$ values from the former.

In [11]:
class MazeEnvironment:
    """
    Wrapper for a maze board that behaves like an MDP environment.

    This is a callable object that behaves like a stochastic MDP
    state transition function - given the current state and action,
    it returns the following state and reward.

    In addition, the environment object is capable of enumerating all
    possible states and all possible actions, as well as determining
    if the state is terminal.
    """

    @property
    def base(self) -> MazeBase:
        return self.__base

    @property
    def states(self) -> list[Position]:
        return self.__states

    @property
    def q_values(self) -> Dict[tuple[Position, Action], float]:
        return self.__q_values

    @property
    def v_values(self) -> Dict[Position, float]:
        return self.__v_values

    @property
    def probabilities(self) -> Dict[tuple[Position, Action], Dict[Direction, float]]:
        return self.__probabilities

    @property
    def gamma(self) -> float:
        return self.__gamma

    def __init__(self, base: MazeBase, gamma: float = 1):
        """
        Initializer for the environment by specifying the underlying
        maze base.
        :param base: A base for maze, i.e. *Graph* or *Board*.
        :param gamma: Discount factor.
        """
        self.__base = base
        self.__states: list[Position] = \
            [
                node
                for node in self.base.nodes
                if self.base[node].is_steppable and not isinstance(self.__base[node], TeleportCell)
            ]

        # Setting probabilities -
        self.__probabilities: Dict[tuple[Position, Action], Dict[Direction, float]] = {}
        self.__set_probabilities()

        self.__q_values: Dict[tuple[Position, Action], float] = \
            {
                (s, a): -10 * random() if not self.is_terminal(s) else 0
                for s in self.__states
                for a in self.get_actions()
            }

        self.__v_values: Dict[Position, float] = \
            {
                s: self.determine_v(s) for s in self.__states
            }

        self.__gamma = gamma

    def __call__(self, state, action: Action):
        """
        Makes possible for environment class to act as a Markov Decision process -
        for a given state and action, it will return new states and rewards.
        """
        snext = []

        if not isinstance(state, Position):
            state = self.base.compute_position(state)

        for direction in self.base.get_directions(state):
            new_state = self.compute_direction(state, direction)
            new_cell = self.__base[new_state]

            if isinstance(new_cell, TeleportCell):
                new_state = new_cell.to_teleport_to.position
                new_cell = new_cell.to_teleport_to

            snext.append(
                {
                    "Direction": direction,
                    "New state": new_state,
                    "Reward": new_cell.reward,
                    "Probability": self.probabilities[(state, action)][direction],
                    "Is terminal": new_cell.is_terminal,
                }
            )

        return snext

    # def validate_position(self, row: int, col: int):
    #     """
    #     A utility function that validates a position.
    #     """
    #     if row < 0 or row >= self.base.rows_no:
    #         raise Exception("Invalid row position")
    #     if col < 0 or col >= self.base.cols_no:
    #         raise Exception("Invalid column position")
    #     if not self.base[row, col].is_steppable:
    #         raise Exception("Invalid position: unsteppable cell")

    def __map_da(direction: Direction) -> Action:
        if direction == Direction.RIGHT:
            return Action.ACTION_A1
        elif direction == Direction.LEFT:
            return Action.ACTION_A2
        elif direction == Direction.UP:
            return Action.ACTION_A3
        elif direction == Direction.DOWN:
            return Action.ACTION_A4
        else:
            raise Exception(
                f"No action to map to {direction}!"
            )

    def __set_probabilities(self):
        """
        Private method for initializing random probabilities.
        Iterating through all states and all possible directions,
        then generating probabilities based on number of directions.
        """
        for s in self.__states:
            for a in self.get_actions():
                self.__probabilities[(s, a)] = {}
                for direction in enumerate(self.base.get_directions(s)):
                        self.__probabilities[(s, a)][direction] = 1 if a == map_da(direction) else 0

    def __update_values(self):
        """
        Private method for updating Q and V values.
        """
        for s in self.states:
            if not self.is_terminal(s):
                for a in self.get_actions():
                    news = self(s, a)
                    # q(s, a) = sum(p(s^+, r | s, a)(r + gamma * q(s^+, a^+)))
                    self.__q_values[(s, a)] = sum(
                        [
                            new["Probability"] * (new["Reward"] + self.gamma * self.determine_v(new["New state"]))
                            for new in news
                        ]
                    )
                self.__v_values[s] = self.determine_v(s)

    def compute_direction(self, state: Position, direction: Direction) -> Position:
        """
        Follow a specific direction in this environment.
        If possible, it will use base for computing direction.
        """

        if direction not in self.get_directions():
            raise Exception(
                f"Agent cannot move in direction {direction.name} in this environment!"
            )

        return self.__base.compute_direction(state, direction)

    def compute_values(self, eps: float = 0.01, max_iter: int = 1000):
        """
        Method for converging Q and V values using Bellman's equations.
        """
        for k in range(max_iter):
            ov = deepcopy(self.q_values)
            self.__update_values()
            err = max([abs(self.__q_values[(s, a)] - ov[(s, a)])
                       for s, a in self.__q_values])
            if err < eps:
                return k

        return max_iter

    def determine_v(self, s: Position):
        """
        Method for determining V values using Q values.
        """
        q = []
        for a in self.get_actions():
            q.append(
                sum(
                    [
                        self.__probabilities[(s, a)][direction] * self.__q_values[(s, a)]
                        for direction in self.base.get_directions(s)
                    ]
                )
            )
        # v = max_a(q)
        return max(q)

    def get_actions(self):
        """
        Returns actions that are possible to take in this
        environment.
        """
        return Action.get_all_actions()

    def get_directions(self):
        """
        Returns directions that are possible to follow in this
        environment.
        """
        return Direction.get_all_directions()

    def is_terminal(self, state: Position):
        """
        Returns if the state is terminal.
        """
        return self.__base[state].is_terminal

A brief overview of all of the stuff contained in *MazeEnvironment* class:
- We can assure that *properties* are self explanatory, besides `properties`, which is explained below.
- A first interesting method to look at is `__set_properties` private method, which sets *the probabilities of ending up in some state $s^{+}$ after taking the action $a$ from a certain state $s$*. For the purposes of the *Level 1 Assignment*, we're only considering a ***deterministic MDP***. What is meant by that is, for example, if we take the *Go LEFT* action (which we labeled as *ACTION_A2* above), we are $100\%$ sure that we'll end up in the left cell - there's no way we end up in the cell right, up or down from where we currently are. A more general case is considered in *Level 2 Assignment*. The mapping between actions and directions is done by `__map_da` private method.
- `__call__` method is used for computing *next_state*, *reward*, *direction of next state*, *probability of ending up in that state when taking given action* and *if the next state is terminal*. Basically, we've defined a *MDP* by defining the `__call__` method.
- `compute_values` will, as the name says, compute $Q$ values by using the $Q$-*iteration* algorithm. It uses `__update_values` private method to calculate and update $Q$ values iteration by iteration, as well as `determine_v` private method, which will determine new $V$ value after new $Q$ values for state are calculated, for every state. It is important to note that we're calculating ***optimal*** $Q$ and $V$ values, as we can see that, by using `determine_v` method, we're choosing maximum $Q$ out of all $Q$ values when determining $V$, and then using determined $V$ to calculate other needed $Q$ values.
- `get_directions` and `get_actions` return a list of directions and actions possible to take and follow in this particular environment, respectfully.
- `is_terminal` method will return us whether the given cell is terminal or not.

## Defining policies and the agent