In [5]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.optimizers import Adam
from keras.models import load_model

In [2]:
class ReplayBuffer():
  """Fixed-capacity circular store of agent transitions.

  Each slot holds one (state, action, reward, next_state, not-done flag)
  tuple. Once ``max_size`` transitions have been written, new ones overwrite
  the oldest slots.
  """

  def __init__(self, max_size, input_dim):
    """Allocate zero-filled storage.

    Args:
      max_size: number of transitions the buffer can hold.
      input_dim: shape of a single state, either a sequence such as ``(8,)``
        or a bare int such as ``8`` (treated as a 1-D state of that length).
    """
    self.mem_size = max_size
    self.mem_counter = 0  # total transitions ever stored (not capped at mem_size)

    # Callers may pass ``observation_space.shape[0]`` (an int) instead of the
    # shape tuple; normalise so the ``*`` unpacking below never fails.
    if isinstance(input_dim, (int, np.integer)):
      state_shape = (int(input_dim),)
    else:
      state_shape = tuple(input_dim)

    self.state_memory = np.zeros((self.mem_size, *state_shape), dtype=np.float32)
    self.new_state_memory = np.zeros((self.mem_size, *state_shape), dtype=np.float32)
    self.action_memory = np.zeros(self.mem_size, dtype=np.int32)
    self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
    self.terminal_memory = np.zeros(self.mem_size, dtype=np.int32)

  def store_transition(self, state, action, reward, state_, done):
    """Write one transition into the next slot, wrapping around when full."""
    index = self.mem_counter % self.mem_size
    self.state_memory[index] = state
    self.new_state_memory[index] = state_
    self.reward_memory[index] = reward
    self.action_memory[index] = action
    # Stored inverted (1 = episode continues, 0 = terminal) so it can be used
    # directly as a multiplicative mask on the bootstrapped value.
    self.terminal_memory[index] = 1 - int(done)
    self.mem_counter += 1

  def sample_buffer(self, batch_size):
    """Return a random batch of distinct stored transitions.

    Sampling is without replacement over the filled part of the buffer, so
    ``np.random.choice`` raises ``ValueError`` if ``batch_size`` exceeds the
    number of transitions currently stored.

    Returns:
      Tuple ``(states, actions, rewards, next_states, not_done_mask)``.
    """
    max_mem = min(self.mem_counter, self.mem_size)
    batch = np.random.choice(max_mem, batch_size, replace=False)

    states = self.state_memory[batch]
    states_ = self.new_state_memory[batch]
    reward = self.reward_memory[batch]
    action = self.action_memory[batch]
    terminal = self.terminal_memory[batch]

    return states, action, reward, states_, terminal

In [4]:
def build_dqn(lr, n_actions, input_dim, fc1_dims, fc2_dims):
  """Create and compile the Q-network.

  The network is two ReLU hidden layers of widths ``fc1_dims`` and
  ``fc2_dims`` followed by a linear output layer with one unit per action.
  It is compiled with Adam at learning rate ``lr`` and a mean-squared-error
  loss.

  ``input_dim`` is accepted for interface compatibility but is not used here;
  no explicit input shape is given to the model.
  """
  network = keras.Sequential()

  # Hidden layers share the same activation, so add them in one pass.
  for width in (fc1_dims, fc2_dims):
    network.add(keras.layers.Dense(width, activation='relu'))

  # Linear head: raw Q-value estimate for each action.
  network.add(keras.layers.Dense(n_actions, activation=None))

  network.compile(optimizer=Adam(learning_rate=lr), loss='mean_squared_error')
  return network

