In [5]:
from contextlib import closing
from io import StringIO
from os import path
from typing import List, Optional
import numpy as np
from gym import Env, logger,spaces, utils
from gym.envs.toy_text.utils import categorical_sample
from gym.error import DependencyNotInstalled

In [2]:
# Integer codes for the four moves; inc() inside FrozenLakeEnv compares its
# action argument against these names.
LEFT, DOWN, RIGHT, UP = 0, 1, 2, 3

In [3]:
# Predefined layouts, keyed by name. Every layout is a list of equal-length
# row strings made of the tile letters S, F, H and G.
MAPS = {
    "4x4": [
        "SFFF",
        "FHFH",
        "FFFH",
        "HFFG",
    ],
    "8x8": [
        "SFFFFFFF",
        "FFFFFFFF",
        "FFFHFFFF",
        "FFFFFHFF",
        "FFFHFFFF",
        "FHHFFFHF",
        "FHFFHFHF",
        "FFFHFFFG",
    ],
}

In [4]:
# Keep walking across the board, starting from the top-left cell.
# Returns True as soon as a "G" cell is reached.
# "H" cells are the boundary of the search: they are never added to the frontier.


def is_valid(board: List[List[str]], max_size: int) -> bool:
    """Check whether "G" can be reached from cell (0, 0) without crossing "H".

    Runs a depth-first search over the 4-connected grid using an explicit
    stack.

    Args:
        board: Square grid indexable as ``board[row][col]``; each cell is a
            one-character tile ("S", "F", "H" or "G").
        max_size: Side length of the grid; indices outside
            ``[0, max_size)`` are treated as off-board.

    Returns:
        True if some "G" cell is reachable from (0, 0) by moving through
        non-"H" cells only, otherwise False.
    """
    directions = [(1, 0), (0, 1), (-1, 0), (0, -1)]
    frontier, discovered = [(0, 0)], set()
    while frontier:
        r, c = frontier.pop()
        if (r, c) in discovered:
            continue
        discovered.add((r, c))
        for x, y in directions:
            r_new = r + x
            c_new = c + y
            if r_new < 0 or r_new >= max_size or c_new < 0 or c_new >= max_size:
                continue
            if board[r_new][c_new] == "G":
                return True
            # Only walkable cells extend the search; holes block the path.
            if board[r_new][c_new] != "H":
                frontier.append((r_new, c_new))

    return False

In [6]:
def generate_random_map(size: int = 8, p: float = 0.8) -> List[str]:
    """Draw random square boards until one is accepted by ``is_valid``.

    Each cell is sampled independently with ``np.random.choice``: "F" with
    probability ``p`` and "H" with probability ``1 - p``. The top-left cell
    is then overwritten with "S" and the bottom-right cell with "G".

    Args:
        size: Side length of the board.
        p: Probability of drawing "F" for a cell; values above 1 are
            clamped to 1.

    Returns:
        The accepted board as a list of ``size`` row strings.
    """
    p = min(1, p)
    while True:
        board = np.random.choice(["F", "H"], (size, size), p=[p, 1 - p])
        board[0][0] = "S"
        board[-1][-1] = "G"
        # Retry with a fresh sample until the validity check passes.
        if is_valid(board, size):
            break
    return ["".join(row) for row in board]

In [None]:
class FrozenLakeEnv(Env):
    """Grid environment built from a character map of S/F/H/G tiles.

    The constructor stores the map as a 2-D byte array, derives the initial
    state distribution from the "S" tiles and allocates an empty transition
    table ``self.P[state][action]``.

    NOTE(review): in the code shown here the lists in ``self.P`` are never
    filled, and ``render_mode`` / ``is_slippery`` are accepted but not read.
    """

    def __init__(
        self,
        render_mode: Optional[str] = None,
        desc=None,
        map_name="4x4",
        is_slippery=True,
    ):
        """Set up the map and the (still empty) transition table.

        Args:
            render_mode: Not used by the code visible in this cell.
            desc: Explicit map as a sequence of row strings. When None, the
                map is taken from ``MAPS[map_name]``, or generated with
                ``generate_random_map()`` if ``map_name`` is also None.
            map_name: Key into ``MAPS``; only consulted when ``desc`` is None.
            is_slippery: Not used by the code visible in this cell.
        """
        if desc is None and map_name is None:
            desc = generate_random_map()
        elif desc is None:
            desc = MAPS[map_name]
        # dtype "c" splits every row string into single bytes -> 2-D grid.
        self.desc = desc = np.asarray(desc, dtype="c")
        self.nrow, self.ncol = nrow, ncol = desc.shape
        self.reward_range = (0, 1)

        nA = 4
        nS = nrow * ncol

        # Uniform distribution over all cells marked "S".
        self.initial_state_distrib = np.array(desc == b"S").astype("float64").ravel()
        self.initial_state_distrib /= self.initial_state_distrib.sum()

        # One independent list per (state, action) pair.
        self.P = {s: {a: [] for a in range(nA)} for s in range(nS)}

        def to_s(row, col):
            """Flatten a (row, col) position into a row-major state index."""
            return row * ncol + col

        def inc(row, col, a):
            """Return the position after action ``a``, clamped to the grid."""
            if a == LEFT:
                col = max(col - 1, 0)
            elif a == DOWN:
                row = min(row + 1, nrow - 1)
            elif a == RIGHT:
                col = min(col + 1, ncol - 1)
            elif a == UP:
                row = max(row - 1, 0)
            return (row, col)

        def update_probability_matrix(row, col, action):
            """Return (new state, reward, terminated) for one move.

            The reward is 1.0 when the destination tile is "G", else 0.0;
            the move is terminal when the destination is "G" or "H".
            """
            newrow, newcol = inc(row, col, action)
            newstate = to_s(newrow, newcol)
            newletter = desc[newrow, newcol]
            terminated = bytes(newletter) in b"GH"
            reward = float(newletter == b"G")
            return newstate, reward, terminated

            