In [3]:
%pip install gym

Note: you may need to restart the kernel to use updated packages.


In [21]:
import random
import gym
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import keras
import time
from tqdm import tqdm
import os

In [22]:
# Task to solve; set to 'MountainCar-v0' to switch environments.
ENV_NAME = "CartPole-v1"
env = gym.make(ENV_NAME)

# Length of the observation vector and number of discrete actions.
observation_space, action_space = env.observation_space.shape[0], env.action_space.n

In [23]:
# --- Q-learning / optimizer settings ---
GAMMA = 0.95                 # discount applied to the best next-state Q-value
LEARNING_RATE = 0.01         # passed to Adam as its learning rate
LEARNING_RATE_DECAY = 0.01   # passed to Adam as `decay`
MEMORY_SIZE = 1_000_000      # maximum number of transitions kept in replay memory
BATCH_SIZE = 64              # upper bound on transitions sampled per learning step

# --- Epsilon-greedy exploration schedule ---
EXPLORATION_MAX = 1.0        # starting probability of taking a random action
EXPLORATION_MIN = 0.01       # lower bound for the exploration probability
EXPLORATION_DECAY = 0.995    # multiplicative decay applied after each episode

# --- Persistence and rendering ---
MODEL_PATH = f'{ENV_NAME}_model'  # where the Keras model is saved / loaded
# Presumably the per-frame delay (seconds) used when replaying episodes — confirm.
# (A duplicate unconditional `FRAME_RATE = 0.05` used to precede this line; it was
# always overwritten here, so it has been dropped.)
FRAME_RATE = 0.05 if ENV_NAME in {'CartPole-v1'} else 0.01


In [65]:
class DQNAgent:
    """Deep Q-learning agent with an experience-replay memory.

    The agent plays episodes in a Gym environment, stores every transition in
    a bounded deque and, after each episode, fits its Q-network on a random
    mini-batch of stored transitions.
    """

    def __init__(self, env):
        """Bind the agent to ``env`` and load or build its Q-network.

        If ``MODEL_PATH`` exists the saved Keras model is loaded; otherwise a
        new network (24 and 48 tanh units, linear output with one unit per
        action) is built and compiled with MSE loss and Adam.

        Args:
            env: environment exposing ``observation_space.shape``,
                ``action_space.n``, ``reset()`` and ``step(action)``.
        """
        self.env = env
        self.exploration_rate = EXPLORATION_MAX
        self.memory = deque(maxlen=MEMORY_SIZE)
        # When False, play() only collects experience and skips learn().
        self.should_learn = True
        self.observation_space = env.observation_space.shape[0]
        self.action_space = env.action_space.n

        if os.path.exists(MODEL_PATH):
            self.model = keras.models.load_model(MODEL_PATH)
            print('Using predefined model')
        else:
            self.model = Sequential()
            self.model.add(Dense(24, input_dim=self.observation_space, activation='tanh'))
            self.model.add(Dense(48, activation='tanh'))
            self.model.add(Dense(self.action_space, activation='linear'))
            # NOTE(review): `lr` / `decay` are legacy argument names; newer Keras
            # releases expect `learning_rate` — confirm against the installed version.
            self.model.compile(loss='mse', optimizer=Adam(lr=LEARNING_RATE, decay=LEARNING_RATE_DECAY))
            print('Creating new model')
        # 1-based count of learn() calls; drives the periodic checkpoint.
        self.iterations = 1

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state, exploration_rate):
        """Choose an action epsilon-greedily.

        Args:
            state: observation already shaped by ``process_state``.
            exploration_rate: probability of returning a uniformly random action.

        Returns:
            A random action index, or the argmax of the predicted Q-values.
        """
        if np.random.rand() < exploration_rate:
            return random.randrange(self.action_space)
        q_values = self.model.predict(state, verbose=0)
        return np.argmax(q_values[0])

    def learn(self):
        """Fit the Q-network on one random mini-batch from replay memory.

        For every sampled transition the target of the taken action is
        ``reward`` when the episode ended there, else
        ``reward + GAMMA * max(Q(next_state))``; the other actions keep the
        network's current predictions. Afterwards the agent's own
        ``exploration_rate`` is decayed, the model is saved on every 10th call,
        and the call counter is advanced.

        Does nothing when the memory is empty.
        """
        if not self.memory:
            return

        mini_batch = random.sample(
            self.memory, min(len(self.memory), BATCH_SIZE))
        states, actions, rewards, next_states, dones = zip(*mini_batch)

        # Stored states are row vectors, so stacking yields (batch, observation).
        states = np.vstack(states)
        next_states = np.vstack(next_states)
        actions = np.asarray(actions, dtype=int)
        rewards = np.asarray(rewards, dtype=float)
        dones = np.asarray(dones, dtype=bool)

        # Two batched forward passes instead of two per sampled transition.
        q_values = np.array(self.model.predict(states, verbose=0))
        best_next = np.max(self.model.predict(next_states, verbose=0), axis=1)
        targets = np.where(dones, rewards, rewards + GAMMA * best_next)
        q_values[np.arange(len(mini_batch)), actions] = targets

        self.model.fit(states, q_values, batch_size=len(mini_batch), verbose=0)
        if self.exploration_rate > EXPLORATION_MIN:
            self.exploration_rate *= EXPLORATION_DECAY

        if self.iterations % 10 == 0:
            self.model.save(MODEL_PATH)
        # Without this increment the checkpoint condition above could never hold.
        self.iterations += 1

    def process_state(self, state):
        """Reshape a raw observation into a ``(1, observation_space)`` array."""
        return np.reshape(state, [1, self.observation_space])

    def stats(self):
        """Return a dict with the agent's current ``exploration_rate``."""
        return {
            "exploration_rate": self.exploration_rate,
        }

    def play(self, exploration):
        """Play one full episode and then run a learning step.

        Args:
            exploration: exploration probability handed to ``act`` for every step.

        Returns:
            ``(actions, score)``: the list of actions taken and the summed reward.
        """
        state = self.process_state(self.env.reset())
        done = False
        score = 0
        actions = []
        while not done:
            action = self.act(state, exploration)
            actions.append(action)
            next_state, reward, done, _ = self.env.step(action)
            next_state = self.process_state(next_state)
            self.remember(state, action, reward, next_state, done)
            state = next_state
            score += reward

        if self.should_learn:
            self.learn()
        return actions, score


