# Synchronous PPO with PyBullet Ant

In [1]:
import datetime,gym,os,pybullet_envs,time,os,psutil,ray
import numpy as np
import tensorflow as tf
from util import gpu_sess,suppress_tf_warning,PPOBuffer,save_ppo_model_and_buffer,restore_ppo_model_and_buffer
from ppo import create_ppo_model,create_ppo_graph
np.set_printoptions(precision=2)
suppress_tf_warning() # suppress warning 
gym.logger.set_level(40) # gym logger 
print ("Packaged loaded. TF version is [%s]."%(tf.__version__))

Packaged loaded. TF version is [1.15.0].


### Rollout Worker

In [2]:
def get_env():
    import pybullet_envs,gym
    gym.logger.set_level(40) # gym logger 
    return gym.make('AntBulletEnv-v0')

def get_eval_env():
    import pybullet_envs,gym
    gym.logger.set_level(40) # gym logger
    eval_env = gym.make('AntBulletEnv-v0')
#     _ = eval_env.render(mode='human') # enable rendering
    _ = eval_env.reset()
    for _ in range(3): # dummy run for proper rendering 
        a = eval_env.action_space.sample()
        o,r,d,_ = eval_env.step(a)
        time.sleep(0.01)
    return eval_env

In [3]:
import scipy.signal

def discount_cumsum(x, discount):
    """
    magic from rllab for computing discounted cumulative sums of vectors.
    input: 
        vector x, 
        [x0, 
         x1, 
         x2]
    output:
        [x0 + discount * x1 + discount^2 * x2,  
         x1 + discount * x2,
         x2]
    """
    return scipy.signal.lfilter([1], [1, float(-discount)], x[::-1], axis=0)[::-1]

In [4]:
class RolloutWorkerClass(object):
    """
    Worker without RAY (for update purposes)
    """
    def __init__(self,pi_lr=3e-4,vf_lr=1e-3,clip_ratio=0.2,seed=1):
        self.seed = seed
        # Each worker should maintain its own environment
        # import pybullet_envs,gym
        from util import suppress_tf_warning
        suppress_tf_warning() # suppress TF warnings
        # gym.logger.set_level(40) # gym logger 
        # self.env = gym.make('AntBulletEnv-v0')
        self.env = get_env()
        odim,adim = self.env.observation_space.shape[0],self.env.action_space.shape[0]
        self.odim = odim
        self.adim = adim
        
        # Create PPO model and computational graph 
        self.model,self.sess = create_ppo_model(env=self.env,hdims=[256,256])
        self.graph = create_ppo_graph(self.model,pi_lr=pi_lr,vf_lr=vf_lr,clip_ratio=clip_ratio)
        
        # Initialize model 
        tf.set_random_seed(self.seed)
        np.random.seed(self.seed)
        self.sess.run(tf.global_variables_initializer())
        
        # Flag to initialize assign operations for 'set_weights()'
        self.FIRST_SET_FLAG = True
    
    def get_action(self,o,deterministic=False):
#         act_op = self.model['mu'] if deterministic else self.model['pi']
#         return self.sess.run([act_op, self.model['v'], self.model['logp_pi']], 
#                              feed_dict={self.model['o_ph']:o.reshape(1,-1)})
        return self.sess.run(self.model['get_action_ops'],feed_dict={self.model['o_ph']:o.reshape(1,-1)})

    def get_weights(self):
        """
        Get weights
        """
        weight_vals = self.sess.run(self.model['main_vars'])
        return weight_vals
    
    def set_weights(self,weight_vals):
        """
        Set weights without memory leakage
        """
        if self.FIRST_SET_FLAG:
            self.FIRST_SET_FLAG = False
            self.assign_placeholders = []
            self.assign_ops = []
            for w_idx,weight_tf_var in enumerate(self.model['main_vars']):
                a = weight_tf_var
                assign_placeholder = tf.placeholder(a.dtype, shape=a.get_shape())
                assign_op = a.assign(assign_placeholder)
                self.assign_placeholders.append(assign_placeholder)
                self.assign_ops.append(assign_op)
                
        for w_idx,weight_tf_var in enumerate(self.model['main_vars']):
            self.sess.run(self.assign_ops[w_idx],
                          {self.assign_placeholders[w_idx]:weight_vals[w_idx]})
            
            
