In [1]:
#!pip3 install gym==0.15.3
#!pip3 install atari_py

In [2]:
# Environment creation and random number generator
import gym, random, sys, copy, time, torch
import math, glob, io, matplotlib, os
import cv2, base64, keras
import warnings
warnings.simplefilter("ignore", UserWarning)

# Linear algebra and data manipulation libraries
import numpy as np
import pandas as pd
import tensorflow as tf
from datetime import datetime
from gym import logger as gymlogger
from gym.wrappers import Monitor
from gym.spaces import Box
from IPython.display import HTML
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display


from collections import deque
import matplotlib.pyplot as plt
import matplotlib.pylab as pylab

# Deep learning model requirements
import tensorflow.keras as keras
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch as T

In [3]:
# Silence NumPy's VisibleDeprecationWarning.
# `np.warnings` was an accidental alias that NumPy 1.24 removed, and NumPy 2.0
# moved the warning class to `np.exceptions`, so resolve the class defensively
# and register the filter through the standard `warnings` module instead.
NUMPY_VISIBLE_DEPRECATION = getattr(np, 'VisibleDeprecationWarning', None)
if NUMPY_VISIBLE_DEPRECATION is None:
    NUMPY_VISIBLE_DEPRECATION = np.exceptions.VisibleDeprecationWarning
warnings.filterwarnings('ignore', category=NUMPY_VISIBLE_DEPRECATION)

In [4]:
# Deep Q Network Model
class DQN(nn.Module):
    """Convolutional Q-network mapping grayscale frames to one Q-value per action.

    The network owns its optimizer (RMSprop), its loss (MSE) and its device,
    and moves itself to CUDA when available.

    Args:
        learning_rate: Learning rate handed to the RMSprop optimizer.
        n_actions: Number of discrete actions, i.e. width of the output layer.
            Defaults to 6.
    """

    # Spatial size of a single input frame (height x width).
    FRAME_HEIGHT = 185
    FRAME_WIDTH = 95
    # Flattened conv3 output for a 185x95 input:
    #   conv1 (k=8, s=4, p=1): 185x95 -> 45x23
    #   conv2 (k=4, s=2):       45x23 -> 21x10
    #   conv3 (k=3):            21x10 -> 19x8, with 128 channels
    CONV_OUT_FEATURES = 128 * 19 * 8

    def __init__(self, learning_rate, n_actions=6):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 8, stride=4, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 4, stride=2)
        self.conv3 = nn.Conv2d(64, 128, 3)

        self.fc1 = nn.Linear(self.CONV_OUT_FEATURES, 512)
        self.fc2 = nn.Linear(512, n_actions)

        self.optimizer = optim.RMSprop(self.parameters(), lr=learning_rate)
        self.loss = nn.MSELoss()
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

    def forward(self, observation):
        """Compute Q-values for one or more frames.

        Args:
            observation: A single 185x95 frame, a sequence of such frames, an
                array of shape (k, 185, 95), or an equivalent tensor.

        Returns:
            Tensor of shape (k, n_actions), where k is the number of frames.
        """
        if T.is_tensor(observation):
            obs = observation.to(device=self.device, dtype=T.float32)
        else:
            # Build one contiguous float32 array first: constructing a tensor
            # directly from a Python list of ndarrays is the slow path.
            frames = np.asarray(observation, dtype=np.float32)
            obs = T.as_tensor(frames, device=self.device)

        # Add the single grayscale channel expected by the conv layers.
        obs = obs.reshape(-1, 1, self.FRAME_HEIGHT, self.FRAME_WIDTH)
        obs = F.relu(self.conv1(obs))
        obs = F.relu(self.conv2(obs))
        obs = F.relu(self.conv3(obs))

        # Flatten the feature maps, then feed them through the dense head.
        obs = obs.reshape(-1, self.CONV_OUT_FEATURES)
        obs = F.relu(self.fc1(obs))

        actions = self.fc2(obs)

        # One row of Q-values per input frame.
        return actions


