In [None]:
# This is an implementation of Playing Atari with Deep Reinforcement Learning
# The idea here is that our expected score V(s, a) for a game at a certain state s where we take action a is
# V(s, a) = max_a' V(s', a')+r, where s' is the next state, r is the immediate score increment, and a' is taken from possible actions at the next state
# In other words, we're assuming we always take actions to maximize our expected score
# These are called the Bellman equations
# The idea for the DQN algorithm is to view one of these sides as a prediction (the left) and the other as a target, then train V as a neural network
# We predict the value of a game screen simply using a convolutional neural network
# We use various stability tricks, such as using a replay memory to sample previous states randomly, so our training data is roughly uncorrelated

In [None]:
#@title
!wget http://www.atarimania.com/roms/Roms.rar 
!unrar x -o+ /content/Roms.rar >/dev/nul
!python -m atari_py.import_roms /content/ROMS >/dev/nul
!sudo pip install pyvirtualdisplay
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
!apt-get update > /dev/null 2>&1
!apt-get install cmake > /dev/null 2>&1
!pip install --upgrade setuptools 2>&1
!pip install ez_setup > /dev/null 2>&1
!pip install gym[atari] > /dev/null 2>&1

In [None]:
#@title
import base64
import copy
import glob
import io
import random as r

import gym
import matplotlib.pyplot as plt
import numpy as np
import skimage
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from google.colab import drive  # required by drive.mount below
from gym.wrappers import Monitor
from IPython import display as ipythondisplay
from IPython.display import HTML
from pyvirtualdisplay import Display
from skimage import io as io2
from skimage.transform import resize
from torch.utils.data import TensorDataset, DataLoader

drive.mount('gdrive')

Drive already mounted at gdrive; to attempt to forcibly remount, call drive.mount("gdrive", force_remount=True).


In [None]:
# Maximum number of previous states saved in replay
MAX_REPLAY_SIZE = 20_000

# Parameter of DQN algorithm. The reward of the nth time step in the future is
# weighted by gamma^n
GAMMA = 1

# Loss function used to measure the difference between the actual value
# (according to target net) and the predicted value (according to value net)
LOSS_FUNC = nn.HuberLoss()

# How many games we want to play before updating the target
UPDATE_TARGET_LENGTH = 10

In [None]:
# This code is not original, and taken from: https://colab.research.google.com/github/jeffheaton/t81_558_deep_learning/blob/master/t81_558_class_12_01_ai_gym.ipynb
# This allows me to record videos of model playing Atari games in Google colab

def query_environment(name):
    """Print a short summary of the Gym environment registered as `name`.

    The environment is created with `gym.make` and its registration spec is
    looked up with `gym.spec`; selected attributes of both are printed, one
    per line. Nothing is returned.
    """
    environment = gym.make(name)
    registration = gym.spec(name)
    summary = (
        ("Action Space", environment.action_space),
        ("Observation Space", environment.observation_space),
        ("Max Episode Steps", registration.max_episode_steps),
        ("Nondeterministic", registration.nondeterministic),
        ("Reward Range", environment.reward_range),
        ("Reward Threshold", registration.reward_threshold),
    )
    for label, value in summary:
        print(f"{label}: {value}")

# Create and start a pyvirtualdisplay Display of 1400 x 900 pixels with
# visible=0. Per the note at the top of this cell, this is what allows videos
# of the games to be recorded in Google Colab.
# NOTE(review): this rebinds the module-level name `display`; IPython's display
# module is imported as `ipythondisplay`, so the two do not collide.
display = Display(visible=0, size=(1400, 900))
display.start()

