# Exercise 1 - Simple Data Parallel Example

**GOAL:** The goal of this exercise is to show how to run simple tasks in parallel.

This script is too slow, and the computation is embarrassingly parallel. In this exercise, you will use Ray to execute the functions in parallel to speed it up.

### Concept for this Exercise - Remote Functions

The standard way to turn a Python function into a remote function is to add the `@ray.remote` decorator. Here is an example.

```python
# A regular Python function.
def regular_function():
    return 1

# A Ray remote function.
@ray.remote
def remote_function():
    return 1
```

The differences are the following:

1. **Invocation:** The regular version is called with `regular_function()`, whereas the remote version is called with `remote_function.remote()`.
2. **Return values:** `regular_function` immediately executes and returns `1`, whereas `remote_function` immediately returns an object ID (a future) and then creates a task that will be executed on a worker process. The result can be obtained with `ray.get`.
    ```python
    >>> regular_function()
    1
    
    >>> remote_function.remote()
    ObjectID(1c80d6937802cd7786ad25e50caf2f023c95e350)
    
    >>> ray.get(remote_function.remote())
    1
    ```
3. **Parallelism:** Invocations of `regular_function` happen **serially**, for example
    ```python
    # These happen serially.
    for _ in range(4):
        regular_function()
    ```
    whereas invocations of `remote_function` happen in **parallel**, for example
    ```python
    # These happen in parallel.
    for _ in range(4):
        remote_function.remote()
    ```
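
The submit-then-fetch pattern above can also be tried without Ray. The sketch below is only an analogy built on the standard library's `concurrent.futures`, not Ray's API: `executor.submit` plays the role of `.remote()`, and `Future.result()` plays the role of `ray.get`. The helper name `slow_identity`, the pool size, and the 0.2-second sleep are arbitrary choices made for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_identity(i):
    # Stand-in for a slow computation (hypothetical helper, not part of Ray).
    time.sleep(0.2)
    return i


with ThreadPoolExecutor(max_workers=4) as executor:
    start = time.time()
    # submit() returns a Future right away, much like .remote() returns an object ID.
    futures = [executor.submit(slow_identity, i) for i in range(4)]
    submit_time = time.time() - start
    # result() blocks until the value is ready, much like ray.get.
    values = [future.result() for future in futures]
    total_time = time.time() - start

print(values)
print('submitting took {:.3f}s, fetching all results took {:.3f}s'.format(
    submit_time, total_time))
```

Submitting the four calls returns almost instantly, and the waiting only happens when the results are fetched.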

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ray
import time

Start Ray. By default, Ray does not schedule more tasks concurrently than there are CPUs. This example requires four tasks to run concurrently, so we tell Ray that there are four CPUs. Normally you would omit this argument and let Ray compute the number of CPUs using `psutil.cpu_count()`. The argument `ignore_reinit_error=True` suppresses the error that would otherwise occur if the cell is run multiple times.

The call to `ray.init` starts a number of processes.
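
The effect of the CPU limit on running time can be estimated with a little arithmetic. The function below is a rough model rather than anything Ray provides: it assumes that every task takes the same amount of time, that each task occupies exactly one CPU, and that scheduling overhead is zero. The name `estimated_wall_time` is made up for this sketch.

```python
import math


def estimated_wall_time(num_tasks, num_cpus, task_seconds):
    # Tasks run in "waves" of at most num_cpus tasks at a time.
    waves = math.ceil(num_tasks / num_cpus)
    return waves * task_seconds


print(estimated_wall_time(4, 4, 1.0))  # 1.0
print(estimated_wall_time(4, 3, 1.0))  # 2.0
print(estimated_wall_time(4, 1, 1.0))  # 4.0
```

Under these assumptions, four one-second tasks finish in about one second only when at least four CPUs are available.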

In [3]:
ray.init(num_cpus=4, include_webui=False, ignore_reinit_error=True)

Calling ray.init() again after it has already been called.


**EXERCISE:** The function below is slow. Turn it into a remote function using the `@ray.remote` decorator.

In [3]:
# This function is a proxy for a more interesting and computationally
# intensive function.
@ray.remote
def slow_function(i):
    time.sleep(1)
    return i

**EXERCISE:** The loop below takes too long. The four function calls could be executed in parallel. Instead of four seconds, it should only take one second. Once `slow_function` has been made a remote function, execute these four tasks in parallel by calling `slow_function.remote()`. Then obtain the results by calling `ray.get` on a list of the resulting object IDs.

In [4]:
# Sleep a little to improve the accuracy of the timing measurements below.
# We do this because workers may still be starting up in the background.
time.sleep(2.0)
start_time = time.time()

results = ray.get([slow_function.remote(i) for i in range(4)])

end_time = time.time()
duration = end_time - start_time

print('The results are {}. This took {} seconds. Run the next cell to see '
      'if the exercise was done correctly.'.format(results, duration))

The results are [0, 1, 2, 3]. This took 1.2578864097595215 seconds. Run the next cell to see if the exercise was done correctly.


**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [6]:
assert results == [0, 1, 2, 3], 'Did you remember to call ray.get?'
assert duration < 1.1, ('The loop took {} seconds. This is too slow.'
                        .format(duration))
assert duration > 1, ('The loop took {} seconds. This is too fast.'
                      .format(duration))

print('Success! The example took {} seconds.'.format(duration))

AssertionError: The loop took 1.2578864097595215 seconds. This is too slow.

**EXERCISE:** Use the UI to view the task timeline and to verify that the four tasks were executed in parallel. After running the cell below, you'll need to click on **View task timeline**.
- Using the **second** button, you can click and drag to **move** the timeline.
- Using the **third** button, you can click and drag to **zoom**. You can also zoom by holding "alt" and scrolling.

**NOTE:** Normally our UI is used as a separate Jupyter notebook. However, for simplicity we embedded the relevant feature here in this notebook.

**NOTE:** The first time you click **View task timeline** it may take **several minutes** to start up. This will change.

**NOTE:** If you run more tasks and want to regenerate the UI, you need to move the slider bar a little bit and then click **View task timeline** again.

**NOTE:** The timeline visualization may only work in **Chrome**.

In [7]:
import ray.experimental.ui as ui
ui.task_timeline()

# Exercise 2 - Parallel Data Processing with Task Dependencies

**GOAL:** The goal of this exercise is to show how to pass object IDs into remote functions to encode dependencies between tasks.

In this exercise, we construct sequences of tasks in which each task depends on the previous one, mimicking a data parallel application. Within each sequence, tasks are executed serially, but multiple sequences can be executed in parallel.

In this exercise, you will use Ray to parallelize the computation below and speed it up.

### Concept for this Exercise - Task Dependencies

Suppose we have a remote function defined as follows.

```python
@ray.remote
def f(x):
    return x
```

Arguments can be passed into remote functions as usual.

```python
>>> x1_id = f.remote(1)
>>> ray.get(x1_id)
1

>>> x2_id = f.remote([1, 2, 3])
>>> ray.get(x2_id)
[1, 2, 3]
```

**Object IDs** can also be passed into remote functions. When the function actually gets executed, **the argument will be retrieved as a regular Python object**.

```python
>>> y1_id = f.remote(x1_id)
>>> ray.get(y1_id)
1

>>> y2_id = f.remote(x2_id)
>>> ray.get(y2_id)
[1, 2, 3]
```

So when implementing a remote function, the function should expect a regular Python object regardless of whether the caller passes in a regular Python object or an object ID.

**Task dependencies affect scheduling.** In the example above, the task that creates `y1_id` depends on the task that creates `x1_id`. This has the following implications.

- The second task will not be executed until the first task has finished executing.
- If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to `x1_id`) will be copied over the network to the machine where the second task is scheduled.
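
The idea that a task always sees plain values, even when the caller passed in a future, can be mimicked with the standard library. This is an analogy only, not how Ray is implemented: `submit_resolving` is a hypothetical helper that waits for any `Future` arguments before calling the function.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def submit_resolving(executor, fn, *args):
    # Hypothetical helper: wait for any Future arguments inside the task,
    # so that fn itself only ever sees plain Python values.
    def run():
        resolved = [a.result() if isinstance(a, Future) else a for a in args]
        return fn(*resolved)
    return executor.submit(run)


def f(x):
    return x


with ThreadPoolExecutor(max_workers=4) as executor:
    x1 = submit_resolving(executor, f, 1)
    y1 = submit_resolving(executor, f, x1)  # depends on x1
    x2 = submit_resolving(executor, f, [1, 2, 3])
    y2 = submit_resolving(executor, f, x2)  # depends on x2
    print(y1.result(), y2.result())
```

One difference from the behavior described above: in this sketch a dependent task occupies a worker thread while it waits, whereas in Ray the second task is not executed until the first has finished.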

In [8]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

In [9]:
ray.init(num_cpus=4, include_webui=False, ignore_reinit_error=True)

Calling ray.init() again after it has already been called.


These are some helper functions that mimic an example pattern of a data parallel application.

**EXERCISE:** You will need to turn all of these functions into remote functions. When you turn these functions into remote functions, you do not have to worry about whether the caller passes in an object ID or a regular object. In both cases, the arguments will be regular objects when the function executes. This means that even if you pass in an object ID, you **do not need to call `ray.get`** inside of these remote functions.

In [None]:
@ray.remote
def load_data(filename):
    time.sleep(0.1)
    return np.ones((1000, 100))

@ray.remote
def normalize_data(data):
    time.sleep(0.1)
    return data - np.mean(data, axis=0)

@ray.remote
def extract_features(normalized_data):
    time.sleep(0.1)
    return np.hstack([normalized_data, normalized_data ** 2])

@ray.remote
def compute_loss(features):
    num_data, dim = features.shape
    time.sleep(0.1)
    return np.sum((np.dot(features, np.ones(dim)) - np.ones(num_data)) ** 2)

assert hasattr(load_data, 'remote'), 'load_data must be a remote function'
assert hasattr(normalize_data, 'remote'), 'normalize_data must be a remote function'
assert hasattr(extract_features, 'remote'), 'extract_features must be a remote function'
assert hasattr(compute_loss, 'remote'), 'compute_loss must be a remote function'

**EXERCISE:** The loop below takes too long. Parallelize the four passes through the loop by turning `load_data`, `normalize_data`, `extract_features`, and `compute_loss` into remote functions and then retrieving the losses with `ray.get`.

**NOTE:** You should only use **ONE** call to `ray.get`. For example, the object ID returned by `load_data` should be passed directly into `normalize_data` without needing to be retrieved by the driver.

In [None]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

losses = []
for filename in ['file1', 'file2', 'file3', 'file4']:
    inner_start = time.time()

    data = load_data.remote(filename)
    normalized_data = normalize_data.remote(data)
    features = extract_features.remote(normalized_data)
    loss = compute_loss.remote(features)
    losses.append(loss)
    
    inner_end = time.time()
    
    if inner_end - inner_start >= 0.1:
        raise Exception('You may be calling ray.get inside of the for loop! '
                        'Doing this will prevent parallelism from being exposed. '
                        'Make sure to only call ray.get once outside of the for loop.')

print('The losses are {}.'.format(losses) + '\n')
loss = sum(ray.get(losses))

end_time = time.time()
duration = end_time - start_time

print('The loss is {}. This took {} seconds. Run the next cell to see '
      'if the exercise was done correctly.'.format(loss, duration))

**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.
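
The thresholds used in the checks for this exercise follow from simple arithmetic. The sketch below assumes that the helper functions behave exactly as written above (each sleeps for 0.1 seconds, and `load_data` returns an all-ones array of shape `(1000, 100)`), that at least four CPUs are available, and that scheduling overhead is negligible.

```python
num_files = 4
stages_per_file = 4  # load, normalize, extract features, compute loss
stage_seconds = 0.1

# The stages for one file form a chain, so they run one after another.
chain_seconds = stages_per_file * stage_seconds
# Serial execution processes the files one at a time.
serial_seconds = num_files * chain_seconds
# With enough CPUs, the four chains overlap completely.
parallel_seconds = chain_seconds

# An all-ones column minus its mean is all zeros, so every feature is zero,
# each row's prediction is 0, and each row contributes (0 - 1) ** 2 = 1.
rows_per_file = 1000
loss_per_file = rows_per_file * (0 - 1) ** 2
total_loss = num_files * loss_per_file

print(serial_seconds, parallel_seconds, total_loss)
```

A fully parallel run should therefore take a little over 0.4 seconds, and the summed loss should be 4000.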

In [None]:
assert loss == 4000
assert duration < 0.8, ('The loop took {} seconds. This is too slow.'
                        .format(duration))
assert duration > 0.4, ('The loop took {} seconds. This is too fast.'
                        .format(duration))

print('Success! The example took {} seconds.'.format(duration))

**EXERCISE:** Use the UI to view the task timeline and to verify that the relevant tasks were executed in parallel. After running the cell below, you'll need to click on **View task timeline**.
- Using the **second** button, you can click and drag to **move** the timeline.
- Using the **third** button, you can click and drag to **zoom**. You can also zoom by holding "alt" and scrolling.

In the timeline, click on **View Options** and select **Flow Events** to visualize task dependencies.

In [None]:
import ray.experimental.ui as ui
ui.task_timeline()

# Exercise 3 - Nested Parallelism

**GOAL:** The goal of this exercise is to show how to create nested tasks by calling a remote function inside of another remote function.

In this exercise, you will implement the structure of a parallel hyperparameter sweep which trains a number of models in parallel. Each model will be trained using parallel gradient computations.

### Concepts for this Exercise - Nested Remote Functions

Remote functions can call other remote functions. For example, consider the following.

```python
@ray.remote
def f():
    return 1

@ray.remote
def g():
    # Call f 4 times and return the resulting object IDs.
    return [f.remote() for _ in range(4)]

@ray.remote
def h():
    # Call f 4 times, block until those 4 tasks finish,
    # retrieve the results, and return the values.
    return ray.get([f.remote() for _ in range(4)])
```

Then calling `g` and `h` produces the following behavior.

```python
>>> ray.get(g.remote())
[ObjectID(b1457ba0911ae84989aae86f89409e953dd9a80e),
 ObjectID(7c14a1d13a56d8dc01e800761a66f09201104275),
 ObjectID(99763728ffc1a2c0766a2000ebabded52514e9a6),
 ObjectID(9c2f372e1933b04b2936bb6f58161285829b9914)]

>>> ray.get(h.remote())
[1, 1, 1, 1]
```

**One limitation** is that the definition of `f` must come before the definitions of `g` and `h` because as soon as `g` is defined, it will be pickled and shipped to the workers, and so if `f` hasn't been defined yet, the definition will be incomplete.
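
The difference between returning futures (as `g` does) and returning values (as `h` does) can be reproduced with the standard library. This is an analogy only and does not use Ray. It assumes a thread pool large enough that outer tasks waiting on inner tasks cannot starve them of worker threads.

```python
from concurrent.futures import ThreadPoolExecutor

# Assumption: eight workers is more than enough for the tasks launched below.
executor = ThreadPoolExecutor(max_workers=8)


def f():
    return 1


def g():
    # Launch four inner tasks and return the futures themselves.
    return [executor.submit(f) for _ in range(4)]


def h():
    # Launch four inner tasks, wait for them, and return their values.
    inner = [executor.submit(f) for _ in range(4)]
    return [future.result() for future in inner]


g_result = executor.submit(g).result()  # a list of Future objects
h_result = executor.submit(h).result()  # a list of plain values
print([type(x).__name__ for x in g_result], h_result)
executor.shutdown()
```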

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

In [None]:
ray.init(num_cpus=9, include_webui=False, ignore_reinit_error=True)

This example represents a hyperparameter sweep in which multiple models are trained in parallel. Each model training task also performs data parallel gradient computations.

**EXERCISE:** Turn `compute_gradient` and `train_model` into remote functions so that they can be executed in parallel. Inside of `train_model`, do the calls to `compute_gradient` in parallel and fetch the results using `ray.get`.

In [None]:
@ray.remote
def compute_gradient(data, current_model):
    time.sleep(0.03)
    return 1

@ray.remote
def train_model(hyperparameters):
    current_model = 0
    # Iteratively improve the current model. This outer loop cannot be parallelized.
    for _ in range(10):
        # EXERCISE: Parallelize the list comprehension in the line below. After you
        # turn "compute_gradient" into a remote function, you will need to call it
        # with ".remote". The results must be retrieved with "ray.get" before "sum"
        # is called.
        gradient_ids = [compute_gradient.remote(j, current_model) for j in range(2)]
        total_gradient = sum(ray.get(gradient_ids))
        current_model += total_gradient

    return current_model

assert hasattr(compute_gradient, 'remote'), 'compute_gradient must be a remote function'
assert hasattr(train_model, 'remote'), 'train_model must be a remote function'

**EXERCISE:** The code below runs 3 hyperparameter experiments. Change this to run the experiments in parallel.

In [None]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

# Run some hyperparameter experiments.
results = []
for hyperparameters in [{'learning_rate': 1e-1, 'batch_size': 100},
                        {'learning_rate': 1e-2, 'batch_size': 100},
                        {'learning_rate': 1e-3, 'batch_size': 100}]:
    results.append(train_model.remote(hyperparameters))

# EXERCISE: Once you've turned "results" into a list of Ray ObjectIDs
# by calling train_model.remote, you will need to turn "results" back
# into a list of integers, e.g., by doing "results = ray.get(results)".
results = ray.get(results)

end_time = time.time()
duration = end_time - start_time

assert all([isinstance(x, int) for x in results]), 'Looks like "results" is {}. You may have forgotten to call ray.get.'.format(results)

**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.
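
The expected result and the timing window for this exercise can be worked out by hand. The sketch below assumes that `compute_gradient` always returns `1` and sleeps for 0.03 seconds as written above, that enough CPUs are available for all tasks to overlap, and that overhead is negligible.

```python
num_models = 3
iterations = 10
gradients_per_iteration = 2
gradient_seconds = 0.03

# Each gradient task returns 1, so the model grows by 2 per iteration.
expected_result = iterations * gradients_per_iteration * 1

# Fully serial: every gradient computation runs one after another.
serial_seconds = (num_models * iterations * gradients_per_iteration
                  * gradient_seconds)
# Nested parallelism: the models overlap, and the two gradients within an
# iteration overlap, but the ten iterations remain sequential.
parallel_seconds = iterations * gradient_seconds

print(expected_result, serial_seconds, parallel_seconds)
```

Each model should therefore end at 20, and a fully parallel run should take a little over 0.3 seconds instead of roughly 1.8 seconds.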

In [None]:
assert results == [20, 20, 20]
assert duration < 0.5, ('The experiments ran in {} seconds. This is too '
                         'slow.'.format(duration))
assert duration > 0.3, ('The experiments ran in {} seconds. This is too '
                        'fast.'.format(duration))

print('Success! The example took {} seconds.'.format(duration))

**EXERCISE:** Use the UI to view the task timeline and to verify that the pattern makes sense.

In [None]:
import ray.experimental.ui as ui
ui.task_timeline()

# Exercise 4 - Introducing Actors

**GOAL:** The goal of this exercise is to show how to create an actor and how to call actor methods.

See the documentation on actors at http://ray.readthedocs.io/en/latest/actors.html.

Sometimes you need a "worker" process to have "state". For example, that state might be a neural network, a simulator environment, a counter, or something else entirely. However, remote functions are side-effect free. That is, they operate on inputs and produce outputs, but they don't change the state of the worker they execute on.

Actors are different. When we instantiate an actor, a brand new worker is created, and all methods that are called on that actor are executed on the newly created worker.

This means that with a single actor, no parallelism can be achieved because calls to the actor's methods will be executed one at a time. However, multiple actors can be created and methods can be executed on them in parallel.

### Concepts for this Exercise - Actors

To create an actor, decorate a Python class with the `@ray.remote` decorator.

```python
@ray.remote
class Example(object):
    def __init__(self, x):
        self.x = x
    
    def set(self, x):
        self.x = x
    
    def get(self):
        return self.x
```

Like regular Python classes, **actors encapsulate state that is shared across actor method invocations**.

Actor classes differ from regular Python classes in the following ways.
1. **Instantiation:** A regular class would be instantiated via `e = Example(1)`. Actors are instantiated via
    ```python
    e = Example.remote(1)
    ```
    When an actor is instantiated, a **new worker process** is created by a local scheduler somewhere in the cluster.
2. **Method Invocation:** Methods of a regular class would be invoked via `e.set(2)` or `e.get()`. Actor methods are invoked differently.
    ```python
    >>> e.set.remote(2)
    ObjectID(d966aa9b6486331dc2257522734a69ff603e5a1c)
    
    >>> e.get.remote()
    ObjectID(7c432c085864ed4c7c18cf112377a608676afbc3)
    ```
3. **Return Values:** Actor methods are non-blocking. They immediately return an object ID and **they create a task which is scheduled on the actor worker**. The result can be retrieved with `ray.get`.
    ```python
    >>> ray.get(e.set.remote(2))
    None
    
    >>> ray.get(e.get.remote())
    2
    ```
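
The core property of an actor (state that lives in one worker, with method calls executed one at a time) can be imitated with the standard library. This is a loose analogy and not Ray's implementation: `SingleThreadActor` and `Counter` are hypothetical classes, and a single worker thread stands in for the actor's dedicated worker process.

```python
from concurrent.futures import ThreadPoolExecutor


class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value


class SingleThreadActor(object):
    # Hypothetical wrapper: owns one object and one worker thread, so method
    # calls on that object run one at a time, in submission order.
    def __init__(self, cls, *args):
        self._instance = cls(*args)
        self._executor = ThreadPoolExecutor(max_workers=1)

    def call(self, method_name, *args):
        # Returns a Future immediately, like an actor method's .remote().
        method = getattr(self._instance, method_name)
        return self._executor.submit(method, *args)

    def stop(self):
        self._executor.shutdown()


a1 = SingleThreadActor(Counter)
a2 = SingleThreadActor(Counter)
futures = []
for _ in range(3):
    futures.append(a1.call('increment'))
    futures.append(a2.call('increment'))
results = [future.result() for future in futures]
print(results)  # [1, 1, 2, 2, 3, 3]
a1.stop()
a2.stop()
```

Each wrapper keeps its own counter, so the two sequences advance independently while calls to the same wrapper stay ordered.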

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

In [None]:
ray.init(num_cpus=4, include_webui=False, ignore_reinit_error=True)

**EXERCISE:** Change the `Foo` class to be an actor class by using the `@ray.remote` decorator.

In [None]:
@ray.remote
class Foo(object):
    def __init__(self):
        self.counter = 0

    def reset(self):
        self.counter = 0

    def increment(self):
        time.sleep(0.5)
        self.counter += 1
        return self.counter

assert hasattr(Foo, 'remote'), 'You need to turn "Foo" into an actor with @ray.remote.'

**EXERCISE:** Change the instantiations below to create two actors by calling `Foo.remote()`.

In [None]:
# Create two Foo objects.
f1 = Foo.remote()
f2 = Foo.remote()

**EXERCISE:** Parallelize the code below. The two actors can execute methods in parallel (though each actor can only execute one method at a time).

In [None]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

# Reset the actor state so that we can run this cell multiple times without
# changing the results.
f1.reset.remote()
f2.reset.remote()

# We want to parallelize this code. However, it is not straightforward to
# make "increment" a remote function, because state is shared (the value of
# "self.counter") between subsequent calls to "increment". In this case, it
# makes sense to use actors.
results = []
for _ in range(5):
    results.append(f1.increment.remote())
    results.append(f2.increment.remote())

results = ray.get(results)
end_time = time.time()
duration = end_time - start_time

assert not any([isinstance(result, ray.ObjectID) for result in results]), 'Looks like "results" is {}. You may have forgotten to call ray.get.'.format(results)

**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.
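
The expected results and the timing window for this exercise can be derived directly from the code above. The sketch assumes that `increment` sleeps for 0.5 seconds per call, that the two actors run fully in parallel, and that overhead is negligible.

```python
increments_per_actor = 5
increment_seconds = 0.5
num_actors = 2

# Calls are submitted alternately to the two actors, and each actor numbers
# its own calls 1, 2, 3, ..., so the collected results interleave.
expected_results = []
for call_number in range(1, increments_per_actor + 1):
    expected_results.extend([call_number] * num_actors)

# Each actor handles its own calls one at a time.
per_actor_seconds = increments_per_actor * increment_seconds
# Without actors running in parallel, the two sequences would add up.
serial_seconds = num_actors * per_actor_seconds
parallel_seconds = per_actor_seconds

print(expected_results, serial_seconds, parallel_seconds)
```

A correct solution should therefore take slightly more than 2.5 seconds rather than about 5 seconds.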

In [None]:
assert results == [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]

assert duration < 3, ('The experiments ran in {} seconds. This is too '
                      'slow.'.format(duration))
assert duration > 2.5, ('The experiments ran in {} seconds. This is too '
                        'fast.'.format(duration))

print('Success! The example took {} seconds.'.format(duration))

# Exercise 5 - Actor Handles

**GOAL:** The goal of this exercise is to show how to pass around actor handles.

Suppose we wish to have multiple tasks invoke methods on the same actor. For example, we may have a single actor that records logging information from a number of tasks. We can achieve this by passing a handle to the actor as an argument into the relevant tasks.

### Concepts for this Exercise - Actor Handles

First of all, suppose we've created an actor as follows.

```python
@ray.remote
class Actor(object):
    def method(self):
        pass

# Create the actor
actor = Actor.remote()
```

Then we can define a remote function (or another actor) that takes an actor handle as an argument.

```python
@ray.remote
def f(actor):
    # We can invoke methods on the actor.
    x_id = actor.method.remote()
    # We can block and get the results.
    return ray.get(x_id)
```

Then we can invoke the remote function a few times and pass in the actor handle.

```python
# Each of the three tasks created below will invoke methods on the same actor.
f.remote(actor)
f.remote(actor)
f.remote(actor)
```
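
The pattern of several tasks sharing one handle can be sketched with the standard library as well. This is an analogy only and does not use Ray: `LoggerHandle` is a hypothetical class whose single worker thread serializes all calls, standing in for a logging actor, and `run_experiment` here is a simplified stand-in that logs three messages without sleeping.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


class LoggerHandle(object):
    def __init__(self):
        self._logs = defaultdict(list)
        # One worker thread serializes all calls, like a single actor.
        self._executor = ThreadPoolExecutor(max_workers=1)

    def _append(self, index, message):
        self._logs[index].append(message)

    def _snapshot(self):
        return {key: list(value) for key, value in self._logs.items()}

    def log(self, index, message):
        # Non-blocking: enqueue the call and return a Future.
        return self._executor.submit(self._append, index, message)

    def get_logs(self):
        return self._executor.submit(self._snapshot)

    def stop(self):
        self._executor.shutdown()


def run_experiment(experiment_index, logger):
    for i in range(3):
        logger.log(experiment_index, 'On iteration {}'.format(i))


logger = LoggerHandle()
with ThreadPoolExecutor(max_workers=3) as pool:
    for index in range(3):
        # The same handle is passed to every task.
        pool.submit(run_experiment, index, logger)
# All log calls were queued before this snapshot request.
logs = logger.get_logs().result()
print(logs)
logger.stop()
```

Because every call goes through the same single-threaded queue, messages from one experiment keep their order even though the experiments run concurrently.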

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from collections import defaultdict
import ray
import time

In [None]:
ray.init(num_cpus=4, include_webui=False, ignore_reinit_error=True)

In this exercise, we're going to write some code that runs several "experiments" in parallel and has each experiment log its results to an actor. The driver script can then periodically pull the results from the logging actor.

**EXERCISE:** Turn this `LoggingActor` class into an actor class.

In [None]:
@ray.remote
class LoggingActor(object):
    def __init__(self):
        self.logs = defaultdict(lambda: [])
    
    def log(self, index, message):
        self.logs[index].append(message)
    
    def get_logs(self):
        return dict(self.logs)


assert hasattr(LoggingActor, 'remote'), ('You need to turn LoggingActor into an '
                                         'actor (by using the ray.remote decorator).')

**EXERCISE:** Instantiate the actor.

In [None]:
logging_actor = LoggingActor.remote()

# Some checks to make sure this was done correctly.
assert hasattr(logging_actor, 'get_logs')

Now we define a remote function that runs and pushes its logs to the `LoggingActor`.

**EXERCISE:** Modify this function so that it invokes methods correctly on `logging_actor` (you need to change the way you call the `log` method).

In [None]:
@ray.remote
def run_experiment(experiment_index, logging_actor):
    for i in range(60):
        time.sleep(1)
        # Push a logging message to the actor.
        logging_actor.log.remote(experiment_index, 'On iteration {}'.format(i))

Now we create several tasks that use the logging actor.

In [None]:
experiment_ids = [run_experiment.remote(i, logging_actor) for i in range(3)]

While the experiments are running in the background, the driver process (that is, this Jupyter notebook) can query the actor to read the logs.

**EXERCISE:** Modify the code below to dispatch methods to the `LoggingActor`.

In [None]:
logs = ray.get(logging_actor.get_logs.remote())
print(logs)

assert isinstance(logs, dict), ("Make sure that you dispatch tasks to the "
                                "actor by calling .remote on its methods and "
                                "get the results using ray.get.")

**EXERCISE:** Try running the above box multiple times and see how the results change (while the experiments are still running in the background). You can also try running more of the experiment tasks and see what happens.