# Environment setup details

```
pipx install uv
uv venv
uv pip install mazelib ipykernel ipython matplotlib pandas openpyxl
```


In [31]:
import time
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output
from typing import Tuple, List, Dict, Optional
from dataclasses import dataclass
from mazelib import Maze
from mazelib.generate.Sidewinder import Sidewinder
from mazelib.solve.BacktrackingSolver import BacktrackingSolver
from pprint import pprint
import openpyxl


# Configuration

In [32]:
@dataclass
class Config:
    """Configuration parameters for maze navigation system.

    A single instance is handed to the environment, the agent and the
    training controller, so every tunable value lives in one place.
    """
    # Maze parameters
    # Passed as both the height and the width when the maze generator is built.
    maze_size: int = 5
    maze_id: Optional[int] = None  # None for random generation, int for specific seed
    
    # Algorithm parameters 
    learning_rate: float = 0.1  # step size α ∈ (0, 1]
    discount_factor: float = 0.9  # γ
    epsilon: float = 0.1  # small ε > 0; copied into the agent's exploration_rate
    num_episodes: int = 100  # number of training episodes run by AgentControl.train

    # Reward structure
    goal_reward: float = 100  # returned when the next state is the maze end
    wall_penalty: float = -10  # returned when the next state is not a valid move
    step_penalty: float = -1  # returned for every other transition

# Agent, Environment, and Control classes

Go to the execution section to configure, train, and test.

In [33]:
class MazeEnvironment:
    """Maze world: builds the grid, answers move/reward queries and draws it.

    The grid is a 2-D array in which the value 1 marks a wall. The start is
    fixed at cell (1, 1) and the end at the cell one step in from the
    bottom-right corner of the grid.
    """

    def __init__(self, config: Config):
        """Store *config*, reset all state, then generate a maze straight away."""
        self.config = config
        self.grid = None
        self.start = None
        self.end = None
        self._maze = None
        self.seed = None
        self.optimal_path = None
        self.optimal_path_length = None
        self.generate()

    def generate(self) -> None:
        """Build a fresh Sidewinder maze and solve it for the reference path."""
        # An explicit maze_id is used as the seed; otherwise draw one at random.
        configured_id = self.config.maze_id
        if configured_id is None:
            self.seed = np.random.randint(1, 1000)
        else:
            self.seed = configured_id

        self._maze = Maze(self.seed)
        maze = self._maze
        side = self.config.maze_size
        maze.generator = Sidewinder(side, side)
        maze.generate()
        maze.generate_entrances()

        self.grid = maze.grid

        # First and last cells just inside the outer border of the grid.
        shape = self.grid.shape
        self.start = (1, 1)
        self.end = (shape[0] - 2, shape[1] - 2)

        self._calculate_optimal_path()

    def _calculate_optimal_path(self) -> None:
        """Run the backtracking solver from start to end and keep its first path."""
        maze = self._maze
        maze.solver = BacktrackingSolver()
        maze.start = self.start
        maze.end = self.end
        maze.solve()

        solutions = maze.solutions
        if not solutions:
            # No route found: leave both fields empty.
            self.optimal_path = None
            self.optimal_path_length = None
            return

        self.optimal_path = solutions[0]
        # NOTE(review): the +1 presumably accounts for an endpoint the solver
        # leaves out of its path — confirm against mazelib.
        self.optimal_path_length = len(self.optimal_path) + 1

    def get_minimum_steps(self) -> Optional[int]:
        """Return the stored optimal path length (None when no path was found)."""
        return self.optimal_path_length

    def get_state(self, position: Tuple[int, int]) -> Tuple[int, int]:
        """Return the observation for *position*.

        The environment is fully observable, so the position is returned
        unchanged. Sensor noise or external disturbances (a partially
        observable variant of the problem) would be injected here.
        """
        return position

    def get_reward(self, state: Tuple[int, int], next_state: Tuple[int, int]) -> float:
        """Return the reward for moving to *next_state* (*state* is not consulted)."""
        if not self.is_valid_move(next_state):
            return self.config.wall_penalty
        if next_state == self.end:
            return self.config.goal_reward
        return self.config.step_penalty

    def is_valid_move(self, state: Tuple[int, int]) -> bool:
        """Return True when *state* lies inside the grid and is not a wall (1)."""
        row, col = state
        # Bounds are checked first so the grid is never indexed out of range.
        if not (0 <= row < self.grid.shape[0] and 0 <= col < self.grid.shape[1]):
            return False
        return not self.grid[state] == 1

    def visualize(self, path: Optional[List[Tuple[int, int]]] = None, show_optimal: bool = False) -> None:
        """Draw the maze, its start/goal markers and, optionally, paths.

        Args:
            path: cells to mark with a blue '#'.
            show_optimal: when True, mark the stored optimal path with green 'O'.
        """
        def mark(cell, glyph, colour, size):
            # Cells are (row, col) while text coordinates are (x, y) = (col, row).
            plt.text(cell[1], cell[0], glyph,
                     ha='center', va='center', color=colour, fontsize=size)

        plt.figure(figsize=(5, 5))
        plt.title(f"Maze #{self.seed} - Min Steps: {self.get_minimum_steps()}")
        plt.imshow(self.grid, cmap='binary')

        mark(self.start, 'S', 'red', 20)
        mark(self.end, 'G', 'green', 20)

        if show_optimal and self.optimal_path:
            for cell in self.optimal_path:
                mark(cell, "O", 'green', 15)

        if path:
            for cell in path:
                mark(cell, "#", 'blue', 20)

        plt.xticks([])
        plt.yticks([])
        plt.show()

