### Init

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
import random
import numpy as np
import matplotlib.pyplot as plt

from itertools import chain
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
import random
import numpy as np

# Might Remove this, who knows? not me!
import torch.optim as optim
from collections import deque

from dataclasses import dataclass, field
from torch.utils.data import Dataset, DataLoader

### Helpers

In [None]:
def load_maze_from_txt(path):
    """Load a maze grid from a whitespace-separated text file.

    The first non-blank line holds two integers, ``rows cols``. Every
    following non-blank line is one maze row of integer cell codes.

    Blank lines (for example a trailing newline at the end of the file) are
    skipped so they do not turn into empty rows, which would make the grid
    ragged.

    Args:
        path: Path of the text file to read.

    Returns:
        A ``(maze, rows, cols)`` tuple where ``maze`` is a list of lists of
        ints and ``rows``/``cols`` are the dimensions declared in the header
        (they are reported as-is and not checked against the grid).
    """
    with open(path, 'r') as f:
        # Keep only lines that carry data.
        lines = [line for line in f if line.strip()]

    # First line contains the declared dimensions.
    rows, cols = map(int, lines[0].split())

    # The remaining lines contain the maze grid, one row per line.
    maze = [[int(token) for token in line.split()] for line in lines[1:]]

    return maze, rows, cols

In [None]:
def test_agent_dqn(agent, config):
    """Roll out one greedy episode of ``agent`` and print the result.

    The maze is loaded from ``maze_{config.maze_size}.txt`` and a fresh
    ``MazeEnv`` is built from ``config.max_steps``, ``config.move_prob`` and
    ``config.rewards``. The maze is rendered before and after the episode,
    followed by the accumulated reward and whether the goal was reached.

    Args:
        agent: Object exposing ``get_action(state, epsilon, epsilon_decay)``
            that returns ``(action, new_epsilon)``.
        config: Object providing ``maze_size``, ``max_steps``, ``move_prob``
            and ``rewards``.
    """
    maze, _, _ = load_maze_from_txt(f'maze_{config.maze_size}.txt')
    env = MazeEnv(maze_matrix=maze, max_steps=config.max_steps, move_prob=config.move_prob, rewards=config.rewards)
    state = env.reset()
    done = False
    total_reward = 0
    reached_goal = False

    print("\nInitial maze state:")
    env.render()

    # Evaluation must exploit the learned policy. With epsilon = 0 the
    # agent's epsilon-greedy rule always takes the greedy branch; the previous
    # value of 1.0 made this rollout an (almost) purely random walk.
    epsilon = 0.0
    epsilon_decay = 1.0
    while not done:
        action, epsilon = agent.get_action(state, epsilon, epsilon_decay)
        state, reward, done, reached_goal = env.step(action)
        total_reward += reward

    env.render()
    print(f"Final maze state (Total reward: {total_reward:.2f})")
    print(f"Reached Goal: {reached_goal}")

