GRID WALK ENVIRONMENT : A REINFORCEMENT LEARNING PROJECT

In [1]:
# install numpy and ipython (ipython is used to clear the cell output so the printed grids look like an animation)
!pip install numpy
!pip install ipython

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [2]:
# import necessary libraries (everything will be from scratch)
import time
import random
import numpy as np
from IPython.display import clear_output

class BombAvoider:
    """Square grid world for a tabular Q-learning agent.

    The agent starts at (1, 1) and tries to reach (SIZE, SIZE) without
    stepping on a bomb. States are 1-based ``(row, col)`` tuples and
    actions are ``(d_row, d_col)`` offsets.
    """

    # step size and discount factor used by update_q_values
    LEARNING_RATE = 0.1
    DISCOUNT = 0.9

    # the four moves available in every state, in the order their initial
    # Q-values are drawn: up, down, left, right
    ACTIONS = ((-1, 0), (1, 0), (0, -1), (0, 1))

    def __init__(self):
        """Place the agent at (1, 1), scatter bombs and create a random Q-table."""
        self.SIZE = 12  # size of the grid (in this case it is 12x12)
        self.NUMS_OF_BOMBS = 30  # how many bomb coordinates are drawn

        # coordinates the player starts at
        self.row = 1
        self.col = 1

        # draw the bomb coordinates, then drop any that fell on the start or
        # the finish cell; the draws are independent, so the list may hold
        # duplicates and fewer than NUMS_OF_BOMBS distinct cells
        protected = ((1, 1), (self.SIZE, self.SIZE))
        drawn = [
            (random.randint(1, self.SIZE), random.randint(1, self.SIZE))
            for _ in range(self.NUMS_OF_BOMBS)
        ]
        self.bomb_coordinates = [cell for cell in drawn if cell not in protected]

        # Q-table: state -> {action: Q-value}. Every entry starts as a random
        # number in [0, 1) and is refined by update_q_values. Whichever action
        # has the highest Q-value is the greedy move for that state.
        self.q_values = {}
        for row in range(1, self.SIZE + 1):
            for col in range(1, self.SIZE + 1):
                self.q_values[(row, col)] = {
                    action: np.random.rand() for action in self.ACTIONS
                }

        # epsilon: probability that select_action explores with a random move
        self.epsilon = 0.1

    def draw_grid(self):
        """Print the grid: the agent as a filled square, bombs as '#', other cells blank."""
        for row in range(1, self.SIZE + 1):
            symbols = []
            for col in range(1, self.SIZE + 1):
                cell = (row, col)
                # the agent is checked first, so it is drawn even on a bomb cell
                if cell == (self.row, self.col):
                    symbols.append("\u25A0")
                elif cell in self.bomb_coordinates:
                    symbols.append("#")
                else:
                    symbols.append(" ")
            # every cell is followed by a space; cells are separated by "| "
            print(" | ".join(symbols) + " ")

    def get_current_coordinates(self):
        """Return the agent's current position as a ``(row, col)`` tuple."""
        return self.row, self.col

    def is_terminal(self, state):
        """Return True if ``state`` ends an episode (a bomb cell or the finish cell)."""
        return state in self.bomb_coordinates or state == (self.SIZE, self.SIZE)

    def state_after_action(self, current_state, action):
        """Return the state reached by adding ``action`` to ``current_state``.

        No boundary check is done here. Returns None when either argument
        is None.
        """
        if current_state is None or action is None:
            return None
        return tuple(x + y for x, y in zip(current_state, action))

    def select_action(self, state):
        """Pick an action for ``state`` with an epsilon-greedy rule.

        With probability ``self.epsilon`` a random action is returned;
        otherwise the action with the highest Q-value in ``state``.
        """
        action_values = self.q_values[state]
        if random.random() < self.epsilon:
            return random.choice(list(action_values.keys()))
        return max(action_values.items(), key=lambda item: item[1])[0]

    def take_action(self, action):
        """Move the agent by ``action`` if the target cell lies inside the grid.

        Returns True when the agent moved, False when ``action`` is None or
        would leave the grid (the agent then stays where it is).
        """
        if action is None:
            return False

        row, col = self.state_after_action(self.get_current_coordinates(), action)
        if 1 <= row <= self.SIZE and 1 <= col <= self.SIZE:
            self.row = row
            self.col = col
            return True
        return False

    def get_reward(self, current_state, next_state):
        """Return the reward for arriving in ``next_state``.

        -1 for a bomb cell (a penalty, so the agent learns to avoid bombs),
        +1 for the finish cell, 0 otherwise. ``current_state`` is not used;
        it is kept so existing callers keep working.
        """
        if next_state in self.bomb_coordinates:
            return -1
        if next_state == (self.SIZE, self.SIZE):
            return 1
        return 0

    def update_q_values(self, current_state, action, reward, next_state):
        """Apply one Q-learning update to ``Q[current_state][action]``.

        Q(s, a) <- Q(s, a) + LEARNING_RATE * (reward + DISCOUNT * V(s') - Q(s, a))

        V(s') is the best Q-value available in ``next_state``, except that a
        terminal ``next_state`` (bomb or finish) contributes 0: no further
        reward can follow it, and its Q-values are never updated, so they
        would otherwise inject their random initial values into the target.

        Always returns True.
        """
        q_sa = self.q_values[current_state][action]
        if self.is_terminal(next_state):
            future_value = 0.0
        else:
            future_value = max(self.q_values[next_state].values())
        target = reward + self.DISCOUNT * future_value
        self.q_values[current_state][action] = q_sa + self.LEARNING_RATE * (target - q_sa)
        return True

