In [1]:
import random
from collections import deque

import numpy as np
import gym
from keras.layers import Dense
from keras.models import Sequential
from keras.optimizers import Adam

Using TensorFlow backend.


In [4]:
class DQLSolver:
    """Deep Q-learning agent with an experience-replay buffer.

    The wrapped model is used as the Q-function approximator: ``predict`` is
    called on a batch of states and must return one row of per-action values
    per state; ``fit`` is called with states and target Q-value rows.
    """

    def __init__(self, nn, max_batch_size=1024, mini_batch_size=32):
        """Create a solver around an already-built model.

        Args:
            nn: Model exposing ``predict(x)`` and
                ``fit(x, y, verbose=..., batch_size=..., epochs=...)``.
            max_batch_size: Capacity of the replay buffer; once full, the
                oldest transitions are discarded first.
            mini_batch_size: Number of transitions sampled per replay step.
        """
        self._nn = nn
        self._batch = deque(maxlen=max_batch_size)
        self.mini_batch_size = mini_batch_size

    def get_action(self, env, state, eps=.2):
        """Pick an action epsilon-greedily.

        Args:
            env: Environment; only ``env.action_space.sample()`` is used, and
                only when a random action is drawn.
            state: Single observation (not batched).
            eps: Probability of choosing a random action.

        Returns:
            A sampled action with probability ``eps``, otherwise the index of
            the largest predicted Q-value for ``state``.
        """
        if random.random() < eps:
            return env.action_space.sample()
        # The model works on batches, so wrap the state in a batch of one and
        # read the single resulting row.
        q_values = self._nn.predict(np.array([state]))
        return int(np.argmax(q_values[0]))

    def remember(self, s, a, r, sn, done):
        """Append the transition ``(s, a, r, sn, done)`` to the replay buffer."""
        self._batch.append((s, a, r, sn, done))

    def exp_replay(self, gamma=.99):
        """Fit the model on a random mini-batch of remembered transitions.

        Does nothing until the buffer holds at least ``mini_batch_size``
        transitions. For every sampled transition the target for the taken
        action is ``reward`` if ``done`` else
        ``reward + gamma * max(Q(state_new))``; targets for the other actions
        are the model's current predictions, so they contribute no error.

        Args:
            gamma: Discount factor applied to the bootstrapped next-state value.
        """
        if len(self._batch) < self.mini_batch_size:
            return
        mini_batch = random.sample(self._batch, self.mini_batch_size)
        if not mini_batch:
            # mini_batch_size == 0: nothing to train on.
            return

        states, actions, rewards, next_states, dones = zip(*mini_batch)
        state_vec = np.array(states)
        next_state_vec = np.array(next_states)
        actions = np.array(actions, dtype=int)
        rewards = np.array(rewards, dtype=float)
        dones = np.array(dones, dtype=bool)

        # Two batched forward passes instead of two per sampled transition.
        # The model is not updated between them, so the targets are the same
        # as computing them one transition at a time.
        q_target_vec = np.array(self._nn.predict(state_vec))
        q_next_max = np.amax(self._nn.predict(next_state_vec), axis=1)

        # Terminal transitions do not bootstrap from the next state.
        q_update = np.where(dones, rewards, rewards + gamma * q_next_max)
        q_target_vec[np.arange(len(actions)), actions] = q_update

        # Clamp to 1 so mini batches smaller than 4 do not request batch_size=0.
        fit_batch_size = max(1, self.mini_batch_size // 4)
        self._nn.fit(state_vec, q_target_vec, verbose=0,
                     batch_size=fit_batch_size, epochs=5)
    
    

In [5]:
# Train a DQLSolver on CartPole-v0 and print the mean episode length
# (index of the final step) over each reporting window.
env = gym.make('CartPole-v0')
n_epi = 30          # number of training episodes
max_steps = 1000    # upper bound on steps per episode
render_every = 1    # render every N-th episode when `monitor` is on
report_every = 5    # print the running mean every N-th episode

# Small fully connected Q-network: observation in, one linear Q-value per action out.
model = Sequential()

model.add(Dense(24, input_shape=env.observation_space.shape, activation='relu'))
model.add(Dense(24, activation='relu'))
model.add(Dense(env.action_space.n, activation='linear'))
model.compile(loss='mse', optimizer=Adam(lr=.001))
solver = DQLSolver(model)

monitor = True
# Episode lengths collected since the last report. This lives outside the
# episode loop so the printed value is a mean over the whole window rather
# than over just the current episode.
avg_vec = []
try:
    for i_epi in range(n_epi):
        state = env.reset()

        for t in range(max_steps):
            if monitor and i_epi % render_every == 0:
                env.render()
            # Choose epsilon greedy action
            action = solver.get_action(env, state)
            # step
            state_new, reward, done, _ = env.step(action)
            # Reward shaping: bonus of 1 per step, minus penalties on the
            # magnitude of the first two observation components
            # (presumably cart position, scaled by the 2.4 limit, and cart
            # velocity -- TODO confirm against the environment docs).
            reward += 1
            reward -= abs(state[0]) / 2.4
            reward -= abs(state[1])
            # NOTE(review): `-= -2` ADDS 2 when the episode ends. If a
            # termination penalty was intended this should be `reward -= 2`;
            # left unchanged because it alters the training signal -- confirm.
            if done:
                reward -= -2
            # Store sars
            solver.remember(state, action, reward, state_new, done)
            # experience replay
            solver.exp_replay()

            if done:
                avg_vec.append(t)
                if not i_epi % report_every:
                    print(np.mean(avg_vec))
                    avg_vec = []
                break

            state = state_new
finally:
    # Release the environment (and any render window) even if training is
    # interrupted or raises.
    env.close()

9.0
9.0
45.0
109.0
144.0
199.0
