# Exercise 3 - Tree Reduce

**GOAL:** Show how to implement a tree reduce in Ray by passing object IDs into remote functions, which encodes the dependencies between tasks.

In this exercise, you will use Ray to implement parallel data generation and a parallel tree reduction.

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

In [2]:
ray.init(num_cpus=8, redirect_output=True)

Waiting for redis server at 127.0.0.1:27217 to respond...
Waiting for redis server at 127.0.0.1:12656 to respond...
Starting local scheduler with 8 CPUs, 0 GPUs


{'local_scheduler_socket_names': ['/tmp/scheduler63580621'],
 'node_ip_address': '127.0.0.1',
 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store50097614', manager_name='/tmp/plasma_manager21141053', manager_port=20595)],
 'redis_address': '127.0.0.1:27217',
 'webui_url': ''}

**EXERCISE:** The functions below must be turned into remote functions so that the tree of tasks can be executed in parallel.

In [3]:
# This is a proxy for a function which generates some data.
@ray.remote
def create_data(i):
    time.sleep(0.3)
    return i * np.ones(10000)

# This is a proxy for an expensive aggregation step (which is also
# commutative and associative so it can be used in a tree-reduce).
@ray.remote
def aggregate_data(x, y):
    time.sleep(0.3)
    return x * y

**EXERCISE:** Make the data creation tasks run in parallel, and aggregate the vectors in parallel as well. Note that `aggregate_data` must be called 7 times. These calls cannot all run in parallel because some depend on the outputs of others. However, it is possible to run 4 in parallel first, then 2 in parallel, and then 1.
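
The 4, 2, 1 schedule described above can be sketched without Ray as a level-by-level pairwise reduction. This is a minimal, serial illustration in plain Python. The helper name `tree_reduce` is hypothetical and not part of Ray, and it assumes a non-empty input. In the exercise itself, each `combine` call within a level would be an independent remote task.

```python
def tree_reduce(values, combine):
    """Reduce values pairwise, level by level.

    Returns (result, tasks_per_level). Hypothetical helper for illustration.
    """
    level = list(values)
    if not level:
        raise ValueError("tree_reduce needs at least one value")
    tasks_per_level = []
    while len(level) > 1:
        # Neighbouring pairs do not depend on each other, so all of the
        # combine calls in one level could run at the same time.
        pairs = [(level[i], level[i + 1]) for i in range(0, len(level) - 1, 2)]
        next_level = [combine(x, y) for x, y in pairs]
        # With an odd number of values, carry the last one to the next level.
        if len(level) % 2 == 1:
            next_level.append(level[-1])
        tasks_per_level.append(len(pairs))
        level = next_level
    return level[0], tasks_per_level


# Scalars 1..8 stand in for the eight vectors created in the notebook.
result, tasks_per_level = tree_reduce(range(1, 9), lambda x, y: x * y)
print(result, tasks_per_level)
```

The per-level counts sum to 7 calls, matching the number of `aggregate_data` calls in the serial version. Only the dependency structure changes, not the amount of work.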

In [10]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

# EXERCISE: Here we generate some data. Do this part in parallel.
vectors = [create_data.remote(i + 1) for i in range(8)]

# Here we aggregate all of the data by repeatedly calling aggregate_data.
# This can be sped up using Ray.
#
# NOTE: A direct translation of the code below to use Ray will not result in
# a speedup because each function call uses the output of the previous function
# call so the function calls must be executed serially.
#
# EXERCISE: Speed up the aggregation below by using Ray. Note that this will
# require restructuring the code to expose more parallelism. First run 4 tasks
# aggregating the 8 values in pairs. Then run 2 tasks aggregating the resulting
# 4 intermediate values in pairs. Then run 1 task aggregating the two resulting
# values. Lastly, you will need to call ray.get to retrieve the final result.
#result = aggregate_data(vectors[0], vectors[1])
#result = aggregate_data(result, vectors[2])
#result = aggregate_data(result, vectors[3])
#result = aggregate_data(result, vectors[4])
#result = aggregate_data(result, vectors[5])
#result = aggregate_data(result, vectors[6])
#result = aggregate_data(result, vectors[7])

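# Level 1: four independent aggregations over the eight input vectors.
result1 = aggregate_data.remote(vectors[0], vectors[1])
result2 = aggregate_data.remote(vectors[2], vectors[3])
result3 = aggregate_data.remote(vectors[4], vectors[5])
result4 = aggregate_data.remote(vectors[6], vectors[7])

# Levels 2 and 3: pass the object IDs directly instead of calling ray.get on
# them. Ray resolves each ID to its value before the task runs, so the
# dependencies are encoded in the task graph and the driver never blocks.
result5 = aggregate_data.remote(result1, result2)
result6 = aggregate_data.remote(result3, result4)
result7 = aggregate_data.remote(result5, result6)

# Block only once, to retrieve the final value.
result = ray.get(result7)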

# NOTE: For clarity, the aggregation above is written out as 7 separate function
# calls, but this can be done more easily in a while loop via
#
#     while len(vectors) > 1:
#         vectors = [aggregate_data(vectors[0], vectors[1])] + vectors[2:]
#     result = vectors[0]
#
# When expressed this way, the change from serial aggregation to tree-structured
# aggregation can be made simply by appending the result of aggregate_data to the
# end of the vectors list as opposed to the beginning.
#
# EXERCISE: Think about why this is true.

end_time = time.time()
duration = end_time - start_time
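
The note in the cell above claims that moving the result of each aggregation from the front of the list to the end turns a serial chain into a tree. One way to check this is to run the same loop on symbolic labels and track how deep each result's dependency chain is. The sketch below uses plain Python with no Ray. The function name `reduce_shape` and the `(label, depth)` tuples are illustrative assumptions, not part of the exercise.

```python
def reduce_shape(n, append):
    """Run the while loop from the note on labels instead of vectors.

    Each item is (expression, depth), where depth counts how many
    aggregations must finish, one after another, before it is ready.
    """
    items = [(str(i), 0) for i in range(n)]
    while len(items) > 1:
        (a, depth_a), (b, depth_b) = items[0], items[1]
        combined = ("({}*{})".format(a, b), max(depth_a, depth_b) + 1)
        if append:
            # Tree-shaped: the fresh result waits at the back, so the next
            # call pairs two items that are already available.
            items = items[2:] + [combined]
        else:
            # Serial: the fresh result is consumed by the very next call.
            items = [combined] + items[2:]
    return items[0]


serial_expr, serial_depth = reduce_shape(8, append=False)
tree_expr, tree_depth = reduce_shape(8, append=True)
print(serial_depth, serial_expr)
print(tree_depth, tree_expr)
```

With the result placed at the front, every call depends on the previous one, so the chain is 7 aggregations deep. With the result appended, the first four calls only touch original inputs, and the chain shrinks to 3 levels.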

**EXERCISE:** Use the UI to view the task timeline and to verify that the vectors were aggregated with a tree of tasks.

You should be able to see the 8 `create_data` tasks running in parallel, followed by 4 `aggregate_data` tasks running in parallel, followed by 2 more `aggregate_data` tasks, followed by 1 final `aggregate_data` task.

In the timeline, click on **View Options** and select **Flow Events** to visualize task dependencies.

In [11]:
import ray.experimental.ui as ui
ui.task_timeline()

**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [12]:
assert np.all(result == 40320 * np.ones(10000)), ('Did you remember to '
                                                  'call ray.get?')
assert duration < 0.3 + 0.9 + 0.3, ('FAILURE: The data generation and '
                                    'aggregation took {} seconds. This is '
                                    'too slow'.format(duration))
assert duration > 0.3 + 0.9, ('FAILURE: The data generation and '
                              'aggregation took {} seconds. This is '
                              'too fast'.format(duration))

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 1.2155711650848389 seconds.
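
The constants in the checks above follow from the structure of the computation. The arithmetic below is a rough sketch that assumes each task costs only its 0.3 second sleep and ignores scheduling overhead. The fully serial figure is an illustrative baseline, not a number measured in this notebook.

```python
import math

SLEEP = 0.3        # seconds slept by each create_data / aggregate_data call
NUM_VECTORS = 8    # number of vectors created in the exercise

# Number of aggregation levels in a balanced binary tree over 8 inputs.
levels = (NUM_VECTORS - 1).bit_length()

# Critical path: one round of parallel data creation plus one sleep per level.
parallel_time = SLEEP + levels * SLEEP

# Baseline assumption: 8 creations and 7 aggregations run one after another.
serial_time = NUM_VECTORS * SLEEP + (NUM_VECTORS - 1) * SLEEP

# Multiplying the vectors i * ones for i = 1..8 gives 8! in every entry.
expected_value = math.factorial(NUM_VECTORS)

print(levels, round(parallel_time, 2), round(serial_time, 2), expected_value)
```

The lower bound in the checks, `0.3 + 0.9`, is this critical path, and the upper bound allows an extra 0.3 seconds of slack, which is less than the cost of one additional serial step. The reported duration of about 1.22 seconds falls inside that window.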
