# COMP 579 - Assignment 3 - Winter 2025

In [None]:
# Install required packages (if needed)
!pip install gymnasium matplotlib torch numpy pandas seaborn

In [None]:
import numpy as np
import random
import matplotlib.pyplot as plt
import seaborn as sns
import gymnasium as gym
import time
from collections import deque, namedtuple
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from IPython.display import clear_output

# Set seeds for reproducibility
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

## PART 1

In [None]:
def create_environment(env_name="CartPole-v1"):
    """Create and return the specified environment"""
    env = gym.make(env_name)
    print(f"Environment: {env_name}")
    print(f"Observation Space: {env.observation_space}")
    print(f"Action Space: {env.action_space}")
    return env

# Create the environment
env = create_environment()

In [None]:
# Define a transition tuple structure
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))

class ReplayMemory:
    """Experience replay memory for storing and sampling transitions"""
    
    def __init__(self, capacity):
        """Initialize a replay memory with given capacity"""
        self.memory = deque(maxlen=capacity)
        
    def push(self, *args):
        """Save a transition to the replay memory"""
        self.memory.append(Transition(*args))
        
    def sample(self, batch_size):
        """Randomly sample a batch of transitions from the replay memory"""
        return random.sample(self.memory, batch_size)
    
    def __len__(self):
        """Return the current size of the replay memory"""
        return len(self.memory)

In [None]:
class QNetwork(nn.Module):
    """Neural network for approximating Q-values"""
    
    def __init__(self, input_dim, output_dim, hidden_dim=128):
        """Initialize the Q-Network
        
        Args:
            input_dim: Dimension of the input (state space)
            output_dim: Dimension of the output (action space)
            hidden_dim: Size of the hidden layer
        """
        super(QNetwork, self).__init__()
        
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, x):
        """Forward pass through the network"""
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

In [None]:
class DQNAgent:
    """Deep Q-Network Agent"""
    
    def __init__(self, state_size, action_size, hidden_size=128, 
                 learning_rate=1e-3, gamma=0.99, epsilon_start=1.0, 
                 epsilon_end=0.01, epsilon_decay=0.995, memory_size=10000, 
                 batch_size=64, target_update=10, device='cpu'):
        """Initialize the DQN Agent
        
        Args:
            state_size: Size of the state space
            action_size: Size of the action space
            hidden_size: Size of the hidden layers in the Q-Network
            learning_rate: Learning rate for the optimizer
            gamma: Discount factor
            epsilon_start: Starting value of epsilon for exploration
            epsilon_end: Minimum value of epsilon
            epsilon_decay: Rate at which epsilon decays
            memory_size: Capacity of the replay memory
            batch_size: Size of the batches sampled from the replay memory
            target_update: How often to update the target network
            device: Device to run the networks on ('cpu' or 'cuda')
        """
        self.state_size = state_size
        self.action_size = action_size
        self.hidden_size = hidden_size
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.epsilon = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.target_update = target_update
        self.device = device
        
        # Initialize Q-Networks (policy and target)
        self.policy_net = QNetwork(state_size, action_size, hidden_size).to(device)
        self.target_net = QNetwork(state_size, action_size, hidden_size).to(device)
        
        # Initialize target network with the same weights as the policy network
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()  # Set target network to evaluation mode
        
        # Initialize optimizer
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)
        
        # Initialize replay memory
        self.memory = ReplayMemory(memory_size)
        
        # Initialize step counter (for target network updates)
        self.steps_done = 0
    
    def select_action(self, state, evaluation=False):
        """Select an action using epsilon-greedy policy
        
        Args:
            state: The current state
            evaluation: Whether to use exploration or not
        
        Returns:
            The selected action
        """
        # Convert state to tensor
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        
        # During evaluation, use greedy policy
        if evaluation or random.random() > self.epsilon:
            with torch.no_grad():
                # t.max(1) returns the maximum value and index for each row
                # [1] gets the indices, which are the actions
                return self.policy_net(state).max(1)[1].item()
        else:
            # Random action
            return random.randrange(self.action_size)
    
    def store_transition(self, state, action, next_state, reward, done):
        """Store a transition in the replay memory
        
        Args:
            state: The current state
            action: The action taken
            next_state: The next state
            reward: The reward received
            done: Whether the episode is done
        """
        self.memory.push(state, action, next_state, reward, done)
    
    def learn(self):
        """Update the Q-Network using a batch of experiences"""
        # If we don't have enough experiences yet, return
        if len(self.memory) < self.batch_size:
            return
        
        # Sample a batch of transitions from the replay memory
        transitions = self.memory.sample(self.batch_size)
        
        # Convert batch of transitions to transition of batches
        batch = Transition(*zip(*transitions))
        
        # Convert to tensors
        state_batch = torch.FloatTensor(batch.state).to(self.device)
        action_batch = torch.LongTensor(batch.action).unsqueeze(1).to(self.device)
        reward_batch = torch.FloatTensor(batch.reward).unsqueeze(1).to(self.device)
        
        # Convert next_states to tensor, handling terminal states
        non_final_mask = torch.BoolTensor([not done for done in batch.done]).to(self.device)
        non_final_next_states = torch.FloatTensor([s for s, done in zip(batch.next_state, batch.done) 
                                                 if not done]).to(self.device)
        
        # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
        # columns of actions taken
        state_action_values = self.policy_net(state_batch).gather(1, action_batch)
        
        # Compute V(s_{t+1}) for all next states
        next_state_values = torch.zeros(self.batch_size, 1).to(self.device)
        
        if len(non_final_next_states) > 0:  # Make sure we have non-terminal states
            with torch.no_grad():
                next_state_values[non_final_mask] = self.target_net(non_final_next_states).max(1)[0].unsqueeze(1)
        
        # Compute the expected Q values
        expected_state_action_values = reward_batch + (self.gamma * next_state_values)
        
        # Compute loss (Huber loss)
        loss = F.smooth_l1_loss(state_action_values, expected_state_action_values)
        
        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        # Clip gradients to help with training stability
        for param in self.policy_net.parameters():
            param.grad.data.clamp_(-1, 1)
        self.optimizer.step()
        
        # Update the target network
        self.steps_done += 1
        if self.steps_done % self.target_update == 0:
            self.target_net.load_state_dict(self.policy_net.state_dict())
        
        # Decay epsilon
        self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay)
        
        return loss.item()

