# **Deep Reinforcement Learning**

# M3-2 Deep Q-Networks

## Example of DQN implementation on Pong environment (Part 2, testing)

This notebook walks through a simple example that illustrates the concepts introduced in this module: we load a DQN agent already trained on Pong and test it.

### Pong environment

The [Pong](https://gymnasium.farama.org/environments/atari/pong/) environment is part of the [Atari environments](https://gymnasium.farama.org/environments/atari/). Please read that page first for general information.

You control the right paddle and compete against the left paddle, which is controlled by the computer. Each player tries to deflect the ball away from their own goal and into the opponent's goal.

<center><img src="https://ale.farama.org/_images/pong.gif"/></center>

For more detailed documentation, see the [AtariAge page](https://atariage.com/manual_html_page.php?SoftwareLabelID=587).

First, we load the environment. Note that in this case we use the earlier version of the environment, which belongs to the [Gym](https://github.com/openai/gym) framework (instead of [Gymnasium](https://gymnasium.farama.org/index.html)).

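To install this environment together with its Atari dependencies, we need to execute the following commands:
> pip install gym[atari]==0.25.0
>
> pip install autorom[accept-rom-license]

The `atari` extra installs the emulator package `ale-py`, and `autorom[accept-rom-license]` downloads the game ROMs.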

In [1]:
!pip install gym[atari]==0.25.0
!pip install autorom[accept-rom-license]

Collecting gym==0.25.0 (from gym[atari]==0.25.0)
  Downloading gym-0.25.0.tar.gz (720 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 720.4/720.4 kB 9.0 MB/s eta 0:00:00
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done
Collecting ale-py~=0.7.5 (from gym[atari]==0.25.0)
  Downloading ale_py-0.7.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (8.1 kB)
Downloading ale_py-0.7.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.6 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.6/1.6 MB 35.7 MB/s eta 0:00:00
Building wheels for collected packages: gym
  Building wheel for gym (pyproject.toml) ... done
  Created wheel for gym: filename=gym-0.25.0-py3-none-any.whl size=824405 sha256=4fe89f8b6b958aaba632ecd0c049eb53

Once the dependencies are installed, we load them and initialize the `PongNoFrameskip-v4` environment.

There are several Pong environments, with minor differences among them. See the [Pong](https://gymnasium.farama.org/environments/atari/pong/) page for further details.

In [2]:
import gym
import warnings
warnings.filterwarnings('ignore')

# version
print("Using Gym version {}".format(gym.__version__))

ENV_NAME = "PongNoFrameskip-v4"
test_env = gym.make(ENV_NAME)

Using Gym version 0.25.0


A.L.E: Arcade Learning Environment (version 0.7.5+db37282)
[Powered by Stella]


### Data preprocessing (Wrappers)

We need to apply the **same set of wrappers** used during the model's training phase to ensure that the inputs to the model are consistent in shape, format, and meaning.
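
A mismatch between training-time and test-time preprocessing can be caught early with a simple check on the observation. The following is only a minimal sketch: `check_observation` is a hypothetical helper (not part of Gym or of the original wrappers), and the expected values (shape `(4, 84, 84)`, `float32`, range `[0, 1]`) are the ones this notebook's wrapped environment reports.

```python
import numpy as np


def check_observation(obs, expected_shape=(4, 84, 84)):
    """Hypothetical helper: raise ValueError if obs does not look like a training-time input."""
    obs = np.asarray(obs)
    if obs.shape != expected_shape:
        raise ValueError(f"expected shape {expected_shape}, got {obs.shape}")
    if obs.dtype != np.float32:
        raise ValueError(f"expected float32, got {obs.dtype}")
    if obs.min() < 0.0 or obs.max() > 1.0:
        raise ValueError("expected pixel values scaled to [0, 1]")
    return True


# A correctly preprocessed observation passes the check
good = np.zeros((4, 84, 84), dtype=np.float32)
print(check_observation(good))

# A raw Atari frame (height, width, RGB) is rejected
raw = np.zeros((210, 160, 3), dtype=np.uint8)
try:
    check_observation(raw)
except ValueError as err:
    print("rejected:", err)
```

In this notebook, such a check could be called on the value returned by `env.reset()` before feeding it to the network.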

In [3]:
# OpenAI Gym Wrappers
# Taken from
# https://github.com/PacktPublishing/Deep-Reinforcement-Learning-Hands-On/blob/master/Chapter06/lib/wrappers.py
import cv2
import numpy as np
import collections
import gym.spaces


class FireResetEnv(gym.Wrapper):
    def __init__(self, env=None):
        super(FireResetEnv, self).__init__(env)
        assert env.unwrapped.get_action_meanings()[1] == 'FIRE'
        assert len(env.unwrapped.get_action_meanings()) >= 3

    def step(self, action):
        return self.env.step(action)

    def reset(self):
        self.env.reset()
        obs, _, done, _ = self.env.step(1)
        if done:
            self.env.reset()
        obs, _, done, _ = self.env.step(2)
        if done:
            self.env.reset()
        return obs

    
class MaxAndSkipEnv(gym.Wrapper):
    def __init__(self, env=None, skip=4):
        super(MaxAndSkipEnv, self).__init__(env)
        self._obs_buffer = collections.deque(maxlen=2)
        self._skip = skip

    def step(self, action):
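        # Repeat the same action for `skip` frames and sum the rewards
        total_reward = 0.0
        done = None
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            self._obs_buffer.append(obs)
            total_reward += reward
            if done:
                break
        # Pixel-wise maximum over the last two frames, to reduce the effect of sprite flicker
        max_frame = np.max(np.stack(self._obs_buffer), axis=0)
        return max_frame, total_reward, done, info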

    def reset(self):
        self._obs_buffer.clear()
        obs = self.env.reset()
        self._obs_buffer.append(obs)
        return obs


class ProcessFrame84(gym.ObservationWrapper):
    def __init__(self, env=None):
        super(ProcessFrame84, self).__init__(env)
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)

    def observation(self, obs):
        return ProcessFrame84.process(obs)

    @staticmethod
    def process(frame):
        if frame.size == 210 * 160 * 3:
            img = np.reshape(frame, [210, 160, 3]).astype(np.float32)
        elif frame.size == 250 * 160 * 3:
            img = np.reshape(frame, [250, 160, 3]).astype(np.float32)
        else:
            assert False, "Unknown resolution."
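        # Convert RGB to grayscale using the standard luminance weights
        img = img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + img[:, :, 2] * 0.114
        # cv2.resize takes (width, height): the result has 110 rows and 84 columns
        resized_screen = cv2.resize(img, (84, 110), interpolation=cv2.INTER_AREA)
        # Keep rows 18 to 101 to obtain a square 84x84 image
        x_t = resized_screen[18:102, :]
        x_t = np.reshape(x_t, [84, 84, 1])
        return x_t.astype(np.uint8)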


class BufferWrapper(gym.ObservationWrapper):
    def __init__(self, env, n_steps, dtype=np.float32):
        super(BufferWrapper, self).__init__(env)
        self.dtype = dtype
        old_space = env.observation_space
        self.observation_space = gym.spaces.Box(old_space.low.repeat(n_steps, axis=0),
                                                old_space.high.repeat(n_steps, axis=0), dtype=dtype)

    def reset(self):
        self.buffer = np.zeros_like(self.observation_space.low, dtype=self.dtype)
        return self.observation(self.env.reset())

    def observation(self, observation):
        self.buffer[:-1] = self.buffer[1:]
        self.buffer[-1] = observation
        return self.buffer


class ImageToPyTorch(gym.ObservationWrapper):
    def __init__(self, env):
        super(ImageToPyTorch, self).__init__(env)
        old_shape = self.observation_space.shape
        self.observation_space = gym.spaces.Box(low=0.0, high=1.0, shape=(old_shape[-1],
                                old_shape[0], old_shape[1]), dtype=np.float32)

    def observation(self, observation):
        return np.moveaxis(observation, 2, 0)


class ScaledFloatFrame(gym.ObservationWrapper):
    def observation(self, obs):
        return np.array(obs).astype(np.float32) / 255.0

    
def make_env(env_name):
    env = gym.make(env_name)
    print("Standard Env.        : {}".format(env.observation_space.shape))
    env = MaxAndSkipEnv(env)
    print("MaxAndSkipEnv        : {}".format(env.observation_space.shape))
    env = FireResetEnv(env)
    print("FireResetEnv         : {}".format(env.observation_space.shape))
    env = ProcessFrame84(env)
    print("ProcessFrame84       : {}".format(env.observation_space.shape))
    env = ImageToPyTorch(env)
    print("ImageToPyTorch       : {}".format(env.observation_space.shape))
    env = BufferWrapper(env, 4)
    print("BufferWrapper        : {}".format(env.observation_space.shape))
    env = ScaledFloatFrame(env)
    print("ScaledFloatFrame     : {}".format(env.observation_space.shape))
    
    return env


def print_env_info(name, env):
    obs = env.reset()
    print("*** {} Environment ***".format(name))
    print("Observation shape: {}, type: {} and range [{},{}]".format(obs.shape, obs.dtype, np.min(obs), np.max(obs)))
    print("Observation sample:\n{}".format(obs))

In [4]:
# wrapped Env
env = make_env(ENV_NAME)
print_env_info("Wrapped", env)

Standard Env.        : (210, 160, 3)
MaxAndSkipEnv        : (210, 160, 3)
FireResetEnv         : (210, 160, 3)
ProcessFrame84       : (84, 84, 1)
ImageToPyTorch       : (1, 84, 84)
BufferWrapper        : (4, 84, 84)
ScaledFloatFrame     : (4, 84, 84)
*** Wrapped Environment ***
Observation shape: (4, 84, 84), type: float32 and range [0.0,0.6352941393852234]
Observation sample:
[[[0.         0.         0.         ... 0.         0.         0.        ]
  [0.         0.         0.         ... 0.         0.         0.        ]
  [0.         0.         0.         ... 0.         0.         0.        ]
  ...
  [0.         0.         0.         ... 0.         0.         0.        ]
  [0.         0.         0.         ... 0.         0.         0.        ]
  [0.         0.         0.         ... 0.         0.         0.        ]]

 [[0.         0.         0.         ... 0.         0.         0.        ]
  [0.         0.         0.         ... 0.         0.         0.        ]
  [0.         0.    
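
The leading all-zero frames in the sample above follow from how `BufferWrapper` works: on reset the stack is filled with zeros, and each new frame is pushed in at the last position. The following is a minimal NumPy-only sketch of that rolling update, using tiny 2×2 "frames" instead of 84×84 images; `push_frame` and `N_STEPS` are illustrative names, not part of the original wrappers.

```python
import numpy as np

N_STEPS = 4  # number of stacked frames, as in BufferWrapper(env, 4)


def push_frame(buffer, frame):
    """Shift the stack by one position and store the newest frame last."""
    buffer[:-1] = buffer[1:]
    buffer[-1] = frame
    return buffer


# Start from an empty (all-zero) stack, as after a reset
buffer = np.zeros((N_STEPS, 2, 2), dtype=np.float32)

# Push three frames whose pixels are all 1, then all 2, then all 3
for t in range(1, 4):
    frame = np.full((2, 2), t, dtype=np.float32)
    push_frame(buffer, frame)

# One pixel followed across the stack: oldest frame first, newest last
print(buffer[:, 0, 0])  # [0. 1. 2. 3.]
```

After only three pushes the oldest slot still holds the initial zeros, which matches what is seen right after `env.reset()`, when a single real frame has been pushed.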

### Neural network architecture

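The following code selects the compute device and defines the neural network: three convolutional layers followed by two fully connected layers, which map a stack of four 84×84 frames to one Q-value per action.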

In [5]:
import torch
import torch.nn as nn
import torch.optim as optim

# Use the GPU when available; otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

Using device: cpu


In [6]:
def make_DQN(input_shape, output_shape):
    net = nn.Sequential(
        nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
        nn.ReLU(),
        nn.Conv2d(32, 64, kernel_size=4, stride=2),
        nn.ReLU(),
        nn.Conv2d(64, 64, kernel_size=3, stride=1),
        nn.ReLU(),
        nn.Flatten(),
        nn.Linear(64*7*7, 512),
        nn.ReLU(),
        nn.Linear(512, output_shape)
    )
    return net
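
The input size `64*7*7` of the first linear layer follows from the spatial size of the feature maps after the three convolutions. As a quick check, the sketch below applies the usual output-size formula for a convolution without padding or dilation, `(size - kernel) // stride + 1`, to an 84×84 input; `conv_out` is an illustrative helper, not a PyTorch function.

```python
def conv_out(size, kernel, stride):
    """Output length of a convolution without padding or dilation."""
    return (size - kernel) // stride + 1


size = 84  # height and width of each preprocessed frame
for kernel, stride in [(8, 4), (4, 2), (3, 1)]:
    size = conv_out(size, kernel, stride)
    print(size)  # prints 20, then 9, then 7

# 64 output channels, each a 7x7 feature map
n_features = 64 * size * size
print(n_features)  # 3136
```

If the input resolution or the convolution parameters were changed, the `nn.Linear(64*7*7, 512)` layer would have to be adjusted accordingly.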

### Test

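We play one episode with the trained agent, always choosing the action with the highest Q-value, and save the rendered frames as an animated GIF (`video.gif`).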

In [7]:
import gym
import time
import numpy as np
import torch
import collections
import matplotlib.pyplot as plt
from PIL import Image

ENV_NAME = "PongNoFrameskip-v4"
model = "/kaggle/input/pongnoframeskip-v4/pytorch/dqn/1/PongNoFrameskip-v4.dat"
visualize = True
images = []

env = make_env(ENV_NAME)
net = make_DQN(env.observation_space.shape, env.action_space.n)
net.load_state_dict(torch.load(model, map_location=torch.device(device)))

state = env.reset()
total_reward = 0.0

while True:
    start_ts = time.time()
    if visualize:
        img = env.render(mode='rgb_array')
        images.append(Image.fromarray(img))

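    # Add a batch dimension and compute the Q-values without tracking gradients
    state_ = torch.tensor(state[np.newaxis, ...])
    with torch.no_grad():
        q_vals = net(state_).numpy()[0]
    # Greedy policy: pick the action with the highest Q-value
    action = np.argmax(q_vals)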

    state, reward, done, _ = env.step(action)
    total_reward += reward
    if done:
        break

print("Total reward: %.2f" % total_reward)

# duration is the number of milliseconds each frame is displayed; 60 ms is about 16.7 frames per second
images[0].save("video.gif", save_all=True, append_images=images[1:], duration=60, loop=0)

Standard Env.        : (210, 160, 3)
MaxAndSkipEnv        : (210, 160, 3)
FireResetEnv         : (210, 160, 3)
ProcessFrame84       : (84, 84, 1)
ImageToPyTorch       : (1, 84, 84)
BufferWrapper        : (4, 84, 84)
ScaledFloatFrame     : (4, 84, 84)
Total reward: 18.00
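
A single episode gives a noisy estimate of the agent's quality, so in practice it is common to average the total reward over several episodes. The following is a minimal sketch of such an evaluation loop, assuming the old Gym API used in this notebook (`reset()` returns the observation and `step()` returns a 4-tuple). `DummyEnv` and `evaluate` are hypothetical names introduced only for illustration; `DummyEnv` is a stand-in so that the example runs without the Atari emulator.

```python
import numpy as np


class DummyEnv:
    """Hypothetical stand-in for the wrapped Pong environment (old Gym 4-tuple API)."""

    def __init__(self, episode_length=5):
        self.episode_length = episode_length
        self.t = 0

    def reset(self):
        self.t = 0
        return np.zeros((4, 84, 84), dtype=np.float32)

    def step(self, action):
        self.t += 1
        obs = np.zeros((4, 84, 84), dtype=np.float32)
        reward = 1.0 if action == 0 else 0.0  # arbitrary reward rule for the demo
        done = self.t >= self.episode_length
        return obs, reward, done, {}


def evaluate(env, policy, n_episodes=3):
    """Play n_episodes with the given policy and return the total reward of each."""
    returns = []
    for _ in range(n_episodes):
        state = env.reset()
        total_reward, done = 0.0, False
        while not done:
            state, reward, done, _ = env.step(policy(state))
            total_reward += reward
        returns.append(total_reward)
    return returns


# A trivial policy that always picks action 0
returns = evaluate(DummyEnv(), policy=lambda state: 0)
print(returns, np.mean(returns))  # [5.0, 5.0, 5.0] 5.0
```

With the objects defined in this notebook, `env` would come from `make_env(ENV_NAME)` and `policy` would wrap the greedy `argmax` over the Q-values returned by `net`.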
