In [8]:
from keras.layers import Dense, Activation
from keras.models import Sequential, load_model
import tensorflow as tf
import numpy as np

In [11]:
class ReplayBuffer(object):
    """Fixed-capacity circular replay buffer for DQN/DDQN, backed by NumPy arrays.

    Transitions are written at ``mem_cntr % mem_size``, so once the buffer is
    full the oldest entries are overwritten.

    Parameters
    ----------
    max_size : int
        Maximum number of transitions kept in memory.
    input_shape : int or sequence of int
        Shape of a single state. A bare int ``d`` is treated as a flat state
        of length ``d``; a tuple/list is used as the per-state shape.
    n_actions : int
        Width of each row of the action memory.
    discrete : bool, optional
        If True, actions are stored one-hot encoded as ``np.int8``; otherwise
        the action value is stored as given, as ``np.float32``.
    """

    def __init__(self, max_size, input_shape, n_actions, discrete=False):
        self.mem_size = max_size
        self.input_shape = input_shape
        self.discrete = discrete
        # total number of transitions ever stored (not capped at mem_size)
        self.mem_cntr = 0

        # accept both a flat dimension (int) and a full per-state shape
        if np.isscalar(input_shape):
            state_shape = (self.mem_size, input_shape)
        else:
            state_shape = (self.mem_size, *input_shape)
        self.state_memory = np.zeros(state_shape)
        self.new_state_memory = np.zeros(state_shape)

        dtype = np.int8 if self.discrete else np.float32
        self.action_memory = np.zeros((self.mem_size, n_actions), dtype=dtype)
        self.reward_memory = np.zeros(self.mem_size)
        # holds 1 - done, i.e. 0.0 for terminal transitions and 1.0 otherwise
        self.terminal_memory = np.zeros(self.mem_size, dtype=np.float32)

    def store_transition(self, state, action, reward, state_, done):
        """Store one transition, overwriting the oldest slot once full.

        For a discrete buffer ``action`` is used as an index into a one-hot
        row; otherwise it is written to the action row as-is.
        """
        index = self.mem_cntr % self.mem_size
        self.state_memory[index] = state
        self.new_state_memory[index] = state_
        self.reward_memory[index] = reward
        # stored inverted so it can be multiplied directly into the bootstrap term
        self.terminal_memory[index] = 1 - int(done)
        # some adjustability in case we decide to go with continuous action values
        if self.discrete:
            actions = np.zeros(self.action_memory.shape[1])
            actions[action] = 1.0
            self.action_memory[index] = actions
        else:
            self.action_memory[index] = action
        self.mem_cntr += 1

    def sample_buffer(self, batch_size):
        """Return a random batch ``(states, actions, rewards, states_, terminal)``.

        Indices are drawn with ``np.random.choice`` (default: with
        replacement) from the filled part of the buffer. ``terminal`` holds
        the stored ``1 - done`` values.
        """
        # only sample from slots that have actually been written
        max_mem = min(self.mem_cntr, self.mem_size)
        batch = np.random.choice(max_mem, batch_size)

        states = self.state_memory[batch]
        states_ = self.new_state_memory[batch]
        rewards = self.reward_memory[batch]
        actions = self.action_memory[batch]
        terminal = self.terminal_memory[batch]

        return states, actions, rewards, states_, terminal

In [None]:
def build_dqn(lr, n_actions, input_dims, layer1_dims, layer2_dims):
    """Build and compile the dense network that represents the Q-values.

    The network is a Sequential stack: Dense(layer1_dims) -> relu ->
    Dense(layer2_dims) -> relu -> Dense(n_actions), with no activation on the
    output layer. It is compiled with Adam (learning rate ``lr``) and an MSE
    loss.

    Parameters
    ----------
    lr : learning rate handed to the Adam optimizer.
    n_actions : number of units in the output layer.
    input_dims : length of the flat input vector.
    layer1_dims, layer2_dims : unit counts of the two hidden layers.

    Returns
    -------
    The compiled Sequential model.
    """
    # layers listed in forward order
    stack = [
        Dense(layer1_dims, input_shape=(input_dims, )),
        Activation('relu'),
        Dense(layer2_dims),
        Activation('relu'),
        Dense(n_actions),
    ]
    network = Sequential(stack)

    adam = tf.keras.optimizers.Adam(learning_rate=lr)
    network.compile(optimizer=adam, loss='mse')

    return network

