This project implements an Atari game-playing agent using reinforcement learning.
It follows the DeepMind research paper "Playing Atari with Deep Reinforcement Learning".
Link: https://arxiv.org/pdf/1312.5602v1.pdf
Also referred to: https://becominghuman.ai/lets-build-an-atari-ai-part-1-dqn-df57e8ff3b26 
and
https://github.com/boyuanf/DeepQLearning

In [0]:
# @title Importing required Libraries
import gym
import random
import numpy as np
import tensorflow as tf
from keras import layers
from skimage.color import rgb2gray
from skimage.transform import resize
from keras.models import Model

from collections import deque
from keras.optimizers import RMSprop
from keras import backend as K
from keras.models import load_model
from keras.models import clone_model
from keras.callbacks import TensorBoard
from keras.losses import mse

Using TensorFlow backend.


In [0]:
# @title Hyper-parameters required
# Exploration rate at the start of training (train() initialises epsilon to this).
I_EPSILON = 1
# Floor below which train() stops decaying epsilon.
F_EPSILON = 0.1
# Number of decay steps: each step subtracts (I_EPSILON - F_EPSILON) / EPSILON_STEPS.
EPSILON_STEPS = 100000
# Transitions sampled from the replay memory per training update.
BATCH_SIZE = 32
# Discount factor applied to the bootstrapped next-state Q-value.
GAMMA = 0.99
# Episode count that bounds the training loop in train().
NUM_EPISODES = 2000
# Maximum number of transitions kept in the replay deque.
REPLAY_MEMORY = 200000
# NOTE(review): not referenced anywhere in the visible code.
RENDER = False
# Network input shape: 84x84 frames, 4 of them stacked along the last axis.
ATARI_SHAPE = (84,84,4)
# Number of Q-value outputs, i.e. selectable action indices.
ACTION_SIZE = 3

In [0]:
# @title Preprocessing and model
# @markdown Preprocessing the Observation, converting to grayscale and resizing.
def preprocess(observation):
  """Turn a raw RGB observation into an 84x84 uint8 grayscale frame.

  Args:
    observation: RGB image as returned by the environment.

  Returns:
    An (84, 84) ``np.uint8`` array.
  """
  gray = rgb2gray(observation)
  shrunk = resize(gray, (84, 84), mode="constant")
  # The resized image is multiplied by 255 before the uint8 cast so the frame
  # is stored as one byte per pixel rather than as a float.
  return np.uint8(shrunk * 255)

# @markdown Creating DQN using keras functional API.
def model():
  """Build, summarise and compile the Q-network (Keras functional API).

  The network takes two inputs: a stack of frames shaped ``ATARI_SHAPE`` and
  an action mask of length ``ACTION_SIZE``. Its output is the per-action
  Q-values multiplied element-wise by the mask, so a one-hot mask selects a
  single action's Q-value and an all-ones mask returns every Q-value.

  Returns:
    The compiled Keras ``Model`` (RMSprop optimizer, MSE loss).
  """
  frames_in = layers.Input(ATARI_SHAPE,name="input_frames")
  action_mask_in = layers.Input((ACTION_SIZE,),name="action")

  # Divide pixel values by 255 inside the graph so callers can pass the raw
  # frame stack.
  features = layers.Lambda(lambda x: x/255.0, name="normalized")(frames_in)

  # First hidden layer: 16 filters of 8x8, stride 4, ReLU.
  features = layers.convolutional.Conv2D(16, (8, 8), strides=(4, 4), activation='relu')(features)

  # Second hidden layer: 32 filters of 4x4, stride 2, ReLU.
  features = layers.convolutional.Conv2D(32, (4, 4), strides=(2, 2), activation='relu')(features)

  # Flatten the feature maps, then a fully-connected layer of 256 ReLU units.
  features = layers.core.Flatten()(features)
  features = layers.Dense(256, activation='relu')(features)

  # Linear output layer: one Q-value per action.
  q_values = layers.Dense(ACTION_SIZE)(features)

  # Zero out every Q-value that the action mask does not select.
  masked_q = layers.Multiply(name='QValue')([q_values, action_mask_in])

  network = Model(inputs=[frames_in, action_mask_in], outputs=masked_q)
  network.summary()
  network.compile(RMSprop(lr=0.00025, rho=0.95, epsilon=0.01), loss=mse)
  return network