@ray.remote
class RayRolloutWorkerClass(object):
    """
    Rollout Worker with RAY
    """
    def __init__(self,worker_id=0,ep_len_rollout=1000):
        # Parse
        self.worker_id = worker_id
        self.ep_len_rollout = ep_len_rollout
        # Each worker should maintain its own environment
        # import pybullet_envs,gym
        from util import suppress_tf_warning
        suppress_tf_warning() # suppress TF warnings
        # gym.logger.set_level(40) # gym logger 
        # self.env = gym.make('AntBulletEnv-v0')
        self.env = get_env()
        odim,adim = self.env.observation_space.shape[0],self.env.action_space.shape[0]
        self.odim = odim
        self.adim = adim
        
        # Replay buffers to pass
        self.obs_buf = np.zeros((self.ep_len_rollout,self.odim), dtype=np.float32)
        self.act_buf = np.zeros((self.ep_len_rollout,self.adim), dtype=np.float32)
        self.rew_buf = np.zeros((self.ep_len_rollout), dtype=np.float32)
        self.ret_buf = np.zeros((self.ep_len_rollout), dtype=np.float32)
        self.val_buf = np.zeros((self.ep_len_rollout), dtype=np.float32)
        self.logp_buf = np.zeros((self.ep_len_rollout), dtype=np.float32)
        self.adv_buf = np.zeros((self.ep_len_rollout), dtype=np.float32)
        
        # Create PPO model
        self.model,self.sess = create_ppo_model(env=self.env,hdims=[256,256])
        self.sess.run(tf.global_variables_initializer())
        print ("Ray Worker [%d] Ready."%(self.worker_id))
        
        # Flag to initialize assign operations for 'set_weights()'
        self.FIRST_SET_FLAG = True
        
        # Flag to initialize rollout
        self.FIRST_ROLLOUT_FLAG = True
        
    def get_action(self,o,deterministic=False):
#         act_op = self.model['mu'] if deterministic else self.model['pi']
#         return self.sess.run([act_op, self.model['v'], self.model['logp_pi']], 
#                              feed_dict={self.model['o_ph']:o.reshape(1,-1)})
        return self.sess.run(self.model['get_action_ops'],feed_dict={self.model['o_ph']:o.reshape(1,-1)})
                                    
    def set_weights(self,weight_vals):
        """
        Set weights without memory leakage
        """
        if self.FIRST_SET_FLAG:
            self.FIRST_SET_FLAG = False
            self.assign_placeholders = []
            self.assign_ops = []
            for w_idx,weight_tf_var in enumerate(self.model['main_vars']):
                a = weight_tf_var
                assign_placeholder = tf.placeholder(a.dtype, shape=a.get_shape())
                assign_op = a.assign(assign_placeholder)
                self.assign_placeholders.append(assign_placeholder)
                self.assign_ops.append(assign_op)
                
        for w_idx,weight_tf_var in enumerate(self.model['main_vars']):
            self.sess.run(self.assign_ops[w_idx],
                          {self.assign_placeholders[w_idx]:weight_vals[w_idx]})
            
    def rollout(self):
        """
        Rollout
        """
        if self.FIRST_ROLLOUT_FLAG:
            self.FIRST_ROLLOUT_FLAG = False
            self.o = self.env.reset() # reset environment
            
        # Loop
        for t in range(ep_len_rollout):
            try:
                outs = self.get_action(self.o,deterministic=False) 
            except:
                print('[E] get action')
                
            try:
                self.a,self.v_t,self.logp_t = outs
            except:
                print('[E] distribute outs')
                
            self.o2,self.r,self.d,_ = self.env.step(self.a[0])
            
            # Append
            self.obs_buf[t,:] = self.o
            self.act_buf[t,:] = self.a[0]
            self.rew_buf[t] = self.r
            self.val_buf[t] = self.v_t[0]
            self.logp_buf[t] = self.logp_t[0]
            
            # Save next state 
            self.o = self.o2
            
            if self.d:
                last_val = 0 if self.d else self.sess.run(
                    self.model['v'], feed_dict={self.model['o_ph']: self.o.reshape(1,-1)})[0]
                self.rew_buf[t+1] = self.r
                self.val_buf[t+1] = last_val
                
                # the next two lines implement GAE-Lambda advantage calculation
                # gamma = 0.99, lamda: 0.95
                deltas = self.rew_buf[:-1] + 0.99 * self.val_buf[1:] - self.val_buf[:-1]
                self.adv_buf = discount_cumsum(deltas, 0.99 * 0.95)

                # the next line computes rewards-to-go, to be targets for the value function
                self.ret_buf = discount_cumsum(self.rew_buf, 0.99)[:-1]
                
                self.o = self.env.reset() # reset when done
                
        return self.obs_buf,self.act_buf, \
            self.rew_buf,self.val_buf, \
            self.logp_buf,self.ret_buf, \
            self.adv_buf
    
