# Exercise 6 - Process Tasks in Order of Completion

**GOAL:** The goal of this exercise is to show how to use `ray.wait` to process tasks in the order that they finish.

See the documentation for `ray.wait` at http://ray.readthedocs.io/en/latest/api.html#waiting-for-a-subset-of-tasks-to-finish.

As originally written, the code below runs 10 tasks and retrieves the results in the order that the tasks were launched (that version is kept as comments in the cell). However, since each task takes a random amount of time to finish, we can instead process the results in the order that the tasks finish.

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

In [2]:
ray.init(num_cpus=5, redirect_output=True)

Process STDOUT and STDERR is being redirected to /tmp/raylogs/.
Waiting for redis server at 127.0.0.1:38792 to respond...
Waiting for redis server at 127.0.0.1:13638 to respond...
Starting local scheduler with the following resources: {'CPU': 5, 'GPU': 1}.

View the web UI at http://localhost:8889/notebooks/ray_ui67310.ipynb?token=2c196938546bbdd0f9bae83ed688da1a8acb816e99e2cbda



{'node_ip_address': '127.0.0.1',
 'redis_address': '127.0.0.1:38792',
 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store91498578', manager_name='/tmp/plasma_manager54532712', manager_port=53114)],
 'local_scheduler_socket_names': ['/tmp/scheduler48699707'],
 'webui_url': 'http://localhost:8889/notebooks/ray_ui67310.ipynb?token=2c196938546bbdd0f9bae83ed688da1a8acb816e99e2cbda'}

In [3]:
@ray.remote
def f():
    time.sleep(np.random.uniform(0, 5))
    return time.time()

**EXERCISE:** Change the code below to use `ray.wait` to get the results of the tasks in the order that they complete.

**NOTE:** It would be a simple modification to maintain a pool of 10 experiments and to start a new experiment whenever one finishes.
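
The pool idea in the note can be sketched roughly as follows. So that it runs without a Ray cluster, this sketch swaps in the standard library's `concurrent.futures.wait` with `return_when=FIRST_COMPLETED` as a stand-in for `ray.wait`; with Ray, the same loop would presumably call `f.remote()` to start an experiment and `ray.wait(pending)` to find a finished one. The names `experiment` and `run_pool`, the total of 25 experiments, and the short sleep times are all assumptions made for illustration.

```python
import random
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def experiment(index):
    """Hypothetical stand-in for the remote function: sleep, then return the index."""
    time.sleep(random.uniform(0.0, 0.05))
    return index


def run_pool(total, pool_size):
    """Run `total` experiments while keeping at most `pool_size` in flight."""
    finished = []
    next_index = 0
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        # Fill the pool with the first batch of experiments.
        pending = set()
        while next_index < min(pool_size, total):
            pending.add(executor.submit(experiment, next_index))
            next_index += 1

        while pending:
            # Block until at least one experiment finishes. In Ray, this is
            # the role played by ray.wait(pending).
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                finished.append(future.result())
                # Start a replacement for each finished experiment, if any remain.
                if next_index < total:
                    pending.add(executor.submit(experiment, next_index))
                    next_index += 1
    return finished


finished = run_pool(total=25, pool_size=10)
print('Finished {} experiments.'.format(len(finished)))
```

One difference to keep in mind: `concurrent.futures.wait` may report several finished futures at once, whereas `ray.wait` returns a single ready ID by default, so the Ray version would start exactly one replacement per call.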

In [4]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

result_ids = [f.remote() for _ in range(10)]

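# Get the results in the order that the tasks finish.
results = []

remaining_ids = result_ids
while remaining_ids:
    # By default, ray.wait blocks until one task is done. It returns a list
    # of the IDs that are ready and a list of the IDs that are still pending.
    ready_ids, remaining_ids = ray.wait(remaining_ids)
    result = ray.get(ready_ids[0])
    results.append(result)
    print('Processing result which finished after {} seconds.'
          .format(result - start_time))

# Original version, which processes the results in launch order:
# for result_id in result_ids:
#     result = ray.get(result_id)
#     results.append(result)
#     print('Processing result which finished after {} seconds.'
#           .format(result - start_time))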

end_time = time.time()
duration = end_time - start_time

Processing result which finished after 0.5661187171936035 seconds.
Processing result which finished after 1.1098244190216064 seconds.
Processing result which finished after 1.7630667686462402 seconds.
Processing result which finished after 1.9663851261138916 seconds.
Processing result which finished after 2.2949280738830566 seconds.
Processing result which finished after 2.677992582321167 seconds.
Processing result which finished after 3.3782923221588135 seconds.
Processing result which finished after 4.196393728256226 seconds.
Processing result which finished after 5.033103704452515 seconds.
Processing result which finished after 5.365675926208496 seconds.


**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [5]:
assert results == sorted(results), ('The results were not processed in the '
                                    'order that they finished.')

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 5.367706537246704 seconds.


In [6]:
import ray.experimental.ui as ui
ui.task_timeline()


To view fullscreen, open chrome://tracing in Google Chrome and load `/tmp/tmpjga7l5il.json`
