In [19]:
from enum import Enum
from abc import ABCMeta, abstractmethod
import random
from typing import Tuple

import numpy as np

In [20]:
class Action(Enum):
    MOVE_RIGHT = 0
    MOVE_LEFT = 1
    MOVE_UP = 2
    MOVE_DOWN = 3

In [81]:
class Maze:

    class GridType(Enum):
        GOAL = 0
        LOAD = 1
        HOLE = 2
        WALL = 3

    def __init__(self, size: int):
        self._size = size
        self._HOLE_RATE = 0.4
        self._HOLE_NUM = int(self._HOLE_RATE*(size-2)**2)
        self.init()

    def init(self):
        self.init_maze(self._size, self._HOLE_NUM)
        self.init_player()
        return self._player_pos

    def init_maze(self, size: int, hole_num: int):
        self._maze = np.ones([size, size]) * Maze.GridType.LOAD.value
        for i in range(self._size):
            self._maze[0, i] = Maze.GridType.WALL.value
            self._maze[i, 0] = Maze.GridType.WALL.value
            self._maze[size-1, i] = Maze.GridType.WALL.value
            self._maze[i, size-1] = Maze.GridType.WALL.value
        random_hole_posses = np.random.randint(1, size-1, (hole_num, 2))
        for rx, ry in random_hole_posses:
            self._maze[ry, rx] = Maze.GridType.HOLE.value
        self._maze[size-2, size-2] = Maze.GridType.GOAL.value

    def init_player(self):
        self._player_pos = [1, 1]  # (y, x)
        self._move_cnt = 0
        return self._player_pos

    def dump_maze(self):
        maze = ''
        for y in range(self._size):
            for x in range(self._size):
                if self._player_pos[0] == y and self._player_pos[1] == x:
                    maze += 'Ｐ'
                    continue
                grid = self._maze[y, x]
                if grid == Maze.GridType.GOAL.value:
                    maze += 'Ｇ'
                elif grid == Maze.GridType.LOAD.value:
                    maze += '　'
                elif grid == Maze.GridType.HOLE.value:
                    maze += '○'
                elif grid == Maze.GridType.WALL.value:
                    maze += '■'
                else:
                    raise ValueError()
            maze += '\n'
        print(maze)

    def move_right(self):
        if self._maze[self._player_pos[0], self._player_pos[1]+1] == Maze.GridType.LOAD.value or \
            self._maze[self._player_pos[0], self._player_pos[1]+1] == Maze.GridType.GOAL.value:
            self._player_pos[1] += 1
        is_done = self._maze[self._player_pos[0], self._player_pos[1]] == Maze.GridType.GOAL.value
        self._move_cnt += 1
        return self._player_pos, is_done

    def move_left(self):
        if self._maze[self._player_pos[0], self._player_pos[1]-1] == Maze.GridType.LOAD.value or \
            self._maze[self._player_pos[0], self._player_pos[1]-1] == Maze.GridType.GOAL.value:
            self._player_pos[1] -= 1
        is_done = self._maze[self._player_pos[0], self._player_pos[1]] == Maze.GridType.GOAL.value
        self._move_cnt += 1
        return self._player_pos, is_done

    def move_down(self):
        if self._maze[self._player_pos[0]+1, self._player_pos[1]] == Maze.GridType.LOAD.value or \
            self._maze[self._player_pos[0]+1, self._player_pos[1]] == Maze.GridType.GOAL.value:
            self._player_pos[0] += 1
        is_done = self._maze[self._player_pos[0], self._player_pos[1]] == Maze.GridType.GOAL.value
        self._move_cnt += 1
        return self._player_pos, is_done

    def move_up(self):
        if self._maze[self._player_pos[0]-1, self._player_pos[1]] == Maze.GridType.LOAD.value or \
            self._maze[self._player_pos[0]-1, self._player_pos[1]] == Maze.GridType.GOAL.value:
            self._player_pos[0] -= 1
        is_done = self._maze[self._player_pos[0], self._player_pos[1]] == Maze.GridType.GOAL.value
        self._move_cnt += 1
        return self._player_pos, is_done

    @property
    def move_count(self):
        return self._move_cnt

