In [None]:
import sys
import os
sys.path.append(os.path.dirname(os.getcwd()))

from MazeGenerationAlgorithms.HuntAndKill import HuntAndKillMaze
from MazeGenerationAlgorithms.DepthFirstPrim import DepthFirstPrimMaze
from MazeGenerationAlgorithms.RandomizedKruskal import RandomizedKruskalMaze
from MazeGenerationAlgorithms.Sidewinder import SidewinderMaze
from MazeGenerationAlgorithms.StochasticPrim import StochasticPrimMaze
from MazeGenerationAlgorithms.Wilson import WilsonMaze
from MazeGenerationAlgorithms.RandomizedPrim import RandomizedPrimMaze
from MazeGenerationAlgorithms.DFS import DFSMaze
from MazeGenerationAlgorithms.RecursiveBacktracker import RecursiveBacktrackerMaze
from MazeGenerationAlgorithms.AldousBroder import AldousBroderMaze

import numpy as np
import scipy.special as sp
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import copy
from collections import namedtuple, deque
import random
import pandas as pd
from IPython.display import clear_output

In [None]:
# One replay-buffer record: the observation before the move, the action index,
# the observation after the move, the scalar reward, and whether the episode is
# still running after the move.
# The typename matches the variable it is bound to so that repr() output is not
# misleading and instances can be pickled (pickle looks the class up by name).
Transition = namedtuple('Transition',
                        field_names=['state', 'action', 'next_state', 'reward', 'is_game_on'])

In [None]:
class ReplayMemory:
    """Bounded FIFO store of transitions with uniform random mini-batch sampling.

    Once ``capacity`` items are stored, appending a new one drops the oldest
    (this is the ``deque(maxlen=...)`` behaviour).
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = deque(maxlen=capacity)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

    def append(self, transition):
        """Store one transition (a 5-item record, see ``sample``)."""
        self.memory.append(transition)

    def sample(self, batch_size, device='mps'):
        """Draw ``batch_size`` distinct transitions and stack them into tensors.

        Args:
            batch_size: how many transitions to draw (without replacement).
            device: torch device the returned tensors are created on.

        Returns:
            A 5-tuple ``(states, actions, next_states, rewards, isgameon)`` of
            tensors with dtypes float, long, float, float and bool.

        Raises:
            ValueError: from ``random.sample`` if ``batch_size`` exceeds the
                number of stored transitions.
        """
        picked = random.sample(self.memory, batch_size)

        # Transpose the list of records into one column per field.
        states, actions, next_states, rewards, isgameon = zip(*picked)

        columns = (
            (states, torch.float),
            (actions, torch.long),
            (next_states, torch.float),
            (rewards, torch.float),
            (isgameon, torch.bool),
        )
        # Going through np.array first stacks each column into one array
        # before it is handed to torch.
        return tuple(
            torch.tensor(np.array(values), dtype=dtype, device=device)
            for values, dtype in columns
        )


In [None]:
class DeepQNetwork(nn.Module):
    """Three-layer fully connected network mapping a flat input to ``No`` outputs.

    Args:
        Ni: size of the flattened input vector.
        Nh1: width of the first hidden layer.
        Nh2: width of the second hidden layer.
        No: number of outputs (defaults to 4).
    """

    def __init__(self, Ni, Nh1, Nh2, No=4):
        super().__init__()
        # Register fc1, fc2, fc3 in input-to-output order; the names are the
        # state_dict keys and the order fixes the sequence of weight inits.
        widths = (Ni, Nh1, Nh2, No)
        for index, (fan_in, fan_out) in enumerate(zip(widths[:-1], widths[1:]), start=1):
            self.add_module(f"fc{index}", nn.Linear(fan_in, fan_out))
        self.act = nn.SiLU()
        self.weights = 0  # placeholder attribute; not read anywhere in this class

    def forward(self, x):
        """Apply SiLU after the two hidden layers; the output layer is linear."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = self.act(layer(hidden))
        return self.fc3(hidden)