# @markdown Using epsilon-greedy policy select an action
def get_action(model,epsilon,history):
  """Choose an action index with an epsilon-greedy policy.

  Args:
    model: Q-network accepting ``[history, action_mask]``.
    epsilon: Probability threshold for taking a uniformly random action.
    history: Frame stack for the current state, with a leading batch axis of 1.

  Returns:
    An action index in ``range(ACTION_SIZE)``.
  """
  explore = np.random.rand() <= epsilon
  if explore:
    return random.randrange(ACTION_SIZE)

  # An all-ones mask makes the network return the Q-value of every action.
  all_actions_mask = np.ones(ACTION_SIZE).reshape(1,ACTION_SIZE)
  q_value = model.predict([history,all_actions_mask])
  return np.argmax(q_value[0])
 

# @markdown Update memory for experience replay
def exp_replay(exp,history,action,reward,next_history,dead):
  """Append one transition tuple to the replay memory ``exp``.

  The stored tuple is ``(history, action, reward, next_history, dead)``.
  """
  transition = (history, action, reward, next_history, dead)
  exp.append(transition)
  
 
# @markdown Get one-hot 
def one_hot(targets,nb_classes):
  """One-hot encode integer class indices.

  Args:
    targets: Integer index or (nested) sequence of integer indices; it is
      flattened before encoding.
    nb_classes: Width of each one-hot row.

  Returns:
    Float array of shape ``(number_of_targets, nb_classes)``.
  """
  indices = np.asarray(targets).ravel()
  identity = np.eye(nb_classes)
  # Row k of the identity matrix is the one-hot vector for class k.
  return identity[indices]


# @markdown Training mini batches
def train_batch(model,exp):
  """Run one gradient update on a random mini-batch from the replay memory.

  For each sampled transition the regression target is:
    * ``reward`` when the transition is terminal (``dead`` is true);
    * ``reward + GAMMA * max_a Q(next_history, a)`` otherwise.

  Args:
    model: Compiled Q-network taking ``[frames, action_mask]`` and returning
      masked Q-values.
    exp: Replay memory of ``(history, action, reward, next_history, dead)``
      tuples; must hold at least ``BATCH_SIZE`` entries.

  Returns:
    The loss Keras reports for this single update.

  Raises:
    ValueError: Raised by ``random.sample`` when ``exp`` holds fewer than
      ``BATCH_SIZE`` transitions.
  """
  mini_batch = random.sample(exp, BATCH_SIZE)

  batch_shape = (BATCH_SIZE,) + tuple(ATARI_SHAPE)
  history = np.zeros(batch_shape)
  next_history = np.zeros(batch_shape)
  action = np.zeros(BATCH_SIZE, dtype=int)
  reward = np.zeros(BATCH_SIZE)
  dead = np.zeros(BATCH_SIZE, dtype=bool)

  for i, (state, act, rew, next_state, terminal) in enumerate(mini_batch):
    history[i] = state
    action[i] = act
    reward[i] = rew
    next_history[i] = next_state
    dead[i] = terminal

  # An all-ones mask makes the network return every action's Q-value.
  all_actions = np.ones((BATCH_SIZE,ACTION_SIZE))
  next_q_values = model.predict([next_history,all_actions])

  # The terminal test must be per sample. Testing the whole ``dead`` list is
  # always truthy (the list is never empty), which would reduce every target
  # to the immediate reward and disable bootstrapping entirely.
  bootstrapped = reward + GAMMA * np.amax(next_q_values, axis=1)
  target = np.where(dead, reward, bootstrapped)

  # Only the Q-value of the action actually taken contributes to the loss:
  # the one-hot mask zeroes the other outputs in prediction and target alike.
  action_one_hot = one_hot(action, ACTION_SIZE)
  target_one_hot = action_one_hot * target[:,None]

  fitted_model = model.fit([history,action_one_hot],target_one_hot,epochs=1,batch_size=BATCH_SIZE,verbose=0)

  return fitted_model.history['loss'][0]
  

