# **Double DQN agent Implementation**

Here we implement the Double DQN (DDQN) method so that we can check whether it lets the agent learn faster.

# **Import all needed libraries**

In [1]:
import torch
from torch import nn
from torchvision import transforms as T

import numpy as np
import matplotlib.pyplot as plt
import datetime
import pandas as pd

# Gym is an OpenAI toolkit for RL
import gym
from gym.spaces import Box
from gym.wrappers import FrameStack

# NES Emulator for OpenAI Gym
from nes_py.wrappers import JoypadSpace

# Super Mario environment for OpenAI Gym
import gym_super_mario_bros
from pathlib import Path
from torchrl.data import LazyMemmapStorage, TensorDictReplayBuffer
from tensordict import TensorDict


# **Init and test of the env**

In [2]:
# Show which gym release is installed: the keyword arguments passed to
# `make` below depend on it.
print(gym.__version__)

# Build the Super Mario Bros 1-1 environment. On gym >= 0.26, change
# render_mode to 'human' to see the game on screen.
# NOTE(review): the version check compares strings, not parsed version numbers.
env = (
    gym_super_mario_bros.make("SuperMarioBros-1-1-v0", new_step_api=True)
    if gym.__version__ < '0.26'
    else gym_super_mario_bros.make("SuperMarioBros-1-1-v0", render_mode='rgb', apply_api_compatibility=True)
)

# Restrict the controller to two button combinations:
#   action 0 -> walk right
#   action 1 -> jump right
env = JoypadSpace(env, [["right"], ["right", "A"]])

# Smoke test: reset, take a single "walk right" step and print what comes back.
env.reset()
next_state, reward, done, trunc, info = env.step(action=0)
print(f"{next_state.shape},\n {reward},\n {done},\n {info}")

