# Q-Learning With Frozen Lake

By Rumaisa Abdulhai

In [1]:
# Imports
import gym
import random
import numpy as np
import time
from gym.envs.registration import register
from IPython.display import clear_output

In [2]:
# Id under which the non-slippery FrozenLake variant is registered and created.
env_name = "FrozenLakeNoSlip-v0"
# Grid layout, one string per row: S = start, F = frozen, H = hole, G = goal.
custom_map = ["SFFF",
              "FHFH",
              "FFFH",
              "HFFG"]

try:
    register(
        id=env_name,
        entry_point='gym.envs.toy_text:FrozenLakeEnv',
        kwargs={'is_slippery': False},
        max_episode_steps=100,
        reward_threshold=0.78, # optimum = .8196
    )
except gym.error.Error:
    # Re-running this cell registers the same id a second time, which gym
    # versions of this era reject. Only that gym-specific error is ignored;
    # anything else (bad entry point, typo, ...) still surfaces.
    pass

env = gym.make(env_name, desc = custom_map)
print("Observation space:", env.observation_space)
print("Action space:", env.action_space)
type(env.action_space)

Observation space: Discrete(16)
Action space: Discrete(4)


gym.spaces.discrete.Discrete

In [3]:
class Agent():
    '''
    Baseline agent that ignores the state and picks
    actions uniformly at random.
    '''
    def __init__(self, env):
        '''
        Constructor for the Agent object
        
        Parameters:
        -----------
        self: The Agent object
        env: The OpenAI environment
        '''
        # isinstance (instead of an exact type comparison) also accepts
        # subclasses of the Discrete space.
        self.is_discrete = isinstance(env.action_space,
                                      gym.spaces.discrete.Discrete)
        
        if self.is_discrete:
            self.action_size = env.action_space.n
            print("Action size:", self.action_size)
        else:
            self.action_low = env.action_space.low
            self.action_high = env.action_space.high
            self.action_shape = env.action_space.shape
            print("Action range:", self.action_low, self.action_high)
        
    def get_action(self, state):
        '''
        Returns a random action. For a discrete action space 
        this is an integer in [0, action_size); otherwise it is 
        an array of shape action_shape sampled uniformly between 
        action_low and action_high.
        
        Parameters:
        -----------
        self: The Agent object
        state: The current state (not used by this agent)
        '''
        if self.is_discrete:
            return random.randrange(self.action_size)
        return np.random.uniform(self.action_low,
                                 self.action_high,
                                 self.action_shape)

In [4]:
class QAgent(Agent):
    '''
    Tabular Q-learning agent with an epsilon-greedy policy.
    Uses self.action_size, so it expects the discrete branch 
    of Agent.__init__ to have run.
    '''
    
    def __init__(self, env, discount_rate=0.97, learning_rate=0.01,
                 epsilon_decay=0.99):
        '''
        Constructor for the QAgent object
        
        Parameters:
        -----------
        self: The QAgent object
        env: The OpenAI environment
        discount_rate: The discount rate
        learning_rate: The learning rate
        epsilon_decay: Factor epsilon is multiplied by at 
                       the end of every episode
        '''
        super().__init__(env)
        self.state_size = env.observation_space.n
        print("State size:", self.state_size)
        
        # Start fully exploratory; train() shrinks epsilon per finished episode.
        self.epsilon = 1.0
        self.epsilon_decay = epsilon_decay
        self.discount_rate = discount_rate
        self.learning_rate = learning_rate
        # One row per state, one column per action, filled with small random values.
        self.q_table = 1e-4 * np.random.random([self.state_size, self.action_size])
        
    def get_action(self, state):
        '''
        Generates a random number in [0.0, 1.0) 
        (1.0 itself is never drawn). If the number 
        is less than epsilon, returns a random action, 
        else returns the action with the max q.
        
        Parameters:
        -----------
        self: The QAgent object
        state: The current state
        '''
        if random.random() < self.epsilon:
            return super().get_action(state)
        return np.argmax(self.q_table[state])
    
    def train(self, experience):
        '''
        Updates the q table from a single step and decays 
        epsilon when that step ended the episode.
        
        Parameters:
        -----------
        self: The QAgent object
        experience: Tuple of (state, action, next_state, 
                    reward, done) describing one step
        '''
        state, action, next_state, reward, done = experience
        
        # A finished episode has no future value to bootstrap from.
        max_next_q = 0.0 if done else np.max(self.q_table[next_state])
        q_target = reward + self.discount_rate * max_next_q
        
        delta_q = q_target - self.q_table[state, action]
        self.q_table[state, action] += self.learning_rate * delta_q
        
        if done:
            self.epsilon *= self.epsilon_decay

In [5]:
# Creates the agent
agent = QAgent(env)

Action size: 4
State size: 16


