# CSCN8020 – Assignment 3: DQN for Pong

> **Note:** This notebook is a **template scaffold**.  
> You must complete and adapt it yourself before submitting it as your own work.


In [2]:
import sys

# Print the path of the Python interpreter that is running this kernel.
print(str(sys.executable))


c:\Users\Dell\AppData\Local\Programs\Python\Python310\python.exe


In [None]:
# Install missing package(s) in the notebook environment
%pip install "gym[atari]" -q

import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
import matplotlib.pyplot as plt

from assignment3_utils import process_frame, transform_reward

# ---------------------------------------------------------------------------
# Experiment configuration: everything a reader might want to tune lives here.
# ---------------------------------------------------------------------------

# Gym environment id passed to gym.make().
ENV_NAME = "PongDeterministic-v4"

# Reproducibility: seed the Python, NumPy and PyTorch RNGs so that
# epsilon-greedy draws, replay sampling and weight initialisation repeat
# after Restart Kernel -> Run All (best effort; GPU kernels may still be
# non-deterministic). The Gym environment / action-space RNG is not seeded here.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Observation preprocessing.
IMAGE_SHAPE = (84, 80)  # handed to process_frame and used as the network's (height, width)
STACK_SIZE = 4          # number of consecutive frames concatenated into one state

# Optimisation.
GAMMA = 0.99            # discount factor in the TD target
LR = 1e-4               # Adam learning rate

# Replay memory.
# NOTE(review): every transition stores two stacked states; if process_frame
# returns floating-point frames, 100_000 transitions may need many GB of RAM.
# Confirm the frame dtype / available memory before long runs.
REPLAY_BUFFER_SIZE = 100_000  # maximum number of stored transitions
MIN_REPLAY_SIZE = 10_000      # random-policy transitions collected before training

BATCH_SIZE = 8                # transitions sampled per gradient step
TARGET_UPDATE_EVERY = 10      # target-network sync period, counted in episodes

# Epsilon-greedy schedule: linear decay from EPS_START to EPS_END over
# EPS_DECAY_FRAMES environment steps, constant afterwards.
EPS_START = 1.0
EPS_END = 0.1
EPS_DECAY_FRAMES = 1_000_000

NUM_EPISODES = 300            # number of training episodes

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

ModuleNotFoundError: No module named 'gym'

In [None]:
class ReplayBuffer:
    """Bounded FIFO memory of (state, action, reward, next_state, done) transitions.

    The storage is a ``deque`` with ``maxlen=capacity``, so once it is full
    every new transition pushes the oldest one out.
    """

    def __init__(self, capacity: int):
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

    def push(self, state, action, reward, next_state, done):
        """Store one transition at the newest end of the memory."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size: int):
        """Draw ``batch_size`` transitions without replacement.

        Returns:
            A 5-tuple of NumPy arrays ``(states, actions, rewards,
            next_states, dones)``. States are stacked along a new leading
            axis, rewards are ``float32`` and dones are ``uint8``.

        Raises:
            ValueError: if ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        picked = random.sample(self.buffer, batch_size)

        # Split the list of transitions into one array per field.
        state_batch = np.stack([item[0] for item in picked])
        action_batch = np.array([item[1] for item in picked])
        reward_batch = np.array([item[2] for item in picked], dtype=np.float32)
        next_state_batch = np.stack([item[3] for item in picked])
        done_batch = np.array([item[4] for item in picked], dtype=np.uint8)

        return state_batch, action_batch, reward_batch, next_state_batch, done_batch

In [None]:
class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    Args:
        input_shape: ``(channels, height, width)`` of one input state.
        num_actions: size of the output layer (one value per action).
    """

    def __init__(self, input_shape, num_actions):
        super().__init__()
        in_channels, height, width = input_shape

        conv_layers = [
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        ]
        self.features = nn.Sequential(*conv_layers)

        # Push a single all-zero state through the conv stack to find out how
        # many features come out; that sizes the first linear layer.
        with torch.no_grad():
            probe = torch.zeros(1, in_channels, height, width)
            n_flatten = self.features(probe).numel()

        head_layers = [
            nn.Linear(n_flatten, 512),
            nn.ReLU(),
            nn.Linear(512, num_actions),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Map a batch of states ``(N, C, H, W)`` to values of shape ``(N, num_actions)``."""
        conv_out = self.features(x)
        flat = conv_out.view(conv_out.shape[0], -1)
        return self.fc(flat)

In [None]:
def get_epsilon(frame_idx: int) -> float:
    """Return the exploration rate for the given frame counter.

    Epsilon moves linearly from ``EPS_START`` towards ``EPS_END`` as
    ``frame_idx`` goes from 0 to ``EPS_DECAY_FRAMES`` and stays at
    ``EPS_END`` from then on.
    """
    schedule_finished = frame_idx >= EPS_DECAY_FRAMES
    if not schedule_finished:
        progress = frame_idx / EPS_DECAY_FRAMES
        return EPS_START + (EPS_END - EPS_START) * progress
    return EPS_END