In [34]:

class QLearningAgent:
    """Tabular Q-learning agent for a grid maze.

    The Q-table has one entry per (row, column, action). Actions are indices
    into ``self.actions``, each a ``(row_change, column_change)`` offset.
    """

    def __init__(self, env: "MazeEnvironment", config: "Config"):
        """Create a zero-initialised Q-table sized to the environment's grid.

        Args:
            env: environment whose ``grid.shape`` fixes the table size.
            config: supplies ``epsilon``, ``learning_rate`` and ``discount_factor``.
        """
        self.env = env
        self.config = config
        self.actions = [
            (-1, 0),  # Up: row index decreases by 1
            (1, 0),   # Down: row index increases by 1
            (0, -1),  # Left: column index decreases by 1
            (0, 1),   # Right: column index increases by 1
        ]
        maze_height, maze_width = env.grid.shape
        # Size the action axis from the action list so the two cannot drift apart.
        self.q_table = np.zeros((maze_height, maze_width, len(self.actions)))
        self.exploration_rate = config.epsilon

    def get_action(self, state: Tuple[int, int], training: bool = True) -> int:
        """Select an action index with an ε-greedy policy.

        Args:
            state: current (row, column) cell.
            training: when False the greedy action is always returned.

        Returns:
            Index into ``self.actions``.
        """
        # Explore with probability ε (not 1 - ε); otherwise exploit the table.
        if training and np.random.rand() < self.exploration_rate:
            return int(np.random.randint(len(self.actions)))
        return int(np.argmax(self.q_table[state]))

    def update(self, state: Tuple[int, int], action: int,
               reward: float, next_state: Tuple[int, int]) -> None:
        """Apply one Q-learning update for the transition (state, action).

        Q(S, A) ← Q(S, A) + α [R + γ max_a Q(S', a) - Q(S, A)]
        """
        current_q = self.q_table[state][action]
        # max_a Q(S', a)
        best_next_q = np.max(self.q_table[next_state])
        td_target = reward + self.config.discount_factor * best_next_q
        self.q_table[state][action] = current_q + self.config.learning_rate * (
            td_target - current_q)
        