In [None]:
def plot_rewards(reward_history, maze_size, agent_type="dqn"):
    """Plot per-episode rewards together with their moving average.

    Args:
        reward_history: Sequence of per-episode records whose first element
            is the episode's total reward (e.g. ``(total_reward, reached_goal)``).
        maze_size: Maze size shown in the plot title.
        agent_type: Agent label shown (upper-cased) in the plot title.

    Returns:
        None. The figure is displayed with ``plt.show()``. Nothing is plotted
        when ``reward_history`` is empty.
    """
    episode_rewards = [record[0] for record in reward_history]
    episodes = len(episode_rewards)
    if episodes == 0:
        # np.convolve rejects empty input; there is nothing to draw anyway.
        print("No episodes to plot.")
        return

    # A quarter of the run, but at least 5 episodes. Clamped to the number of
    # episodes so the 'valid' convolution output lines up with the x-range
    # used below even for very short histories.
    window = min(max(5, episodes // 4), episodes)

    moving_avg = np.convolve(episode_rewards, np.ones(window)/window, mode='valid')

    plt.figure(figsize=(10, 5))
    plt.plot(episode_rewards, alpha=0.4, label="Episode Reward")
    # The k-th averaged value covers episodes k .. k + window - 1, so it is
    # drawn at the last episode of its window.
    plt.plot(range(window - 1, episodes), moving_avg, label=f"Moving Avg (window={window})", linewidth=2)

    plt.xlabel("Episode")
    plt.ylabel("Total Reward")
    plt.title(f"Reward Progression on Maze {maze_size} ({agent_type.upper()})")
    plt.legend()
    plt.grid(True)

    plt.show()

In [None]:
def evaluate_agent_from_tuples(episode_rewards, maze_size):
    """Print summary statistics for a list of episode results.

    Args:
        episode_rewards: Sequence of records whose element 0 is the episode's
            total reward and element 1 is a success flag.
        maze_size: Maze size shown in the report header.

    Prints the episode count, the mean / median / min / max / standard
    deviation of the rewards, and the success rate in percent. Float values
    are printed with two decimals; everything else is printed as-is.
    """
    reward_values = np.array([record[0] for record in episode_rewards])
    success_flags = np.array([record[1] for record in episode_rewards])

    # (label, value) pairs in the order they are reported.
    report = [
        ('episodes', len(reward_values)),
        ('mean_reward', np.mean(reward_values)),
        ('median_reward', np.median(reward_values)),
        ('min_reward', np.min(reward_values)),
        ('max_reward', np.max(reward_values)),
        ('std_reward', np.std(reward_values)),
        ('success_rate', np.mean(success_flags) * 100),  # percent
    ]

    print(f"Maze Size {maze_size} Agent Evaluation Stats:")
    for label, value in report:
        if isinstance(value, float):
            print(f"{label}: {value:.2f}")
        else:
            print(f"{label}: {value}")


### Maze Environment




In [None]:
class MazeEnv:
    """Grid maze environment with shaped rewards and stochastic moves.

    Cell codes used by the code below: ``1`` is a wall (cannot be entered),
    ``2`` a trap (ends the episode), ``3`` the start cell and ``4`` the goal.
    ``render`` additionally draws ``0`` as free space and ``9`` as padding.

    ``rewards`` is indexed as follows:
        0: base reward for every step
        1: reward when the step limit is reached
        2: multiplier applied to ``rewards[0]`` when bumping into a wall or
           the grid border; also the reward for stepping onto a trap
        3: reward for reaching the goal
        4: bonus added when the Manhattan distance to the goal shrinks
        5: adjustment added when that distance grows
        6: adjustment added once a cell has been entered more than 3 times
    """

    # Default reward table (the original default, annotated "Config for maze 11").
    _DEFAULT_REWARDS = (1, -75, -5, 1000, 1, -.5, -2)

    # Action id -> (row delta, column delta).
    _MOVES = {
        0: (-1, 0),   # up
        1: (1, 0),    # down
        2: (0, -1),   # left
        3: (0, 1),    # right
    }

    def __init__(self, maze_matrix, max_steps=300, move_prob=0.99, rewards=None):
        """Build the environment and reset it to the start cell.

        Args:
            maze_matrix: 2-D grid of integer cell codes containing at least
                one start cell (3) and one goal cell (4); the first occurrence
                of each is used.
            max_steps: Number of steps after which the episode times out.
            move_prob: Probability that the requested action is executed;
                otherwise a uniformly random action replaces it.
            rewards: Reward table (see class docstring). ``None`` selects a
                fresh copy of the default table, so instances never share a
                mutable default list.
        """
        self.original_maze = np.array(maze_matrix)
        self.maze = self.original_maze.copy()
        self.max_steps = max_steps
        self.move_prob = move_prob
        self.rewards = list(self._DEFAULT_REWARDS) if rewards is None else rewards
        self.goal_pos = tuple(map(int, np.argwhere(self.maze == 4)[0]))  # 4 = goal
        self.start_pos = tuple(map(int, np.argwhere(self.maze == 3)[0])) # 3 = start
        self.agent_pos = None
        # reset() initialises agent_pos, steps, prev_dist and visit_counts.
        self.reset()

    def reset(self):
        """Restore the maze, put the agent on the start cell, return the state."""
        self.maze = self.original_maze.copy()
        self.agent_pos = self.start_pos
        self.steps = 0
        self.prev_dist = self.manhattan_dist(self.agent_pos, self.goal_pos)
        self.visit_counts = np.zeros_like(self.maze)
        return self.get_state()

    def get_state(self):
        """Return the flattened maze followed by a one-hot agent position.

        The result has ``2 * rows * cols`` entries.
        """
        flat_maze = self.maze.flatten()
        agent_pos_vec = np.zeros(flat_maze.shape)
        # Row-major index of the agent's cell.
        idx = self.agent_pos[0] * self.maze.shape[1] + self.agent_pos[1]
        agent_pos_vec[idx] = 1
        return np.concatenate([flat_maze, agent_pos_vec])

    def is_valid(self, pos):
        """Return True when ``pos`` lies inside the grid and is not a wall."""
        x, y = pos
        return (0 <= x < self.maze.shape[0]) and (0 <= y < self.maze.shape[1]) and (self.maze[x, y] != 1)

    def manhattan_dist(self, pos1, pos2):
        """Return the Manhattan (L1) distance between two ``(row, col)`` cells."""
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])

    def step(self, action):
        """Advance the environment by one step.

        Args:
            action: 0 = up, 1 = down, 2 = left, 3 = right. Any other value
                raises ``KeyError``.

        Returns:
            ``(state, reward, done, reached_goal)``.
        """
        self.steps += 1

        # Stochasticity: with probability 1 - move_prob the requested action
        # is replaced by a uniformly random one.
        if random.random() > self.move_prob:
            action = random.choice([0, 1, 2, 3])  # Random move

        dx, dy = self._MOVES[action]
        new_pos = (self.agent_pos[0] + dx, self.agent_pos[1] + dy)

        hit_the_wall = not self.is_valid(new_pos)
        if hit_the_wall:
            # The agent stays where it is and pays the wall penalty.
            reward = self.rewards[0]*self.rewards[2]
        else:
            self.agent_pos = new_pos  # move
            reward = self.rewards[0]  # base step reward

            # Directional shaping reward
            curr_dist = self.manhattan_dist(self.agent_pos, self.goal_pos)
            if curr_dist < self.prev_dist:
                reward += self.rewards[4]  # small bonus for progress
            elif curr_dist > self.prev_dist:
                reward += self.rewards[5]  # adjustment for moving away from the goal
            self.prev_dist = curr_dist

            # Visitation penalty
            self.visit_counts[self.agent_pos] += 1
            if self.visit_counts[self.agent_pos] > 3:
                reward += self.rewards[6]  # discourage revisiting same cell too often

        done = False
        reached_goal = False

        # Terminal outcomes replace (rather than add to) the shaped reward.
        cell_value = self.maze[self.agent_pos]
        if cell_value == 2:  # trap
            reward = self.rewards[2]
            done = True
        elif self.agent_pos == self.goal_pos: # Reached goal
            reward = self.rewards[3]
            reached_goal = True
            done = True
        elif self.steps >= self.max_steps:
            reward = self.rewards[1]  # Penalty for timeout
            done = True

        return self.get_state(), reward, done, reached_goal

    def render(self):
        """Print the maze as text with the agent's cell drawn as ``A``."""
        display = self.maze.copy().astype(str)
        display[display == '0'] = '.'  # Free
        display[display == '1'] = '#'  # Wall
        display[display == '2'] = 'X'  # Trap
        display[display == '3'] = 'S'  # Start
        display[display == '4'] = 'G'  # Goal
        display[display == '9'] = 'P'  # Padding
        x, y = self.agent_pos
        display[x, y] = 'A'
        print('\n'.join(' '.join(row) for row in display))
        print()



