<a href="https://colab.research.google.com/github/komal-SkyNET/ai-neural-networks/blob/master/google-colab/tf2_dqn_impl_lunar_lander.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!pip3 install gym[box2d]
!pip3 install box2d-py
# !pip3 install 'gym[all]'

In [0]:
try:
  %tensorflow_version 2.x
except:
  pass
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import load_model

TensorFlow 2.x selected.


In [0]:
class ReplayBuffer:
  """Fixed-capacity circular store of agent transitions.

  Each transition (state, action, reward, next state, non-terminal flag) is
  kept in its own pre-allocated NumPy array; once `mem_size` transitions have
  been written, new ones overwrite the oldest slots.
  """

  def __init__(self, mem_size, input_dims):
    """Allocate the backing arrays.

    Args:
      mem_size: maximum number of transitions retained.
      input_dims: shape (iterable of ints) of a single observation.
    """
    self.mem_size = mem_size
    # Counts every transition ever stored; it is not capped at mem_size.
    self.mem_cntr = 0
    self.state_mem = np.zeros((self.mem_size, *input_dims),
                              dtype=np.float32)
    self.new_state_mem = np.zeros((self.mem_size, *input_dims),
                                  dtype=np.float32)
    self.action_mem = np.zeros(self.mem_size, dtype=np.int32)
    # Rewards are kept as floats: an integer array would silently truncate
    # fractional rewards when they are written.
    self.reward_mem = np.zeros(self.mem_size, dtype=np.float32)
    self.terminal_mem = np.zeros(self.mem_size, dtype=np.int32)

  def store_transition(self, state, action, rew, state_, done):
    """Write one transition into the next slot, wrapping when full.

    Args:
      state: observation before the action.
      action: integer action index.
      rew: scalar reward received for the action.
      state_: observation after the action.
      done: truthy when the episode ended on this step.
    """
    index = self.mem_cntr % self.mem_size
    self.state_mem[index] = state
    self.action_mem[index] = action
    self.new_state_mem[index] = state_
    # Use the `rew` argument; the previous code read an undefined/global
    # name `reward` instead of the value passed in.
    self.reward_mem[index] = rew
    # Stored inverted (1 = episode continues, 0 = terminal) so the flag can
    # be multiplied directly into a bootstrapped value.
    self.terminal_mem[index] = 1 - int(done)
    self.mem_cntr += 1

  def sample_buffer(self, batch_size):
    """Draw `batch_size` distinct stored transitions uniformly at random.

    Returns:
      Tuple `(states, actions, rewards, states_, terminal)` of arrays whose
      first dimension is `batch_size`; `terminal` holds the inverted done
      flags written by `store_transition`.

    Raises:
      ValueError: raised by `np.random.choice` when `batch_size` exceeds the
        number of transitions currently stored (sampling is without
        replacement).
    """
    # Only slots that have actually been written are eligible.
    max_mem = min(self.mem_cntr, self.mem_size)
    batch = np.random.choice(max_mem, batch_size, replace=False)

    states = self.state_mem[batch]
    states_ = self.new_state_mem[batch]
    rewards = self.reward_mem[batch]
    actions = self.action_mem[batch]
    terminal = self.terminal_mem[batch]

    return states, actions, rewards, states_, terminal




In [0]:
def build_dqn(lr, n_actions, input_dims, fc1_dims, fc2_dims):
  """Create and compile a three-layer dense Q-network.

  Args:
    lr: learning rate given to the Adam optimizer.
    n_actions: width of the linear output layer (one value per action).
    input_dims: shape of one observation, passed as the first layer's
      `input_shape`.
    fc1_dims: number of units in the first ReLU hidden layer.
    fc2_dims: number of units in the second ReLU hidden layer.

  Returns:
    The compiled `keras.Sequential` model (mean-squared-error loss).
  """
  first_hidden = keras.layers.Dense(fc1_dims, input_shape=input_dims,
                                    activation='relu')
  second_hidden = keras.layers.Dense(fc2_dims, activation='relu')
  # No activation on the head: the outputs are unbounded action values.
  q_head = keras.layers.Dense(n_actions, activation=None)

  network = keras.Sequential([first_hidden, second_hidden, q_head])
  network.compile(loss='mean_squared_error',
                  optimizer=Adam(learning_rate=lr))
  return network