# instantiate the grid world the agent is trained in
environment = BombAvoider()
# how many training episodes to run
episodes = 750

# one episode lasts until the agent steps on a bomb or reaches the finish
for episode in range(episodes):
    # every episode begins with the agent back on the start cell (1, 1)
    environment.row = 1
    environment.col = 1

    # keep stepping until one of the two break conditions below fires
    while True:
        # wipe the previous frame so consecutive grids read as an animation
        clear_output(wait=True)

        # remember the cell the agent is leaving
        current_state = environment.get_current_coordinates()
        # choose a move for that cell and try to apply it
        # (a move that would leave the grid keeps the agent in place)
        action = environment.select_action(current_state)
        environment.take_action(action)
        # show the grid as it looks after the move
        environment.draw_grid()

        # observe where the agent ended up and what that was worth
        next_state = environment.get_current_coordinates()
        reward = environment.get_reward(current_state, next_state)
        # fold the observed transition into the Q-table
        environment.update_q_values(current_state, action, reward, next_state)

        # slow the final 10 episodes down so the learned behaviour is easy to follow
        if episodes - episode <= 10:
            time.sleep(0.2)

        # the episode ends on a bomb cell ...
        landed_on = environment.get_current_coordinates()
        if landed_on in environment.bomb_coordinates:
            print("You hit the bomb! Game over!")
            break
        # ... or on the finish cell in the bottom-right corner
        elif landed_on == (environment.SIZE, environment.SIZE):
            print("Congratulations! You reached the goal!")
            break

    # report which episode just ended
    print(f"Episode {episode + 1} finished")

  | # | # |   |   |   |   |   | # |   |   |   
  |   |   |   |   |   |   |   | # |   |   | # 
# |   |   |   |   |   |   |   |   |   |   |   
  |   |   | # |   |   |   |   |   |   |   | # 
  | # |   |   |   |   |   |   |   |   |   |   
  |   |   |   |   |   |   |   |   | # | # | # 
  |   | # |   |   | # |   |   | # |   |   |   
  |   |   |   |   |   |   | # |   | # |   |   
  |   |   | # |   |   |   |   |   |   |   |   
  |   |   |   |   | # |   |   |   |   |   |   
# | # |   |   | # | # |   |   |   |   |   |   
  |   | # |   |   |   | # |   | # |   |   | ■ 
Congratulations! You reached the goal!
Episode 750 finished
