## **Tutorial**

Ray is a distributed execution engine that runs Python code in parallel across multiple processes and machines.

### **Overview**

1. Multiple **worker** processes execute tasks and store the results in object stores. Each worker is a separate process.
2. One **object store** per node holds immutable objects in shared memory so that workers on that node can access them.
3. One **local scheduler** per node assigns tasks to workers on the same node.
4. A **driver** is the Python process that the user controls. The driver submits tasks to its local scheduler.
5. A **Redis server** maintains much of the system's state.

In [1]:
import ray
ray.init()

2019-03-12 09:45:29,965	INFO node.py:439 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-03-12_09-45-29_52245/logs.
2019-03-12 09:45:30,074	INFO services.py:364 -- Waiting for redis server at 127.0.0.1:17891 to respond...
2019-03-12 09:45:30,187	INFO services.py:364 -- Waiting for redis server at 127.0.0.1:26094 to respond...
2019-03-12 09:45:30,191	INFO services.py:761 -- Starting Redis shard with 3.44 GB max memory.
2019-03-12 09:45:30,201	INFO services.py:1449 -- Starting the Plasma object store with 5.15 GB memory using /tmp.


{'node_ip_address': None,
 'redis_address': '10.30.198.126:17891',
 'object_store_address': '/tmp/ray/session_2019-03-12_09-45-29_52245/sockets/plasma_store',
 'webui_url': None,
 'raylet_socket_name': '/tmp/ray/session_2019-03-12_09-45-29_52245/sockets/raylet'}

## **Immutable remote objects**

Ray can create and compute on **remote objects**. Each object is referred to by its **object ID**. <br>
Remote objects are stored in an **object store** and are immutable.

### **Put and Get**

***ray.put*** and ***ray.get*** convert between Python objects and object IDs. <br>
***ray.put(x)*** takes a Python object, copies it to the local object store, and returns its object ID.<br>
***ray.get(x_id)*** takes an object ID and creates a Python object from the stored remote object. It also accepts a list of object IDs and returns a list of objects.

In [2]:
x = 'example'
x_id = ray.put(x)
x_id

ObjectID(ffffffff51c4ca3ba26d0531464bbf1613bba1f6)

In [3]:
ray.get(x_id)

'example'

In [4]:
result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

## **Asynchronous Computation**

The ***@ray.remote*** decorator turns an arbitrary Python function into a remote function that can be executed asynchronously.

In [5]:
def add1(a,b):
    return a + b

***add1(1,2)*** returns ***3*** and blocks the Python interpreter until the computation has finished.

In [6]:
@ray.remote
def add2(a,b):
    return a + b

***add2.remote(1,2)*** immediately returns an object ID and creates a **task**. The task is executed asynchronously by a worker, and ***ray.get*** retrieves its result.

In [7]:
x_id = add2.remote(1,2)
ray.get(x_id)

3
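For readers who know the standard library, this submit-then-fetch pattern resembles `concurrent.futures`. The sketch below is only an analogy and does not use Ray: `pool.submit` plays the role of `add2.remote`, and `future.result()` plays the role of `ray.get`. Unlike Ray, it runs in threads inside one process, not on separate workers or nodes.

```python
from concurrent.futures import ThreadPoolExecutor


def add(a, b):
    return a + b


with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns a Future right away, much like add2.remote(1, 2)
    # returns an object ID without waiting for the result.
    future = pool.submit(add, 1, 2)
    # result() blocks until the value is ready, much like ray.get(x_id).
    result = future.result()

print(result)  # 3
```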

### **Remote functions**

In [8]:
import time

def f1():
    time.sleep(1)

@ray.remote
def f2():
    time.sleep(1)

In [9]:
%%time
li = [f1() for _ in range(10)]

CPU times: user 151 ms, sys: 75.8 ms, total: 227 ms
Wall time: 10 s


In [10]:
%%time
li = ray.get([f2.remote() for _ in range(10)])

CPU times: user 46.6 ms, sys: 24.5 ms, total: 71.1 ms
Wall time: 3.01 s


In [11]:
%%time
li = ray.get([f2.remote() for _ in range(48)])

CPU times: user 177 ms, sys: 86.5 ms, total: 263 ms
Wall time: 12 s


In [12]:
%%time
li = ray.get([f2.remote() for _ in range(49)])

CPU times: user 185 ms, sys: 94.5 ms, total: 280 ms
Wall time: 13 s


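Tasks are distributed across the available worker processes, so `n` one-second tasks on `w` workers finish in roughly `ceil(n / w)` seconds. <br>With 12 cores available, 48 tasks would take about 4 seconds and 49 tasks about 5 seconds. The wall times recorded above (3 s, 12 s, and 13 s) instead match 4 tasks running at a time.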
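The rounding behind these wall times can be checked with a short calculation. This is a minimal sketch: `estimated_wall_time` is a hypothetical helper (not part of Ray), and it assumes every task takes exactly one second with no scheduling overhead.

```python
import math


def estimated_wall_time(num_tasks, num_workers, task_seconds=1.0):
    """Tasks run in waves of num_workers; the last wave may be partly empty."""
    return math.ceil(num_tasks / num_workers) * task_seconds


for workers in (4, 12):
    estimates = [estimated_wall_time(n, workers) for n in (10, 48, 49)]
    print(workers, estimates)
# 4 [3.0, 12.0, 13.0]
# 12 [1.0, 4.0, 5.0]
```

The single extra task in the 49-task run forces one more full wave, which is why it costs a whole extra second.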

<br>**Arguments to remote functions may be passed by value or by object ID**

In [13]:
print(ray.get(add2.remote(1,2)))
print(ray.get(add2.remote(1, ray.put(2))))
print(ray.get(add2.remote(ray.put(1), ray.put(2))))

3
3
3


<br>**Multiple object IDs**

In [14]:
@ray.remote(num_return_vals = 3)
def return_multiple():
    return 1,2,3

a_id, b_id, c_id = return_multiple.remote()

### **Expressing dependencies between tasks**

In [15]:
@ray.remote
def f(x):
    return x+1

x = f.remote(0)
y = f.remote(x)
z = f.remote(y)

ray.get(z)

3

The second and third calls will not be executed until the previous one has finished, because each takes the previous result as its argument. <br>In the above example, **parallelism** can **not** be achieved.

In [16]:
import numpy as np

@ray.remote
def generate_data_ray():
    return np.random.normal(1000)

@ray.remote
def aggregate_data(x, y):
    return x + y

In [17]:
# The code below launches 100 tasks that will be scheduled on various nodes.
data = [generate_data_ray.remote() for _ in range(100)]

# Perform a tree reduce
while len(data) > 1:
    data.append(aggregate_data.remote(data.pop(0), data.pop(0)))
    
ray.get(data)

[99976.15457874804]
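To see why the pop-and-append loop allows parallelism, it helps to track how many aggregation steps each result depends on. The sketch below replays the same loop on plain integers without Ray. `queue_reduce_depth` is a hypothetical helper, and "depth" here means the longest chain of `aggregate_data` calls that must run one after another.

```python
def queue_reduce_depth(num_items):
    """Replay the pop(0)/append loop, tracking dependency depth instead of data."""
    depths = [0] * num_items  # freshly generated items depend on no aggregation
    calls = 0
    while len(depths) > 1:
        left, right = depths.pop(0), depths.pop(0)
        # One aggregation task sits one level above its deeper input.
        depths.append(max(left, right) + 1)
        calls += 1
    return calls, depths[0]


calls, tree_depth = queue_reduce_depth(100)
# Folding left to right instead would make every addition wait on the previous one.
chain_depth = 100 - 1
print(calls, tree_depth, chain_depth)  # 99 7 99
```

Both strategies need 99 additions, but the tree-shaped version has only 7 sequential levels, so additions on the same level can run on different workers at once.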

### **Remote Functions within Remote Functions**

All of the above examples call remote functions only from the driver. <br>
In the example below, a worker process calls remote functions.

In [18]:
@ray.remote
def sub_experiment(i, j):
    # Run the jth sub-experiment for the ith experiment.
    return i + j

@ray.remote
def run_experiment(i):
    sub_results = []
    # Launch tasks to perform 10 sub-experiments in parallel.
    for j in range(10):
        sub_results.append(sub_experiment.remote(i,j))
    
    return sum(ray.get(sub_results))

results = [run_experiment.remote(i) for i in range(5)]
ray.get(results)

[45, 55, 65, 75, 85]
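This output can be verified without Ray: experiment `i` sums `i + j` for `j` from 0 to 9, which equals `10 * i + 45`. A plain-Python check, where the name `expected` is only illustrative:

```python
# Same arithmetic as run_experiment, computed serially in one process.
expected = [sum(i + j for j in range(10)) for i in range(5)]
print(expected)  # [45, 55, 65, 75, 85]
```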