In [1]:
import random, numpy, gym, time, statistics
from gym.envs.toy_text.frozen_lake import generate_random_map

# constants
LEFT, DOWN, RIGHT, UP = 0, 1, 2, 3
FLOOR, EXPLORED, PATH, HOLE, DEADEND = 0, 1, 2, 3, 4
dirs = {0: "LEFT", 1: "DOWN", 2: "RIGHT", 3: "UP"} # convenient direction dict for debug output

In [2]:
# this converts the 1D grid coordinate to a 2D coordinate
def get_2d_pos(pos: int, map_size: int) -> tuple:
    return int(pos % (map_size - 0)), int(pos / (map_size - 0))

# manhattan distance heuristic formula
def manhattan(a, b) -> int:
    return sum(abs(val1-val2) for val1, val2 in zip(a, b))

# this function updates the 2D grid coordinate based on the given action.
def update_pos_for_action(action: int, pos: tuple) -> tuple:
    if action == LEFT:
        return pos[0] - 1, pos[1]
    elif action == DOWN:
        return pos[0], pos[1] + 1
    elif action == RIGHT:
        return pos[0] + 1, pos[1]
    elif action == UP:
        return pos[0], pos[1] - 1
    
# returns the action for a given 2d position offset. used for replaying actions to support slippery=True
def get_action_for_pos_offset(pos: tuple) -> int:
    if pos[0] == -1 and pos[1] == 0:
        return LEFT
    elif pos[0] == 0 and pos[1] == 1:
        return DOWN
    elif pos[0] == 1 and pos[1] == 0:
        return RIGHT
    elif pos[0] == 0 and pos[1] == -1:
        return UP

# this function returns True if the given action for the passed position results in a valid board target position.
# if not, we are outside the game board.
def is_action_valid(action: int, pos: tuple, map_size: int) -> bool:
    if action == LEFT:
        return pos[0] >= 0
    elif action == DOWN:
        return pos[1] <= map_size - 1
    elif action == RIGHT:
        return pos[0] <= map_size - 1
    elif action == UP:
        return pos[1] >= 0

In [3]:
# here we determine the suitable action for a given position and goal.
# this function tries its best to cull non-optimal actions and selects a suitable one based on the lowest
# manhattan distance from the current position and the goal position.
def determine_action(env, pos: int, goal_pos: int, exploration: bool, map_size: int, grid: numpy.array, moves: list) -> int:
    pos_2d = get_2d_pos(pos, map_size) # convert 1D state index to 2D grid coordinates
    goal_pos_2d = get_2d_pos(goal_pos, map_size) # convert goal pos to 2D grid coordinates
    lowest_dist = 9999 # keep track of the lowest distance for an action.
    best_action = -1 # keep track of the best action found during iteration
    for action in range(env.action_space.n): # loop over all actions within the action space
        target_pos_2d = update_pos_for_action(action, pos_2d) # compute the resulting target 2D grid coord for this action

        # if an action results in a target position outside the grid, we can skip this as a valid action
        if not is_action_valid(action, target_pos_2d, map_size):
            # print(f"skipped: {dirs[action]}")
            continue

        # Culling Adjacent Holes and Deadends
        # if the target position has been identified as a deadend or hole during a previous episode,
        # remove it from the moves dict for this current pos_2d.
        grid_val = grid[target_pos_2d[1], target_pos_2d[0]]
        if grid_val == DEADEND or grid_val == HOLE:
            # only avoid moving to deadends or holes if we are currently standing on one
            if grid[pos_2d[1], pos_2d[0]] == FLOOR or grid[pos_2d[1], pos_2d[0]] == EXPLORED:
                # print(f"skipped {dirs[action]} was hole: {target_pos_2d}")
                if pos_2d in moves and target_pos_2d in moves[pos_2d]:
                    moves[pos_2d].remove(target_pos_2d) # remove valid move as it has been discovered as a hole
                continue

        # Deadend detection
        # if this action results in a position, where the only next valid from is our *current* position *and* it
        # is the only valid move for that target position then it has to be skipped and never visited in future to
        # avoid cyclic moves and optimise the simulation
        if target_pos_2d in moves and pos_2d in moves[target_pos_2d] and len(moves[target_pos_2d]) <= 1 and grid_val == EXPLORED and target_pos_2d != (0, 0):
            # print(f"skipped cyclic move {dirs[action]}: {target_pos_2d}")
            # treat this dead-end as a hole for an early escape
            grid[target_pos_2d[1], target_pos_2d[0]] = DEADEND
            continue

        # make sure the dict keys for this pos and target pos exist
        if not pos_2d in moves:
            moves[pos_2d] = set()

        # If a position has been explored (more than two valid moves) then upgrade this cell to allow culling
        if grid[pos_2d[1], pos_2d[0]] == FLOOR and len(moves[pos_2d]) >= 1:
            grid[pos_2d[1], pos_2d[0]] = EXPLORED

        # if the adjacent position for pos_2d has not ever been visited, visit it to discover its state
        # this is an optional extra that requires more episodes to reach the goal, but it also discovers
        # more of the game board doing so. This could potentially be used to "train" an agent
        # before navigating the game like usual. Otherwise pick based on best man dist.
        if target_pos_2d not in moves[pos_2d] and exploration:
            best_action = action
            moves[pos_2d].add(target_pos_2d)
            break # stop iterating as we want to explore this action

        # keep note of all valid moves for this position, even if its not the most optimal in terms of man dist
        moves[pos_2d].add(target_pos_2d)

        # calculate the manhattan distance, suitable for distances within 2D grids
        man_dist = manhattan(target_pos_2d, goal_pos_2d)

        # test if the dist is equal the best best distance thus far, and choose between them at random.
        # this tries to avoid the situation where the agent will always pick DOWN (since its first in order)
        # which could be a hole, instead of going RIGHT (which has the same dist)
        if man_dist == lowest_dist:
            best_action = random.choice([best_action, action])
        # record new lowest dist and the action for that distance
        elif man_dist < lowest_dist:
            lowest_dist = man_dist
            best_action = action

    # sanity check: if we reach this condition, the algorithm is flawed as FrozenLake is generated
    # so there is at least one valid path to the goal.
    if best_action == -1:
        print(f"Error: no valid actions!!!")

    # print(f"moves for {pos_2d}: {moves[pos_2d]}")
    return best_action