In [12]:
class DQNAgent(object):
    """Epsilon-greedy deep Q-learning agent with a single Q-network.

    Parameters
    ----------
    alpha : learning rate passed to ``build_dqn``.
    gamma : reward discount factor.
    n_actions : number of discrete actions.
    epsilon : initial exploration rate.
    batch_size : number of transitions sampled per ``learn`` call.
    input_dims : state dimension passed to the buffer and the network.
    epsilon_dec : multiplicative epsilon decay applied on every ``learn`` call.
    epsilon_end : lower bound for epsilon.
    mem_size : capacity of the replay buffer.
    chkpt_dir : path used by ``save_model`` / ``load_model``.
    """

    def __init__(self, alpha, gamma, n_actions, epsilon, batch_size, input_dims, epsilon_dec=0.996, epsilon_end=0.01, mem_size=1000000, chkpt_dir='models/dqn'):
        self.action_space = list(range(n_actions))

        self.n_actions = n_actions
        # reward discount
        self.gamma = gamma
        # exploration rate
        self.epsilon = epsilon
        # decrease in exploration over time
        self.epsilon_dec = epsilon_dec
        # minimal exploration
        self.epsilon_min = epsilon_end

        self.batch_size = batch_size
        self.chkpt_dir = chkpt_dir

        # create a buffer (one-hot actions) and a dqn for our agent
        self.memory = ReplayBuffer(mem_size, input_dims, n_actions, discrete=True)
        self.q_eval = build_dqn(alpha, n_actions, input_dims, 256, 256)

    def remember(self, state, action, reward, new_state, done):
        """Store a transition in the agent's replay memory."""
        self.memory.store_transition(state, action, reward, new_state, done)

    def choose_action(self, state):
        """Pick an action epsilon-greedily for a single (unbatched) state."""
        # add a leading batch axis so the network sees a batch of one
        state = state[np.newaxis, :]
        rand = np.random.random()
        if rand < self.epsilon:
            # random exploration with probability epsilon
            action = np.random.choice(self.action_space)
        else:
            # otherwise act greedily w.r.t. the current Q-estimates
            actions = self.q_eval.predict(state, verbose=0)
            action = np.argmax(actions)
        return action

    def learn(self):
        """Run one training step on a sampled batch and decay epsilon.

        Does nothing until at least ``batch_size`` transitions were stored.
        """
        # if the buffer is not sufficiently full yet, do nothing
        if self.memory.mem_cntr < self.batch_size:
            return

        # the last element is the stored (1 - done) mask, not the done flag
        state, action, reward, new_state, not_done = self.memory.sample_buffer(self.batch_size)

        # recover integer action indices from the one-hot rows; argmax avoids
        # the int8 overflow an int8 dot product hits for large action spaces
        action_indices = np.argmax(action, axis=1)

        # get q-value representations from the dqn
        q_eval = self.q_eval.predict(state, verbose=0)
        q_next = self.q_eval.predict(new_state, verbose=0)

        # start from the current predictions so that only the entries of the
        # actions actually taken contribute to the loss
        q_target = q_eval.copy()
        batch_index = np.arange(self.batch_size, dtype=np.int32)

        # bootstrap target; the mask zeroes the future term on terminal steps
        q_target[batch_index, action_indices] = reward + self.gamma*np.max(q_next, axis=1)*not_done
        _ = self.q_eval.fit(state, q_target, verbose=0)

        # decay epsilon but never let it fall below the configured minimum
        self.epsilon = max(self.epsilon * self.epsilon_dec, self.epsilon_min)

    def save_model(self):
        """Save the Q-network to ``chkpt_dir``."""
        self.q_eval.save(self.chkpt_dir)

    def load_model(self):
        """Replace the Q-network with the model loaded from ``chkpt_dir``."""
        self.q_eval = load_model(self.chkpt_dir)

In [None]:
class DDQNAgent(DQNAgent):
    """Double-DQN agent: reuses DQNAgent and adds a separate target network.

    The online network (``q_eval``) selects the best next action and the
    target network (``q_target``) supplies the value of that action.

    Parameters are those of ``DQNAgent`` plus:

    replace_target : the target network is synchronised inside ``learn``
        whenever the replay buffer's transition counter is a multiple of
        this value.
    """

    def __init__(self, alpha, gamma, n_actions, epsilon, batch_size, input_dims, epsilon_dec=0.9996, epsilon_end=0.01, mem_size=1000000, chkpt_dir='models/ddqn', replace_target=10):
        super(DDQNAgent, self).__init__(alpha, gamma, n_actions, epsilon, batch_size, input_dims, epsilon_dec, epsilon_end, mem_size, chkpt_dir)
        # create a second (target) network with the same architecture
        self.q_target = build_dqn(alpha, n_actions, input_dims, 256, 256)
        # specifies how often the target is updated
        self.replace_target = replace_target

    def learn(self):
        """Run one double-Q training step, decay epsilon, maybe sync the target.

        Does nothing until at least ``batch_size`` transitions were stored.
        """
        if self.memory.mem_cntr < self.batch_size:
            return

        # the last element is the stored (1 - done) mask, not the done flag
        state, action, reward, new_state, not_done = self.memory.sample_buffer(self.batch_size)

        # recover integer action indices from the one-hot rows; argmax avoids
        # the int8 overflow an int8 dot product hits for large action spaces
        action_indices = np.argmax(action, axis=1)

        # target network evaluates, online network selects
        q_next = self.q_target.predict(new_state, verbose=0)
        q_eval = self.q_eval.predict(new_state, verbose=0)

        q_pred = self.q_eval.predict(state, verbose=0)

        max_actions = np.argmax(q_eval, axis=1)

        # start from the current predictions so that only the entries of the
        # actions actually taken contribute to the loss
        q_target = q_pred

        batch_index = np.arange(self.batch_size, dtype=np.int32)

        # bootstrap target; the mask zeroes the future term on terminal steps
        q_target[batch_index, action_indices] = reward + self.gamma*q_next[batch_index, max_actions] * not_done

        _ = self.q_eval.fit(state, q_target, verbose=0)

        # decay epsilon but never let it fall below the configured minimum
        self.epsilon = max(self.epsilon * self.epsilon_dec, self.epsilon_min)

        # target network only gets updated every so often; the schedule is
        # keyed on the number of stored transitions, not on learn() calls
        if self.memory.mem_cntr % self.replace_target == 0:
            self.update_network_parameters()

    def update_network_parameters(self):
        """Copy the online network's weights into the target network."""
        self.q_target.set_weights(self.q_eval.get_weights())

    def load_model(self):
        """Load the online network and, if exploration has bottomed out, sync the target."""
        # must name this class: the previous `super(DDQN, self)` raised NameError
        super(DDQNAgent, self).load_model()
        # the target network is only synced once epsilon has reached its minimum
        # NOTE(review): when loading with a larger epsilon the target network
        # keeps its previous weights until the next scheduled sync - confirm
        # this is intended.
        if self.epsilon <= self.epsilon_min:
            self.update_network_parameters()