# Synchronous SAC with PyBullet Ant

In [1]:
import datetime,gym,os,pybullet_envs,time,os,psutil,ray
import numpy as np
import tensorflow as tf
from util import gpu_sess,suppress_tf_warning
from sac import ReplayBuffer,create_sac_model,create_sac_graph
np.set_printoptions(precision=2)
suppress_tf_warning() # suppress warning 
gym.logger.set_level(40) # gym logger 
print ("Packaged loaded. TF version is [%s]."%(tf.__version__))

Packaged loaded. TF version is [1.14.0].


### Rollout Worker

In [2]:
class RolloutWorkerClass(object):
    """
    Worker without RAY (for update purposes)
    """
    def __init__(self,lr=1e-3,gamma=0.99,alpha=0.1,polyak=0.995,seed=1):
        self.seed = seed
        # Each worker should maintain its own environment
        import pybullet_envs,gym
        from util import suppress_tf_warning
        suppress_tf_warning() # suppress TF warnings
        gym.logger.set_level(40) # gym logger 
        
        self.env = gym.make('AntBulletEnv-v0')
        odim,adim = self.env.observation_space.shape[0],self.env.action_space.shape[0]
        self.odim = odim
        self.adim = adim
        
        # Create SAC model and computational graph 
        self.model,self.sess = create_sac_model(odim=self.odim,adim=self.adim)
        self.step_ops,self.target_init = \
            create_sac_graph(self.model,lr=lr,gamma=gamma,alpha=alpha,polyak=polyak)
        
        # Initialize model 
        tf.set_random_seed(self.seed)
        np.random.seed(self.seed)
        self.sess.run(tf.global_variables_initializer())
        self.sess.run(self.target_init)
    
    def get_action(self,o,deterministic=False):
        act_op = self.model['mu'] if deterministic else self.model['pi']
        return self.sess.run(act_op, feed_dict={self.model['o_ph']:o.reshape(1,-1)})[0]

    def get_weights(self):
        """
        Get weights
        """
        weight_vals = self.sess.run(self.model['main_vars'])
        return weight_vals
    
@ray.remote
class RayRolloutWorkerClass(object):
    """
    Rollout Worker with RAY
    """
    def __init__(self,worker_id=0,ep_len_rollout=1000):
        # Parse
        self.worker_id = worker_id
        self.ep_len_rollout = ep_len_rollout
        # Each worker should maintain its own environment
        import pybullet_envs,gym
        from util import suppress_tf_warning
        suppress_tf_warning() # suppress TF warnings
        gym.logger.set_level(40) # gym logger 

        self.env = gym.make('AntBulletEnv-v0')
        odim,adim = self.env.observation_space.shape[0],self.env.action_space.shape[0]
        self.odim = odim
        self.adim = adim
        # Replay buffers to pass
        self.o_buffer = np.zeros((self.ep_len_rollout,self.odim))
        self.a_buffer = np.zeros((self.ep_len_rollout,self.adim))
        self.r_buffer = np.zeros((self.ep_len_rollout))
        self.o2_buffer = np.zeros((self.ep_len_rollout,self.odim))
        self.d_buffer = np.zeros((self.ep_len_rollout))
        # Create SAC model
        self.model,self.sess = create_sac_model(odim=self.odim,adim=self.adim)
        self.sess.run(tf.global_variables_initializer())
        print ("Ray Worker [%d] Ready."%(self.worker_id))
        
        # Flag to initialize assign operations for 'set_weights()'
        self.FIRST_SET_FLAG = True
        
        # Flag to initialize rollout
        self.FIRST_ROLLOUT_FLAG = True
        
    def get_action(self,o,deterministic=False):
        act_op = self.model['mu'] if deterministic else self.model['pi']
        return self.sess.run(act_op, feed_dict={self.model['o_ph']:o.reshape(1,-1)})[0]
    
    def set_weights(self,weight_vals):
        """
        Set weights without memory leakage
        """
        if self.FIRST_SET_FLAG:
            self.FIRST_SET_FLAG = False
            self.assign_placeholders = []
            self.assign_ops = []
            for w_idx,weight_tf_var in enumerate(self.model['main_vars']):
                a = weight_tf_var
                assign_placeholder = tf.placeholder(a.dtype, shape=a.get_shape())
                assign_op = a.assign(assign_placeholder)
                self.assign_placeholders.append(assign_placeholder)
                self.assign_ops.append(assign_op)
        for w_idx,weight_tf_var in enumerate(self.model['main_vars']):
            # Memory-leakage-free assign (hopefully)
            self.sess.run(self.assign_ops[w_idx],
                          {self.assign_placeholders[w_idx]:weight_vals[w_idx]})
            
    def rollout(self):
        """
        Rollout
        """
        if self.FIRST_ROLLOUT_FLAG:
            self.FIRST_ROLLOUT_FLAG = False
            self.o = self.env.reset() # reset environment
        # Loop
        for t in range(ep_len_rollout):
            self.a = self.get_action(self.o,deterministic=False) 
            self.o2,self.r,self.d,_ = self.env.step(self.a)
            # Append
            self.o_buffer[t,:] = self.o
            self.a_buffer[t,:] = self.a
            self.r_buffer[t] = self.r
            self.o2_buffer[t,:] = self.o2
            self.d_buffer[t] = self.d
            # Save next state 
            self.o = self.o2
            if self.d: self.o = self.env.reset() # reset when done 
        return self.o_buffer,self.a_buffer,self.r_buffer,self.o2_buffer,self.d_buffer
    