In [None]:
def Qloss(batch, net, gamma):
    """One-step Q-learning loss (Huber / smooth L1) for a sampled mini-batch.

    Args:
        batch: 5-tuple ``(states, actions, next_states, rewards, is_game_on)``
            of tensors, as returned by ``ReplayMemory.sample``. ``is_game_on``
            is False for transitions after which the episode stopped.
        net: network mapping flattened states to one Q-value per action.
        gamma: discount factor applied to the bootstrapped next-state value.

    Returns:
        Scalar tensor: smooth L1 loss between ``Q(s, a)`` and
        ``r + gamma * max_a' Q(s', a')``, where the bootstrap term is dropped
        for transitions whose episode has ended.
    """
    states, actions, next_states, rewards, is_game_on = batch
    batch_len = len(states)

    # Q-value of the action that was actually taken in each sampled state.
    q_all = net(states.view(batch_len, -1))
    q_taken = q_all.gather(1, actions.unsqueeze(-1)).squeeze(-1)

    # The target is treated as a constant, so no graph is needed for it.
    with torch.no_grad():
        next_best = net(next_states.view(batch_len, -1)).max(1)[0]

    # Fix: an ended episode has no future return. Without this mask the value
    # of a terminal transition keeps bootstrapping from its own next state.
    # Episodes stopped by the agent's reward budget are masked the same way.
    next_best = torch.where(is_game_on.bool(), next_best, torch.zeros_like(next_best))

    target = rewards + gamma * next_best
    return F.smooth_l1_loss(q_taken, target)

