# Exercise 7 - Process Tasks in Order of Completion

**GOAL:** Show how to use `ray.wait` to process tasks in the order in which they finish.

See the documentation for `ray.wait` at https://ray.readthedocs.io/en/latest/api.html#ray.wait.

The code below runs 10 tasks and retrieves the results in the order in which the tasks were launched. Because each task takes a random amount of time, the launch order generally differs from the completion order, so we can instead process each result as soon as its task finishes.

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

In [2]:
ray.init(num_cpus=5, include_webui=False, ignore_reinit_error=True)

Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-01-24_09-57-00_20904/logs.
Waiting for redis server at 127.0.0.1:51983 to respond...
Waiting for redis server at 127.0.0.1:36362 to respond...
Starting the Plasma object store with 20.0 GB memory using /dev/shm.


{'node_ip_address': '192.168.23.45',
 'redis_address': '192.168.23.45:51983',
 'object_store_addresses': ['/tmp/ray/session_2019-01-24_09-57-00_20904/sockets/plasma_store'],
 'raylet_socket_names': ['/tmp/ray/session_2019-01-24_09-57-00_20904/sockets/raylet'],
 'webui_url': ''}

In [3]:
@ray.remote
def f():
    time.sleep(np.random.uniform(0, 5))
    return time.time()

**EXERCISE:** Change the code below to use `ray.wait` to get the results of the tasks in the order that they complete.

**NOTE:** It would be a simple modification to maintain a pool of 10 experiments and to start a new experiment whenever one finishes.

In [11]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

result_ids = [f.remote() for _ in range(10)]
remaining_ids = result_ids

# Get the results in the order that the tasks finish.
results = []
for _ in range(len(result_ids)):
    # Block until one task is done; ray.wait returns (ready, not ready).
    ready_ids, remaining_ids = ray.wait(remaining_ids, num_returns=1)
    result = ray.get(ready_ids[0])
    results.append(result)
    print('Processing result which finished after {} seconds.'
          .format(result - start_time))

end_time = time.time()
duration = end_time - start_time

Processing result which finished after 0.7375590801239014 seconds.
Processing result which finished after 1.530632495880127 seconds.
Processing result which finished after 2.8941402435302734 seconds.
Processing result which finished after 3.388817071914673 seconds.
Processing result which finished after 3.7495760917663574 seconds.
Processing result which finished after 3.761099338531494 seconds.
Processing result which finished after 4.1503682136535645 seconds.
Processing result which finished after 5.128110408782959 seconds.
Processing result which finished after 5.387217283248901 seconds.
Processing result which finished after 7.52557897567749 seconds.


**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [12]:
assert results == sorted(results), ('The results were not processed in the '
                                    'order that they finished.')

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 7.529031038284302 seconds.
