In [1]:
import numpy as np
import gym

import tensorflow as tf
from tensorflow.keras.layers import Dense, InputLayer
from tensorflow.keras import Sequential
from tensorflow.keras.optimizers import Adam

from collections import deque
import random

from IPython.display import clear_output

In [2]:
class DQN_Agent:
    """Deep Q-Network agent with an experience-replay buffer and epsilon-greedy policy.

    The network maps a ``STATE_SIZE``-dimensional observation to one Q-value per
    action (``N_ACTIONS`` outputs).

    Attributes:
        memory: bounded deque of ``[state, action, reward, next_state, done]``
            transitions (oldest entries are dropped once 2000 are stored).
        random: current exploration probability (epsilon).
        random_decay: multiplicative epsilon decay applied after each training step.
        random_min: lower bound for epsilon.
        discount: discount factor applied to the bootstrapped next-state value.
        model: compiled Keras model returned by ``create_model``.
    """

    STATE_SIZE = 4        # length of one observation vector
    N_ACTIONS = 2         # number of discrete actions / network outputs
    BATCH_SIZE = 32       # transitions sampled per training step
    MIN_MEMORY = 500      # transitions required before training starts
    MODEL_PATH = 'cartpole_v2.model'

    def __init__(self):
        self.memory = deque(maxlen=2_000)

        self.random = 1
        self.random_decay = 0.999
        self.random_min = 0.01

        self.discount = 0.95

        self.model = self.create_model()

    def create_model(self):
        """Build and compile the Q-network: two 12-unit ReLU layers and a linear head.

        Returns:
            A compiled ``Sequential`` model trained with Adam (lr=0.001) on MSE.
        """
        model = Sequential()
        model.add(InputLayer(input_shape=(self.STATE_SIZE,)))
        model.add(Dense(12, activation='relu'))
        model.add(Dense(12, activation='relu'))
        model.add(Dense(self.N_ACTIONS, activation='linear'))
        # No metrics: classification accuracy carries no meaning for Q-value regression.
        model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss='mse',
        )
        return model

    def get_action(self, states):
        """Pick an action epsilon-greedily.

        Args:
            states: array passed straight to ``model.predict``. The arg-max is
                taken over the flattened prediction, so pass a single state of
                shape ``(1, STATE_SIZE)``.

        Returns:
            The chosen action index as a plain ``int``.
        """
        if random.random() < self.random:
            return random.randint(0, self.N_ACTIONS - 1)
        return int(np.argmax(self.model.predict(states, verbose=0)))

    def train(self):
        """Run one gradient step on a random mini-batch and decay epsilon.

        Does nothing (and does not decay epsilon) until at least ``MIN_MEMORY``
        transitions have been stored.
        """
        if len(self.memory) < self.MIN_MEMORY:
            return

        mini_batch = random.sample(self.memory, self.BATCH_SIZE)

        current_states = np.array([transition[0] for transition in mini_batch])
        actions = np.array([transition[1] for transition in mini_batch], dtype=np.intp)
        rewards = np.array([transition[2] for transition in mini_batch], dtype=np.float32)
        next_states = np.array([transition[3] for transition in mini_batch])
        dones = np.array([transition[4] for transition in mini_batch], dtype=bool)

        # Start from the network's own predictions so that only the Q-value of
        # the action actually taken contributes to the loss.
        targets = np.array(self.model.predict(current_states, verbose=0))
        next_qs = np.asarray(self.model.predict(next_states, verbose=0))

        # Terminal transitions have no future value; the rest bootstrap from
        # the best next-state Q-value.
        bootstrapped = rewards + self.discount * next_qs.max(axis=1)
        targets[np.arange(len(mini_batch)), actions] = np.where(dones, rewards, bootstrapped)

        # `targets` has one row per sampled state, i.e. shape (batch, N_ACTIONS).
        self.model.fit(
            current_states,
            targets,
            batch_size=len(mini_batch),
            verbose=0,
            shuffle=False,
        )

        self.random = max(self.random_min, self.random * self.random_decay)

    def remember(self, current_state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.memory.append([current_state, action, reward, next_state, done])

    def save(self):
        """Save the model to ``MODEL_PATH``."""
        self.model.save(self.MODEL_PATH)

    def load(self):
        """Load the model from ``MODEL_PATH`` and drop epsilon to its minimum."""
        self.model = tf.keras.models.load_model(self.MODEL_PATH)
        self.random = self.random_min

In [3]:
# Instantiate the agent with the defaults set in DQN_Agent.__init__
# (this also builds and compiles the Q-network via create_model()).
agent = DQN_Agent()

In [None]:
env = gym.make('CartPole-v0')

# try/finally guarantees the environment (and its render window) is closed
# even when the long-running loop is interrupted or raises.
try:
    for episode in range(2000):
        observation = env.reset()
        total_reward = 0
        done = False
        while not done:
            env.render()

            action = agent.get_action(
                np.array(observation).reshape(1, 4)
            )
            next_observation, reward, done, _info = env.step(action)
            # Track the environment's own reward; the penalty below is only
            # what gets stored for learning.
            total_reward += reward
            if done:
                # NOTE(review): `done` may also be set when the environment's
                # step limit is reached, in which case a full-length episode
                # is penalised too -- confirm this is intended.
                reward = -20
            agent.remember(observation, action, reward, next_observation, done)
            observation = next_observation

        # One training step per finished episode, then refresh the status text.
        agent.train()
        clear_output(wait=True)
        print(f'no. of games: {episode + 1}')
        print(f'total reward: {total_reward}')
        print(f'random percentage: {agent.random}')
finally:
    env.close()

no. of games: 1633
total reward: 194.0
random percentage: 0.19932740843000615


In [None]:
# Persist the trained network via DQN_Agent.save(), which writes it to
# 'cartpole_v2.model'.
agent.save()

In [None]:
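# Optional: uncomment to reload the model written by agent.save()
# (DQN_Agent.load() also sets agent.random to 0.01).
# agent.load()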