<a href="https://colab.research.google.com/github/murphybrendan/ml-courses/blob/main/huggingface/deep-rl/unit3/dqn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

## Install necessary packages

In [1]:
!pip install wandb einops pygame stable_baselines3
!pip install gymnasium[classic_control,box2d,atari]
!pip install gymnasium[accept-rom-license]

Collecting wandb
  Downloading wandb-0.17.0-py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.7/6.7 MB[0m [31m53.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting einops
  Downloading einops-0.8.0-py3-none-any.whl (43 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.2/43.2 kB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
Collecting stable_baselines3
  Downloading stable_baselines3-2.3.2-py3-none-any.whl (182 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m182.3/182.3 kB[0m [31m24.2 MB/s[0m eta [36m0:00:00[0m
Collecting docker-pycreds>=0.4.0 (from wandb)
  Downloading docker_pycreds-0.4.0-py2.py3-none-any.whl (9.0 kB)
Collecting gitpython!=3.1.29,>=1.0.0 (from wandb)
  Downloading GitPython-3.1.43-py3-none-any.whl (207 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m207.3/207.3 kB[0m [31m21.8 MB/s[0m e

## Set up the virtual display

In [2]:
%%capture
!apt install python-opengl
!apt install xvfb
!pip3 install pyvirtualdisplay

In [3]:
# Virtual display
from pyvirtualdisplay import Display

virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

<pyvirtualdisplay.display.Display at 0x7dcbeb301450>

## Imports and other things

In [30]:
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

  and should_run_async(code)


# DQN Implementation

Implement the Q-network. It is a simple feed-forward network with a configurable number of hidden layers. The input size is the dimension of an observation, and the output size is the number of discrete actions (one Q-value per action).

In [73]:
import torch

class QNetwork(torch.nn.Module):
    """Feed-forward Q-network mapping an observation to one Q-value per action.

    The network is a stack of ``Linear -> ReLU`` pairs (one pair per hidden
    width) followed by a final ``Linear`` layer with no activation.

    Args:
        observation_dim: Width of the (flat) observation vector fed to the
            first layer.
        action_space_dim: Number of outputs, i.e. one Q-value per action.
        hidden_layers: Widths of the hidden layers, in order. Any sequence of
            ints is accepted (list, tuple, ...). An empty sequence yields a
            single linear map from observation to Q-values.
    """

    def __init__(self, observation_dim, action_space_dim, hidden_layers=(128, 64)):
        super().__init__()
        # Unpacking (rather than list concatenation) lets callers pass a tuple
        # or any other sequence, and never aliases the caller's object.
        widths = [observation_dim, *hidden_layers]
        layers = []
        for in_features, out_features in zip(widths[:-1], widths[1:]):
            layers.append(torch.nn.Linear(in_features, out_features))
            layers.append(torch.nn.ReLU())
        # Output head: raw Q-values, so no activation after it.
        layers.append(torch.nn.Linear(widths[-1], action_space_dim))
        self.layers = torch.nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the Q-values for ``x`` (last dimension ``observation_dim``)."""
        return self.layers(x)

In [None]:
q = QNetwork(10, 2)
q

QNetwork(
  (layers): Sequential(
    (0): Linear(in_features=10, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=64, bias=True)
    (3): ReLU()
    (4): Linear(in_features=64, out_features=2, bias=True)
  )
)

Define the epsilon-greedy policy

In [117]:
import random
from gymnasium.spaces import Space

def epsilon_greedy_policy(q_net: QNetwork, observation: "np.ndarray", action_space: Space, eps: float):
    """Pick an action epsilon-greedily with respect to ``q_net``.

    With probability ``eps`` a random action is drawn from ``action_space``;
    otherwise the action with the highest predicted Q-value is returned.

    Args:
        q_net: Network producing one Q-value per action.
        observation: A single observation (array-like). It is converted to a
            float32 tensor on the module-level ``device``. The argmax is taken
            over the whole network output, so a batch of observations is not
            supported here.
        action_space: Space used for the random (exploration) draw.
        eps: Exploration probability; ``random.random() < eps`` explores.

    Returns:
        ``action_space.sample()`` when exploring, otherwise the greedy action
        index as a 0-d NumPy integer array.
    """
    # The ``np.ndarray`` annotation is a string on purpose: numpy is imported
    # in a later cell of this notebook, and annotations are evaluated when the
    # ``def`` statement runs.
    if random.random() < eps:
        return action_space.sample()
    # Action selection is inference only; skip building an autograd graph.
    with torch.no_grad():
        obs_tensor = torch.as_tensor(observation, dtype=torch.float32, device=device)
        action = torch.argmax(q_net(obs_tensor)).cpu().numpy()
    return action

In [123]:
from stable_baselines3.common.buffers import ReplayBuffer
from gymnasium.vector import VectorEnv
from gymnasium import Env
import torch.nn.functional as F
from tqdm.notebook import trange, tqdm


class DQN:
    """Minimal Double-DQN agent for a discrete-action gymnasium environment.

    Transitions are stored in a stable_baselines3 ``ReplayBuffer``. The online
    network ``q_net`` is trained with SGD on the mean-squared TD error, and a
    periodically synchronised copy ``q_net_target`` evaluates the bootstrap
    target.

    Args:
        env: Environment to interact with. ``observation_space.shape[0]`` is
            used as the network input width and ``action_space.n`` as the
            number of actions.
        buffer_size: Capacity of the replay buffer.
        batch_size: Number of transitions sampled per gradient step.
        gamma: Discount factor.
        train_freq: Number of environment timesteps between gradient steps.
        exploration_initial_eps: Epsilon at the start of ``learn``.
        exploration_final_eps: Epsilon once ``total_timesteps`` is reached.
        learning_starts: Timestep used as the initial value of the training
            and target-update counters, so the first gradient step happens
            ``train_freq`` timesteps after it.
        target_update_interval: Timesteps between target-network syncs.
        learning_rate: Learning rate of the SGD optimizer.
    """

    def __init__(self,
                 env: Env, buffer_size=1000000, batch_size=32, gamma=0.99, train_freq=4,
                 exploration_initial_eps=1.0, exploration_final_eps=0.05, learning_starts=100,
                 target_update_interval=10000, learning_rate=0.0001) -> None:
        self.env = env
        self.batch_size = batch_size
        self.gamma = gamma
        self.train_freq = train_freq
        self.target_update_interval = target_update_interval
        self.exploration_initial_eps = exploration_initial_eps
        self.exploration_final_eps = exploration_final_eps

        # Keep sampled batches on the same device as the networks.
        self.replay_buffer = ReplayBuffer(buffer_size, env.observation_space, env.action_space, device=device)
        # observation_space.shape is more correct, but probably needs to be flattened
        observation_dim = env.observation_space.shape[0]
        num_actions = env.action_space.n
        self.q_net = QNetwork(observation_dim, num_actions).to(device)
        self.q_net_target = QNetwork(observation_dim, num_actions).to(device)
        self.q_net_target.load_state_dict(self.q_net.state_dict())
        self.optimizer = torch.optim.SGD(self.q_net.parameters(), learning_rate)
        self.last_target_update = learning_starts
        self.last_training_step = learning_starts
        self.is_vector_env = isinstance(env, VectorEnv)
        self.num_envs = env.num_envs if self.is_vector_env else 1
        self.timestep = 0
        # Defined up front so collect_rollouts() also works before learn().
        self.eps = exploration_initial_eps
        self.prev_observation = None

    def collect_rollouts(self):
        """Take one epsilon-greedy environment step and store the transition."""
        if self.prev_observation is None:
            self.prev_observation, _ = self.env.reset()

        action = epsilon_greedy_policy(self.q_net, self.prev_observation, self.env.action_space, self.eps)
        next_observation, reward, terminated, truncated, info = self.env.step(action)
        # Only `terminated` is stored as done: a truncated episode still has a
        # meaningful bootstrap value from next_observation.
        self.replay_buffer.add(self.prev_observation, next_observation, action, reward, terminated, [info])

        # A finished episode must be restarted before the next step. This is
        # skipped for VectorEnv, whose terminated/truncated values are per-env
        # arrays -- presumably those envs reset themselves; TODO confirm.
        if not self.is_vector_env and (terminated or truncated):
            next_observation, _ = self.env.reset()

        self.prev_observation = next_observation
        self.timestep += self.num_envs

    def step(self):
        """Run one gradient step on a batch sampled from the replay buffer."""
        samples = self.replay_buffer.sample(self.batch_size)

        # The target is a constant with respect to the online parameters, so
        # it is computed without tracking gradients.
        with torch.no_grad():
            # Double DQN: the online network picks the next action (argmax per
            # sample, along the action dimension) ...
            next_actions = self.q_net(samples.next_observations).argmax(dim=1, keepdim=True)
            # ... and the target network evaluates that action.
            next_q_values = self.q_net_target(samples.next_observations).gather(1, next_actions)
            # Discounted future return is 0 if this was a terminating state.
            # Assumes rewards/dones are column-shaped (batch, 1), matching
            # next_q_values -- TODO confirm against the buffer implementation.
            y = samples.rewards + self.gamma * next_q_values * (1.0 - samples.dones)

        # Q-value of the action actually taken in each sampled transition;
        # kept as (batch, 1) so it lines up with y instead of broadcasting.
        x = self.q_net(samples.observations).gather(1, samples.actions.long())
        loss = F.mse_loss(x, y)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def learn(self, total_timesteps):
        """Interact with the environment for ``total_timesteps`` steps, training along the way."""
        self.prev_observation, _ = self.env.reset()
        for _ in trange(total_timesteps // self.num_envs):

            # Linear decay of epsilon over the course of training; clamped so
            # that a second learn() call (timestep > total_timesteps) cannot
            # push epsilon past exploration_final_eps.
            progress = min(1.0, self.timestep / total_timesteps)
            self.eps = self.exploration_initial_eps * (1 - progress) + self.exploration_final_eps * progress

            self.collect_rollouts()
            # `>=` so that a gradient step happens every train_freq timesteps
            # (a strict comparison would train every train_freq + 1 steps).
            if self.timestep - self.last_training_step >= self.train_freq:
                self.step()
                self.last_training_step = self.timestep
                if self.timestep - self.last_target_update >= self.target_update_interval:
                    self.q_net_target.load_state_dict(self.q_net.state_dict())
                    self.last_target_update = self.timestep

# Train the custom DQN agent for 10,000 timesteps.
# NOTE(review): `env` is created by the Probe1 cell further down in this
# notebook, so on a fresh kernel that cell has to be executed before this one.
model = DQN(env, learning_rate=0.001, target_update_interval=100)
model.learn(total_timesteps=10000)

  0%|          | 0/10000 [00:00<?, ?it/s]

In [124]:
model.q_net(torch.zeros((1, 1), device=device))

tensor([[1.0000]], device='cuda:0', grad_fn=<AddmmBackward0>)

In [74]:
import random
from gymnasium.spaces import Space

def epsilon_greedy_policy(Q: QNetwork, observation: torch.Tensor, action_space: Space, epsilon: float) -> int:
    """Pick an action epsilon-greedily with respect to ``Q``.

    NOTE(review): a function with this same name, taking a NumPy observation,
    is defined earlier in this notebook; whichever cell ran last is the one in
    effect.

    Args:
        Q: Network producing one Q-value per action.
        observation: A single observation, already a tensor on the same device
            as ``Q``. The argmax is taken over the whole network output.
        action_space: Space used for the random (exploration) draw.
        epsilon: Exploration probability.

    Returns:
        ``action_space.sample()`` when exploring (returned as-is), otherwise
        the greedy action index as a plain Python ``int``.
    """
    if random.random() < epsilon:
        return action_space.sample()
    # Inference only: no autograd graph, and convert the 0-d tensor to the
    # ``int`` that the signature promises.
    with torch.no_grad():
        return int(Q(observation).argmax().item())


Implement DQN, using the ReplayBuffer from stable_baselines3

# Define Probe Environments

Since DQN is model-free, we can use test environments unrelated to our target environment to get a sense of whether our algorithm is working. We want to use extremely simple environments that DQN should be able to learn very easily.

So what's the easiest thing our DQN agent can do?

*Nothing*

That's it. Just sit there and do nothing. We'll define an environment with a discrete action space that has only one choice. Each episode lasts a single timestep, which gives a reward of +1. You can think of this as the agent learning V instead of Q.

In [52]:
import gymnasium as gym
import numpy as np
from gymnasium.spaces import Discrete, Box

ObsType = np.ndarray
ActType = int


class Probe1(gym.Env):
    """Simplest probe environment: one state, one action, one-step episodes.

    The observation is always ``[0.0]``, the only action is ``0``, and every
    step ends the episode with reward ``+1``. An agent that learns correctly
    should therefore predict a value of 1 for the single state.
    """

    def __init__(self):
        super().__init__()
        # Explicit float32 bounds and shape, so the space and the observations
        # returned below agree on dtype.
        self.observation_space = Box(low=0.0, high=0.0, shape=(1,), dtype=np.float32)
        self.action_space = Discrete(1)

    def _observation(self) -> np.ndarray:
        """Return a fresh copy of the constant observation, in the space's dtype."""
        return np.zeros(1, dtype=np.float32)

    def step(self, action: int) -> tuple[np.ndarray, float, bool, bool, dict]:
        """Ignore ``action``; return reward 1.0 and terminate the episode."""
        return (self._observation(), 1.0, True, False, {})

    def reset(self, seed: int = None, options: dict = None) -> tuple[np.ndarray, dict]:
        """Start a new episode and return ``(observation, info)``.

        ``options`` is accepted for compatibility with the gymnasium reset
        signature and is not used.
        """
        super().reset(seed=seed)
        return self._observation(), {}


# Register the probe under an id so it can be constructed through gym.make.
# Re-running this cell registers the same id again, which is what produces the
# "Overriding environment ... already in registry" warning in the output.
gym.envs.registration.register(id="Probe1-v0", entry_point=Probe1)
env = gym.make("Probe1-v0")
# Sanity checks: a one-element observation vector and a shape-less (scalar)
# discrete action.
assert env.observation_space.shape == (1,)
assert env.action_space.shape == ()

  and should_run_async(code)
  logger.warn(f"Overriding environment {new_spec.id} already in registry.")
  logger.warn(


Test that the environment works correctly using an existing agent.

In [62]:
from stable_baselines3 import DQN as DQNSB3

model = DQNSB3("MlpPolicy", env, learning_rate=0.001)
model.learn(total_timesteps=1000)

<stable_baselines3.dqn.dqn.DQN at 0x7b81eef8a110>

See that the model learned that the value of taking action 0 (the only action) at state \[0\] is 1.

In [61]:
model.q_net(torch.zeros((1,1), device=device))

tensor([[1.0000]], device='cuda:0', grad_fn=<AddmmBackward0>)

In [69]:
def test_model(model, input, target):
    assert torch.allclose(model.q_net(input), target)

test_model(model, torch.zeros((1,1), device=device), torch.ones((1,1), device=device))

tensor([[1.0000]], device='cuda:0', grad_fn=<AddmmBackward0>)
tensor([[1.]], device='cuda:0')


  and should_run_async(code)


True

TypeError: Expected state_dict to be dict-like, got <class '__main__.QNetwork'>.