# Importing Libraries

In [None]:
from matplotlib import pyplot as plt 
from matplotlib import colors 
import numpy as np
import random
import heapq
import sys
np.set_printoptions(edgeitems = 5)

# Maze Generation

In [None]:
rows    = 101
cols    = 101
total_cells =  rows * cols
num_mazes = 50

def deadEnd(x, y, visited):

    '''
    x: integer, horizontal coordinate/ num_row
    y: integer, vertical coordinate/ num_col
    visited: set of visited cells in maze

    '''

    for i in range(-1, 1):
        for j in range(-1, 1):
            # i and j are not in the same cell
            if (i!= 0 and j != 0): 
                # x and y are within the maze
                if (x + i > 0 and x + i < rows): 
                    if (y + j > 0 and y + j < cols):
                        # neighbors are unvisited
                        if (x + i, y + j not in visited):

                            # return the unvisited neigbors
                            return False, x + i, y + j 

    return True, -1, -1

def validRow(x):
    if (x >= 0 and x < rows): return True
    
    return False 

def validCol(y):
    if (y >= 0 and y < cols): return True
    return False 

def generate_mazes():
    
    # initiate all cells as unvisited
    maze = np.zeros((num_mazes, rows, cols))

    for idx in range(0, num_mazes):
        #print(f"generating maze {idx}")

        # empty visited (closed list) and stack (open list)
        visited = set()
        stack = []

        # begin from random cell; must be integers
        row = random.randint(0, rows - 1)
        col = random.randint(0, cols - 1)
        #print(f"start location: ({row}, {col})") 

        # mark start cell as visited
        visited.add((row, col))
        # cell is unblocked
        maze[idx, row, col] = 1

        # explore neighbors randomly using DFS
        while(len(visited) < total_cells):

            curr_row = row + random.randint(-1, 1)
            curr_col = col + random.randint(-1, 1)

            i = 0
            dead_end = False

            # check neighboring cell validity
            while((validRow(curr_row) == False) or (validCol(curr_col) == False) or ((curr_row, curr_col) in visited)): 
                curr_row = row + random.randint(-1, 1)
                curr_col = col + random.randint(-1, 1)

                i = i + 1

                if (i == 8):
                    dead_end = True
                    break
                
            if(dead_end == False): visited.add((curr_row, curr_col))

            prob = random.uniform(0, 1)

            if(prob < 0.3 and dead_end == False): 
                # cell is blocked with 30% probability
                maze[idx, curr_row, curr_col] = 0

                # start with neighbors of blocked cell in next iteration
                row = curr_row
                col = curr_col
            else:
                if(dead_end == False):
                    # cell is unblocked with 70% probability
                    maze[idx, curr_row, curr_col] = 1

                    # put in stack (open list)
                    stack.append((curr_row, curr_col))

                    dead_end, row_to_visit, col_to_visit = deadEnd(row, col, visited) 

                if(dead_end == True):
                    # no neighbors left --> find next unvisited neighbor from stack
                    while(len(stack) > 0):
                        row_stack, col_stack = stack.pop()
                        dead_end, row_to_visit, col_to_visit = deadEnd(row_stack, col_stack, visited)

                        if(dead_end == False): break

                    if(len(stack) > 0): 
                        # stack is not empty OR we've reached a cell that's not a dead end --> update row and col
                        visited.add((row_to_visit, col_to_visit))
                        row = row_to_visit
                        col = col_to_visit
                    
                    else:
                        # repeat visiting neighbors
                        row = random.randint(0, rows - 1)
                        col = random.randint(0, cols - 1)

                        if(len(visited) < total_cells):
                            while((validRow(row) == False) or (validCol(col) == False) or ((row, col) in visited)):
                                row = random.randint(0, rows - 1)
                                col = random.randint(0, cols - 1)

                            visited.add((row, col))

                        else:
                            visited.add((row_to_visit, col_to_visit))
                            row = row_to_visit
                            col = col_to_visit
    return maze

Storing and Loading Mazes

In [None]:
mazes = generate_mazes()

# saving mazes as .txt files
for idx in range(0, num_mazes):
    np.savetxt("generated_maze_" + str(idx) + ".txt", mazes[idx].astype(int), fmt = '%i', delimiter = ',') 

