# Basic DQN for Space Invaders

A minimal implementation of a Deep Q-Network (DQN) agent for the `ALE/SpaceInvaders-v5` Gymnasium environment.

<a href="https://colab.research.google.com/github/tcharos/AIDL_B02-Advanced-Topics-in-Deep-Learning/blob/main/space_invaders_basic_dqn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Install Dependencies

In [None]:
# The extras spec is quoted so the shell does not glob-expand the brackets;
# matplotlib is installed because the notebook imports it for plotting.
!pip install "gymnasium[atari,accept-rom-license]" ale-py torch scipy numpy matplotlib

## Import Libraries

In [None]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
from scipy.ndimage import zoom
import matplotlib.pyplot as plt

## Define DQN Network

In [None]:
class DQN(nn.Module):
    """Convolutional Q-network producing one value per action.

    Three convolution + ReLU stages extract features, which are flattened
    and passed through a two-layer fully connected head.

    Args:
        input_shape: Shape of one unbatched input, ``(channels, height, width)``.
        n_actions: Number of outputs of the final linear layer.
    """

    def __init__(self, input_shape, n_actions):
        super().__init__()

        in_channels = input_shape[0]
        feature_layers = [
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        ]
        self.conv = nn.Sequential(*feature_layers)

        # The first linear layer's width depends on the input resolution, so
        # it is measured by pushing a dummy input through the conv stack.
        flat_features = self._get_conv_out(input_shape)
        head_layers = [
            nn.Linear(flat_features, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions),
        ]
        self.fc = nn.Sequential(*head_layers)

    def _get_conv_out(self, shape):
        """Return how many features the conv stack emits for one input of ``shape``."""
        dummy = torch.zeros((1,) + tuple(shape))
        return self.conv(dummy).numel()

    def forward(self, x):
        """Map a batch of inputs to a ``(batch, n_actions)`` tensor."""
        batch = x.shape[0]
        features = self.conv(x)
        return self.fc(features.view(batch, -1))

## Define Replay Buffer

In [None]:
class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random sampling.

    Once ``capacity`` transitions are held, each new one evicts the oldest.
    """

    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            ``(states, actions, rewards, next_states, dones)``; states and
            next states are stacked into NumPy arrays, while the other
            three fields are returned as tuples.
        """
        picked = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one tuple per field.
        columns = list(zip(*picked))
        states, actions, rewards, next_states, dones = columns
        return np.array(states), actions, rewards, np.array(next_states), dones

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

## Frame Preprocessing

In [None]:
def preprocess_frame(frame, size=(84, 84)):
    """Convert a frame to a normalized grayscale image of a fixed size.

    Args:
        frame: Either an ``(height, width, channels)`` array whose first
            three channels are combined with the weights 0.299 / 0.587 /
            0.114 (this assumes RGB channel order), or an already-grayscale
            ``(height, width)`` array. Pixel values are assumed to lie in
            ``[0, 255]``.
        size: Target ``(height, width)`` of the output. Defaults to 84x84.

    Returns:
        A ``float32`` array of shape ``size`` holding the resized
        grayscale image divided by 255.
    """
    frame = np.asarray(frame)

    # Convert to grayscale (a 2-D input is taken to be grayscale already).
    if frame.ndim == 2:
        gray = frame.astype(np.float64)
    else:
        gray = np.dot(frame[..., :3], [0.299, 0.587, 0.114])

    # Derive the zoom factors from the actual frame size rather than
    # assuming 210x160, so any input resolution lands on the target size.
    target_h, target_w = size
    src_h, src_w = gray.shape
    resized = zoom(gray, (target_h / src_h, target_w / src_w), order=1)

    # Normalize
    normalized = resized / 255.0
    return normalized.astype(np.float32)

## Hyperparameters

In [None]:
# Training configuration
LEARNING_RATE = 2.5e-4   # step size handed to the Adam optimizer
GAMMA = 0.99             # discount applied to the bootstrapped next-state value
BUFFER_SIZE = 10_000     # maximum number of transitions kept in the replay buffer
BATCH_SIZE = 32          # transitions sampled per optimization step
EPSILON_START = 1.0      # exploration rate at step 0
EPSILON_END = 0.1        # exploration rate the schedule decays towards
EPSILON_DECAY = 10_000   # decay constant, in environment steps, of the exponential schedule
TARGET_UPDATE = 1_000    # environment steps between target-network weight copies
N_FRAMES = 4             # preprocessed frames stacked into one state
N_EPISODES = 1_000       # episodes run by the training loop

## Initialize Environment and Networks

In [None]:
# Setup: pick the compute device, create the environment, and build the
# policy/target networks, the optimizer, and the replay buffer.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

env = gym.make("ALE/SpaceInvaders-v5")
# Read the action count from the environment instead of hard-coding it, so
# the network's output layer always matches the action space in use.
n_actions = int(env.action_space.n)

# Networks: both take N_FRAMES stacked 84x84 frames as input. The target
# network starts as an exact copy of the policy network.
policy_net = DQN((N_FRAMES, 84, 84), n_actions).to(device)
target_net = DQN((N_FRAMES, 84, 84), n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
# The target network is never optimized directly (only policy_net's
# parameters are given to the optimizer), so keep it in eval mode.
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=LEARNING_RATE)
replay_buffer = ReplayBuffer(BUFFER_SIZE)

print(f"Action space: {n_actions}")
print("Policy Network initialized")

## Training Functions

