In [None]:
%pip install gymnasium pyvirtualdisplay > /dev/null 2>&1
%pip install pygame
%pip install -Uqq ipdb
%pip install gymnasium
%pip install opencv-python
import ipdb

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pygame
%pdb on

In [3]:
class FrozenLake(gym.Env):
    """Deterministic frozen-lake grid world.

    The grid is described row by row by a string with one character per
    cell: 'I' (ice), 'H' (hole) or 'G' (goal). Any other character leaves
    the cell blank (' ').

    The agent starts in the top-left corner and has four actions:
    0 = up, 1 = down, 2 = left, 3 = right. A move that would leave the grid
    or enter a hole is ignored and the agent stays where it is.

    Observations are ``(row, col)`` tuples. ``observation_space`` only counts
    the cells (``width * height``); it does not describe the tuple layout.
    """

    # Maps description characters to the cell labels stored in ``self.grid``.
    BLOCK_TYPES = {'I': 'ice', 'H': 'hole', 'G': 'goal'}
    # (row delta, col delta) applied by each action id.
    ACTION_DELTAS = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}
    # Pixel size of one cell when render() is not given window dimensions.
    DEFAULT_CELL_SIZE = 50

    def __init__(self, grid_width, grid_height, grid_description):
        """Build the grid.

        Args:
            grid_width: number of columns.
            grid_height: number of rows.
            grid_description: row-major string of cell characters.
        """
        self.width = grid_width
        self.height = grid_height
        self.description = grid_description
        self.observation_space = spaces.Discrete(self.width * self.height)
        self.action_space = spaces.Discrete(4)  # Up, Down, Left, Right
        # Always a tuple, so the observation type is the same before and after reset().
        self.agent_location = (0, 0)  # Agent starts at top-left corner
        self.grid = self._create_grid()

    def _create_grid(self):
        """Return a height x width list of cell labels built from the description."""
        grid = [[' ' for _ in range(self.width)] for _ in range(self.height)]
        for i, block_type in enumerate(self.description):
            row, col = divmod(i, self.width)
            if block_type in self.BLOCK_TYPES:
                grid[row][col] = self.BLOCK_TYPES[block_type]
        return grid

    def _in_bounds(self, row, col):
        """Return True when (row, col) is a cell of the grid."""
        return 0 <= row < self.height and 0 <= col < self.width

    def _get_obs(self):
        """Return the agent's current (row, col) location."""
        return self.agent_location

    def reset(self, seed=None, options=None):
        """Put the agent back in the top-left corner and return its (row, col).

        Note: only the observation is returned (not an ``(obs, info)`` pair),
        because callers of this class use the return value directly as the
        observation.
        """
        super().reset(seed=seed)
        self.agent_location = (0, 0)
        return self._get_obs()

    def compute_reward(self, observation):
        """Return the reward for being at ``observation``.

        Args:
            observation: (row, col) to evaluate; ``None`` means the agent's
                current location.

        Returns:
            1 on the goal, -1 on a hole or outside the grid, 0 otherwise.
        """
        row, col = self._get_obs() if observation is None else observation

        # Bounds are checked before the grid is indexed: a negative index would
        # silently wrap around and a too-large one would raise IndexError.
        if not self._in_bounds(row, col):
            return -1

        cell = self.grid[row][col]
        if cell == 'goal':  # Reward for reaching the goal
            return 1
        if cell == 'hole':  # Penalty for falling into a hole
            return -1
        return 0  # Neutral reward

    def _take_action(self, action):
        """Move the agent one cell; blocked moves leave it in place."""
        d_row, d_col = self.ACTION_DELTAS.get(action, (0, 0))
        row = self.agent_location[0] + d_row
        col = self.agent_location[1] + d_col
        if self._in_bounds(row, col) and self.grid[row][col] != 'hole':
            self.agent_location = (row, col)

    def _is_episode_over(self):
        """Return True when the agent is out of bounds, on the goal or on a hole.

        With the movement rules in ``_take_action`` only the goal can actually
        be reached; the other cases are kept as a safety net.
        """
        row, col = self.agent_location
        if not self._in_bounds(row, col):
            return True
        return self.grid[row][col] in ('goal', 'hole')

    def step(self, action):
        """Apply ``action`` and return the resulting transition.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where the
            reward is the one earned by the state the action leads to,
            ``truncated`` is always False and ``info`` is an empty dict.
        """
        self._take_action(action)
        observation = self._get_obs()
        # The reward must be computed after the move; computing it before
        # would mean the transition into the goal never yields the goal reward.
        reward = self.compute_reward(observation)
        terminated = self._is_episode_over()
        return observation, reward, terminated, False, {}

    def _render_frame(self, screen, window_width, window_height):
        """Draw the grid and the agent on ``screen``, scaled to the window size."""
        white = (255, 255, 255)
        black = (0, 0, 0)
        blue = (0, 0, 255)
        red = (255, 0, 0)

        cell_w = window_width // self.width
        cell_h = window_height // self.height
        unit = min(cell_w, cell_h)
        goal_radius = unit * 2 // 5   # 20 px for the default 50 px cell
        agent_radius = unit // 5      # 10 px for the default 50 px cell

        screen.fill(white)

        for row, cells in enumerate(self.grid):
            for col, cell in enumerate(cells):
                x = col * cell_w
                y = row * cell_h
                if cell == 'ice':
                    pygame.draw.rect(screen, blue, (x, y, cell_w, cell_h))
                elif cell == 'hole':
                    pygame.draw.rect(screen, black, (x, y, cell_w, cell_h))
                elif cell == 'goal':
                    pygame.draw.circle(screen, red, (x + cell_w // 2, y + cell_h // 2), goal_radius)

        agent_row, agent_col = self.agent_location
        agent_center = (agent_col * cell_w + cell_w // 2, agent_row * cell_h + cell_h // 2)
        pygame.draw.circle(screen, black, agent_center, agent_radius)

    def render(self, screen=None, window_width=None, window_height=None):
        """Draw the current state and refresh the pygame display.

        Args:
            screen: surface to draw on. When ``None`` a display window is
                created with ``pygame.display.set_mode``.
            window_width: drawing width in pixels; defaults to
                ``width * DEFAULT_CELL_SIZE``.
            window_height: drawing height in pixels; defaults to
                ``height * DEFAULT_CELL_SIZE``.

        Returns:
            None.
        """
        pygame.init()  # safe to call repeatedly

        if window_width is None:
            window_width = self.width * self.DEFAULT_CELL_SIZE
        if window_height is None:
            window_height = self.height * self.DEFAULT_CELL_SIZE

        # Draw on the caller's surface when one is supplied, so the caller can
        # read the frame back from it; only create a window when none was given.
        if screen is None:
            screen = pygame.display.set_mode((window_width, window_height))
        pygame.display.set_caption('Grid World Environment')

        self._render_frame(screen, window_width, window_height)
        pygame.display.update()

        # Drain the event queue so the window stays responsive.
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                break

        return None

In [4]:
import cv2
from IPython.display import clear_output
import time

# Demo: play one episode with uniformly random actions on a 3x3 lake and
# mirror every rendered frame into an OpenCV window named 'image'.
lake = FrozenLake(3, 3, "IIHHIIIIG")
window_width = 400
window_height = 400
pygame.init()
screen = pygame.display.set_mode((window_width, window_height))

for i in range(0, 1):
    lake.reset()
    terminated = False
    while not terminated:
        action = lake.action_space.sample()
        observation, reward, terminated, truncated, info = lake.step(action)
        lake.render(screen, window_width, window_height)

        # pygame pixel arrays are indexed (x, y); swap the first two axes to get
        # the (row, col) layout OpenCV expects, then convert RGB -> BGR.
        view = pygame.surfarray.array3d(screen)
        view = view.transpose([1, 0, 2])
        img_bgr = cv2.cvtColor(view, cv2.COLOR_RGB2BGR)
        cv2.imshow('image', img_bgr)
        # HighGUI only repaints the window while waitKey is running; without
        # this call the 'image' window is never actually drawn.
        cv2.waitKey(1)
        time.sleep(.5)

In [5]:
import random

class QLearning():
    """Tabular Q-learning agent for grid environments.

    Q-values live in ``q_table``, a dict keyed by ``(row, col, action)``.
    Keys that are missing from the table count as 0.0.
    """

    def __init__(self, env, exploration, gamma, alpha, obs_space_n, action_space_n):
        """Create the agent.

        Args:
            env: environment; its ``action_space.sample()`` is used for
                exploration. If it exposes ``width`` and ``height`` the table
                is pre-filled with zeros for every cell.
            exploration: probability of taking a random action (epsilon).
            gamma: discount factor.
            alpha: learning rate.
            obs_space_n: number of states (stored for reference only).
            action_space_n: number of actions.
        """
        self.env = env
        self.exploration = exploration
        self.gamma = gamma
        self.alpha = alpha
        self.obs_space_n = obs_space_n
        self.action_space_n = action_space_n

        self.q_table = {}
        # Keys must be (row, col, action). The grid shape cannot be recovered
        # from obs_space_n alone, so it is read from the environment when it is
        # available; otherwise entries are created lazily by update_table().
        height = getattr(env, 'height', None)
        width = getattr(env, 'width', None)
        if height is not None and width is not None:
            for row in range(height):
                for col in range(width):
                    for action in range(action_space_n):
                        self.q_table[(row, col, action)] = 0.0  # initializes Q-value to 0.0

    def _cell(self, observation):
        """Return ``observation`` as a (row, col) pair.

        Accepts either a (row, col) pair or a flattened row-major index, which
        is converted using the environment's ``width``.
        """
        try:
            row, col = observation
        except TypeError:
            row, col = divmod(int(observation), self.env.width)
        return row, col

    def _q_values(self, row, col):
        """Return the list of Q-values of every action in cell (row, col)."""
        return [self.q_table.get((row, col, a), 0.0) for a in range(self.action_space_n)]

    def sample_action(self, observation):
        """Pick an action epsilon-greedily.

        With probability ``exploration`` a random action is drawn from the
        environment's action space; otherwise the action with the highest
        Q-value is returned, with ties broken at random so that an untrained
        table does not always produce the same action.
        """
        if random.random() < self.exploration:
            return self.env.action_space.sample()

        row, col = self._cell(observation)
        q_values = self._q_values(row, col)
        best = max(q_values)
        return random.choice([a for a, q in enumerate(q_values) if q == best])

    def update_table(self, observation, action, reward, new_observation, terminated):
        """Apply one Q-learning update for the given transition.

        Q(s, a) <- (1 - alpha) * Q(s, a) + alpha * target, where the target is
        ``reward`` for a terminal transition and
        ``reward + gamma * max_a' Q(s', a')`` otherwise.
        """
        row, col = self._cell(observation)

        curr_q_value = self.q_table.get((row, col, action), 0.0)

        if terminated:  # if terminated, no future rewards
            target_q_value = reward
        else:
            new_row, new_col = self._cell(new_observation)
            target_q_value = reward + self.gamma * max(self._q_values(new_row, new_col))

        # The update applies to every transition, not only to terminal ones.
        new_q_value = (1 - self.alpha) * curr_q_value + self.alpha * target_q_value
        self.q_table[(row, col, action)] = new_q_value

In [6]:
# 5x5 lake and the Q-learning agent that will be trained on it.
lake = FrozenLake(5, 5, "IIIHIHIHHHIIIIGIIIIHIIIIH")
q_learning = QLearning(
    env=lake,
    exploration=.3,
    gamma=.99,
    alpha=.9,
    obs_space_n=lake.width * lake.height,
    action_space_n=lake.action_space.n,
)

# Training: run full episodes and feed every transition to the Q-table.
num_episodes = 1000
for episode in range(num_episodes):
    observation, terminated = lake.reset(), False

    while not terminated:
        # ask the agent for an action, then apply it to the environment
        action = q_learning.sample_action(observation)
        step_result = lake.step(action)
        new_observation, reward, terminated = step_result[:3]

        # learn from the transition, then continue from the new state
        q_learning.update_table(observation, action, reward, new_observation, terminated)
        observation = new_observation

In [7]:
# Demo: replay one episode with exploration switched off and mirror every
# rendered frame into the OpenCV window named 'image'.
q_learning.exploration = 0.0

# Upper bound on the number of steps shown, so that a policy that walks in a
# cycle cannot keep this cell running forever.
MAX_EVAL_STEPS = 200


def show_frame():
    """Render the lake and copy the pygame surface into the 'image' window."""
    lake.render(screen, window_width, window_height)
    # pygame pixel arrays are indexed (x, y); swap the first two axes to get
    # the (row, col) layout OpenCV expects, then convert RGB -> BGR.
    view = pygame.surfarray.array3d(screen)
    view = view.transpose([1, 0, 2])
    img_bgr = cv2.cvtColor(view, cv2.COLOR_RGB2BGR)
    cv2.imshow('image', img_bgr)
    # HighGUI only repaints the window while waitKey is running.
    cv2.waitKey(1)
    time.sleep(.5)


for i in range(0, 1):
    observation = lake.reset()
    terminated = False
    show_frame()
    clear_output()
    for _step in range(MAX_EVAL_STEPS):
        # The Q-table is keyed by (row, col), so the observation is passed
        # as-is rather than as a flattened cell index.
        action = q_learning.sample_action(observation)
        observation, reward, terminated, truncated, info = lake.step(action)
        show_frame()
        if terminated:
            break