print ("Rollout worker classes (with and without RAY) ready.")

Rollout worker classes (with and without RAY) ready.


In [5]:
# vb = np.zeros((100,1))
# ab = np.zeros((100,8))

In [6]:
# a = [[0.96776974,-0.7034082,0.5083786,0.06832138,0.2833113,0.32790965,0.8092029,-0.29485628]]
# v_t = [0.00756513]

In [7]:
# vb[t,:] = v_t

In [8]:
# ab[0:3,:]

### Initilize PyBullet Ant Environment

In [9]:
n_cpu = 11
n_workers = 10
total_steps,evaluate_every,print_every = 50000,200,10
ep_len_rollout = 50*10 # 10sec rollout
batch_size,update_count = 128,ep_len_rollout
num_eval,max_ep_len_eval = 5,15e3

### Initialize Workers

In [10]:
ray.init(num_cpus=n_cpu,
         memory = 5*1024*1024*1024,
         object_store_memory = 10*1024*1024*1024,
         driver_object_store_memory = 1*1024*1024*1024)
tf.reset_default_graph()
R = RolloutWorkerClass(pi_lr=3e-4,vf_lr=1e-3,clip_ratio=0.2,seed=1)
workers = [RayRolloutWorkerClass.remote(worker_id=i,ep_len_rollout=ep_len_rollout) 
           for i in range(n_workers)]
print ("RAY initialized with [%d] cpus and [%d] workers."%
       (n_cpu,n_workers))

