In [None]:
import numpy as np
from keras import layers
import tensorflow as tf
from keras import backend as K
from keras.models import Model
import numpy as np
from keras import optimizers
import random
from collections import namedtuple, deque
import gym
import matplotlib.pyplot as plt
import argparse

class Actor:
    """Policy network that maps a state vector to a tanh-bounded action.

    Attributes:
        input_dim: Size of the state vector fed to the network.
        output_dim: Size of the action vector produced by the network.
        tau: Interpolation factor used by ``soft_update``.
        gamma: Discount factor; stored but not used inside this class.
        model: The compiled Keras model.
    """

    def __init__(self, input_dim, output_dim, tau, gamma):
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.tau = tau
        self.gamma = gamma
        self.model = self.__make_model()

        # Placeholder attribute; nothing in this class assigns it afterwards.
        self.update_function = None

    def __make_model(self):
        """Build the state -> action network and attach an Adam optimizer."""
        input_layer = layers.Input(shape=(self.input_dim,))
        x = layers.BatchNormalization()(input_layer)
        x = layers.Dense(128, activation='relu')(x)
        x = layers.Dense(256, activation='relu')(x)
        x = layers.Dense(128, activation='relu')(x)
        # tanh keeps every action component inside [-1, 1].
        x = layers.Dense(self.output_dim, activation='tanh')(x)

        model = Model(inputs=input_layer, outputs=x)
        # No loss is given: the weights are updated manually in train(), which
        # only needs the optimizer attached here.
        model.compile(optimizer=optimizers.Adam(learning_rate=0.0001))
        return model

    def get_action(self, state):
        """Return the model's action(s) for a batch of states.

        ``verbose=0`` suppresses the per-call progress bar that otherwise
        floods the notebook output when this is called once per step.
        """
        return self.model.predict(state, verbose=0)

    def train(self, state, grads):
        """Apply one optimizer step that pushes actions along ``grads``.

        Args:
            state: Batch of states fed to the model.
            grads: Per-action weights multiplied element-wise with the model's
                actions (presumably dQ/da supplied by the critic - confirm
                against the caller).
        """
        with tf.GradientTape() as tape:
            actions = self.model(state)
            # Minimising -(grads * actions) increases actions where grads > 0.
            loss = -tf.reduce_mean(grads * actions)
        gradients = tape.gradient(loss, self.model.trainable_variables)
        self.model.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))

    def soft_update(self, target):
        """Blend this model's weights into ``target`` and return ``target``.

        Each target weight becomes ``tau * main + (1 - tau) * target``.

        Args:
            target: Object exposing ``get_weights()`` / ``set_weights()``.
        """
        # The weight arrays have different shapes, so they are blended one by
        # one; stacking the ragged list with np.array() raises on recent NumPy.
        blended = [
            self.tau * main_w + (1 - self.tau) * target_w
            for main_w, target_w in zip(self.model.get_weights(), target.get_weights())
        ]
        target.set_weights(blended)
        return target

