<a href="https://colab.research.google.com/github/scadusseau/Implementing-RL-DQN-algorithm-on-video-games/blob/master/DQNonGames.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Implementing a DQN algorithm on video games https://www.youtube.com/watch?v=5fHngyN8Qhw

In [0]:
# Mount Google Drive at /content/gdrive. Agent.save_model / Agent.load_model
# below read and write model files under "/content/gdrive/My Drive/".
from google.colab import drive
drive.mount('/content/gdrive')

In [0]:
# Workaround so that environment frames can be rendered and displayed on Google Colab.
# Output of the next three commands is discarded (> /dev/null 2>&1), so a failed
# install will not be visible here.
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg libav-tools xorg-dev libsdl2-dev swig cmake > /dev/null 2>&1
!apt-get install x11-utils > /dev/null 2>&1

# gym with the Atari extra; output is left visible for this one.
!pip install gym[atari]

In [0]:
# Install xvfb with visible output (xvfb is also listed in the silenced apt-get line of the previous cell).
!sudo apt-get install xvfb

In [0]:
# System build dependencies, then a pinned stable-baselines release plus Box2D bindings
# (presumably required by the LunarLander environment used below -- confirm).
!apt install swig cmake libopenmpi-dev zlib1g-dev
!pip install stable-baselines==2.5.1 box2d box2d-kengz

In [0]:
# Atari bindings (presumably required by the SpaceInvaders environment used below -- confirm).
!pip install atari-py

In [0]:
import gym
from gym import logger as gymlogger
from gym.wrappers import Monitor
gymlogger.set_level(40) #error only

import matplotlib.pyplot as plt

import numpy as np

import glob
import io
import base64
from IPython import display as ipythondisplay
from IPython.display import HTML

from pyvirtualdisplay import Display

In [0]:
# For rendering: create and start a pyvirtualdisplay Display (visible=0, 800x600).
# NOTE(review): the name `display` is rebound here; IPython's display module is
# imported above as `ipythondisplay`, which is what show_video uses.
display = Display(visible=0, size=(800, 600))
display.start()

"""
Utility functions to enable video recording of gym environment and displaying it
To enable video, just do "env = wrap_env(env)""
"""

