In [16]:
import gym
import random
import numpy as np
import tensorflow as tf
import itertools

from collections import deque
from tensorflow.keras import Model, Sequential 
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import (BatchNormalization, Flatten, Dense, ReLU)

In [17]:
model_savefolder = "./model"

In [18]:
def set_device():
    print('-------------------')
    if len(tf.config.list_physical_devices('GPU')) > 0:
        print("GPU available.")
        device = '/gpu:0'
    else:
        print("No GPU available.")
        device = '/cpu:0'
    print('-------------------')
    return device

In [29]:
class DQNAgent:
    def __init__(self, action_space, batch_size=3, replay_memory_size=5, epsilon=0.1, 
                 learning_rate=1e-3, discount_factor=1., load=False, save=True):
        self.epsilon = epsilon
        self.lr = learning_rate
        self.discount_factor = discount_factor
        self.batch_size = batch_size
        self.replay_memory_size = replay_memory_size
    
        self.replay_memory = deque(maxlen=replay_memory_size)
        
        self.action_space = action_space
        self.criterion = MeanSquaredError()
        
        self.device = set_device()
        
        self.save = save
        #self.opt = Adam(lr=INIT_LR, decay=INIT_LR / EPOCHS)
        self.opt = Adam(lr=self.lr)
        
        if load:
            print('--------------------')
            print(f'Loading model from: {0}')
            print('--------------------')
            self.dqn = tf.keras.models.load_model(model_savefolder)
            self.dqn.build((None, 4))
        else:
            self.dqn = DQN(self.action_space)
            self.dqn.build((None, 4))
        
    def choose_action(self, obs):
        if np.random.uniform() < self.epsilon:
            return np.random.choice(self.action_space)
        else:
            obs = np.expand_dims(obs, axis=0)
            
            action = np.argmax(self.dqn(obs), axis=1)[0]
            
            return action
    
    def remember(self, obs, action, reward, next_obs, done):
        self.replay_memory.append([obs, action, reward, next_obs, done])
        
    def train(self):
        batch = random.sample(self.replay_memory, self.batch_size)
        batch = np.array(batch, dtype=object)
     
        obs = tf.stack(batch[:, 0])
        actions = batch[:, 1]
        rewards = tf.stack(batch[:, 2])
        next_obs = tf.stack(batch[:, 3])  
        dones = tf.stack(batch[:, 4])
        
        with tf.GradientTape() as tape:
            tape.watch(self.dqn.trainable_variables)
        
            q_obs = self.dqn(obs)
            q_next_obs = self.dqn(next_obs)

            targets = tf.where(dones, rewards, self.discount_factor * tf.math.reduce_max(q_next_obs, axis=1))
           
            action_idx = tf.stack([list(range(len(actions))), actions], axis=1)
            q_obs = tf.gather_nd(q_obs, action_idx) # corresponding to taken actions
            
            loss = self.criterion(targets, q_obs) # td error
            
            print(loss)
        grads = tape.gradient(loss, self.dqn.trainable_variables)
        self.opt.apply_gradients(zip(grads, self.dqn.trainable_variables))

In [30]:
class DQN(Model):
    """ Deep Q-learning network.
    """
    def __init__(self, num_actions, num_obs=4):
        super(DQN, self).__init__()
        self.device = set_device()
        
        self.layer = Sequential([Dense(24),
                                 ReLU(),
                                 Dense(num_actions),
                                 ReLU()])
    @tf.function
    def call(self, x):
        return self.layer(x)

In [31]:
def run_training(env, agent, T=10):
    step = 0
    
    for i in range(T):
        done = False
        obs = env.reset()
        
        while not done:
            #env.render()
            action = agent.choose_action(obs)
            
            next_obs, reward, done, _ = env.step(action)
            
            agent.remember(obs, action, reward, next_obs, done)
            
            if step > agent.replay_memory_size:
                agent.train()
            
            if agent.save:
                agent.dqn.save(model_savefolder)
            
            step += 1

In [32]:
def main():
    env = gym.make('CartPole-v1')
    
    """
        observation: {cart position (0), cart velocity (1), pole angle (2), pole angle velocity (3)}
        
        action: {push cart to the left (0) or right (1)}
    """
    observation_space = env.observation_space.shape[0]
    action_space = env.action_space.n
    
    agent = DQNAgent(action_space=action_space, save=False)
    
    run_training(env, agent)
    
    done = False
    obs = env.reset()
    #while True:
    #    env.render()
    #    action = agent.choose_action(obs)

    #    next_obs, reward, done, _ = env.step(action)

    env.close()

In [33]:
if __name__ == "__main__":
    main()

-------------------
No GPU available.
-------------------
-------------------
No GPU available.
-------------------
tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.33333334, shape=(), dtype=float32)
tf.Tensor(0.33333334, shape=(), dtype=float32)
tf.Tensor(0.3333358, shape=(), dtype=float32)
tf.Tensor(7.8208967e-07, shape=(), dtype=float32)
tf.Tensor(0.33333334, shape=(), dtype=float32)
tf.Tensor(0.0024707902, shape=(), dtype=float32)
tf.Tensor(0.0022942487, shape=(), dtype=float32)
tf.Tensor(0.023923941, shape=(), dtype=float32)
tf.Tensor(0.045234, shape=(), dtype=float32)
tf.Tensor(0.055033445, shape=(), dtype=float32)
tf.Tensor(0.08071706, shape=(), dtype=float32)
tf.Tensor(0.081545025, shape=(), dtype=float32)
tf.Tensor(0.10593635, shape=(), dtype=float32)
tf.Tensor(0.110720575, shape=(), dtype=float32)
tf.Tensor(0.112123765, shape=(), dtype=float32)
tf.Tensor(0.06555561, shape=(), dtype=float32)
tf.Tensor(0.067108445, shape=(), dtype=float32)
tf.Tensor(0.36641982, shape=(), dty