def show_video():
    """Embed the first recorded video from ./video in the notebook output.

    Looks for `video/*.mp4` relative to the current working directory. When at
    least one file exists, the lexicographically first one is read, base64
    encoded and displayed as an inline, looping HTML <video> element. When no
    file exists, "Could not find video" is printed instead.
    """
    # Sort so that the chosen file does not depend on directory listing order.
    mp4list = sorted(glob.glob('video/*.mp4'))
    if not mp4list:
        print("Could not find video")
        return

    # Read-only binary mode is enough, and the context manager guarantees the
    # file handle is released even if reading fails.
    with io.open(mp4list[0], 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())

    ipythondisplay.display(HTML(data='''<video alt="test" autoplay 
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))


def wrap_env(env):
    """Return `env` wrapped in a gym Monitor that writes into ./video.

    `force=True` is passed to Monitor, exactly as before.
    """
    return Monitor(env, './video', force=True)

In [None]:
# `name` is the Gym id of the Atari game to play. Usually this is [name]-v0

name = "Breakout-v0"
query_environment(name)
# NOTE(review): num_action is hard-coded and sizes the last layer of both
# networks. The saved output of this cell reports "Action Space: Discrete(2)",
# which does not agree with 4 — the output looks stale; confirm the value
# against the action space printed by query_environment.
num_action = 4 # Number of possible actions in game

Action Space: Discrete(2)
Observation Space: Box(-3.4028234663852886e+38, 3.4028234663852886e+38, (4,), float32)
Max Episode Steps: 200
Nondeterministic: False
Reward Range: (-inf, inf)
Reward Threshold: 195.0


In [None]:
# value_net is a convolutional neural network used to compute the expected
# score received for every possible action, given the current (preprocessed)
# image displayed by the game.

def _build_q_network(num_actions):
  """Return a freshly initialised Q-network with `num_actions` outputs.

  The layer sizes assume a 1 x 84 x 84 input (the output of preproc):
  84 -> 20 -> 9 -> 7 pixels per side through the three convolutions, so the
  flattened feature vector has 64 * 7 * 7 = 3136 entries.
  """
  return nn.Sequential(
      nn.Conv2d(1, 32, kernel_size = 8, stride = 4),
      nn.BatchNorm2d(32),
      nn.ReLU(),
      nn.Conv2d(32, 32, kernel_size = 4,  stride = 2),
      # num_features must equal the 32 output channels of the conv above;
      # the previous value of 64 made every forward pass fail.
      nn.BatchNorm2d(32),
      nn.ReLU(),
      nn.Conv2d(32, 64, kernel_size = 3,  stride = 1),
      nn.BatchNorm2d(64),
      nn.ReLU(),
      nn.Flatten(1),
      nn.Linear(3136, 256),
      nn.ReLU(),
      nn.Linear(256, num_actions)
  )

value_net = _build_q_network(num_action)

# target_net is identical to value_net, but its parameters are frozen and it is
# only updated by copying value_net's parameters into it (see updateTarget).

target_net = _build_q_network(num_action)
target_net.requires_grad_(False)  # never trained directly
target_net.eval()                 # BatchNorm uses running statistics, not batch statistics

# This loads previously trained models

checkpoint_path = "gdrive/MyDrive/"+str(name)+"_bot"
try:
  value_net.load_state_dict(torch.load(checkpoint_path))
except FileNotFoundError:
  print("No previously trained model could be found")
except RuntimeError as err:
  # Raised by load_state_dict when the checkpoint was saved from a network
  # with different layer shapes; keep going with whatever could be loaded
  # instead of silently hiding the reason.
  print("Saved model at " + checkpoint_path + " does not match this network: " + str(err))

# Whatever happened above, start with the target network in sync with value_net.
target_net.load_state_dict(value_net.state_dict())

<All keys matched successfully>

In [None]:
# Adam optimizer over the value network's parameters (the only network that is trained)
optimizer = optim.Adam(value_net.parameters(), lr=1e-4)

In [None]:
# Function that easily converts any array to a tensor of floats

def toTensor(arr, dtype = torch.float32):
  """Return a new tensor of the given dtype holding a copy of `arr`.

  Args:
    arr: a tensor, a numpy array, a scalar, or a (nested) list/tuple of
      numbers or equally-shaped numpy arrays.
    dtype: torch dtype of the result (float32 by default).

  Returns:
    A torch.Tensor that does not share memory with `arr`.
  """
  if isinstance(arr, torch.Tensor):
    # torch.tensor(tensor) emits a warning; this is the recommended way to copy.
    return arr.detach().clone().to(dtype)
  if isinstance(arr, (list, tuple)) and len(arr) > 0 and isinstance(arr[0], np.ndarray):
    # Building a tensor straight from a list of ndarrays converts element by
    # element and is extremely slow; stack into one contiguous array first.
    arr = np.stack(arr)
  return torch.tensor(arr, dtype=dtype)

In [None]:
 # Returns, for each state in the batch, the best possible predicted score (the maximum over all actions) based on the target network

def chooseActionTargets(states):
  """Return the best target-network score of every state in a batch.

  Args:
    states: batch tensor accepted by target_net; the network output is
      indexed as (state, action).

  Returns:
    A list with one float per state: the maximum over actions of the scores
    predicted by target_net. Note that these are values, not action indices.
  """
  # Run in eval mode so BatchNorm uses its running statistics rather than the
  # statistics of this particular batch; restore the previous mode afterwards.
  was_training = target_net.training
  target_net.eval()
  try:
    with torch.no_grad():
      best_vals = target_net(states).max(dim=1).values
  finally:
    target_net.train(was_training)
  return best_vals.tolist()

 # Returns the best possible predicted score and the action that achieves it, as a (score, action) pair, based on the value network

def chooseAction(state):
  """Return the greedy (score, action) pair for a single state.

  Args:
    state: batch tensor accepted by value_net; only the first element of the
      batch is used.

  Returns:
    A tuple (best predicted score, index of the action achieving it). If
    several actions share the best score, the highest index is returned.
  """
  # Inference must not run BatchNorm in training mode: with a single frame the
  # batch statistics are meaningless and would also overwrite the running
  # statistics. Switch to eval mode and restore the previous mode afterwards.
  was_training = value_net.training
  value_net.eval()
  try:
    with torch.no_grad():
      vals = value_net(state)[0].tolist()
  finally:
    value_net.train(was_training)
  return max((val, idx) for idx, val in enumerate(vals))

 # Sets the target network's parameters to be equal to the value network 's parameters
  
def updateTarget():
  """Overwrite the target network's state with a copy of the value network's state."""
  latest_state = value_net.state_dict()
  target_net.load_state_dict(latest_state)

In [None]:
replay = []

# Adds a state to the replay memory, along with the action taken, resulting state, reward given, and whether the game finished or not
def addReplay(state, action, observation, reward, done):
  """Append one transition to `replay`, evicting the oldest entries if needed.

  After the call, `replay` holds at most MAX_REPLAY_SIZE transitions and the
  new (state, action, observation, reward, done) tuple is the last one.
  """
  # Evict as many old entries as necessary rather than exactly one when the
  # length *equals* the limit: if the buffer is already over the limit (for
  # example MAX_REPLAY_SIZE was lowered while the kernel kept running), an
  # equality check would never fire again and the buffer would grow forever.
  overflow = len(replay) - MAX_REPLAY_SIZE + 1
  if overflow > 0:
    del replay[:overflow]
  replay.append((state, action, observation, reward, done))

# Returns a single transition from the replay memory. Sampling is deliberately
# biased towards the final states of games, since these are more useful for
# determining the actual value of states.
def getOneReplay():
  """Draw one transition from `replay`, favouring terminal transitions.

  A uniformly random transition is drawn first. While the current draw is not
  terminal (its `done` field equals False) a new one is drawn, and that new
  draw is accepted outright with probability 0.1 even if it is not terminal.
  """
  last_index = len(replay) - 1
  candidate = replay[r.randint(0, last_index)]
  while True:
    if not (candidate[4] == False):
      return candidate  # terminal transition: always accepted
    candidate = replay[r.randint(0, last_index)]
    if r.random() >= 0.9:
      return candidate  # occasionally accept whatever was redrawn

# Returns a sample of size n of previous transitions in replay memory
def sampleReplay(n):
  """Return a list of `n` transitions, each drawn with getOneReplay()."""
  batch = []
  for _ in range(n):
    batch.append(getOneReplay())
  return batch

# Returns 0 when b is truthy, otherwise x. Useful for finding target rewards in
# the case when a game is finished
def zero_if(x, b):
  """Return 0 if `b` is truthy, else return `x` unchanged."""
  return 0 if b else x

# Computes the loss of a replay sample
def compute_loss(replay_sample):
  """Return the DQN loss of a batch of replayed transitions.

  Args:
    replay_sample: non-empty list of (state, action, observation, reward, done)
      tuples as stored by addReplay.

  Returns:
    LOSS_FUNC(predicted, target) as a scalar tensor, where `predicted` is
    value_net's score for the action actually taken and `target` is
    reward + GAMMA * (best target-network score of the next state), or just
    the reward when the transition ended the game.
  """
  states = toTensor([tup[0] for tup in replay_sample])
  actions = toTensor([tup[1] for tup in replay_sample], dtype = torch.int64)
  observations = toTensor([tup[2] for tup in replay_sample])
  rewards = toTensor([tup[3] for tup in replay_sample])
  terminal = torch.tensor([bool(tup[4]) for tup in replay_sample], dtype = torch.bool)

  # Predicted values of the taken actions. squeeze(1) removes only the gather
  # column; a bare squeeze() would also drop the batch dimension for a batch of
  # one and make the prediction's shape differ from the target's.
  pred_vals = value_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)

  # Best achievable value of each next state according to the target network.
  next_vals = toTensor(chooseActionTargets(observations))

  # torch.where (rather than multiplying by a 0/1 mask) guarantees that a
  # terminal transition contributes exactly 0 even if next_vals is inf/nan.
  future_vals = torch.where(terminal, torch.zeros_like(next_vals), GAMMA * next_vals)
  actual_vals = rewards + future_vals

  return LOSS_FUNC(pred_vals, actual_vals)

# Runs one optimisation step of the value net on a replay batch of size bs
def learnFromReplay(bs = 1024):
  """Train on one sampled batch and return its loss as a float.

  While the replay memory holds fewer than 1000 transitions nothing is trained
  and 0 is returned.
  """
  if len(replay) >= 1000:
    optimizer.zero_grad()
    batch = sampleReplay(bs)
    batch_loss = compute_loss(batch)
    batch_loss.backward()
    optimizer.step()
    return float(batch_loss)
  # Replay is too small, don't bother training.
  return 0

In [None]:

# Used to preprocess an image given by the game. Right now, we convert to an 84 x 84 grayscale image
def preproc(image):
  """Resize `image` to 84 x 84, convert it to grayscale, transpose it and add a leading axis of size 1."""
  resized = resize(image, (84, 84))
  gray = skimage.color.rgb2gray(resized)
  flipped = np.transpose(gray)
  return np.expand_dims(flipped, axis=0)

# Plays the game once. Epsilon is the probability that, for a given state, we choose an action randomly ("explore") rather than greedily ("exploit"); show says whether we want to display a video of the game afterwards
def playGame(show=False, epsilon = 0.1, max_steps=None):
  """Play one game, store every transition in the replay memory, return the score.

  Args:
    show: when True the environment is wrapped with wrap_env, rendered while
      playing, and show_video() is called once the game is over.
    epsilon: probability of taking a random action instead of the action
      chosen by chooseAction.
    max_steps: optional cap on the number of steps. The environment is used
      through `.env`, and the loop otherwise ends only when the environment
      reports done. None (the default) means no cap.

  Returns:
    The sum of the rewards received during the game.
  """
  score = 0

  base_env = gym.make(name).env
  env = wrap_env(base_env) if show else base_env

  # try/finally so the environment is closed even if a step raises.
  try:
    observation = preproc(env.reset())
    steps = 0

    while True:
      # Rendering does not affect the game itself, so skip it while training.
      if show:
        env.render()
      curr_state = observation # Current state
      if r.random() < epsilon:
        action = env.action_space.sample() # With probability epsilon, act randomly
      else:
        _, action = chooseAction(toTensor(curr_state).unsqueeze(0)) # Otherwise act greedily
      action = int(action)

      observation, reward, done, info = env.step(action) # Get reward and new state
      score += reward # Update total score
      observation = preproc(observation)
      addReplay(curr_state, action, observation, reward, done) # Add transition to replay memory

      steps += 1
      if done or (max_steps is not None and steps >= max_steps):
        break
  finally:
    env.close()

  if show:
    show_video()
  return score

In [None]:
# Per-game scores and average training losses, kept to measure performance over time
scores, losses = [], []

In [None]:
avg_num = 100 # Number of batches we train the model for after playing one game
train_iter = 1000 # Number of games to play in this training run
num_games = 0
for num_games in range(1, train_iter + 1):
  # Play one game with 10% random actions, then train on avg_num replay batches
  score = playGame(False, epsilon = 0.1)
  avg = sum(learnFromReplay(32) for _ in range(avg_num))
  loss = avg/avg_num
  scores.append(score)
  losses.append(loss)
  # Refresh the target network every UPDATE_TARGET_LENGTH games
  if num_games % UPDATE_TARGET_LENGTH == 0:
    updateTarget()

# Plots scores and losses over time, one figure each
for series in (scores, losses):
  plt.plot(series)
  plt.show()

torch.save(value_net.state_dict(), "gdrive/MyDrive/"+str(name)+"_bot") # Saves model

In [None]:
# Play one game with no randomness (epsilon = 0, so every action comes from
# chooseAction) and show a video of the performance.
# NOTE(review): the loop in playGame only stops when the environment reports
# done, so this call returns only once the game actually ends.
playGame(True, epsilon = 0.00) # show=True records the game and displays the video afterwards