In [0]:
class Agent:
  """Epsilon-greedy DQN agent backed by one Q-network and a replay buffer.

  The same network (`q_eval`) is used both to pick actions and to compute
  the bootstrapped targets it is trained on.
  """

  def __init__(self, lr, gamma, n_actions, epsilon, batch_size,
               input_dims, epsilon_dec=1e-4, epsilon_end=0.01,
               mem_size = 1000000, fname='dqn_model.pkl'):
    """Set up exploration parameters, replay memory and the Q-network.

    Args:
      lr: learning rate for the network's optimizer.
      gamma: discount factor applied to the next-state value.
      n_actions: number of discrete actions.
      epsilon: initial probability of taking a random action.
      batch_size: number of transitions sampled per learning step.
      input_dims: shape of a single observation.
      epsilon_dec: amount subtracted from epsilon after each learning step.
      epsilon_end: lower bound for epsilon.
      mem_size: capacity of the replay buffer.
      fname: path used by `save_model` / `load_model`.
    """
    self.action_space = list(range(n_actions))
    self.gamma = gamma
    self.eps_dec = epsilon_dec
    self.epsilon = epsilon
    self.eps_min = epsilon_end
    self.batch_size = batch_size
    self.model_file = fname
    self.memory = ReplayBuffer(mem_size, input_dims)
    self.q_eval = build_dqn(lr, n_actions, input_dims, 256, 256)

  def store_transition(self, state, action, reward, new_state, done):
    """Forward one transition to the replay buffer."""
    self.memory.store_transition(state, action, reward, new_state, done)

  def choose_action(self, observation):
    """Return a random action with probability epsilon, else the greedy one.

    Args:
      observation: a single (unbatched) observation.

    Returns:
      The index of the chosen action.
    """
    if np.random.random() < self.epsilon:
      action = np.random.choice(self.action_space)
    else:
      # The network expects a batch axis, so wrap the single observation:
      # e.g. 8 observation values become an array of shape (1, 8).
      state = np.array([observation])
      actions = self.q_eval.predict(state)
      action = np.argmax(actions)

    return action

  def learn(self):
    """Run one training step on a sampled batch and decay epsilon.

    Does nothing until the buffer has received at least `batch_size`
    transitions.
    """
    if self.memory.mem_cntr < self.batch_size:
      return

    states, actions, rewards, states_, dones = \
        self.memory.sample_buffer(self.batch_size)

    q_eval = self.q_eval.predict(states)
    q_next = self.q_eval.predict(states_)

    # Start from the current predictions so that only the entry of the
    # action actually taken contributes to the loss.
    q_target = np.copy(q_eval)
    batch_index = np.arange(self.batch_size, dtype=np.int32)

    # `dones` is multiplied into the bootstrap term, so a 0 entry removes
    # the next-state value for that transition.
    q_target[batch_index, actions] = rewards + \
        self.gamma * np.max(q_next, axis=1) * dones
    self.q_eval.train_on_batch(states, q_target)

    # Clamp the decay so epsilon never drops below its floor, even when the
    # remaining distance to eps_min is smaller than one decrement.
    self.epsilon = max(self.epsilon - self.eps_dec, self.eps_min)

  def save_model(self):
    """Save the Q-network to `self.model_file`."""
    self.q_eval.save(self.model_file)

  def load_model(self):
    """Replace the Q-network with the model stored at `self.model_file`."""
    self.q_eval = load_model(self.model_file)





In [0]:
# Driver: builds the LunarLander-v2 environment and an Agent, then runs
# n_games episodes, learning after every environment step and printing a
# per-episode summary.
import gym 

# tf.compat.v1.disable_eager_execution()
env = gym.make('LunarLander-v2')
n_games = 1000  # number of episodes to run
lr = 0.001  # learning rate handed to the Agent
print(env.observation_space.shape)
print(env.action_space.n)
# Observation shape and action count are taken from the environment itself.
agent = Agent(gamma = 0.99, epsilon=1.0, lr = lr,
              input_dims=env.observation_space.shape, 
              n_actions = env.action_space.n, 
              mem_size = 1000000, batch_size=64,
              epsilon_end=0.01)

scores = []  # total reward of each finished episode
eps_history = []  # agent.epsilon as it stood at the end of each episode

for i in range(n_games):
  done = False
  score = 0
  # NOTE(review): assumes env.reset() returns just the observation and
  # env.step() returns four values -- confirm against the installed gym version.
  obs = env.reset()
  while not done:
    action = agent.choose_action(obs)
    obs_, reward, done, info = env.step(action)
    score += reward
    # Record the transition, advance to the new observation, then run one
    # learning step.
    agent.store_transition(obs, action, reward, obs_, done)
    obs = obs_
    agent.learn()
  eps_history.append(agent.epsilon)
  scores.append(score)

  # Mean over (at most) the 100 most recent episode scores.
  avg_score = np.mean(scores[-100:])
  print('episode: ', i , 'score %.2f' % score, 
        'average_score: %.2f' % avg_score, 
         'epsilon %.2f' % agent.epsilon)

