# Exercise 3 - Tree Reduce

**GOAL:** Show how to implement a tree reduce in Ray by passing object IDs into remote functions, which encodes the dependencies between tasks.

In this exercise, you will use Ray to implement parallel data generation and a parallel tree reduction.

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

In [None]:
ray.init(num_cpus=8, redirect_output=True)

**EXERCISE:** Turn the functions below into remote functions so that the tree of tasks can be executed in parallel.

In [None]:
# This is a proxy for a function which generates some data.
def create_data(i):
    time.sleep(0.3)
    return i * np.ones(10000)

# This is a proxy for an expensive aggregation step (which is also
# commutative and associative so it can be used in a tree-reduce).
def aggregate_data(x, y):
    time.sleep(0.3)
    return x * y

**EXERCISE:** Make the data creation tasks run in parallel, and aggregate the vectors in parallel as well. Note that `aggregate_data` must be called 7 times. These calls cannot all run in parallel because some depend on the outputs of others. However, it is possible to run 4 calls in parallel first, then 2 in parallel, and then the final 1.
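
To see why the order of aggregation matters, the following sketch tracks only the length of the dependency chain behind each value. It does not use Ray. The function `reduction_depth` and its `append_to_end` flag are hypothetical names introduced for illustration, and each pairwise merge is assumed to depend on both of its inputs.

```python
def reduction_depth(num_inputs, append_to_end):
    """Return (longest dependency chain, number of pairwise merges)."""
    # Each entry is the depth of the task chain that produced that value.
    # Freshly generated inputs have depth 0.
    depths = [0] * num_inputs
    calls = 0
    while len(depths) > 1:
        # A merge can only start once both of its inputs are ready.
        merged = max(depths[0], depths[1]) + 1
        calls += 1
        if append_to_end:
            depths = depths[2:] + [merged]
        else:
            depths = [merged] + depths[2:]
    return depths[0], calls


linear_depth, linear_calls = reduction_depth(8, append_to_end=False)
tree_depth, tree_calls = reduction_depth(8, append_to_end=True)

print(linear_depth, linear_calls)  # 7 7
print(tree_depth, tree_calls)      # 3 7
```

Both orderings make 7 merges, but putting each merged value back at the front chains every merge to the previous one (depth 7), while putting it at the end yields rounds of 4, 2, and 1 independent merges (depth 3).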

In [None]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

# Here we generate some data. This could be done in parallel.
vectors = [create_data(i + 1) for i in range(8)]

# Here we aggregate all of the data by getting it on the driver and then
# repeatedly calling aggregate_data. However, this could be done faster by
# making aggregate_data a remote function and aggregating the data in a
# tree-like fashion.

# NOTE: A direct translation of the code below to use Ray will not result in
# a speedup because the underlying graph of dependencies between the tasks is
# essentially linear. There are a handful of ways to get a tree-shaped graph
# instead. One way is to explicitly generate the tree of tasks. Another is to
# change a single line: the one that adds the output of 'aggregate_data' back
# into the 'vectors' list.
while len(vectors) > 1:
    aggregated = aggregate_data(vectors[0], vectors[1])
    vectors = [aggregated] + vectors[2:]

result = vectors[0]

end_time = time.time()
duration = end_time - start_time

**EXERCISE:** Use the UI to view the task timeline and to verify that the vectors were aggregated with a tree of tasks.

You should be able to see the 8 `create_data` tasks running in parallel, followed by 4 `aggregate_data` tasks running in parallel, then 2 more `aggregate_data` tasks, and finally 1 last `aggregate_data` task.

In the timeline, click on **View Options** and select **Flow Events** to visualize task dependencies.

In [None]:
import ray.experimental.ui as ui
ui.task_timeline()

**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.
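
The numbers used in the checks can be derived with a little arithmetic. The sketch below assumes that all 8 CPUs are free and that scheduling overhead is negligible; the constant and variable names are illustrative and do not appear in the exercise code.

```python
import math

TASK_SECONDS = 0.3  # sleep inside create_data and aggregate_data
NUM_VECTORS = 8

# Multiplying i * ones for i = 1..8 elementwise gives 8! in every entry.
expected_value = math.factorial(NUM_VECTORS)

# Serial: 8 create_data calls plus 7 aggregate_data calls, one at a time.
serial_seconds = (NUM_VECTORS + NUM_VECTORS - 1) * TASK_SECONDS

# Tree: one parallel round of create_data plus log2(8) = 3 aggregation rounds.
tree_seconds = (1 + int(math.log2(NUM_VECTORS))) * TASK_SECONDS

print(expected_value)                                      # 40320
print(round(serial_seconds, 2), round(tree_seconds, 2))    # 4.5 1.2
```

Under these assumptions a correct tree reduce takes about 1.2 seconds, which is the lower bound in the checks; the upper bound of 1.5 seconds leaves 0.3 seconds of headroom for overhead and rules out any solution with one extra sequential step.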

In [None]:
assert np.all(result == 40320 * np.ones(10000)), ('Did you remember to '
                                                  'call ray.get?')
assert duration < 0.3 + 0.9 + 0.3, ('FAILURE: The data generation and '
                                    'aggregation took {} seconds. This is '
                                    'too slow'.format(duration))
assert duration > 0.3 + 0.9, ('FAILURE: The data generation and '
                              'aggregation took {} seconds. This is '
                              'too fast'.format(duration))

print('Success! The example took {} seconds.'.format(duration))