In [1]:
import random
import gym
import numpy as np
from collections import deque
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense
from cartpole_environment import CartPoleEnv

GAMMA = 0.95
LEARNING_RATE = 0.001

MEMORY_SIZE = 1000000
BATCH_SIZE = 20

EXPLORATION_MAX = 1.0
EXPLORATION_MIN = 0.01
EXPLORATION_DECAY = 0.995
ENV_NAME = 'CartPole-v1'

class DQNSolver:

    def __init__(self, observation_space, action_space):
        self.exploration_rate = EXPLORATION_MAX

        self.action_space = action_space
        self.memory = deque(maxlen=MEMORY_SIZE)

        self.model = Sequential()
        self.model.add(Dense(24, input_shape=(observation_space,), activation="relu"))
        self.model.add(Dense(24, activation="relu"))
        self.model.add(Dense(self.action_space, activation="linear"))
        self.model.compile(loss="mse", optimizer=keras.optimizers.Adam(lr=LEARNING_RATE))

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() < self.exploration_rate:
            return random.randrange(self.action_space)
        q_values = self.model.predict(state)
        return np.argmax(q_values[0])

    def experience_replay(self):
        if len(self.memory) < BATCH_SIZE:
            return
        batch = random.sample(self.memory, BATCH_SIZE)
        for state, action, reward, state_next, terminal in batch:
            q_update = reward
            if not terminal:
                q_update = (reward + GAMMA * np.amax(self.model.predict(state_next)[0]))
            q_values = self.model.predict(state)
            q_values[0][action] = q_update
            self.model.fit(state, q_values, verbose=0)
        self.exploration_rate *= EXPLORATION_DECAY
        self.exploration_rate = max(EXPLORATION_MIN, self.exploration_rate)


def cartpole():
    #env = gym.make(ENV_NAME)
    env = CartPoleEnv()
    observation_space = env.observation_space.shape[0]
    action_space = env.action_space.n
    dqn_solver = DQNSolver(observation_space, action_space)
    run = 0
    while True:
        run += 1
        state = env.reset()
        state = np.reshape(state, [1, observation_space])
        step = 0
        while True:
            step += 1
            #env.render()
            action = dqn_solver.act(state)
            state_next, reward, terminal, info = env.step(action)
            reward = reward if not terminal else -reward
            state_next = np.reshape(state_next, [1, observation_space])
            dqn_solver.remember(state, action, reward, state_next, terminal)
            state = state_next
            if terminal:
                print("Run: " + str(run) + ", exploration: " + str(dqn_solver.exploration_rate) + ", score: " + str(step))
                break
            dqn_solver.experience_replay()


if __name__ == "__main__":
    cartpole()



Run: 1, exploration: 1.0, score: 19
Run: 2, exploration: 0.8911090557802088, score: 24
Run: 3, exploration: 0.8142285204175609, score: 19
Run: 4, exploration: 0.6180388156137953, score: 56
Run: 5, exploration: 0.5704072587541458, score: 17
Run: 6, exploration: 0.5344229416520513, score: 14
Run: 7, exploration: 0.4982051627146237, score: 15
Run: 8, exploration: 0.47862223409330756, score: 9
Run: 9, exploration: 0.4506816115185697, score: 13
Run: 10, exploration: 0.4159480862733536, score: 17
Run: 11, exploration: 0.3877593341372176, score: 15
Run: 12, exploration: 0.3669578217261671, score: 12
Run: 13, exploration: 0.3507711574848344, score: 10


KeyboardInterrupt: 