def select_action(q_net: nn.Module, state_tensor: torch.Tensor, epsilon: float, num_actions: int) -> int:
    """Pick an action epsilon-greedily.

    With probability ``epsilon`` a uniformly random action index in
    ``[0, num_actions)`` is returned; otherwise the index of the largest
    output of ``q_net(state_tensor)`` is returned. ``state_tensor`` must hold
    exactly one state, because the greedy index is read with ``.item()``.
    """
    explore = random.random() < epsilon
    if explore:
        return random.randrange(num_actions)

    # Greedy branch: no gradients are needed just to choose an action.
    with torch.no_grad():
        greedy_index = q_net(state_tensor).argmax(dim=1)
        return int(greedy_index.item())

In [None]:
def init_state(env, image_shape, stack_size):
    """Reset the environment and build the initial stacked state.

    Works with both Gym reset conventions: the legacy one, where
    ``env.reset()`` returns the observation, and the gym >= 0.26 / gymnasium
    one, where it returns ``(observation, info)``.

    Args:
        env: environment exposing ``reset()``.
        image_shape: forwarded unchanged to ``process_frame``.
        stack_size: number of frames held in the stack.

    Returns:
        ``(state, frames)`` where ``frames`` is a ``deque(maxlen=stack_size)``
        filled with ``stack_size`` references to the first processed frame and
        ``state`` is those frames concatenated along axis 0.
    """
    reset_result = env.reset()

    # Newer Gym returns (obs, info); unwrap it so process_frame always gets
    # the raw observation. A legacy reset returns the observation itself.
    is_obs_info_pair = (
        isinstance(reset_result, tuple)
        and len(reset_result) == 2
        and isinstance(reset_result[1], dict)
    )
    obs = reset_result[0] if is_obs_info_pair else reset_result

    frame = process_frame(obs, image_shape)
    # assumes process_frame returns a (1, H, W, C) array — TODO confirm against
    # assignment3_utils; the two calls below turn that into (C, H, W).
    frame = np.squeeze(frame, axis=0)
    frame = np.transpose(frame, (2, 0, 1))

    # No motion information exists yet, so the stack starts as the first frame
    # repeated; np.concatenate copies, so sharing one array here is harmless.
    frames = deque([frame] * stack_size, maxlen=stack_size)
    state = np.concatenate(frames, axis=0)
    return state, frames


def update_state(frames, obs, image_shape):
    """Append the newest observation to the frame stack and rebuild the state.

    ``frames`` is mutated in place (``append``); what happens to older entries
    depends on the container passed in. The return value is every frame in
    ``frames`` concatenated along axis 0.
    """
    processed = process_frame(obs, image_shape)
    # Drop the leading axis, then move the last axis to the front.
    newest = np.transpose(np.squeeze(processed, axis=0), (2, 0, 1))
    frames.append(newest)
    return np.concatenate(frames, axis=0)

In [None]:
# Create the environment and read the size of its discrete action space.
env = gym.make(ENV_NAME)
num_actions = env.action_space.n
print("Number of actions:", num_actions)

# The online and target networks share one architecture. They are built in
# that order, then the target receives a copy of the online weights and is
# switched to eval mode.
q_net, target_net = (
    DQN((STACK_SIZE, *IMAGE_SHAPE), num_actions).to(device) for _ in range(2)
)
target_net.load_state_dict(q_net.state_dict())
target_net.eval()

# Only the online network is handed to the optimizer.
replay_buffer = ReplayBuffer(REPLAY_BUFFER_SIZE)
optimizer = optim.Adam(q_net.parameters(), lr=LR)

print(q_net)

In [None]:
# Warm-up: fill the replay buffer with transitions from a uniformly random
# policy until it holds MIN_REPLAY_SIZE entries. No learning happens here.
state, frames = init_state(env, IMAGE_SHAPE, STACK_SIZE)

# frame_idx is shared with the training cell, which keeps incrementing it and
# feeds it to get_epsilon, so warm-up steps also advance the epsilon schedule.
frame_idx = 0

print("Warming up replay buffer...")

while len(replay_buffer) < MIN_REPLAY_SIZE:
    action = env.action_space.sample()

    # env.step returns (obs, reward, done, info) on the legacy Gym API and
    # (obs, reward, terminated, truncated, info) on gym >= 0.26 / gymnasium.
    # Accept both; a truncated episode is treated as finished, which matches
    # what the legacy single `done` flag reported.
    step_result = env.step(action)
    if len(step_result) == 5:
        next_obs, reward, terminated, truncated, info = step_result
        done = terminated or truncated
    else:
        next_obs, reward, done, info = step_result
    reward = transform_reward(reward)

    next_state = update_state(frames, next_obs, IMAGE_SHAPE)
    replay_buffer.push(state, action, reward, next_state, done)

    state = next_state
    frame_idx += 1

    # Start a fresh episode (and a fresh frame stack) once the current one ends.
    if done:
        state, frames = init_state(env, IMAGE_SHAPE, STACK_SIZE)