## DQN

### DQN Agent

In [None]:
# Q-Value Network
class QNetwork(nn.Module):
    """Fully connected network producing one output per action.

    Three hidden ``Linear`` + ``ReLU`` stages of width ``hidden_size`` are
    followed by a linear output layer of width ``action_size``. The layers
    live in ``self.net`` and ``action_size`` is kept as an attribute.
    """

    def __init__(self, state_size, action_size, hidden_size):
        super().__init__()
        # Layer widths of the hidden stack: input followed by three hidden layers.
        widths = (state_size, hidden_size, hidden_size, hidden_size)
        stages = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stages.append(nn.Linear(fan_in, fan_out))
            stages.append(nn.ReLU())
        stages.append(nn.Linear(hidden_size, action_size))

        self.net = nn.Sequential(*stages)
        self.action_size = action_size

    def forward(self, x):
        """Apply the layer stack to ``x`` and return the result."""
        q_values = self.net(x)
        return q_values

In [None]:
class DQNAgent:
    """DQN agent with a replay buffer and a periodically synced target network.

    Everything is placed on ``self.device`` (CUDA when available, otherwise
    CPU), so the agent also runs on machines without a GPU.
    """

    def __init__(self, state_size, action_size=5, hidden_size=128,
                 buffer_size=10000, batch_size=64, gamma=0.99, lr=1e-3,
                 target_update = 100,  learn_frequency = 2,
                 start_training = 1000):
        """Create the networks, optimizer and replay buffer.

        Args:
            state_size: Length of the flat state vector fed to the networks.
            action_size: Number of discrete actions (network outputs).
            hidden_size: Width of the hidden layers of ``QNetwork``.
            buffer_size: Maximum number of transitions kept in the buffer.
            batch_size: Number of transitions sampled per learning step.
            gamma: Discount factor used in the TD target.
            lr: Adam learning rate.
            target_update: The target network is synced every this many
                calls to ``step``.
            learn_frequency: A learning step runs every this many calls to
                ``step``.
            start_training: Learning starts once more than this many
                transitions have been observed.
        """
        self.gamma = gamma
        self.batch_size = batch_size
        self.buffer_size = buffer_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Q-Networks: the online network and a target copy with equal weights.
        self.qnetwork_local = QNetwork(state_size, action_size, hidden_size).to(self.device)
        self.qnetwork_target = QNetwork(state_size, action_size, hidden_size).to(self.device)
        self.qnetwork_target.load_state_dict(self.qnetwork_local.state_dict())

        # Only the online network is optimized.
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=lr)

        # Replay buffer: the oldest transitions are dropped once it is full.
        self.memory = deque(maxlen=buffer_size)
        self.global_step = 0
        self.target_update = target_update
        self.start_training = start_training
        self.learn_frequency = learn_frequency

    def get_action(self, state, epsilon, epsilon_decay):
        """Pick an action with an epsilon-greedy policy.

        Args:
            state: NumPy state vector.
            epsilon: Probability of taking a uniformly random action.
            epsilon_decay: Factor applied to ``epsilon`` for the returned value.

        Returns:
            ``(action, epsilon * epsilon_decay)``.
        """
        if random.random() < epsilon:
            # Explore: uniformly random action.
            action = random.randint(0, self.qnetwork_local.action_size - 1)
        else:
            # Exploit: action with the highest predicted Q-value.
            with torch.no_grad():
                state = torch.from_numpy(state).float().to(self.device)
                action = self.qnetwork_local(state).argmax().item()

        return action, epsilon * epsilon_decay

    def step(self, state, action, reward, next_state, done):
        """Store a transition and, when due, learn and sync the target network."""
        # Save experience to replay memory
        self.memory.append((state, action, reward, next_state, done))
        self.global_step += 1

        # Learn only after the warm-up and once a full batch can be sampled;
        # random.sample raises if asked for more items than the buffer holds.
        warmed_up = (self.global_step > self.start_training
                     and len(self.memory) >= self.batch_size)
        if warmed_up and self.global_step % self.learn_frequency == 0:
            experiences = random.sample(self.memory, self.batch_size)
            self.learn(experiences)

        # The sync is checked on every step. It used to be checked only inside
        # learn(), so it fired only when global_step was a multiple of both
        # learn_frequency and target_update.
        if self.global_step % self.target_update == 0:
            self.qnetwork_target.load_state_dict(self.qnetwork_local.state_dict())

    def learn(self, experiences):
        """Run one gradient step on a batch of experience tuples.

        Args:
            experiences: Sequence of ``(state, action, reward, next_state,
                done)`` tuples.
        """
        states, actions, rewards, next_states, dones = zip(*experiences)
        # Use the actual batch length rather than self.batch_size so any
        # batch passed in is handled.
        batch = len(experiences)

        states = torch.from_numpy(np.stack(states)).float().to(self.device).reshape(batch, -1)
        actions = torch.tensor(actions, dtype=torch.long, device=self.device)
        next_states = torch.from_numpy(np.stack(next_states)).float().to(self.device).reshape(batch, -1)
        rewards = torch.tensor(rewards, dtype=torch.float, device=self.device)
        dones = torch.tensor(dones, dtype=torch.float, device=self.device)

        # Q-values predicted by the online network for the actions taken.
        Q_expected = self.qnetwork_local(states)
        Q_sa = Q_expected.gather(1, actions.unsqueeze(1)).squeeze(1)

        # Highest next-state Q-value according to the target network.
        with torch.no_grad():
            Q_sa_next = self.qnetwork_target(next_states).max(1)[0]

        # TD target; (1 - dones) removes the bootstrap term on terminal steps.
        td_target = rewards + (self.gamma * Q_sa_next * (1 - dones))
        loss = F.mse_loss(Q_sa, td_target)

        # Minimize the loss
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()