In [35]:
class AgentControl:
    """Runs training and evaluation episodes for an agent and records metrics.

    All collected values are kept in ``self.metrics``; list entries are
    appended once per training episode.
    """

    def __init__(self, env: "MazeEnvironment", agent: "QLearningAgent", config: "Config"):
        """Bind the environment, agent and configuration and reset metrics."""
        self.env = env
        self.agent = agent
        self.config = config
        self.metrics = {
            'rewards': [],
            'steps': [],
            'success_rate': [],
            'episode_status': [],  # Track if episode reached goal
            'training_start_time': None,
            'training_duration': None,
            'path_lengths': [],    # Track path length per episode
            'final_qtable': None,  # Store final Q-table state
            'policy_stability': [],  # Track changes in policy
            'reached_goal_test': [],  # Track if the episode test reached the goal
            'steps_test': [],  # Track the episode test step count
            'path_optimality_test': [],  # Track the episode test path optimality
        }

    def calculate_policy_stability(self) -> float:
        """Return the fraction of grid cells whose greedy action is unchanged.

        The comparison is against the policy seen on the previous call; the
        first call has nothing to compare with and returns 0.0.
        """
        current_policy = {state: np.argmax(self.agent.q_table[state])
                          for state in np.ndindex(self.env.grid.shape)}
        previous_policy = getattr(self, '_last_policy', None)
        self._last_policy = current_policy
        if previous_policy is None:
            return 0.0

        matches = sum(1 for state, action in current_policy.items()
                      if action == previous_policy[state])
        return matches / len(current_policy)

    def run_episode(self, training: bool = True) -> Tuple[float, int, List[Tuple[int, int]], bool]:
        """Run one episode from the environment's start cell.

        Args:
            training: when True the agent may explore and its Q-table is
                updated after every step.

        Returns:
            ``(total_reward, steps, path, reached_goal)`` where ``path`` lists
            the start cell followed by the cell occupied after each step.
        """
        current_state = self.env.start
        episode_reward = 0
        steps = 0
        path = [current_state]
        # Step budget so an agent that never finds the goal still terminates.
        max_steps = self.env.grid.shape[0] * self.env.grid.shape[1] * 10
        reached_goal = False

        while steps < max_steps:
            action = self.agent.get_action(current_state, training)
            row_change, col_change = self.agent.actions[action]
            attempted_state = (current_state[0] + row_change,
                               current_state[1] + col_change)

            # Score the attempted cell *before* clamping: get_reward only
            # returns the wall penalty when it is given the invalid cell.
            reward = self.env.get_reward(current_state, attempted_state)

            # A blocked move leaves the agent where it was.
            if self.env.is_valid_move(attempted_state):
                next_state = attempted_state
            else:
                next_state = current_state

            if training:
                self.agent.update(current_state, action, reward, next_state)

            episode_reward += reward
            steps += 1
            path.append(next_state)

            if next_state == self.env.end:
                reached_goal = True
                break

            current_state = next_state

        return episode_reward, steps, path, reached_goal

    def train(self, save_path: Optional[str] = None) -> None:
        """Train for ``config.num_episodes`` episodes with live plots.

        After every training episode one greedy test episode is run and its
        outcome recorded in the ``*_test`` metrics.

        Args:
            save_path: directory for the final figure (PNG). It is created
                when missing. Nothing is saved when omitted.
        """
        import os

        plt.ion()
        fig, (ax_reward, ax_steps, ax_stability, ax_optimality) = plt.subplots(1, 4, figsize=(15, 5))

        window_size = 20
        moving_rewards = []
        moving_steps = []

        self.metrics['training_start_time'] = time.time()

        for _ in range(self.config.num_episodes):
            reward, steps, path, reached_goal = self.run_episode(training=True)

            # Update metrics
            self.metrics['rewards'].append(reward)
            self.metrics['steps'].append(steps)
            self.metrics['path_lengths'].append(len(path))
            self.metrics['episode_status'].append(reached_goal)
            self.metrics['policy_stability'].append(self.calculate_policy_stability())

            # Moving averages over the most recent window; the slice simply
            # returns everything while fewer than window_size values exist.
            moving_rewards.append(np.mean(self.metrics['rewards'][-window_size:]))
            moving_steps.append(np.mean(self.metrics['steps'][-window_size:]))

            # Greedy test episode (no exploration, no learning)
            _, steps_test, _, reached_goal_test = self.run_episode(training=False)

            self.metrics['reached_goal_test'].append(reached_goal_test)
            self.metrics['steps_test'].append(steps_test)

            # Ratio of optimal length to steps taken; 0 when either value is
            # missing (no known solution) or zero, instead of raising.
            optimal_length = self.env.optimal_path_length
            if steps_test and optimal_length:
                path_optimality_test = optimal_length / steps_test
            else:
                path_optimality_test = 0

            self.metrics['path_optimality_test'].append(path_optimality_test)

            # Update plots
            self._update_training_plots(fig, ax_reward, ax_steps, ax_stability, ax_optimality,
                                        moving_rewards, moving_steps)

        self.metrics['training_duration'] = time.time() - self.metrics['training_start_time']
        self.metrics['final_qtable'] = self.agent.q_table.copy()

        # One final plot update
        self._update_training_plots(fig, ax_reward, ax_steps, ax_stability, ax_optimality,
                                    moving_rewards, moving_steps)

        if save_path:
            # savefig does not create missing directories.
            os.makedirs(save_path, exist_ok=True)
            timestamp = int(time.time())
            filename = (f"{timestamp}_maze{self.config.maze_size}_"
                        f"lr{self.config.learning_rate}_"
                        f"df{self.config.discount_factor}_"
                        f"eps{self.config.epsilon}.png")
            fig.savefig(os.path.join(save_path, filename), bbox_inches='tight')

        plt.ioff()
        # Release the figure; repeated training runs would otherwise pile up
        # open figures.
        plt.close(fig)

    def test(self, display: bool = False) -> None:
        """Run one greedy episode, print its outcome and optionally draw the path."""
        episode_reward, steps, path, reached_goal = self.run_episode(training=False)
        print(f"Test Results - Steps: {steps}, Reward: {episode_reward}, Successful: {reached_goal}")
        if display:
            self.env.visualize(path)

    def test_consistency(self, num_tests: int = 10) -> Dict[str, float]:
        """Run *num_tests* greedy episodes and summarise them.

        Returns:
            Dict with ``success_rate`` (fraction of episodes that reached the
            goal), ``avg_steps`` and ``std_steps``. All values are 0 when no
            episode was run.
        """
        test_results = {
            'success_rate': 0,
            'avg_steps': 0,
            'std_steps': 0
        }

        steps_list = []
        for _ in range(num_tests):
            _, steps, _, reached_goal = self.run_episode(training=False)
            if reached_goal:
                test_results['success_rate'] += 1
            steps_list.append(steps)

        if steps_list:
            test_results['success_rate'] /= num_tests
            test_results['avg_steps'] = np.mean(steps_list)
            test_results['std_steps'] = np.std(steps_list)

        return test_results

    @staticmethod
    def _finish_axis(ax, ylabel: str) -> None:
        """Apply the axis labels, legend and grid shared by every training plot."""
        ax.set_xlabel('Episode')
        ax.set_ylabel(ylabel)
        ax.legend()
        ax.grid(True)

    def _update_training_plots(self, fig, ax_reward, ax_steps, ax_stability, ax_optimality,
                               moving_rewards, moving_steps) -> None:
        """Redraw the four training plots and re-display the figure."""
        # Imported here so the method does not rely on the notebook injecting
        # `display` into the global namespace.
        from IPython.display import display

        clear_output(wait=True)

        # Add configuration header
        elapsed_time = time.time() - self.metrics['training_start_time']
        header = (f"Maze Size: {self.config.maze_size}x{self.config.maze_size} | "
                  f"Maze ID: {self.env.seed} | "
                  f"Min Steps: {self.env.get_minimum_steps()} | "
                  f"Episodes: {self.config.num_episodes} | "
                  f"Learning Rate: {self.config.learning_rate} | "
                  f"Discount: {self.config.discount_factor} | "
                  f"Epsilon: {self.config.epsilon} | "
                  f"Training Time: {elapsed_time:.1f}s")
        fig.suptitle(header, wrap=True, y=1.05)

        # Reward plot
        ax_reward.clear()
        ax_reward.plot(self.metrics['rewards'], 'r-', alpha=0.3, label='Rewards')
        ax_reward.plot(moving_rewards, 'r-', label='Moving Average')
        self._finish_axis(ax_reward, 'Reward')

        # Steps plot
        ax_steps.clear()
        ax_steps.plot(self.metrics['steps'], 'b-', alpha=0.3, label='Steps')
        ax_steps.plot(moving_steps, 'b-', label='Moving Average')
        self._finish_axis(ax_steps, 'Steps')

        # Policy stability plot
        ax_stability.clear()
        ax_stability.plot(self.metrics['policy_stability'], 'g-', label='Policy Stability')
        self._finish_axis(ax_stability, 'Stability')

        # Path optimality plot
        ax_optimality.clear()
        ax_optimality.plot(self.metrics['path_optimality_test'], 'g-', label='Path Optimality')
        self._finish_axis(ax_optimality, 'Optimality')

        plt.tight_layout()
        display(fig)
        plt.pause(0.1)