# for loading mazes as numpy arrays
# loaded_maze = np.loadtxt(f"generated_maze_{idx}.txt", delimiter=",", dtype=int) 

Visualizing Mazes

In [None]:
# singular maze
def display_maze(maze):
    plt.figure(figsize=(10,10)) 
    plt.imshow(maze, cmap=colors.ListedColormap(['blue', 'yellow']))  
    plt.show(); 

# display_maze(loaded_maze)

In [None]:
# maze grid 
fig, ax = plt.subplots(10, 5, figsize=(20, 23))
fig.suptitle('Maze Grid')
colormap = colors.ListedColormap(['blue', 'yellow']) 

k = 0
for i in range(10):
    for j in range(5):
        ax[i][j].imshow(np.array(mazes[k]), cmap=colormap) 
        ax[i][j].set_title(f"maze {k+1}")
        k += 1

plt.show(); 

# Helper Functions

Visualization 

In [None]:
def display_maze_path(maze, path, boolean):
    for (i, j) in path:
        maze[i][j] = 2

    if boolean == False:
        plt.figure(figsize=(5, 5)) 
        plt.imshow(maze, cmap=colors.ListedColormap(['blue', 'yellow', 'red']))
        plt.show(); 
    else:
        plt.figure(figsize=(5, 5)) 
        plt.imshow(maze, cmap=colors.ListedColormap(['yellow', 'red']))
        plt.show(); 

Manhattan Distance

In [None]:
# manhattan distance between goal cell and current cell
def manhattan_dist(a, b):
    return abs(a[0] - b[0]) + abs(a[1] - b[1])  

Node class

In [None]:
class Node:
  def __init__(self, position=None, parent=None):
      self.position = position
      self.parent = parent

      self.g = 0
      self.h = 0
      self.f = 0 

  def __eq__(self, other):
      return self.position == other.position
    
  def __repr__(self):
    return f"{self.position} - g: {self.g} h: {self.h} f: {self.f}"

  # defining less than for purposes of heap queue
  def __lt__(self, other):
    # tie breaking
    if self.f == other.f:
        if self.g == other.g:
            coin = random.choice([0, 1])
            return coin == 1   
        return self.g > other.g
    return self.f < other.f 
    
  # defining greater than for purposes of heap queue
  def __gt__(self, other):
    return self.f > other.f

Traverse Path

In [None]:
def complete_path(node):
    path = []
    current = node

    while current is not None:
        path.append(current.position)
        current = current.parent

    path.reverse()
    
    return path

# Repeated A*

Basic A* Algorithm

In [None]:
def a_star(maze, start_cell, goal_cell):
    
    start = Node(start_cell, None)
    if maze[start.position[0]][start.position[1]] == 0:
        # print("You cannot start on a barrier.")
        return None, 0
    start.g = 0
    start.h = manhattan_dist(start_cell, goal_cell)
    start.f = start.g + start.h
        
    goal = Node(goal_cell, None)
    if maze[goal.position[0]][goal.position[1]] == 0:
        # print("Your goal cannot be a barrier.")
        return None, 0

    closed_list = {}
    open_list = []

    heapq.heappush(open_list, start) 

    d = [(0, -1), (0, 1), (-1, 0), (1, 0)]
    
    expanded = 0

    while len(open_list) > 0:
        current_node = heapq.heappop(open_list)

        if current_node == goal:

            return complete_path(current_node), expanded

        expanded = expanded + 1
        
        neighbors = []
        for coord in d:
            new_loc = (current_node.position[0] + coord[0], current_node.position[1] + coord[1]) 
                
            if ( 0 <= new_loc[0] < maze.shape[0]):
                if (0 <= new_loc[1] < maze.shape[1]):
                    new_node = Node(new_loc, current_node) 
                    neighbors.append(new_node)
                else:
                    continue
            else:
                continue
                
        for neighbor in neighbors:

            neighbor.g = current_node.g + 1
            neighbor.h = manhattan_dist(neighbor.position, goal.position)  
            neighbor.f = neighbor.g + neighbor.h
            
            if maze[neighbor.position[0]][neighbor.position[1]] == 1:
                if neighbor not in open_list:
                    if neighbor.position not in closed_list.keys():
                        heapq.heappush(open_list, neighbor) 
                    else:
                        temp = closed_list[neighbor.position]
                        if temp.g > neighbor.g:
                            heapq.heappush(open_list, neighbor)
                else:
                    for open_node in open_list:
                        if neighbor == open_node and neighbor.g < open_node.g:
                            if neighbor.position not in closed_list.keys():
                                heapq.heappush(open_list, neighbor) 
                            else:
                                temp = closed_list[neighbor.position]
                                if temp.g > neighbor.g:
                                    heapq.heappush(open_list, neighbor)
                        

        closed_list[current_node.position] = current_node   
            
    return None, expanded