## 5. Training Loop

Now, let's implement the training loop to train our DQN agent.

In [None]:
def train_dqn(env, agent, num_episodes=1000, max_steps=1000, 
              print_every=10, render=False, early_stop_reward=None):
    """Train the DQN agent
    
    Args:
        env: The environment
        agent: The DQN agent
        num_episodes: Number of episodes to train
        max_steps: Maximum steps per episode
        print_every: How often to print/plot results
        render: Whether to render the environment
        early_stop_reward: Reward threshold for early stopping
        
    Returns:
        List of rewards per episode
    """
    # Lists to store results
    rewards = []
    avg_rewards = []
    losses = []
    epsilons = []
    
    for episode in range(num_episodes):
        state, _ = env.reset()
        episode_reward = 0
        episode_loss = 0
        step_count = 0
        
        for step in range(max_steps):
            # Select action
            action = agent.select_action(state)
            
            # Take action and observe next state and reward
            next_state, reward, done, truncated, _ = env.step(action)
            
            # Store transition in replay memory
            agent.store_transition(state, action, next_state, reward, done)
            
            # Learn from experience
            loss = agent.learn()
            if loss is not None:
                episode_loss += loss
            
            # Update state and reward
            state = next_state
            episode_reward += reward
            step_count += 1
            
            # Render if specified
            if render and episode % print_every == 0:
                env.render()
                time.sleep(0.01)
            
            # Break if episode is done
            if done or truncated:
                break
        
        # Append results
        rewards.append(episode_reward)
        losses.append(episode_loss / step_count if step_count > 0 else 0)
        epsilons.append(agent.epsilon)
        
        # Calculate moving average of rewards
        window = min(len(rewards), 100)
        avg_reward = sum(rewards[-window:]) / window
        avg_rewards.append(avg_reward)
        
        # Print and plot results
        if (episode + 1) % print_every == 0 or episode == 0:
            print(f"Episode {episode + 1}/{num_episodes}, "
                  f"Reward: {episode_reward:.2f}, "
                  f"Avg Reward: {avg_reward:.2f}, "
                  f"Epsilon: {agent.epsilon:.4f}, "
                  f"Steps: {step_count}")
            
            # Plot training progress
            clear_output(wait=True)
            plt.figure(figsize=(15, 10))
            
            # Plot rewards
            plt.subplot(2, 2, 1)
            plt.plot(rewards, label='Episode Reward')
            plt.plot(avg_rewards, label='Avg Reward (100 episodes)')
            if early_stop_reward is not None:
                plt.axhline(y=early_stop_reward, color='r', linestyle='--', label='Target Reward')
            plt.xlabel('Episode')
            plt.ylabel('Reward')
            plt.title('Rewards')
            plt.legend()
            
            # Plot losses
            plt.subplot(2, 2, 2)
            plt.plot(losses)
            plt.xlabel('Episode')
            plt.ylabel('Loss')
            plt.title('Training Loss')
            
            # Plot epsilon
            plt.subplot(2, 2, 3)
            plt.plot(epsilons)
            plt.xlabel('Episode')
            plt.ylabel('Epsilon')
            plt.title('Exploration Rate (Epsilon)')
            
            # Plot histogram of rewards
            plt.subplot(2, 2, 4)
            plt.hist(rewards, bins=20)
            plt.xlabel('Reward')
            plt.ylabel('Frequency')
            plt.title('Reward Distribution')
            
            plt.tight_layout()
            plt.show()
        
        # Check for early stopping
        if early_stop_reward is not None and avg_reward >= early_stop_reward:
            print(f"\nEnvironment solved in {episode + 1} episodes!")
            print(f"Average reward over the last 100 episodes: {avg_reward:.2f}")
            break
    
    return rewards, avg_rewards, losses, epsilons

