# **DUELING DQN AGENT VISUALISATION**

Notebook containing the code to generate videos of the agent playing the game once trained.

## Imports

In [1]:
import gymnasium as gym
import wandb
import datetime
import torch
import torch.nn as nn        
import torch.optim as optim 
from torchsummary import summary
import collections
import numpy as np
from matplotlib import pyplot as plt
import torch.nn.functional as F
import random
from PIL import Image
from IPython.display import HTML
from tqdm import tqdm
import ale_py
from gymnasium.wrappers import MaxAndSkipObservation, ResizeObservation, GrayscaleObservation, FrameStackObservation, ReshapeObservation
from collections import namedtuple, deque
import copy
import os

## Environment Set Up

In [2]:
gym.register_envs(ale_py)
ENV_NAME = "ALE/BeamRider-v5"

env = gym.make(ENV_NAME, render_mode = "rgb_array").unwrapped

A.L.E: Arcade Learning Environment (version 0.10.1+6a7e0ae)
[Powered by Stella]


### Wrappers and Preprocessing

In [3]:
# Implementation based on Pol Vierge's solution of the M3-3_Activity_1 and the Example_1 (REINFORCE baseline on CartPole)

class ImageToPyTorch(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        old_shape = self.observation_space.shape
        self.observation_space = gym.spaces.Box(low=0.0, high=1.0, shape=(old_shape[-1], old_shape[0], old_shape[1]), dtype=np.float32)

    def observation(self, observation):
        return np.moveaxis(observation, 2, 0)

class ScaledFloatFrame(gym.ObservationWrapper):
    def observation(self, obs):
        return np.array(obs).astype(np.float32) / 255.0


def make_env(env_name, render_mode=None):
    env = gym.make(env_name, render_mode=render_mode)
    print("Standard Env.        : {}".format(env.observation_space.shape))
    env = MaxAndSkipObservation(env, skip = 2)
    print("MaxAndSkipObservation: {}".format(env.observation_space.shape))
    env = ResizeObservation(env, (84, 84))
    print("ResizeObservation    : {}".format(env.observation_space.shape))
    env = GrayscaleObservation(env, keep_dim=True)
    print("GrayscaleObservation : {}".format(env.observation_space.shape))
    env = ImageToPyTorch(env)
    print("ImageToPyTorch       : {}".format(env.observation_space.shape))
    env = ReshapeObservation(env, (84, 84))
    print("ReshapeObservation   : {}".format(env.observation_space.shape))
    env = FrameStackObservation(env, stack_size=4)
    print("FrameStackObservation: {}".format(env.observation_space.shape))
    env = ScaledFloatFrame(env)
    print("ScaledFloatFrame     : {}".format(env.observation_space.shape))

    return env

env = make_env(ENV_NAME, render_mode = "rgb_array")

Standard Env.        : (210, 160, 3)
MaxAndSkipObservation: (210, 160, 3)
ResizeObservation    : (84, 84, 3)
GrayscaleObservation : (84, 84, 1)
ImageToPyTorch       : (1, 84, 84)
ReshapeObservation   : (84, 84)
FrameStackObservation: (4, 84, 84)
ScaledFloatFrame     : (4, 84, 84)


## Replay Buffer and DuelingDQN Model

### REPLAY BUFFER

In [4]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])

    def add(self, state, action, reward, next_state, done):
        self.buffer.append(self.experience(state, action, reward, next_state, done))

    def sample(self, batch_size):
        samples = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*samples)
        return np.array(states), actions, rewards, np.array(next_states), dones

    def __len__(self):
        return len(self.buffer)

### Dueling DQN Architecture

In [5]:
class DuelingDQN(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(DuelingDQN, self).__init__()

        # Convolutional layers
        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )

        conv_out_size = self._get_conv_out(input_shape)

        # Fully connected layers for value and advantage streams
        self.value_fc = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, 1)  # Single value output for state value
        )
        self.advantage_fc = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions)  # Advantage for each action
        )

    def _get_conv_out(self, shape):
        o = self.conv(torch.zeros(1, *shape))
        return int(np.prod(o.size()))

    def forward(self, x):
        conv_out = self.conv(x).view(x.size()[0], -1)
        value = self.value_fc(conv_out)
        advantage = self.advantage_fc(conv_out)
        
        # Combining value and advantage into Q-values
        q_values = value + (advantage - advantage.mean())
        return q_values

## DQN Agent


Same implementation as in the training notebook, but this time with the added functionality to generate videos of the agent playing the game (wathc_agent).