0.26.2


  logger.warn(
  logger.warn(


(240, 256, 3),
 0.0,
 False,
 {'coins': 0, 'flag_get': False, 'life': 2, 'score': 0, 'stage': 1, 'status': 'small', 'time': 400, 'world': 1, 'x_pos': 40, 'y_pos': 79}


  if not isinstance(terminated, (bool, np.bool8)):


# **Preprocessing of the environment**

In [3]:
class SkipFrame(gym.Wrapper):
    """Repeat every action for `skip` consecutive frames and return only the last result."""

    def __init__(self, env, skip):
        """Wrap `env` so that each `step` call advances up to `skip` frames.

        Args:
            env: environment to wrap.
            skip: number of frames each action is repeated for (must be >= 1).

        Raises:
            ValueError: if `skip` is smaller than 1; `step` would otherwise
                have no observation to return.
        """
        super().__init__(env)
        if skip < 1:
            raise ValueError(f"skip must be >= 1, got: {skip}")
        self._skip = skip

    def step(self, action):
        """Repeat `action`, summing the rewards of the skipped frames.

        Returns:
            The 5-tuple of the last inner step, with the reward replaced by
            the sum of the rewards collected over the repeated frames.
        """
        total_reward = 0.0
        for _ in range(self._skip):
            # Accumulate reward and repeat the same action
            obs, reward, done, trunc, info = self.env.step(action)
            total_reward += reward
            # Stop as soon as the episode is over, whether it terminated or
            # was truncated: the inner env must not be stepped again before
            # a reset.
            if done or trunc:
                break
        return obs, total_reward, done, trunc, info


class GrayScaleObservation(gym.ObservationWrapper):
    """Convert [H, W, C] colour frames into single-channel grayscale tensors."""

    def __init__(self, env):
        """Wrap `env` and declare a 2-D (height, width) observation space."""
        super().__init__(env)
        obs_shape = self.observation_space.shape[:2]
        # NOTE(review): the space is declared as uint8 while `observation`
        # returns a float tensor; confirm nothing downstream relies on the
        # declared dtype.
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)
        # Build the transform once instead of on every frame.
        self._grayscale = T.Grayscale()

    def permute_orientation(self, observation):
        """Return `observation` as a float tensor with axes reordered to [C, H, W]."""
        # permute [H, W, C] array to [C, H, W] tensor
        observation = np.transpose(observation, (2, 0, 1))
        # Copy first: the transposed array is a view of the original frame.
        observation = torch.tensor(observation.copy(), dtype=torch.float)
        return observation

    def observation(self, observation):
        """Reorder the frame to channel-first and apply the grayscale transform."""
        observation = self.permute_orientation(observation)
        return self._grayscale(observation)


class ResizeObservation(gym.ObservationWrapper):
    """Resize each observation to a fixed shape and divide pixel values by 255."""

    def __init__(self, env, shape):
        """Wrap `env`.

        Args:
            env: environment to wrap.
            shape: target size; an int `n` is treated as `(n, n)`, any other
                value is converted with `tuple(shape)`.
        """
        super().__init__(env)
        if isinstance(shape, int):
            self.shape = (shape, shape)
        else:
            self.shape = tuple(shape)

        # Keep any dimensions beyond the first two of the wrapped space.
        obs_shape = self.shape + self.observation_space.shape[2:]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

        # Build the pipeline once instead of on every frame.
        # Normalize(0, 255) subtracts 0 and divides by 255.
        self._transforms = T.Compose(
            [T.Resize(self.shape, antialias=True), T.Normalize(0, 255)]
        )

    def observation(self, observation):
        """Resize and rescale `observation`, then drop a leading singleton dimension."""
        return self._transforms(observation).squeeze(0)


# Stack the preprocessing wrappers, innermost first:
# frame skipping -> grayscale -> resize to 84x84.
env = ResizeObservation(GrayScaleObservation(SkipFrame(env, skip=4)), shape=84)

# Finally stack the 4 most recent frames; `new_step_api` is only passed to
# FrameStack when the gym version string compares below '0.26'.
env = (
    FrameStack(env, num_stack=4, new_step_api=True)
    if gym.__version__ < '0.26'
    else FrameStack(env, num_stack=4)
)

# **Definition of the DDQN neural network**

In [4]:
class MarioNet(nn.Module):
    """Pair of identical mini CNNs (online and target) used for Double DQN.

    Each network is:
      input -> (conv2d + relu) x 3 -> flatten -> dense + relu -> dense -> output
    """

    def __init__(self, input_dim, output_dim):
        """Build the online network and a frozen copy of it as the target network.

        Args:
            input_dim: (channels, height, width); height and width must both be 84.
            output_dim: number of outputs of the final linear layer.

        Raises:
            ValueError: if the height or the width is not 84.
        """
        super().__init__()
        c, h, w = input_dim

        if h != 84:
            raise ValueError(f"Expecting input height: 84, got: {h}")
        if w != 84:
            raise ValueError(f"Expecting input width: 84, got: {w}")

        self.online = self.__build_cnn(c, output_dim)

        # The target network starts as an exact copy of the online network.
        self.target = self.__build_cnn(c, output_dim)
        self.target.load_state_dict(self.online.state_dict())

        # Q_target parameters are frozen.
        for p in self.target.parameters():
            p.requires_grad = False

    def forward(self, input, model):
        """Run `input` through the selected network.

        Args:
            input: batch of stacked frames.
            model: "online" or "target".

        Raises:
            ValueError: if `model` is neither "online" nor "target" (previously
                this silently returned None).
        """
        if model == "online":
            return self.online(input)
        if model == "target":
            return self.target(input)
        raise ValueError(f"Unknown model {model!r}; expected 'online' or 'target'")

    def __build_cnn(self, c, output_dim):
        """Create one CNN taking `c` input channels and producing `output_dim` values."""
        return nn.Sequential(
            nn.Conv2d(in_channels=c, out_channels=32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
            # 3136 = 64 channels * 7 * 7, the conv output size for 84x84 inputs.
            nn.Linear(3136, 512),
            nn.ReLU(),
            nn.Linear(512, output_dim),
        )

# **Definition of the mario agent**

In [5]:
class Mario:
    """Double DQN agent: epsilon-greedy acting, replay memory and online/target Q-learning."""

    def __init__(self, state_dim, action_dim, save_dir):
        """Build the network, replay buffer, optimizer and training schedule.

        Args:
            state_dim: (channels, height, width) forwarded to MarioNet.
            action_dim: number of discrete actions.
            save_dir: directory used by `save` when no explicit path is given.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.save_dir = save_dir
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.net = MarioNet(self.state_dim, self.action_dim).float().to(device=self.device)

        # Epsilon-greedy schedule: multiplied by the decay on every `act`
        # call and clamped at the minimum.
        self.exploration_rate = 1
        self.exploration_rate_decay = 0.99999975
        self.exploration_rate_min = 0.1
        self.curr_step = 0
        self.save_every = 5e5  # steps between periodic checkpoints

        # Replay memory is kept on the CPU; sampled batches are moved to
        # `self.device` in `recall`.
        self.memory = TensorDictReplayBuffer(storage=LazyMemmapStorage(100000, device=torch.device("cpu")))
        self.batch_size = 32
        self.gamma = 0.9  # discount factor applied in `td_target`
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=0.00025)
        self.loss_fn = torch.nn.SmoothL1Loss()

        self.burnin = 1e4  # steps before any gradient update
        self.learn_every = 3  # steps between gradient updates
        self.sync_every = 1e4  # steps between online -> target weight copies

    def act(self, state):
        """Pick an action epsilon-greedily, then decay epsilon and count the step.

        Args:
            state: a single observation, or a tuple whose first item is the
                observation; it must expose `__array__`.

        Returns:
            int: index of the chosen action.
        """
        if np.random.rand() < self.exploration_rate:
            # Explore: uniform random action.
            action_idx = np.random.randint(self.action_dim)
        else:
            # Exploit: greedy action under the online network.
            state = state[0].__array__() if isinstance(state, tuple) else state.__array__()
            state = torch.tensor(state, device=self.device).unsqueeze(0)
            # No gradients are needed to select an action.
            with torch.no_grad():
                action_values = self.net(state, model="online")
            action_idx = torch.argmax(action_values, axis=1).item()
        self.exploration_rate *= self.exploration_rate_decay
        self.exploration_rate = max(self.exploration_rate_min, self.exploration_rate)
        self.curr_step += 1
        return action_idx

    def cache(self, state, next_state, action, reward, done):
        """Store one transition in the replay memory."""
        def first_if_tuple(x):
            return x[0] if isinstance(x, tuple) else x

        state = first_if_tuple(state).__array__()
        next_state = first_if_tuple(next_state).__array__()
        state = torch.tensor(state)
        next_state = torch.tensor(next_state)
        # Scalars are stored as 1-element tensors; `recall` strips that
        # trailing dimension again.
        action = torch.tensor([action])
        reward = torch.tensor([reward])
        done = torch.tensor([done])

        self.memory.add(TensorDict({"state": state, "next_state": next_state, "action": action, "reward": reward, "done": done}, batch_size=[]))

    def recall(self):
        """Sample `batch_size` transitions and move them to the agent's device.

        Returns:
            tuple: (state, next_state, action, reward, done) where action,
            reward and done have their trailing size-1 dimension removed.
        """
        batch = self.memory.sample(self.batch_size).to(self.device)
        state, next_state, action, reward, done = (batch.get(key) for key in ("state", "next_state", "action", "reward", "done"))
        # squeeze(-1) rather than squeeze(): only the trailing dimension added
        # by `cache` is removed, so a batch of one keeps its batch dimension.
        return state, next_state, action.squeeze(-1), reward.squeeze(-1), done.squeeze(-1)

    def td_estimate(self, state, action):
        """Online-network Q-value of the action actually taken, per batch row."""
        # Index with the real batch length instead of assuming `batch_size`.
        batch_idx = torch.arange(state.shape[0], device=state.device)
        current_Q = self.net(state, model="online")[batch_idx, action]
        return current_Q

    @torch.no_grad()
    def td_target(self, reward, next_state, done):
        """Double DQN target: online net selects the next action, target net scores it."""
        next_state_Q = self.net(next_state, model="online")
        best_action = torch.argmax(next_state_Q, axis=1)
        batch_idx = torch.arange(next_state.shape[0], device=next_state.device)
        next_Q = self.net(next_state, model="target")[batch_idx, best_action]
        # No bootstrapping from transitions flagged as done.
        return (reward + (1 - done.float()) * self.gamma * next_Q).float()

    def update_Q_online(self, td_estimate, td_target):
        """Take one optimizer step on the loss between estimate and target; return the loss value."""
        loss = self.loss_fn(td_estimate, td_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def sync_Q_target(self):
        """Copy the online network weights into the target network."""
        self.net.target.load_state_dict(self.net.online.state_dict())

    def save(self, best_agent_path=None):
        """Write the network weights and the current exploration rate to disk.

        Args:
            best_agent_path: destination file. When omitted (as in the periodic
                save triggered by `learn`), a numbered checkpoint is written
                inside `self.save_dir`.
        """
        if best_agent_path is None:
            checkpoint_idx = int(self.curr_step // self.save_every)
            save_path = Path(self.save_dir) / f"mario_net_{checkpoint_idx}.chkpt"
        else:
            save_path = best_agent_path
        torch.save(dict(model=self.net.state_dict(), exploration_rate=self.exploration_rate), save_path)

    def load(self, load_path):
        """Restore network weights and exploration rate from a checkpoint.

        Raises:
            ValueError: if `load_path` does not exist.
        """
        load_path = Path(load_path)
        if not load_path.exists():
            raise ValueError(f"{load_path} does not exist")

        ckp = torch.load(load_path, map_location=('cuda' if self.device == 'cuda' else 'cpu'))
        exploration_rate = ckp.get('exploration_rate')
        state_dict = ckp.get('model')

        print(f"Loading model at {load_path} with exploration rate {exploration_rate}")
        self.net.load_state_dict(state_dict)
        self.exploration_rate = exploration_rate

    def learn(self):
        """Run the per-step training schedule.

        Syncs the target network every `sync_every` steps, writes a checkpoint
        every `save_every` steps and, once past `burnin`, performs a gradient
        update every `learn_every` steps.

        Returns:
            tuple: (mean TD estimate, loss) when an update was performed,
            otherwise (None, None).
        """
        if self.curr_step % self.sync_every == 0:
            self.sync_Q_target()
        if self.curr_step % self.save_every == 0:
            self.save()
        if self.curr_step < self.burnin:
            return None, None
        if self.curr_step % self.learn_every != 0:
            return None, None

        state, next_state, action, reward, done = self.recall()
        td_est = self.td_estimate(state, action)
        td_tgt = self.td_target(reward, next_state, done)
        loss = self.update_Q_online(td_est, td_tgt)
        return (td_est.mean().item(), loss)

# **Metric class to log and plot useful insights**

In [25]:
class MetricLogger:
    """Record per-episode rewards in a text log and plot them on request."""

    def __init__(self, save_dir):
        """Create (or overwrite) the log file in `save_dir` and write its header row."""
        self.ep_rewards = []
        self.ep_rewards_plot = save_dir / "reward_plot.jpg"
        self.save_log = save_dir / "log"
        header = f"{'Episode':>8}{'Reward':>15}\n"
        with open(self.save_log, "w") as log_file:
            log_file.write(header)

    def log_episode(self, episode, reward):
        """Remember `reward` in memory and append one formatted row to the log file."""
        self.ep_rewards.append(reward)
        row = f"{episode:8d}{reward:15.3f}\n"
        with open(self.save_log, "a") as log_file:
            log_file.write(row)

    def create_df(self):
        """Read the log file back into a DataFrame with 'Episode' and 'Reward' columns.

        Episodes are renumbered 1..N in file order; only the last column of
        each row (the reward) is parsed.
        """
        with open(self.save_log, "r") as log_file:
            # Drop the header row.
            data_rows = list(log_file)[1:]
        rewards = [float(row.split()[-1]) for row in data_rows]
        episodes = range(1, len(rewards) + 1)
        return pd.DataFrame({'Episode': episodes, 'Reward': rewards})

    def plot_rewards(self, rolling_window=10):
        """Save a plot of the raw rewards and their rolling mean to `ep_rewards_plot`."""
        history = self.create_df()
        smoothed = history['Reward'].rolling(window=rolling_window).mean()

        fig, ax = plt.subplots(figsize=(12, 8))
        ax.plot(history['Episode'], history['Reward'], label='Reward', linestyle='--', color='orange', linewidth=1)
        ax.plot(history['Episode'], smoothed, label=f'Rolling Mean (window={rolling_window})', color='blue', linewidth=3)
        ax.set_xlabel("Episode")
        ax.set_ylabel("Reward")
        ax.set_title("Reward per Episode")
        ax.legend()
        fig.savefig(self.ep_rewards_plot)
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)

# **Definition of the training process**

Saving process:

1- Save the model based on the average reward over a certain number of episodes instead of relying on a single episode's reward. This helps smooth out the variability and provides a more reliable measure of the agent's performance.

In [26]:
def train(env, agent, logger, num_episodes, best_agent_dir, eval_episodes=100):
    """Train `agent` on `env` and checkpoint the best and the final model.

    Args:
        env: environment whose `step` returns
            (obs, reward, done, truncated, info).
        agent: object exposing `act`, `cache`, `learn` and `save(path)`.
        logger: object exposing `log_episode(episode, reward)`.
        num_episodes: number of episodes to play.
        best_agent_dir: directory (supporting the `/` operator) receiving
            "best_agent.chkpt" and "final_agent.chkpt".
        eval_episodes: size of the trailing window over which the average
            reward is computed to decide whether the agent improved.
    """
    best_avg_reward = float('-inf')
    best_agent_path = best_agent_dir / "best_agent.chkpt"
    episode_rewards = []

    for episode in range(1, num_episodes + 1):
        state = env.reset()
        episode_reward = 0

        while True:
            action = agent.act(state)
            next_state, reward, done, truncated, info = env.step(action)
            # Only `done` is stored: it is the flag the agent uses to stop
            # bootstrapping, and a truncated episode did not really end.
            agent.cache(state, next_state, action, reward, done)
            # Learn after every environment step. The agent's burn-in,
            # learn/sync/save schedule is driven by its per-step counter, so
            # calling this once per episode would almost never update it.
            agent.learn()
            state = next_state
            episode_reward += reward

            # The episode is over on termination or truncation alike.
            if done or truncated:
                break

        episode_rewards.append(episode_reward)
        logger.log_episode(episode, episode_reward)

        # Compare on a trailing average rather than a single episode to
        # smooth out the variance between episodes.
        if len(episode_rewards) >= eval_episodes:
            avg_reward = np.mean(episode_rewards[-eval_episodes:])
            if avg_reward > best_avg_reward:
                best_avg_reward = avg_reward
                agent.save(best_agent_path)
            print(f"Episode {episode}/{num_episodes}, Best Avg. Reward: {best_avg_reward:.2f}")

    # Save the final model
    agent.save(best_agent_dir / "final_agent.chkpt")

In [27]:
def test_visualized(agent):
    """Play one episode with `agent` in a freshly built, on-screen environment.

    The environment is built with the same wrappers as the training one
    (frame skip 4, grayscale, 84x84 resize, 4-frame stack) but with
    render_mode='human' on gym >= 0.26. Actions come from `agent.act`.
    """
    if gym.__version__ < '0.26':
        env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0", new_step_api=True)
    else:
        env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0", render_mode='human', apply_api_compatibility=True)

    try:
        env = JoypadSpace(env, [["right"], ["right", "A"]])

        env = SkipFrame(env, skip=4)
        env = GrayScaleObservation(env)
        env = ResizeObservation(env, shape=84)
        if gym.__version__ < '0.26':
            env = FrameStack(env, num_stack=4, new_step_api=True)
        else:
            env = FrameStack(env, num_stack=4)

        state = env.reset()
        done = truncated = False

        # Stop on termination or truncation: either one ends the episode.
        while not (done or truncated):
            action = agent.act(state)
            state, reward, done, truncated, info = env.step(action)
            env.render()
    finally:
        # Always release the emulator window, even if a step raised.
        env.close()

In [28]:
# Usage example: every run gets its own timestamped checkpoint directory,
# which also holds the reward log and plot written by the logger.
save_dir = Path('../checkpoints', datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S"))
save_dir.mkdir(parents=True)
logger = MetricLogger(save_dir)

# Shared directory for the best/final checkpoints; it is reused across runs,
# hence exist_ok=True.
best_agent_dir = Path('../best_agent')
best_agent_dir.mkdir(exist_ok=True)

In [29]:
# Number of discrete actions exposed by the wrapped env; passed to the agent as action_dim below.
print(env.action_space.n)

2


In [30]:
# Initialize the agent: 4 stacked 84x84 frames in, one Q-value per available action out.
agent = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir=save_dir)

In [None]:
# Train the agent for 500 episodes; the best and final checkpoints are written to best_agent_dir.
train(env, agent, logger, num_episodes=500, best_agent_dir=best_agent_dir)

In [31]:
# Save the reward curve, with a 100-episode rolling mean, to the logger's reward_plot.jpg.
logger.plot_rewards(rolling_window=100)

   Episode  Reward
0        1   607.0
1        2  1208.0
2        3  1023.0
3        4   958.0
4        5   636.0


# **Test the best agent**

In [None]:
# Load the best agent (weights and exploration rate) from the checkpoint written by train().
best_agent_path = best_agent_dir / "best_agent.chkpt"
agent.load(best_agent_path)

In [None]:
# Test the agent: play one episode in a rendered environment.
test_visualized(agent)

## --------------