def show_video():
  """Embed a recorded episode video from ``video/`` in the notebook output.

  Looks for ``video/*.mp4`` relative to the current working directory, takes
  the first match returned by ``glob``, base64-encodes its bytes and displays
  them inline through an HTML ``<video>`` element. Prints
  "Could not find video" when there is no match.

  Returns:
    None.
  """
  mp4list = glob.glob('video/*.mp4')
  if not mp4list:
    print("Could not find video")
    return

  # Open read-only inside a context manager: the file is only read, and the
  # handle is released as soon as its bytes have been encoded.
  with open(mp4list[0], 'rb') as video_file:
    encoded = base64.b64encode(video_file.read())

  ipythondisplay.display(HTML(data='''<video alt="test" autoplay 
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    

def wrap_env(env):
  """Return `env` wrapped in a gym ``Monitor`` that writes to ``./video``.

  The monitor is created with ``force=True``; show_video() later looks for
  the resulting ``video/*.mp4`` files.
  """
  return Monitor(env, './video', force=True)

In [0]:
from keras.layers import Dense, Activation
from keras.models import Sequential, load_model
from keras.optimizers import Adam
import numpy as np

class ReplayBuffer(object):
  """Fixed-capacity circular store of environment transitions.

  Each slot holds (state, action, reward, next state, terminal mask). Once
  `max_size` transitions have been written, new ones overwrite the oldest.

  Args:
    max_size: number of transitions the buffer can hold.
    input_shape: shape of one observation; either an int (flat vector of that
      length) or a tuple/list of ints for multi-dimensional observations.
    n_actions: width of one stored action row.
    discrete: when True, `action` passed to store_transition is an integer
      index that is stored one-hot (int8); otherwise the action is stored
      as given in a float32 row.
  """

  def __init__(self, max_size, input_shape, n_actions, discrete=False):
    self.mem_size = max_size
    self.mem_cntr = 0  # total number of transitions ever stored
    self.input_shape = input_shape
    self.discrete = discrete

    # Accept both a plain int and a tuple of dimensions for the observation.
    if np.isscalar(input_shape):
      state_dims = (input_shape,)
    else:
      state_dims = tuple(input_shape)

    self.state_memory = np.zeros((self.mem_size,) + state_dims)
    self.new_state_memory = np.zeros((self.mem_size,) + state_dims)
    dtype = np.int8 if self.discrete else np.float32
    self.action_memory = np.zeros((self.mem_size, n_actions), dtype=dtype)
    self.reward_memory = np.zeros(self.mem_size)
    self.terminal_memory = np.zeros(self.mem_size, dtype=np.float32)

  def store_transition(self, state, action, reward, state_, done):
    """Write one transition into the next slot, wrapping around when full."""
    index = self.mem_cntr % self.mem_size
    self.state_memory[index] = state
    self.new_state_memory[index] = state_
    self.reward_memory[index] = reward
    # Stored as a mask: 0.0 for terminal transitions, 1.0 otherwise, so it can
    # be multiplied directly into the bootstrapped part of a Q target.
    self.terminal_memory[index] = 1 - int(done)
    if self.discrete:
      one_hot = np.zeros(self.action_memory.shape[1])
      one_hot[action] = 1.0
      self.action_memory[index] = one_hot
    else:
      self.action_memory[index] = action
    self.mem_cntr += 1

  def sample_buffer(self, batch_size):
    """Draw `batch_size` stored transitions uniformly at random.

    Indices are drawn with np.random.choice using its default settings, so
    the same transition can appear more than once in a batch.

    Returns:
      (states, actions, rewards, next_states, terminal_mask) arrays, each with
      `batch_size` rows.

    Raises:
      ValueError: if nothing has been stored yet and `batch_size` > 0.
    """
    max_mem = min(self.mem_cntr, self.mem_size)
    if max_mem == 0 and batch_size > 0:
      raise ValueError("cannot sample from an empty replay buffer")
    batch = np.random.choice(max_mem, batch_size)

    states = self.state_memory[batch]
    states_ = self.new_state_memory[batch]
    rewards = self.reward_memory[batch]
    actions = self.action_memory[batch]
    terminal = self.terminal_memory[batch]

    return states, actions, rewards, states_, terminal

def build_dqn(lr, n_actions, input_dims, fc1_dims, fc2_dims):
  """Build and compile the fully connected Q-network.

  Architecture: Dense(fc1_dims) -> ReLU -> Dense(fc2_dims) -> ReLU ->
  Dense(n_actions) with no activation on the output layer.

  Args:
    lr: learning rate passed to the Adam optimizer.
    n_actions: number of output units.
    input_dims: length of the flat input vector.
    fc1_dims: units in the first hidden layer.
    fc2_dims: units in the second hidden layer.

  Returns:
    The Sequential model, compiled with Adam and mean-squared-error loss.
  """
  layers = [
      Dense(fc1_dims, input_shape=(input_dims, )),
      Activation('relu'),
      Dense(fc2_dims),
      Activation('relu'),
      Dense(n_actions),
  ]
  network = Sequential(layers)
  network.compile(loss='mse', optimizer=Adam(lr=lr))
  return network

class Agent(object):
  """Epsilon-greedy Q-learning agent backed by a single Keras network.

  The same network (`q_eval`) is used both to pick actions and to compute the
  bootstrapped targets in learn(); there is no separate target network.

  Args:
    alpha: learning rate handed to build_dqn.
    gamma: discount factor applied to the next-state value.
    n_actions: number of discrete actions.
    epsilon: initial exploration probability.
    batch_size: number of transitions sampled per learn() call.
    input_dims: length of one (flat) observation vector.
    epsilon_dec: multiplicative decay applied to epsilon on every learn() call
      that actually trains.
    epsilon_end: lower bound for epsilon.
    mem_size: capacity of the replay buffer.
    fname: file name of the saved model.
    model_dir: directory the model file is saved to / loaded from. Defaults
      to the Google Drive mount used by this notebook.
  """

  def __init__(self, alpha, gamma, n_actions, epsilon, batch_size,
               input_dims, epsilon_dec=0.996, epsilon_end=0.01,
               mem_size=1000000, fname='dqn_model.h5',
               model_dir='/content/gdrive/My Drive/'):
    self.action_space = [i for i in range(n_actions)]
    self.n_actions = n_actions
    self.gamma = gamma
    self.epsilon = epsilon
    self.epsilon_dec = epsilon_dec
    self.epsilon_min = epsilon_end
    self.batch_size = batch_size
    self.model_file = fname
    self.model_dir = model_dir

    self.memory = ReplayBuffer(mem_size, input_dims, n_actions, discrete=True)
    self.q_eval = build_dqn(alpha, n_actions, input_dims, 256, 256)

  def _model_path(self):
    """Return the full path of the model file inside `model_dir`."""
    import os
    return os.path.join(self.model_dir, self.model_file)

  def remember(self, state, action, reward, new_state, done):
    """Store one transition in the replay buffer."""
    self.memory.store_transition(state, action, reward, new_state, done)

  def choose_action(self, state):
    """Pick an action for `state`: random with probability epsilon, else greedy."""
    state = state[np.newaxis, :]  # the network expects a batch dimension
    rand = np.random.random()
    if rand < self.epsilon:
      action = np.random.choice(self.action_space)
    else:
      actions = self.q_eval.predict(state)
      action = np.argmax(actions)

    return action

  def learn(self):
    """Run one training update on a sampled minibatch, then decay epsilon.

    Does nothing until at least `batch_size` transitions have been stored.
    """
    if self.memory.mem_cntr < self.batch_size:
      return
    # The last array is the buffer's terminal mask: 0.0 where the episode
    # ended, 1.0 otherwise.
    state, action, reward, new_state, not_done = self.memory.sample_buffer(self.batch_size)
    # Actions are stored one-hot; argmax recovers the index without the int8
    # arithmetic a dot product with the index vector would involve.
    action_indices = np.argmax(action, axis=1)

    q_eval = self.q_eval.predict(state)
    q_next = self.q_eval.predict(new_state)

    # Start from the current predictions so only the taken action's entry
    # contributes to the loss.
    q_target = q_eval.copy()

    batch_index = np.arange(self.batch_size, dtype=np.int32)

    q_target[batch_index, action_indices] = reward + self.gamma*np.max(q_next, axis=1)*not_done

    _ = self.q_eval.fit(state, q_target, verbose=0)

    # Clamp so the decayed value never drops below the configured minimum.
    self.epsilon = max(self.epsilon*self.epsilon_dec, self.epsilon_min)

  def save_model(self):
    """Save the network to `model_dir`/`fname`."""
    self.q_eval.save(self._model_path())

  def load_model(self):
    """Replace the network with the one stored at `model_dir`/`fname`."""
    # Inside a method body this name resolves to the module-level Keras
    # load_model function, not to this method.
    self.q_eval = load_model(self._model_path())

# **For Lunar Lander**

In [0]:
import numpy as np
import gym


# Train the DQN agent on LunarLander-v2; wrap_env records episodes to ./video.
env = wrap_env(gym.make('LunarLander-v2'))
n_games = 500
# Observation and action sizes are read from the environment rather than
# hard-coded, matching how the Space Invaders cell obtains n_actions.
agent = Agent(gamma=0.99, epsilon=1.0, alpha=0.0005,
              input_dims=env.observation_space.shape[0],
              n_actions=env.action_space.n,
              mem_size=1000000, batch_size=64, epsilon_end=0.01)

scores = []
eps_history = []

try:
  for i in range(n_games):
    done = False
    score = 0
    observation = env.reset()
    while not done:
      action = agent.choose_action(observation)
      observation_, reward, done, info = env.step(action)
      score += reward
      agent.remember(observation, action, reward, observation_, done)
      observation = observation_
      agent.learn()  # one update per environment step

    eps_history.append(agent.epsilon)
    scores.append(score)

    # Rolling mean over the most recent 100 episodes.
    avg_score = np.mean(scores[-100:])
    print('episode ', i, 'score %.2f' % score, 'average score %.2f' % avg_score)

    # Periodic checkpoint.
    if i % 10 == 0 and i > 0:
      agent.save_model()

  # Persist the final weights as well; otherwise everything learned after the
  # last multiple-of-10 episode would be lost.
  agent.save_model()
finally:
  # Close the environment even if training fails or is interrupted.
  env.close()

show_video()

# **For Space Invaders**

In [0]:
import numpy as np
import gym
import cv2

def preprocess(img):
    """Convert an RGB frame to a flat grayscale vector.

    The frame is converted with cv2.COLOR_RGB2GRAY, resized with
    ``dsize=(84, 90)`` and flattened to one dimension.

    Args:
        img: RGB image array as returned by the environment.

    Returns:
        1-D array holding the resized grayscale pixels.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    resized = cv2.resize(gray, (84, 90))
    # Use the array's own flatten() rather than borrowing np.matrix's method.
    return resized.flatten()

# Train the DQN agent on SpaceInvaders-v0; wrap_env records episodes to ./video.
env = wrap_env(gym.make('SpaceInvaders-v0'))
n_games = 500
# input_dims=7560 is the length of the vector produced by preprocess (84 * 90).
# NOTE(review): the replay buffer allocates two (mem_size, input_dims) arrays with
# NumPy's default dtype; at 1,000,000 x 7560 that is very large -- confirm it fits
# in memory on the target machine or lower mem_size.
agent = Agent(gamma=0.99, epsilon=1.0, alpha=0.0005, input_dims=7560, n_actions = env.action_space.n,
              mem_size=1000000, batch_size=64, epsilon_end=0.01)

# Uncomment to resume from a previously saved model.
#agent.load_model()

scores = []
eps_history = []

try:
  for i in range(n_games):
    done = False
    score = 0
    observation = preprocess(env.reset())
    while not done:
      action = agent.choose_action(observation)
      observation_, reward, done, info = env.step(action)
      observation_ = preprocess(observation_)
      score += reward
      agent.remember(observation, action, reward, observation_, done)
      observation = observation_
      agent.learn()  # one update per environment step

    eps_history.append(agent.epsilon)
    scores.append(score)

    # Rolling mean over the most recent 100 episodes.
    avg_score = np.mean(scores[-100:])
    print('episode ', i, 'score %.2f' % score, 'average score %.2f' % avg_score)

    # Periodic checkpoint.
    if i % 10 == 0 and i > 0:
      agent.save_model()

  # Persist the final weights as well; otherwise everything learned after the
  # last multiple-of-10 episode would be lost.
  agent.save_model()
finally:
  # Close the environment even if training fails or is interrupted.
  env.close()

show_video()