In [1]:
import numpy as np
import gym
import tensorflow as tf
import numpy as np
import torch
import torch.nn.functional as F
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import random
import keras
import tensorflow.keras.backend as K
import matplotlib.pyplot as plt

Using TensorFlow backend.


In [2]:
class MemoryBuffer:
    """Fixed-capacity ring buffer of transitions for experience replay.

    Each slot holds one transition ``(obs, act, rew, next_obs, done)``.
    Once ``max_size`` transitions have been written, new ones overwrite the
    oldest slots.

    Parameters
    ----------
    obs_dim : int
        Length of a flattened observation.
    act_dim : int
        Accepted but not used; a single integer action is stored per slot.
    size : int
        Capacity of the buffer (number of transitions).
    """

    def __init__(self, obs_dim, act_dim, size):
        capacity = size
        self.obs1_buf = np.zeros([capacity, obs_dim], dtype=np.float32)  # current states
        self.obs2_buf = np.zeros([capacity, obs_dim], dtype=np.float32)  # next states
        self.acts_buf = np.zeros(capacity, dtype=np.int32)               # action indices
        self.rews_buf = np.zeros(capacity, dtype=np.float32)             # rewards
        self.done_buf = np.zeros(capacity, dtype=np.float32)             # done flags (0.0 / 1.0)
        self.ptr = 0        # slot the next store() call writes to
        self.size = 0       # number of valid slots, saturates at max_size
        self.max_size = capacity

    def store(self, obs, act, rew, next_obs, done):
        """Write one transition into the current slot and advance the write pointer.

        ``obs`` and ``next_obs`` must provide ``.flatten()`` (e.g. NumPy arrays).
        """
        slot = self.ptr
        self.obs1_buf[slot] = obs.flatten()
        self.obs2_buf[slot] = next_obs.flatten()
        self.acts_buf[slot] = act
        self.rews_buf[slot] = rew
        self.done_buf[slot] = done
        # Wrap around so the oldest transition is overwritten once the buffer is full.
        self.ptr = (slot + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample_batch(self, batch_size=32):
        """Return ``batch_size`` transitions drawn uniformly, with replacement.

        Indices are drawn from the filled part of the buffer via
        ``np.random.randint``.  The result is a dict with keys ``obs1``,
        ``obs2``, ``acts``, ``rews`` and ``done``, each an array whose first
        dimension is ``batch_size``.
        """
        picks = np.random.randint(0, self.size, size=batch_size)
        fields = (
            ('obs1', self.obs1_buf),
            ('obs2', self.obs2_buf),
            ('acts', self.acts_buf),
            ('rews', self.rews_buf),
            ('done', self.done_buf),
        )
        return {key: storage[picks] for key, storage in fields}

In [3]:
class CreateNetwork(torch.nn.Module):
    """Small fully connected network used for the RND target and predictor.

    Architecture: ``Linear(num_states, hidden_units) -> ReLU ->
    Linear(hidden_units, hidden_units) -> ReLU ->
    Linear(hidden_units, num_actions) -> ReLU``.
    Because of the final ReLU every output component is non-negative.

    Parameters
    ----------
    num_states : int
        Size of the input (observation) vector.
    hidden_units : int
        Width of both hidden layers.
    num_actions : int
        Size of the output vector.
    """

    def __init__(self, num_states, hidden_units, num_actions):
        super(CreateNetwork, self).__init__()
        self.num_states = num_states
        self.hidden_units = hidden_units
        self.num_actions = num_actions

        # The third positional argument of torch.nn.Linear is `bias`, not an
        # activation name; spell it out instead of passing the string 'linear'
        # (which only worked because a non-empty string is truthy).
        #
        # Attribute names do not follow execution order: forward() applies
        # first_layer, then third_layer, then second_layer.  They are kept
        # unchanged so existing state_dict keys stay valid.
        self.first_layer = torch.nn.Linear(self.num_states, self.hidden_units, bias=True)
        self.third_layer = torch.nn.Linear(self.hidden_units, self.hidden_units, bias=True)
        self.second_layer = torch.nn.Linear(self.hidden_units, self.num_actions, bias=True)
        # Row-wise softmax; constructed but not applied in forward().
        self.last_activa = torch.nn.Softmax(dim=1)

    def forward(self, input_state):
        """Map a state tensor (last dimension ``num_states``) to ``num_actions`` features."""
        hidden = F.relu(self.first_layer(input_state))
        hidden = F.relu(self.third_layer(hidden))
        # NOTE(review): the output layer is also passed through ReLU, so the
        # embedding is clipped at zero - confirm this is intended.
        return F.relu(self.second_layer(hidden))

class RND:
    """Random Network Distillation bonus: error of a predictor against a fixed target.

    Two ``CreateNetwork`` instances with identical shapes are built: a target
    network and a predictor network.  ``Intrinsic`` returns the summed squared
    difference between their outputs.  An Adam optimizer over the predictor's
    parameters is created as ``self.optimizer``; this class never steps it
    itself, so callers that want the bonus to shrink on visited states must
    back-propagate the value returned by ``Intrinsic`` and call
    ``self.optimizer.step()``.

    The network input/output sizes are read from the module-level ``env``
    (``env.observation_space.shape[0]`` and ``env.action_space.n``), which
    must exist before an instance is created.

    Parameters
    ----------
    hidden_units : int
        Width of the hidden layers of both networks.
    """

    def __init__(self, hidden_units):
        self.num_states = env.observation_space.shape[0]
        self.hidden_units = hidden_units
        self.num_actions = env.action_space.n

        # Target network: its output is detached in Intrinsic().
        self.target_network = CreateNetwork(self.num_states, hidden_units, self.num_actions)

        # Predictor network: the only parameters handed to the optimizer.
        self.predictor_network = CreateNetwork(self.num_states, hidden_units, self.num_actions)
        self.optimizer = torch.optim.Adam(self.predictor_network.parameters(), lr=0.001)

    def Intrinsic(self, inputs):
        """Return the intrinsic reward for ``inputs`` as a scalar tensor.

        ``inputs`` may be a NumPy array or a tensor.  ``torch.as_tensor`` is
        used so an input that is already a tensor is not re-wrapped (which
        makes PyTorch emit a copy-construct warning on every call).

        The result is the sum of squared differences between target and
        predictor outputs (the squared L2 distance, not the norm itself).
        It stays attached to the predictor's graph; call ``.item()`` to get
        a Python float.
        """
        inputs = torch.as_tensor(inputs, dtype=torch.float32)
        target_output = self.target_network(inputs).detach()
        predictor_output = self.predictor_network(inputs)
        return torch.pow(target_output - predictor_output, 2).sum()

In [6]:
class MyModel(tf.keras.Model):
    """Multi-layer perceptron that maps a state to one Q-value per action.

    Parameters
    ----------
    num_states : int
        Size of the input (observation) vector.
    hidden_units : sequence of int
        Width of each hidden layer, in order.
    hidden_activations : sequence
        Activation for each hidden layer; indexed in parallel with
        ``hidden_units``.
    num_actions : int
        Number of output units (linear activation).
    """

    def __init__(self, num_states, hidden_units, hidden_activations, num_actions):
        super(MyModel, self).__init__()
        dense = tf.keras.layers.Dense
        self.input_layer = tf.keras.layers.InputLayer(input_shape=(num_states,))
        # One Dense layer per requested width; activations are looked up by position.
        self.hidden_layers = [
            dense(units=width,
                  activation=hidden_activations[position],
                  kernel_initializer='RandomNormal')
            for position, width in enumerate(hidden_units)
        ]
        self.output_layer = dense(units=num_actions,
                                  activation='linear',
                                  kernel_initializer='RandomNormal')

    @tf.function
    def call(self, inputs):
        """Forward pass: input layer, then every hidden layer, then the linear output layer."""
        activations = self.input_layer(inputs)
        for hidden in self.hidden_layers:
            activations = hidden(activations)
        return self.output_layer(activations)
    
class DQN:
    """DQN with an RND exploration bonus, organised as a namespace of plain functions.

    None of the functions take ``self``; inside this class they are invoked
    through the class itself (``DQN.train(...)``, ``DQN.sync_weights(...)``).
    They are intentionally left undecorated: ``DQN().run(agent_params,
    training_params)`` binds the instance to ``run``'s ``env`` parameter,
    which ``run`` does not use because it creates its own environment.
    """

    def train(Q_main, Q_targ, gamma, buffer, batch_size, optimizer, env):
        """Perform one gradient step on ``Q_main`` from a replayed minibatch.

        Parameters
        ----------
        Q_main, Q_targ : MyModel
            Online network (updated here) and target network (used for the
            bootstrap value only).
        gamma : float
            Discount factor applied to the bootstrap value.
        buffer : MemoryBuffer
            Source of the minibatch.
        batch_size : int
            Number of transitions to sample.
        optimizer : tf.keras optimizer
            Applied to ``Q_main.trainable_variables``.
        env : gym environment
            Only ``env.action_space.n`` is read (for the one-hot encoding).
        """
        # Honour the caller's batch_size and gamma (they used to be overridden
        # by hard-coded 32 / 0.94).
        batch = buffer.sample_batch(batch_size=batch_size)
        o = batch['obs1']
        a = batch['acts']
        r = batch['rews']
        o2 = batch['obs2']
        d = batch['done']

        # number of actions for one-hot encoding
        num_acts = env.action_space.n

        # Bellman target y = r for terminal transitions, r + gamma * max_a' Q_targ(s', a') otherwise.
        next_state_values = np.max(Q_targ.predict(o2), axis=1)
        targets = np.where(d, r, r + gamma * next_state_values).astype(np.float32)

        # Summed squared error between the targets and Q_main(s, a) for the actions taken.
        with tf.GradientTape() as tape:
            main_action_values = Q_main(np.atleast_2d(o.astype('float32')))
            selected_action_values = tf.math.reduce_sum(
                main_action_values * tf.one_hot(a, num_acts), axis=1)
            loss = tf.math.reduce_sum(tf.square(targets - selected_action_values))

        # apply_gradients expects (gradient, variable) pairs, hence the zip.
        variables = Q_main.trainable_variables
        gradients = tape.gradient(loss, variables)
        optimizer.apply_gradients(zip(gradients, variables))

    def sync_weights(Q_main, Q_targ):
        """Copy the trainable variables of ``Q_main`` into ``Q_targ``.

        Variables are paired by position.  If either model has not created
        its variables yet, the pairing is empty and nothing is copied.
        """
        for target_var, main_var in zip(Q_targ.trainable_variables,
                                        Q_main.trainable_variables):
            target_var.assign(main_var)

    def run(env, agent_params, training_params):
        """Train on ``MountainCar-v0`` with extrinsic + RND intrinsic reward.

        Parameters
        ----------
        env :
            Not used; a fresh ``MountainCar-v0`` environment is always created.
        agent_params : dict
            Must contain ``discount_factor``, ``learning_rate``, ``epsilon``,
            ``min_epsilon`` and ``epsilon_decay``.  The Q-networks are built
            with two fixed 64-unit relu layers; ``hidden_layer_sizes`` and
            ``hidden_layer_activations`` are not consulted.
        training_params : dict
            Must contain ``num_episodes``, ``max_steps``, ``min_experiences``,
            ``buffer_size``, ``batch_size``, ``copy_step`` and
            ``plot_results``.  Optional: ``seed_value`` (default 10) and
            ``render`` (default True).

        Returns
        -------
        str or None
            A summary string as soon as an episode reaches the goal;
            otherwise ``None`` after all episodes (and after saving
            ``DQN_RND.png`` when ``plot_results`` is true).
        """
        # Seed every library that draws random numbers here, not just NumPy.
        seed_value = training_params.get('seed_value', 10)
        np.random.seed(seed_value)
        torch.manual_seed(seed_value)
        tf.random.set_seed(seed_value)

        # MODEL PARAMETERS
        gamma = agent_params['discount_factor']
        lr = agent_params['learning_rate']
        epsilon = agent_params['epsilon']
        min_epsilon = agent_params['min_epsilon']
        epsilon_decay = agent_params['epsilon_decay']

        # TRAINING PARAMETERS
        num_episodes = training_params['num_episodes']
        max_steps = training_params['max_steps']
        min_experiences = training_params['min_experiences']
        buffer_size = training_params['buffer_size']
        batch_size = training_params['batch_size']
        copy_step = training_params['copy_step']
        plot = training_params['plot_results']
        render = training_params.get('render', True)

        env = gym.make('MountainCar-v0')
        try:
            if hasattr(env, 'seed'):
                env.seed(seed_value)

            obs_dim = env.observation_space.shape[0]
            num_actions = env.action_space.n
            # Goal threshold of the underlying environment (0.5 for MountainCar).
            goal_position = getattr(env.unwrapped, 'goal_position', 0.5)

            # Replay capacity now comes from the configuration instead of a fixed 32.
            buffer = MemoryBuffer(obs_dim, num_actions, buffer_size)
            optimizer = tf.keras.optimizers.Adam(lr)

            # NOTE(review): architecture is fixed here regardless of agent_params.
            Q_main = MyModel(obs_dim, [64, 64], ["relu", "relu"], num_actions)
            Q_targ = MyModel(obs_dim, [64, 64], ["relu", "relu"], num_actions)
            # Subclassed Keras models create their variables on first call.
            # Run one dummy forward pass so the very first sync_weights()
            # actually copies something instead of zipping two empty lists.
            warmup = np.zeros((1, obs_dim), dtype=np.float32)
            Q_main(warmup)
            Q_targ(warmup)

            rewardd = RND(10)

            all_rewards = []  # per-episode sum of extrinsic + intrinsic reward
            extr_rew = []     # per-episode sum of extrinsic reward only
            total_steps = 0

            for ep in range(num_episodes):
                o = env.reset()
                all_rewards.append(0.0)
                extr_rew.append(0.0)
                epsilon = round(max(min_epsilon, epsilon * epsilon_decay), 3)

                for _ in range(max_steps):
                    if render:
                        env.render()

                    # epsilon-greedy action selection
                    if np.random.random() < epsilon:
                        action = np.random.randint(num_actions)
                    else:
                        action = np.argmax(Q_main.predict(np.atleast_2d(o))[0])

                    o2, r, d, _info = env.step(action)

                    # Intrinsic reward for the state just left, followed by one
                    # predictor update so the bonus decays on visited states
                    # (the optimizer was created by RND but never stepped).
                    novelty = rewardd.Intrinsic(np.asarray(o, dtype=np.float32))
                    rewardd_i = novelty.item()
                    rewardd.optimizer.zero_grad()
                    novelty.backward()
                    rewardd.optimizer.step()

                    # Success = the episode ended with the car at or beyond the
                    # goal.  The previous test (position == 0.6 and velocity
                    # >= 0.07) compared against the observation-space upper
                    # bounds with exact float equality.
                    reached_goal = bool(d) and o2[0] >= goal_position
                    reward_ex = 0.5 if reached_goal else r

                    combine_reward = rewardd_i + reward_ex
                    all_rewards[-1] += combine_reward
                    extr_rew[-1] += reward_ex

                    # store transition
                    buffer.store(o, action, combine_reward, o2, d)

                    # train (experience replay)
                    if total_steps >= min_experiences:
                        DQN.train(Q_main, Q_targ, gamma, buffer, batch_size, optimizer, env)

                    if total_steps % copy_step == 0:
                        DQN.sync_weights(Q_main, Q_targ)

                    o = o2
                    total_steps += 1

                    if reached_goal:
                        return "episode: {}, epsillon; {}, reward: {}".format(ep,epsilon, np.mean(all_rewards[-1]))
                    if d:
                        # Do not keep stepping an environment that reported done.
                        break

            if plot:
                # Raw return for the first 20 episodes, then the mean of the 20 preceding episodes.
                dqn_rnd_returns = [ret if i < 20 else np.mean(all_rewards[i-20:i])
                                   for i, ret in enumerate(all_rewards)]
                fig, ax = plt.subplots(figsize=(14,7))
                ax.set_xlabel("Number of Episodes")
                ax.set_ylabel("Returns")
                ax.plot(dqn_rnd_returns,'-b', label='DQN+RND')
                ax.legend()
                plt.savefig("DQN_RND.png", dpi=300)
        finally:
            # Release the render window / environment resources on every exit path.
            env.close()

In [7]:
# Module-level environment; RND reads its observation/action sizes from this name.
env = gym.make('MountainCar-v0')

# Agent hyper-parameters.  DQN.run builds its Q-networks with two fixed
# 64-unit relu layers, so the two hidden_layer_* entries do not affect the model.
agent_params = {
    'hidden_layer_sizes': 10,
    'min_epsilon': 0.1,
    'hidden_layer_activations': 'relu',
    'discount_factor': 0.9,
    'learning_rate': 0.001,
    'epsilon': 1.0,
    'epsilon_decay': 0.94,
}

# Training-loop settings passed to DQN.run.
training_params = {
    'num_episodes': 200,
    'max_steps': 600,
    'min_experiences': 50,
    'buffer_size': 20000,
    'batch_size': 128,
    'copy_step': 20,
    'plot_results': True,
}

rrr = DQN()

# Stand-alone instances for interactive inspection; DQN.run creates its own
# replay buffer and RND module.
buffer = MemoryBuffer(env.observation_space.shape[0], env.action_space.n, 128)

rnd = RND(10)

In [7]:
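# Training launch, left disabled: it renders the environment on every step and can run for a long time.
#rrr.run(agent_params,training_params)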