### Train

In [None]:
def train_dqn(config):
    """Train a ``DQNAgent`` on ``config.env``.

    Args:
        config: Object providing the agent hyper-parameters (``state_size``,
            ``action_size``, ``hidden_size``, ``buffer_size``, ``batch_size``,
            ``gamma``, ``lr``, ``target_update``, ``learn_frequency``,
            ``start_training``), the environment ``env``, the exploration
            settings ``epsilon`` / ``epsilon_decay``, the number of
            ``episodes`` and ``maze_size`` (used for logging only).

    Returns:
        ``(agent, rewards)`` where ``rewards`` holds one
        ``(total_reward, reached_goal)`` tuple per episode.
    """
    print(f"\nTraining DQN on Maze {config.maze_size}")
    agent = DQNAgent(
        state_size = config.state_size,
        action_size=config.action_size,
        hidden_size=config.hidden_size,
        buffer_size=config.buffer_size,
        batch_size=config.batch_size,
        gamma=config.gamma,
        lr=config.lr,
        target_update = config.target_update,
        learn_frequency = config.learn_frequency,
        start_training = config.start_training)

    env = config.env
    epsilon = config.epsilon
    epsilon_decay = config.epsilon_decay
    rewards = []

    # An episode is cut short once its accumulated reward reaches this value.
    # NOTE(review): 53 is carried over unchanged from the original loop
    # condition; its rationale is not visible here. Confirm, and consider
    # moving it into the config.
    reward_cap = 53

    # The context manager closes the progress bar even if training raises.
    with tqdm(total=config.episodes, position=0, leave=False) as loop:
        for episode in range(config.episodes):
            state = env.reset()
            done, total_reward, reached_goal = False, 0, False

            while not done and total_reward < reward_cap:
                # get_action returns the decayed epsilon, so exploration
                # decays once per environment step ...
                action, epsilon = agent.get_action(state, epsilon, epsilon_decay)
                next_state, reward, done, reached_goal = env.step(action)
                agent.step(state, action, reward, next_state, done)

                total_reward += reward
                state = next_state

            # ... and once more at the end of every episode.
            epsilon *= config.epsilon_decay
            rewards.append((total_reward, reached_goal))

            loop.set_description(f"Ep: {episode} R: {total_reward:.2f} ε: {epsilon:.3f}")
            loop.update(1)
    return agent, rewards