In [4]:
# the main simulation loop. this will iterate for x episodes and will infinitely loop until we either
#reach the goal or hit a hole. additionally it will generate new observations and update the grid
#array when the agent finds a hole.
def run_iterations(env, episodes: int, map_size: int, grid: numpy.array, moves: list, exploration=True) -> tuple:
    goal_pos = map_size * map_size - 1 # calculate the goal pos for a given map size
    for episode in range(episodes):
        positions = list()
        observation, _ = env.reset() # reset the environment for each new episode or run through the maze
        while True: # infinite loop to either find goal or hole
            prev_pos_2d = get_2d_pos(observation, map_size) # save a copy of the previous position so it can be used for debug output
            # generate a suitable action from our custom algorithm
            action = determine_action(env, observation, goal_pos, exploration, map_size, grid, moves)
            # if the action is -1, then our algorithm reached an illegal state and we can to return and bail from the loop
            if action == -1:
                return None, None
            # update the agent state based on the generated action
            observation, reward, term, trunc, info = env.step(action)
            # append action to action replay list
            positions.append(get_2d_pos(observation, map_size))
            # basic debug output to show the current episode, but rate limited as to not slow down simulations
            if episode % 50000 == 0:
                print(f"({episode}) prev: {prev_pos_2d}, action: {dirs[action]}, result: {get_2d_pos(observation, map_size)}")
            # the simulation has reached a termination state, and we have found the goal
            if term and reward == 1:
                # steps_list.append(step)
                print(f"Found: {observation} {get_2d_pos(observation, map_size)} in {episode} episodes")
                return episode, positions
            # the simulation has reached a termination state, and we fell into a hole
            elif term and reward == 0:
                # convert the new updated agent state position to a 2D grid coordinate
                pos_2d = get_2d_pos(observation, map_size)
                # update the grid array to record the presence of this hole for the next episodes
                grid[pos_2d[1], pos_2d[0]] = HOLE
                # print(f"Hole: {get_2d_pos(observation)}")
                break

# run a FrozenLake simulation with default parameters. slippery True as stated by the assessment
def simulate(game_map: list, map_size: int, grid: numpy.array, moves: list, render: any, slippery: bool, exploration: bool, episodes: int) -> tuple:
    env = gym.make('FrozenLake-v1', desc=game_map, is_slippery=slippery, render_mode=render)
    start = time.time()
    episode, positions = run_iterations(env, episodes, map_size, grid, moves, exploration=exploration)
    end = time.time()
    print(f"Elapsed: {end - start}s")  # print the elapsed seconds until the goal is found
    return episode, positions, end - start

