In [None]:
import sys
from random import randrange, choice, choices, random
import os
from ale_python_interface import ALEInterface
import numpy as np
import cv2
import time

In [None]:
# Notebooks do not define __file__, so the literal string '__file__' is used as
# a dummy name: abspath() resolves it against the current working directory and
# dirname() strips it again, leaving that directory as the anchor.
_anchor = os.path.abspath('__file__')
rom_path = os.path.join(os.path.dirname(_anchor), 'ROMs', 'Space_Invaders.bin')

# A missing ROM only produces this message; nothing is raised here.
if not os.path.exists(rom_path):
    print("Invalid ROM path")

In [None]:
# Create the emulator interface. All options below are set before the ROM is
# loaded; option keys and the ROM path are passed as bytes.
ale = ALEInterface()

ale.setBool(b'display_screen', True)
# The b'sound' option is intentionally left unset here.
ale.setInt(b'frame_skip', 3)
ale.setBool(b'color_averaging', True)

ale.loadROM(rom_path.encode('utf-8'))

In [None]:
# Legal action set reported by the emulator for the loaded ROM; get_Q() sizes
# its output layer from the length of this collection.
legal_actions = ale.getLegalActionSet()

In [None]:
# Show the legal action set as this cell's output.
legal_actions

In [None]:
import matplotlib.pyplot as plt

In [None]:
class Preprocessor(object):
    """Keeps a rolling stack of the four most recent downsampled screens.

    Every incoming screen is resized with OpenCV to 84 columns by 110 rows and
    cropped to rows 17 through 100, which yields an 84x84 image. The stack is
    an (84, 84, 4) array whose last channel holds the newest screen.
    """

    def __init__(self):
        # None until the first screen has been processed; afterwards the most
        # recently returned stack.
        self.last_preprocessed_seq = None

    @staticmethod
    def _shrink(frame):
        """Resize one raw screen to 84x110 (width x height) and crop it to 84x84."""
        scaled = cv2.resize(frame, dsize=(84,110), interpolation = cv2.INTER_AREA)
        return scaled[17:110-9,:]

    def initialize_last_preprocessed_seq(self, sequence):
        """Seed the stack by copying the newest screen into all four channels.

        Only ``sequence[-1]`` is read. The new stack is stored on the instance
        and the same array object is returned.
        """
        newest = self._shrink(sequence[-1])
        stack = np.empty((84,84,4))
        for channel in range(4):
            stack[..., channel] = newest

        self.last_preprocessed_seq = stack
        return stack

    def phi(self, sequence):
        """Push the newest screen of ``sequence`` onto the stack and return it.

        On the first call the stack is seeded from that single screen.
        Afterwards a new array is built: the three most recent channels of the
        previous stack move to the front and the newest screen becomes the
        last channel. The previous stack array is not modified.
        """
        previous = self.last_preprocessed_seq
        if previous is None:
            return self.initialize_last_preprocessed_seq(sequence)

        newest = self._shrink(sequence[-1])

        stack = np.empty((84,84,4))
        stack[..., :-1] = previous[..., 1:]
        stack[..., -1] = newest

        self.last_preprocessed_seq = stack
        return stack

    def preprocess(self, sequence):
        """Alias for :meth:`phi`."""
        return self.phi(sequence)

In [None]:
from keras.layers import Dense, Conv2D, Flatten
from keras.models import Sequential
from IPython.display import SVG
from keras.utils import model_to_dot
from keras.optimizers import RMSprop

In [None]:
def get_Q():
    """Build and compile a fresh Q-network.

    Architecture, in order: an 84x84x4 input; a ReLU convolution with 16
    filters of 8x8 and stride 4; a ReLU convolution with 32 filters of 4x4 and
    stride 2; a flatten step; a 256-unit ReLU dense layer; and a final dense
    layer with one unit per entry of the global ``legal_actions`` (no
    activation is specified for it).

    Returns:
        A ``Sequential`` model compiled with mean-squared-error loss and an
        ``RMSprop`` optimizer constructed with its default arguments.
    """
    layer_stack = [
        Conv2D(filters=16, kernel_size=(8,8), strides=(4,4), input_shape=(84,84,4), activation='relu'),
        Conv2D(filters=32, kernel_size=(4,4), strides=(2,2), activation='relu'),
        Flatten(),
        Dense(units=256, activation='relu'),
        Dense(units=len(legal_actions)),
    ]
    network = Sequential(layer_stack)

    network.compile(loss="mse", optimizer=RMSprop())
    return network

In [None]:
# Build a throwaway Q-network and display its layer graph as inline SVG,
# rendered through the `dot` program.
SVG(model_to_dot(get_Q()).create(prog='dot', format='svg'))


In [None]:
from collections import deque

