In [None]:
import tensorflow as tf
import numpy as np
import tensorflow_probability as tfp

# critic network

In [None]:
class Critic(tf.keras.Model):
    """Critic network: a stack of four Dense layers applied to the input batch.

    The first three layers use ReLU; the last layer has no activation, so its
    output is an unbounded real value per output unit.

    Args:
        units: Widths of the four Dense layers, in order. Only indices 0..3
            are read.
    """

    def __init__(self, units=[32, 16, 8, 1]):
        super().__init__()

        relu = tf.nn.relu
        dense = tf.keras.layers.Dense

        # Hidden layers.
        self.l0 = dense(units[0], activation=relu)
        self.l1 = dense(units[1], activation=relu)
        self.l2 = dense(units[2], activation=relu)
        # Linear output layer.
        self.l3 = dense(units[3], activation=None)

    def call(self, inputs):
        """Run `inputs` through l0, l1, l2, l3 in that order and return the result."""
        hidden = inputs
        for layer in (self.l0, self.l1, self.l2, self.l3):
            hidden = layer(hidden)
        return hidden

# actor network  
continuous action

In [None]:
class Actor(tf.keras.Model):
    """Gaussian policy network for continuous actions.

    A shared trunk of three ReLU Dense layers feeds two heads of width
    `action_dims`: `mu` (tanh) and `sigma` (softplus).

    Args:
        units: Widths of the three trunk layers, in order. Only indices 0..2
            are read.
        action_dims: Number of units in each of the `mu` and `sigma` heads.
    """

    def __init__(self, units=[32, 16, 8], action_dims=1):
        super().__init__()

        relu = tf.nn.relu
        dense = tf.keras.layers.Dense

        # Shared trunk.
        self.l0 = dense(units[0], activation=relu)
        self.l1 = dense(units[1], activation=relu)
        self.l2 = dense(units[2], activation=relu)

        # Distribution heads: tanh bounds the mean, softplus keeps the scale
        # non-negative.
        self.mu = dense(action_dims, activation=tf.nn.tanh)
        self.sigma = dense(action_dims, activation=tf.nn.softplus)

    def call(self, inputs):
        """Return `(mu, sigma)` for `inputs`.

        `mu` is the tanh head's output multiplied by 2, so it lies in [-2, 2];
        `sigma` is the raw softplus head output.
        """
        features = inputs
        for layer in (self.l0, self.l1, self.l2):
            features = layer(features)

        mean = self.mu(features) * 2.
        scale = self.sigma(features)
        return mean, scale

# PPO2