In [6]:
class Agent():
  """Epsilon-greedy DQN agent backed by a single Q-network and a replay buffer."""

  def __init__(self, lr, gamma, n_actions, epsilon, batch_size, 
               input_dims, epsilon_dec=1e-3, epsilon_end=0.01, 
               mem_size=1000000, fname='dqn_model.h5'):
    """Set up hyper-parameters, the replay buffer and the Q-network.

    Args:
      lr: learning rate forwarded to ``build_dqn``.
      gamma: discount factor applied to the bootstrapped next-state value.
      n_actions: number of discrete actions.
      epsilon: initial exploration probability.
      batch_size: number of transitions sampled per ``learn`` call.
      input_dims: state shape forwarded to ``ReplayBuffer`` and ``build_dqn``.
      epsilon_dec: amount subtracted from epsilon after every ``learn`` step.
      epsilon_end: lower bound for epsilon.
      mem_size: replay buffer capacity.
      fname: path used by ``save_model`` / ``load_model``.
    """
    self.action = list(range(n_actions))
    self.gamma = gamma
    self.epsilon = epsilon
    # Was never stored before, which made learn() fail with AttributeError.
    self.eps_dec = epsilon_dec
    self.eps_min = epsilon_end
    self.batch_size = batch_size
    self.model_file = fname
    self.memory = ReplayBuffer(mem_size, input_dims)
    self.q_eval = build_dqn(lr, n_actions, input_dims, 256, 256)

  def store_transition(self, state, action, reward, new_state, done):
    """Record one environment step in the replay buffer."""
    self.memory.store_transition(state, action, reward, new_state, done)

  def choose_action(self, observation):
    """Pick an action: random with probability epsilon, otherwise greedy."""
    if np.random.random() < self.epsilon:
      action = np.random.choice(self.action)
    else:
      # The network expects a batch, so wrap the single observation.
      state = np.array([observation])
      actions = self.q_eval.predict(state)
      action = np.argmax(actions)
    return action 

  def learn(self):
    """Run one training step on a sampled batch and decay epsilon.

    Does nothing until the buffer has received at least ``batch_size``
    transitions.
    """
    if self.memory.mem_counter < self.batch_size:
      return 

    # The buffer returns the terminal flag inverted: 1 = not done, 0 = done.
    states, actions, rewards, states_, not_done = self.memory.sample_buffer(self.batch_size)
    q_eval = self.q_eval.predict(states)
    q_next = self.q_eval.predict(states_)

    # Start from the current predictions so only the taken action's Q-value
    # contributes to the loss.
    q_target = np.copy(q_eval)
    # Row indices 0..batch_size-1, paired with `actions` to address one entry
    # per sample (np.argmax was used here by mistake and raised TypeError).
    batch_index = np.arange(self.batch_size, dtype=np.int32)

    q_target[batch_index, actions] = rewards + self.gamma * np.max(q_next, axis=1) * not_done
    self.q_eval.train_on_batch(states, q_target)
    # Clamp so epsilon never drops below its configured floor.
    self.epsilon = max(self.epsilon - self.eps_dec, self.eps_min)

  def save_model(self):
    """Save the Q-network to ``self.model_file``."""
    self.q_eval.save(self.model_file)
  
  def load_model(self):
    """Replace the Q-network with the one stored at ``self.model_file``."""
    # NOTE(review): this module-level load_model comes from standalone `keras`
    # while the network is built with `tensorflow.keras` — confirm both resolve
    # to the same implementation in the installed versions.
    self.q_eval = load_model(self.model_file)

In [14]:
!pip install --upgrade ribs[all] gym~=0.17.0 Box2D~=2.3.10 tqdm

Collecting ribs[all]
  Downloading ribs-0.4.0-py3-none-any.whl (59 kB)
[K     |████████████████████████████████| 59 kB 5.6 MB/s 
Collecting Box2D~=2.3.10
  Downloading Box2D-2.3.10-cp37-cp37m-manylinux1_x86_64.whl (1.3 MB)
[K     |████████████████████████████████| 1.3 MB 30.3 MB/s 
Installing collected packages: ribs, Box2D
Successfully installed Box2D-2.3.10 ribs-0.4.0


In [15]:
import gym

In [8]:
# Switch TensorFlow out of eager mode for the rest of the session.
# NOTE(review): TensorFlow documents that this must be called before any
# graphs, ops or tensors are created — confirm this cell runs before the
# Agent (and its Keras model) is built.
# NOTE(review): presumably done to speed up the per-step predict /
# train_on_batch calls in the training loop — confirm.
tf.compat.v1.disable_eager_execution()

In [None]:
# Train the DQN agent on LunarLander-v2. The loop uses the gym 0.17-style API
# pinned above: reset() returns the observation and step() returns a 4-tuple.
env = gym.make('LunarLander-v2')
lr = 0.001
n_games = 500
# Pass the full observation shape tuple: ReplayBuffer unpacks input_dims with
# `*`, which fails on the bare int that `shape[0]` would give.
agent = Agent(gamma=0.99, epsilon=1.0, lr=lr, 
              input_dims=env.observation_space.shape, 
              n_actions=env.action_space.n, mem_size=1000000, batch_size=64, epsilon_end=0.01)

scores = []
eps_history =  []

for i in range(n_games):
  done = False
  score = 0
  observation = env.reset()
  while not done:
    action = agent.choose_action(observation)
    observation_, reward, done, info = env.step(action)
    score += reward
    agent.store_transition(observation, action, reward, observation_, done)
    observation = observation_
    # One gradient step per environment step (no-op until a full batch is stored).
    agent.learn()
  eps_history.append(agent.epsilon)
  scores.append(score)

  # Running average over the most recent 100 episodes.
  avg_score = np.mean(scores[-100:])
  print('episode: ', i, 'score %.2f' % score, 
        'average_score %.2f' % avg_score, 'epsilon %.2f' % agent.epsilon)