# Developing a DQN algorithm from Scratch for Cartpole

In [1]:
# Show which Python interpreter this kernel is running, to confirm that the
# expected environment is in use.
import sys
sys.executable

'/usr/bin/python3'

- Code modified from: https://github.com/keon/deep-q-learning

# Imports

In [2]:
import numpy as np
import gym
import random
import keras.backend as k
from collections import deque

from keras.models import Sequential
from keras.layers import Dense, Activation, Flatten
from keras.optimizers import Adam

Using TensorFlow backend.


In [3]:
ENV_NAME = 'CartPole-v0'
SEED = 1

# Build the environment and seed every random source used in this notebook.
# The agent draws from both NumPy (`np.random.rand`) and the stdlib `random`
# module (`random.randrange`, `random.sample`), so both have to be seeded for
# runs to be repeatable.
env = gym.make(ENV_NAME)
np.random.seed(SEED)
random.seed(SEED)
env.seed(SEED)

# Number of discrete actions available in the CartPole problem
nb_actions = env.action_space.n

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m


  result = entry_point.load(False)


In [4]:
# Length of the first axis of the observation space's shape; passed to the
# network below as its `input_dim`.
input_shape = env.observation_space.shape[0]
print(input_shape)

4


# Simple NN Model

In [5]:
# Small fully connected Q-network: one hidden ReLU layer, then one linear
# output per available action.
model = Sequential()
model.add(Dense(16, input_dim=input_shape, activation='relu'))
model.add(Dense(nb_actions))
model.add(Activation('linear'))
model.compile(loss='mse', optimizer=Adam(lr=0.001))

# summary() prints the table itself and returns None, so wrapping it in
# print() only appended a stray "None" line to the cell output.
model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_1 (Dense)              (None, 16)                80        
_________________________________________________________________
dense_2 (Dense)              (None, 2)                 34        
_________________________________________________________________
activation_1 (Activation)    (None, 2)                 0         
Total params: 114
Trainable params: 114
Non-trainable params: 0
_________________________________________________________________
None


# Developing a DQN Agent

In [6]:
# Deep Q-learning Agent
class SimpleAgent:
    """Minimal DQN-style agent: epsilon-greedy policy plus experience replay.

    Args:
        action_size: Number of discrete actions to choose from.
        model: Object exposing ``predict(x)`` and
            ``fit(x, y, epochs=..., verbose=...)``; used as the Q-function.
        exp: Initial exploration rate (epsilon). ``0.0`` gives a purely
            greedy agent.
    """

    def __init__(self, action_size, model,exp=1.0):
        self.action_size = action_size
        self.memory = deque(maxlen=10000)  # replay buffer, bounded to 10000 transitions
        self.gamma = 0.99    # discount factor
        self.epsilon = exp  # exploration rate
        self.epsilon_min = 0.01  # floor: decay never pushes epsilon below this
        self.epsilon_decay = 0.999  # multiplicative decay applied once per replay()
        self.model = model

    def buffer(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        self.memory.append((state, action, reward, next_state, done))

    def get_act(self, state):
        """Choose an action epsilon-greedily.

        Args:
            state: Observation in the form accepted by ``model.predict``.

        Returns:
            A random action index with probability ``epsilon``; otherwise the
            index of the largest value in the first row of the prediction.
        """
        # Strict `<`: np.random.rand() samples from [0, 1) and may return
        # exactly 0.0, so `<=` would let an epsilon == 0 agent explore.
        if np.random.rand() < self.epsilon:
            return random.randrange(self.action_size)

        act_values = self.model.predict(state)
        return np.argmax(act_values[0])  # greedy action

    def replay(self, batch_size):
        """Train on a random minibatch from memory, then decay epsilon.

        Does nothing (no training, no decay) while the memory holds fewer
        than ``batch_size`` transitions, instead of letting ``random.sample``
        raise ``ValueError``.

        Args:
            batch_size: Number of stored transitions to sample and fit on.
        """
        if len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory, batch_size)

        # One fit per transition, in order: each update changes the model
        # that the following predictions are made with.
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                # Bootstrap from the best predicted value of the next state.
                target = reward + self.gamma * \
                    np.amax(self.model.predict(next_state)[0])
            target_f = self.model.predict(state)

            # Only the entry of the action actually taken is overwritten; the
            # other outputs keep the model's own prediction.
            target_f[0][action] = target

            self.model.fit(state, target_f, epochs=1, verbose=0)

        # Decay exploration, clamped so it never undershoots the floor.
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min,
                               self.epsilon * self.epsilon_decay)

