In [25]:
import time
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from collections import namedtuple, deque
from itertools import count
from tqdm.notebook import tqdm

import gymnasium as gym
from gymnasium.wrappers import AtariPreprocessing, FrameStackObservation
import ale_py

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchsummary import summary

# Fix the random seeds up front so that epsilon-greedy exploration (NumPy) and
# weight initialisation (PyTorch) are repeatable after "Restart Kernel -> Run All".
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Set up matplotlib: when an inline (notebook) backend is active, import
# IPython's display module so it is available to later cells.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

# Interactive mode: figures are drawn without blocking the kernel.
plt.ion()

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [26]:
# Importing `ale_py` is what makes the "ALE/..." ids available; this call mainly
# marks the import as used (Gymnasium documents `register_envs` as a no-op helper).
gym.register_envs(ale_py)
# Raw Video Pinball environment; the preprocessing wrappers are applied in the next cell.
env = gym.make("ALE/VideoPinball-v5")

In [27]:
# Preprocess every frame (84x84, grayscale, scaled observations) and then stack
# the 4 most recent frames into a single observation.
env = FrameStackObservation(
    AtariPreprocessing(
        env,
        frame_skip=1,  # frame-skipping is already set to 4 by default when the env is made
        screen_size=84,
        grayscale_obs=True,
        scale_obs=True,
    ),
    stack_size=4,
)

In [28]:
# Seed the first reset so the environment's own randomness is repeatable across
# runs; per the Gymnasium API, later unseeded `env.reset()` calls keep using the
# random generator initialised here.
obs, _ = env.reset(seed=42)
print(f"Observation space: {env.observation_space}")
print(f"Action space: {env.action_space}")
print(f"Sample observation: {obs.shape}")

Observation space: Box(0.0, 1.0, (4, 84, 84), float32)
Action space: Discrete(9)
Sample observation: (4, 84, 84)


In [29]:
class DQNNetwork(nn.Module):
    """Convolutional Q-network: three conv layers followed by a 512-unit MLP head.

    Maps a batch of stacked frames of shape ``(N, in_channels, H, W)`` to one
    Q-value per action, shape ``(N, n_actions)``.

    Args:
        n_actions: Number of discrete actions (width of the output layer).
        in_channels: Number of channels / stacked frames in an observation.
            Defaults to 4, matching the frame stack used in this notebook.
        input_size: ``(height, width)`` of an observation. Defaults to
            ``(84, 84)``. Used only to size the first linear layer.
    """

    def __init__(self, n_actions, in_channels=4, input_size=(84, 84)):
        super().__init__()

        # Shape comments are for the default 4 x 84 x 84 input.
        feature_layers = [
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4),  # -> (32, 20, 20)
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),  # -> (64, 9, 9)
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),  # -> (64, 7, 7)
            nn.ReLU(),
            nn.Flatten(),
        ]

        # Measure the flattened feature size with a dummy forward pass instead of
        # hard-coding 64 * 7 * 7, so other frame counts / resolutions also work.
        with torch.no_grad():
            probe = torch.zeros(1, in_channels, *input_size)
            n_features = nn.Sequential(*feature_layers)(probe).shape[1]

        # Keep everything in one Sequential named `net` so parameter names
        # (net.0.weight, ..., net.9.bias) are the same as before.
        self.net = nn.Sequential(
            *feature_layers,
            nn.Linear(n_features, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions),
        )

    def forward(self, x):
        """Return the Q-values for every action given a batch of observations."""
        return self.net(x)

In [30]:
# One output unit per action in the environment's discrete action space.
n_actions = env.action_space.n
# Build the network directly on the training device, then print a per-layer summary
# for an input shaped like a single observation.
model = DQNNetwork(n_actions).to(device)
summary(model, input_size=obs.shape)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 32, 20, 20]           8,224
              ReLU-2           [-1, 32, 20, 20]               0
            Conv2d-3             [-1, 64, 9, 9]          32,832
              ReLU-4             [-1, 64, 9, 9]               0
            Conv2d-5             [-1, 64, 7, 7]          36,928
              ReLU-6             [-1, 64, 7, 7]               0
           Flatten-7                 [-1, 3136]               0
            Linear-8                  [-1, 512]       1,606,144
              ReLU-9                  [-1, 512]               0
           Linear-10                    [-1, 9]           4,617
Total params: 1,688,745
Trainable params: 1,688,745
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.11
Forward/backward pass size (MB): 0.35
Params size (MB): 6.44
Estimat

