In [30]:
from pathlib import Path
from typing import List
from typing import Tuple
import numpy as np
from tqdm import trange
import pickle
import random as rand

# Q Learner Class 

#### Predict/Fit/Dump/Load

In [66]:
class QLearner:
    
    """
    Q Learner Object
    
    :param num_states: The number of states to consider.
    :param num_actions: The number of actions available..
    :param alpha: The learning rate used in the update rule. Should range between 0.0 and 1.0 with 0.2 as a typical value.
    :param gamma: The discount rate used in the update rule. Should range between 0.0 and 1.0 with 0.9 as a typical value.
    
    :param random_action_rate: Random action rate: the probability of selecting a random action at each step. 
     (Should range between 0.0 (no random actions) to 1.0 (always random action) with 0.5 as a typical value.)
     
    :param random_action_decay_rate: Random action decay rate, after each update, rar = rar * radr. 
     (Ranges between 0.0 (immediate decay to 0) and 1.0 (no decay). Typically 0.99.)
     
    :param verbose: If “verbose” is True, your code can print out information for debugging.    
    
    """
    SERIALIZED_LEARNER_STATE_FILE = "Q.pickle"
    
    def __init__(        
        self,
        *,
        num_states: int = 100,
        num_actions: int = 4,
        alpha: float = 0.000001,
        gamma: float = 0.9,
        random_action_rate: float = 0.99,
        random_action_decay_rate: float = 0.99,
        verbose: bool = True,
    ) -> None:
        """
        Constructor method
        """
        self.verbose = verbose
        self.num_states = num_states
        self.num_actions = num_actions
        self.alpha = alpha
        self.gamma = gamma
        self.random_action_rate = random_action_rate
        self.random_action_decay_rate = random_action_decay_rate
        self.current_state = 0
        self.current_action = 0
        self.q_values = np.zeros((self.num_states, self.num_actions))
        print("Q Learner Class Initiated")

    def predict(self, new_state: int) -> int:
        
        """
        Updates state without any changes to Q table 
        
        :param new_state: The new state 
        :return: Selected action along with the random flag
        
        """
        
        self.current_state = new_state
        
        if rand.random() < self.random_action_rate:
            random_flag = True
            action = int(rand.randint(0, self.num_actions - 1))
        else:
            random_flag = False
            action = int(np.argmax(self.q_values[self.current_state]))
                       
        self.current_action = action
        
        if self.verbose:
            print(f"state = {new_state}, action = {action}")
        return action, random_flag
    
    
    

    def fit(self, s_prime: int, immediate_reward: float) -> int:
        
        """
        Updates the Q table and returns an action

        :param s_prime: The new state
        :param immediate_reward: The immediate reward
        :return: The selected action along with random flag
        
        """
        
        impr_est = immediate_reward + (
            self.gamma
            * self.q_values[s_prime, np.argmax(self.q_values[s_prime])]
        )
        self.q_values[self.current_state, self.current_action] = (
            1 - self.alpha
        ) * self.q_values[self.current_state, self.current_action] + (
            self.alpha * impr_est
        )

        if rand.random() < self.random_action_rate:
            random_flag = True
            action = int(rand.randint(0, self.num_actions - 1))
        else:
            random_flag = False
            action = int(np.argmax(self.q_values[s_prime]))

        self.random_action_rate = (
            self.random_action_rate * self.random_action_decay_rate
        )
        self.current_state = s_prime
        self.current_action = action

        if self.verbose:
            print(f"s = {s_prime}, a = {action}, r={immediate_reward}")
        return action, random_flag

    def dump(self):
        # Saving the current state of the learner to be later used for testing
        with open(
            self.SERIALIZED_LEARNER_STATE_FILE, "wb"
        ) as serialized_learner_state_write:
            pickle.dump(
                self,
                serialized_learner_state_write,
                protocol=pickle.DEFAULT_PROTOCOL,
            )

    def load(filepath) -> "QLearner":
        # Rerieving the state of the learner persisted during training
        with open(filepath, "rb") as serialized_learner_state_read:
            return pickle.load(serialized_learner_state_read)
        
   # def __str__(self):
    #    return "<"+str(self.<<x>>)+", "+str(self.<<y>>)+">"
    

 #   def __eq__(self, obj):
 #       if isinstance(obj, QLearner):
 #           return (
 #               obj.num_states == self.num_states
 #               and obj.num_actions == self.num_actions
 #               and obj.random_action_rate == self.random_action_rate
 #               and obj.random_action_decay_rate == self.random_action_decay_rate
 #               and np.array_equal(obj.q_values, self.q_values)
 #           )
 #       return False

# Maze game functions

#### render/ goal/ cliff, out of bounds, step/discretize, create maze

In [67]:
def printmap(data: np.ndarray) -> None:
    """
    Prints out the map

    :param data: 2D array that stores the map
    
    """
    print("-----------")
    map_tile = {
        0: " ",  # Empty space
        9: ".",  # Trail
        1: "*",  # Current location of Robot
        2: "X",  # Goal
        3: "Y",  # Cliff
    }
    for row in range(0, data.shape[0]):
        for col in range(0, data.shape[1]):
            print(map_tile[data[row, col]], end=" ")
        print()
    print("-----------")