# Test MazeEnvironment

In [None]:
# Smoke test for MazeEnvironment: build a small maze and inspect its properties
config = Config(maze_size=5, maze_id=42)  # Fixed seed for reproducibility
env = MazeEnvironment(config)

# Print the basic facts about the generated maze
print(f"Maze size: {env.grid.shape}")
print(f"Start position: {env.start}")
print(f"End position: {env.end}")
print(f"Optimal path length: {env.optimal_path_length}")
print(f"Optimal path: {env.optimal_path}")

# Visualize the maze with optimal path
env.visualize(show_optimal=True)

# Test reproducibility by creating another maze with same seed
env2 = MazeEnvironment(config)
print("\nChecking reproducibility:")
print(f"Same optimal path length? {env.optimal_path_length == env2.optimal_path_length}")
print(f"Same optimal path? {env.optimal_path == env2.optimal_path}")

# Test with random seed
config_random = Config(maze_size=5)  # No maze_id means random seed
env_random = MazeEnvironment(config_random)
print(f"\nRandom maze seed: {env_random.seed}")
print(f"Random maze optimal path length: {env_random.optimal_path_length}")

# Visualize both a hand-written path and the optimal path.
# The cells below are hard-coded for demonstration only; they are not checked
# against the walls of the randomly generated maze.
test_path = [(1,1), (1,2), (1,3), (2,3), (3,3), (3,4), (3,5)]
env_random.visualize(path=test_path, show_optimal=True)

