# 130. Deep Q-Network (DQN) 

## Deep Q-learning Target NN and DDQN - Deep Mind
#### Human-level control through deep reinforcement learning - 2015.2

<img src="https://miro.medium.com/max/1400/1*igVmhIWcLPrNnRPwx8PQSg.png" width= 500 />



## Deep Q-Network (DQN)

In [1]:
# Install the Box2D physics bindings (presumably needed for the 'LunarLander-v2' option below).
!pip3 install -q box2d-py

# Install what is needed for recording videos: Xvfb and x11-utils through apt
# (apt's output is redirected into a file named `tmp`), plus the pyvirtualdisplay
# Python wrapper pinned to the 0.2.x series.
!apt-get install -y xvfb x11-utils > tmp
!pip install -q pyvirtualdisplay==0.2.*

[?25l[K     |▊                               | 10 kB 28.2 MB/s eta 0:00:01[K     |█▌                              | 20 kB 35.8 MB/s eta 0:00:01[K     |██▏                             | 30 kB 24.9 MB/s eta 0:00:01[K     |███                             | 40 kB 14.2 MB/s eta 0:00:01[K     |███▋                            | 51 kB 13.1 MB/s eta 0:00:01[K     |████▍                           | 61 kB 15.2 MB/s eta 0:00:01[K     |█████▏                          | 71 kB 14.6 MB/s eta 0:00:01[K     |█████▉                          | 81 kB 12.5 MB/s eta 0:00:01[K     |██████▋                         | 92 kB 13.6 MB/s eta 0:00:01[K     |███████▎                        | 102 kB 14.4 MB/s eta 0:00:01[K     |████████                        | 112 kB 14.4 MB/s eta 0:00:01[K     |████████▊                       | 122 kB 14.4 MB/s eta 0:00:01[K     |█████████▌                      | 133 kB 14.4 MB/s eta 0:00:01[K     |██████████▎                     | 143 kB 14.4 MB/s eta 0:

In [2]:
from IPython import display
import matplotlib.pyplot as plt
import gym
from IPython.display import HTML
from pyvirtualdisplay import Display
from gym.wrappers.monitoring.video_recorder import VideoRecorder
from base64 import b64encode

import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import numpy as np
import collections

# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device  # bare expression: the notebook echoes the selected device

device(type='cuda', index=0)

환경을 초기화합니다.  

[MountainCar-v0](https://www.gymlibrary.ml/environments/classic_control/mountain_car/)  
[LunarLander-v2](https://www.gymlibrary.ml/environments/box2d/lunar_lander/)  
[CartPole-v1](https://www.gymlibrary.ml/environments/classic_control/cart_pole/)

In [3]:
# Environment to train on. Supported alternatives:
# 'LunarLander-v2', 'CartPole-v0', 'CartPole-v1'.
ENV_NAME = 'MountainCar-v0'
env = gym.make(ENV_NAME)

# Seed the environment's random number generator (pre-0.26 Gym API).
env.seed(0)
print('State dimension: ', env.observation_space.shape[0])
print('Number of actions: ', env.action_space.n)

gamma = 0.99            # discount factor used in the TD target
batch_size = 64         # number of transitions sampled per gradient step
replay_size = 100_000   # capacity of the replay memory

max_t = 1000            # end the inner loop even if `done` is never reached

UPDATE_EVERY = 10       # copy Q into the target network every this many steps

# Epsilon-greedy schedule: start value, per-episode multiplier, lower bound.
eps_start = 1.0
eps_decay = .995
eps_end = 0.02

n_episodes = 1000       # maximum number of training episodes

# Per-environment settings: (learning_rate, WIN_SCORE).
# WIN_SCORE is compared with a strict `>` against the mean of the last 100
# episode scores, so it must lie below the best achievable score. CartPole-v0
# episodes are capped at 200 reward, hence 195 rather than 200; CartPole-v1
# is capped at 500, hence 475.
_ENV_SETTINGS = {
    'CartPole-v0': (0.01, 195),
    'CartPole-v1': (0.01, 475),
    'LunarLander-v2': (0.005, 100),
    'MountainCar-v0': (0.01, -150),
}

# Fail here with a clear message instead of with a NameError on
# `learning_rate` further down when an unsupported environment is selected.
if ENV_NAME not in _ENV_SETTINGS:
    raise ValueError(
        f"No hyperparameters defined for environment {ENV_NAME!r}; "
        f"choose one of {sorted(_ENV_SETTINGS)}"
    )
learning_rate, WIN_SCORE = _ENV_SETTINGS[ENV_NAME]

State dimension:  2
Number of actions:  3


### Neural Network Architecture 정의

In [4]:
class DQN(nn.Module):
    """Fully connected Q-network: a state vector in, one value per action out.

    The network has two hidden layers of 64 units with ReLU activations,
    followed by a linear output layer.
    """

    def __init__(self, input_shape, n_actions):
        """Create the three linear layers.

        Args:
            input_shape: sequence whose first entry is the state dimension.
            n_actions: number of output values (one per action).
        """
        super().__init__()
        state_dim = input_shape[0]
        hidden_units = 64
        self.fc1 = nn.Linear(state_dim, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, n_actions)

    def forward(self, state):
        """Map a state (or batch of states) to its action values."""
        hidden1 = F.relu(self.fc1(state))
        hidden2 = F.relu(self.fc2(hidden1))
        return self.fc3(hidden2)

In [5]:
# Sanity check: show the observation/action spaces, then build a throwaway
# network on `device`; as the cell's last expression, its layer summary is echoed.
print(env.observation_space, env.action_space)
DQN(env.observation_space.shape, env.action_space.n).to(device)

Box(-1.2000000476837158, 0.6000000238418579, (2,), float32) Discrete(3)


DQN(
  (fc1): Linear(in_features=2, out_features=64, bias=True)
  (fc2): Linear(in_features=64, out_features=64, bias=True)
  (fc3): Linear(in_features=64, out_features=3, bias=True)
)

### Replay Memory 정의

In [6]:
# One stored transition; the field order is the order `sample` unpacks.
Experience = collections.namedtuple(
    'Experience',
    ['state', 'action', 'reward', 'done', 'new_state'],
)


class ExperienceReplay:
    """Bounded FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        """Create an empty buffer holding at most `capacity` transitions."""
        # A deque with maxlen drops the oldest entry once it is full.
        self.buffer = collections.deque(maxlen=capacity)

    def __len__(self):
        """Return how many transitions are currently stored."""
        return len(self.buffer)

    def add(self, experience):
        """Append one transition to the buffer."""
        self.buffer.append(experience)

    def sample(self, batch_size):
        """Draw `batch_size` distinct transitions uniformly at random.

        Returns a 5-tuple of arrays in field order: states (float32),
        actions (int64), rewards (float32), dones (dtype inferred by NumPy)
        and next states (float32).
        """
        picked = np.random.choice(len(self.buffer), batch_size, replace=False)
        batch = [self.buffer[position] for position in picked]
        # Transpose the list of 5-field records into five per-field columns.
        states, actions, rewards, dones, next_states = zip(*batch)
        return (
            np.array(states, dtype=np.float32),
            np.array(actions, dtype=np.int64),
            np.array(rewards, dtype=np.float32),
            np.array(dones),
            np.array(next_states, dtype=np.float32),
        )

### Replay Memory, Current / Target Network 초기화

In [7]:
# Initialize replay memory D to capacity N
D = ExperienceReplay(replay_size)

# Initialize action-value function Q with random weights theta
Q = DQN(env.observation_space.shape, env.action_space.n).to(device)

# Initialize target action-value function Q_hat with weights theta_ = theta.
# A freshly constructed DQN has its own independent random weights, so copy
# Q's parameters explicitly to make the two networks start out identical.
target_Q = DQN(env.observation_space.shape, env.action_space.n).to(device)
target_Q.load_state_dict(Q.state_dict())

### DQN main algorithm 구현

In [8]:
# Train Q with epsilon-greedy exploration, experience replay and a
# periodically synchronised target network.
optimizer = optim.Adam(Q.parameters(), lr=learning_rate)
# The loss module is stateless, so build it once instead of on every step.
loss_fn = nn.MSELoss()

scores = []          # total reward of every finished episode
eps = eps_start      # current exploration rate

for episode in range(1, n_episodes+1):
    # Initialize sequence s1
    state = env.reset()
    score = 0
    for t in range(max_t):

        # With probability eps select a random action a
        if np.random.random() < eps:
            action = np.random.randint(env.action_space.n)
        else:
            # otherwise select a_t = argmax_a Q(s_t, a)
            state_t = torch.FloatTensor(state).to(device)
            with torch.no_grad():
                action_values = Q(state_t)
            action = np.argmax(action_values.cpu().numpy())

        # Execute action a and observe the reward and next state
        next_state, reward, done, _ = env.step(action)
        # Store transition in D
        experience = Experience(state, action, reward, done, next_state)
        D.add(experience)

        # Start learning once the buffer holds more than one batch.
        if len(D) > batch_size:
            # Sample random minibatch of transitions from D
            states, actions, rewards, dones, next_states = D.sample(batch_size)
            states_v = torch.FloatTensor(states).to(device)
            actions_v = torch.LongTensor(actions).to(device)
            rewards_v = torch.FloatTensor(rewards).to(device)
            dones_v = torch.BoolTensor(dones).to(device)
            next_states_v = torch.FloatTensor(next_states).to(device)

            # Q(s, a) for the action that was actually taken in each sample.
            state_action_values = Q(states_v).gather(1, actions_v.unsqueeze(-1)).squeeze(-1)

            # The TD target is a constant with respect to the optimisation:
            # build it without autograd so backward() does not also run
            # through target_Q and pile up gradients nobody uses or clears.
            with torch.no_grad():
                next_state_action_values = target_Q(next_states_v).max(1)[0]
                # y = r if episode terminates
                next_state_action_values[dones_v] = 0.0
                # otherwise y = r + gamma * max_a' Q_target(s', a')
                y = rewards_v + gamma * next_state_action_values

            # Perform a gradient descent step on (y - Q(s, a))^2
            loss = loss_fn(state_action_values, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Every UPDATE_EVERY steps of the current episode reset Q_target = Q
        if (t + 1) % UPDATE_EVERY == 0:
            target_Q.load_state_dict(Q.state_dict())

        state = next_state
        score += reward

        if done:
            break

    scores.append(score)
    eps = max(eps_end, eps_decay * eps)    # decrease epsilon

    # Mean score over (up to) the last 100 episodes, computed once per episode.
    avg_score = np.mean(scores[-100:])

    # "\r" with end="" keeps overwriting the same progress line ...
    print(f"\rscore of spisode {episode},  {avg_score:.2f}", end="")

    # ... and every 100 episodes the line is made permanent and Q is checkpointed.
    if episode % 100 == 0:
        print(f"\rscore of spisode {episode},  {avg_score:.2f}")
        torch.save(Q.state_dict(), 'checkpoint.pth')

    # Stop as soon as the running mean beats the environment's target score.
    if avg_score > WIN_SCORE:
        print(f"\nCompleted after {episode} with score  {avg_score:.2f}")
        break

score of spisode 100,  -200.00
score of spisode 200,  -198.87
score of spisode 300,  -184.81
score of spisode 400,  -172.91
score of spisode 500,  -161.38
score of spisode 567,  -149.55
Completed after 567 with score  -149.55


### 훈련된 network 저장

In [9]:
# Write the final parameters of Q to 'checkpoint.pth' (the same path the
# training loop uses for its periodic checkpoints).
torch.save(Q.state_dict(), 'checkpoint.pth')

### Animate it with Video

In [10]:
# Start a non-visible 1400x900 virtual display; presumably this is what lets
# env.render() run on a machine without a physical screen.
# NOTE(review): this rebinds `display`, shadowing the `from IPython import display`
# import at the top of the notebook.
display = Display(visible=False, size=(1400, 900))
_ = display.start()

def render_mp4(videopath: str) -> str:
  """Return an HTML <video> tag that embeds the MP4 file at `videopath`.

  The file is read in binary mode, base64-encoded and placed inline in a
  `data:video/mp4;base64,...` URI, so the returned markup needs no separate
  video file.

  Args:
    videopath: path of the MP4 file to embed.

  Returns:
    The HTML markup as a string.

  Raises:
    OSError: if the file cannot be opened or read.
  """
  # The context manager closes the file handle even if reading fails.
  with open(videopath, 'rb') as video_file:
    mp4 = video_file.read()
  base64_encoded_mp4 = b64encode(mp4).decode()
  return f'<video width=400 controls><source src="data:video/mp4;' \
         f'base64,{base64_encoded_mp4}" type="video/mp4"></video>'

In [11]:
# Record an untrained baseline: one episode of uniformly random actions.
file = "notrain.mp4"
video = VideoRecorder(env, file)

state = env.reset()
try:
    # Record at most 500 steps.
    for _ in range(500):
        env.render()
        video.capture_frame()
        action = env.action_space.sample()
        next_state, reward, done, info = env.step(action)
        # Stop at the end of the episode: stepping an environment after it
        # has reported `done` without a reset is outside the Gym contract.
        if done:
            break
finally:
    # Always release the environment and finalise the video file, even when
    # rendering or stepping raised.
    env.close()
    video.close()

In [12]:
# Restore the trained weights. `map_location=device` remaps the stored tensors
# onto the device selected for this session, so a checkpoint saved on a GPU
# still loads when only the CPU is available.
Q.load_state_dict(torch.load('checkpoint.pth', map_location=device))

<All keys matched successfully>

In [13]:
# Record the trained agent: one episode acting greedily with respect to Q.
file = "train.mp4"
video = VideoRecorder(env, file)

state = env.reset()
try:
    # Record at most 500 steps.
    for _ in range(500):
        env.render()
        video.capture_frame()

        # Greedy action: argmax over the network's action values, no exploration.
        state_t = torch.FloatTensor(state).to(device)
        with torch.no_grad():
            action_values = Q(state_t)
        action = np.argmax(action_values.cpu().numpy())

        next_state, reward, done, info = env.step(action)
        state = next_state
        # Stop at the end of the episode: stepping an environment after it
        # has reported `done` without a reset is outside the Gym contract.
        if done:
            break
finally:
    # Always release the environment and finalise the video file, even when
    # rendering or stepping raised.
    env.close()
    video.close()

In [14]:
# Choose which recording to embed; assign "notrain.mp4" instead to view the
# random-agent video.
file = "train.mp4"

# Build the inline <video> markup and wrap it in IPython's HTML object; as the
# cell's last expression it is rendered by the notebook.
html = render_mp4(file)
HTML(html)