2020-06-29 21:27:18,780	INFO resource_spec.py:212 -- Starting Ray with 4.98 GiB memory available for workers and up to 10.0 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-06-29 21:27:19,146	INFO services.py:1170 -- View the Ray dashboard at [1m[32mlocalhost:8265[39m[22m


RAY initialized with [11] cpus and [10] workers.


In [11]:
time.sleep(1)

### Replay Buffers

In [12]:
eval_env = get_eval_env()
adim,odim = eval_env.action_space.shape[0],eval_env.observation_space.shape[0]
buf = PPOBuffer(odim=odim, adim=adim, size=int(10e6), gamma=0.99, lam=0.95)

### Loop

In [13]:
start_time = time.time()
n_env_step = 0 # number of environment steps
train_pi_iters = train_v_iters = 80
target_kl = 0.01

for t in range(int(total_steps)):
    esec = time.time()-start_time
    
    # Synchronize worker weights
    weights = R.get_weights()
    set_weights_list = [worker.set_weights.remote(weights) for worker in workers] 
    
    # Make rollout and accumulate to Buffers
    ops = [worker.rollout.remote() for worker in workers]
    rollout_vals = ray.get(ops)
    for rollout_val in rollout_vals:
        o_buffer,a_buffer,r_buffer,v_buffer,logp_buffer,ret_buffer,adv_buffer = rollout_val
        for i in range(ep_len_rollout):
            o,a,r,v,logp,ret,adv \
                = o_buffer[i,:],a_buffer[i,:],r_buffer[i],v_buffer[i], \
                  logp_buffer[i],ret_buffer[i],adv_buffer[i]
            buf.store(o, a, r, v, logp, ret, adv) 
            n_env_step += 1
            
            o, ep_ret, ep_len = eval_env.reset(), 0, 0
    
    # Update
    for _ in range(int(update_count)):
        inputs = {k:v for k,v in zip(R.model['all_phs'], buf.get())}
        pi_l_old, v_l_old, ent = sess.run([R.graph['pi_loss'], R.graph['v_loss'], R.graph['approx_ent']], 
                                          feed_dict=inputs)
        
        # Training
        for i in range(train_pi_iters):
            _, kl = sess.run([R.graph['train_pi'], R.graph['approx_kl']], feed_dict=inputs)
            if kl > 1.5 * target_kl:
                break
        for _ in range(train_v_iters):
            sess.run(R.graph['train_v'], feed_dict=inputs)
            
        # Log changes from update
        pi_l_new,v_l_new,kl,cf = sess.run(
            [graph['pi_loss'],graph['v_loss'],graph['approx_kl'],graph['clipfrac']],
            feed_dict=feeds)

    # Evaluate
    if (t == 0) or (((t+1)%evaluate_every) == 0) or (t == (total_steps-1)): 
        ram_percent = psutil.virtual_memory().percent # memory usage
        print ("[Evaluate] step:[%d/%d][%.1f%%] #step:[%.1e] time:[%s] ram:[%.1f%%]."%
               (t+1,total_steps,t/total_steps*100,
                n_env_step,
                time.strftime("%H:%M:%S", time.gmtime(time.time()-start_time)),
                ram_percent))
        
        for eval_idx in range(num_eval): 
            o,d,ep_ret,ep_len = eval_env.reset(),False,0,0
#             _ = eval_env.render(mode='human') 
            while not(d or (ep_len == max_ep_len_eval)):
                a,_,_ = R.get_action(o,deterministic=True)
                o,r,d,_ = eval_env.step(a)
#                 _ = eval_env.render(mode='human') 
                ep_ret += r # compute return 
                ep_len += 1
            print ("[Evaluate] [%d/%d] ep_ret:[%.4f] ep_len:[%d]"
                %(eval_idx,num_eval,ep_ret,ep_len)) 
            
        # Save current SAC model and replay buffers 
        npz_path = '../data/net/pybullet_ant/ppo_model_and_buffers.npz'
        save_ppo_model_and_buffer(npz_path,R,ppobuf,VERBOSE=False)
    
print ("Done.")

[2m[36m(pid=57976)[0m Ray Worker [5] Ready.
[2m[36m(pid=57974)[0m Ray Worker [8] Ready.
[2m[36m(pid=57977)[0m Ray Worker [2] Ready.
[2m[36m(pid=57982)[0m Ray Worker [6] Ready.
[2m[36m(pid=57980)[0m Ray Worker [4] Ready.
[2m[36m(pid=57979)[0m Ray Worker [0] Ready.
[2m[36m(pid=57978)[0m Ray Worker [1] Ready.
[2m[36m(pid=57983)[0m Ray Worker [3] Ready.
[2m[36m(pid=57975)[0m Ray Worker [9] Ready.
[2m[36m(pid=57973)[0m Ray Worker [7] Ready.


RayTaskError(IndexError): [36mray::RayRolloutWorkerClass.rollout()[39m (pid=57976, ip=172.30.1.25)
  File "python/ray/_raylet.pyx", line 463, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 417, in ray._raylet.execute_task.function_executor
  File "<ipython-input-4-5fc80bb7f9e8>", line 163, in rollout
IndexError: too many indices for array

### Close

In [None]:
eval_env.close()

In [None]:
ray.shutdown()

### Save model weights and replay buffers

In [None]:
# Path to save the npz file 
npz_path = '../data/net/pybullet_ant/ppo_model_and_buffers_final.npz'
save_ppo_model_and_buffer(npz_path,R,ppobuf,VERBOSE=False)

### Reset the worker

In [None]:
R.sess.run(tf.global_variables_initializer())

### Load and assign model weights

In [None]:
# Load npz
npz_path = '../data/net/pybullet_ant/model_and_buffers_final.npz'
restore_ppo_model_and_buffer(npz_path,R,ppobuf,VERBOSE=True)

### Test-Run

In [None]:
eval_env = get_eval_env()
o,d,ep_ret,ep_len = eval_env.reset(),False,0,0
_ = eval_env.render(mode='human') 
while not(d or (ep_len == max_ep_len_eval)):
    a = R.get_action(o,deterministic=True)
    o,r,d,_ = eval_env.step(a)
    _ = eval_env.render(mode='human') 
    ep_ret += r # compute return 
    ep_len += 1
print ("[Evaluate] ep_ret:[%.4f] ep_len:[%d]"
    %(ep_ret,ep_len))
eval_env.close() # close env