In [6]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

In [8]:
class MazeEnv(gym.Env):
    """Square grid maze in which an agent walks towards a randomly placed target.

    The grid is ``maze_size`` x ``maze_size``. Every cell whose row and column
    index are both odd is a wall; all other cells are free. The wall layout is
    generated once in the constructor and kept across resets.

    Observations are ``float32`` arrays of shape ``(maze_size, maze_size, 3)``:
    channel 0 marks the agent cell, channel 1 marks the target cell, and
    channel 2 is left at zero (walls are not encoded in the observation).

    Actions are integers 0-3 meaning up, down, left, right.

    Rewards are ``10`` on the step where the agent stands on the target and
    ``-0.1`` on every other step.

    Args:
        maze_size: Side length of the square grid.
    """

    # (row, col) offsets indexed by action id: 0=up, 1=down, 2=left, 3=right.
    _DIRECTIONS = ((-1, 0), (1, 0), (0, -1), (0, 1))

    def __init__(self, maze_size=10):
        super().__init__()
        self.maze_size = maze_size
        # Reach the spaces through the already-imported ``gym`` package so the
        # class does not depend on a separate ``spaces`` import being in scope.
        self.observation_space = gym.spaces.Box(
            low=0, high=1, shape=(maze_size, maze_size, 3), dtype=np.float32
        )
        self.action_space = gym.spaces.Discrete(4)  # Up, Down, Left, Right
        self.maze = self._generate_maze()
        self._place_agent_and_target()

    def _generate_maze(self):
        """Return the wall layout: 1 where a wall stands, 0 where a cell is free."""
        maze = np.zeros((self.maze_size, self.maze_size))
        maze[1::2, 1::2] = 1  # walls on every (odd row, odd column) cell
        return maze

    def _get_empty_position(self, exclude=None):
        """Pick a free cell uniformly at random.

        Args:
            exclude: Optional ``(row, col)`` cell that must not be returned.
                Ignored when it is the only free cell in the maze.

        Returns:
            ``(row, col)`` as a tuple of plain Python ints.

        Raises:
            ValueError: If the maze contains no free cell at all.
        """
        free_cells = np.argwhere(self.maze == 0)
        if len(free_cells) == 0:
            raise ValueError("maze has no free cell to place on")
        if exclude is not None:
            differs = np.any(free_cells != np.asarray(exclude), axis=1)
            # Fall back to the full set when excluding would leave nothing
            # (e.g. a 1x1 maze) instead of failing.
            if differs.any():
                free_cells = free_cells[differs]
        row, col = free_cells[np.random.randint(len(free_cells))]
        return int(row), int(col)

    def _place_agent_and_target(self):
        """Draw a fresh agent cell and a target cell distinct from it."""
        self.agent_pos = self._get_empty_position()
        # Keep the target off the agent's cell so an episode never starts
        # already solved and the agent marker is not overwritten.
        self.target_pos = self._get_empty_position(exclude=self.agent_pos)

    def reset(self):
        """Re-draw agent and target positions and return the first observation."""
        self._place_agent_and_target()
        return self._get_observation()

    def step(self, action):
        """Apply one move and report the outcome.

        A move that would leave the grid or enter a wall leaves the agent
        where it is; the step penalty still applies.

        Args:
            action: Index into the up/down/left/right direction table.

        Returns:
            ``(observation, reward, done, info)`` where ``done`` is True when
            the agent is on the target cell and ``info`` is an empty dict.
        """
        d_row, d_col = self._DIRECTIONS[action]
        row = int(self.agent_pos[0]) + d_row
        col = int(self.agent_pos[1]) + d_col

        # Bounds are tested before indexing the maze so that a negative
        # index can never wrap around to the opposite edge.
        in_bounds = 0 <= row < self.maze_size and 0 <= col < self.maze_size
        if in_bounds and self.maze[row, col] == 0:
            self.agent_pos = (row, col)

        done = tuple(self.agent_pos) == tuple(self.target_pos)
        reward = 10 if done else -0.1
        return self._get_observation(), reward, done, {}

    def _get_observation(self):
        """Render the current state as a float32 image matching the observation space."""
        obs = np.zeros((self.maze_size, self.maze_size, 3), dtype=np.float32)
        obs[self.agent_pos] = [1, 0, 0]  # Red for agent
        obs[self.target_pos] = [0, 1, 0]  # Green for target
        return obs

