# Grid World render

In [10]:
import gym
import pygame
import sys

colors = {
    "black": (0,0,0),
    "white": (255,255,255),
    "light_white": (200,200,200),
    "blue": (0,0,255),
    "green": (51, 204, 51),
}
class GridWorldRenderer():
    def __init__(self, rows, columns, cell_size = 50):
        self.rows = rows 
        self.columns = columns
        self.cell_size = cell_size

        pygame.init()
        self.test = False 
        self.test_image = False 

        self._window_width = self.columns * self.cell_size
        self._window_height = self.rows * self.cell_size

        self.screen = pygame.display.set_mode((self._window_width, self._window_height))
        pygame.display.set_caption(f"Grid World {self.rows}x{self.columns}" )
        self.clock = pygame.time.Clock()

        self.border_color = colors['black']

        self.update()

    def _drawgrid(self):
        for i, x in enumerate(range(0, self._window_width, self.cell_size)):
            for j, y in enumerate(range(0, self._window_height, self.cell_size)):
                color = colors['white']

                rect = pygame.Rect(x,y, self.cell_size, self.cell_size)

                pygame.draw.rect(self.screen, color, rect)
                if self.test_image and i == 0 and j == 0 :
                    self.draw_robot(self.screen, x,y)
                if i == 1 and j == 0 :
                    self.draw_battery(self.screen, x,y)
                if i == 0 and j == 1 :
                    self.draw_crap(self.screen, x,y)

                border = pygame.Rect(x,y, self.cell_size, self.cell_size)
                pygame.draw.rect(self.screen, self.border_color, border, 1)
                
    def toggle(self):
        self.test = not self.test
    
    def _draw_object(self, screen, img, x,y):
        img = pygame.image.load(img)
        img = pygame.transform.scale(img, (self.cell_size, self.cell_size))
        screen.blit(img, (x,y))
        border = pygame.Rect(x,y, self.cell_size, self.cell_size)
        pygame.draw.rect(self.screen, self.border_color, border, 1)

    def draw_robot(self, screen, x, y):
        robot_img = './assets/robot.jpg'
        self._draw_object(screen, robot_img, x, y)

    def draw_crap(self, screen, x, y):
        img = './assets/crap.png'
        self._draw_object(screen, img, x, y)

    def draw_battery(self, screen, x, y):
        img = './assets/battery.png'
        self._draw_object(screen, img, x, y)
    
    def update(self):
        # while True:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                print('exit')
                pygame.quit()
                sys.exit()
            
            if event.type == pygame.KEYDOWN:
                self.test_image = not self.test_image

            self._drawgrid()
            pygame.display.update()
    
    def run(self):
        pass

gridworld = GridWorldRenderer(8,8)


In [11]:
while True:
    gridworld.update()

exit


SystemExit: 

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [2]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np

# Gym env

In [14]:
a = [-1, 0]

a = np.clip(a, 0, 2)
print(a)

print(np.all((a >=0) & (a <= 2)))


[0 0]
True


In [109]:
def default_map():
    return 'SFFH\nH'

def GET_MAP(map):
    if map == "4x4": 
        return [
            "SFFF",
            "FHFH",
            "FFFH",
            "HFFG"
        ]

    if map == "8x8": 
        return [
            "SFFFFFFF",
            "FFFFFFFF",
            "FFFHFFFF",
            "FFFFFHFF",
            "FFFHFFFF",
            "FHHFFFHF",
            "FHFFHFHF",
            "FFFHFFFG",
        ],