In [82]:
maze = Maze(size=30)

In [83]:
maze.dump_maze()

■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
■Ｐ　　　　　　○　　　　　○　○　　○○　　○○　　　　■
■○　○　　　　　　　　○　　　　　　　　○　○　　　　　■
■　○○○　　　○　　○　　○　○○　　　○　　　　　　　■
■　　　○○　　　○　　　　○　　　○　　　　○　　○　　■
■　　　　　　　○　　　　　　○　○○　　　　　　　　　　■
■　　　○　　○　　○　○　○○○　○　　　　○○　　○○■
■　　○　○　　○○　　　　○　　○　　　　○○　　○○　■
■　　　　○○　　　　　○　○　○　　　　○○　　○○○　■
■　○○○　　○　○　　○　　○○　○　○　　　○　　　○■
■　○　○　○　　　○　　○○○○　○○　　○　　○　　　■
■　○○○○○　　○○　　○　　　○　　　　　　　　○○　■
■○　○○　○○　○　　　○　　　　　　　○　　○　○　○■
■　　　○　　　　　　　　　　　　○　　○　　　　　　　○■
■　○　　○　　○　○　　○　　○　○　　　　　　　　　○■
■　　　　　　○　　　○　○○　○　　○　　○　　○○○　■
■○○　　　　　　　　　　　○　　　　　○　○　○　○○　■
■○　　○○　○　○○　　○　　　　○　　　　　　○　　　■
■　　　　　　　　○　○　　　　　　　○　　　　　　　　　■
■　○○○　○　　　　　○　　　○○○　　○　○　　　○　■
■○○　　　　　○○　○　　　○○　○　○　　　　　　○　■
■　○○　○　○　　　　　　　○　○　○　　○　　　　○　■
■　○　○　　　○　　○○　○　　　○　　　○○○○　　○■
■　　　　　　　○　　　○　　　　○○　　○○　○○　　○■
■　　　　○　○　　○　　　　　　○○　　　○　○○○　　■
■　　　　　　○　　○○○　○　○○　　　　○　　　　　　■
■○　○　　○　　　　　○　　　　　○○　　○○○　　○　■
■○　○　　　　○　○　　　○○　　　　　　○○○○　○　■
■○　○　　　　　　○　　○○　　　　　○　○　　　　○Ｇ■
■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■



In [84]:
maze.move_right()

([1, 2], False)

In [85]:
maze.dump_maze()

■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
■　Ｐ　　　　　○　　　　　○　○　　○○　　○○　　　　■
■○　○　　　　　　　　○　　　　　　　　○　○　　　　　■
■　○○○　　　○　　○　　○　○○　　　○　　　　　　　■
■　　　○○　　　○　　　　○　　　○　　　　○　　○　　■
■　　　　　　　○　　　　　　○　○○　　　　　　　　　　■
■　　　○　　○　　○　○　○○○　○　　　　○○　　○○■
■　　○　○　　○○　　　　○　　○　　　　○○　　○○　■
■　　　　○○　　　　　○　○　○　　　　○○　　○○○　■
■　○○○　　○　○　　○　　○○　○　○　　　○　　　○■
■　○　○　○　　　○　　○○○○　○○　　○　　○　　　■
■　○○○○○　　○○　　○　　　○　　　　　　　　○○　■
■○　○○　○○　○　　　○　　　　　　　○　　○　○　○■
■　　　○　　　　　　　　　　　　○　　○　　　　　　　○■
■　○　　○　　○　○　　○　　○　○　　　　　　　　　○■
■　　　　　　○　　　○　○○　○　　○　　○　　○○○　■
■○○　　　　　　　　　　　○　　　　　○　○　○　○○　■
■○　　○○　○　○○　　○　　　　○　　　　　　○　　　■
■　　　　　　　　○　○　　　　　　　○　　　　　　　　　■
■　○○○　○　　　　　○　　　○○○　　○　○　　　○　■
■○○　　　　　○○　○　　　○○　○　○　　　　　　○　■
■　○○　○　○　　　　　　　○　○　○　　○　　　　○　■
■　○　○　　　○　　○○　○　　　○　　　○○○○　　○■
■　　　　　　　○　　　○　　　　○○　　○○　○○　　○■
■　　　　○　○　　○　　　　　　○○　　　○　○○○　　■
■　　　　　　○　　○○○　○　○○　　　　○　　　　　　■
■○　○　　○　　　　　○　　　　　○○　　○○○　　○　■
■○　○　　　　○　○　　　○○　　　　　　○○○○　○　■
■○　○　　　　　　○　　○○　　　　　○　○　　　　○Ｇ■
■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■