## 6. Evaluation Function

Let's implement a function to evaluate the trained agent's performance.

In [None]:
def evaluate_agent(env, agent, num_episodes=10, render=True):
    """Evaluate the trained agent
    
    Args:
        env: The environment
        agent: The DQN agent
        num_episodes: Number of episodes to evaluate
        render: Whether to render the environment
        
    Returns:
        Average reward over the evaluation episodes
    """
    total_rewards = []
    
    for episode in range(num_episodes):
        state, _ = env.reset()
        episode_reward = 0
        done = False
        truncated = False
        
        # Create a render environment if rendering is enabled
        if render:
            eval_env = gym.make(env.spec.id, render_mode='human')
            eval_state, _ = eval_env.reset()
        else:
            eval_env = env
            eval_state = state
        
        while not (done or truncated):
            # Select action (greedy policy for evaluation)
            action = agent.select_action(eval_state, evaluation=True)
            
            # Take action
            eval_state, reward, done, truncated, _ = eval_env.step(action)
            episode_reward += reward
        
        if render:
            eval_env.close()
        
        total_rewards.append(episode_reward)
        print(f"Evaluation Episode {episode + 1}/{num_episodes}, Reward: {episode_reward}")
    
    avg_reward = sum(total_rewards) / num_episodes
    print(f"\nAverage Reward over {num_episodes} episodes: {avg_reward:.2f}")
    
    return avg_reward

## 7. Training the Agent

Now, let's train the DQN agent on the CartPole environment.

In [None]:
# Create the environment
env = create_environment("CartPole-v1")

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Create the DQN agent
state_size = env.observation_space.shape[0]  # Assuming continuous state space
action_size = env.action_space.n  # Assuming discrete action space

agent = DQNAgent(
    state_size=state_size,
    action_size=action_size,
    hidden_size=128,
    learning_rate=1e-3,
    gamma=0.99,
    epsilon_start=1.0,
    epsilon_end=0.01,
    epsilon_decay=0.995,
    memory_size=10000,
    batch_size=64,
    target_update=10,
    device=device
)

# Train the agent
rewards, avg_rewards, losses, epsilons = train_dqn(
    env=env,
    agent=agent,
    num_episodes=500,
    print_every=10,
    early_stop_reward=195.0  # CartPole-v1 is considered solved when avg reward is 195+ over 100 episodes
)

## 8. Evaluating the Trained Agent

Let's evaluate the performance of our trained agent.

In [None]:
# Evaluate the trained agent
avg_reward = evaluate_agent(env, agent, num_episodes=5, render=True)

## 9. Extensions and Experiments

Here are some suggested extensions and experiments to try:

1. Implement Double DQN
2. Implement Dueling DQN
3. Experiment with different network architectures
4. Try different environments (e.g., LunarLander-v2)
5. Implement prioritized experience replay
6. Compare DQN with other algorithms (e.g., REINFORCE, A2C)

In [None]:
# TODO: Implement Double DQN
class DoubleDQNAgent(DQNAgent):
    """Double DQN Agent that uses two networks to reduce overestimation"""
    
    def learn(self):
        """Update the Q-Network using Double DQN"""
        # TODO: Implement Double DQN learning
        pass

In [None]:
# TODO: Implement Dueling DQN
class DuelingQNetwork(nn.Module):
    """Dueling Q-Network architecture"""
    
    def __init__(self, input_dim, output_dim, hidden_dim=128):
        """Initialize the Dueling Q-Network"""
        super(DuelingQNetwork, self).__init__()
        
        # TODO: Implement Dueling DQN architecture
        pass
    
    def forward(self, x):
        """Forward pass through the network"""
        # TODO: Implement forward pass
        pass

## 10. Conclusion

In this notebook, we implemented and trained a Deep Q-Network (DQN) agent to solve the CartPole environment. We explored the key components of DQN, including the Q-Network architecture, experience replay, and target networks.

What we learned:
- How to implement a DQN agent using PyTorch
- How to use experience replay to improve sample efficiency
- How to stabilize training using target networks
- How to evaluate the performance of a RL agent

Future directions:
- Implement more advanced algorithms (Double DQN, Dueling DQN, etc.)
- Try more complex environments
- Experiment with different hyperparameters and network architectures