In [1]:
import random
import gym
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style(style="dark")

Using TensorFlow backend.


### Environment

**STATE**

* Cart Position
* Cart Velocity
* Pole Angle
* Pole Velocity at tip

**ACTIONS**
* 0 - LEFT
* 1 - RIGHT

**REWARD**

* 1 for every survived step

### Build Agent

In [2]:
def build_Deep_Q_Agent():
    model = Sequential()
    model.add(Dense(32, input_dim=state_size, activation='relu'))
    model.add(Dense(8, input_dim=state_size, activation='relu'))
    model.add(Dense(action_size, activation='linear')) # LEFT OR RIGHT
    model.compile(loss='mse', optimizer=Adam(lr=learning_rate))
    return model

### Perform action from given state

In [3]:
def perform_action(agent, state, epsilon):
    action = -1
    
    # action[0] = [P_LEFT, P_RIGHT]
    
    if np.random.rand() <= epsilon:
        action = random.randrange(2)
    else:
        # maximize Q(s,a) (state-action function)
        action = agent.predict(state)
        action = np.argmax(action[0]) 
        
    return action

### Remember transition

In [4]:
def remember(state, action, reward, next_state, done):
    memory.append((state, action, reward, next_state, done))

### Train agent to maximize future reward

In [10]:
def replay(batch_size, epsilon, done):
    
    # Get a minibatch of previous training memories
    minibatch = random.sample(memory, batch_size)
    
    # For each memory
    for state, action, reward, next_state, done in minibatch:
        
        if not done:
            # Predict from next_state the maximal future discounted reward
            n_state_pred_reward = (reward + gamma * np.amax(agent.predict(next_state)[0]))
        else:
            n_state_pred_reward = reward
        
        
        # Predict from current_state the maximal future discounted reward depeding on action taken
        c_state_pred_reward = agent.predict(state)
        c_state_pred_reward[0][action] = n_state_pred_reward
        
        
        # Train agent to map current state to future discounted reward
        agent.fit(state, current_state_pred_reward, epochs=1, verbose=0)
    
    # Decrease exploration successively
    if epsilon > epsilon_min:
        epsilon *= epsilon_decay
    
    return epsilon

### Load and Save

In [11]:
def load(model, name):
    return model.load_weights(name)

def save(model, name):
    model.save_weights(name)

### Params

In [12]:
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
memory = deque(maxlen=10000) # D-memory (transitions)
gamma = 0.95 # discount rate
epsilon = 0.5 # exploration rate
epsilon_min = 0.01 # min exploration rate
epsilon_decay = 0.995 # decrease exploration successively
learning_rate = 0.0005
episodes = 1000
batch_size = 25
max_score = 0

## Run

In [13]:
#sns.lineplot(range(len(results)), results)

In [None]:
done = False
load_weights = False
results = []

agent = build_Deep_Q_Agent()

if load_weights:
    load(agent, 'weights/cartpole-dqn-50.h5')
    epsilon = 0.01

for episode in range(episodes):

    state = env.reset()
    state = np.reshape(state, [1, state_size])

    for frame in range(episodes):
        env.render()
        #env.reset()
        
        # ---------------------------------------------------------
        # Select action - combine prediction and randomness
        # ---------------------------------------------------------
        
        action = perform_action(agent, state, epsilon)
        next_state, reward, done, info = env.step(action)
        if done: reward = -1
            
        # ---------------------------------------------------------
        # Move to next state and save transition in memory
        # ---------------------------------------------------------
            
        next_state = np.reshape(next_state, [1, state_size])
        remember(state, action, reward, next_state, done) 
        state = next_state
        
        if done:
            
            results.append(frame)
            
            if episode % 25 == 0: print("\nAverage score: ", sum(results[-25:]) / 25.0)
            if frame > max_score: max_score = frame
            
            print("\nEpisode: {}/{}, Score: {}, Epsilon: {:.3}, Top score: {}".format(episode, episodes, frame, float(epsilon), max_score))
            
            break
            
        # ---------------------------------------------------------
        # Sample random minibatches from memory and train model to 
        # maximize future reward from current state
        # ---------------------------------------------------------

        if len(memory) > batch_size:
            epsilon = replay(batch_size, epsilon, done)
            
    if done and frame >= 450:
        print("Victory!")
        break

    #if episode % 25 == 0:
    #    print("\nSave weights: " + str(episode))
    #    save(agent, "weights/cartpole-dqn-" + str(episode) + ".h5")


Average score:  1.48

Episode: 0/1000, Score: 37, Epsilon: 0.471, Top score: 37

Episode: 1/1000, Score: 11, Epsilon: 0.446, Top score: 37

Episode: 2/1000, Score: 10, Epsilon: 0.424, Top score: 37

Episode: 3/1000, Score: 9, Epsilon: 0.405, Top score: 37

Episode: 4/1000, Score: 7, Epsilon: 0.391, Top score: 37

Episode: 5/1000, Score: 13, Epsilon: 0.366, Top score: 37

Episode: 6/1000, Score: 11, Epsilon: 0.347, Top score: 37

Episode: 7/1000, Score: 17, Epsilon: 0.318, Top score: 37

Episode: 8/1000, Score: 9, Epsilon: 0.304, Top score: 37

Episode: 9/1000, Score: 11, Epsilon: 0.288, Top score: 37

Episode: 10/1000, Score: 16, Epsilon: 0.266, Top score: 37

Episode: 11/1000, Score: 8, Epsilon: 0.255, Top score: 37

Episode: 12/1000, Score: 10, Epsilon: 0.243, Top score: 37

Episode: 13/1000, Score: 10, Epsilon: 0.231, Top score: 37

Episode: 14/1000, Score: 7, Epsilon: 0.223, Top score: 37

Episode: 15/1000, Score: 10, Epsilon: 0.212, Top score: 37

Episode: 16/1000, Score: 10, Eps