print ("Rollout worker classes (with and without RAY) ready.")

Rollout worker classes (with and without RAY) ready.


### Initilize PyBullet Ant Environment

In [3]:
env_name = 'AntBulletEnv-v0'
test_env = gym.make(env_name)
_ = test_env.render(mode='human') # enable rendering on test_env
_ = test_env.reset()
for _ in range(3): # dummy run for proper rendering 
    a = test_env.action_space.sample()
    o,r,d,_ = test_env.step(a)
    time.sleep(0.01)
print ("[%s] ready."%(env_name))

[AntBulletEnv-v0] ready.


In [4]:
odim,adim = o.shape[0],a.shape[0]

In [5]:
n_cpu = n_workers = 15
total_steps,evaluate_every = 5000,200
ep_len_rollout = 100
batch_size,update_count = 128,ep_len_rollout
num_eval,max_ep_len_eval = 3,1e3

### Initialize Workers

In [6]:
ray.init(num_cpus=n_cpu,
         memory = 5*1024*1024*1024,
         object_store_memory = 10*1024*1024*1024,
         driver_object_store_memory = 1*1024*1024*1024)
tf.reset_default_graph()
R = RolloutWorkerClass(lr=1e-3,gamma=0.99,alpha=0.1,polyak=0.995,seed=0)
workers = [RayRolloutWorkerClass.remote(worker_id=i,ep_len_rollout=ep_len_rollout) 
           for i in range(n_workers)]
print ("RAY initialized with [%d] cpus and [%d] workers."%
       (n_cpu,n_workers))

