In [1]:
!pip3 install pygame



In [None]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import pygame
from collections import deque
import matplotlib.pyplot as plt
import time


class MazeEnv(gym.Env):
    """Square grid-world maze exposed through the Gymnasium ``Env`` interface.

    Cells of ``self.maze`` hold 0 (free), 1 (wall) or 2 (trap). The agent
    starts in the top-left corner and the goal is the bottom-right corner.
    Observations are the flattened cell index ``row * size + col``; the four
    discrete actions are UP, DOWN, LEFT and RIGHT.
    """

    metadata = {'render_modes': ['human', 'rgb_array']}

    def __init__(self, maze_size=10, render_mode=None):
        super().__init__()
        self.size = maze_size
        self.render_mode = render_mode
        self.cell_size = 60

        # Action id -> (row delta, column delta): UP, DOWN, LEFT, RIGHT.
        self.actions = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}
        self.action_space = spaces.Discrete(4)
        self.observation_space = spaces.Discrete(self.size * self.size)

        self.maze = self._create_maze()
        self.start_pos = (0, 0)
        self.goal_pos = (self.size - 1, self.size - 1)
        self.agent_pos = self.start_pos

    def _create_maze(self):
        """Build the grid: fixed walls and traps for sizes >= 10, else empty."""
        n = self.size
        grid = np.zeros((n, n), dtype=int)

        if n >= 10:
            wall_segments = (
                (slice(2, 7), 3),
                (slice(4, 9), 6),
                (3, slice(5, 8)),
                (6, slice(1, 4)),
            )
            for rows, cols in wall_segments:
                grid[rows, cols] = 1

            for trap_cell in ((4, 4), (7, 7), (8, 1)):
                grid[trap_cell] = 2

        # Start and goal corners are always forced to be free cells.
        grid[0, 0] = 0
        grid[n - 1, n - 1] = 0
        return grid

    def _pos_to_state(self, pos):
        """Flatten a ``(row, col)`` pair into a single state index."""
        row, col = pos[0], pos[1]
        return row * self.size + col

    def reset(self, seed=None, options=None):
        """Put the agent back on the start cell; return ``(state, info)``."""
        super().reset(seed=seed)
        self.agent_pos = self.start_pos
        return self._pos_to_state(self.agent_pos), {}

    def step(self, action):
        """Apply ``action`` and return ``(state, reward, terminated, truncated, info)``.

        Rewards: -10 for bumping into the border or a wall (agent stays put),
        -50 for entering a trap (terminal), 100 for reaching the goal
        (terminal), -0.1 for any other move. ``truncated`` is always False.
        ``info["fell_in_trap"]`` tells whether the agent's resulting cell is
        a trap.
        """
        d_row, d_col = self.actions[action]
        candidate = (self.agent_pos[0] + d_row, self.agent_pos[1] + d_col)
        terminated = False

        inside = (0 <= candidate[0] < self.size) and (0 <= candidate[1] < self.size)

        # The bounds test short-circuits before the grid is indexed.
        if not inside or self.maze[candidate] == 1:
            reward = -10
        else:
            self.agent_pos = candidate
            if self.maze[candidate] == 2:
                reward = -50
                terminated = True
            elif candidate == self.goal_pos:
                reward = 100
                terminated = True
            else:
                reward = -0.1

        # Standing on the goal always counts as terminal, even after a bump.
        terminated = terminated or (self.agent_pos == self.goal_pos)

        info = {"fell_in_trap": self.maze[self.agent_pos] == 2}
        return self._pos_to_state(self.agent_pos), reward, terminated, False, info