In [86]:
maze.move_down()

([2, 2], False)

In [87]:
maze.dump_maze()

■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
■　　　　　　　○　　　　　○　○　　○○　　○○　　　　■
■○Ｐ○　　　　　　　　○　　　　　　　　○　○　　　　　■
■　○○○　　　○　　○　　○　○○　　　○　　　　　　　■
■　　　○○　　　○　　　　○　　　○　　　　○　　○　　■
■　　　　　　　○　　　　　　○　○○　　　　　　　　　　■
■　　　○　　○　　○　○　○○○　○　　　　○○　　○○■
■　　○　○　　○○　　　　○　　○　　　　○○　　○○　■
■　　　　○○　　　　　○　○　○　　　　○○　　○○○　■
■　○○○　　○　○　　○　　○○　○　○　　　○　　　○■
■　○　○　○　　　○　　○○○○　○○　　○　　○　　　■
■　○○○○○　　○○　　○　　　○　　　　　　　　○○　■
■○　○○　○○　○　　　○　　　　　　　○　　○　○　○■
■　　　○　　　　　　　　　　　　○　　○　　　　　　　○■
■　○　　○　　○　○　　○　　○　○　　　　　　　　　○■
■　　　　　　○　　　○　○○　○　　○　　○　　○○○　■
■○○　　　　　　　　　　　○　　　　　○　○　○　○○　■
■○　　○○　○　○○　　○　　　　○　　　　　　○　　　■
■　　　　　　　　○　○　　　　　　　○　　　　　　　　　■
■　○○○　○　　　　　○　　　○○○　　○　○　　　○　■
■○○　　　　　○○　○　　　○○　○　○　　　　　　○　■
■　○○　○　○　　　　　　　○　○　○　　○　　　　○　■
■　○　○　　　○　　○○　○　　　○　　　○○○○　　○■
■　　　　　　　○　　　○　　　　○○　　○○　○○　　○■
■　　　　○　○　　○　　　　　　○○　　　○　○○○　　■
■　　　　　　○　　○○○　○　○○　　　　○　　　　　　■
■○　○　　○　　　　　○　　　　　○○　　○○○　　○　■
■○　○　　　　○　○　　　○○　　　　　　○○○○　○　■
■○　○　　　　　　○　　○○　　　　　○　○　　　　○Ｇ■
■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■



In [88]:
class Environment(metaclass=ABCMeta):

    @abstractmethod
    def reset(self):
        raise NotImplementedError

    @abstractmethod
    def step(self, action: Action):
        raise NotImplementedError

