In [11]:
import gym
import sys
from contextlib import closing

import time
from IPython.display import clear_output

import numpy as np
from io import StringIO

from gym import utils
from gym.envs.toy_text import discrete


# Action encoding used by the environment's transition table.
LEFT = 0
DOWN = 1
RIGHT = 2
UP = 3

# Built-in layouts, keyed by name. Tile legend:
#   S = start, F = frozen (safe), H = hole (terminal), G = goal (terminal).
# Every layout must contain an "S" tile: the environment derives its initial
# state distribution from the position(s) of "S" and normalises by their count.
MAPS = {
    "4x4": ["SFFF", "FHFH", "FFFH", "HFFG"],
    "8x8": [
        "SFFFFFFF",
        "FFFFFFFF",
        "FFFHFFFF",
        "FFFFFHFF",
        "FFFHFFFF",
        "FHHFFFHF",
        "FHFFHFHF",
        "FFFHFFFG",
    ],
}


def generate_random_map(size=8, p=0.8):
    """Generate a random valid map (one that has a path from start to goal).

    Every tile is drawn independently as frozen ("F") with probability ``p``
    or hole ("H") otherwise; the top-left tile is then set to "S" and the
    bottom-right tile to "G". Grids are resampled until the goal is reachable
    from the start without stepping on a hole.

    :param size: size of each side of the grid; must be at least 2, because
        on a 1x1 grid the goal overwrites the start and no map is ever valid
    :param p: probability that a tile is frozen; must be positive, values
        above 1 are clamped to 1
    :return: list of ``size`` strings, each ``size`` characters long
    :raises ValueError: if ``size < 2`` or ``p <= 0`` (both would otherwise
        make the resampling loop run forever)
    """
    if size < 2:
        raise ValueError("size must be at least 2, got {!r}".format(size))
    if p <= 0:
        raise ValueError("p must be positive, got {!r}".format(p))
    # Clamp once up front; the value does not change between resamples.
    p = min(1, p)

    # DFS to check that it's a valid path.
    def is_valid(res):
        frontier, discovered = [(0, 0)], set()
        while frontier:
            r, c = frontier.pop()
            if (r, c) in discovered:
                continue
            discovered.add((r, c))
            for dr, dc in ((1, 0), (0, 1), (-1, 0), (0, -1)):
                r_new, c_new = r + dr, c + dc
                if not (0 <= r_new < size and 0 <= c_new < size):
                    continue
                tile = res[r_new][c_new]
                if tile == "G":
                    return True
                if tile != "H":
                    frontier.append((r_new, c_new))
        return False

    while True:
        res = np.random.choice(["F", "H"], (size, size), p=[p, 1 - p])
        res[0][0] = "S"
        res[-1][-1] = "G"
        if is_valid(res):
            return ["".join(x) for x in res]
    
    

class customblemeshEnv(discrete.DiscreteEnv):
    """Grid-world environment on a frozen lake, e.g. the "4x4" layout:

        SFFF
        FHFH
        FFFH
        HFFG

    S : starting point, safe
    F : frozen surface, safe
    H : hole, fall to your doom
    G : goal, where the frisbee is located

    The episode ends when you reach the goal or fall in a hole.
    You receive a reward of 1 if you reach the goal, and zero otherwise.

    The transition table built in ``__init__`` is handed to
    ``discrete.DiscreteEnv``, which supplies ``reset``/``step`` and the
    ``s`` / ``lastaction`` attributes that ``render`` reads.
    """

    metadata = {"render.modes": ["human", "ansi"]}

    def __init__(self, desc=None, map_name="4x4", is_slippery=True):
        """Build the transition table for the chosen layout.

        :param desc: explicit layout as a list of equal-length strings over
            the letters S/F/H/G; takes precedence over ``map_name``
        :param map_name: key into ``MAPS`` used when ``desc`` is None; if both
            are None a random map is generated
        :param is_slippery: when True, each action moves in the intended
            direction or one of the two perpendicular directions with
            probability 1/3 each; when False, moves are deterministic
        """
        if desc is None and map_name is None:
            desc = generate_random_map()
        elif desc is None:
            desc = MAPS[map_name]
        # dtype="c" turns each string into a row of single-byte cells.
        self.desc = desc = np.asarray(desc, dtype="c")
        self.nrow, self.ncol = nrow, ncol = desc.shape
        self.reward_range = (0, 1)

        nA = 4
        nS = nrow * ncol

        # Initial state distribution: uniform over the "S" tiles.
        isd = np.array(desc == b"S").astype("float64").ravel()
        isd /= isd.sum()

        # P[s][a] is a list of (probability, next_state, reward, done) tuples.
        P = {s: {a: [] for a in range(nA)} for s in range(nS)}

        def to_s(row, col):
            # Flatten (row, col) into a state index in row-major order.
            return row * ncol + col

        def inc(row, col, a):
            # Move one step in direction `a`, clamped at the grid border.
            if a == LEFT:
                col = max(col - 1, 0)
            elif a == DOWN:
                row = min(row + 1, nrow - 1)
            elif a == RIGHT:
                col = min(col + 1, ncol - 1)
            elif a == UP:
                row = max(row - 1, 0)
            return (row, col)

        def update_probability_matrix(row, col, action):
            # Outcome of actually moving in direction `action` from (row, col).
            newrow, newcol = inc(row, col, action)
            newstate = to_s(newrow, newcol)
            newletter = desc[newrow, newcol]
            done = bytes(newletter) in b"GH"
            reward = float(newletter == b"G")
            return newstate, reward, done

        for row in range(nrow):
            for col in range(ncol):
                s = to_s(row, col)
                letter = desc[row, col]
                for a in range(nA):
                    li = P[s][a]
                    if letter in b"GH":
                        # Terminal tiles absorb: stay put, no reward, done.
                        li.append((1.0, s, 0, True))
                    elif is_slippery:
                        # With this action encoding, (a - 1) % 4 and
                        # (a + 1) % 4 are the directions perpendicular to `a`.
                        for b in [(a - 1) % 4, a, (a + 1) % 4]:
                            li.append(
                                (1.0 / 3.0, *update_probability_matrix(row, col, b))
                            )
                    else:
                        li.append((1.0, *update_probability_matrix(row, col, a)))

        super().__init__(nS, nA, P, isd)

    def render(self, mode="human"):
        """Draw the grid with the agent's current tile highlighted.

        :param mode: "ansi" collects the text in a buffer and returns it; any
            other value writes to ``sys.stdout``. The rendered text is
            returned for every mode other than "human".
        :return: the rendered string when ``mode != "human"``, otherwise None
        """
        outfile = StringIO() if mode == "ansi" else sys.stdout

        # self.s is the flat state index; recover the grid coordinates.
        row, col = self.s // self.ncol, self.s % self.ncol
        desc = self.desc.tolist()
        desc = [[c.decode("utf-8") for c in line] for line in desc]
        desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
        if self.lastaction is not None:
            outfile.write(
                "  ({})\n".format(["Left", "Down", "Right", "Up"][self.lastaction])
            )
        else:
            outfile.write("\n")
        outfile.write("\n".join("".join(line) for line in desc) + "\n")

        if mode != "human":
            with closing(outfile):
                return outfile.getvalue()
                
                