# DQN agent: epsilon-greedy policy, experience replay buffer, and a target network
class DQNAgent:
    """Deep Q-learning agent with an experience replay buffer and a target network.

    The online network (``model``) selects actions and is trained; the target
    network (``target_model``) supplies bootstrap values and only changes when
    ``update_target_model`` copies the online weights into it.

    Args:
        state_shape: Shape of a single observation; it is flattened before
            being fed to the network.
        action_size: Number of discrete actions.
    """

    def __init__(self, state_shape, action_size):
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # bounded buffer; oldest transitions drop out first
        self.gamma = 0.95  # discount factor
        self.epsilon = 1.0  # current exploration probability
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995  # multiplicative decay applied per replay() update
        self.model = self._build_model(state_shape, action_size)
        self.target_model = self._build_model(state_shape, action_size)
        self.update_target_model()
        self.optimizer = optim.Adam(self.model.parameters())
        self._loss_fn = nn.MSELoss()

    def _build_model(self, state_shape, action_size):
        """Build the MLP mapping a flattened observation to one Q-value per action."""
        # np.prod yields a NumPy integer; nn.Linear expects a plain int size.
        input_size = int(np.prod(state_shape))
        return nn.Sequential(
            nn.Linear(input_size, 24),
            nn.ReLU(),
            nn.Linear(24, 24),
            nn.ReLU(),
            nn.Linear(24, action_size)
        )

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.load_state_dict(self.model.state_dict())

    @staticmethod
    def _flatten_batch(observations):
        """Stack observations into a float32 tensor of shape (batch, features)."""
        flat = np.array([np.asarray(obs, dtype=np.float32).ravel() for obs in observations])
        return torch.as_tensor(flat, dtype=torch.float32)

    def act(self, state):
        """Choose an action epsilon-greedily.

        Args:
            state: A single observation array.

        Returns:
            The chosen action as a Python int: uniformly random with
            probability ``epsilon``, otherwise the online network's argmax.
        """
        if np.random.rand() <= self.epsilon:
            return int(np.random.choice(self.action_size))
        with torch.no_grad():
            q_values = self.model(self._flatten_batch([state]))
        return torch.argmax(q_values).item()

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def replay(self, batch_size=32):
        """Run one gradient step on a random minibatch, then decay epsilon.

        Does nothing while the buffer holds fewer than ``batch_size``
        transitions (or when ``batch_size`` is not positive).

        Args:
            batch_size: Number of transitions sampled from the buffer.
        """
        if batch_size < 1 or len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)
        batch_states, batch_actions, batch_rewards, batch_next, batch_dones = zip(*minibatch)

        states = self._flatten_batch(batch_states)
        next_states = self._flatten_batch(batch_next)
        actions = torch.as_tensor(np.array(batch_actions), dtype=torch.int64)
        rewards = torch.as_tensor(np.array(batch_rewards), dtype=torch.float32)
        dones = torch.as_tensor(np.array(batch_dones), dtype=torch.float32)

        # The TD target is a fixed regression label: compute it without
        # autograd so backward() does not push gradients into the target
        # network (whose .grad is never zeroed by the optimizer).
        with torch.no_grad():
            next_q = self.target_model(next_states).max(1)[0]
            target_q = rewards + (1 - dones) * self.gamma * next_q

        # squeeze(1) rather than squeeze(): keeps the batch axis even when
        # batch_size is 1, so the shapes of prediction and target always match.
        current_q = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        loss = self._loss_fn(current_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Clamp so the last decay step cannot undershoot the configured floor.
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)


In [10]:
# Seed every RNG the environment and the agent draw from, so that
# Restart Kernel -> Run All reproduces the same training run.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

env = MazeEnv()
agent = DQNAgent(env.observation_space.shape, env.action_space.n)
episodes = 1000
# Per-episode step cap: an episode is cut off if the target is not reached by then.
max_steps = 1000

# Training loop over episodes
for episode in range(episodes):
    state = env.reset()       # new random agent/target placement
    done = False
    total_reward = 0
    steps = 0

    # Roll out one episode, storing every transition in the replay buffer.
    while not done and steps < max_steps:
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward
        steps += 1

    # The loop exits either on success or on the step cap. Checking `done`
    # (not the step count) avoids reporting a cut-off when the target was
    # reached exactly on the last permitted step.
    if not done:
        print(f"Episode {episode} terminated due to max steps")

    # NOTE(review): this performs a single minibatch update (and a single
    # epsilon decay) per episode, however many steps were collected. Moving
    # it into the step loop would train far more, but would also change the
    # exploration schedule because epsilon decays inside replay() - confirm
    # the intended cadence before changing it.
    agent.replay()
    if episode % 10 == 0:
        agent.update_target_model()  # sync target network every 10 episodes

    print(f"Episode: {episode}, Total Reward: {total_reward}, Epsilon: {agent.epsilon:.2f}")

Episode: 0, Total Reward: -17.900000000000126, Epsilon: 0.99
Episode: 1, Total Reward: -22.40000000000019, Epsilon: 0.99
Episode: 2, Total Reward: -61.800000000000196, Epsilon: 0.99
Episode: 3, Total Reward: 2.6000000000000103, Epsilon: 0.98
Episode: 4, Total Reward: -23.200000000000202, Epsilon: 0.98
Episode: 5, Total Reward: -13.100000000000058, Epsilon: 0.97
Episode: 6, Total Reward: -1.1999999999999762, Epsilon: 0.97
Episode 7 terminated due to max steps
Episode: 7, Total Reward: -99.9999999999986, Epsilon: 0.96
Episode: 8, Total Reward: -8.199999999999989, Epsilon: 0.96
Episode: 9, Total Reward: -4.699999999999964, Epsilon: 0.95
Episode: 10, Total Reward: -33.300000000000345, Epsilon: 0.95
Episode: 11, Total Reward: 9.3, Epsilon: 0.94
Episode: 12, Total Reward: 9.1, Epsilon: 0.94
Episode: 13, Total Reward: -3.8999999999999666, Epsilon: 0.93
Episode: 14, Total Reward: 4.000000000000005, Epsilon: 0.93
Episode: 15, Total Reward: -75.19999999999943, Epsilon: 0.92
Episode: 16, Total Re