In [None]:
class Critic:
    """Q-network that scores a (state, action) pair with a single value.

    Attributes:
        input_dim: Size of the state vector.
        output_dim: Size of the action vector.
        tau: Interpolation factor used by ``soft_update``.
        gamma: Discount factor used by ``train``.
        model: The compiled two-input Keras model.
    """

    def __init__(self, input_dim, output_dim, tau, gamma):
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.tau = tau
        self.gamma = gamma

        self.model = self.__make_model()

    def __make_model(self):
        """Build the two-branch (state, action) -> Q network."""
        state_input_layer = layers.Input(shape=(self.input_dim,))
        action_input_layer = layers.Input(shape=(self.output_dim,))

        state_x = layers.BatchNormalization()(state_input_layer)
        action_x = layers.BatchNormalization()(action_input_layer)

        state_x = layers.Dense(128, activation='relu')(state_x)
        state_x = layers.Dense(256, activation='relu')(state_x)
        action_x = layers.Dense(128, activation='relu')(action_x)

        x = layers.Concatenate()([state_x, action_x])
        x = layers.Dense(128, activation='relu')(x)
        x = layers.Dense(1, activation='linear')(x)

        model = Model(inputs=[state_input_layer, action_input_layer], outputs=x)
        # fit() needs a loss; without one Keras raises
        # "No loss to compute. Provide a `loss` argument in `compile()`."
        model.compile(optimizer=optimizers.Adam(learning_rate=0.0001), loss='mse')
        return model

    def train(self, state, action, reward):
        """Fit the model towards ``reward + gamma * Q(state, action)``.

        NOTE(review): the target bootstraps from the same (state, action) pair
        rather than from a next state - confirm this is intended.
        """
        target = reward + self.gamma * self.model.predict([state, action], verbose=0)
        self.model.fit([state, action], target, verbose=0)

    def get_gradient(self, state, action):
        """Return dQ/d(action) for a batch of (state, action) pairs.

        NumPy arrays and tensors are both accepted; inputs are converted to
        float32 tensors because ``GradientTape.watch`` only accepts tensors.
        """
        state = tf.cast(tf.convert_to_tensor(state), tf.float32)
        action = tf.cast(tf.convert_to_tensor(action), tf.float32)
        with tf.GradientTape() as tape:
            # `action` is not a variable, so it must be watched explicitly.
            tape.watch(action)
            q_value = self.model([state, action])
        return tape.gradient(q_value, action)

    def soft_update(self, target):
        """Blend this model's weights into ``target`` and return ``target``.

        Each target weight becomes ``tau * main + (1 - tau) * target``.

        Args:
            target: Object exposing ``get_weights()`` / ``set_weights()``.
        """
        # The weight arrays have different shapes, so they are blended one by
        # one; stacking the ragged list with np.array() raises on recent NumPy.
        blended = [
            self.tau * main_w + (1 - self.tau) * target_w
            for main_w, target_w in zip(self.model.get_weights(), target.get_weights())
        ]
        target.set_weights(blended)
        return target