In [31]:
# AdamW with the AMSGrad variant over all network weights, learning rate 2.5e-4.
optimizer = optim.AdamW(params=model.parameters(), lr=2.5e-4, amsgrad=True)

def q_learning_update(model, state, action, reward, next_state, gamma=0.99, done=False, optimizer=None):
    """
    Perform a single step of Q-learning update using the Bellman equation.

    The temporal-difference target is ``reward + gamma * max_a Q(next_state, a)``,
    or just ``reward`` when ``done`` is true, and the squared error between
    ``Q(state, action)`` and that target is minimised with one optimizer step.

    Parameters:
        - model: The Q-network; called on a batch of one observation.
        - state: A single observation (no batch axis; one is added here).
        - action: Index of the action that was taken in ``state``.
        - reward: Scalar reward received for that action.
        - next_state: The observation that followed (same layout as ``state``).
        - gamma: Discount factor (default 0.99).
        - done: True if ``next_state`` is terminal. The target then does not
          bootstrap from ``next_state``. Defaults to False, which reproduces the
          previous behaviour of always bootstrapping.
        - optimizer: Optimizer used for the step. When omitted, the module-level
          ``optimizer`` is used, as before.

    Returns:
        - loss: 0-dim tensor holding the squared TD error (detached from the graph).

    Raises:
        - NameError: if no optimizer is passed and none is defined at module level.
    """
    if optimizer is None:
        # The parameter shadows the notebook-level name, so look it up explicitly.
        optimizer = globals().get("optimizer")
        if optimizer is None:
            raise NameError(
                "q_learning_update needs an optimizer: pass `optimizer=` or "
                "define a module-level `optimizer`"
            )

    # Build the inputs on whatever device the model's weights live on, so the
    # function does not depend on a global `device` matching the model.
    model_device = next(model.parameters()).device

    # Q-value of the action that was actually taken.
    state_tensor = torch.as_tensor(state, dtype=torch.float32, device=model_device).unsqueeze(0)
    current_q_value = model(state_tensor)[0, action]

    # Bellman target; no gradient flows through it.
    with torch.no_grad():
        target_q_value = torch.tensor(float(reward), dtype=torch.float32, device=model_device)
        if not done:
            next_state_tensor = torch.as_tensor(
                next_state, dtype=torch.float32, device=model_device
            ).unsqueeze(0)
            # Stay on-device: no .item() round trip through Python floats.
            target_q_value = target_q_value + gamma * model(next_state_tensor).max()

    # Mean Squared Error between current and target Q-values.
    loss = F.mse_loss(current_q_value, target_q_value)

    # Backpropagate the loss and update the weights.
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return loss.detach()

In [None]:
def q_learning_update(model, states, actions, rewards, next_states, gamma=0.99, device=None,
                      dones=None, optimizer=None):
    """
    Perform a Q-learning update using the Bellman equation for a batch of experiences.

    A single transition (scalar ``actions``) is also accepted and is treated as a
    batch of one, so this definition stays usable by code that passes one
    transition at a time.

    Parameters:
        - model: The neural network (DQN) model
        - states: Batch of states (or one state when ``actions`` is a scalar)
        - actions: Batch of action indices, or a single index
        - rewards: Batch of rewards, or a single reward
        - next_states: Batch of next states (same layout as ``states``)
        - gamma: Discount factor (default 0.99)
        - device: Device for the input tensors. Defaults to the device of the
          model's parameters, so a CPU-only model no longer fails on "cuda".
        - dones: Optional batch of terminal flags. Where true, the target is the
          reward alone. When omitted, every transition bootstraps.
        - optimizer: Optimizer used for the step. When omitted, a module-level
          ``optimizer`` is used if one exists. If none is found the loss is only
          computed and no weights change.

    Returns:
        - loss: The loss value (0-dim tensor; mean squared TD error over the batch)
    """
    if device is None:
        device = next(model.parameters()).device
    if optimizer is None:
        # The parameter shadows the notebook-level name, so look it up explicitly.
        optimizer = globals().get("optimizer")

    # Convert data to tensors
    states_tensor = torch.as_tensor(states, dtype=torch.float32, device=device)
    next_states_tensor = torch.as_tensor(next_states, dtype=torch.float32, device=device)
    actions_tensor = torch.as_tensor(actions, dtype=torch.int64, device=device)
    rewards_tensor = torch.as_tensor(rewards, dtype=torch.float32, device=device)

    # A scalar action means one un-batched transition: add the batch axis.
    if actions_tensor.dim() == 0:
        states_tensor = states_tensor.unsqueeze(0)
        next_states_tensor = next_states_tensor.unsqueeze(0)
    actions_tensor = actions_tensor.reshape(-1)
    rewards_tensor = rewards_tensor.reshape(-1)

    # 1.0 where bootstrapping is allowed, 0.0 on terminal transitions.
    if dones is None:
        not_done = torch.ones_like(rewards_tensor)
    else:
        not_done = 1.0 - torch.as_tensor(dones, dtype=torch.float32, device=device).reshape(-1)

    # Q-values of the actions that were actually taken, shape (batch,).
    q_values = model(states_tensor)
    current_q_values = q_values.gather(1, actions_tensor.view(-1, 1)).squeeze(1)

    # Compute the target Q-values using the Bellman equation (no gradient).
    with torch.no_grad():
        next_q_max = model(next_states_tensor).max(1)[0]  # max Q-value for next state
        target_q_values = rewards_tensor + gamma * next_q_max * not_done

    # Mean Squared Error between current Q-values and target Q-values.
    loss = F.mse_loss(current_q_values, target_q_values)

    # Backpropagation and optimization
    if optimizer is not None:
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    return loss