# Execution

In [None]:
# Set up for training: one shared config drives the environment, agent and controller
config = Config(maze_size=3, num_episodes=50)
env = MazeEnvironment(config)
agent = QLearningAgent(env, config)
control = AgentControl(env, agent, config)
# Train; the final training figure is saved as a PNG under "training_output"
control.train("training_output")
# Run one greedy episode and print its steps, reward and success flag
control.test()
# Run another greedy episode and also draw the path it took
control.test(True)
# Repeat greedy episodes (training=False) and summarise success rate and step counts
test_results = control.test_consistency(num_tests=100)
print(f"Success rate: {test_results['success_rate']:.2%}")
print(f"Average steps: {test_results['avg_steps']:.1f} ± {test_results['std_steps']:.1f}")


In [44]:
import pandas as pd
from dataclasses import asdict
import json
from typing import List, Dict, Any
import time
from pathlib import Path

class ExperimentRunner:
    """Runs batches of training experiments and writes an Excel report.

    Every runner gets its own ``experiments/<date>_<timestamp>`` directory;
    each experiment type saves its training figures in a sub-folder of it.
    """

    def __init__(self, base_config: "Config"):
        """Remember *base_config* and create this run's output directory."""
        self.base_config = base_config
        self.results = []
        current_time = time.localtime()
        date_prefix = time.strftime('%Y%m%d', current_time)
        self.experiment_id = f"{date_prefix}_{int(time.time())}"
        self.base_path = Path(f"experiments/{self.experiment_id}")
        self.base_path.mkdir(parents=True, exist_ok=True)

    def run_baseline_comparison(self, num_trials: int = 10) -> Dict:
        """Compare an untrained and a trained agent on the same maze.

        Returns:
            ``{'untrained': ..., 'trained': ...}`` where each value is the
            summary dict produced by ``AgentControl.test_consistency``.
        """
        # Create the training results folder
        save_path = self.base_path / "baseline"
        save_path.mkdir(exist_ok=True)

        # Build the maze once: with maze_id=None every new environment draws a
        # different random maze, which would make the comparison meaningless.
        env = MazeEnvironment(self.base_config)

        # Untrained agent
        untrained_results = self._run_untrained_tests(num_trials, env=env)

        # Trained agent
        agent = QLearningAgent(env, self.base_config)
        control = AgentControl(env, agent, self.base_config)
        control.train(save_path=str(save_path))
        trained_results = control.test_consistency(num_trials)

        print(f"untrained_results: {untrained_results}")

        return {
            'untrained': untrained_results,
            'trained': trained_results
        }

    def run_hyperparameter_sweep(self) -> List[Dict]:
        """Train and test one agent per hyperparameter combination.

        Settings that are not swept (maze size, episode count, rewards) are
        inherited from ``base_config``. Every combination uses the same fixed
        maze seed so results are comparable.

        Returns:
            One ``{'params': ..., 'metrics': ...}`` dict per combination.
        """
        # Create the training results folder
        save_path = self.base_path / "hyperparameter_sweep"
        save_path.mkdir(exist_ok=True)

        param_grid = {
            'learning_rate': [0.1, 0.11],
            'discount_factor': [1],
            'epsilon': [0.31, 0.29, 0.3],
        }
        sweep_maze_id = 42
        base_settings = asdict(self.base_config)

        results = []
        for lr in param_grid['learning_rate']:
            for gamma in param_grid['discount_factor']:
                for eps in param_grid['epsilon']:
                    config = Config(**{
                        **base_settings,
                        'maze_id': sweep_maze_id,
                        'learning_rate': lr,
                        'discount_factor': gamma,
                        'epsilon': eps,
                    })
                    env = MazeEnvironment(config)
                    agent = QLearningAgent(env, config)
                    control = AgentControl(env, agent, config)

                    # Train and test
                    control.train(save_path=str(save_path))
                    test_results = control.test_consistency(num_tests=10)

                    results.append({
                        'params': asdict(config),
                        'metrics': {
                            **test_results,
                            'training_duration': control.metrics['training_duration'],
                            'final_policy_stability': control.metrics['policy_stability'][-1]
                        }
                    })
        return results

    def run_scalability_test(self, sizes: Optional[List[int]] = None) -> List[Dict]:
        """Train and test on mazes of increasing size.

        Args:
            sizes: maze sizes to try; defaults to ``[5, 25, 200]``.

        Returns:
            One dict per size with ``maze_size``, ``training_time`` and the
            ``test_consistency`` summary values.
        """
        # A None default avoids sharing one mutable list between calls.
        if sizes is None:
            sizes = [5, 25, 200]

        # Create the training results folder
        save_path = self.base_path / "scalability"
        save_path.mkdir(exist_ok=True)

        results = []
        for size in sizes:
            # Adjust num_episodes for larger mazes
            num_episodes = (size + 10) * size
            config = Config(
                maze_size=size,
                num_episodes=num_episodes,
                learning_rate=0.1,
                discount_factor=0.99,
                epsilon=0.3
            )
            env = MazeEnvironment(config)
            agent = QLearningAgent(env, config)
            control = AgentControl(env, agent, config)

            start_time = time.time()
            control.train(save_path=str(save_path))
            training_time = time.time() - start_time

            test_results = control.test_consistency(num_tests=10)
            results.append({
                'maze_size': size,
                'training_time': training_time,
                **test_results
            })
        return results

    def _run_untrained_tests(self, num_trials: int, env=None) -> Dict:
        """Test a freshly created (untrained) agent.

        Args:
            num_trials: number of greedy test episodes.
            env: environment to test on; a new one is built from
                ``base_config`` when omitted.
        """
        if env is None:
            env = MazeEnvironment(self.base_config)
        agent = QLearningAgent(env, self.base_config)
        control = AgentControl(env, agent, self.base_config)
        return control.test_consistency(num_trials)

    @staticmethod
    def _flatten_results(records: List[Dict[str, Any]]) -> pd.DataFrame:
        """Turn records with nested dicts into a flat table.

        Nested keys become dotted column names such as ``params.epsilon``, so
        every cell holds a scalar that can be written to a spreadsheet.
        """
        return pd.json_normalize(records)

    def save_report(self, experiment_results: Dict[str, Any],
                    filename: str = "experiment_report.xlsx") -> None:
        """Write the available experiment sections to an Excel workbook.

        Args:
            experiment_results: may contain the keys ``'baseline'``,
                ``'hyperparameters'`` and ``'scalability'``; each present key
                becomes one sheet.
            filename: workbook name inside this runner's output directory.
        """
        output_path = self.base_path / filename

        with pd.ExcelWriter(output_path) as writer:
            # Baseline comparison
            if 'baseline' in experiment_results:
                baseline = experiment_results['baseline']
                metric_keys = ['success_rate', 'avg_steps', 'std_steps']
                df_baseline = pd.DataFrame({
                    'Metric': ['Success Rate', 'Average Steps', 'Std Steps'],
                    'Untrained': [baseline['untrained'][key] for key in metric_keys],
                    'Trained': [baseline['trained'][key] for key in metric_keys],
                })
                df_baseline.to_excel(writer, sheet_name='Baseline Comparison', index=False)

            # Hyperparameter results: each record nests 'params' and 'metrics'
            # dicts, which must be flattened before they can be written.
            if 'hyperparameters' in experiment_results:
                df_hyper = self._flatten_results(experiment_results['hyperparameters'])
                df_hyper.to_excel(writer, sheet_name='Hyperparameter Study', index=False)

            # Scalability results
            if 'scalability' in experiment_results:
                df_scale = pd.DataFrame(experiment_results['scalability'])
                df_scale.to_excel(writer, sheet_name='Scalability Study', index=False)