def get_goal_location(data: np.ndarray) -> Tuple:
    """
    finds the goal location in the maze
    """
    goal_x, goal_y = np.where(data == 3)
    return goal_x, goal_y


def get_cliff_locations(data: np.ndarray) -> List:
    
    """ Returns the locations of the cliffs """
    
    shp = data.shape
    location = []
    for i in range(0, shp[0]):
        for j in range(shp[1]):
            if data[i, j] == 2:
                location.append([i, j])
    return location


def check_pos_valid(data: np.ndarray, new_x: int, new_y: int) -> bool:
    
    """ Checks if the new position is valid """
    
    shp = data.shape
    
    if (new_x < 0) | (new_x > shp[0] - 1):
        return False
    if (new_y < 0) | (new_y > shp[1] - 1):
        return False
    return True


def impl_action(
    data: np.ndarray, curr_x: int, curr_y: int, action: int, cliffs: List
):
    new_x = curr_x
    new_y = curr_y
    if action == 0:  # left
        new_y = curr_y - 1
    elif action == 1:  # right
        new_y = curr_y + 1
    elif action == 2:  # up
        new_x = curr_x - 1
    else:  # down
        new_x = curr_x + 1

    rewards = -1  # default reward
    if check_pos_valid(data, new_x, new_y):
        if [new_x, new_y] in cliffs:
            rewards = -100  # penalty for hitting a cliff
        data[curr_x, curr_y] = 9
        data[new_x, new_y] = 1
        return data, rewards, new_x, new_y
    rewards = -10  # penalty for going off the map
    return data, rewards, curr_x, curr_y


def discretize(curr_x: int, curr_y: int):
    return curr_x * 10 + curr_y



def create_maze():
    """
    loading the maze from csv file and testing the algorithm.
    """

    file_path = "maze.csv"
    with open(file_path, encoding="utf8") as maze_file:
        maze = np.array(
            [
                list(map(float, s.strip().split(",")))
                for s in maze_file.readlines()
            ]
        )

    return maze


# Training loop
#### Added params and starting states to train each state on multiple epochs

In [73]:
def train(start_positions, maze):
    """
    loading the maze from csv file and training the algorithm
    """
    
    # Qlearning training process
    num_states = discretize(maze.shape[0] - 1, maze.shape[1])
    
    learner = QLearner(
        num_states=num_states,
        num_actions=4,
        alpha=0.2,
        gamma=0.9,
        random_action_rate=0.98,
        random_action_decay_rate=0.99,
        verbose=False,
    )
    
    cliffs = get_cliff_locations(maze)
    print(" ")
    print(" ")
    goal_x, goal_y = get_goal_location(maze)
    print("Goal Position:", goal_x, "x",goal_y)
    print(" ")
    #start_pos1, start_pos2 = np.where(maze == 1)
    #print("Starting position is:", start_pos1, start_pos2)
    
        
    print(" ")
    print("Training a ",maze.shape[0],"x",maze.shape[1],"maze grid;")
    #print("**********************************")
    
    #print(maze)
    #printmap(maze)

    for start_x, start_y in start_positions:
        x,y = np.where(maze==1)
        maze[x,y] = 0
        maze[start_x,start_y]=1
        printmap(maze)
        
        for _ in trange(1, 1000, unit="episode"):  
            
            maze1 = maze.copy()
            done = False
            curr_x, curr_y = np.where(maze1 == 1)
            action, random_flag = learner.predict(discretize(curr_x, curr_y))

            while not done:

                # 
                curr_x, curr_y = np.where(maze1 == 1)
                maze1, rewards, curr_x, curr_y = impl_action(maze1, curr_x, curr_y, action, cliffs)
                s_prime = discretize(curr_x, curr_y)
                action, random_flag = learner.fit(s_prime, rewards)

                if [curr_x, curr_y] in cliffs or (curr_x == goal_x) & (curr_y == goal_y):
                    done = True
    learner.dump()                
    print("Training Complete")
    return goal_x, goal_y, learner
    print("-----------------------------------------------------------------------------------------------")

    

# Testing loop