class GridWorldEnv(gym.Env):
    def __init__(self, map, reward_dict, **kwargs):

        #Check if map has the same dimension as rows and cols
        rows, cols, map = self._process_map(map)

        self.map = map
        nS = rows * cols
        nA = 4
        self.rows, self.cols = rows, cols
        self.agent_pos = [0,0]
        self.goal_pos = [rows - 1, cols - 1]

        #save reward dict
        self.reward_dict = reward_dict

        #default state
        self.state = np.zeros((self.rows, self.cols))
        self.set_state(self.agent_pos, self.goal_pos)

        self.observation_space = spaces.Discrete(nS)
        self.action_space = spaces.Discrete(nA)

        #time step used to track how long this agent performing
        #if it's to long, terminate early
        self.timestep = 0
        self.max_timestep = int(kwargs['max_timestep']) if kwargs['max_timestep'] != None else 100

        #termination status
        self.terminated = False
    def reset(self):
        pass

    def _process_map(self, map_data):

        rows = map_data
        rows_n = len(rows)
        cols_n = len(rows[0])
        map = [['' for _ in range(cols_n)] for _ in range(rows_n)]
        for i, row in enumerate(rows):
            for j, val in enumerate(row):
                map[i][j] = val 

        return rows_n, cols_n, map

    def set_state(self, agent_pos, goal_pos):
        self.state = np.zeros((self.rows, self.cols))
        self.state[tuple(agent_pos)] = 1
        self.state[tuple(goal_pos)] = 0.5
        observation = self.state.flatten()
        info = {}
        return observation, info

    #Step function: agent take step in env
    def step(self, action):
        #actions:
        #0: down
        #1:up 
        #2:right
        #3:left
        if self.terminated:
            return

        if action == 0: 
            self.agent_pos[0] += 1
        elif action == 1: 
            self.agent_pos[0] -= 1
        elif action == 2: 
            self.agent_pos[1] += 1
        elif action == 3: 
            self.agent_pos[1] -= 1

        #clip the agent position to avoid out of bound
        self.agent_pos = np.clip(self.agent_pos, 0, self.rows - 1)
        self.set_state(self.agent_pos, self.goal_pos)
        observation = self.state.flatten()

        #Check if the agent takes too long to go to goal
        self.timestep += 1
        self.terminated = True if self.timestep > self.max_timestep else False

        #Define your reward function
        reward = 0

        if np.array_equal(self.agent_pos, self.goal_pos):
            self.terminated = True
            reward = 1
        
        info = {}
        return observation, reward, self.terminated, info
    
    def render(self):
        #Put a renderer here
        pass
    
    def __str__(self):
        print("=" * 20)
        print('MAP')
        print("=" * 20)
        for row in self.map:
            print(row)
        print("=" * 20)
        print('STATE')
        print("=" * 20)
        print(self.state)
        print("=" * 20)
        print("DESCRIPTION")
        print("=" * 20)
        print(f'reward dict = {self.reward_dict}')
        print(f'timestep = {self.timestep}, max_timestep = {self.max_timestep},')
        print(f'obs space = {self.observation_space.n}')
        print(f'actions = {self.action_space.n}')
        print(f'agent position = {self.agent_pos}')
        print(f'goal position = {self.goal_pos}')
        print(f'terminated = {self.terminated}')
        return ''


In [108]:
from tqdm import tqdm 
import random

In [131]:
class QLearningAlgorithm():
    def __init__(self, env, **kwargs: dict) -> None:
        self.n_training_eps = int(self._get(kwargs, "n_training_eps", 10000))
        self.n_eval_eps = int(self._get(kwargs, "n_eval_eps", 100))
        self.max_steps = int(self._get(kwargs, "max_steps", 99))
        self.learning_rate = float(self._get(kwargs, "learning_rate", 0.001))
        self.max_epsilon = float(self._get(kwargs, "max_epsilon", 1.0))
        self.min_epsilon = float(self._get(kwargs, "min_epsilon", 0.005))
        self.decay_rate = float(self._get(kwargs, "decay_rate", 0.005))
        self.gamma = float(self._get(kwargs, "gamma", 0.95))
        self.qtable = self._init_qtable(env.observation_space.n, env.action_space.n)
        self.env = env

    def _get(self, dict, key, default):
        return dict[key] if key in dict else default

    def _init_qtable(self, state_space,action_space):
        qtable = np.zeros((state_space, action_space))
        return qtable
    
    def _epsilon_greedy_policy(self, state, epsilon):
        random_init = random.uniform(-1,1)
        if random_init > epsilon:
            action = np.argmax(self.qtable[state])
        else:
            action = self.env.action_space.sample()
        return action
    
    def train(self):
        loop = tqdm(list(range(self.n_training_eps)))
        print('qtable shape =', self.qtable.shape)

        for ep in loop:
            epsilon = self.min_epsilon + (self.max_epsilon - self.min_epsilon) * np.exp(-self.decay_rate * ep)
            state = self.env.reset()
            done = False

            for step in range(self.max_steps):
                action = self._epsilon_greedy_policy(state, epsilon)
                new_state, reward, done, info = self.env.step(action)

                self.qtable[state][action] = self.qtable[state][action] + \
                    self.learning_rate * (reward  + self.gamma * np.max(self.qtable[new_state]) - self.qtable[state][action])
                
                if done:
                    break
                
                state = new_state
            
            
            loop.set_description(f"ep = {ep}, eposilon = {epsilon:.2f}")


    def __str__(self):
        print(self.qtable)
        return ''