def run_full_experiment_suite():
    """Run the baseline, hyperparameter and scalability experiments.

    The combined results are written to the runner's report file and also
    returned.

    Returns:
        Dict with the keys ``'baseline'``, ``'hyperparameters'`` and
        ``'scalability'``.
    """
    base_config = Config(maze_size=10, num_episodes=100)
    runner = ExperimentRunner(base_config)

    results = {
        'baseline': runner.run_baseline_comparison(),
        'hyperparameters': runner.run_hyperparameter_sweep(),
        'scalability': runner.run_scalability_test()
    }

    # Pass only the results: an extra positional file name raised TypeError
    # because the report path is chosen by the runner itself.
    runner.save_report(results)
    return results

In [45]:
# Create the runner used by the individual experiment cells below.
# Constructing it also creates this run's output directory under experiments/.
# maze_id is left unset, so environments built from this config use a random seed.
runner = ExperimentRunner(
    Config(
        maze_size=5,
        learning_rate=0.1,
        discount_factor=0.99,
        epsilon=0.3
    )
)

In [None]:
# To run every experiment and generate the report in one go, use instead:
# results = run_full_experiment_suite()

# Baseline comparison: untrained vs trained agent, 5 test episodes each
baseline = runner.run_baseline_comparison(num_trials=5)

In [None]:
# Hyperparameter testing: trains one agent per parameter combination
hyperparam_results = runner.run_hyperparameter_sweep()


In [None]:

# Scalability testing on maze sizes 10, 15 and 20
scalability_results = runner.run_scalability_test([10,15,20])

In [43]:
# Save all results: each key becomes one sheet of the Excel report
runner.save_report({
    'baseline': baseline,
    'hyperparameters': hyperparam_results,
    'scalability': scalability_results
})