In [None]:
class PPO():
    """PPO agent with a clipped surrogate objective and Gaussian policy.

    Holds two copies of each network:

    * `actor_new` is trained; `actor_old` is used for sampling actions and as
      the denominator of the probability ratio. The caller copies new -> old
      with `sync_actor_params()`.
    * `critic_new` is trained; `critic_old` is a lagged copy refreshed every
      `sync_critic_steps` calls to `critic_training`.

    `status` is a dict of the latest diagnostics (losses and sync counters).

    Args:
        obs_dims: Size of one observation vector (used to build the networks).
        act_dims: Number of action dimensions (width of the actor heads).
    """

    def __init__(self, obs_dims, act_dims):
        self.gamma = 0.9  # stored for callers; not read inside this class
        self.critic_lr = 2e-4
        self.actor_lr = 1e-4
        self.clip_epsilon = 0.2
        self.sync_critic_steps = 10
        self.obs_dims = obs_dims
        self.act_dims = act_dims

        self.sync_critic_cnt = 0

        # Build both critics eagerly so their weights exist and can be copied.
        self.critic_new = Critic(units=[32, 16, 8, 1])
        self.critic_old = Critic(units=[32, 16, 8, 1])
        self.critic_new.build(input_shape=(None, self.obs_dims))
        self.critic_old.build(input_shape=(None, self.obs_dims))
        self.critic_old.set_weights(self.critic_new.get_weights())

        # Same for the actors: old starts as an exact copy of new.
        self.actor_new = Actor(units=[32, 16, 8], action_dims=self.act_dims)
        self.actor_old = Actor(units=[32, 16, 8], action_dims=self.act_dims)
        self.actor_new.build(input_shape=(None, self.obs_dims))
        self.actor_old.build(input_shape=(None, self.obs_dims))
        self.actor_old.set_weights(self.actor_new.get_weights())

        # One persistent optimizer per trained network. Creating a fresh Adam
        # on every update would discard its running moment estimates and step
        # count each time, so they are built once here and reused.
        # The learning rates are read at construction time.
        self.critic_optimizer = tf.optimizers.Adam(self.critic_lr)
        self.actor_optimizer = tf.optimizers.Adam(self.actor_lr)

        self.status = {"episode": None,
                       "critic_loss": None,
                       "actor_obj": None,
                       "critic_sync_times": 0,
                       "actor_sync_times": 0
                       }

    def choose_action(self, obs):
        """Sample one action from the old policy.

        Args:
            obs: Batch of observations; only the first row's distribution is
                sampled.

        Returns:
            A NumPy array holding the sampled action for the first row.
        """
        mu, sigma = self.actor_old(obs)
        distb = tfp.distributions.Normal(loc=mu[0], scale=sigma[0])
        return distb.sample().numpy()

    def critic_training(self, obs, q_target):
        """Take one gradient step on `critic_new` towards `q_target`.

        Minimises the mean squared error between `critic_new(obs)` and
        `q_target`, records the loss in `status["critic_loss"]`, and refreshes
        `critic_old` every `sync_critic_steps` calls.
        """
        variables = self.critic_new.trainable_variables
        with tf.GradientTape() as tape:
            q_value = self.critic_new(obs)
            loss = tf.reduce_mean(tf.losses.mean_squared_error(q_target, q_value))

        grads = tape.gradient(target=loss, sources=variables)
        self.critic_optimizer.apply_gradients(zip(grads, variables))

        self.sync_critic_cnt += 1
        if self.sync_critic_cnt % self.sync_critic_steps == 0:
            self.__sync_critic_params()

        self.status["critic_loss"] = float(loss)

    def actor_training(self, obs, actions, q_target):
        """Take one gradient step on `actor_new` using the clipped PPO objective.

        The advantage is `q_target - critic_new(obs)`; the probability ratio is
        taken between `actor_new` and `actor_old` at `actions`. The (negated)
        objective value is stored in `status["actor_obj"]`.
        """
        # Advantage and old-policy probabilities are constants for this update,
        # so they are computed outside the gradient tape.
        advantage = q_target - self.critic_new(obs)

        mu_old, sigma_old = self.actor_old(obs)
        distb_old = tfp.distributions.Normal(loc=mu_old, scale=sigma_old)
        prob_old = distb_old.prob(actions)

        variables = self.actor_new.trainable_variables
        with tf.GradientTape() as tape:
            mu_new, sigma_new = self.actor_new(obs)
            distb_new = tfp.distributions.Normal(loc=mu_new, scale=sigma_new)
            prob_new = distb_new.prob(actions)

            # Small constant guards against division by zero.
            # NOTE(review): probabilities are element-wise per action
            # dimension, so for act_dims > 1 this is a per-dimension ratio
            # rather than a joint one -- confirm that is intended.
            ratio = prob_new / (prob_old + 1e-8)
            clipped_ratio = tf.clip_by_value(
                ratio, 1. - self.clip_epsilon, 1. + self.clip_epsilon)

            # Negated because the optimizer minimises.
            objective = - tf.reduce_mean(
                tf.minimum(ratio * advantage, clipped_ratio * advantage)
            )

        grads = tape.gradient(target=objective, sources=variables)
        self.actor_optimizer.apply_gradients(zip(grads, variables))

        self.status["actor_obj"] = float(objective)

    def __sync_critic_params(self, ):
        """Copy `critic_new` weights into `critic_old` and count the sync."""
        self.critic_old.set_weights(self.critic_new.get_weights())
        self.status["critic_sync_times"] += 1

    def sync_actor_params(self, ):
        """Copy `actor_new` weights into `actor_old` and count the sync."""
        self.actor_old.set_weights(self.actor_new.get_weights())
        self.status["actor_sync_times"] += 1