# class RLController():
#     def __init__(self, **kwargs) -> None:

#         self.n_training_eps = int(self._get(kwargs, "n_training_eps", 10000))
#         self.n_eval_eps = int(self._get(kwargs, "n_eval_eps", 100))
#         self.learning_rate = float(self._get(kwargs, "learning_rate", 0.001))
#         self.max_epsilon = float(self._get(kwargs, "max_epsilon", 1.0))
#         self.min_epsilon = float(self._get(kwargs, "min_epsilon", 0.005))
#         self.decay_rate = float(self._get(kwargs, "decay_rate", 0.005))
#         # self.env = GridWorldEnv()
#         reward_dict = {
#             'G': 1
#         }
#         self.env = GridWorldEnv(map = GET_MAP('4x4'), reward_dict = reward_dict, max_timestep = 10)

#         obs_space = self.env.observation_space
#         action_space = self.env.action_space
#         self.qlearning = QLearningLearning(obs_space, action_space)
    
#     def _get(self, dict, key, default):
#         return dict[key] if key in dict else default
            
#     def train(self):
#         loop = tqdm(list(range(self.n_training_eps)))
#         print('qtable shape =', qtable.shape)

#         for ep in loop:
#             epsilon = self.min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay_rate * ep)
#             state = env.reset()
#             done = False

#             for step in range(max_steps):
#                 action = epsilon_greedy_policy(qtable, env, state,epsilon)
#                 new_state, reward, done, info = env.step(action)

#                 qtable[state][action] = qtable[state][action] + \
#                     learning_rate * (reward  + gamma * np.max(qtable[new_state]) - qtable[state][action])
                
#                 if done:
#                     break
                
#                 state = new_state
            
            
#             loop.set_description(f"ep = {ep}, eposilon = {epsilon:.2f}")



In [132]:
env = gym.make("FrozenLake-v1", map_name="4x4",is_slippery=False)

qlearning = QLearningAlgorithm(env)
print(qlearning.qtable.shape)

qlearning.train()

(16, 4)


  0%|          | 0/10000 [00:00<?, ?it/s]

qtable shape = (16, 4)





AttributeError: 'QLearningAlgorithm' object has no attribute 'gamma'

# Test GridWorldEnv

In [129]:
reward_dict = {
    'G': 1,
}
env = GridWorldEnv(map = GET_MAP('4x4'), reward_dict = reward_dict, max_timestep = 10)
print(env)

MAP
['S', 'F', 'F', 'F']
['F', 'H', 'F', 'H']
['F', 'F', 'F', 'H']
['H', 'F', 'F', 'G']
STATE
[[1.  0.  0.  0. ]
 [0.  0.  0.  0. ]
 [0.  0.  0.  0. ]
 [0.  0.  0.  0.5]]
DESCRIPTION
reward dict = {'G': 1}
timestep = 0, max_timestep = 10,
obs space = 16
actions = 4
agent position = [0, 0]
goal position = [3, 3]
terminated = False



In [104]:
env.step(0)
print(env)

MAP
['S', 'F', 'F', 'F']
['F', 'H', 'F', 'H']
['F', 'F', 'F', 'H']
['H', 'F', 'F', 'G']
STATE
[[0.  0.  0.  0. ]
 [0.  0.  0.  0. ]
 [0.  0.  0.  0. ]
 [0.  0.  0.  0.5]]
DESCRIPTION
reward dict = {'G': 1}
timestep = 6, max_timestep = 10,
obs space = 16
actions = 4
agent position = [3 3]
goal position = [3, 3]
terminated = True