In [None]:
class ReplayBuffer():
    """Bounded FIFO store of transitions with uniform random sampling.

    Args:
        maxlen: Maximum number of transitions kept; once full, the oldest
            transition is dropped for each new one.
        batch_size: Number of transitions returned by ``sample``.
    """

    def __init__(self, maxlen=20000, batch_size=640):
        # Pass maxlen through so the deque actually enforces the capacity.
        self.memory = deque(maxlen=maxlen)
        self.batch_size = batch_size
        self.experience = namedtuple("Experience",field_names=["state", "action", "reward", "next_state", "done"])

    def add(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        e = self.experience(state, action, reward, next_state, done)
        self.memory.append(e)

    def sample(self):
        """Return ``batch_size`` distinct transitions chosen uniformly at random.

        Raises:
            ValueError: If the buffer holds fewer than ``batch_size`` items
                (raised by ``random.sample``).
        """
        return random.sample(self.memory, self.batch_size)

    def __len__(self):
        """Return the number of stored transitions."""
        return len(self.memory)

In [None]:
class Agent:
    """Actor-critic agent with main/target networks and a replay buffer.

    Args:
        input_dim: Size of the state vector.
        output_dim: Size of the action vector.
        tau: Interpolation factor for the target-network soft update.
        gamma: Discount factor for the bootstrapped Q targets.
        train_batch_size: Number of transitions sampled per ``train`` call.
    """

    def __init__(self, input_dim, output_dim, tau=0.001, gamma=0.99, train_batch_size=640):
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.tau = tau
        self.gamma = gamma
        self.train_batch_size = train_batch_size
        self.main_critic = Critic(input_dim, output_dim, tau, gamma)
        self.target_critic = Critic(input_dim, output_dim, tau, gamma)

        self.main_actor = Actor(input_dim, output_dim, tau, gamma)
        self.target_actor = Actor(input_dim, output_dim, tau, gamma)

        # Start each target network as an exact copy of its main network.
        self.target_critic.model.set_weights(self.main_critic.model.get_weights())
        self.target_actor.model.set_weights(self.main_actor.model.get_weights())

        self.memory = ReplayBuffer(batch_size=train_batch_size)

    def get_action(self, state):
        """Return the main actor's action for ``state``."""
        return self.main_actor.get_action(state)

    @staticmethod
    def _soft_update(main_model, target_model, tau):
        """Set each target weight to ``tau * main + (1 - tau) * target``."""
        # Weight arrays differ in shape, so they are blended one at a time.
        blended = [
            tau * main_w + (1.0 - tau) * target_w
            for main_w, target_w in zip(main_model.get_weights(), target_model.get_weights())
        ]
        target_model.set_weights(blended)

    def train(self):
        """Run one critic update, one actor update and a target soft update.

        Samples a batch from the replay buffer, so the buffer must already
        hold at least ``train_batch_size`` transitions.
        """
        data = [e for e in self.memory.sample() if e is not None]
        states = np.vstack([e.state for e in data]).astype(np.float32)
        actions = np.array([e.action for e in data]).astype(np.float32).reshape(-1, self.output_dim)
        rewards = np.array([e.reward for e in data]).astype(np.float32).reshape(-1, 1)
        # Float flags avoid unsigned-integer arithmetic in (1 - dones).
        dones = np.array([e.done for e in data]).astype(np.float32).reshape(-1, 1)
        next_states = np.vstack([e.next_state for e in data]).astype(np.float32)

        # Bootstrapped target: r + gamma * Q'(s', mu'(s')), zeroed after done.
        actions_next = self.target_actor.model.predict(next_states, verbose=0)
        Q_targets_next = self.target_critic.model.predict([next_states, actions_next], verbose=0)
        Q_targets = rewards + self.gamma * Q_targets_next * (1 - dones)

        # Train the main critic on the replayed (state, action) pairs.
        self.main_critic.model.fit([states, actions], Q_targets, verbose=0)

        # The policy gradient needs dQ/da at the actions the actor would take
        # now, not at the (noisy) actions stored in the replay buffer.
        policy_actions = self.main_actor.model(states)
        action_gradients = self.main_critic.get_gradient(states, policy_actions)
        self.main_actor.train(states, action_gradients)

        # Move the targets a fraction tau towards the main networks instead of
        # overwriting them, which would make tau and the targets pointless.
        self._soft_update(self.main_actor.model, self.target_actor.model, self.tau)
        self._soft_update(self.main_critic.model, self.target_critic.model, self.tau)


In [None]:
# Shared environment used by every episode in this notebook.
env = gym.make("MountainCarContinuous-v0")
# Agent with a 2-value state input and a 1-value action output.
agent = Agent(input_dim=2, output_dim=1, train_batch_size=640)

In [None]:
# Display the main actor's Keras model configuration as the cell output.
agent.main_actor.model.get_config()


{'name': 'functional_7',
 'trainable': True,
 'layers': [{'module': 'keras.layers',
   'class_name': 'InputLayer',
   'config': {'batch_shape': (None, 2),
    'dtype': 'float32',
    'sparse': False,
    'name': 'input_layer_14'},
   'registered_name': None,
   'name': 'input_layer_14',
   'inbound_nodes': []},
  {'module': 'keras.layers',
   'class_name': 'BatchNormalization',
   'config': {'name': 'batch_normalization_14',
    'trainable': True,
    'dtype': {'module': 'keras',
     'class_name': 'DTypePolicy',
     'config': {'name': 'float32'},
     'registered_name': None},
    'axis': -1,
    'momentum': 0.99,
    'epsilon': 0.001,
    'center': True,
    'scale': True,
    'beta_initializer': {'module': 'keras.initializers',
     'class_name': 'Zeros',
     'config': {},
     'registered_name': None},
    'gamma_initializer': {'module': 'keras.initializers',
     'class_name': 'Ones',
     'config': {},
     'registered_name': None},
    'moving_mean_initializer': {'module': 'ke

In [None]:
# Probability of adding exploration noise to an action; multiplied by
# epsilon_decaying at the start of every episode.
epsilon = 0.999
epsilon_decaying = 0.99995
def run_episode(train = True, render = False, train_batch_size = 640,verbose = False):
    """Play one episode, store its transitions, and optionally train once.

    Args:
        train: When true, call ``agent.train()`` once after the episode ends.
        render: When true, call ``env.render()`` before every step.
        train_batch_size: Accepted for interface compatibility; not used here.
        verbose: When true, print every transition.

    Side effects:
        Decays the global ``epsilon``, adds transitions to ``agent.memory`` and
        appends the episode's shaped reward to ``episode_reward_lst``.
    """
    global epsilon
    epsilon *= epsilon_decaying

    obs_low = env.observation_space.low
    obs_span = env.observation_space.high - obs_low

    # Newer Gym APIs return (observation, info) from reset(); older ones
    # return the observation alone.
    reset_result = env.reset()
    frame = reset_result[0] if isinstance(reset_result, tuple) else reset_result
    state = (np.asarray(frame).reshape(1, -1) - obs_low) / obs_span

    ep_reward = 0
    done = False
    while not done:
        if render:
            env.render()
        if np.random.random() < epsilon:
            action = np.clip(agent.get_action(state) + (np.random.normal()),-1,1)
        else:
            action = agent.get_action(state)

        # The actor returns shape (1, output_dim); step with the flat action
        # vector so the batch axis does not leak into the observation.
        step_result = env.step(np.asarray(action, dtype=np.float32).reshape(-1))
        if len(step_result) == 5:
            next_frame, reward, terminated, truncated, _ = step_result
            done = bool(terminated or truncated)
        else:
            next_frame, reward, done, _ = step_result

        # Reward shaping: -1 per step, 100 once the env reports >= 100.
        if reward <100 :
            reward = -1.
        else :
            reward = 100.

        # Normalise the next observation exactly like `state`, so the critic
        # sees states and next states on the same scale.
        next_state = (np.asarray(next_frame).reshape(1, -1) - obs_low) / obs_span
        agent.memory.add(state,action,reward,next_state,done)
        ep_reward += reward
        if verbose :
            print('state : ', state, ', action :', action, ', reward : ',reward,', reward : ', reward,', done : ',done,\
                ', ep_reward : ',ep_reward)
        state = next_state
    if train:
        print('trained_start')
        agent.train()
        print('trained_well')
    print("ep_reward:", ep_reward)

    episode_reward_lst.append(ep_reward)

In [None]:
# Per-episode total rewards; run_episode appends one value per episode.
# Re-running this cell clears the history.
episode_reward_lst = []


In [None]:
# Disabled checkpoint snippet: it is wrapped in a bare triple-quoted string, so
# none of these save_weights calls run and the cell just evaluates to the
# string. `iterate` is not defined at this level.
'''
agent.main_critic.model.save_weights("./well_trained_main_critic_"+str(iterate+1)+".h5")
agent.target_critic.model.save_weights("./well_trained_target_critic_"+str(iterate+1)+".h5")
agent.main_actor.model.save_weights("./well_trained_main_actor_"+str(iterate+1)+".h5")
agent.target_actor.model.save_weights("./well_trained_target_actor_"+str(iterate+1)+".h5")
'''

'\nagent.main_critic.model.save_weights("./well_trained_main_critic_"+str(iterate+1)+".h5")\nagent.target_critic.model.save_weights("./well_trained_target_critic_"+str(iterate+1)+".h5")\nagent.main_actor.model.save_weights("./well_trained_main_actor_"+str(iterate+1)+".h5") \nagent.target_actor.model.save_weights("./well_trained_target_actor_"+str(iterate+1)+".h5")\n'

In [None]:
def run_training(iteration,save_point):
    """Run ``iteration`` episodes, training every 5th and checkpointing.

    Args:
        iteration: Number of episodes to run (numbered from 1).
        save_point: Save all four networks' weights whenever the episode
            number is a multiple of this value; 0 or None disables saving.
    """
    for iterate in range(1,iteration+1):
        print('iterate : ',iterate)
        # Only every 5th episode ends with a training step.
        run_episode(train=(iterate % 5 == 0), render=False, train_batch_size=640, verbose=False)

        if save_point and iterate % save_point == 0:
            checkpoints = (
                ("main_critic", agent.main_critic.model),
                ("target_critic", agent.target_critic.model),
                ("main_actor", agent.main_actor.model),
                ("target_actor", agent.target_actor.model),
            )
            for label, model in checkpoints:
                # Keras 3 only accepts weight files ending in ".weights.h5".
                # The file is tagged with the episode that was just finished
                # (the loop is already 1-based, so no +1).
                model.save_weights("./well_trained_" + label + "_" + str(iterate) + ".weights.h5")

In [None]:
# 1000 episodes, with one checkpoint written at the 1000th.
run_training(iteration=1000, save_point=1000)


iterate :  1
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 116ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step


  reward -= math.pow(action[0], 2) * 0.1
  if not isinstance(terminated, (bool, np.bool8)):
  logger.warn(f"{pre} is not within the observation space.")


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step

ValueError: No loss to compute. Provide a `loss` argument in `compile()`.

In [None]:
# Close the environment.
# NOTE(review): a later cell calls test(), which uses this same `env` again -
# confirm the environment is still usable after close().
env.close()


In [None]:
def test(render = False,verbose = False):
    """Play one episode with the actor's actions only (no exploration noise).

    Nothing is stored in the replay buffer and no training happens.

    Args:
        render: When true, call ``env.render()`` before every step.
        verbose: When true, print every transition.

    Returns:
        The episode's total shaped reward (-1 per step, 100 on success).
    """
    obs_low = env.observation_space.low
    obs_span = env.observation_space.high - obs_low

    # Newer Gym APIs return (observation, info) from reset(); older ones
    # return the observation alone.
    reset_result = env.reset()
    frame = reset_result[0] if isinstance(reset_result, tuple) else reset_result

    ep_reward = 0
    done = False
    while not done:
        if render:
            env.render()
        state = (np.asarray(frame).reshape(1, -1) - obs_low) / obs_span

        action = agent.get_action(state)
        # The actor returns shape (1, output_dim); step with the flat action
        # vector so the batch axis does not leak into the observation.
        step_result = env.step(np.asarray(action, dtype=np.float32).reshape(-1))
        if len(step_result) == 5:
            next_frame, reward, terminated, truncated, _ = step_result
            done = bool(terminated or truncated)
        else:
            next_frame, reward, done, _ = step_result

        if reward <100 :
            reward = -1.
        else :
            reward = 100.
        ep_reward += reward
        frame = next_frame
        if verbose :
            print('state : ', state, ', action :', action, ', reward : ',reward,', reward : ', reward,', done : ',done,\
                ', ep_reward : ',ep_reward)
    # Previously the total was computed and then discarded.
    return ep_reward

In [None]:
# Run one evaluation episode with rendering and per-step printing enabled.
test(render = True,verbose = True)


In [None]:
# Close the environment once the evaluation episode is done.
env.close()


In [None]:
import matplotlib.pyplot as plt


In [None]:
%matplotlib inline
plt.plot(episode_reward_lst)
plt.show()

In [None]:
# Print the layer-by-layer summary of the main critic network.
agent.main_critic.model.summary()
