# Exercise 2 - Parallel Data Processing with Task Dependencies

**GOAL:** The goal of this exercise is to show how to pass object IDs into remote functions to encode dependencies between tasks.

In this exercise, we construct a sequence of tasks each of which depends on the previous mimicking a data parallel application. Within each sequence, tasks are executed serially, but multiple sequences can be executed in parallel.

In this exercise, you will use Ray to parallelize the computation below and speed it up.

### Concept for this Exercise - Task Dependencies

Suppose we have two remote functions defined as follows.

```python
@ray.remote
def f(x):
    return x
```

Arguments can be passed into remote functions as usual.

```python
>>> x1_id = f.remote(1)
>>> ray.get(x1_id)
1

>>> x2_id = f.remote([1, 2, 3])
>>> ray.get(x2_id)
[1, 2, 3]
```

**Object IDs** can also be passed into remote functions. When the function actually gets executed, **the argument will be a retrieved as a regular Python object**.

```python
>>> y1_id = f.remote(x1_id)
>>> ray.get(y1_id)
1

>>> y2_id = f.remote(x2_id)
>>> ray.get(y2_id)
[1, 2, 3]
```

So when implementing a remote function, the function should expect a regular Python object regardless of whether the caller passes in a regular Python object or an object ID.

**Task dependencies affect scheduling.** In the example above, the task that creates `y1_id` depends on the task that creates `x1_id`. This has the following implications.

- The second task will not be executed until the first task has finished executing.
- If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to `x1_id`) will be copied over the network to the machine where the second task is scheduled.

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time



In [2]:
ray.init(num_cpus=4, include_webui=False, ignore_reinit_error=True)

2020-02-18 09:41:30,725	INFO resource_spec.py:212 -- Starting Ray with 2.05 GiB memory available for workers and up to 1.03 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).


{'node_ip_address': '172.17.0.2',
 'redis_address': '172.17.0.2:25624',
 'object_store_address': '/tmp/ray/session_2020-02-18_09-41-30_685497_352/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-02-18_09-41-30_685497_352/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2020-02-18_09-41-30_685497_352'}

These are some helper functions that mimic an example pattern of a data parallel application.

**EXERCISE:** You will need to turn all of these functions into remote functions. When you turn these functions into remote function, you do not have to worry about whether the caller passes in an object ID or a regular object. In both cases, the arguments will be regular objects when the function executes. This means that even if you pass in an object ID, you **do not need to call `ray.get`** inside of these remote functions.

In [3]:
@ray.remote
def load_data(filename):
    time.sleep(0.1)
    return np.ones((1000, 100))

@ray.remote
def normalize_data(data):
    time.sleep(0.1)
    return data - np.mean(data, axis=0)

@ray.remote
def extract_features(normalized_data):
    time.sleep(0.1)
    return np.hstack([normalized_data, normalized_data ** 2])

@ray.remote
def compute_loss(features):
    num_data, dim = features.shape
    time.sleep(0.1)
    return np.sum((np.dot(features, np.ones(dim)) - np.ones(num_data)) ** 2)

assert hasattr(load_data, 'remote'), 'load_data must be a remote function'
assert hasattr(normalize_data, 'remote'), 'normalize_data must be a remote function'
assert hasattr(extract_features, 'remote'), 'extract_features must be a remote function'
assert hasattr(compute_loss, 'remote'), 'compute_loss must be a remote function'

**EXERCISE:** The loop below takes too long. Parallelize the four passes through the loop by turning `load_data`, `normalize_data`, `extract_features`, and `compute_loss` into remote functions and then retrieving the losses with `ray.get`.

**NOTE:** You should only use **ONE** call to `ray.get`. For example, the object ID returned by `load_data` should be passed directly into `normalize_data` without needing to be retrieved by the driver.

In [4]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

losses = []
for filename in ['file1', 'file2', 'file3', 'file4']:
    inner_start = time.time()

    data = load_data.remote(filename)
    normalized_data = normalize_data.remote(data)
    features = extract_features.remote(normalized_data)
    loss = compute_loss.remote(features)
    losses.append(loss)
    
    inner_end = time.time()
    
    if inner_end - inner_start >= 0.1:
        raise Exception('You may be calling ray.get inside of the for loop! '
                        'Doing this will prevent parallelism from being exposed. '
                        'Make sure to only call ray.get once outside of the for loop.')

losses = ray.get(losses)
print('The losses are {}.'.format(losses) + '\n')
loss = sum(losses)

