# Exercise 6 - Process Tasks in Order of Completion

**GOAL:** The goal of this exercise is to show how to use `ray.wait` to process tasks in the order that they finish.

See the documentation for ray.wait at http://ray.readthedocs.io/en/latest/api.html#waiting-for-a-subset-of-tasks-to-finish.

The code below runs 10 tasks and retrieves the results in the order that the tasks were launched. However, since each task takes a random amount of time to finish, we could instead process the tasks in the order that they finish.

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

In [2]:
ray.init(num_cpus=5, redirect_output=True)

Process STDOUT and STDERR is being redirected to /tmp/raylogs/.
Waiting for redis server at 127.0.0.1:38663 to respond...
Waiting for redis server at 127.0.0.1:28883 to respond...
Starting local scheduler with the following resources: {'CPU': 5, 'GPU': 0}.

View the web UI at http://localhost:8894/notebooks/ray_ui53433.ipynb?token=d3c070986b12254cca7f9fe6f097342afb6acef634604abf



{'local_scheduler_socket_names': ['/tmp/scheduler64259118'],
 'node_ip_address': '172.17.0.2',
 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store22321798', manager_name='/tmp/plasma_manager57038259', manager_port=32339)],
 'raylet_socket_names': [],
 'redis_address': '172.17.0.2:38663',
 'webui_url': 'http://localhost:8894/notebooks/ray_ui53433.ipynb?token=d3c070986b12254cca7f9fe6f097342afb6acef634604abf'}

In [3]:
@ray.remote
def f():
    time.sleep(np.random.uniform(0, 5))
    return time.time()

**EXERCISE:** Change the code below to use `ray.wait` to get the results of the tasks in the order that they complete.

**NOTE:** It would be a simple modification to maintain a pool of 10 experiments and to start a new experiment whenever one finishes.

In [19]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

result_ids = [f.remote() for _ in range(10)]

# Get the results.
results = []
first_results, _ = ray.wait(result_ids, num_returns=10)
print("-"*20)
for result_id in result_ids:
    result = ray.get(result_id)
    #result, _ = ray.wait(result_id, num_returns=1)
    #result = ray.get(result)
    results.append(result)
    print('Processing {} result finished after {} seconds.'
          .format(result_id,(result - start_time)))

results = [first_results]
end_time = time.time()
duration = end_time - start_time

--------------------
Processing ObjectID(bef0c615111987a0ec13d7a1fc647ce41e467667) result finished after 1.7210731506347656 seconds.
Processing ObjectID(9ae8db2c9f09e9bc9993393bca2ae9699ed779f1) result finished after 2.595484733581543 seconds.
Processing ObjectID(87f97c17b45fa24162eb8d2515905f9c7d4d53c6) result finished after 4.469526290893555 seconds.
Processing ObjectID(d651e2573d2dbb35eb265e456e2db9de42b82cb0) result finished after 1.5424232482910156 seconds.
Processing ObjectID(e036e5c97beab34003124d0d3723c018c5e2a56d) result finished after 2.8313965797424316 seconds.
Processing ObjectID(cd54ad4e78ec452aeb66a49f47867b7dec1faf8d) result finished after 5.239848375320435 seconds.
Processing ObjectID(6a102450b6970ab0bfebc33f850ecbb414566901) result finished after 2.8058509826660156 seconds.
Processing ObjectID(9e3672a606eb4b2829bbddc055f1d65253637895) result finished after 7.407268047332764 seconds.
Processing ObjectID(c6c704530c06a2336ef919f3f97035969f168ed3) result finished after 6.7

**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [20]:
assert results == sorted(results), ('The results were not processed in the '
                                    'order that they finished.')

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 7.41782808303833 seconds.
