# Install Dependencies to Render OpenAI Gym Environment

In [None]:
%%capture
!apt-get update
!pip install pyglet==1.3.2
!pip install gym[atari] pyvirtualdisplay
!apt-get install -y xvfb python-opengl ffmpeg
!pip install box2d-py
!pip install gast==0.2.2
!pip install torch
import gym
from gym import logger as gymlogger
from gym.wrappers import Monitor, AtariPreprocessing, FrameStack
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
import glob
import io
import math
import base64
from IPython.display import HTML
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display
# Start a pyvirtualdisplay Display (visible=0, 1400x900) before any
# environment is created; presumably this lets gym render frames on a
# headless machine via the xvfb package installed above.
display = Display(visible=0, size=(1400, 900))
display.start()

#### Check that there is a GPU available

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
# Every network and replay batch below is placed on this device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


# Start Environment and Build Dueling DQN Agent

In [None]:
# Load gym environment and get action and state spaces
env = gym.make('PongNoFrameskip-v4')
# Grayscale observations left unscaled (scale_obs=False); the training loop
# divides by 255 itself before feeding frames to the network.
env = AtariPreprocessing(env,
                         grayscale_obs=True,
                         scale_obs=False,
                         terminal_on_life_loss=True)
# Stack the 4 most recent frames into one observation.
env = FrameStack(env, num_stack=4)

# Shape of a single stacked observation (a tuple, not a scalar count).
num_state_feats = env.observation_space.shape
# Size of the discrete action set; used as the width of the advantage head.
num_actions = env.action_space.n
# Upper bound of the observation space (not referenced again in the cells shown here).
max_observation_values = env.observation_space.high

print('Number of state features: {}'.format(num_state_feats))
print('Number of possible actions: {}'.format(num_actions))

Number of state features: (4, 84, 84)
Number of possible actions: 6


In [None]:
class DuelingDQN(nn.Module):
    """Dueling Q-network: a convolutional trunk with value and advantage heads.

    The trunk is three convolutions followed by a 512-unit linear layer. The
    linear layer expects 64 * 7 * 7 flattened features, which is what the
    trunk produces for inputs of shape (batch, 4, 84, 84).

    Args:
        num_actions: Number of discrete actions, i.e. the width of the
            advantage head and of the returned Q-value tensor.
    """

    def __init__(self, num_actions):
        super().__init__()

        # Every trunk layer is re-initialised immediately after it is built so
        # that the order in which random numbers are drawn stays fixed.
        # NOTE(review): the denominators below are not the fan-in of the layer
        # they are applied to (e.g. conv1's fan-in would be 4 * 8 * 8) --
        # confirm whether these scales are intentional.
        self.conv1 = nn.Conv2d(4, 32, kernel_size=8, stride=4)
        self._init_gaussian(self.conv1, 4 * 84 * 84)

        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self._init_gaussian(self.conv2, 32 * 4 * 8 * 8)

        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self._init_gaussian(self.conv3, 64 * 32 * 4 * 4)

        self.fc1 = nn.Linear(64 * 7 * 7, 512)
        self._init_gaussian(self.fc1, 64 * 64 * 3 * 3)

        # The two heads keep PyTorch's default initialisation.
        self.V = nn.Linear(512, 1)
        self.A = nn.Linear(512, num_actions)

    @staticmethod
    def _init_gaussian(layer, scale):
        """Draw weights from N(0, 2 / scale) and zero the bias of `layer`."""
        nn.init.normal_(layer.weight, mean=0.0, std=math.sqrt(2.0 / scale))
        layer.bias.data.fill_(0.0)

    def forward(self, states):
        """Return Q-values of shape (batch, num_actions) for a batch of states."""
        hidden = states
        for conv in (self.conv1, self.conv2, self.conv3):
            hidden = F.relu(conv(hidden))

        # Flatten the feature maps to (batch, features) for the linear layer.
        flat = hidden.view(hidden.size(0), -1)
        hidden = F.relu(self.fc1(flat))

        value = self.V(hidden)
        advantage = self.A(hidden)
        # Subtract the per-state mean advantage before adding the state value.
        centred = advantage - advantage.mean(dim=1, keepdim=True)
        return value + centred

    