In [10]:
# Q-learning training loop.
#
# NOTE(review): this cell reads the following names without defining them:
#   env, q_table, num_episodes, max_steps_per_episode, learning_rate,
#   discount_rate, exploration_rate, max_exploration_rate,
#   min_exploration_rate, exploration_decay_rate
# They must be created by a setup cell that runs before this one; the recorded
# NameError for `num_episodes` shows that was not the case when it last ran.

# Holds the total reward collected in each episode.
rewards_all_episodes = []

for episode in range(num_episodes):
    # Initialize new episode params
    state = env.reset()
    done = False
    rewards_current_episode = 0

    for step in range(max_steps_per_episode):
        # Exploration-exploitation trade-off. np.random is used because the
        # notebook's import cell never imports the `random` module.
        exploration_rate_threshold = np.random.uniform(0, 1)
        if exploration_rate_threshold > exploration_rate:
            action = np.argmax(q_table[state, :])
        else:
            action = env.action_space.sample()

        new_state, reward, done, info = env.step(action)

        # Update Q-table for Q(s,a): blend the old estimate with the
        # one-step bootstrapped target.
        q_table[state, action] = q_table[state, action] * (1 - learning_rate) + \
            learning_rate * (reward + discount_rate * np.max(q_table[new_state, :]))

        state = new_state
        rewards_current_episode += reward

        if done:
            break

    # Exploration rate decay (exponential in the episode index)
    exploration_rate = min_exploration_rate + \
        (max_exploration_rate - min_exploration_rate) * np.exp(-exploration_decay_rate * episode)

    rewards_all_episodes.append(rewards_current_episode)

# Calculate and print the average reward per thousand episodes.
# Slicing in steps of 1000 (instead of np.split with a float section count)
# also works when num_episodes is not a positive multiple of 1000; a shorter
# final chunk is averaged over its actual length.
rewards_array = np.array(rewards_all_episodes)
rewards_per_thousand_episodes = [
    rewards_array[start:start + 1000]
    for start in range(0, len(rewards_array), 1000)
]
count = 0

print("********Average reward per thousand episodes********\n")
for r in rewards_per_thousand_episodes:
    count += len(r)
    print(count, ": ", str(r.mean()))

NameError: name 'num_episodes' is not defined

In [None]:
# Show the learned Q-table under a banner.
# `q_table` is not defined in this cell; an earlier cell must have created it.
print("\n\n********Q-table********\n", q_table, sep="\n")

In [9]:
# Watch our agent play Frozen Lake by playing the best action
# from each state according to the Q-table.
#
# NOTE(review): `env`, `q_table` and `max_steps_per_episode` are not defined in
# this cell and must come from earlier cells; the recorded NameError for `env`
# shows they were missing when this cell last ran.

for episode in range(3):
    state = env.reset()
    done = False
    print("*****EPISODE ", episode+1, "*****\n\n\n\n")
    time.sleep(1)

    for step in range(max_steps_per_episode):
        # Redraw the board in place so the output reads as an animation.
        clear_output(wait=True)
        env.render()
        time.sleep(0.3)

        # Greedy policy: always take the highest-valued action.
        action = np.argmax(q_table[state, :])
        new_state, reward, done, info = env.step(action)

        if done:
            clear_output(wait=True)
            env.render()
            if reward == 1:
                print("****You reached the goal!****")
            else:
                print("****You fell through a hole!****")
            time.sleep(3)
            clear_output(wait=True)
            # Stop stepping once the episode is over; without this the loop
            # keeps acting on a finished environment and repeats the
            # end-of-episode message until max_steps_per_episode is reached.
            break

        # Set new state
        state = new_state

env.close()

NameError: name 'env' is not defined