In [0]:
# @title Training the model
# @markdown environment = BreakoutDeterministic-v4
def train():
  env = gym.make('BreakoutDeterministic-v4')
  experience = deque(maxlen=REPLAY_MEMORY)
  episode_number = 0
  epsilon = I_EPSILON
  epsilon_decay = (I_EPSILON-F_EPSILON)/EPSILON_STEPS
  target_model = model()
  while episode_number <= NUM_EPISODES:
    done = False
    dead = False
    step, score, start_life = 0, 0, 5
    loss = 0.0
    observe = env.reset()
    
    # taking a random step at the start of the episode
    observe, _, _, _ = env.step(1)
    
    state = preprocess(observe)
    history = np.stack((state,state,state,state),axis=2)
    history = np.reshape([history],(1,84,84,4))
    
    while not done:
      if epsilon > F_EPSILON and episode_number > 320:
        epsilon -= epsilon_decay
        
      action = get_action(target_model,epsilon,history)
      
      observe, reward, done, info = env.step(action)
      
      next_state = preprocess(observe)
      next_state = np.reshape([next_state], (1, 84, 84, 1))
      next_history = np.append(next_state, history[:, :, :, :3], axis=3)
      
      if start_life > info['ale.lives']:
        dead = True
        start_life = info['ale.lives']
        
      exp_replay(experience,history,action,reward,next_history,dead)
      
      if episode_number > 320:
        loss = loss + train_batch(target_model, experience)
        
      
      score += reward
      
      if dead:
        dead = False
      else:
        history = next_history

      step += 1
      
      if done:
        print('episode: {}, score: {}, avg loss: {}, step: {}, Replay length: {}'
              .format(episode_number, score, loss / float(step), step, len(experience)))
        episode_number += 1
     
  target_model.save("trained_model.h5")
  env.close()

In [0]:
train()

In [0]:
# @title Testing the agent for 10 Episodes
def test():
    env = gym.make('BreakoutDeterministic-v4')

    episode_number = 0
    epsilon = 0.001
    model = load_model("trained_model.h5")
    while episode_number < 10:

        done = False
        dead = False
        # 1 episode = 5 lives
        score, start_life = 0, 5
        observe = env.reset()

        observe, _, _, _ = env.step(1)
        # At start of episode, there is no preceding frame
        # So just copy initial states to make history
        state = preprocess(observe)
        history = np.stack((state, state, state, state), axis=2)
        history = np.reshape([history], (1, 84, 84, 4))

        while not done:
            #env.render()

            # get action for the current history and go one step in environment
            action = get_action(model, epsilon, history)

            observe, reward, done, info = env.step(action)
            # pre-process the observation --> history
            next_state = preprocess(observe)
            next_state = np.reshape([next_state], (1, 84, 84, 1))
            next_history = np.append(next_state, history[:, :, :, :3], axis=3)

            # if the agent missed ball, agent is dead --> episode is not over
            if start_life > info['ale.lives']:
                dead = True
                start_life = info['ale.lives']

            score += reward

            # If agent is dead, set the flag back to false, but keep the history unchanged,
            # to avoid to see the ball up in the sky
            if dead:
                dead = False
            else:
                history = next_history

            if done:
                episode_number += 1
                print('episode: {}, score: {}'.format(episode_number, score))
    env.close()


In [0]:
# Evaluate the saved agent; test() loads trained_model.h5, which train() writes.
test()

In [0]:
# Install the PyDrive wrapper & import libraries.
# This only needs to be done once in a notebook.
# NOTE: the "!pip" line is notebook shell syntax, not plain Python.
!pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client.
# This only needs to be done once in a notebook.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

# Upload the saved model file (trained_model.h5, written by train()) to Drive
# under the same title, then print the ID reported for the uploaded file.
model_file = drive.CreateFile({'title' : 'trained_model.h5'})
model_file.SetContentFile('trained_model.h5')
model_file.Upload()
print('Uploaded file with ID {}'.format(model_file.get('id')))