In [66]:
# Build the agent; its constructor loads the model saved at MODEL_PATH when that path exists.
agent = DQNAgent(env)

Using predefined model


In [54]:
# Play one fully exploratory episode. EXPLORATION_MAX is used directly because the
# module-level EXPLORATION variable is only assigned in a later cell, so referencing
# it here fails with NameError on a fresh kernel.
moves, score = agent.play(EXPLORATION_MAX)

In [55]:
def render_moves(moves, frame_delay=None):
    """Replay a sequence of actions in the global ``env`` and render every frame.

    The environment is reset first, then each action is applied after a short
    pause. The replay stops as soon as the environment reports the episode is
    over, so a finished environment is never stepped again.

    NOTE(review): the replay starts from whatever state ``env.reset()``
    produces, which may differ from the state the recorded episode started
    in — confirm whether an exact replay is required.

    Args:
        moves: iterable of actions to apply in order.
        frame_delay: seconds to sleep before each step; defaults to the
            module-level ``FRAME_RATE``.
    """
    if frame_delay is None:
        frame_delay = FRAME_RATE
    env.reset()
    env.render()
    for action in moves:
        time.sleep(frame_delay)
        step_result = env.step(action)
        env.render()
        # step() yields (obs, reward, done, info) or
        # (obs, reward, terminated, truncated, info); the flags sit between
        # the reward and the trailing info dict in both layouts.
        if any(step_result[2:-1]):
            break

In [56]:
# Replay the recorded action sequence with rendering.
render_moves(moves)

In [57]:
# Mutable training state shared with learn(): the current exploration
# probability and the score of every episode played so far.
EXPLORATION = EXPLORATION_MAX
scores = []

In [58]:
def learn(iterations, render=False):
    """Play ``iterations`` training episodes with the module-level ``agent``.

    Each episode is played at the current global ``EXPLORATION`` probability,
    which is then multiplied by ``EXPLORATION_DECAY`` but never allowed to fall
    below ``EXPLORATION_MIN``. Every episode score is appended to the global
    ``scores`` list and shown, with the running average, in the progress bar.

    Args:
        iterations: number of episodes to play.
        render: when True, replay each finished episode via ``render_moves``.
    """
    global EXPLORATION
    iterator = tqdm(range(iterations))
    for _ in iterator:
        moves, score = agent.play(EXPLORATION)
        # Clamp at the configured floor; unbounded decay would eventually
        # remove all exploration.
        EXPLORATION = max(EXPLORATION_MIN, EXPLORATION * EXPLORATION_DECAY)
        scores.append(score)
        # Report progress whether or not the episode is rendered.
        iterator.set_description(f'Score: {score} [AVG: {np.mean(scores)}]')
        if render:
            render_moves(moves)

In [59]:
# Short training run: 10 episodes, each replayed on screen after it finishes.
learn(10, render=True)

Score: 15.0 [AVG: 25.2]: 100%|██████████| 10/10 [01:06<00:00,  6.66s/it]             


In [44]:
# Longer training run: 50 more episodes, each replayed on screen after it finishes.
learn(50, render=True)

Score: 98 [AVG: 31.25]: 100%|██████████| 50/50 [06:10<00:00,  7.41s/it]             


In [70]:
# Greedy episode: with exploration 0 every action is the argmax of the predicted Q-values.
# Note that play() also invokes agent.learn() when the episode ends.
moves, score = agent.play(0)
render_moves(moves)
score

13.0