In [None]:
def update_epsilon(epsilon, floor=0.1, step=9.000000000000001e-07):
    """Return the exploration rate after one linear annealing step.

    Python floats are immutable, so assigning to ``epsilon`` inside this
    function can never change the caller's variable. The decayed value is
    therefore returned and the caller must rebind it::

        epsilon = update_epsilon(epsilon)

    Args:
        epsilon: Current exploration rate.
        floor: Lowest value the rate may take (default 0.1).
        step: Amount subtracted per call. The default equals
            (1 - 0.1) / 1000000, i.e. a linear decay from 1.0 to 0.1 over one
            million calls.

    Returns:
        ``epsilon - step``, clamped so it never drops below ``floor``. Values
        already at or below ``floor`` come back as exactly ``floor``.
    """
    return max(floor, epsilon - step)

In [None]:
def DQN(num_episodes=10000, minibatch_size=32, gamma=0.9, replay_capacity=1000000):
    """Train a Q-network on the loaded ROM with epsilon-greedy Q-learning.

    Uses the module-level ``ale`` emulator, ``legal_actions``, ``get_Q`` and
    ``Preprocessor``. Every emulator step is stored in a bounded replay memory;
    once the memory holds more than ``minibatch_size`` transitions, one
    minibatch (sampled with replacement) is used for a single fit per step.

    Args:
        num_episodes: Number of games to play.
        minibatch_size: Number of transitions sampled per training step.
        gamma: Discount factor applied to the best next-state Q-value.
        replay_capacity: Maximum number of transitions kept in replay memory.

    Returns:
        The trained Keras model created by ``get_Q()``.
    """
    # Exploration schedule: linear decay from 1.0 down to 0.1. It is applied
    # inline because a float rebound inside a helper function cannot change
    # this local variable.
    epsilon = 1.
    epsilon_floor = 0.1
    epsilon_step = 9.000000000000001e-07 # (1 - 0.1) / 1000000

    num_actions = len(legal_actions)
    # NOTE(review): each stored state is an 84x84x4 float64 array, so a full
    # replay memory at the default capacity is very large - confirm the
    # capacity fits the machine.
    D = deque([], maxlen=replay_capacity)
    Q = get_Q()
    # ALE's getScreenGrayscale requires a uint8 buffer; with the default
    # float64 dtype the frames read back would be meaningless.
    screen = np.empty((210,160), dtype=np.uint8)

    for num_episode in range(num_episodes):
        ale.reset_game()
        # New frame stack per episode so the first state of a game is not
        # mixed with the final frames of the previous game.
        preprocessor = Preprocessor()
        ale.getScreenGrayscale(screen)
        # phi() only reads the last element, so a one-item list is sufficient.
        preprocessed_input = preprocessor.phi([screen])

        while not ale.game_over():
            # `action` is an index into legal_actions (and into the Q output),
            # not a raw emulator action code.
            if random() <= epsilon:
                action = randrange(num_actions)
            else:
                q_values = Q.predict(np.expand_dims(preprocessed_input, 0), verbose=0)[0]
                action = int(np.argmax(q_values))
            epsilon = max(epsilon_floor, epsilon - epsilon_step)

            reward = ale.act(legal_actions[action])
            ale.getScreenGrayscale(screen)
            previous_input = preprocessed_input
            preprocessed_input = preprocessor.phi([screen])
            D.append((previous_input, action, reward, preprocessed_input, ale.game_over()))

            if len(D) > minibatch_size:
                sample_minibatch = choices(D, k=minibatch_size)
                states = np.array([transition[0] for transition in sample_minibatch])
                next_states = np.array([transition[3] for transition in sample_minibatch])

                # Two batched forward passes instead of two per transition.
                # Starting from the current predictions means only the taken
                # action contributes to the loss.
                target = Q.predict(states, verbose=0)
                best_next = Q.predict(next_states, verbose=0).max(axis=1)

                for row, (_, taken, gained, _, is_terminal) in enumerate(sample_minibatch):
                    q_value = gained
                    if not is_terminal:
                        q_value += gamma * best_next[row]
                    target[row, taken] = q_value

                Q.fit(x=states, y=target, epochs=1, verbose=0)

    return Q
                
    

In [None]:
# Sanity check of the emulator: play ten games to the end, picking a random
# entry of legal_actions at every step.
for i in range(10):
    ale.reset_game()
    while True:
        if ale.game_over():
            break
        reward = ale.act(choice(legal_actions))

In [None]:
# Run the training loop defined in DQN().
DQN()

In [None]:
import gym

In [None]:
# Create the Gym environment registered as 'SpaceInvaders-v0'.
env = gym.make('SpaceInvaders-v0')

In [None]:
# Play one episode with randomly sampled actions, rendering every step and
# pausing 10 ms between steps.
env.reset()
done = False
try:
    while not done:
        env.render()
        obs, reward, done, info = env.step(env.action_space.sample()) # take a random action
        time.sleep(0.01)
finally:
    # Close the environment even if a step raises or the kernel is
    # interrupted, so it is not left open.
    env.close()

In [None]:
# IPython introspection (trailing '?'): shows the help for ale.reset_game.
# This is not plain-Python syntax and only works under IPython/Jupyter.
ale.reset_game?