Repeated A* Algorithm

In [None]:
def repeated_a_star(maze, start_cell, goal_cell):
    
    start = Node(start_cell, None)
    if maze[start.position[0]][start.position[1]] == 0:
        # print("You cannot start on a barrier")
        return None, 0
        
    goal = Node(goal_cell, None)
    if maze[goal.position[0]][goal.position[1]] == 0:
        # print("Your goal cannot be a barrier")
        return None, 0
        
    plan = np.ones((maze.shape[0], maze.shape[1]))
    
    d = [(0, -1), (0, 1), (-1, 0), (1, 0)]
    
    pawn = start.position
    all_paths = []
    all_maps = []
    expanded = 0
    
    test_path, new_expanded = a_star(plan, pawn, goal.position)
    all_paths.append(tuple(test_path))
    all_maps.append(np.copy(plan))
    
    expanded = expanded + new_expanded
    
    if test_path is None:
        return path, expanded
        
    path = []
    path.append(pawn)
    
    i = 1
    
    while not (pawn[0] == goal.position[0] and pawn[1] == goal.position[1]):
        
        neighbors = []
        for coord in d:
            new_loc = (pawn[0] + coord[0], pawn[1] + coord[1])
            
            if ( 0 <= new_loc[0] < maze.shape[0]):
                if (0 <= new_loc[1] < maze.shape[1]):
                    new_node = Node(new_loc, None) 
                    neighbors.append(new_node)

                else:
                    continue
            else:
                continue
                
        for neighbor in neighbors:
            
            if maze[neighbor.position[0]][neighbor.position[1]] == 0:
                plan[neighbor.position[0]][neighbor.position[1]] = 0
        
        on_deck = test_path[i]
        
        if plan[on_deck[0]][on_deck[1]] == 1:
            pawn = on_deck
            i = i + 1
            path.append(pawn)
        else:
            test_path, new_expanded = a_star(plan, pawn, goal.position)
            all_paths.append(tuple(test_path))
            all_maps.append(np.copy(plan))
            expanded = expanded + new_expanded
            
            if test_path is None:
                return None, expanded
            pawn = test_path[1]
            i = 1
            i = i + 1
            path.append(pawn)

    # for visualizing the path returned by each call to A* 
    for i in range(0, len(all_paths)):
        p = list(all_paths[i])
        maze = all_maps[i]
        if i == 0:
            display_maze_path(maze, p, True)
        else:
            display_maze_path(maze, p, False)

    return path, expanded

In [None]:
# for visualization
name = "maze_" + str(0) + ".txt"
test = np.loadtxt(name, delimiter=",", dtype=int) 
pathr, __ = repeated_a_star(test, (0, 0), (20, 20))

display_maze_path(test, pathr, False)
# display_maze(test)

# Adaptive A* Algorithm

Helper A* Algorithm

In [None]:
def update_visited(new_expanded, visited, g_goal):
    for cell in new_expanded.keys():
        if cell not in visited.keys():
            temp = new_expanded[cell]
            temp.h = g_goal - temp.g
            visited[temp.position] = temp
        else:
            node = visited[cell]
            node.h = g_goal - new_expanded[cell].g
            visited[cell] = node
    return visited