In [6]:
# Starting value for the reward sum that run() accumulates.
total_reward = 0
# Number of rounds (outer loop of run()).
rounds = 5
# Number of episodes per round (inner loop of run()).
num_eps = 100

def run(rounds, num_eps, total_reward):
    '''
    Trains the global agent on the global env, printing 
    the progress after every step.
    
    Parameters:
    -----------
    rounds (int): Number of rounds to run
    num_eps (int): Number of episodes per round
    total_reward: Starting value of the reward sum; it is 
                  reset to 0 at the end of every round
    '''
    for i in range(1, rounds + 1):

        for ep in range(1, num_eps + 1):
            state = env.reset()
            done = False
            while not done:
                action = agent.get_action(state)
                next_state, reward, done, _ = env.step(action)
                agent.train((state,action,next_state,reward,done))
                state = next_state
                total_reward += reward

                print("Round", i)
                print("s:", state, "a:", action)
                # The exploration rate lives in agent.epsilon (there is no agent.eps).
                print("Episode: {}, Total reward: {}, eps: {}".format(ep,total_reward,agent.epsilon))
                env.render()
                print(agent.q_table)
                time.sleep(0.05)
                clear_output(wait=True)

        total_reward = 0
        
# run(rounds, num_eps, total_reward)

In [7]:
import pygame, sys

# Size of the pygame window (passed to pygame.display.set_mode).
SCREEN_WIDTH = 480
SCREEN_HEIGHT = 480

# Grid size in tiles and the resulting size of a single tile.
NUM_BLOCKS_WIDE = 4
NUM_BLOCKS_HIGH = 4
BLOCK_HEIGHT = round(SCREEN_HEIGHT/NUM_BLOCKS_HIGH)
BLOCK_WIDTH = round(SCREEN_WIDTH/NUM_BLOCKS_WIDE)

# Colors handed to the pygame draw calls (3-tuples, presumably RGB).
# BLACK is used for the grid lines, the window background and any
# tile character that is not S, G, H or F.
BLACK = (0, 0, 0)
START = (153, 184, 152)
GOAL = (232, 74, 95)
HOLE = (27, 38, 44)
FROZEN = (50, 130, 184)

# Caption of the pygame window.
TITLE = "Frozen Lake"

def state_to_pos(state):
    '''
    Converts a state to a 
    matrix position
    
    States number the tiles of custom_map row by row, 
    starting at 0 in the top-left corner.
    
    Parameters:
    -----------
    state (int): The current state
    
    Returns:
    --------
    (row, column) tuple of ints
    
    Raises:
    -------
    ValueError: If state is not a whole number inside the map
    '''
    num_rows = len(custom_map)
    num_cols = len(custom_map[0]) if custom_map else 0
    
    # int() turns numpy integers into plain ints so the result can
    # be compared and indexed like any other tuple of ints.
    index = int(state)
    if index != state or not 0 <= index < num_rows * num_cols:
        raise ValueError(
            "state {} is outside the {}x{} map".format(state, num_rows, num_cols))
    
    return divmod(index, num_cols)

pygame 1.9.6
Hello from the pygame community. https://www.pygame.org/contribute.html


In [8]:
def get_tile_color(tile):
    '''
    Looks up the color used to draw a tile
    
    Parameters:
    -----------
    tile (str): The tile character; anything other 
                than S, G, H or F is drawn in BLACK
    '''
    palette = (
        ("S", START),
        ("G", GOAL),
        ("H", HOLE),
        ("F", FROZEN),
    )
    for symbol, shade in palette:
        if tile == symbol:
            return shade
    return BLACK

def draw_map(surface, map_tiles):
    '''
    Paints every tile of the map as a filled rectangle
    
    Parameters:
    -----------
    surface: The pygame surface to draw on
    map_tiles: The map rows, each an iterable of tile characters
    '''
    for row_idx, row in enumerate(map_tiles):
        top = row_idx * BLOCK_HEIGHT
        for col_idx, symbol in enumerate(row):
            left = col_idx * BLOCK_WIDTH
            cell = pygame.Rect(left, top, BLOCK_WIDTH, BLOCK_HEIGHT)
            fill_color = get_tile_color(symbol)
            pygame.draw.rect(surface, fill_color, cell)

def draw_grid(surface):
    '''
    Draws the grid lines that separate the tiles
    
    Rows and columns are looped over separately so the 
    grid is also correct when NUM_BLOCKS_HIGH differs 
    from NUM_BLOCKS_WIDE.
    
    Parameters:
    -----------
    surface: The pygame surface to draw on
    '''
    # Horizontal lines: one at the top edge of every row.
    for row in range(NUM_BLOCKS_HIGH):
        y = round(row * BLOCK_HEIGHT)
        pygame.draw.line(surface, BLACK, (0, y), (SCREEN_WIDTH, y), 2)
    
    # Vertical lines: one at the left edge of every column.
    for col in range(NUM_BLOCKS_WIDE):
        x = round(col * BLOCK_WIDTH)
        pygame.draw.line(surface, BLACK, (x, 0), (x, SCREEN_HEIGHT), 2)

