In [1]:
import random
import gym
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense, Activation, Flatten, Conv2D, MaxPooling2D
from keras.optimizers import Adam

  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


In [6]:
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=20000)
        self.gamma = 0.95    # discount rate
        self.epsilon = 1.0   # exploration rate
        self.epsilon_min = 0.1
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        # Neural Net for Deep-Q learning Model
        model = Sequential()
        
        # Conv Layers
        model.add(Conv2D(32, (4, 4), padding='same', input_shape=self.state_size))
        model.add(Activation('relu'))
        model.add(MaxPooling2D(pool_size=4))
        model.add(Flatten())

        # FC Layers
        model.add(Dense(256, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        
        model.compile(loss='mse', optimizer=Adam(lr=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        # Random exploration
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        
        act_values = self.model.predict(state)
        
        return np.argmax(act_values[0])  # returns action using policy

    def replay(self, batch_size):
        minibatch = random.sample(self.memory, batch_size)
        
        for state, action, reward, next_state, done in minibatch:
            target = reward
            
            if not done:
                target = (reward + self.gamma * np.amax(self.model.predict(next_state)))
                
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)
            
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        self.model.load_weights(name)

    def save(self, name):
        self.model.save_weights(name)

In [7]:
def process_frame(frame):
    mspacman_color = np.array([210, 164, 74]).mean()
    img = frame[1:176:2, ::2] # crop and downsize
    img = img.mean(axis=2) # to greyscale
    img[img==mspacman_color] = 0 # Improve contrast
    img = (img - 128) / 128 - 1 # normalize from -1. to 1.
    return img.reshape(88, 80, 1)

In [8]:
env = gym.make('MsPacman-v0')
state_size = (88, 80, 1)
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)

episodes = 500
done = False
batch_size = 8
skip_start = 90  # Skip the start of every game (it's just waiting time).

In [None]:
for e in range(episodes):
    total_reward = 0
    state = process_frame(env.reset())
    state = np.expand_dims(state, axis=0)
    
    for skip in range(skip_start): # skip the start of each game
            env.step(0)
    
    for time in range(20000):
        env.render()
        
        # Transition Dynamics
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        next_state = np.expand_dims(process_frame(next_state), axis=0)
        
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward
        
        if done:
            print("episode: {}/{}, reward: {}, time: {}, e: {:.5}"
                  .format(e+1, episodes, total_reward, time, agent.epsilon))
            break
            
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)

episode: 1/500, reward: 660.0, time: 771, e: 0.099682
episode: 2/500, reward: 580.0, time: 772, e: 0.099682