end_time = time.time()
duration = end_time - start_time

print('The loss is {}. This took {} seconds. Run the next cell to see '
      'if the exercise was done correctly.'.format(loss, duration))

The losses are [1000.0, 1000.0, 1000.0, 1000.0].

The loss is 4000.0. This took 0.47521018981933594 seconds. Run the next cell to see if the exercise was done correctly.


**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [5]:
assert loss == 4000
assert duration < 0.8, ('The loop took {} seconds. This is too slow.'
                        .format(duration))
assert duration > 0.4, ('The loop took {} seconds. This is too fast.'
                        .format(duration))

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 0.47521018981933594 seconds.


**EXERCISE:** Use the UI to view the task timeline and to verify that the four tasks were executed in parallel. You can do this as follows.

1. Run the following cell to generate a JSON file containing the profiling data.
2. Download the timeline file by right clicking on `timeline02.json` in the navigator to the left and choosing **"Download"**.
3. Open [chrome://tracing/](chrome://tracing/) in the Chrome web browser, click on the **"Load"** button and load the downloaded JSON file.

To navigate within the timeline, do the following.
- Move around by clicking and dragging.
- Zoom in and out by holding **alt** and scrolling.

**NOTE:** The timeline visualization will only work in **Chrome**.

In [6]:
ray.timeline(filename="timeline02.json")

### Application: Parallel web-scraping

One useful application of what we have learned so far is to scrape information from the web. We will illustrate this in a toy setting, but the same principles apply on a large scale where crawling through websites, parsing them and extracting useful content (e.g. for building a search index or populating a database) is often very computationally demanding.

We break up the process into multiple steps. We first grab the raw HTML of the website using Python's requests package. Then, we use BeautifulSoup to parse the HTML to find the relevant information. Finally, we populate a pandas DataFrames so that we are able to work with the data.

To demonstrate this, we scrape GitHub commits to see the latest commits on several repositories.

In [7]:
!pip install requests
!pip install bs4
!pip install lxml



In [8]:
from bs4 import BeautifulSoup
import requests

import pandas as pd

The following function uses these libraries to parse the latest commits from several repositories on GitHub.

In [12]:
@ray.remote
def fetch_commits(repo):
    url = 'https://github.com/{}/commits/master'.format(repo)
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'lxml')
    df = pd.DataFrame(columns=['title', 'link'])
    for g in soup.find_all(class_='commit-title'):
        entry = {}
        title = g.find_all(class_='message')[0]['aria-label']
        entry['title'] = title
        links = g.find_all(class_='issue-link')
        if len(links) >= 1:
            link = links[0]['data-url']
            entry['link'] = link
        df = df.append(pd.DataFrame(entry, index=[0]), sort=False)
    
    df['repository'] = repo
    return df

Let's try this out to get results for some ray related topics serially.

In [None]:
start = time.time()
repos = ["ray-project/ray", "modin-project/modin", "tensorflow/tensorflow", "apache/arrow"]
results = []
for repo in repos:
    df = fetch_commits.remote(repo)
    results.append(df)
    
df = pd.concat(ray.get(results), sort=False)
duration = time.time() - start
print("Constructing the dataframe took {} seconds.".format(duration))

**EXERCISE**: Speed up the above serial query by making `fetch_commits` a remote function in order to scrape GitHub results in parallel. Then, see a sample of the data scraped below and feel free to play with the data to find other resources to learn more about these libraries!

In [11]:
df

Unnamed: 0,title,link,repository
0,[core] Make sure to unsubscribe get dependenci...,,ray-project/ray
0,Removing Pyarrow dependency (#7146),https://github.com/ray-project/ray/issues/7146,ray-project/ray
0,Fix various issues/warnings that come up on Je...,https://github.com/ray-project/ray/issues/7147,ray-project/ray
0,Ssh command format (#7176),https://github.com/ray-project/ray/issues/7176,ray-project/ray
0,[Serve] Added support for no http route servic...,https://github.com/ray-project/ray/issues/7010,ray-project/ray
...,...,...,...
0,ARROW-2447: [C++] Device and MemoryManager API...,,apache/arrow
0,ARROW-7722: [FlightRPC][Java] disable flaky Fl...,,apache/arrow
0,PARQUET-1770: [C++][CI] Add fuzz target for re...,,apache/arrow
0,ARROW-7754: [C++] Make Result<> faster\n\nThe ...,,apache/arrow