def game_loop(surface, rounds=5, num_eps=100):
    '''
    Trains the global agent on the global env and redraws 
    the pygame window after every step.
    
    Closing the window or pressing Escape shuts pygame 
    down and leaves through sys.exit().
    
    Parameters:
    -----------
    surface: The pygame surface to draw on
    rounds (int): Number of rounds; the reward sum 
                  restarts at 0 after each round
    num_eps (int): Number of episodes per round
    '''
    total_reward = 0

    for i in range(1, rounds + 1):

        for ep in range(1, num_eps + 1):
                    
            state = env.reset()
            done = False
            while not done:
                
                # Drain the event queue every step so a quit request is noticed.
                for event in pygame.event.get():
                    quit_requested = (
                        event.type == pygame.QUIT
                        or (event.type == pygame.KEYDOWN
                            and event.key == pygame.K_ESCAPE)
                    )
                    if quit_requested:
                        pygame.quit()
                        sys.exit()
                
                action = agent.get_action(state)
                next_state, reward, done, _ = env.step(action)
                agent.train((state,action,next_state,reward,done))
                state = next_state
                total_reward += reward

                print("Round", i)
                print("s:", state, "a:", action)
                print("Episode: {}, Total reward: {}, eps: {}".format(ep,total_reward,agent.epsilon))
                env.render()
                print(agent.q_table)
                time.sleep(0.05)
                clear_output(wait=True)
                
                # Mirror the new state in the pygame window.
                draw_map(surface, read_map(state_to_pos(state)))
                draw_grid(surface)
                pygame.display.update()

        total_reward = 0

def initialize_game():
    '''
    Starts pygame, opens the window with its caption 
    and fills it with BLACK.
    
    Returns the display surface.
    '''
    pygame.init()
    window_size = (SCREEN_WIDTH, SCREEN_HEIGHT)
    window = pygame.display.set_mode(window_size)
    pygame.display.set_caption(TITLE)
    window.fill(BLACK)
    return window

def read_map(state_pos):
    '''
    Returns a copy of custom_map in which the tile at 
    state_pos is replaced by "X". custom_map itself is 
    left untouched.
    
    Parameters:
    -----------
    state_pos: The (row, column) position to mark
    
    Returns:
    --------
    List of row strings
    '''
    row_idx, col_idx = state_pos[0], state_pos[1]
    
    # Strings are immutable, so the rows are edited as lists of characters.
    grid = [list(row) for row in custom_map]
    grid[row_idx][col_idx] = "X"
    
    return ["".join(row) for row in grid]

def main():
    '''
    Opens the pygame window and runs the training loop in it.
    pygame is shut down when the loop ends, whether it finished 
    normally, was quit by the user or raised an error.
    '''
    surface = initialize_game()
    try:
        game_loop(surface)
    finally:
        # Without this the window stays open (and stops responding) once
        # training is over. pygame.quit() may be called repeatedly, so the
        # extra call after a user-requested quit is harmless.
        pygame.quit()

if __name__=="__main__":
    main()

Round 5
s: 15 a: 2
Episode: 100, Total reward: 99.0, eps: 0.006570483042414605
  (Right)
SFFF
FHFH
FFFH
HFF[41mG[0m
[[1.15846032e-04 9.59576022e-02 6.81227131e-05 6.28162664e-04]
 [9.92957575e-05 3.29015141e-05 8.71721865e-05 4.12594637e-05]
 [3.49169731e-05 2.85710796e-05 5.38673025e-05 7.76537166e-05]
 [5.04956376e-05 1.45550249e-05 6.24590927e-05 6.98008799e-05]
 [1.10818558e-03 2.01919119e-01 3.40809929e-05 3.67649703e-04]
 [8.64388570e-05 6.99179817e-05 1.36110625e-05 2.38237955e-05]
 [3.84167022e-05 2.38772788e-02 6.18024506e-05 6.57518568e-05]
 [4.56255933e-05 8.81120976e-05 2.52414063e-05 7.41331014e-05]
 [4.52601413e-03 4.20071787e-07 3.72786576e-01 6.79928737e-04]
 [1.89665597e-03 6.87416184e-04 5.93698241e-01 6.25323383e-05]
 [2.76328855e-03 8.11272879e-01 3.94198644e-05 4.00933073e-04]
 [6.68979573e-05 4.42978369e-05 4.66740003e-05 5.97541715e-05]
 [7.42957272e-05 6.82973995e-05 3.57044220e-05 2.18380871e-05]
 [2.15030844e-05 3.66884790e-05 4.82714517e-02 9.75039482e-06]
