# Libraries

In [None]:
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
!pip install colabgymrender==1.0.2
!pip install "gym[atari, accept-rom-license]"



In [None]:
import gym
import cv2
import math
import random
import warnings
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
from colabgymrender.recorder import Recorder


import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models

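# [leftover cell output -- appears to be a truncated warning message, not an import used by this notebook]:
#   from scipy.ndimage.filters import sobel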



In [None]:
# Hide DeprecationWarning messages for the rest of the session.
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Run tensors and networks on the GPU when CUDA is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# [leftover cell output -- appears to be a truncated warning message]:
#   and should_run_async(code)



# Normal Replay Memory

In [None]:
class NormalReplayMemory():
    """Bounded FIFO store of transitions that are sampled uniformly at random.

    Every stored item is a tuple ``(state, action, reward, done, next_state)``
    in which both frames have already been passed through ``preprocess``.
    """

    def __init__(self, max_size, min_replay_size):
        """Create an empty buffer.

        Args:
            max_size: Capacity of the underlying deque; once it is full, each
                append drops the oldest transition.
            min_replay_size: Kept as an attribute for callers; this class does
                not read it itself.
        """
        self.max_size = max_size
        self.min_replay_size = min_replay_size
        self.memory = deque(maxlen=max_size)

    def add(self, experience):
        """Preprocess both frames of a transition and append it to the buffer.

        Args:
            experience: Tuple ``(state, action, reward, done, next_state)``
                holding raw frames accepted by ``preprocess``.
        """
        state, action, reward, done, next_state = experience
        self.memory.append(
            (preprocess(state), action, reward, done, preprocess(next_state))
        )

    def get_batch(self, batch_size):
        """Sample ``batch_size`` transitions without replacement as tensors.

        Returns:
            Tuple ``(states, actions, rewards, dones, next_states)`` of tensors
            on ``device``. ``actions`` is int64, everything else float32;
            ``actions``, ``rewards`` and ``dones`` get a trailing axis of size
            one, i.e. (batch_size,) --> (batch_size, 1).
        """
        sampled = random.sample(self.memory, batch_size)

        # Transpose the list of transitions into one array per field.
        states, actions, rewards, dones, next_states = [
            np.array([transition[field] for transition in sampled])
            for field in range(5)
        ]

        def as_frames(values):
            # Frame batches keep the shape produced by np.array.
            return torch.tensor(values, dtype=torch.float32).to(device)

        def as_column(values, dtype):
            # Per-transition scalars become a column: (batch_size,) --> (batch_size, 1).
            return torch.tensor(values, dtype=dtype).to(device).unsqueeze(-1)

        return (
            as_frames(states),
            as_column(actions, torch.int64),
            as_column(rewards, torch.float32),
            as_column(dones, torch.float32),
            as_frames(next_states),
        )


# Preprocess

In [None]:
def preprocess(state):
    """Turn an RGB frame into a single-channel 84x84 grayscale array.

    Args:
        state: Image array accepted by ``cv2.cvtColor`` with ``COLOR_RGB2GRAY``.

    Returns:
        Array of shape (1, 84, 84): the grayscale frame resized with bicubic
        interpolation, with a leading axis of size one added.
    """
    gray = cv2.cvtColor(state, cv2.COLOR_RGB2GRAY)
    resized = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_CUBIC)
    return np.reshape(resized, (1, 84, 84))

# Network

In [None]:
class NeuronNetwork(nn.Module):
    """Convolutional Q-network: one (1, 84, 84) frame in, one Q-value per action out."""

    def __init__(self, env):
        """Build the layer stack.

        Args:
            env: Object exposing ``action_space.n``, the number of discrete
                actions; it sets the width of the output layer.
        """
        super().__init__()
        self.network = nn.Sequential(
            # Extraction
            nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),

            # Prediction
            nn.Flatten(),
            # For 84x84 input the spatial size goes 84 -> 82 -> 41 -> 39 -> 19,
            # so the flattened feature vector has 32 * 19 * 19 = 11552 entries.
            nn.Linear(in_features=11552, out_features=1024),
            nn.ReLU(),
            nn.Linear(in_features=1024, out_features=env.action_space.n)
        )

    def forward(self, state):
        """Return Q-values of shape (batch, n_actions) for a batch of frames."""
        return self.network(state)

    def choose_action(self, state):
        """Return the greedy action for a single raw frame.

        Args:
            state: Raw frame accepted by ``preprocess``.

        Returns:
            Zero-dimensional NumPy integer array holding the index of the
            action with the largest predicted Q-value.
        """
        frame = preprocess(state)
        frame = torch.tensor(frame, dtype=torch.float32).to(device)

        # Action selection is pure inference, so skip autograd bookkeeping
        # instead of building a graph that is thrown away immediately.
        with torch.no_grad():
            q_values = self(frame.unsqueeze(0))  # pytorch requires inputs in terms of batch
        best_action = torch.argmax(q_values, dim=1)[0]

        return best_action.cpu().numpy()

# Agent

In [None]:
class Agent():
    """DQN agent with epsilon-greedy exploration, replay memory and a target network.

    The environment is used through ``reset()`` (returning an observation),
    ``step(action)`` (returning ``(next_state, reward, done, info)``) and
    ``action_space.sample()``.
    """

    def __init__(self, env, max_epsilon, min_epsilon, max_num_steps,
                 epsilon_decay_intervals, gamma, alpha,
                 memory_size, min_replay_size, batch_size,
                 target_update_frequency):
        """Store hyperparameters and build the replay memory, networks and optimizer.

        Args:
            env: Environment to interact with.
            max_epsilon: Exploration rate at step 0.
            min_epsilon: Exploration rate once the decay interval has elapsed.
            max_num_steps: Total number of environment steps to train for.
            epsilon_decay_intervals: Number of steps over which epsilon is
                linearly interpolated from ``max_epsilon`` to ``min_epsilon``.
            gamma: Discount factor.
            alpha: Learning rate of the Adam optimizer.
            memory_size: Capacity of the replay memory.
            min_replay_size: Number of random transitions collected before
                training starts.
            batch_size: Number of transitions sampled per gradient step.
            target_update_frequency: Number of steps between copies of the
                Q-network weights into the target network.
        """
        # Environment
        self.env = env
        self.memory = NormalReplayMemory(max_size=memory_size, min_replay_size=min_replay_size)

        # Hyperparameters
        self.max_epsilon = max_epsilon
        self.min_epsilon = min_epsilon
        self.max_num_steps = max_num_steps
        self.epsilon_decay_intervals = epsilon_decay_intervals
        self.gamma = gamma # discount value
        self.alpha = alpha # learning rate
        self.batch_size = batch_size # batch size taken from memory
        self.target_update_frequency = target_update_frequency # target network update frequency

        # Network: the target network starts as an exact copy of the Q-network
        self.q_net = NeuronNetwork(self.env).to(device)
        self.target_net = NeuronNetwork(self.env).to(device)
        self.target_net.load_state_dict(self.q_net.state_dict())

        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=self.alpha)

    def choose_action(self, epsilon, state):
        """Pick a random action with probability ``epsilon``, else the greedy one.

        Args:
            epsilon: Exploration probability.
            state: Raw frame handed to the Q-network when acting greedily.

        Returns:
            The chosen action.
        """
        if np.random.uniform(0, 1) <= epsilon:
            return self.env.action_space.sample()
        return self.q_net.choose_action(state)

    def fill_memory(self):
        """Seed the replay memory with ``min_replay_size`` random-policy transitions."""
        state = self.env.reset()

        # Loop min_replay_size times and append experience to memory
        for _ in range(self.memory.min_replay_size):

            # Randomly taking action
            action = self.env.action_space.sample()

            next_state, reward, done, _info = self.env.step(action)

            # Add to memory
            self.memory.add((state, action, reward, done, next_state))

            # When the episode ends, continue from the observation returned by
            # reset(); otherwise the next transition would start from the
            # previous episode's final frame.
            state = self.env.reset() if done else next_state

    def _train_on_batch(self):
        """Run one gradient step of the Q-network on a sampled replay batch."""
        states, actions, rewards, dones, next_states = self.memory.get_batch(self.batch_size)

        # Predict Q value with Q network
        q_values = self.q_net(states)
        action_q_values = torch.gather(input=q_values, dim=1, index=actions)

        # Compute targets using the formulation sample = r + gamma * max q(s',a').
        # Targets are constants for the loss, so no autograd graph is needed
        # through the target network.
        with torch.no_grad():
            target_q_values = self.target_net(next_states)
            max_target_q_values = target_q_values.max(dim=1, keepdim=True)[0]
            targets = rewards + self.gamma * (1 - dones) * max_target_q_values

        loss = torch.nn.functional.mse_loss(action_q_values, targets)

        # Gradient descent for q-network
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def training(self):
        """Train for ``max_num_steps`` environment steps.

        Returns:
            List of ``(step, episode_reward)`` tuples, one per finished
            episode, where ``step`` is the zero-based step at which the
            episode ended.
        """
        # Fill memory
        self.fill_memory()
        reward_buffer = deque(maxlen=100) # Rewards of the previous 100 episodes

        reward_per_episode = 0.0
        state = self.env.reset()
        all_rewards = []

        for step in range(self.max_num_steps):
            # Compute epsilon (linear decay, clamped at min_epsilon afterwards)
            epsilon = np.interp(step, [0, self.epsilon_decay_intervals], [self.max_epsilon, self.min_epsilon])
            # Choose action to take
            action = self.choose_action(epsilon, state)

            # Take action and add experience to memory
            next_state, reward, done, _info = self.env.step(action)
            self.memory.add((state, action, reward, done, next_state))

            reward_per_episode += reward

            state = next_state

            # If done, 1 episode is done
            if done:
                state = self.env.reset()
                reward_buffer.append(reward_per_episode)
                all_rewards.append((step, reward_per_episode))
                reward_per_episode = 0.0

            # Get batch from memory and take one optimization step
            self._train_on_batch()

            # Update target network
            if (step + 1) % self.target_update_frequency == 0:
                self.target_net.load_state_dict(self.q_net.state_dict())

            # Print training results
            if (step + 1) % 1000 == 0:
                # No finished episode yet: report nan without triggering
                # NumPy's empty-mean warning.
                average_reward = np.mean(reward_buffer) if reward_buffer else float('nan')
                print(f'Episode: {len(all_rewards)} Step: {step+1} Average reward: {average_reward}')
        return all_rewards

In [None]:
# Exploration schedule: epsilon goes from max_epsilon to min_epsilon
# over the first epsilon_decay_intervals steps.
max_epsilon = 1.0
min_epsilon = 0.01
epsilon_decay_intervals = 150_000

# Optimisation
max_num_steps = 500_000  # total environment steps
gamma = 0.99             # discount
alpha = 0.0005           # learning rate

# Replay memory
memory_size = 50_000
min_replay_size = 1_000
batch_size = 32

# Target network update frequency (in steps)
target_update_frequency = 2_000

In [None]:
# Create the Atari environment, build the agent from the hyperparameters
# defined in the previous cell, and run the full training loop.
env = gym.make("ALE/KungFuMaster-v5")
model = Agent(
    env=env,
    max_epsilon=max_epsilon,
    min_epsilon=min_epsilon,
    max_num_steps=max_num_steps,
    epsilon_decay_intervals=epsilon_decay_intervals,
    gamma=gamma,
    alpha=alpha,
    memory_size=memory_size,
    min_replay_size=min_replay_size,
    batch_size=batch_size,
    target_update_frequency=target_update_frequency,
)
# training() returns one (step, episode_reward) entry per finished episode.
all_rewards = model.training()

In [None]:
# Persist the per-episode reward log returned by training.
# The next cell links to this file under the name 'DQN_all_rewards_500k.npy'.
import numpy as np
np.save('DQN_all_rewards_500k', all_rewards)

In [None]:
# Display a link to the saved rewards file in the notebook output.
from IPython.display import FileLink
FileLink(r'DQN_all_rewards_500k.npy')

In [None]:
# Save the trained Q-network weights (its state_dict).
# NOTE(review): the path reuses the "all_rewards" name of the rewards file but
# holds model weights and has no extension -- confirm the naming is intended.
torch.save(model.q_net.state_dict(), "./DQN_all_rewards_500k")