print("Replay buffer size:", len(replay_buffer))

In [None]:
# Training loop: for NUM_EPISODES episodes, act epsilon-greedily, store every
# transition in the replay buffer and take one gradient step per environment
# step on a batch sampled from that buffer. The target network is synced with
# the online network every TARGET_UPDATE_EVERY episodes.
episode_rewards = []       # total (transformed) reward of each episode
moving_avg_rewards = []    # mean of the last (up to) 5 episode rewards

# Built once here instead of once per gradient step.
loss_fn = nn.MSELoss()

for episode in range(NUM_EPISODES):
    state, frames = init_state(env, IMAGE_SHAPE, STACK_SIZE)
    done = False
    episode_reward = 0.0

    while not done:
        # frame_idx continues from the warm-up cell and drives the epsilon schedule.
        frame_idx += 1
        epsilon = get_epsilon(frame_idx)

        # ---- act -----------------------------------------------------------
        # Add a batch axis of size 1 so the network sees a single-state batch.
        state_tensor = torch.from_numpy(state).unsqueeze(0).float().to(device)
        action = select_action(q_net, state_tensor, epsilon, num_actions)

        # env.step returns (obs, reward, done, info) on the legacy Gym API and
        # (obs, reward, terminated, truncated, info) on gym >= 0.26 /
        # gymnasium. Accept both; a truncated episode is treated as finished,
        # which matches what the legacy single `done` flag reported.
        step_result = env.step(action)
        if len(step_result) == 5:
            next_obs, reward, terminated, truncated, info = step_result
            done = terminated or truncated
        else:
            next_obs, reward, done, info = step_result
        reward = transform_reward(reward)

        next_state = update_state(frames, next_obs, IMAGE_SHAPE)
        replay_buffer.push(state, action, reward, next_state, done)

        state = next_state
        episode_reward += reward

        # ---- learn ---------------------------------------------------------
        # random.sample raises if asked for more items than are stored; skip
        # the update until enough transitions exist (only relevant when the
        # warm-up cell was skipped or MIN_REPLAY_SIZE < BATCH_SIZE).
        if len(replay_buffer) < BATCH_SIZE:
            continue

        states, actions, rewards, next_states, dones = replay_buffer.sample(BATCH_SIZE)

        states_t = torch.from_numpy(states).float().to(device)
        # gather() below needs an index tensor of shape (batch, 1).
        actions_t = torch.from_numpy(actions).long().unsqueeze(1).to(device)
        rewards_t = torch.from_numpy(rewards).to(device)
        next_states_t = torch.from_numpy(next_states).float().to(device)
        dones_t = torch.from_numpy(dones).float().to(device)

        # Q(s, a) of the online network for the actions that were actually taken.
        q_values = q_net(states_t).gather(1, actions_t).squeeze(1)

        # TD target r + GAMMA * max_a' Q_target(s', a'); the (1 - done) factor
        # removes the bootstrap term for transitions that ended an episode.
        with torch.no_grad():
            next_q_values = target_net(next_states_t).max(1)[0]
            targets = rewards_t + GAMMA * next_q_values * (1 - dones_t)

        loss = loss_fn(q_values, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    episode_rewards.append(episode_reward)
    # Slicing the last 5 entries simply yields a shorter list during the first
    # four episodes, so no special case is needed.
    moving_avg = np.mean(episode_rewards[-5:])
    moving_avg_rewards.append(moving_avg)

    # Periodic hard update of the target network (period counted in episodes).
    if (episode + 1) % TARGET_UPDATE_EVERY == 0:
        target_net.load_state_dict(q_net.state_dict())

    print(f"Episode {episode+1}/{NUM_EPISODES} | Reward: {episode_reward:.1f} | "
          f"Avg (last 5): {moving_avg:.2f} | Epsilon: {epsilon:.3f}")

In [None]:
# Learning curve: per-episode reward together with its 5-episode moving average.
episodes = np.arange(1, len(episode_rewards) + 1)

fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(episodes, episode_rewards, label="Reward per episode")
ax.plot(episodes, moving_avg_rewards, label="Moving avg (last 5)")
ax.set_xlabel("Episode")
ax.set_ylabel("Reward")
ax.set_title(f"Pong DQN (batch={BATCH_SIZE}, target_update={TARGET_UPDATE_EVERY})")
ax.grid(True)
ax.legend()
plt.show()