# Basic Ray Usage

In [1]:
import os,ray,time
import numpy as np
print ("Ready.")

Ready.


### Central worker and distributed workers

In [2]:
class CentralWorkerClass(object):
    """
    Central worker: a plain (non-Ray) object that accumulates the data
    appended through `set_data` and reports the maximum value seen so far.
    """
    def __init__(self):
        # Every datum passed to set_data, in insertion order
        self.data = list()

    def get_data(self):
        """Return the internal list of stored data (the list itself, not a copy)."""
        return self.data

    def get_max_datum(self):
        """Return the maximum of the stored data, or 0 when nothing is stored yet."""
        if len(self.data) > 0:
            stored = np.asarray(self.data)
            return np.max(stored)
        return 0

    def set_data(self,datum):
        """Append a single datum to the stored data (does not replace it)."""
        self.data.append(datum)
        
@ray.remote
class DistributedWorkerClass(object):
    """
    Distributed worker, declared as a Ray actor through `@ray.remote`.
    Keeps the result of its latest rollout (`datum`) and a tick counter (`tick`).
    """
    def __init__(self,worker_id=0):
        self.id, self.datum, self.tick = worker_id, 0, 0
        print ("[%d] worker ready."%(self.id))

    def rollout(self,val):
        """Sleep for one second, then store `val` plus one `np.random.randn()` sample as the datum."""
        time.sleep(1.0) # simulated work
        noise = np.random.randn()
        self.datum = val+noise

    def read_datum(self):
        """Return the datum stored by the latest rollout (0 before any rollout)."""
        return self.datum

    def set_tick(self,tick):
        """Overwrite the stored tick."""
        self.tick = tick

    def get_tick(self):
        """Return the stored tick."""
        return self.tick 
print ("Ready")

Ready


### Initialize workers

In [3]:
C = CentralWorkerClass() # init central worker
n_worker = 4
# ignore_reinit_error=True keeps this cell re-runnable: without it, executing the
# cell a second time in the same kernel raises RuntimeError because Ray is
# already initialized.
ray.init(num_cpus=n_worker,ignore_reinit_error=True) # init Ray
# Initialize distributed workers (one actor per worker id)
workers = [DistributedWorkerClass.remote(worker_id=wid) for wid in range(n_worker)]
# Read the initial tick of every worker
get_tick_ops = [worker.get_tick.remote() for worker in workers] # non-block
get_tick_vals = ray.get(get_tick_ops) # block until all ticks are returned
print (" worker ticks:%s"%(get_tick_vals))

2020-08-19 10:39:16,966	INFO resource_spec.py:231 -- Starting Ray with 37.06 GiB memory available for workers and up to 18.54 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-08-19 10:39:17,430	INFO services.py:1193 -- View the Ray dashboard at [1m[32mlocalhost:8265[39m[22m


 worker ticks:[0, 0, 0, 0]


### Run

In [4]:
max_tick = 5
for tick in range(max_tick):
    print ("tick:[%d/%d]"%(tick,max_tick))
    
    # Set tick to rollout workers, then read it back
    set_tick_ops = [worker.set_tick.remote(tick) for worker in workers] # non-block
    ray.get(set_tick_ops) # block: a failed set_tick raises here instead of being dropped
    get_tick_ops = [worker.get_tick.remote() for worker in workers] # non-block
    get_tick_vals = ray.get(get_tick_ops) # block
    print (" worker ticks:%s"%(get_tick_vals))
    
    # Rollout (non-block operation): only the dispatch is timed here
    start = time.time()
    max_datum = C.get_max_datum() # identical for every worker, so compute it once
    rollout_list = [worker.rollout.remote(max_datum) for worker in workers] 
    print (" Rollouts took [%.3f]sec."%(time.time()-start))

    # Read rollout data (block operation)
    start = time.time()
    # Wait for the rollouts explicitly: any exception raised inside a remote
    # rollout is re-raised by ray.get instead of silently leaving a stale datum.
    ray.get(rollout_list) # block
    read_ops = [worker.read_datum.remote() for worker in workers] # non-block
    rollout_vals = ray.get(read_ops) # block; list with one value per worker
    print (" Reading  took [%.3f]sec."%(time.time()-start))

    # Append rollout data to the central worker
    for rollout_val in rollout_vals:
        C.set_data(datum=rollout_val)
      
    # Print
    data = C.get_data()
    n_data = len(data)
    print (" n_data:[%d] max_data:[%.3f]"%(n_data,C.get_max_datum()))

[2m[36m(pid=29384)[0m [1] worker ready.
[2m[36m(pid=29385)[0m [2] worker ready.
[2m[36m(pid=29383)[0m [3] worker ready.
[2m[36m(pid=29386)[0m [0] worker ready.
tick:[0/5]
 worker ticks:[0, 0, 0, 0]
 Rollouts took [0.001]sec.
 Reading  took [1.004]sec.
 n_data:[4] max_data:[0.261]
tick:[1/5]
 worker ticks:[1, 1, 1, 1]
 Rollouts took [0.004]sec.
 Reading  took [1.005]sec.
 n_data:[8] max_data:[1.217]
tick:[2/5]
 worker ticks:[2, 2, 2, 2]
 Rollouts took [0.004]sec.
 Reading  took [1.005]sec.
 n_data:[12] max_data:[1.935]
tick:[3/5]
 worker ticks:[3, 3, 3, 3]
 Rollouts took [0.004]sec.
 Reading  took [1.005]sec.
 n_data:[16] max_data:[2.328]
tick:[4/5]
 worker ticks:[4, 4, 4, 4]
 Rollouts took [0.004]sec.
 Reading  took [1.005]sec.
 n_data:[20] max_data:[2.412]


### Shutdown Ray

In [5]:
# Tear down the Ray runtime that was started with ray.init
ray.shutdown()
print("Ray shutdown.")

Ray shutdown.