### Run and Test

In [None]:
class DQNConfig:
    """Hyper-parameters and environment handle for one DQN training run."""

    def __init__(self, maze_size, env):
        """Populate the configuration.

        Args:
            maze_size: Side length of the (square) maze.
            env: Environment instance to train on.
        """
        self.maze_size = maze_size
        self.state_size = 2 * (maze_size ** 2)  # Maze matrix + agent position
        # MazeEnv.step only defines actions 0-3 (up, down, left, right); a
        # fifth action would raise KeyError as soon as the agent picked it.
        self.action_size = 4
        self.hidden_size = 128
        self.env = env

        # Training parameters
        self.episodes = 1000
        self.gamma = 0.99
        self.lr = 1e-3
        self.batch_size = 64
        self.buffer_size = 10000
        self.target_update = 75
        self.learn_frequency = 2
        self.start_training = 200
        self.epsilon = 1.0
        self.epsilon_decay = 0.999995
        self.max_steps = maze_size ** 2
        self.move_prob = 0.99
        # test_agent_dqn reads config.rewards; mirror the reward table of the
        # training environment when it exposes one (None otherwise).
        self.rewards = getattr(env, "rewards", None)

# One entry per maze to train on: grid size, step limit and reward table.
# Reward table indices as consumed by MazeEnv.step:
#   0: base reward for every move
#   1: timeout (step limit reached)
#   2: wall-hit factor (multiplied by index 0); also the reward for a trap
#   3: goal reached
#   4: bonus for getting closer to the goal
#   5: adjustment for moving away from the goal
#   6: adjustment for entering the same cell more than 3 times
# NOTE(review): presumably consumed by get_env_mazes, which is not visible
# here - confirm how "size" and "max_steps" are used.
mazes_data = [
    {"size":5,  "max_steps": 75,  "rewards":  [1,-10,-5,300,.1,-.1,-5]},  # see the index legend above
    # {"size":7,  "max_steps": 100, "rewards":  [1,-20,-5,500,.1,-.1,-5]},
    # {"size":9,  "max_steps": 200, "rewards":  [1,-35,-5,750,.1,-.1,-5]},
]

In [None]:
# Train a DQN agent on the first maze in mazes_data, plot its reward curve,
# then replay the trained agent.
# NOTE(review): get_env_mazes is not defined in the visible part of this
# notebook; the indexing suggests it returns a sequence whose items hold the
# environment at position 1 - confirm against its definition.
env = get_env_mazes(mazes_data)[0][1]
config_DQN = DQNConfig(mazes_data[0]["size"], env)
agent, reward_history = train_dqn(config_DQN)
plot_rewards(reward_history, mazes_data[0]["size"])
# NOTE(review): the MazeEnv class shown in this notebook has no play_agent
# method; confirm that the object returned by get_env_mazes provides it.
env.play_agent(agent)


Training DQN on 7x7 maze...


                                                                               


Initial maze state:
# # # # # # #
# A . . # # #
# # # . . # #
# . # # . . #
# . . # . # #
# # . . . G #
# # # # # # #

# # # # # # #
# S . . # # #
# # # . . # #
# . # # . A #
# . . # . # #
# # . . . G #
# # # # # # #

Final maze state (Total reward: -53):


