
In [2]:
# install ray
!pip install ray


In [3]:
import ray
import time

# Start Ray on this machine; the returned dict describes the local node
ray.init()

2021-03-17 01:11:47,706	INFO services.py:1174 -- View the Ray dashboard at http://127.0.0.1:8265


{'metrics_export_port': 57166,
 'node_id': '9acaf34c33c69eb046e5386b020b58c2f62dc4eb8caaf925bdd79fa8',
 'node_ip_address': '172.28.0.2',
 'object_store_address': '/tmp/ray/session_2021-03-17_01-11-47_007509_62/sockets/plasma_store',
 'raylet_ip_address': '172.28.0.2',
 'raylet_socket_name': '/tmp/ray/session_2021-03-17_01-11-47_007509_62/sockets/raylet',
 'redis_address': '172.28.0.2:6379',
 'session_dir': '/tmp/ray/session_2021-03-17_01-11-47_007509_62',
 'webui_url': '127.0.0.1:8265'}

## Ray

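Remote functions are executed asynchronously by Ray's worker processes: calling `f.remote(...)` returns an object ref right away, and `ray.get` waits for the actual result. In the code below, `f` is a remote function run by Ray, while `g` is an ordinary function that runs synchronously for comparison.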

In [7]:
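@ray.remote(num_cpus=1)  # each call reserves 1 CPU on a Ray worker
def f(x):
    time.sleep(1)
    return x

def g(x):  # plain Python function for comparison; runs in this process
    time.sleep(1)
    return x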

In [9]:
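start = time.time()
# Start 4 tasks in parallel; each f.remote() call returns an object ref immediately.
result_ids = []
for i in range(4):
    result_ids.append(f.remote(i))

# Wait for the tasks to complete and retrieve the results.
# With at least 4 CPUs this takes about 1 second; with fewer CPUs the tasks run in
# several rounds (the 2-second result below suggests the runtime had 2 CPUs).
results = ray.get(result_ids)  # [0, 1, 2, 3]

print("Ray elapsed time: {}".format(time.time() - start))

# The same 4 calls without Ray run one after another, so about 4 seconds.
start = time.time()
results = []
for i in range(4):
    results.append(g(i))
print("No Ray elapsed time: {}".format(time.time() - start))

Ray elapsed time: 2.0075857639312744
No Ray elapsed time: 4.003834247589111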


## Task Dependency (Sequential Execution)

Sometimes a task can only start after another task has finished. In that case, we say there is a task dependency between them.

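Example: compute `x_id` and `y_id` in parallel, then use both to compute `z_id`. Because `x_id` and `y_id` are passed to `multiply_matrices.remote` as object refs, Ray waits for both to finish before starting the multiplication.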

```
[x_id, y_id] > z_id
```
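For comparison, here is the same `[x, y] > z` dependency written with Python's standard `concurrent.futures` instead of Ray. This is a stdlib analogy, not Ray's API: the helper names `make_matrix` and `matmul` are hypothetical, and the dependency has to be resolved by hand with `.result()`, which Ray does automatically when an object ref is passed as an argument.

```python
import random
from concurrent.futures import ThreadPoolExecutor


def make_matrix(n):
    # n x n matrix of standard-normal samples (pure Python, no NumPy).
    return [[random.gauss(0.0, 1.0) for _ in range(n)] for _ in range(n)]


def matmul(a, b):
    # Textbook triple loop for square matrices: z[i][j] = sum_k a[i][k] * b[k][j].
    n = len(a)
    return [[sum(a[i][k] * b[k][j] for k in range(n)) for j in range(n)] for i in range(n)]


with ThreadPoolExecutor(max_workers=2) as pool:
    # x and y do not depend on each other, so both are submitted at once.
    x_future = pool.submit(make_matrix, 3)
    y_future = pool.submit(make_matrix, 3)
    # z depends on both: .result() blocks until each input is ready.
    z_future = pool.submit(matmul, x_future.result(), y_future.result())
    z = z_future.result()

print(z)
```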

In [15]:
import numpy as np

@ray.remote
def create_matrix(size):
    # size is a shape such as [n, n]; returns an n x n matrix of normal samples.
    return np.random.normal(size=size)

@ray.remote
def multiply_matrices(x, y):
    return np.dot(x, y)

# Measure how the elapsed time grows as the matrix size increases.
# The first iteration may also include one-time start-up cost, e.g. launching workers.
for size in [0, 10, 100, 1000]:
    start = time.time()
    # x_id and y_id have no dependency on each other, so they are computed in parallel.
    x_id = create_matrix.remote([size, size])
    y_id = create_matrix.remote([size, size])
    # multiply_matrices receives object refs; Ray runs it only after x_id and y_id are ready.
    z_id = multiply_matrices.remote(x_id, y_id)
    z = ray.get(z_id)
    print("{} z_id_with_ray : {}".format(size, time.time() - start))



0 z_id_with_ray : 0.03999590873718262
10 z_id_with_ray : 0.015011310577392578
100 z_id_with_ray : 0.019176006317138672
1000 z_id_with_ray : 0.3899657726287842
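One way to read these timings: multiplying two n×n matrices with the textbook algorithm takes n³ multiply-adds, while creating each matrix only needs n² random samples. At small sizes the measured time is presumably dominated by Ray's per-task overhead; at 1000 the roughly 10⁹ multiply-adds become noticeable. The helpers below are a hypothetical back-of-the-envelope model, not a measurement.

```python
def matmul_multiply_adds(n):
    # Textbook n x n by n x n product: n*n output entries, each a length-n dot product.
    return n ** 3


def matrix_samples(n):
    # create_matrix([n, n]) draws n*n random numbers.
    return n ** 2


for n in [10, 100, 1000]:
    print("{:>5}: create {:>9,} samples, multiply {:>13,} multiply-adds".format(
        n, matrix_samples(n), matmul_multiply_adds(n)))
```

NumPy hands `np.dot` to an optimized BLAS routine, which is much faster per operation, but the operation count still grows on the order of n³.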


## Aggregation

How the operations are arranged for parallel execution (a linear chain versus a tree) determines how fast the aggregation finishes.
![](https://miro.medium.com/max/1400/1*vHz3troEmr4uLns0V8VmdA.jpeg)
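The speed difference can be estimated by counting rounds. Under an idealized model (every `add` takes exactly one second, and each level of the tree is scheduled before the next), a linear chain of n values needs n - 1 rounds regardless of CPUs, while a tree needs about log₂(n) rounds if there are enough CPUs to run each level at once. The function names below are hypothetical.

```python
import math


def linear_rounds(n_values):
    # Each addition needs the previous partial sum, so additions run one at a time.
    return n_values - 1


def tree_rounds(n_values, n_cpus):
    # Pair up values level by level; a level with more pairs than CPUs needs extra rounds.
    rounds = 0
    remaining = n_values
    while remaining > 1:
        pairs = remaining // 2
        rounds += math.ceil(pairs / n_cpus)
        # Each pair becomes one partial sum; an odd leftover value is carried up unchanged.
        remaining = pairs + remaining % 2
    return rounds


for cpus in [1, 2, 4]:
    print("cpus={}: linear {} rounds, tree {} rounds".format(
        cpus, linear_rounds(8), tree_rounds(8, cpus)))
```

For 8 values the model gives 7 rounds for the chain in every case, and 7, 4, and 3 rounds for the tree with 1, 2, and 4 CPUs respectively.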


In [16]:
import time

@ray.remote
def add(x, y):
    time.sleep(1)
    return x + y

In [17]:
start = time.time()
# Aggregate the values slowly. This approach takes O(n) where n is the
# number of values being aggregated. In this case, 7 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(id1, 3)
id3 = add.remote(id2, 4)
id4 = add.remote(id3, 5)
id5 = add.remote(id4, 6)
id6 = add.remote(id5, 7)
id7 = add.remote(id6, 8)
result = ray.get(id7)
print("Vanilla version : {}".format(time.time() - start))


start = time.time()
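# Aggregate the values in a tree-structured pattern. With enough CPUs this
# takes O(log(n)) rounds: log2(8) = 3, i.e. 3 seconds. With only 2 CPUs the
# 4 first-level additions need two rounds, so it takes about 4 seconds.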
id1 = add.remote(1, 2)
id2 = add.remote(3, 4)
id3 = add.remote(5, 6)
id4 = add.remote(7, 8)
id5 = add.remote(id1, id2)
id6 = add.remote(id3, id4)
id7 = add.remote(id5, id6)
result = ray.get(id7)
print("Advanced version : {}".format(time.time() - start))

Vanilla version : 7.0391762256622314
Advanced version : 4.01997971534729


## Reference

https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8