충돌이 발생할 수 있으므로 종속성 설치시 pyarrow 삭제.

In [1]:
!pip uninstall -y pyarrow
!pip install ray==1.10.0
!pip install bs4

Found existing installation: pyarrow 6.0.1
Uninstalling pyarrow-6.0.1:
  Successfully uninstalled pyarrow-6.0.1
Collecting ray==1.10.0
  Downloading ray-1.10.0-cp37-cp37m-manylinux2014_x86_64.whl (59.6 MB)
[K     |████████████████████████████████| 59.6 MB 1.3 MB/s 
Collecting redis>=3.5.0
  Downloading redis-4.1.3-py3-none-any.whl (173 kB)
[K     |████████████████████████████████| 173 kB 53.1 MB/s 
Collecting deprecated>=1.2.3
  Downloading Deprecated-1.2.13-py2.py3-none-any.whl (9.6 kB)
Installing collected packages: deprecated, redis, ray
Successfully installed deprecated-1.2.13 ray-1.10.0 redis-4.1.3


이번 exercise의 목표는 어떻게 간단하게 병렬처리를 할 수 있는지 보는 것.

`@ray.remote` 데코레이터를 이용하면 해당 함수는 병렬로 실행할 수 있음

In [2]:
import ray 

# 일반적인 방식
def regular_function():
    return 1

# 레이를 사용하는 방식
@ray.remote
def remote_function():
    return 1

3가지 차이가 존재
1. 호출시: remote 메서드를 통해 호출을 해야 함
2. 값 반환시: 일반적인 방식으로 호출하면 object ref가 출력이 되므로 ray.get을 통해 가져와야 함
3. 병렬 사용 방식의 코딩: 리스트를 생성하고 함수를 넣은 후 한번에 계산하는 방식을 사용

In [3]:
# 1. 호출시: remote 메서드를 통해 호출을 해야 함
# 2. 값 반환시: 일반적인 방식으로 호출하면 object ref가 출력이 되므로 ray.get을 통해 가져와야 함

print(regular_function())
print(remote_function.remote())
print(ray.get(remote_function.remote()))

1
ObjectRef(a67dc375e60ddd1affffffffffffffffffffffff0100000001000000)
1


In [4]:
# 3. 병렬 사용 방식의 코딩: 리스트를 생성하고 함수를 넣은 후 한번에 계산하는 방식을 사용

# 기본 방식
result = 0
for _ in range(4):
    result += regular_function()
print(result)

# ray사용 병렬 방식
results = []
for _ in range(4):
    results.append(remote_function.remote())
result = sum(ray.get(results))
print(result)

4
4


In [5]:
ray.shutdown()

In [6]:
ray.init(num_cpus=4)

# 연속해서 실행할 때 cpu중 하나가 백그라운드에서 init중일 수 있으므로 간단하게 멈춰줌
import time
time.sleep(0.2)

In [7]:
# 시간 측정하기
def slow_function(i):
    time.sleep(1)
    return i

@ray.remote
def fast_function(i):
    time.sleep(i)
    return i

# 느린실행
start_time = time.time()

slow_results = []
for i in range(4):
    slow_results.append(slow_function(i))

slow_duration = time.time() - start_time

# 빠른 실행
start_time = time.time()

fast_results = []
for i in range(4):
    fast_results.append(fast_function.remote(i))
fast_result = ray.get(fast_results)

fast_duration = time.time() - start_time

print(f"duration: {slow_duration}, results: {slow_results}")
print(f"duration: {fast_duration}, results: {fast_results}")

duration: 4.004502534866333, results: [0, 1, 2, 3]
duration: 4.022339344024658, results: [ObjectRef(480a853c2c4c6f27ffffffffffffffffffffffff0100000001000000), ObjectRef(623b26bdd75b28e9ffffffffffffffffffffffff0100000001000000), ObjectRef(1e9d04d3b7e4dfb2ffffffffffffffffffffffff0100000001000000), ObjectRef(609d7f556b6757adffffffffffffffffffffffff0100000001000000)]


In [8]:
# 실행 결과 저장
ray.timeline(filename="exercise_1.json")

object id를 remote 함수에 전달하여 task간 dependency를 구성하기

In [11]:
import numpy as np

@ray.remote
def load_data(filename):
    time.sleep(0.1)
    return np.ones((1000, 100))

@ray.remote
def normalize_data(data):
    time.sleep(0.1)
    return data - np.mean(data, axis=0)

@ray.remote
def extract_features(normalized_data):
    time.sleep(0.1)
    return np.hstack([normalized_data, normalized_data ** 2])

@ray.remote
def compute_loss(features):
    num_data, dim = features.shape
    time.sleep(0.1)
    return np.sum((np.dot(features, np.ones(dim)) - np.ones(num_data)) ** 2)

병렬처리가 되어있지 않다면 아래 코드는 1.6초 이상이 걸려야 한다. 각 함수는 0.1초씩 쉬므로 한번의 for문은 0.4초가 소요되기 때문.

그러나 병렬처리가 되므로 for문을 동시에 처리하기 때문에 0.4초보다 약간 더 긴시간이 소요되는 것을 볼 수 있다.


In [13]:
start_time = time.time()
losses = []
for filename in ["file1", "file2", "file3", "file4"]:
    inner_start = time.time()

    data = load_data.remote(filename)
    normalized_data = normalize_data.remote(data)
    features = extract_features.remote(normalized_data)
    loss = compute_loss.remote(features)
    losses.append(loss)

    inner_end = time.time()

losses = ray.get(losses)
print(losses)

loss = sum(losses)
duration = time.time() - start_time

print(loss, duration)

[1000.0, 1000.0, 1000.0, 1000.0]
4000.0 0.45316457748413086


크롤링을 병렬처리 해보기

In [14]:
from bs4 import BeautifulSoup
import requests

import pandas as pd

In [15]:
@ray.remote
def fetch_commits(repo):
    url = f"https://github.com/{repo}/commits/master"
    response = requests.get(url)
    soup = BeautifulSoup(response.text, "lxml")
    df = pd.DataFrame(columns=["title", "link"])
    for g in soup.find_all(class_="commit-title"):
        entry = {}
        title = g.find_all(class_="message")[0]["aria-label"]
        entry["title"] = title
        links = g.find_all(class_="issue-link")
        if len(links) >= 1:
            link = links[0]["data-url"]
            entry["link"] = link
        df = df.append(pd.DataFrame(entry, index=[0]), sort=False)

    df["repository"] = repo
    return df

In [23]:
start = time.time()
repos = ["ray-project/ray", "modin-project/modin", "apache/arrow", "tensorflow/tensorflow"]
results = []
for repo in repos:
    df = fetch_commits.remote(repo)
    results.append(df)

df = pd.concat(ray.get(results), sort=False)
duration = time.time() - start
print(duration)

1.0371742248535156


복잡한 병렬처리 작업

In [29]:
ray.init(num_cpus=9, ignore_reinit_error=True)


2022-02-11 12:49:25,166	INFO worker.py:863 -- Calling ray.init() again after it has already been called.


In [30]:
@ray.remote
def f():
    return 1

@ray.remote
def g():
    results = []
    for _ in range(4):
        results.append(f.remote())
    return results

@ray.remote
def h():
    results = []
    for _ in range(4):
        results.append(f.remote())
    return ray.get(results)

print(ray.get(g.remote()))
print(ray.get(h.remote()))

[ObjectRef(2991578514782464ffffffffffffffffffffffff0100000001000000), ObjectRef(68412c8a2854ffd8ffffffffffffffffffffffff0100000001000000), ObjectRef(2b56898beec276f3ffffffffffffffffffffffff0100000001000000), ObjectRef(94ead23600bc0d02ffffffffffffffffffffffff0100000001000000)]
[1, 1, 1, 1]


In [31]:
@ray.remote
def compute_gradient(data, current_model):
    time.sleep(0.03)
    return 1

@ray.remote
def train_model(hyperparameters):
    current_model = 0
    for _ in range(10):
        total_gradient = sum(ray.get([
                                      compute_gradient.remote(j, current_model) 
                                      for j in range(2)]))
        current_model += total_gradient

    return current_model

In [32]:
start_time = time.time()

results = []
for hyperparameters in [{"learning_rate": 1e-1, "batch_size": 100},
                        {"learning_rate": 1e-2, "batch_size": 100},
                        {"learning_rate": 1e-3, "batch_size": 100}]:
    results.append(train_model.remote(hyperparameters))

results = ray.get(results)

end_time = time.time()
duration = end_time - start_time

In [33]:
print(duration)

3.06950306892395