In [32]:
def train_dqn(env, model, optimizer, episodes=600, gamma=0.99, epsilon=0.1, device=device):
    """Train ``model`` online with epsilon-greedy Q-learning, one update per env step.

    Parameters:
        - env: Gymnasium-style environment (``reset()`` -> ``(obs, info)``,
          ``step(a)`` -> ``(obs, reward, terminated, truncated, info)``).
        - model: Q-network; called on a batch of one observation.
        - optimizer: NOTE(review): accepted but not forwarded. The step is
          performed inside ``q_learning_update``, which is called with the same
          arguments as before. Kept in the signature for backward compatibility.
        - episodes: Number of episodes to run (default 600).
        - gamma: Discount factor handed to ``q_learning_update``.
        - epsilon: Probability of taking a uniformly random action.
        - device: Device the model is moved to and on which inputs are built.

    Returns:
        - episode_rewards: Total (undiscounted) reward of each episode.
        - episode_losses: Mean update loss over the steps of each episode
          (previously only the loss of the episode's final step was recorded).
    """
    print(f"Training on device: {device}")

    episode_rewards = []
    episode_losses = []

    # Ensure the model is on the correct device
    model.to(device)

    progress_bar = tqdm(range(episodes), desc="Training Progress", unit="episode", dynamic_ncols=True)
    for episode in progress_bar:
        state, _ = env.reset()
        total_reward = 0
        loss_sum = 0.0
        n_steps = 0
        done = False

        start_time = time.time()

        while not done:
            # Epsilon-greedy action selection
            if np.random.rand() < epsilon:
                action = np.random.choice(env.action_space.n)
            else:
                # Pure inference: no autograd graph is needed to pick an action.
                with torch.no_grad():
                    state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
                    action = model(state_tensor).argmax().item()

            next_state, reward, terminated, truncated, _info = env.step(action)

            # The episode is over on either termination or truncation.
            done = terminated or truncated

            # Q-learning update
            loss = q_learning_update(model, state, action, reward, next_state, gamma)

            loss_sum += loss.item()
            n_steps += 1
            total_reward += reward
            state = next_state

        episode_time = time.time() - start_time
        # The loop body runs at least once per episode, so n_steps >= 1 here.
        mean_loss = loss_sum / n_steps
        episode_rewards.append(total_reward)
        episode_losses.append(mean_loss)

        # Surface per-episode metrics on the progress bar.
        progress_bar.set_postfix(
            reward=total_reward,
            loss=f"{mean_loss:.4f}",
            secs=f"{episode_time:.1f}",
        )

    return episode_rewards, episode_losses

In [33]:
# Run training with the default hyper-parameters and keep the per-episode
# reward and loss curves for the plots below.
rewards, losses = train_dqn(env=env, model=model, optimizer=optimizer)

Training on device: cuda


Training Progress:   0%|          | 0/600 [00:00<?, ?episode/s]

KeyboardInterrupt: 

In [None]:
# Plot training progress: rewards on top, losses below, sharing the episode axis.
fig, (reward_ax, loss_ax) = plt.subplots(2, 1, sharex=True, figsize=(8, 6))

reward_ax.plot(rewards)
reward_ax.set(title="Episode Rewards", ylabel="Total reward")

loss_ax.plot(losses)
loss_ax.set(title="Episode Losses", xlabel="Episode", ylabel="Loss")

# Without this the lower panel's title overlaps the upper panel's tick labels.
fig.tight_layout()
plt.show()