# Create main and target neural networks.
main_nn = DuelingDQN(num_actions).to(device)
target_nn = DuelingDQN(num_actions).to(device)
# Start the target network as an exact copy of the main network; otherwise
# the first bootstrapped targets come from an unrelated random network.
target_nn.load_state_dict(main_nn.state_dict())

# Loss function and optimizer. Only main_nn is optimised; target_nn is
# refreshed by copying weights from main_nn.
optimizer = torch.optim.Adam(main_nn.parameters(), lr=1e-5)
loss_fn = nn.SmoothL1Loss()  # Huber loss

# Create Helper Functions

In [None]:
def select_epsilon_greedy_action(state, epsilon):
    """Take random action with probability epsilon, else take best action.

    Args:
        state: Input tensor for `main_nn` holding a single state with a
            leading batch dimension.
        epsilon: Probability in [0, 1] of sampling a random action.

    Returns:
        The chosen action index: a sample from `env.action_space` when
        exploring, otherwise the argmax of the Q-values from `main_nn`.
    """
    result = np.random.uniform()

    if result < epsilon:
        return env.action_space.sample()  # Random action.

    # Action selection never back-propagates, so skip building an autograd graph.
    with torch.no_grad():
        qs = main_nn(state)
    return np.argmax(qs.cpu().numpy())  # Greedy action for state.

In [None]:
class UniformBuffer(object):
    """Experience replay buffer that samples uniformly.

    Transitions are stored in a fixed-capacity ring: once `size` items have
    been added, each new transition overwrites the oldest one.

    Args:
        size: Maximum number of transitions kept.
        device: Torch device on which sampled batches are created.
    """

    def __init__(self, size, device):
        self._size = size
        self.buffer = []
        self.device = device
        self._next_idx = 0  # Slot that the next `add` call writes to.

    def add(self, state, action, reward, next_state, done):
        """Store one transition, overwriting the oldest when the buffer is full."""
        if self._next_idx >= len(self.buffer):
            self.buffer.append((state, action, reward, next_state, done))
        else:
            self.buffer[self._next_idx] = (state, action, reward, next_state, done)
        self._next_idx = (self._next_idx + 1) % self._size

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

    def sample(self, num_samples):
        """Draw `num_samples` transitions uniformly at random, with replacement.

        Returns:
            A tuple `(states, actions, rewards, next_states, dones)` of tensors
            on `self.device`, each with leading dimension `num_samples`.
            `rewards` and `dones` are float32; the other tensors keep the
            dtype of the stored data.

        Raises:
            ValueError: If the buffer is empty and `num_samples` is non-zero
                (raised by `np.random.choice`).
        """
        states, actions, rewards, next_states, dones = [], [], [], [], []
        idx = np.random.choice(len(self.buffer), num_samples)

        for i in idx:
            state, action, reward, next_state, done = self.buffer[i]
            # np.asarray copies only when needed. `np.array(x, copy=False)`
            # raises ValueError on NumPy >= 2 whenever a copy is required,
            # e.g. for Python scalars or lazily stacked frames.
            states.append(np.asarray(state))
            actions.append(np.asarray(action))
            rewards.append(reward)
            next_states.append(np.asarray(next_state))
            dones.append(done)

        states = torch.as_tensor(np.array(states), device=self.device)
        actions = torch.as_tensor(np.array(actions), device=self.device)
        rewards = torch.as_tensor(np.array(rewards, dtype=np.float32),
                                  device=self.device)
        next_states = torch.as_tensor(np.array(next_states), device=self.device)
        dones = torch.as_tensor(np.array(dones, dtype=np.float32),
                                device=self.device)

        return states, actions, rewards, next_states, dones

# Set Up Function to Perform a Training Step