In [None]:
def adopt_a_star(maze, start_cell, goal_cell, visited): 

    start = Node(start_cell, None)
    if maze[start.position[0]][start.position[1]] == 0:
        # print("You cannot start on a barrier.")
        return None, 0, {}, 0
    
    start.g = 0
    start.h = manhattan_dist(start_cell, goal_cell)
    start.f = start.g + start.h
            
    goal = Node(goal_cell, None)
    if maze[goal.position[0]][goal.position[1]] == 0:
        # print("Your goal cannot be a barrier.")
        return None, 0, {}, 0

    closed_list = {}
    open_list = []
    new_expanded = {}

    heapq.heappush(open_list, start) 

    d = [(0, -1), (0, 1), (-1, 0), (1, 0)]
        
    expanded = 0

    while len(open_list) > 0:
        current_node = heapq.heappop(open_list)

        if current_node == goal:
            return complete_path(current_node), expanded, new_expanded, current_node.g

        new_expanded[current_node.position] = current_node
        expanded = expanded + 1 
            
        neighbors = []
        for coord in d:
            new_loc = (current_node.position[0] + coord[0], current_node.position[1] + coord[1]) 
                    
            if ( 0 <= new_loc[0] < maze.shape[0]):
                if (0 <= new_loc[1] < maze.shape[1]):
                    new_node = Node(new_loc, current_node) 
                    neighbors.append(new_node)

                else:
                    continue
            else:
                continue
                    
        for neighbor in neighbors:
            neighbor.g = current_node.g + 1 
            
            if neighbor.position in visited.keys():  
                neighbor.f = visited[neighbor.position].h + neighbor.g
                    
            else:
                neighbor.f = manhattan_dist(neighbor.position, goal.position) + neighbor.g
            
            if maze[neighbor.position[0]][neighbor.position[1]] == 1:
                if neighbor not in open_list:
                    if neighbor.position not in closed_list.keys():
                            heapq.heappush(open_list, neighbor) 
                    else:
                        temp = closed_list[neighbor.position]
                        if temp.g > neighbor.g:
                            heapq.heappush(open_list, neighbor)
                else:
                    for open_node in open_list:
                        if neighbor == open_node and neighbor.g < open_node.g:
                            if neighbor.position not in closed_list.keys():
                                heapq.heappush(open_list, neighbor) 
                            else:
                                temp = closed_list[neighbor.position]
                                if temp.g > neighbor.g: 
                                    heapq.heappush(open_list, neighbor)
                            
        closed_list[current_node.position] = current_node

    return None, expanded, new_expanded, current_node.g

Adaptive A*

In [None]:
def adaptive_a_star(maze, start_cell, goal_cell): 
    
    start = Node(start_cell, None)
    if maze[start.position[0]][start.position[1]] == 0:
        # print("You cannot start on a barrier")
        return None, 0
                
    goal = Node(goal_cell, None)
    if maze[goal.position[0]][goal.position[1]] == 0:
        # print("Your goal cannot be a barrier")
        return None, 0
                
    plan = np.ones((maze.shape[0], maze.shape[1]))
            
    d = [(0, -1), (0, 1), (-1, 0), (1, 0)]
            
    pawn = start.position
            
    expanded = 0
    visited = {} 
    all_paths = []
    all_maps = []
            
    test_path, new_expanded, new_visited, g_goal = adopt_a_star(plan, pawn, goal.position, visited)    
    visited = update_visited(new_visited, visited, g_goal)
    all_paths.append(tuple(test_path))
    all_maps.append(np.copy(plan))
    expanded = expanded + new_expanded
            
    if test_path is None:
        return None, expanded
            
    path = []
    path.append(pawn)
            
    i = 1
            
    while not (pawn[0] == goal.position[0] and pawn[1] == goal.position[1]):
                
        neighbors = []
        for coord in d:
            new_loc = (pawn[0] + coord[0], pawn[1] + coord[1])
                    
            if ( 0 <= new_loc[0] < maze.shape[0]):
                if (0 <= new_loc[1] < maze.shape[1]):
                    new_node = Node(new_loc, None) 
                    neighbors.append(new_node)

                else:
                    continue
            else:
                continue
                        
        for neighbor in neighbors:
                    
            if maze[neighbor.position[0]][neighbor.position[1]] == 0:
                plan[neighbor.position[0]][neighbor.position[1]] = 0
                
        on_deck = test_path[i]
                
        if plan[on_deck[0]][on_deck[1]] == 1:
            pawn = on_deck
            i = i + 1
            path.append(pawn)
        else:
            test_path, new_expanded, new_visited, g_goal = adopt_a_star(plan, pawn, goal.position, visited)
            all_paths.append(tuple(test_path))
            all_maps.append(np.copy(plan))
            visited = update_visited(new_visited, visited, g_goal)  
            expanded = expanded + new_expanded
            
            if test_path is None:
                return None, expanded
            
            pawn = test_path[1]
            i = 1
            i = i + 1
            path.append(pawn)
            
    # for visualizing the path returned by each call to A* 
    for i in range(0, len(all_paths)):
        p = list(all_paths[i])
        maze = all_maps[i]
        if i == 0:
            display_maze_path(maze, p, True)
        else:
            display_maze_path(maze, p, False)
    
    return path, expanded