In [5]:
# this function runs the frozen lake simulation while also storing all the metrics needed while the simulation runs all of its iterations and episodes
def run_metrics(game_map: list, map_size: int, iterations=1, render=None, slippery=True, exploration=False, pre_explore=False, episodes=1_000_000) -> list:
    # small sanity check to make sure there isn't a custom map passed here that does not match the map_size, as that would break many things
    if len(game_map) != map_size:
        print("Map dimensions and map size do not match!")
        exit(1)

    all_episodes = []
    all_positions = []
    all_seconds = []
    for i in range(iterations):
        print(f"Running Iteration: {i + 1}")
        if game_map is None:
            # generate a random map with the given map size and 30% chance of slipping
            game_map = generate_random_map(size=map_size, p=0.3)
        grid = numpy.zeros(shape=(map_size, map_size))  # grid to record the map layout discoveries for future episodes
        moves = {}  # moves dict for recording valid moves for a given position

        # if we want to pre explore, we want to do it without slippery being enabled and also force exploration of unknown tiles
        if pre_explore:
            print("Running pre exploration")
            simulate(game_map, map_size, grid, moves, render, False, True, episodes)
            print(grid)

        # run the frozen lake simulation
        print("Running simulation")
        eps, positions, secs = simulate(game_map, map_size, grid, moves, render, slippery, exploration, episodes)
        all_episodes.append(eps)
        all_positions.append(positions)
        all_seconds.append(secs)
    avg_episodes = statistics.mean(all_episodes)
    avg_seconds = statistics.fmean(all_seconds)
    print(f"Found the goal in an average of {avg_episodes} episodes and {avg_seconds} seconds.")
    return all_positions

In [6]:
# this function replays the list of positions returned by run_metrics. this is used to show the set of moves for the winning episode
def replay_actions(game_map: list, all_positions: list, map_size: int) -> None:
    env = gym.make('FrozenLake-v1', desc=game_map, is_slippery=False, render_mode="human")
    for positions in all_positions:
        print(positions)
        observation, _ = env.reset()  # reset the environment for each new episode or run through the maze
        for pos in positions:
            diff = tuple(x-y for x,y in zip(pos, get_2d_pos(observation, map_size)))
            action = get_action_for_pos_offset(diff)
            if action is None:
                continue
            observation, reward, term, trunc, info = env.step(action)
            if (term or trunc) and reward == 1:
                print("GOAL")
                return

In [7]:
random_map = generate_random_map(size=10, p=0.3)
# standard_map = ['SFFHHHFFHH', 'FHFFHFHHHH', 'FHHFFFFFFF', 'HFHHFFHHFF', 'FHHFHHHHHF', 'HHHHHFHFFF', 'FHHHHHFFFF', 'HFFHHFFFHF', 'HHFHHHHFFF', 'HHFHHHHHFG']
all_sim_positions = run_metrics(random_map, 10, iterations=1, pre_explore=False, slippery=True)
time.sleep(5) # just to help with recording
replay_actions(random_map, all_sim_positions, 10)

Running Iteration: 1
Running simulation
(0) prev: (0, 0), action: RIGHT, result: (1, 0)
(0) prev: (1, 0), action: RIGHT, result: (2, 0)
(0) prev: (2, 0), action: DOWN, result: (1, 0)
(0) prev: (1, 0), action: RIGHT, result: (1, 0)
(0) prev: (1, 0), action: DOWN, result: (2, 0)
(0) prev: (2, 0), action: RIGHT, result: (2, 0)
(0) prev: (2, 0), action: DOWN, result: (1, 0)
(0) prev: (1, 0), action: DOWN, result: (1, 1)


  if not isinstance(terminated, (bool, np.bool8)):


(50000) prev: (0, 0), action: RIGHT, result: (0, 0)
(50000) prev: (0, 0), action: RIGHT, result: (1, 0)
(50000) prev: (1, 0), action: RIGHT, result: (1, 0)
(50000) prev: (1, 0), action: RIGHT, result: (1, 0)
(50000) prev: (1, 0), action: RIGHT, result: (1, 1)
(100000) prev: (0, 0), action: RIGHT, result: (0, 0)
(100000) prev: (0, 0), action: RIGHT, result: (1, 0)
(100000) prev: (1, 0), action: RIGHT, result: (1, 1)
Found: 99 (9, 9) in 114794 episodes
Elapsed: 26.1750009059906s
Found the goal in an average of 114794 episodes and 26.1750009059906 seconds.
[(1, 0), (2, 0), (2, 1), (3, 1), (2, 1), (3, 1), (2, 1), (3, 1), (3, 2), (4, 2), (5, 2), (6, 2), (7, 2), (8, 2), (9, 2), (9, 3), (9, 4), (9, 4), (9, 5), (9, 6), (9, 7), (9, 8), (9, 9)]


  if not isinstance(terminated, (bool, np.bool8)):


GOAL