In [5]:
class Agent:
    """Epsilon-greedy DQN agent with a circular replay memory and a target network.

    Args:
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Initial exploration probability.
        learning_rate: Learning rate passed to both Q-networks.
        max_memory: Maximum number of transitions kept in replay memory.
        epsilon_min: Floor that epsilon decays towards.
        replace: Number of training steps between copies of the online network
            into the target network. ``None`` disables periodic copies.
        action_space: Sequence of valid discrete actions.
    """

    def __init__(self, gamma, epsilon, learning_rate, max_memory, epsilon_min=0.5, replace=10000, action_space=[0,1,2,3,4,5]):
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        # Copy so instances never share (or mutate) the default list object.
        self.action_space = list(action_space)
        self.max_memory = max_memory
        self.steps = 0          # number of act() calls
        self.step_counter = 0   # number of completed train() calls
        self.memory = []
        self.memory_counter = 0
        self.replace_target_counter = replace
        self.Q_eval = DQN(learning_rate)  # online network (current state guess)
        self.Q_next = DQN(learning_rate)  # target network (next state guess)
        # Start the target network as an exact copy of the online network so
        # bootstrapped targets are not produced by unrelated random weights.
        self.Q_next.load_state_dict(self.Q_eval.state_dict())

    def remember(self, state, action, reward, next_state):
        """Store one transition, overwriting the oldest once memory is full."""
        transition = [state, action, reward, next_state]
        if self.memory_counter < self.max_memory:
            self.memory.append(transition)
        else:
            self.memory[self.memory_counter % self.max_memory] = transition

        self.memory_counter += 1

    def act(self, observation):
        """Choose an action epsilon-greedily.

        Args:
            observation: A sequence of frames accepted by ``Q_eval.forward``.

        Returns:
            The chosen action as a Python int.
        """
        self.steps += 1

        if np.random.random() < 1 - self.epsilon:
            # Inference only: skip autograd bookkeeping, and skip the forward
            # pass entirely when exploring.
            with T.no_grad():
                q_values = self.Q_eval.forward(observation)
            # Use the most recent frame's Q-values. A fixed index of 1 crashes
            # when a single frame is supplied and ignores the newest frame.
            return int(T.argmax(q_values[-1]).item())

        return int(np.random.choice(self.action_space))

    def train(self, batch_size):
        """Run one gradient step on a contiguous slice of replay memory.

        The target for each sampled transition is
        ``reward + gamma * max_a Q_next(next_state, a)`` on the action that was
        actually taken; all other action outputs keep their predicted value so
        they contribute no loss. Does nothing while replay memory is empty.

        Args:
            batch_size: Desired number of transitions in the mini-batch. Fewer
                are used while memory holds less than ``batch_size`` entries.
        """
        stored = len(self.memory)
        if stored == 0 or batch_size <= 0:
            return

        # Periodically refresh the target network from the online network.
        if self.replace_target_counter and self.step_counter % self.replace_target_counter == 0:
            self.Q_next.load_state_dict(self.Q_eval.state_dict())

        # Choose the start so the slice never runs off the end of memory.
        size = min(batch_size, stored)
        memory_start = int(np.random.randint(0, stored - size + 1))
        mini_batch = self.memory[memory_start:memory_start + size]

        # Unpack column-wise. Transitions mix arrays and scalars, so they
        # cannot be packed into a single rectangular ndarray.
        device = self.Q_eval.device
        states = [transition[0] for transition in mini_batch]
        next_states = [transition[3] for transition in mini_batch]
        actions = T.tensor([int(transition[1]) for transition in mini_batch],
                           dtype=T.long, device=device)
        rewards = T.tensor([float(transition[2]) for transition in mini_batch],
                           dtype=T.float32, device=device)

        self.Q_eval.optimizer.zero_grad()
        Q_pred = self.Q_eval.forward(states)

        with T.no_grad():
            Q_next = self.Q_next.forward(next_states).to(device)
            # Detached copy: the target must not alias Q_pred, otherwise the
            # loss is identically zero and no learning happens.
            Q_target = Q_pred.detach().clone()
            batch_index = T.arange(size, device=device)
            Q_target[batch_index, actions] = rewards + self.gamma * Q_next.max(dim=1)[0]

        if self.steps > 500:
            if self.epsilon - 1e-4 > self.epsilon_min:
                self.epsilon -= 1e-4  # linear decay towards epsilon_min
            else:
                self.epsilon = self.epsilon_min

        loss = self.Q_eval.loss(Q_pred, Q_target)
        loss.backward()
        self.Q_eval.optimizer.step()
        self.step_counter += 1