In [74]:
def test(start_positions, goal_x, goal_y, maze, learner):
    #start_positions = [(1,0),(1,4),(3,3),(5,2),(6,0),(6,4),(8,2),(9,0),(9,4)]
    for start_x, start_y in start_positions:
        x,y = np.where(maze==1)
        maze[x,y] = 0
        maze[start_x,start_y]=1
    
        #print(QLearner.SERIALIZED_LEARNER_STATE_FILE)
        print("-------------------------")
        #learner = QLearner.load(QLearner.SERIALIZED_LEARNER_STATE_FILE)
        print(" ")
        print("Testing Initiated: Maze Game")
        print(" ")
        
        
        done = False
        maze2 = maze.copy()   
        print("..............................")
        print("Testing a ",maze2.shape[0],"x",maze2.shape[1],"maze grid;")
        print("..............................")
        print(" ")
        print(maze2)
        print(" ")
        print("Maze Starting Position: ", start_x, start_y)
        printmap(maze2)
        #goal_x, goal_y = np.where(maze2 == 3)
    
    
        while not done:
            old_x, old_y = np.where(maze2 == 1)
            action, random_flag = learner.predict(discretize(old_x, old_y))
            curr_x, curr_y = old_x, old_y
            #print("Maze Trajectory")
            if action == 0:  # left
                curr_y = old_y - 1
            elif action == 1:  # right
                curr_y = old_y + 1
            elif action == 2:  # up
                curr_x = old_x - 1
            else:  # down
                curr_x = old_x + 1
            maze2[old_x, old_y] = 9
            maze2[curr_x, curr_y] = 1
            old_x, old_y = curr_x, curr_y
            print("Path Sequence: ")
            printmap(maze2)
            if (curr_x == goal_x) & (curr_y == goal_y):
                done = True
        print(" ")
        print("<<<<<<<< Final path chosen:<<<<<<<<<<<<<")
        printmap(maze2)
        assert done

# Render Maze

In [75]:
maze = create_maze()
start_positions = [
    #(1, 0),
    (1, 0),
    (1, 4),
    (3, 3),
    (5, 2),
    (6, 0),
    (6, 4),
    (8, 2),
    (9, 0),
    (9, 4),
]
print("Maze Chosen: ")
printmap(maze)

Maze Chosen: 
-----------
    Y     
      X   
  X     X 
    X     
X       X 
  X   X   
          
  X X X   
        X 
  X X   * 
-----------


# Train Model
#### Different starting states trained on multiple epochs

In [76]:
start_positions = [(5, 2), (6, 0), (6, 4), (8, 2), (9, 0), (9, 4)]
goal_x, goal_y, learner = train(start_positions, maze)

Q Learner Class Initiated
 
 
Goal Position: [0] x [2]
 
 
Training a  10 x 5 maze grid;
-----------
    Y     
      X   
  X     X 
    X     
X       X 
  X * X   
          
  X X X   
        X 
  X X     
-----------


100%|██████████| 999/999 [00:00<00:00, 1793.54episode/s]


-----------
    Y     
      X   
  X     X 
    X     
X       X 
  X   X   
*         
  X X X   
        X 
  X X     
-----------


100%|██████████| 999/999 [00:00<00:00, 1573.36episode/s]


-----------
    Y     
      X   
  X     X 
    X     
X       X 
  X   X   
        * 
  X X X   
        X 
  X X     
-----------


100%|██████████| 999/999 [00:00<00:00, 1571.58episode/s]


-----------
    Y     
      X   
  X     X 
    X     
X       X 
  X   X   
          
  X X X   
    *   X 
  X X     
-----------


100%|██████████| 999/999 [00:00<00:00, 1038.63episode/s]


-----------
    Y     
      X   
  X     X 
    X     
X       X 
  X   X   
          
  X X X   
        X 
* X X     
-----------


100%|██████████| 999/999 [00:00<00:00, 1006.36episode/s]


-----------
    Y     
      X   
  X     X 
    X     
X       X 
  X   X   
          
  X X X   
        X 
  X X   * 
-----------


100%|██████████| 999/999 [00:01<00:00, 693.89episode/s]

Training Complete





# ----------------------- ******---------------------- # 

# Test Model

In [42]:
# Test maze game
start_positions = [(3,3)]
test(start_positions, goal_x, goal_y, maze, learner)

-------------------------
 
Testing Initiated: Maze Game
 
..............................
Testing a  10 x 5 maze grid;
..............................
 
[[0. 0. 3. 0. 0.]
 [0. 0. 0. 2. 0.]
 [0. 2. 0. 0. 2.]
 [0. 0. 2. 1. 0.]
 [2. 0. 0. 0. 2.]
 [0. 2. 0. 2. 0.]
 [0. 0. 0. 0. 0.]
 [0. 2. 2. 2. 0.]
 [0. 0. 0. 0. 2.]
 [0. 2. 2. 0. 0.]]
 
Maze Starting Position:  3 3
-----------
    Y     
      X   
  X     X 
    X *   
X       X 
  X   X   
          
  X X X   
        X 
  X X     
-----------
Path Sequence: 
-----------
    Y     
      X   
  X   * X 
    X .   
X       X 
  X   X   
          
  X X X   
        X 
  X X     
-----------
Path Sequence: 
-----------
    Y     
      X   
  X * . X 
    X .   
X       X 
  X   X   
          
  X X X   
        X 
  X X     
-----------
Path Sequence: 
-----------
    Y     
    * X   
  X . . X 
    X .   
X       X 
  X   X   
          
  X X X   
        X 
  X X     
-----------
Path Sequence: 
-----------
    *     
    . X   
  