In [89]:
class MazeEnvironment(Environment):

    def __init__(self, maze_size: int, slip_rate: float = 0.1):
        self._maze_size = maze_size
        self._slip_rate = slip_rate
        self._maze = Maze(maze_size)
        self._REWARD_GAMMA = 0.95

    def reset(self):
        # player_pos = self._maze.init()
        player_pos = self._maze.init_player()
        init_state = self._pos2state(player_pos)
        return init_state

    def step(self, action: Action):
        if action == Action.MOVE_DOWN:
            action = np.random.choice(
                [Action.MOVE_DOWN, Action.MOVE_RIGHT, Action.MOVE_LEFT],
                p=[1-2*self._slip_rate, self._slip_rate, self._slip_rate]
            )
        elif action == Action.MOVE_UP:
            action = np.random.choice(
                [Action.MOVE_UP, Action.MOVE_RIGHT, Action.MOVE_LEFT],
                p=[1-2*self._slip_rate, self._slip_rate, self._slip_rate]
            )
        elif action == Action.MOVE_RIGHT:
            action = np.random.choice(
                [Action.MOVE_RIGHT, Action.MOVE_UP, Action.MOVE_DOWN],
                 p=[1-2*self._slip_rate, self._slip_rate, self._slip_rate]
            )
        elif action == Action.MOVE_LEFT:
            action = np.random.choice(
                [Action.MOVE_LEFT, Action.MOVE_UP, Action.MOVE_DOWN],
                p=[1-2*self._slip_rate, self._slip_rate, self._slip_rate]
            )
        else:
            raise NotImplementedError()

        if action == Action.MOVE_DOWN:
            next_pos, is_done = self._maze.move_down()
        elif action == Action.MOVE_UP:
            next_pos, is_done = self._maze.move_up()
        elif action == Action.MOVE_RIGHT:
            next_pos, is_done = self._maze.move_right()
        elif action == Action.MOVE_LEFT:
            next_pos, is_done = self._maze.move_left()
        else:
            raise NotImplementedError()
        
        next_state = self._pos2state(next_pos)
        reward = self._reward(is_done)

        return next_state, reward, is_done

    def _pos2state(self, pos: Tuple[int, int]) -> int:
        return (pos[0]-1)*(self._maze_size-2) + (pos[1]-1)

    def _state2pos(self, state: int):
        pos_y = state//(self._maze_size-2)+1
        pos_x = state%(self._maze_size-2)+1
        return (pos_y, pos_x)

    def _reward(self, is_done: bool):
        # return 1 if is_done else 0
        if not is_done:
            return 0
        # return self._REWARD_GAMMA ** self._maze.move_count
        return 1.0 / self._maze.move_count

    def dump_maze(self):
        self._maze.dump_maze()

In [92]:
me = MazeEnvironment(maze_size=7)

In [93]:
me.dump_maze()

■■■■■■■
■Ｐ　　○　■
■○　　　○■
■　　　　　■
■　○○○　■
■　　○○Ｇ■
■■■■■■■



In [94]:
me.reset()

0

In [112]:
ACTIONS = [Action.MOVE_DOWN, Action.MOVE_LEFT, Action.MOVE_LEFT, Action.MOVE_UP]

In [113]:
random.choice(ACTIONS)

<Action.MOVE_DOWN: 3>

In [97]:
init_state = me.reset()

In [98]:
me.dump_maze()

■■■■■■■
■Ｐ　　○　■
■○　　　○■
■　　　　　■
■　○○○　■
■　　○○Ｇ■
■■■■■■■



In [99]:
from fastprogress import progress_bar as pb

In [114]:
is_done = False
states_list = []
actions_list = []
rewards = []
i = 0
n_episodes = 5

for epi in pb(range(n_episodes)):
    state = me.reset()
    is_done = False
    states = []
    actions = []
    i = 0
    while not is_done:
        if i % 100 == 0:
            act_probs = np.random.rand(4)
            act_probs /= act_probs.sum()
        rand_action = np.random.choice(ACTIONS, p=act_probs)
        # rand_action = np.random.choice(actions)
        next_state, reward, is_done = me.step(rand_action)
        states.append(state)
        actions.append(rand_action)
        # rewards.append(reward)
        i += 1
        # me.dump_maze()
        state = next_state
    states_list.append(states)
    actions_list.append(actions)
    rewards.append(reward)

In [115]:
states[-10:]

[11, 12, 7, 12, 12, 7, 12, 13, 14, 19]

In [116]:
states[:4]

[0, 0, 0, 0]

In [103]:
max(states)

24

In [104]:
me._state2pos(max(states))

(5, 5)

In [117]:
rewards

[9.125752874612155e-05,
 1.6933080465998375e-05,
 0.00044169611307420494,
 3.769459836405443e-05,
 1.3929710679909178e-05]

In [118]:
[len(s) for s in states_list]

[10958, 59056, 2264, 26529, 71789]

In [119]:
1/2264

0.00044169611307420494

In [80]:
1/15899

6.289703754953141e-05

In [4]:
a1, a2, a3 = zip([0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10 ,11])

In [5]:
a1

(0, 3, 6, 9)

In [6]:
for a, b, c, d in zip([0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10 ,11]):
    pass