In [949]:
import pandas as pd
import numpy as np
import random

In [950]:
MAZE=[[0,0,1,1,1],
      [0,1,1,0,1],
      [1,1,1,0,1],
      [1,0,1,0,1],
      [1,0,1,1,1]]
REWARDS_TABLE = [[0,0,1,1,1],
                 [0,1,1,0,1],
                 [1,1,1,0,1],
                 [1,0,1,0,1],
                 [100,0,1,1,1]]

In [951]:
def init_state_table():
    qtable = []
    for row in range(len(MAZE)):
        qtable_row = []
        for column in range(len(MAZE[row])):
            qtable_row.append(QEntry((row, column)))
        qtable.append(qtable_row)
    return qtable
            

class QEntry:
    def __init__(self, state):
        self.state = state
        self.up = 0
        self.down = 0
        self.left = 0
        self.right = 0
        # self.q_score = 0
    
    def get_state(self):
        return self.state
    def get_positions(self):
        return {"up": self.up, "down": self.down, "left": self.left, "right": self.right}
    # def get_qscore(self):
    #     return self.q_score

    def add_to_positions(self, increment, position):
        match position:
            case "up":
                self.up += increment
            case "down":
                self.down += increment
            case "right":
                self.right += increment
            case "left":
                self.left += increment
            case _:
                raise Exception("No possible moves at ${current_position}")
    # def add_to_qscore(self, increment):
    #     self.q_score += increment
            

# Init Q-Table
q_table = init_state_table()

In [952]:
def get_possible_actions(row_index, column_index):    
    possible = {"up": False, "down": False, "right": False, "left": False}
    #possible = {"up": row_index != 0, "down": row_index != len(MAZE)-1, "right": column_index != 0, "left": column_index != len(MAZE[0])-1}    
    dir_functions = {"up": (lambda: True if MAZE[row_index-1][column_index] == 1 and not row_index == 0 else False),
                     "down": (lambda: True if MAZE[row_index+1][column_index] == 1 and not row_index == len(MAZE)-1 else False),
                     "right": (lambda: True if MAZE[row_index][column_index+1] == 1 and not column_index == len(MAZE[row_index])-1 else False),
                     "left": (lambda: True if MAZE[row_index][column_index-1] == 1 and not column_index == 0 else False)}
    for dir,funct in dir_functions.items():
        try:
            possible[dir] = funct()
        except: pass

    return possible

In [953]:
def get_next_move(dictionary):
    true_keys = [key for key, value in dictionary.items() if value]
    if not true_keys:
        return None
    return random.choice(true_keys)

In [954]:
def iterate_to_next_move(current_position, next_move):
    '''
    current_position: tuple (row_index, column_index)
    '''
    new_pos = [current_position[0], current_position[1]]
    match next_move:
        case "up":
            new_pos[0] -= 1
        case "down":
            new_pos[0] += 1
        case "right":
            new_pos[1] += 1
        case "left":
            new_pos[1] -= 1
        case _:
            raise Exception("No possible moves at ${current_position}")
    
    return tuple(new_pos)

def get_next_move_qtable(current_state, dir):
    '''
    current_state: A tuple containing the coordinates to the old state, (row_index, column_index)
    '''

    next_state = list(current_state)
    match dir:
        case "up":
            next_state[0] = current_state[0] - 1
            next_state[1] = current_state[1]
        case "down":
            next_state[0] = current_state[0] + 1
            next_state[1] = current_state[1]
        case "left":
            next_state[0] = current_state[0]
            next_state[1] = current_state[1] - 1
        case "right":
            next_state[0] = current_state[0]
            next_state[1] = current_state[1] + 1
        case _:
            raise Exception("Direction is invalid")
    next_state = tuple(next_state)
        
    # Find the entry in the q_table that contains the new state
    return q_table[next_state[0]][next_state[1]]
        

In [955]:
# Constants
ALPHA = 0.01
GAMMA = 0.01
EPISODES = 300

In [956]:
# Bellman Equation
bellman_equation = lambda state, old, next : (1 - ALPHA) * old + ALPHA * (REWARDS_TABLE[state[0]][state[1]] + GAMMA * next)

In [957]:
# Moving through the maze

start_row_index, start_column_index = 0,4
end_row_index, end_column_index = 4,0

current_row_index, current_column_index = start_row_index, start_column_index

for loop in range(0, EPISODES):
    while True:
        # Get current state
        entry = q_table[current_row_index][current_column_index]
        state = entry.get_state()
        state_q_positions = entry.get_positions()
        possible_movements = get_possible_actions(state[0], state[1])

        # Get next move
        next_move = get_next_move(possible_movements)
        # display(str(state) + str(state_q_positions) + str(possible_movements) + " " + str(next_move))

        # display(state_q_positions)
        # Using Bellman equation to find the Q-values
        old = state_q_positions[next_move]
        next_q_obj = get_next_move_qtable(state, next_move)
        next_q = max(list(next_q_obj.get_positions().values()))
        # print(str(next_q_obj.get_state()) + " old: {} next_q: {}".format(old, next_q))
        # display(bellman_equation(state, old, next_q))
        # Update the Q-value in the Q-table
        q_table[state[0]][state[1]].add_to_positions(bellman_equation(state, old, next_q), next_move)

        # Update for the next iteration
        next_state = next_q_obj.get_state()
        current_row_index, current_column_index = next_state[0], next_state[1]

        if current_row_index == end_row_index and current_column_index == end_column_index:
            break;

In [977]:
# Visualize Q-Table
q_table_optimal = []
for row in q_table:
    q_table_optimal_row = []
    for column in row:
        pos = column.get_positions()
        q_table_optimal_row.append(" " if max(pos.values()) == 0 else max(pos, key=pos.get))
    q_table_optimal.append(q_table_optimal_row)

q_table_optimal

[[' ', ' ', 'down', 'left', 'left'],
 [' ', 'down', 'left', ' ', 'up'],
 ['down', 'left', 'left', ' ', 'up'],
 ['down', ' ', 'up', ' ', 'down'],
 ['up', ' ', 'up', 'left', 'left']]

In [959]:
q_table[0][2].get_positions()

{'up': 0,
 'down': 1.6535395287292701e+78,
 'left': 0,
 'right': 1.3291252405194005e+71}

In [971]:
# Putting the data into a state table
state_table = pd.DataFrame()

for row in q_table:
    for obj in row:
        pos = obj.get_positions()
        df_entry = pd.DataFrame({"Position": [obj.get_state()], "Up": [pos["up"]], "Down": [pos["down"]], "Left": [pos["left"]], "Right": [pos["right"]]})
        state_table = pd.concat([state_table, df_entry])

display(state_table)

Unnamed: 0,Position,Up,Down,Left,Right
0,"(0, 0)",0.0,0.0,0.0,0.0
0,"(0, 1)",0.0,0.0,0.0,0.0
0,"(0, 2)",0.0,1.6535399999999999e+78,0.0,1.329125e+71
0,"(0, 3)",0.0,0.0,8.226233e+74,7.048301e+67
0,"(0, 4)",0.0,4.023787e+64,4.636627e+71,0.0
0,"(1, 0)",0.0,0.0,0.0,0.0
0,"(1, 1)",0.0,1.550585e+83,0.0,3.324049e+76
0,"(1, 2)",6.145408e+74,9.062697999999999e+78,1.822858e+80,0.0
0,"(1, 3)",0.0,0.0,0.0,0.0
0,"(1, 4)",2.282003e+68,2.467645e+58,0.0,0.0