In [6]:
env = gym.make('SpaceInvaders-v4')
agent = Agent(
    gamma=0.95,
    epsilon=1.0,
    learning_rate=0.03,
    max_memory=5000,
    replace=None,
)

# Pre-fill the replay memory by playing complete episodes with uniformly
# random actions. Episodes always run to the end, so the final one may
# overwrite the oldest stored transitions.
while agent.memory_counter < agent.max_memory:
    observation, done = env.reset(), False
    while not done:
        action = env.action_space.sample()
        state, reward, done, info = env.step(action)
        # Losing the last life is punished with a large negative reward.
        reward = -100 if (done and info['ale.lives'] == 0) else reward

        # Crop to the play field and average the colour channels to grayscale.
        previous_frame = np.mean(observation[15:200, 30:125], axis=2)
        current_frame = np.mean(state[15:200, 30:125], axis=2)
        agent.remember(previous_frame, action, reward, current_frame)
        observation = state

print("Finished Initializing Agent Memory")

Finished Initializing Agent Memory


In [None]:
scores = []           # final game score of every finished episode
avg_score = 0         # running SUM of scores (name kept for compatibility)
episode_history = []  # epsilon value at the start of every episode
episodes = 50
batch_size = 32
total_runs = 0        # number of finished episodes


def preprocess_frame(frame):
    """Crop a raw RGB frame to the play field and average channels to grayscale.

    Acting and replay memory must share this preprocessing so the network
    sees the same value scale in both places.
    """
    return np.mean(frame[15:200, 30:125], axis=2)


# Record Gameplay
display = Display(visible=0, size=(1400, 900))
display.start()
# Guard against wrapping twice when this cell is re-run.
if not isinstance(env, Monitor):
    env = Monitor(env, './video', force=True)

for episode in range(episodes):

    # Average over finished episodes only; 0.0 before the first one ends.
    running_avg = (avg_score / total_runs) if total_runs else 0.0
    print("Game {} hyperparameters: Epsilon: {} Average Score: {}".format(episode+1, agent.epsilon, running_avg))
    episode_history.append(agent.epsilon)
    done = False
    observation = env.reset()
    frame = preprocess_frame(observation)
    frames = [frame]
    score = 0
    last_action = 0

    while not done:
        # Pick a new action every third frame; repeat the last one otherwise.
        if len(frames) == 3:
            action = agent.act(frames)
            frames = []
        else:
            action = last_action

        state, reward, done, info = env.step(action)
        score += reward  # the reported score uses the raw game reward
        next_frame = preprocess_frame(state)
        # Queue the frame produced by this step, not the stale previous one.
        frames.append(next_frame)
        if done and info['ale.lives'] == 0:
            reward = -100

        agent.remember(frame, action, reward, next_frame)

        observation = state
        frame = next_frame
        agent.train(batch_size)
        last_action = action

    scores.append(score)
    avg_score += score
    total_runs += 1
    print("End Score: {}".format(score))

    # Render a video of the session
    mp4list = glob.glob('video/*.mp4')
    if len(mp4list) > 0:
        # glob order is arbitrary, so pick the newest recording explicitly.
        mp4 = max(mp4list, key=os.path.getmtime)
        with open(mp4, 'rb') as video_file:
            video = video_file.read()
        encoded = base64.b64encode(video)
        ipythondisplay.display(HTML(data='''<video alt='test' autoplay loop controls style='height: 400px;'> <source src='data:video/mp4;base64,{0}' type='video/mp4' /> </video>'''.format(encoded.decode('ascii'))))


Game 1 hyperparameters: Epsilon: 1.0 Average Score: 0.0