In [None]:
def train_step(states, actions, rewards, next_states, dones):
    """Perform a training iteration on a batch of data.

    The next action is chosen by `main_nn` and evaluated by `target_nn`;
    `main_nn` is then updated by one optimizer step on the loss between its
    Q-values for the taken actions and the bootstrapped targets.

    Args:
        states: Batch of network inputs for the current states.
        actions: Integer tensor of shape (batch,) with the actions taken.
        rewards: Float tensor of shape (batch,).
        next_states: Batch of network inputs for the successor states.
        dones: Float tensor of shape (batch,), 1.0 where the episode ended.

    Returns:
        The scalar loss tensor for this batch.
    """
    # The target is a constant for the optimisation, so compute it without
    # tracking gradients.
    with torch.no_grad():
        next_qs_argmax = main_nn(next_states).argmax(dim=-1, keepdim=True)
        # squeeze(-1) removes only the gathered column, so a batch of size 1
        # keeps its batch dimension.
        masked_next_qs = target_nn(next_states).gather(1, next_qs_argmax).squeeze(-1)
        target = rewards + (1.0 - dones) * discount * masked_next_qs

    # gather requires an int64 index; the cast is a no-op when it already is.
    action_index = actions.long().unsqueeze(dim=-1)
    masked_qs = main_nn(states).gather(1, action_index).squeeze(-1)
    loss = loss_fn(masked_qs, target)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return loss

# Start running the DQN algorithm and see how the algorithm learns.

In [None]:
# Hyperparameters.
# Number of training episodes (the loop below runs num_episodes + 1 of them).
num_episodes = 1200 # @param {type:"integer"}
# Initial exploration probability; the training loop lowers it every frame.
epsilon = 1.0 # @param {type:"number"}
# Number of transitions sampled from the replay buffer per training step.
batch_size = 32 # @param {type:"integer"}
# Discount factor applied to the bootstrapped next-state value in train_step.
discount = 0.99 # @param {type:"number"}
# Capacity of the replay buffer, in transitions.
buffer_size = 200000 # @param {type:"integer"}

In [None]:
buffer = UniformBuffer(size=buffer_size, device=device)

# Start training. Act for one frame, store the transition, then train on a
# sampled batch.
last_100_ep_rewards, cur_frame = [], 0
# Placeholder so the progress line can be formatted even if no training step
# has run yet.
loss = float('nan')
for episode in range(num_episodes+1):
    state = env.reset()
    ep_reward, done = 0, False

    while not done:
        # The network expects float inputs in [0, 1] with a batch dimension.
        state_np = np.array(state, dtype=np.float32)
        state_in = torch.as_tensor(np.expand_dims(state_np / 255., axis=0),
                                   device=device)
        action = select_epsilon_greedy_action(state_in, epsilon)
        next_state, reward, done, info = env.step(action)
        ep_reward += reward
        # Store only the sign of the reward; the raw reward is kept for logging.
        reward = np.sign(reward)

        # Save to experience replay.
        buffer.add(state, action, reward, next_state, done)
        state = next_state
        cur_frame += 1

        # Linearly anneal exploration down to a floor of 0.01.
        if epsilon > 0.01:
            epsilon -= 1.1e-6

        if len(buffer) >= batch_size:
            states, actions, rewards, next_states, dones = buffer.sample(batch_size)
            # The batch is already on `device`; cast in place instead of going
            # through a CPU FloatTensor and back.
            states = states.float() / 255.
            next_states = next_states.float() / 255.
            loss = train_step(states, actions, rewards, next_states, dones)

        # Copy main_nn weights to target_nn every 10000 frames. This check has
        # to run per frame: at episode granularity cur_frame almost never
        # lands on an exact multiple, so the target would never be refreshed.
        if cur_frame % 10000 == 0:
            target_nn.load_state_dict(main_nn.state_dict())

    # Keep a sliding window of the 100 most recent episode returns.
    if len(last_100_ep_rewards) == 100:
        last_100_ep_rewards = last_100_ep_rewards[1:]

    last_100_ep_rewards.append(ep_reward)

    if episode % 25 == 0:
        print(f'Episode: {episode}/{num_episodes}, Epsilon: {epsilon:.3f}, '\
              f'Loss: {loss:.4f}, Return: {np.mean(last_100_ep_rewards):.2f}')

