In [45]:
import numpy as np
import pandas as pd
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras import Sequential, Model
from tensorflow.keras.optimizers import Adam
from collections import deque
import random
from IPython.display import clear_output
import gym
import matplotlib.pyplot as plt

In [46]:
class DQN_Agent:
  """Epsilon-greedy deep Q-learning agent backed by a small dense Keras network.

  Transitions are stored in a bounded replay memory; `train` samples a
  mini-batch from it and regresses the network towards one-step Q-learning
  targets. The exploration probability `rng` is decayed as training proceeds.

  Args:
    input_shape: Number of features in a single (flat) state vector.
    n_actions: Number of discrete actions; one Q-value is predicted per action.
    memory_size: Maximum number of transitions kept in the replay memory.
    min_memory: Number of stored transitions required before `train` does
      any work.
    batch_size: Number of transitions sampled per `train` call.
  """

  def __init__(self, input_shape, n_actions, memory_size=20_000,
               min_memory=10_000, batch_size=32):
    self.rng = 1            # probability of picking a random action
    self.rng_min = 0.1      # lower bound for `rng`
    self.rng_decay = 0.95   # multiplicative factor applied by `decay_rng`
    self.discount = 0.95    # weight of the bootstrapped next-state value
    self.weights='weights'  # path used by `save` / `load`
    self.decay_ctr = 0      # train steps since the last `rng` decay

    self.min_memory = min_memory
    self.batch_size = batch_size
    self.memory = deque(maxlen=memory_size)

    self.input_shape = input_shape
    self.n_actions = n_actions
    self.model = self.create_model()

  def create_model(self):
    """Build and compile the Q-network: state vector in, one Q-value per action out."""
    # Use the configured state size rather than a hard-coded 4 so the agent
    # is not tied to a single environment.
    state_in = Input(shape=(self.input_shape,))
    x = Dense(32, activation='relu')(state_in)
    x = Dense(16, activation='relu')(x)
    x = Dense(16, activation='relu')(x)
    q_out = Dense(self.n_actions, activation='linear')(x)

    model = Model(inputs=state_in, outputs=q_out)
    # Q-values are regression targets, so no classification metric is tracked.
    model.compile(optimizer='adam', loss='mse')

    return model

  def remember(self, state, action, reward, state_, done):
    """Append one transition to the replay memory (oldest entries are evicted when full)."""
    self.memory.append([state, action, reward, state_, done])

  def save(self):
    """Write the network weights to `self.weights`."""
    self.model.save_weights(self.weights)

  def load(self):
    """Restore the network weights from `self.weights`."""
    self.model.load_weights(self.weights)

  def action(self, state):
    """Pick an action: random with probability `rng`, otherwise the arg-max Q-value."""
    if random.random() < self.rng:
      return random.randint(0, self.n_actions - 1)
    return int(np.argmax(self.predict(state)))

  def predict(self, state):
    """Return the predicted Q-values for a single state as a (1, n_actions) array."""
    batch = np.reshape(state, (1, self.input_shape))
    return self.model.predict(batch, verbose=0)

  def train(self):
    """Run one mini-batch Q-learning update once enough transitions are stored.

    Does nothing until the memory holds at least `min_memory` (and at least
    `batch_size`) transitions. After every 11th update, `rng` is decayed.
    """
    if len(self.memory) < max(self.min_memory, self.batch_size):
      return

    self.decay_ctr += 1

    mini_batch = random.sample(self.memory, self.batch_size)
    states = np.array([memory[0] for memory in mini_batch])
    states_ = np.array([memory[3] for memory in mini_batch])

    # Start from the current predictions so that only the Q-value of the
    # action actually taken contributes to the loss. Copy to guarantee a
    # writable array.
    targets = np.array(self.model.predict(states, verbose=0))
    qs_ = self.model.predict(states_, verbose=0)

    for i, memory in enumerate(mini_batch):
      action = memory[1]
      reward = memory[2]
      done = memory[4]

      if done:
        q = reward  # terminal transition: no future value to bootstrap
      else:
        q = reward + self.discount * np.max(qs_[i])

      targets[i][action] = q

    # One target row per sample: shape (batch_size, n_actions).
    self.model.fit(states, targets, verbose=0, shuffle=False)

    if self.decay_ctr > 10:
      self.decay_rng()
      self.decay_ctr = 0

  def decay_rng(self):
    """Multiply `rng` by `rng_decay`, never letting it drop below `rng_min`."""
    self.rng = max(self.rng * self.rng_decay, self.rng_min)

In [47]:
# Create the environment and size the agent from it instead of hard-coding
# the observation length, so the two cannot drift apart.
env = gym.make('CartPole-v0')
agent = DQN_Agent(env.observation_space.shape[0], env.action_space.n)
score_record = []  # total score of each finished episode, in order

In [48]:
# Print the layers, output shapes and parameter counts of the agent's network.
agent.model.summary()

Model: "model_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_4 (InputLayer)         [(None, 4)]               0         
_________________________________________________________________
dense_21 (Dense)             (None, 32)                160       
_________________________________________________________________
dense_22 (Dense)             (None, 16)                528       
_________________________________________________________________
dense_23 (Dense)             (None, 16)                272       
_________________________________________________________________
dense_24 (Dense)             (None, 2)                 34        
Total params: 994
Trainable params: 994
Non-trainable params: 0
_________________________________________________________________


In [None]:
# Play `episodes` episodes, storing every transition and running one
# training step at the end of each episode.
episodes = 1_000
for episode in range(1, (episodes + 1)):
  state = env.reset()
  done = False
  score = 0
  while not done:
    action = agent.action(state)
    state_, reward, done, info = env.step(action)
    score += reward

    # An episode also ends when the time-limit wrapper cuts it off, which is
    # a success rather than a failure. Only a genuine failure is penalized and
    # stored as terminal; if the info key is absent this falls back to
    # treating every `done` as a failure.
    failed = done and not info.get('TimeLimit.truncated', False)
    if failed:
      reward = -20

    agent.remember(state, action, reward, state_, failed)
    state = state_

  # Episode finished: one training step, then report progress.
  agent.train()
  clear_output(wait=True)
  print(f'Episode: {episode}\nScore: {score}\nAgent RNG:{agent.rng}')
  score_record.append(score)

Episode: 940
Score: 70.0
Agent RNG:0.1


In [None]:
# Learning curve: total score collected in each training episode.
fig, ax = plt.subplots()
ax.plot(score_record)
ax.set(title='DQN training score per episode', xlabel='Episode', ylabel='Score')
plt.show()

In [None]:
def save_score(scores=None, path='score.txt'):
  """Append one score, or a sequence of scores, to a text file (one per line).

  Args:
    scores: A single value or an iterable of values to write. When omitted,
      the module-level `score` variable is written.
    path: File to append to.
  """
  if scores is None:
    scores = score  # notebook-level variable set by the training loop

  if isinstance(scores, str):
    values = [scores]
  else:
    try:
      values = list(scores)
    except TypeError:
      values = [scores]  # a single, non-iterable number

  # `write` only accepts text, so each value is formatted explicitly; the
  # context manager closes the file even if a write fails.
  with open(path, 'a') as fh:
    fh.writelines(f'{value}\n' for value in values)