In [6]:
class DQNAgent:
    def __init__(self, env, replay_size=5000, batch_size=8, lr=1e-4, gamma=0.99, epsilon=1.0, epsilon_min=0.1, epsilon_decay=0.999, sync_target_frames=2000):
        self.env = env
        self.n_actions = env.action_space.n
        self.replay_buffer = ReplayBuffer(replay_size)
        self.batch_size = batch_size
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.sync_target_frames = sync_target_frames
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        self.policy_net = DuelingDQN(env.observation_space.shape, self.n_actions).to(self.device)
        self.target_net = DuelingDQN(env.observation_space.shape, self.n_actions).to(self.device)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.frame_idx = 0

    def act(self, state):
        """ 
        Selects an action using epsilon-greedy policy.

        Parameters:
        - state: Current state of the environment.

        Returns:
        - action: Action to take in the environment.
        """

        if np.random.rand() < self.epsilon:
            return self.env.action_space.sample()
        state = torch.tensor([state], dtype=torch.float32).to(self.device)
        q_values = self.policy_net(state)
        return q_values.max(1)[1].item()

    def learn(self):
        """
        Samples a batch from the replay buffer and performs a single step of optimization.

        Returns:
        - loss: Loss value from the optimization step.
        """
        
        if len(self.replay_buffer) < self.batch_size:
            return 0.0

        self.frame_idx += 1
        if self.frame_idx % self.sync_target_frames == 0:
            self.target_net.load_state_dict(self.policy_net.state_dict())

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)

        states = torch.tensor(states, dtype=torch.float32).to(self.device)
        actions = torch.tensor(actions, dtype=torch.long).to(self.device)
        rewards = torch.tensor(rewards, dtype=torch.float32).to(self.device)
        next_states = torch.tensor(next_states, dtype=torch.float32).to(self.device)
        dones = torch.tensor(dones, dtype=torch.float32).to(self.device)

        current_q_values = self.policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        next_q_values = self.target_net(next_states).max(1)[0]
        target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

        loss = nn.MSELoss()(current_q_values, target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

    # Visualization function for reward plot
    def visualise(self, score, run_folder, show_avg=True):
        """
        Parameters:
        - score: List of rewards per episode.
        - run_folder: Folder to save the plot.
        - show_avg: Whether to show the moving average of rewards.

        Returns:
        - None, it saves the plot in the run folder.
        """

        score = np.array(score)

        plt.figure(figsize=(15, 7))
        plt.ylabel("Trajectory duration", fontsize=12)
        plt.xlabel("Training iterations", fontsize=12)
        plt.plot(score, color='gray', linewidth=1, label='Score')

        if show_avg:
            N = 100
            avg_score = np.convolve(score, np.ones(N) / N, mode='valid')
            plt.plot(avg_score, color='blue', linewidth=3, label='Moving Average (window=100)')

        plt.scatter(np.arange(score.shape[0]), score, color='green', linewidth=0.3)
        plt.legend(fontsize=12)

        # Save the plot
        rewards_plot_path = os.path.join(run_folder, "test_reward_plot.png")
        plt.savefig(rewards_plot_path)
        plt.close()  # Close the plot to avoid display in Jupyter/other environments
        print(f"Rewards plot saved at: {rewards_plot_path}")

    # Loss plot function
    def plot_loss(self, losses, run_folder):
        plt.figure(figsize=(15, 7))
        plt.ylabel("Loss", fontsize=12)
        plt.xlabel("Training iterations", fontsize=12)
        plt.plot(losses, color='gray', linewidth=1, label='Loss')

        # Save the plot
        losses_plot_path = os.path.join(run_folder, "loss_plot.png")
        plt.savefig(losses_plot_path)
        plt.close()  # Close the plot to avoid display in Jupyter/other environments
        print(f"Loss plot saved at: {losses_plot_path}")

    # Watch agent function to evaluate the agent and save a GIF of the best episode.
    def watch_agent(self, model_path=None, T=500, episodes=10, device="cpu"):
        """
        Parameters:
        - model_path: Path to the trained model weights (optional).
        - T: Maximum steps per episode.
        - episodes: Number of episodes to evaluate.
        """
        if model_path:
            self.policy_net.load_state_dict(torch.load(model_path, map_location=device))
            self.policy_net.eval()

        scores = []
        episode_images = []

        for episode in tqdm(range(episodes), desc="Evaluating episodes"):
            state, _ = self.env.reset()
            state = np.array(state)  # Ensure state is in the right format
            images = []
            total_reward = 0
            done = False

            for t in range(T):
                # Capture the rendered image for the GIF
                img = self.env.render()
                images.append(Image.fromarray(img))

                # Select action using the policy network
                state_tensor = torch.tensor([state], dtype=torch.float32).to(self.device)
                with torch.no_grad():
                    q_values = self.policy_net(state_tensor)
                action = q_values.max(1)[1].item()

                # Take the selected action
                next_state, reward, done, _, _ = self.env.step(action)
                state = next_state
                total_reward += reward

                if done:
                    break

            scores.append(total_reward)
            episode_images.append(images)

        # Save a GIF of the best episode
        best_episode = np.argmax(scores)
        best_images = episode_images[best_episode]
        gif_path = "best_episode.gif"
        best_images[0].save(
            gif_path,
            save_all=True,
            append_images=best_images[1:],
            duration=60,
            loop=0
        )
        print(f"Best episode GIF saved at: {gif_path}")

        # Plot scores
        self.visualise(scores, run_folder=".", show_avg=False)

        # Close the environment
        self.env.close()


In [7]:
# Create the agent
agent = DQNAgent(env)

# Watch the agent perform and save a GIF of the best episode
agent.watch_agent(model_path="dqn_model.pth", T=5000, episodes=100)

  self.policy_net.load_state_dict(torch.load(model_path, map_location=device))
  state_tensor = torch.tensor([state], dtype=torch.float32).to(self.device)
Evaluating episodes: 100%|██████████| 100/100 [13:16<00:00,  7.97s/it]


Best episode GIF saved at: best_episode.gif
Rewards plot saved at: ./test_reward_plot.png
