# Install Dependencies

If you are running on Google Colab, you need to install the necessary dependencies before beginning the exercise.

**NOTE**: After installing the dependencies, you need to click "RESTART RUNTIME".

In [None]:
print('NOTE: Intentionally crashing session to use the newly installed library.\n')

!pip uninstall -y pyarrow
!pip install ray[debug]==0.7.5

# A hack to force the runtime to restart, needed to include the above dependencies.
import os
os._exit(0)

NOTE: Intentionally crashing session to use the newly installed library.

Collecting ray[debug]==0.7.5
  Downloading ray-0.7.5-cp37-cp37m-manylinux1_x86_64.whl (74.9 MB)
Collecting pytest
  Downloading pytest-5.4.1-py3-none-any.whl (246 kB)
Collecting funcsigs
  Downloading funcsigs-1.0.2-py2.py3-none-any.whl (17 kB)
Collecting setproctitle; extra == "debug"
  Downloading setproctitle-1.1.10.tar.gz (24 kB)
Collecting psutil; extra == "debug"
  Downloading psutil-5.7.0.tar.gz (449 kB)
Collecting py>=1.5.0
  Downloading py-1.8.1-py2.py3-none-any.whl (83 kB)
Collecting more-itertools>=4.0.0
  Downloading more_itertools-8.2.0-py3-none-any.whl (43 kB)

Failed to build setproctitle psutil
Installing collected packages: py, more-itertools, pluggy, pytest, funcsigs, setproctitle, psutil, ray
    Running setup.py install for setproctitle ... error
    ERROR: Command errored out with exit status 1:
     command: /usr/local/bin/python -u -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-l8tl86e7/setproctitle/setup.py'"'"'; __file__='"'"'/tmp/pip-install-l8tl86e7/setproctitle/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' install --record /tmp/pip-record-qq65jv04/install-record.txt --single-version-externally-managed --compile --install-headers /usr/local/include/python3.7m/setproctitle
         cwd: /tmp/pip-install-l8tl86e7/setproctitle/
    Complete output (10 lines):
    running install
    running build
    running build_ext
    building 'setproctitle' extension
    creating 

# Exercise 6 - Handling Slow Tasks

**GOAL:** The goal of this exercise is to show how to use `ray.wait` to avoid waiting for slow tasks.

See the documentation for `ray.wait` at https://ray.readthedocs.io/en/latest/package-ref.html?highlight=ray.wait#ray.wait.

## Concepts for this Exercise - `ray.wait`

After launching a number of tasks, you may want to know which ones have finished executing. This can be done with `ray.wait`. The function works as follows.

```python
ready_ids, remaining_ids = ray.wait(object_ids, num_returns=1, timeout=None)
```

**Arguments:**
- `object_ids`: This is a list of object IDs.
- `num_returns`: This is the maximum number of object IDs to wait for. The default value is `1`.
- `timeout`: This is the maximum amount of time to wait, in seconds. `ray.wait` blocks until either `num_returns` objects are ready or `timeout` seconds have passed, whichever comes first. The default value `None` means there is no time limit.

**Return values:**
- `ready_ids`: This is a list of object IDs that are available in the object store.
- `remaining_ids`: This is a list of the IDs that were in `object_ids` but are not in `ready_ids`, so the IDs in `ready_ids` and `remaining_ids` together make up all the IDs in `object_ids`.
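
The return contract described above can be illustrated without a running cluster. The following is a toy, pure-Python model and not Ray's implementation: `toy_wait` and the `finished` set are hypothetical names introduced only for this sketch, and unlike `ray.wait` the function never blocks. It only shows how the input list is split into at most `num_returns` ready IDs plus everything else, with both lists keeping the input order.

```python
def toy_wait(object_ids, finished, num_returns=1):
    """Toy model of the ray.wait return contract (not Ray's implementation).

    `finished` is a hypothetical set of IDs whose results are already
    available. Unlike ray.wait, this never blocks; it only partitions.
    """
    ready_ids = []
    remaining_ids = []
    for object_id in object_ids:
        # Take an ID as "ready" only while fewer than num_returns were taken.
        if object_id in finished and len(ready_ids) < num_returns:
            ready_ids.append(object_id)
        else:
            remaining_ids.append(object_id)
    return ready_ids, remaining_ids


ids = ["a", "b", "c", "d"]
ready, remaining = toy_wait(ids, finished={"b", "d"}, num_returns=1)
print(ready)      # ['b']
print(remaining)  # ['a', 'c', 'd']
```

Note that `"d"` ends up in the remaining list even though it has finished, because only `num_returns` IDs are placed in the ready list. Every input ID appears in exactly one of the two lists.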

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

print('Successfully imported ray!')

Successfully imported ray!


In [2]:
ray.init(num_cpus=6, object_store_memory=2*10**9, ignore_reinit_error=True)

# Sleep a little to improve the accuracy of the timing measurements used below,
# because some workers may still be starting up in the background.
time.sleep(2.0)

2020-04-30 08:00:53,853	INFO resource_spec.py:212 -- Starting Ray with 81.69 GiB memory available for workers and up to 1.86 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-04-30 08:00:54,301	INFO services.py:1148 -- View the Ray dashboard at localhost:8265


Define a remote function that takes a variable amount of time to run.

In [3]:
@ray.remote
def f(i):
    np.random.seed(5 + i)
    x = np.random.uniform(0, 4)
    time.sleep(x)
    return i, time.time()

**EXERCISE:** Using `ray.wait`, change the code below so that `initial_results` consists of the outputs of the first three tasks to complete instead of the first three tasks that were submitted.

In [7]:
start_time = time.time()

# This launches 6 tasks, each of which takes a random amount of time to
# complete.
result_ids = [f.remote(i) for i in range(6)]
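# Get one batch of tasks. Instead of waiting for a fixed subset of tasks, we
# use ray.wait to block until the first 3 tasks have finished.
initial_ids, remaining_ids = ray.wait(result_ids, num_returns=3, timeout=None)
# Reorder result_ids so that the finished tasks come first. The next cell
# relies on result_ids[3:] being the tasks that were still running.
result_ids = initial_ids + remaining_ids
initial_results = ray.get(initial_ids)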

end_time = time.time()
duration = end_time - start_time

**EXERCISE:** Change the code below so that `remaining_results` consists of the outputs of the last three tasks to complete.

In [8]:
# Wait for the remaining tasks to complete.
remaining_results = ray.get(result_ids[3:])

**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [9]:
assert len(initial_results) == 3
assert len(remaining_results) == 3

initial_indices = [result[0] for result in initial_results]
initial_times = [result[1] for result in initial_results]
remaining_indices = [result[0] for result in remaining_results]
remaining_times = [result[1] for result in remaining_results]

assert set(initial_indices + remaining_indices) == set(range(6))

assert duration < 1.5, ('The initial batch of three tasks was retrieved in '
                        '{} seconds. This is too slow.'.format(duration))

assert duration > 0.8, ('The initial batch of three tasks was retrieved in '
                        '{} seconds. This is too fast.'.format(duration))

# Make sure the initial results actually completed first.
assert max(initial_times) < min(remaining_times)

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 0.893104076385498 seconds.


# Exercise 7 - Process Tasks in Order of Completion

**GOAL:** The goal of this exercise is to show how to use `ray.wait` to process tasks in the order that they finish.

See the documentation for `ray.wait` at https://ray.readthedocs.io/en/latest/package-ref.html?highlight=ray.wait#ray.wait.

## Concepts for this exercise - `ray.wait`

After launching a number of tasks, you may want to process the results one at a time, in the order that the tasks finish. To do so, we build on Exercise 6 and call `ray.wait` repeatedly, retrieving one finished result per call.

We are able to use `ray.wait` because the two lists returned by **`ray.wait` maintain the ordering of the input list**. That is, if `f` is a remote function, the code
```python
results = ray.wait([f.remote(i) for i in range(100)], num_returns=10)
```
returns `(ready_list, remain_list)`, and the `ObjectID`s in each of those lists are ordered by the argument passed to `f` above.

In [10]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import ray
import time

In [11]:
ray.init(num_cpus=5, include_webui=False, ignore_reinit_error=True)

2020-04-30 09:27:08,720	ERROR worker.py:682 -- Calling ray.init() again after it has already been called.


In [12]:
@ray.remote
def f():
    time.sleep(np.random.uniform(0, 5))
    return time.time()

**EXERCISE:** Change the code below to use `ray.wait` to get the results of the tasks in the order that they complete.

**NOTE:** It would be a simple modification to maintain a pool of 10 experiments and to start a new experiment whenever one finishes.

In [21]:
start_time = time.time()

remaining_result_ids = [f.remote() for _ in range(10)]

# Get the results.
results = []
while len(remaining_result_ids) > 0:
    # EXERCISE: Instead of simply waiting for the first result from
    # remaining_result_ids, use ray.wait to get the first one to finish.
    # result_id = remaining_result_ids[0]
    # remaining_result_ids = remaining_result_ids[1:]
    # ray.wait returns a list of ready IDs (here of length 1) and the rest.
    ready_ids, remaining_result_ids = ray.wait(remaining_result_ids, num_returns=1)
    result = ray.get(ready_ids[0])
    results.append(result)
    print('Processing result which finished after {} seconds.'
          .format(result - start_time))

end_time = time.time()
duration = end_time - start_time

Processing result which finished after 2.9861671924591064 seconds.
Processing result which finished after 3.06778621673584 seconds.
Processing result which finished after 3.569009304046631 seconds.
Processing result which finished after 4.024271488189697 seconds.
Processing result which finished after 4.904977321624756 seconds.
Processing result which finished after 5.060281753540039 seconds.
Processing result which finished after 5.29135537147522 seconds.
Processing result which finished after 5.932264804840088 seconds.
Processing result which finished after 6.672206401824951 seconds.
Processing result which finished after 6.68027925491333 seconds.
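
The pool idea from the note above can be sketched without Ray. The following is a minimal stand-in that uses the standard library's `concurrent.futures` rather than Ray: `executor.submit` plays the role of `f.remote`, `wait(..., return_when=FIRST_COMPLETED)` plays the role of `ray.wait(..., num_returns=1)`, and `future.result()` plays the role of `ray.get`. The names `experiment`, `run_with_pool`, `POOL_SIZE`, and `TOTAL_EXPERIMENTS` are hypothetical, and the pool is smaller than the 10 mentioned in the note so that the sketch runs quickly.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

POOL_SIZE = 3          # hypothetical; the note above suggests 10
TOTAL_EXPERIMENTS = 8  # hypothetical total number of experiments to run


def experiment(i):
    # Stand-in for a remote task that takes a variable amount of time.
    time.sleep(0.01 * (i % 3))
    return i


def run_with_pool(pool_size, total):
    completed = []
    next_index = 0
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        in_flight = set()
        # Fill the pool with the first batch of experiments.
        while next_index < total and len(in_flight) < pool_size:
            in_flight.add(executor.submit(experiment, next_index))
            next_index += 1
        while in_flight:
            # Block until at least one experiment has finished.
            done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
            for future in done:
                completed.append(future.result())
                # Start a replacement so the pool stays full.
                if next_index < total:
                    in_flight.add(executor.submit(experiment, next_index))
                    next_index += 1
    return completed


completed = run_with_pool(POOL_SIZE, TOTAL_EXPERIMENTS)
print('Finished {} experiments.'.format(len(completed)))
```

One difference from the Ray version is that `wait` here may report several finished futures at once, so the inner loop handles each of them and starts one replacement per finished experiment.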


**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [22]:
assert results == sorted(results), ('The results were not processed in the '
                                    'order that they finished.')

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 6.681589603424072 seconds.