In [None]:
def select_action(state, epsilon):
    """Choose an action index with an epsilon-greedy rule.

    With probability ``epsilon`` a uniformly random index in
    ``[0, n_actions)`` is returned; otherwise ``state`` is given a batch
    dimension, evaluated by ``policy_net``, and the index of the largest
    Q-value is returned.
    """
    explore = random.random() < epsilon
    if explore:
        return random.randrange(n_actions)

    # Greedy branch: pure inference, so no gradients are tracked.
    with torch.no_grad():
        batch = torch.FloatTensor(state).unsqueeze(0).to(device)
        _, best_action = policy_net(batch).max(1)
    return best_action.item()

def optimize_model():
    """Perform one gradient step on ``policy_net`` from a sampled minibatch.

    Returns immediately (doing nothing) while the replay buffer holds fewer
    than ``BATCH_SIZE`` transitions. Otherwise it samples a minibatch,
    regresses the Q-value of each taken action towards
    ``reward + (1 - done) * GAMMA * max_a Q_target(next_state, a)`` with a
    mean-squared-error loss, and applies one optimizer step.
    """
    if len(replay_buffer) < BATCH_SIZE:
        return

    states, actions, rewards, next_states, dones = replay_buffer.sample(BATCH_SIZE)

    states = torch.as_tensor(states, dtype=torch.float32, device=device)
    actions = torch.as_tensor(actions, dtype=torch.long, device=device)
    rewards = torch.as_tensor(rewards, dtype=torch.float32, device=device)
    next_states = torch.as_tensor(next_states, dtype=torch.float32, device=device)
    dones = torch.as_tensor(dones, dtype=torch.float32, device=device)

    # Q-value of the action actually taken in each transition. Squeeze only
    # the gathered column so the result stays 1-D even for a batch of one.
    current_q = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)

    # The targets are constants for this update, so compute them without
    # building an autograd graph through the target network.
    with torch.no_grad():
        next_q = target_net(next_states).max(1)[0]
        target_q = rewards + (1 - dones) * GAMMA * next_q

    # Loss
    loss = nn.functional.mse_loss(current_q, target_q)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

## Training Loop

In [None]:
# Main training loop: play N_EPISODES episodes, storing every transition in
# the replay buffer and running one optimization step per environment step.
episode_rewards = []
steps = 0

try:
    for episode in range(N_EPISODES):
        state, _ = env.reset()
        state = preprocess_frame(state)
        # Start each episode with the first frame repeated N_FRAMES times.
        state_stack = deque([state] * N_FRAMES, maxlen=N_FRAMES)

        episode_reward = 0
        done = False

        while not done:
            # Exponential decay of epsilon from EPSILON_START towards
            # EPSILON_END, driven by the global step counter.
            epsilon = EPSILON_END + (EPSILON_START - EPSILON_END) * \
                      np.exp(-1. * steps / EPSILON_DECAY)

            # Select action (the array is reused below for the stored state)
            state_array = np.array(state_stack)
            action = select_action(state_array, epsilon)

            # Take step
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # The bounded deque drops the oldest frame when the new one is
            # appended, giving the next stacked state.
            next_state = preprocess_frame(next_state)
            next_state_stack = state_stack.copy()
            next_state_stack.append(next_state)

            # Store transition. The stored flag is `terminated`, not `done`:
            # it multiplies out the bootstrapped next-state value in
            # optimize_model, which is only correct for a true terminal
            # state. A truncation ends the episode but is not terminal.
            replay_buffer.push(
                state_array,
                action,
                reward,
                np.array(next_state_stack),
                float(terminated),
            )

            state_stack = next_state_stack
            episode_reward += reward
            steps += 1

            # Optimize
            optimize_model()

            # Update target network
            if steps % TARGET_UPDATE == 0:
                target_net.load_state_dict(policy_net.state_dict())

        episode_rewards.append(episode_reward)

        if episode % 10 == 0:
            # Slicing with [-100:] already returns the whole list when fewer
            # than 100 episodes have been recorded.
            avg_reward = np.mean(episode_rewards[-100:])
            print(f"Episode {episode}, Reward: {episode_reward:.2f}, Avg Reward (100): {avg_reward:.2f}, Epsilon: {epsilon:.3f}")
finally:
    # Release the environment even if training fails or is interrupted.
    env.close()

print("Training completed!")

## Plot Results

In [None]:
# Plot episode rewards
plt.figure(figsize=(12, 6))
plt.plot(episode_rewards, alpha=0.6, label='Episode Reward')

# Moving average over `window` episodes. It is drawn only once at least
# `window` episodes exist: with fewer, np.convolve(mode='valid') swaps its
# operands and returns an array whose length no longer matches the x-range
# below, which makes plt.plot raise.
window = 100
if len(episode_rewards) >= window:
    moving_avg = np.convolve(episode_rewards, np.ones(window)/window, mode='valid')
    plt.plot(range(window-1, len(episode_rewards)), moving_avg, label=f'Moving Average ({window})', linewidth=2)

# Horizontal reference lines at the target and minimum reward levels.
plt.axhline(y=500, color='r', linestyle='--', label='Target (500)')
plt.axhline(y=400, color='orange', linestyle='--', label='Minimum (400)')

plt.xlabel('Episode')
plt.ylabel('Reward')
plt.title('DQN Training Progress on Space Invaders')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# Print final statistics (mean over at most the last 100 episodes)
final_avg = np.mean(episode_rewards[-100:])
print(f"\nFinal average reward (last 100 episodes): {final_avg:.2f}")

## Save Model

In [None]:
# Write a checkpoint holding both networks' weights, the optimizer state,
# and the per-episode reward history.
torch.save(
    dict(
        policy_net_state_dict=policy_net.state_dict(),
        target_net_state_dict=target_net.state_dict(),
        optimizer_state_dict=optimizer.state_dict(),
        episode_rewards=episode_rewards,
    ),
    'dqn_space_invaders_basic.pth',
)

print("Model saved successfully!")