In [None]:
# for visualization
name = "maze_" + str(0) + ".txt"
test = np.loadtxt(name, delimiter=",", dtype=int) 
patha, __ = adaptive_a_star(test, (0, 0), (20, 20))

display_maze_path(test, patha, False)
# display_maze(maze)

# Empirical Analysis

In [None]:
start = (0, 0)
goal = (20, 20)
counter = 0
k = 0

while counter < 50:
    j = 0
    candidates = generate_mazes()
    k = k + 1
    for idx in range(0, num_mazes):
        maze = candidates[idx]
        path, expanded = a_star(maze, start, goal)
        j = j + 1
        print(str(k) + ", " + str(j))
        if path is None:
              continue
        np.savetxt("maze_" + str(counter) + ".txt", candidates[idx].astype(int), fmt = '%i', delimiter = ',')
        counter = counter + 1

In [None]:
start = (0, 0)
goal = (20, 20)
i = 0
rfa_gg = np.zeros(50)
rba_gg = np.zeros(50)
aa_gg = np.zeros(50)
while i < 50:
    name = "maze_" + str(i) + ".txt"
    maze = np.loadtxt(name, delimiter=",", dtype=int)
    print("Step 1")
    pathf, expandedf = repeated_a_star(maze, start, goal)
    print("Step 2")
    pathb, expandedb = repeated_a_star(maze, goal, start)
    print("Step 3")
    patha, expandeda = adaptive_a_star(maze, start, goal)
    print("Step 4")
    rfa_gg[i] = expandedf
    rba_gg[i] = expandedb
    aa_gg[i] = expandeda
    print(i)
    i = i + 1

In [None]:
class Node:
  def __init__(self, position=None, parent=None):
      self.position = position
      self.parent = parent

      self.g = 0
      self.h = 0
      self.f = 0 

  def __eq__(self, other):
      return self.position == other.position
    
  def __repr__(self):
    return f"{self.position} - g: {self.g} h: {self.h} f: {self.f}"

  # defining less than for purposes of heap queue
  def __lt__(self, other):
    # tie breaking
    if self.f == other.f:
        if self.g == other.g:
            coin = random.choice([0,1])
            return coin == 1   
        return self.g < other.g
    return self.f < other.f 
    
  # defining greater than for purposes of heap queue
  def __gt__(self, other):
    return self.f > other.f

In [None]:
start = (0, 0)
goal = (20, 20)
i = 0
rfa_lg = np.zeros(50)
while i < 50:
    name = "maze_" + str(i) + ".txt"
    maze = np.loadtxt(name, delimiter=",", dtype=int)
    path, expanded = repeated_a_star(maze, start, goal)
    rfa_lg[i] = expanded
    i = i + 1

In [None]:
print(np.mean(rfa_gg))
print(np.mean(rfa_lg))
print(np.mean(rba_gg))
print(np.mean(aa_gg))

In [None]:
print(np.std(rfa_gg))
print(np.std(rfa_lg))
print(np.std(rba_gg))
print(np.std(aa_gg))

In [None]:
print(rfa_gg)

In [None]:
print(rfa_lg)

In [None]:
print(rba_gg)

In [None]:
print(aa_gg)

In [None]:
i = 0
counter1 = 0
counter2 = 0
counter3 = 0
while i < 50:
    if rfa_gg[i] < rfa_lg[i]:
        counter1 = counter1 + 1
    if rfa_gg[i] < rba_gg[i]:
        counter2 = counter2 + 1
    if rfa_gg[i] < aa_gg[i]:
        counter3 = counter3 + 1
    i = i + 1
print(counter1)
print(counter2)
print(counter3)