# ---------------------------------------------------------------
# Q-Learning Agent
# ---------------------------------------------------------------
class QLearningAgent:
    """Tabular Q-learning agent with an epsilon-greedy behaviour policy.

    The Q-table is a ``(state_size, action_size)`` array initialised to
    zeros. ``training_info`` collects per-episode lists that callers append
    to (episodes, rewards, steps, epsilons, avg_q_values).
    """

    def __init__(self, state_size, action_size, learning_rate=0.1,
                 discount_factor=0.99, epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
        self.state_size = state_size
        self.action_size = action_size

        # Learning hyper-parameters.
        self.alpha = learning_rate
        self.gamma = discount_factor

        # Exploration schedule.
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

        self.q_table = np.zeros((state_size, action_size))
        self.training_info = {
            key: []
            for key in ('episodes', 'rewards', 'steps', 'epsilons', 'avg_q_values')
        }

    def get_action(self, state, training=True):
        """Return a random action with probability epsilon while training,
        otherwise the action with the highest Q-value for ``state``."""
        explore = training and np.random.rand() < self.epsilon
        if explore:
            return np.random.randint(self.action_size)
        greedy_action = np.argmax(self.q_table[state])
        return greedy_action

    def update(self, state, action, reward, next_state, done):
        """Move Q(state, action) towards the one-step TD target by ``alpha``."""
        old_value = self.q_table[state, action]
        if done:
            # Terminal transition: nothing to bootstrap from.
            td_target = reward
        else:
            td_target = reward + self.gamma * np.max(self.q_table[next_state])
        self.q_table[state, action] = old_value + self.alpha * (td_target - old_value)

    def decay_epsilon(self):
        """Multiply epsilon by the decay factor, never going below epsilon_min."""
        decayed = self.epsilon * self.epsilon_decay
        self.epsilon = decayed if decayed > self.epsilon_min else self.epsilon_min

    def get_policy(self):
        """Return the greedy action index for every state."""
        return self.q_table.argmax(axis=1)


# ---------------------------------------------------------------
# Visualizer
# ---------------------------------------------------------------
class EpisodeVisualizer:
    """Pygame window showing a finished episode next to a statistics panel.

    The left part of the window draws the maze, the path the agent took in
    the episode and its final position; the right part shows training
    progress and rolling statistics over the data passed to
    ``add_episode_data`` (at most the 100 most recent entries).
    """

    def __init__(self, env, agent):
        """Open the window sized from ``env.size`` and ``env.cell_size``."""
        pygame.init()
        self.env = env
        self.agent = agent
        self.maze_width = env.size * env.cell_size
        self.info_width = 450
        self.total_width = self.maze_width + self.info_width
        self.total_height = env.size * env.cell_size
        self.screen = pygame.display.set_mode((self.total_width, self.total_height))
        pygame.display.set_caption("Maze Runner Q-Learning")
        self.clock = pygame.time.Clock()
        self.font = pygame.font.Font(None, 26)
        self.small_font = pygame.font.Font(None, 20)
        # Rolling windows backing the "Recent Performance" section.
        self.reward_history = deque(maxlen=100)
        self.steps_history = deque(maxlen=100)

    def _cell_center(self, pos):
        """Return the pixel centre ``(x, y)`` of the grid cell ``(row, col)``."""
        half = self.env.cell_size // 2
        return (pos[1] * self.env.cell_size + half,
                pos[0] * self.env.cell_size + half)

    def _draw_maze(self):
        """Paint every grid cell (free / wall / trap) with a thin outline."""
        BLACK = (40, 40, 40)
        WALL, TRAP, PATH = (60, 60, 60), (200, 50, 50), (240, 240, 240)
        fill_by_cell = {1: WALL, 2: TRAP}
        cell = self.env.cell_size

        for i in range(self.env.size):
            for j in range(self.env.size):
                rect = (j * cell, i * cell, cell, cell)
                color = fill_by_cell.get(int(self.env.maze[i, j]), PATH)
                pygame.draw.rect(self.screen, color, rect)
                pygame.draw.rect(self.screen, BLACK, rect, 1)

    def _draw_trajectory(self, trajectory):
        """Connect consecutive visited cells with line segments."""
        TRAJECTORY = (100, 200, 255)
        for pos1, pos2 in zip(trajectory, trajectory[1:]):
            pygame.draw.line(self.screen, TRAJECTORY,
                             self._cell_center(pos1), self._cell_center(pos2), 3)

    def _draw_markers(self, trajectory):
        """Draw the goal square, the start dot and the agent's final position."""
        AGENT, GOAL = (50, 150, 250), (50, 200, 50)
        cell = self.env.cell_size

        gx = self.env.goal_pos[1] * cell
        gy = self.env.goal_pos[0] * cell
        pygame.draw.rect(self.screen, GOAL, (gx + 5, gy + 5, cell - 10, cell - 10))

        pygame.draw.circle(self.screen, (255, 0, 0), self._cell_center(self.env.start_pos), 8)

        if trajectory:
            pygame.draw.circle(self.screen, AGENT, self._cell_center(trajectory[-1]), cell // 3)

    def draw_episode(self, trajectory, episode, total_episodes, steps, total_reward, epsilon, success, fell_in_trap=False):
        """Render one episode summary frame.

        Args:
            trajectory: sequence of ``(row, col)`` cells visited, in order.
            episode: 1-based number of the episode being shown.
            total_episodes: planned number of episodes (for the progress bar).
            steps: number of steps taken in the episode.
            total_reward: summed reward of the episode.
            epsilon: current exploration rate.
            success: whether the episode ended on the goal.
            fell_in_trap: whether the episode ended in a trap.

        Returns:
            False if the user closed the window (caller should stop),
            True otherwise.
        """
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                return False

        WHITE = (255, 255, 255)
        self.screen.fill(WHITE)

        self._draw_maze()
        self._draw_trajectory(trajectory)
        self._draw_markers(trajectory)

        self._draw_info_panel(episode, total_episodes, steps, total_reward, epsilon, success, fell_in_trap)
        pygame.display.flip()
        self.clock.tick(10)  # cap the redraw rate at 10 frames per second
        return True

    def _draw_info_panel(self, episode, total_episodes, steps, total_reward, epsilon, success, fell_in_trap=False):
        """Draw the right-hand panel: title, progress bar, episode facts and
        rolling averages."""
        WHITE, BLACK = (255, 255, 255), (0, 0, 0)
        GRAY, BLUE = (200, 200, 200), (50, 150, 250)
        GREEN, RED = (50, 200, 50), (200, 50, 50)

        panel_x = self.maze_width
        pygame.draw.rect(self.screen, WHITE, (panel_x, 0, self.info_width, self.total_height))
        pygame.draw.line(self.screen, BLACK, (panel_x, 0), (panel_x, self.total_height), 2)

        y = 20
        title = self.font.render("Q-Learning Training", True, BLUE)
        self.screen.blit(title, (panel_x + 20, y))
        y += 50

        # Avoid ZeroDivisionError for an empty run and keep the filled part
        # of the bar inside its background rectangle.
        progress = episode / total_episodes if total_episodes else 0.0
        bar_fraction = min(max(progress, 0.0), 1.0)
        bar_w, bar_h = self.info_width - 40, 25
        pygame.draw.rect(self.screen, GRAY, (panel_x + 20, y, bar_w, bar_h))
        pygame.draw.rect(self.screen, GREEN, (panel_x + 20, y, int(bar_w * bar_fraction), bar_h))
        prog_text = self.small_font.render(f"{episode}/{total_episodes} ({progress*100:.1f}%)", True, BLACK)
        self.screen.blit(prog_text, (panel_x + 20, y + 30))
        y += 70

        # Status texts are plain ASCII: the previous literals contained
        # mis-decoded emoji bytes that showed up as garbage on screen.
        if success:
            status_text = "SUCCESS!"
            status_color = GREEN
        elif fell_in_trap:
            status_text = "Fell into Pothole!"
            status_color = (255, 80, 80)
        else:
            status_text = "Failed/Timeout"
            status_color = RED

        for text, color in [
            (f"Episode: {episode}", BLACK),
            (f"Status: {status_text}", status_color),
            (f"Steps: {steps}", BLACK),
            (f"Reward: {total_reward:.2f}", BLACK),
            (f"Epsilon: {epsilon:.3f}", BLACK),
            ("", BLACK),
            ("Recent Performance:", BLUE)
        ]:
            surf = self.small_font.render(text, True, color)
            self.screen.blit(surf, (panel_x + 20, y))
            y += 28

        if self.reward_history:
            avg_r = np.mean(self.reward_history)
            avg_s = np.mean(self.steps_history)
            # Heuristic: an episode counts as a success when its total reward
            # exceeds 50 (only rewards are stored, not the outcome itself).
            wins = sum(1 for r in self.reward_history if r > 50)
            succ_rate = wins / len(self.reward_history) * 100

            for text in [
                f"Avg Reward (100): {avg_r:.2f}",
                f"Avg Steps (100): {avg_s:.1f}",
                f"Success Rate: {succ_rate:.1f}%"
            ]:
                surf = self.small_font.render(text, True, BLACK)
                self.screen.blit(surf, (panel_x + 20, y))
                y += 25

    def add_episode_data(self, reward, steps):
        """Record one episode's total reward and step count for the rolling stats."""
        self.reward_history.append(reward)
        self.steps_history.append(steps)

    def close(self):
        """Shut pygame down, closing the window."""
        pygame.quit()


# ---------------------------------------------------------------
# Training
# ---------------------------------------------------------------
def train_agent(env, agent, num_episodes=500, visualize=True, show_every=1, max_steps=500):
    """Run the Q-learning training loop, optionally drawing each episode.

    Args:
        env: environment with ``reset()``, Gymnasium-style five-value
            ``step()`` and ``agent_pos`` / ``goal_pos`` attributes.
        agent: learner providing ``get_action``, ``update``,
            ``decay_epsilon``, ``epsilon``, ``q_table`` and ``training_info``.
        num_episodes: number of episodes to run.
        visualize: open an ``EpisodeVisualizer`` window when True.
        show_every: draw every ``show_every``-th episode (values below 1 are
            treated as 1).
        max_steps: an episode is cut off once it has taken more than this
            many steps.

    Returns:
        None. Per-episode metrics are appended to ``agent.training_info``.
        Returns early (after printing a notice) if the window is closed.
    """
    # A non-positive interval would make the modulo below fail or never match.
    show_every = max(1, int(show_every))

    print("=" * 60)
    print("MAZE RUNNER Q-LEARNING TRAINING")
    print("=" * 60)
    print(f"Episodes: {num_episodes} | Visualization: Every {show_every} episode(s)")
    print("=" * 60)

    visualizer = EpisodeVisualizer(env, agent) if visualize else None

    try:
        for episode in range(num_episodes):
            state, _ = env.reset()
            total_reward, steps, done = 0, 0, False
            trajectory = [env.agent_pos]
            fell_in_trap = False

            while not done:
                action = agent.get_action(state, training=True)
                next_state, reward, terminated, truncated, info = env.step(action)
                fell_in_trap = info.get("fell_in_trap", False)
                # Only a true terminal state stops bootstrapping; a time-limit
                # cut-off still has a meaningful next-state value.
                agent.update(state, action, reward, next_state, terminated)
                trajectory.append(env.agent_pos)
                total_reward += reward
                state = next_state
                steps += 1
                # Stop on terminal state, env-side truncation or local step cap.
                done = bool(terminated) or bool(truncated) or steps > max_steps

            agent.decay_epsilon()
            agent.training_info['episodes'].append(episode + 1)
            agent.training_info['rewards'].append(total_reward)
            agent.training_info['steps'].append(steps)
            agent.training_info['epsilons'].append(agent.epsilon)
            agent.training_info['avg_q_values'].append(np.mean(agent.q_table))

            success = (env.agent_pos == env.goal_pos)

            if visualizer is not None:
                # Feed the rolling statistics on every episode so that they
                # describe the most recent episodes even when show_every > 1.
                visualizer.add_episode_data(total_reward, steps)
                if (episode + 1) % show_every == 0:
                    if not visualizer.draw_episode(
                            trajectory, episode + 1, num_episodes,
                            steps, total_reward, agent.epsilon, success, fell_in_trap):
                        print("Training interrupted")
                        return

        if visualizer is not None:
            time.sleep(1)  # leave the final frame visible briefly
    finally:
        # Always release the window, including on errors and early return.
        if visualizer is not None:
            visualizer.close()

    print("=" * 60)
    print("TRAINING COMPLETED!")
    print("=" * 60)


# ---------------------------------------------------------------
# MAIN
# ---------------------------------------------------------------
def main():
    """Print the banner, build the maze and the agent, then train interactively.

    Waits for the user to press ENTER before training starts; Ctrl-C during
    training is reported and does not propagate.
    """
    rule = "=" * 70
    print("\n" + rule)
    print("  MAZE RUNNER: Q-LEARNING PROJECT")
    print(rule)
    print("\n Units: MDP | Bellman | Q-Learning | Tabular Methods")
    print(rule + "\n")

    maze_size, episode_count = 10, 500

    print("Creating Environment...")
    env = MazeEnv(maze_size=maze_size)

    print("Initializing Q-Learning Agent...")
    hyperparams = dict(
        learning_rate=0.1,
        discount_factor=0.99,
        epsilon=1.0,
        epsilon_decay=0.995,
        epsilon_min=0.01,
    )
    agent = QLearningAgent(
        state_size=env.observation_space.n,
        action_size=env.action_space.n,
        **hyperparams,
    )

    print("Starting Training...\n")
    input("Press ENTER to start...")

    try:
        train_agent(env, agent, num_episodes=episode_count, visualize=True, show_every=1)
    except KeyboardInterrupt:
        # Let the user abort training without a traceback.
        print("\nInterrupted")

    print("\nPROJECT COMPLETED!\n")


# Script entry point: run the demo, report any error with its traceback,
# and always shut pygame down before exiting.
if __name__ == "__main__":
    try:
        main()
    except Exception as err:
        import traceback

        print(f"\n Error: {err}")
        traceback.print_exc()
    finally:
        pygame.quit()
        print("\n Done!")



  MAZE RUNNER: Q-LEARNING PROJECT

 Units: MDP | Bellman | Q-Learning | Tabular Methods

Creating Environment...
Initializing Q-Learning Agent...
Starting Training...