# test

In [None]:
import gym

# Create the Pendulum-v0 environment and a PPO agent whose network sizes are
# taken from the first dimension of the env's observation and action spaces.
env = gym.make("Pendulum-v0")
agent = PPO(
    obs_dims=env.observation_space.shape[0],
    act_dims=env.action_space.shape[0],
)

In [None]:
# ---- configuration ---------------------------------------------------------
max_step = 256          # environment steps per episode (training and evaluation)
batch_size = 32         # transitions collected before each update round
learning_times = 15     # gradient steps per update round, for critic and actor each
test_per_episode = 5    # run an evaluation every this many training episodes
test_episode = 5        # number of evaluation episodes to average over
gamma = 0.9             # discount factor for the bootstrapped returns
max_episodes = None     # None: train until interrupted; int: stop after that many episodes
verbose = True          # print policy probes, per-update status and per-step eval traces

# Fixed observations used to watch how the policy output evolves during training.
probes = (
    ("[1.,0.,0.] action: ", np.array([[1., 0., 0.]]).astype(np.float32)),
    ("[1.,0.,0.5] action: ", np.array([[1., 0., 0.5]]).astype(np.float32)),
)

episode = 0

while max_episodes is None or episode < max_episodes:
    obs = env.reset()
    obs_seq, reward_seq, action_seq = [], [], []
    for step in range(max_step):
        if verbose:
            for label, probe_obs in probes:
                print(agent.actor_old(probe_obs))
                print(label, agent.choose_action(probe_obs))

        action = agent.choose_action(np.array([obs]).astype(np.float32))
        action = np.clip(action, -2., 2.)
        # NOTE(review): `done` is not used, so stepping continues even after
        # the environment reports the episode as finished -- confirm intended.
        obs_, reward, done, info = env.step(action)
        reward = (reward + 8.0)  # constant reward shift

        obs_seq.append(obs)
        reward_seq.append(reward)
        action_seq.append(action)

        # Update once a full batch is collected, and also at the last step of
        # the episode so a trailing partial batch is not silently discarded
        # when max_step is not a multiple of batch_size.
        if len(obs_seq) >= batch_size or step == max_step - 1:
            # Discounted returns, bootstrapped from the lagged critic's value
            # of the state that follows the last collected transition.
            running = agent.critic_old(np.array([obs_]).astype(np.float32)).numpy()[0]
            q_target = []
            for r in reward_seq[::-1]:
                running = r + gamma * running
                q_target.append(running)
            q_target.reverse()

            obs_buf = np.vstack(obs_seq).astype(np.float32)
            action_buf = np.vstack(action_seq).astype(np.float32)
            q_target = np.vstack(q_target).astype(np.float32)

            for _ in range(learning_times):
                agent.critic_training(obs=obs_buf, q_target=q_target)
                if verbose:
                    print(agent.status)

            for _ in range(learning_times):
                agent.actor_training(obs=obs_buf, actions=action_buf, q_target=q_target)
                if verbose:
                    print(agent.status)

            obs_seq, reward_seq, action_seq = [], [], []
            # The freshly trained policy becomes the sampling policy.
            agent.sync_actor_params()

        obs = obs_

    agent.status["episode"] = episode
    episode += 1

    if episode % test_per_episode == 0:
        # Accumulate over ALL evaluation episodes; resetting this inside the
        # loop would make the printed average reflect only the last episode.
        toltal_reward = 0
        for ep in range(test_episode):
            s = env.reset()
            for step in range(max_step):
                a = agent.choose_action(np.array([s]).astype(np.float32))
                a = np.clip(a, -2., 2.)
                s_, r, _, _ = env.step(a)
                env.render()
                toltal_reward += r
                s = s_
                if verbose:
                    print("obs:{}, action:{}, reward:{}".format(s, a, r))
        print("********* episode: {}, toltal_reward: {}".format(episode, toltal_reward/test_episode))

In [None]:
# Close the environment created above; run this once training has finished
# or been interrupted.
env.close()