env.close()

Episode: 0/1200, Epsilon: 0.999, Loss: 0.0153, Return: -19.00
Episode: 25/1200, Epsilon: 0.974, Loss: 0.0003, Return: -20.46
Episode: 50/1200, Epsilon: 0.950, Loss: 0.0016, Return: -20.45
Episode: 75/1200, Epsilon: 0.925, Loss: 0.0001, Return: -20.41
Episode: 100/1200, Epsilon: 0.900, Loss: 0.0009, Return: -20.39
Episode: 125/1200, Epsilon: 0.874, Loss: 0.0014, Return: -20.28
Episode: 150/1200, Epsilon: 0.849, Loss: 0.0038, Return: -20.26
Episode: 175/1200, Epsilon: 0.822, Loss: 0.0022, Return: -20.21
Episode: 200/1200, Epsilon: 0.796, Loss: 0.0025, Return: -20.13
Episode: 225/1200, Epsilon: 0.770, Loss: 0.0018, Return: -20.13
Episode: 250/1200, Epsilon: 0.743, Loss: 0.0030, Return: -20.06
Episode: 275/1200, Epsilon: 0.717, Loss: 0.0029, Return: -19.99
Episode: 300/1200, Epsilon: 0.689, Loss: 0.0021, Return: -19.97
Episode: 325/1200, Epsilon: 0.662, Loss: 0.0076, Return: -20.01
Episode: 350/1200, Epsilon: 0.634, Loss: 0.0030, Return: -19.97
Episode: 375/1200, Epsilon: 0.604, Loss: 0.00

## Helper functions to visualize the performance of the agent

In [None]:
def show_video():
    """Display the first recorded episode video inline in the notebook.

    Looks for ``video/*.mp4`` relative to the working directory, embeds the
    first match as a base64 data URI in an HTML ``<video>`` element and
    displays it. Prints "Video not found" when no file matches.
    """
    mp4list = glob.glob('video/*.mp4')

    if len(mp4list) > 0:
        mp4 = mp4list[0]

        # Read through a context manager so the file handle is always closed.
        with io.open(mp4, 'r+b') as video_file:
            video = video_file.read()

        encoded = base64.b64encode(video)
        ipythondisplay.display(HTML(data='''<video alt="test" autoplay 
                                        loop controls style="height: 400px;">
                                        <source src="data:video/mp4;base64,{0}" type="video/mp4" />
                                        </video>'''.format(encoded.decode('ascii'))))
    else: 
        print("Video not found")

def wrap_env(env):
    """Return `env` wrapped in a gym Monitor that writes to ``./video``.

    ``force=True`` is passed through to Monitor unchanged.
    """
    return Monitor(env, './video', force=True)

# Display Result of Trained DQN Agent on Pong Environment

In [None]:
# Build a fresh evaluation environment. Unlike the training environment it
# uses scale_obs=True, so observations are not divided by 255 again below.
env = gym.make('PongNoFrameskip-v4')
env = AtariPreprocessing(env,
                         grayscale_obs=True,
                         scale_obs=True,
                         terminal_on_life_loss=False)
# Stack 4 frames as in training and wrap with Monitor (writes to ./video).
env = wrap_env(FrameStack(env, num_stack=4))

# Play a single episode with the trained main_nn, acting almost greedily.
state = env.reset()
ep_rew, done = 0, False
while not done:
    env.render()
    # Convert the stacked frames to a float32 tensor and add a batch dimension.
    state = torch.as_tensor(np.array(state, dtype=np.float32), device=device)
    state_in = torch.unsqueeze(state, dim=0)
    # epsilon is passed explicitly here; the global training epsilon is not used.
    action = select_epsilon_greedy_action(state_in, epsilon=0.01)
    state, reward, done, info = env.step(action)
    ep_rew += reward
print(f'Total Return: {ep_rew}')
# Close the environment before looking for the recorded video file.
env.close()
show_video()