2020-06-22 01:30:12,926	INFO resource_spec.py:212 -- Starting Ray with 4.98 GiB memory available for workers and up to 10.0 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-06-22 01:30:13,317	INFO services.py:1078 -- View the Ray dashboard at [1m[32mlocalhost:8265[39m[22m


RAY initialized with [15] cpus and [15] workers.


In [7]:
time.sleep(1)

### Replay Buffers

In [8]:
replay_buffer = ReplayBuffer(odim=odim,adim=adim,size=int(1e6))
replay_buffer_short = ReplayBuffer(odim=odim,adim=adim,size=int(1e5))

### Loop

In [9]:
start_time = time.time()
n_env_step = 0 # number of environment steps
for t in range(int(total_steps)):
    esec = time.time()-start_time
    
    # Synchronize worker weights
    weights = R.get_weights()
    set_weights_list = [worker.set_weights.remote(weights) for worker in workers] 
    
    # Make rollout and accumulate to Buffers
    ops = [worker.rollout.remote() for worker in workers]
    rollout_vals = ray.get(ops)
    for rollout_val in rollout_vals:
        o_buffer,a_buffer,r_buffer,o2_buffer,d_buffer = rollout_val
        for i in range(ep_len_rollout):
            o,a,r,o2,d = o_buffer[i,:],a_buffer[i,:],r_buffer[i],o2_buffer[i,:],d_buffer[i]
            replay_buffer.store(o, a, r, o2, d) 
            replay_buffer_short.store(o, a, r, o2, d) 
            n_env_step += 1
    
    # Update
    for _ in range(int(update_count)):
        batch = replay_buffer.sample_batch(batch_size//2) 
        batch_short = replay_buffer_short.sample_batch(batch_size//2) 
        feed_dict = {R.model['o_ph']: np.concatenate((batch['obs1'],batch_short['obs1'])),
                     R.model['o2_ph']: np.concatenate((batch['obs2'],batch_short['obs2'])),
                     R.model['a_ph']: np.concatenate((batch['acts'],batch_short['acts'])),
                     R.model['r_ph']: np.concatenate((batch['rews'],batch_short['rews'])),
                     R.model['d_ph']: np.concatenate((batch['done'],batch_short['done']))
                    }
        outs = R.sess.run(R.step_ops, feed_dict)
    
    # Evaluate
    if (t == 0) or (((t+1)%evaluate_every) == 0): 
        ram_percent = psutil.virtual_memory().percent # memory usage
        print ("[Evaluate] step:[%d/%d][%.1f%%] #step:[%.1e] time:[%s] ram:[%.1f%%]."%
               (t+1,total_steps,t/total_steps*100,
                n_env_step,
                time.strftime("%H:%M:%S", time.gmtime(time.time()-start_time)),
                ram_percent)
              )
        for eval_idx in range(num_eval): 
            o,d,ep_ret,ep_len = test_env.reset(),False,0,0
            _ = test_env.render(mode='human') 
            while not(d or (ep_len == max_ep_len_eval)):
                a = R.get_action(o,deterministic=True)
                o,r,d,_ = test_env.step(a)
                _ = test_env.render(mode='human') 
                ep_ret += r # compute return 
                ep_len += 1
            print ("[Evaluate] [%d/%d] ep_ret:[%.4f] ep_len:[%d]"
                %(eval_idx,num_eval,ep_ret,ep_len))
    
print ("Done.")

[2m[36m(pid=23270)[0m Ray Worker [14] Ready.
[2m[36m(pid=23267)[0m Ray Worker [13] Ready.
[2m[36m(pid=23274)[0m Ray Worker [4] Ready.
[2m[36m(pid=23272)[0m Ray Worker [2] Ready.
[2m[36m(pid=23277)[0m Ray Worker [0] Ready.
[2m[36m(pid=23276)[0m Ray Worker [12] Ready.
[2m[36m(pid=23279)[0m Ray Worker [8] Ready.
[2m[36m(pid=23269)[0m Ray Worker [7] Ready.
[2m[36m(pid=23280)[0m Ray Worker [3] Ready.
[2m[36m(pid=23278)[0m Ray Worker [5] Ready.
[2m[36m(pid=23271)[0m Ray Worker [10] Ready.
[2m[36m(pid=23268)[0m Ray Worker [1] Ready.
[2m[36m(pid=23273)[0m Ray Worker [9] Ready.
[2m[36m(pid=23275)[0m Ray Worker [11] Ready.
[2m[36m(pid=23266)[0m Ray Worker [6] Ready.
[Evaluate] step:[1/5000][0.0%] #step:[1.5e+03] time:[00:00:03] ram:[31.3%].
[Evaluate] [0/3] ep_ret:[8.8756] ep_len:[20]
[Evaluate] [1/3] ep_ret:[8.5296] ep_len:[20]
[Evaluate] [2/3] ep_ret:[8.7337] ep_len:[20]
[Evaluate] step:[200/5000][4.0%] #step:[3.0e+05] time:[00:03:21] ram:[35.0%].
[

[Evaluate] [2/3] ep_ret:[1924.9499] ep_len:[1000]
[2m[33m(pid=raylet)[0m E0622 02:34:52.481386 23251 node_manager.cc:3118] Plasma object ffffffffffffffffffffffff01000080fedf0000 was evicted before the raylet could pin it.
[2m[33m(pid=raylet)[0m E0622 02:37:03.308148 23251 node_manager.cc:3118] Plasma object ffffffffffffffffffffffff01000080d6e70000 was evicted before the raylet could pin it.
[Evaluate] step:[4000/5000][80.0%] #step:[6.0e+06] time:[01:07:29] ram:[37.0%].
[Evaluate] [0/3] ep_ret:[2163.5112] ep_len:[1000]
[Evaluate] [1/3] ep_ret:[1926.7001] ep_len:[1000]
[Evaluate] [2/3] ep_ret:[2056.0654] ep_len:[1000]
[2m[33m(pid=raylet)[0m E0622 02:38:07.672744 23251 node_manager.cc:3118] Plasma object ffffffffffffffffffffffff010000804deb0000 was evicted before the raylet could pin it.
[Evaluate] step:[4200/5000][84.0%] #step:[6.3e+06] time:[01:10:51] ram:[37.1%].
[Evaluate] [0/3] ep_ret:[1722.7748] ep_len:[1000]
[Evaluate] [1/3] ep_ret:[2016.1374] ep_len:[1000]
[Evaluate] [2/3

### Close

In [10]:
test_env.close()

In [11]:
ray.shutdown()

### Test-Run

In [12]:
gym.logger.set_level(40)
env_name = 'AntBulletEnv-v0'
test_env = gym.make(env_name)
_ = test_env.render(mode='human') # enable rendering on test_env
_ = test_env.reset()
for _ in range(3): # dummy run for proper rendering 
    a = test_env.action_space.sample()
    o,r,d,_ = test_env.step(a)
    time.sleep(0.01)
print ("[%s] ready."%(env_name))
o,d,ep_ret,ep_len = test_env.reset(),False,0,0
_ = test_env.render(mode='human') 
while not(d or (ep_len == max_ep_len_eval)):
    a = R.get_action(o,deterministic=True)
    o,r,d,_ = test_env.step(a)
    _ = test_env.render(mode='human') 
    ep_ret += r # compute return 
    ep_len += 1
print ("[Evaluate] ep_ret:[%.4f] ep_len:[%d]"
    %(eval_idx,ep_len))
test_env.close() # close env

[AntBulletEnv-v0] ready.
[Evaluate] ep_ret:[2.0000] ep_len:[1000]