In [None]:
class MazeEnvironment:
    """Grid-world environment built from a generated maze.

    The ``maze`` argument must provide ``to_grid()`` and ``to_graph()``.
    The grid is used as a 2-D NumPy array in which 0 is an open cell and 1 is
    a wall. The graph only needs a ``degree()`` method yielding
    ``(node, degree)`` pairs whose nodes unpack as ``(x, y)``; a node
    ``(x, y)`` corresponds to grid cell ``(2 * y + 1, 2 * x + 1)``.

    Positions are ``(row, col)`` grid indices. The agent starts at ``(1, 1)``
    and the goal is the cell one step inside the bottom-right corner.
    """

    def __init__(self, maze):
        self.graph_maze = maze.to_graph()
        self.maze = maze.to_grid()
        self.maze_size = self.maze.size
        # Fix: read rows and columns separately. The previous code used the
        # row count for both, which put the goal and bounds in the wrong place
        # for any non-square grid.
        n_rows, n_cols = self.maze.shape
        self.boundary = np.asarray([n_rows, n_cols])
        self.init_position = np.array([1, 1])
        self.current_position = self.init_position.copy()
        self.goal = (n_rows - 2, n_cols - 2)
        self.visited = {tuple(self.current_position)}

        # Every open cell except the goal is a possible episode start;
        # `distances` holds the Euclidean distance of each one to the goal.
        self.allowed_states = np.argwhere(self.maze == 0).tolist()
        distances = np.linalg.norm(np.array(self.allowed_states) - self.goal, axis=1)
        goal_idx = np.where(distances == 0)[0][0]
        self.allowed_states.pop(goal_idx)
        self.distances = np.delete(distances, goal_idx)

        # Action index -> (d_row, d_col), with the matching arrow for display.
        self.action_map = {0: [0, 1], 1: [0, -1], 2: [1, 0], 3: [-1, 0]}
        self.directions = {0: '→', 1: '←', 2: '↓', 3: '↑'}
        self.dead_end_rewards = self.calculate_dead_end_rewards()

    def reset_policy(self, eps, reg=7):
        """Start-cell distribution: softmax over negative distance to the goal.

        The softmax scale is ``reg * (1 - eps ** (2 / reg)) ** (reg / 2)``, so
        a larger ``eps`` concentrates the starts on cells close to the goal.

        ``eps`` is clipped to [0, 1] and the scale is floored at a tiny positive
        value; otherwise ``eps >= 1`` divides by zero (or takes a fractional
        power of a negative number) and yields NaN probabilities.
        """
        eps = float(np.clip(eps, 0.0, 1.0))
        scaling_factor = max(reg * (1 - eps ** (2 / reg)) ** (reg / 2), 1e-8)
        return sp.softmax(-self.distances / scaling_factor).squeeze()

    def reset(self, epsilon, prand=0):
        """Place the agent on a start cell and return the resulting state.

        With probability ``prand`` the start is uniform over the allowed cells;
        otherwise it is drawn from ``reset_policy(epsilon)``. The visited set
        is restarted with the chosen cell.
        """
        n_starts = len(self.allowed_states)
        if np.random.rand() < prand:
            idx = np.random.choice(n_starts)
        else:
            idx = np.random.choice(n_starts, p=self.reset_policy(epsilon))
        self.current_position = np.array(self.allowed_states[idx])
        self.visited = {tuple(self.current_position)}
        return self.state()

    def state_update(self, action):
        """Apply ``action`` and return ``(state, reward, isgameon)``.

        Rewards:
            * agent already on the goal: minus the sum of all dead-end rewards
              (or 1 when there are no dead ends); the episode ends.
            * move into a wall or out of bounds: -1, the agent stays put.
            * valid move: -0.05 into an unvisited cell, -0.2 into a visited
              one, plus the dead-end penalty when leaving a dead-end cell.
        """
        if np.array_equal(self.current_position, self.goal):
            if self.dead_end_rewards:
                goal_reward = -sum(self.dead_end_rewards.values())
            else:
                goal_reward = 1
            return self.state(), goal_reward, False

        origin = tuple(self.current_position)
        next_position = self.current_position + self.action_map[action]

        if self.is_valid_state(next_position):
            # Fix: the revisit penalty is decided by the destination cell. The
            # previous check looked at the current cell, which is always in
            # `visited`, so every step was charged -0.2.
            reward = -0.2 if tuple(next_position) in self.visited else -0.05
            if origin in self.dead_end_rewards:
                reward += self.dead_end_rewards[origin]
            self.current_position = next_position
        else:
            reward = -1

        self.visited.add(tuple(self.current_position))
        return self.state(), reward, True

    def state(self):
        """Return a copy of the grid with the agent's cell marked as 2."""
        state = self.maze.copy()
        state[tuple(self.current_position)] = 2
        return state

    def is_valid_state(self, position):
        """True when ``position`` is inside the grid and not a wall."""
        # The bounds check must come first so that is_wall never indexes
        # outside the grid (or wraps around through a negative index).
        return not (self.is_out_of_bounds(position) or self.is_wall(position))

    def is_out_of_bounds(self, position):
        """True when any coordinate is negative or beyond the grid shape."""
        return np.any(position < 0) or np.any(position >= self.boundary)

    def is_wall(self, position):
        """True when the grid cell at ``position`` is a wall (value 1)."""
        return self.maze[tuple(position)] == 1

    def _grid_to_graph(self, position):
        """Convert a ``(row, col)`` grid cell to its ``(x, y)`` graph node."""
        row, col = int(position[0]), int(position[1])
        return ((col - 1) // 2, (row - 1) // 2)

    def find_dead_ends(self):
        """Grid positions of degree-1 graph nodes, excluding start and goal.

        Start and goal are converted with the same ``(x, y)`` convention that
        is used to map nodes back to the grid. The previous code built them in
        ``(row, col)`` order, which only coincides for square mazes.
        """
        endpoints = {
            self._grid_to_graph(self.init_position),
            self._grid_to_graph(self.goal),
        }
        dead_ends = {
            node
            for node, degree in self.graph_maze.degree()
            if degree == 1 and node not in endpoints
        }
        return {(2 * y + 1, 2 * x + 1) for x, y in dead_ends}

    def dead_end_L1_distance(self):
        """Map each dead-end grid position to its Manhattan distance to the goal."""
        goal_row, goal_col = self.goal
        return {
            dead_end: abs(dead_end[0] - goal_row) + abs(dead_end[1] - goal_col)
            for dead_end in self.find_dead_ends()
        }

    def calculate_dead_end_rewards(self):
        """Assign each dead end a penalty between -0.4 and -1.0.

        The penalty is linear in the Manhattan distance to the goal: the
        closest dead end gets -0.4 and the farthest gets -1.0. When all dead
        ends are equally far, each gets -1. Returns ``{}`` when there are none.
        """
        dead_end_distances = self.dead_end_L1_distance()
        if not dead_end_distances:
            return {}
        max_dist = max(dead_end_distances.values())
        min_dist = min(dead_end_distances.values())
        if max_dist == min_dist:
            return {dead_end: -1 for dead_end in dead_end_distances}
        span = max_dist - min_dist
        return {
            dead_end: -0.4 - (0.6 * (dist - min_dist) / span)
            for dead_end, dist in dead_end_distances.items()
        }

In [None]:
class Agent:
    """Acts in a maze environment and records every step in a replay buffer.

    Args:
        env: environment exposing ``state()``, ``state_update(action)`` and a
            ``maze`` attribute with a ``size``.
        maze: the maze object the environment was built from (only stored).
        memory_buffer: container with an ``append`` method for transitions.
        alpha: probability given to each non-chosen action in the final noisy
            draw of ``select_action``.
        use_softmax: pick actions from a softmax over Q-values when True,
            epsilon-greedy when False.
    """

    def __init__(self, env, maze, memory_buffer, alpha=1e-6, use_softmax=True):
        self.env, self.maze, self.buffer = env, maze, memory_buffer
        self.alpha, self.use_softmax = alpha, use_softmax
        self.num_act = 4
        # An episode is cut short once the running reward drops below this.
        self.min_reward = -self.env.maze.size // 3
        self.total_reward = 0
        self.rwd = 0
        self.isgameon = True

    def make_a_move(self, net, epsilon, device='mps'):
        """Take one step in the environment and store the transition.

        Updates ``rwd`` (last reward), ``total_reward`` (running episode sum,
        reset to 0 when the episode stops) and ``isgameon``.
        """
        state_before = self.env.state()
        chosen = self.select_action(net, epsilon, device)
        state_after, step_reward, self.isgameon = self.env.state_update(chosen)

        self.rwd = step_reward
        self.total_reward += step_reward
        if self.total_reward < self.min_reward:
            # Reward budget exhausted: stop the episode.
            self.isgameon = False
        if not self.isgameon:
            self.total_reward = 0

        self.buffer.append(Transition(
            state=state_before,
            action=chosen,
            next_state=state_after,
            reward=step_reward,
            is_game_on=self.isgameon,
        ))

    def select_action(self, net, epsilon, device='mps'):
        """Choose an action index from the network's Q-values.

        With ``use_softmax`` the action is sampled from a softmax whose
        temperature is ``epsilon`` (floored at 1e-8). Otherwise it is
        epsilon-greedy. In both cases the result is then redrawn from a
        distribution that keeps it with probability ``1 - 3 * alpha`` and
        gives ``alpha`` to each other action.
        """
        observation = np.array(self.env.state())
        net_input = torch.tensor(observation, dtype=torch.float32, device=device).view(1, -1)
        q_values = net(net_input).cpu().detach().numpy().squeeze()

        if not self.use_softmax:
            if random.random() < epsilon:
                preferred = random.randint(0, self.num_act - 1)
            else:
                preferred = np.argmax(q_values)
        else:
            temperature = max(epsilon, 1e-8)
            preferred = np.random.choice(self.num_act, p=sp.softmax(q_values / temperature))

        noisy = np.full(self.num_act, self.alpha)
        noisy[preferred] = 1 - self.alpha * (self.num_act - 1)
        return np.random.choice(self.num_act, p=noisy)

In [None]:
def train(agent, net, target, optimizer, num_epochs, cutoff, batch_size, 
          buffer_start_size, gamma, exploration_rate=-1, device="mps"):
    """Run the DQN training loop until convergence or ``num_epochs`` episodes.

    Args:
        agent: object with ``env``, ``buffer``, ``isgameon``, ``rwd`` and
            ``make_a_move(net, epsilon, device=...)``.
        net: online Q-network that is optimised.
        target: network that receives a copy of ``net``'s weights every 5
            epochs.
        optimizer: torch optimizer over ``net``'s parameters.
        num_epochs: maximum number of episodes.
        cutoff: decay constant of the exploration schedule
            ``exp(-epoch / cutoff)``.
        batch_size: replay mini-batch size.
        buffer_start_size: optimisation starts once the buffer holds at least
            this many transitions.
        gamma: discount factor passed to ``Qloss``.
        exploration_rate: fixed epsilon for every epoch; ``-1`` (default)
            selects the decaying schedule.
        device: torch device for the agent's forward passes and the sampled
            batches.

    Returns:
        The epoch at which training converged, or ``None`` if it never did.
        Convergence means: mean per-epoch summed loss over the last 20 epochs
        is at most 0.005, all of the last 20 episode returns are positive,
        and ``epoch > buffer_start_size``.
    """
    if num_epochs <= 0:
        return None

    # Exploration schedule: exponential decay, capped so that the first
    # `cutoff_idx` epochs all use the value reached at `cutoff_idx`.
    epsilon = np.exp(-np.arange(num_epochs) / cutoff)
    # Fix: clamp the index; for short runs 100 * int(num_epochs / cutoff) can
    # point past the end of the schedule and raise IndexError.
    cutoff_idx = min(100 * int(num_epochs / cutoff), num_epochs - 1)
    threshold = epsilon[cutoff_idx]
    epsilon = np.minimum(epsilon, threshold)

    converged_epoch = None
    last_losses = deque(maxlen=20)
    last_rewards = deque(maxlen=20)

    for epoch in range(num_epochs):
        r_sum, loss = 0, 0
        eps = epsilon[epoch] if exploration_rate == -1 else exploration_rate

        agent.isgameon = True
        _ = agent.env.reset(eps, prand=0.1)

        while agent.isgameon:
            # Fix: forward `device`. Without it the agent fell back to its own
            # default and ignored the device requested by the caller.
            agent.make_a_move(net, eps, device=device)
            r_sum += agent.rwd

            # Keep collecting experience until the buffer is warm.
            if len(agent.buffer) < buffer_start_size:
                continue

            optimizer.zero_grad()
            batch = agent.buffer.sample(batch_size, device=device)
            loss_t = Qloss(batch, net, gamma=gamma)
            loss_t.backward()
            torch.nn.utils.clip_grad_norm_(net.parameters(), max_norm=1.0)
            optimizer.step()
            loss += loss_t.item()

        # NOTE(review): `target` is refreshed here, but the loss above is
        # computed from `net` alone, so `target` does not influence training.
        if epoch % 5 == 0:
            target.load_state_dict(net.state_dict())

        # `loss` is the sum over the episode's optimisation steps, not a mean.
        last_losses.append(loss)
        last_rewards.append(r_sum)

        avg_last_losses = np.mean(last_losses) if last_losses else 0

        # The epoch guard presumably keeps the zero losses of the warm-up
        # phase (no optimisation steps yet) from counting as convergence.
        if avg_last_losses <= 0.005 and epoch > buffer_start_size and all(r > 0 for r in last_rewards):
            converged_epoch = epoch
            break

    return converged_epoch

In [None]:
# --- Experiment configuration -------------------------------------------------
width, height = 15, 15

# Use Apple's MPS backend when present, otherwise fall back to CUDA or the CPU
# instead of failing on the first `.to("mps")`.
# NOTE(review): every call that takes a `device` argument must be given this
# value; the Agent methods default to 'mps' when it is omitted.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    DEVICE = "mps"
elif torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Seed every RNG used by the maze generators, the agent and torch so that a
# fresh "Restart & Run All" reproduces the same table.
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

NUM_EPOCHS = 10000
CUTOFF = NUM_EPOCHS // 3
BUFFER_CAPACITY = NUM_EPOCHS
BUFFER_START_SIZE = NUM_EPOCHS // 20
BATCH_SIZE = 24
LEARNING_RATE = 1e-4
GAMMA = 0.9
NUM_EXPERIMENTS = 10

# Display name -> maze generator class.
maze_types = {
    'Sidewinder': SidewinderMaze,
    'Randomized Prim\'s': RandomizedPrimMaze,
    'Depth First Prim\'s': DepthFirstPrimMaze,
    'Stochastic Prim\'s': StochasticPrimMaze,
    'Randomized Kruskal\'s': RandomizedKruskalMaze,
    'Wilson\'s': WilsonMaze,
    'Aldous Broder': AldousBroderMaze,
    'Hunt and Kill': HuntAndKillMaze,
    'Randomized DFS': DFSMaze,
    'Recursive Backtracker': RecursiveBacktrackerMaze,
}

# One row per (maze type, experiment), appended as soon as the run finishes.
results = []

for maze_name, MazeClass in maze_types.items():
    print(f"\n=== Running experiments for {maze_name} ===")

    for i in range(NUM_EXPERIMENTS):
        print(f"\n[{maze_name}] Experiment {i + 1}/{NUM_EXPERIMENTS}")

        # --- Build a fresh maze, environment, networks and agent ------------
        maze = MazeClass(width, height)
        env = MazeEnvironment(maze)
        input_size = env.maze_size

        net = DeepQNetwork(input_size, input_size, input_size, 4).to(DEVICE)
        target = DeepQNetwork(input_size, input_size, input_size, 4).to(DEVICE)
        target.load_state_dict(net.state_dict())

        memory = ReplayMemory(BUFFER_CAPACITY)
        agent = Agent(env, maze, memory)
        optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)

        converged_epoch = train(
            agent, net, target, optimizer,
            num_epochs=NUM_EPOCHS,
            cutoff=CUTOFF,
            batch_size=BATCH_SIZE,
            buffer_start_size=BUFFER_START_SIZE,
            gamma=GAMMA,
            device=DEVICE
        )

        # --- Dead-end statistics, computed on the maze graph ----------------
        # A dead end is a degree-1 node that is neither the start nor the end.
        deadends = [n for n, d in maze.maze.degree() if d == 1 and n not in [maze.start, maze.end]]
        manhattan_distances = [
            abs(dead_end[0] - maze.end[0]) + abs(dead_end[1] - maze.end[1]) for dead_end in deadends
        ]
        avg_manhattan_distance = np.mean(manhattan_distances) if manhattan_distances else 0

        max_dist = max(manhattan_distances) if manhattan_distances else 0
        min_dist = min(manhattan_distances) if manhattan_distances else 0

        # Same penalty rule as the environment: -0.4 for the closest dead end
        # down to -1.0 for the farthest, or -1 for all when equally far.
        if max_dist == min_dist:
            deadend_rewards = {dead_end: -1 for dead_end in deadends}
        else:
            deadend_rewards = {
                dead_end: -0.4 - (0.6 * (dist - min_dist) / (max_dist - min_dist))
                for dead_end, dist in zip(deadends, manhattan_distances)
            }

        avg_deadend_reward = np.mean(list(deadend_rewards.values())) if deadend_rewards else 0
        solution_reward = -np.sum(list(deadend_rewards.values())) if deadend_rewards else 1

        results.append({
            'Maze Type': maze_name,
            'Size': f"{maze.width}x{maze.height}",
            '# of Deadends': len(deadends),
            'Avg Manhattan Distance from Deadends to Solution': f"{avg_manhattan_distance:.3f}",
            'Avg Deadend Reward': f"{avg_deadend_reward:.3f}",
            'Solution Reward': f"{solution_reward:.3f}",
            'Converged Epoch': converged_epoch
        })

        clear_output(wait=True)

df = pd.DataFrame(results)

df

In [None]:
# Persist the results table next to the notebook; the row index is not written.
df.to_csv(path_or_buf="DQN_experiment_results.csv", index=False)