In [7]:
# Recreate the environment and build the agent around the Q-network.
# gym.make() returns a fresh environment, so the seed is applied again here;
# otherwise the seeding done when `env` was first created would be lost.
env = gym.make(ENV_NAME)
env.seed(1)
nb_actions = env.action_space.n
agent = SimpleAgent(nb_actions, model)

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m


# Load the Model

In [13]:
# Resume from previously saved weights when they are available. On a fresh
# run the file does not exist yet (it is only written by the save cell further
# down), so fall back to the freshly initialised network instead of aborting
# the notebook.
try:
    model.load_weights('Trained_Scratch_Cartpole.h5f')
except OSError as err:
    print("No saved weights loaded ({}); training from scratch.".format(err))

# Training of the Model: Let us Play!!

In [14]:
# Size of one observation vector (the same expression that defines `input_shape`).
state_size = env.observation_space.shape[0]

In [10]:
batch_size = 64
num_episodes = 20
max_steps = 200  # intended to match CartPole-v0's 200-step episode cap

for e in range(num_episodes):
    state = env.reset()
    state = np.reshape(state, [1, input_shape])  # add a batch axis for the network
    for steps in range(max_steps):
        # Visualization of the env
        # env.render()

        # Get the action from the Agent
        action = agent.get_act(state)

        # Take this action
        next_state, reward, done, _ = env.step(action)
        next_state = np.reshape(next_state, [1, input_shape])

        # `done` is also set when the episode merely reaches the step cap,
        # which is the best possible outcome. Only an episode that ends
        # earlier is treated as a failure: it gets the -10 penalty and is
        # stored as terminal. A capped episode keeps its normal reward and is
        # stored as non-terminal so its value is still bootstrapped.
        failed = done and steps < max_steps - 1
        if failed:
            reward = -10

        agent.buffer(state, action, reward, next_state, failed)
        state = next_state
        if done:
            print("episode: {}/{}, steps: {}, exploration: {:.2}"
                  .format(e, num_episodes, steps, agent.epsilon))
            break
        # Train only once the buffer holds more than one batch of transitions
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)

# Saving the model

In [11]:
# Write the network weights to disk under the same file name the load cells read.
# NOTE(review): Keras documents overwrite=False as asking for confirmation
# before replacing an existing file — confirm that is wanted for unattended runs.
model.save_weights("Trained_Scratch_Cartpole.h5f", overwrite=False)

In [18]:
# Close the training environment; the test section below creates its own.
env.close()

# Test the agent

In [19]:
env = gym.make('CartPole-v0')
test_episode= 5
test_max_steps = 200  # upper bound on steps per evaluation episode
model.load_weights('Trained_Scratch_Cartpole.h5f')

# exp=0.0 switches exploration off so the learned policy itself is evaluated
agent = SimpleAgent(nb_actions,model,exp=0.0)

# try/finally guarantees the environment (and its render window) is closed
# even if rendering fails or the kernel is interrupted mid-episode.
try:
    for e in range(test_episode):
        state = env.reset()

        for steps in range(test_max_steps):
            # Add a batch axis so the observation can be fed to the network
            state = np.reshape(state, [1, input_shape])

            # Visualization of the env
            env.render()

            # Get the action from the Agent
            action = agent.get_act(state)

            # Take this action
            state, reward, done, _ = env.step(action)

            if done:
                print("test_episode: {}/{}, steps: {}, exploration: {:.2}"
                      .format(e, test_episode, steps, agent.epsilon))
                break
finally:
    env.close()

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
test_episode: 0/5, steps: 199, exploration: 0.0
test_episode: 1/5, steps: 199, exploration: 0.0
test_episode: 2/5, steps: 199, exploration: 0.0
test_episode: 3/5, steps: 199, exploration: 0.0
test_episode: 4/5, steps: 199, exploration: 0.0
