# Ray Crash Course - Python Multiprocessing with Ray

© 2019-2020, Anyscale. All Rights Reserved

![Anyscale Academy](../images/AnyscaleAcademy_Logo_clearbanner_141x100.png)

This lesson explores how to replace two popular multiprocessing libraries with their Ray counterparts, which break the one-machine boundary:

* [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool) for general management of process pools.
* [`joblib`](https://joblib.readthedocs.io/en/latest/), which powers the parallelism in [scikit-learn](https://scikit-learn.org/stable/) and which Ray can scale to a cluster.

We also examine how Ray can work with Python's [`asyncio`](https://docs.python.org/3/library/asyncio.html).

> **Tip:** For more about Ray, see [ray.io](https://ray.io) or the [Ray documentation](https://docs.ray.io/en/latest/).

In [1]:
import ray, time, sys, os
import numpy as np

In [2]:
!../tools/start-ray.sh --check --verbose

INFO: Ray is already running.


In [3]:
ray.init(address='auto', ignore_reinit_error=True)

{'node_ip_address': '192.168.1.114',
 'raylet_ip_address': '192.168.1.114',
 'redis_address': '192.168.1.114:6379',
 'object_store_address': '/tmp/ray/session_2020-07-20_13-12-17_876312_3947/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-07-20_13-12-17_876312_3947/sockets/raylet',
 'webui_url': 'localhost:8265',
 'session_dir': '/tmp/ray/session_2020-07-20_13-12-17_876312_3947'}

If you are running this notebook on a local machine, you can open the Ray Dashboard at this URL:

In [4]:
print(f'Dashboard URL: http://{ray.get_webui_url()}')

Dashboard URL: http://localhost:8265


## Drop-in Replacements for Popular Single-node, Multiprocessing Libraries

The Python community has three popular libraries for parallelism and concurrency. Ray now offers drop-in replacements for two of them, [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool) and [`joblib`](https://joblib.readthedocs.io/en/latest/), which typically sidestep Python's _global interpreter lock_ by running work in separate processes. Ray also integrates with the third, Python's [`asyncio`](https://docs.python.org/3/library/asyncio.html), which provides single-threaded concurrency rather than parallelism.

This section explores the `multiprocessing.Pool` and `joblib` replacements.

| Library | Library Docs | Ray Docs | Description |
| :------ | :----------- | :------- | :---------- |
| `multiprocessing.Pool` | [docs](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool) | [Ray](https://docs.ray.io/en/latest/multiprocessing.html) | Create a pool of processes for running work. The Ray replacement allows scaling to a cluster. |
| `joblib` | [docs](https://joblib.readthedocs.io/en/latest/) | [Ray](https://docs.ray.io/en/latest/joblib.html) | Ray supports running distributed [scikit-learn](https://scikit-learn.org/stable/) programs by implementing a Ray backend for `joblib` using Ray Actors instead of local processes. This makes it easy to scale existing applications that use scikit-learn from a single node to a cluster. |


### Multiprocessing.Pool

If your application already uses `multiprocessing.Pool`, then scaling beyond a single node just requires replacing your import statements from this:

```python
from multiprocessing.pool import Pool
```

To this:

```python
from ray.util.multiprocessing.pool import Pool
```

If Ray isn't already running, a local Ray cluster is started the first time you create a `Pool`, and your tasks are distributed across it. See [Run on a Cluster](https://docs.ray.io/en/latest/multiprocessing.html#run-on-a-cluster) in the Ray documentation for details on how to use a multi-node Ray cluster instead.

Here is an example:

In [5]:
from ray.util.multiprocessing import Pool

def f(index):
    return index

def run_with_pool(n=100):
    pool = Pool()
    for result in pool.map(f, range(n)):
        print(f'{result}|', end='')

run_with_pool()

0|1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21|22|23|24|25|26|27|28|29|30|31|32|33|34|35|36|37|38|39|40|41|42|43|44|45|46|47|48|49|50|51|52|53|54|55|56|57|58|59|60|61|62|63|64|65|66|67|68|69|70|71|72|73|74|75|76|77|78|79|80|81|82|83|84|85|86|87|88|89|90|91|92|93|94|95|96|97|98|99|

We wrapped the `pool` construction in a function, `run_with_pool()`, so the `pool` goes out of scope when the function returns and Ray can reclaim its resources.

The full `multiprocessing.Pool` API is supported. Please see Python's [multiprocessing documentation](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool) for details.
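Because the Ray `Pool` follows the standard-library API, the usual idioms should carry over. Here's a minimal sketch that exercises a few methods beyond `map` and uses the pool as a context manager, an alternative to wrapping it in a function. The helper functions are hypothetical, and the `ImportError` fallback to the standard library's `ThreadPool` (same API, threads instead of processes) is an assumption added only so the sketch runs where Ray isn't installed.

```python
try:
    # Ray's drop-in replacement, which can scale out to a cluster
    from ray.util.multiprocessing import Pool
except ImportError:
    # Assumed fallback so the sketch runs without Ray: same API, threads not processes
    from multiprocessing.pool import ThreadPool as Pool

def square(x):  # hypothetical work function
    return x * x

def add(a, b):  # hypothetical two-argument work function
    return a + b

with Pool() as pool:  # leaving the block calls pool.terminate()
    squares = pool.map(square, range(5))
    async_result = pool.apply_async(add, (2, 3))  # returns immediately
    total = async_result.get(timeout=30)          # blocks until the result is ready
    sums = pool.starmap(add, [(1, 2), (3, 4)])    # unpacks each tuple into add()
    # imap_unordered yields results as they finish, so sort for a stable order
    unordered = sorted(pool.imap_unordered(square, range(5)))

print(squares, total, sums, unordered)
```

Since leaving the `with` block calls `terminate()` rather than `close()`, collect any `apply_async` results inside the block, as done here.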

### Joblib

Ray supports running distributed [scikit-learn](https://scikit-learn.org/) programs by implementing a Ray backend for [joblib](https://joblib.readthedocs.io/) using Ray Actors instead of local processes. This makes it easy to scale existing applications that use scikit-learn from a single node to a cluster.

> **Note:** This API is new and may be revised in the future. Please [report any issues](https://github.com/ray-project/ray/issues) you encounter.

To get started, import `register_ray` with `from ray.util.joblib import register_ray` and call `register_ray()`. This registers Ray as a `joblib` backend that `scikit-learn` can use. Then run your original `scikit-learn` code inside a `with joblib.parallel_backend('ray'):` block. If Ray isn't already running, this starts a local Ray cluster.

See [Run on a Cluster](https://docs.ray.io/en/latest/joblib.html#run-on-a-cluster) in the Ray documentation for details on how to use a multi-node Ray cluster instead.

Here is an example. First, we set up Ray with `joblib`:

In [6]:
import joblib
from ray.util.joblib import register_ray
register_ray()

Now let's use an example taken from the scikit-learn examples, [Restricted Boltzmann Machine features for digit classification](https://scikit-learn.org/stable/auto_examples/neural_networks/plot_rbm_logistic_classification.html#sphx-glr-auto-examples-neural-networks-plot-rbm-logistic-classification-py). 

In [7]:
# Authors: Yann N. Dauphin, Vlad Niculae, Gabriel Synnaeve
# License: BSD

import numpy as np
from scipy.ndimage import convolve
from sklearn import linear_model, datasets, metrics
from sklearn.model_selection import train_test_split
from sklearn.neural_network import BernoulliRBM
from sklearn.pipeline import Pipeline
from sklearn.base import clone

In [8]:
# #############################################################################
# Setting up

def nudge_dataset(X, Y):
    """
    This produces a dataset 5 times bigger than the original one,
    by moving the 8x8 images in X around by 1px to left, right, down, up
    """
    direction_vectors = [
        [[0, 1, 0],
         [0, 0, 0],
         [0, 0, 0]],

        [[0, 0, 0],
         [1, 0, 0],
         [0, 0, 0]],

        [[0, 0, 0],
         [0, 0, 1],
         [0, 0, 0]],

        [[0, 0, 0],
         [0, 0, 0],
         [0, 1, 0]]]

    def shift(x, w):
        return convolve(x.reshape((8, 8)), mode='constant', weights=w).ravel()

    X = np.concatenate([X] +
                       [np.apply_along_axis(shift, 1, X, vector)
                        for vector in direction_vectors])
    Y = np.concatenate([Y for _ in range(5)], axis=0)
    return X, Y


# Load Data
X, y = datasets.load_digits(return_X_y=True)
X = np.asarray(X, 'float32')
X, Y = nudge_dataset(X, y)
X = (X - np.min(X, 0)) / (np.max(X, 0) + 0.0001)  # 0-1 scaling

X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, test_size=0.2, random_state=0)

# Models we will use
logistic = linear_model.LogisticRegression(solver='newton-cg', tol=1)
rbm = BernoulliRBM(random_state=0, verbose=True)

rbm_features_classifier = Pipeline(
    steps=[('rbm', rbm), ('logistic', logistic)])

In [9]:
# #############################################################################
# Training

# Hyper-parameters. These were set by cross-validation,
# using a GridSearchCV. Here we are not performing cross-validation to
# save time.
rbm.learning_rate = 0.06
rbm.n_iter = 10
# More components tend to give better prediction performance, but larger
# fitting time
rbm.n_components = 100
logistic.C = 6000

Now we actually use the Ray backend for `joblib`:

In [10]:
with joblib.parallel_backend('ray'):
    # Training RBM-Logistic Pipeline
    rbm_features_classifier.fit(X_train, Y_train)

    # Training the Logistic regression classifier directly on the raw pixels
    raw_pixel_classifier = clone(logistic)
    raw_pixel_classifier.C = 100.
    raw_pixel_classifier.fit(X_train, Y_train)

    # #############################################################################
    # Evaluation

    Y_pred = rbm_features_classifier.predict(X_test)
    print("Logistic regression using RBM features:\n%s\n" % (
        metrics.classification_report(Y_test, Y_pred)))

    Y_pred = raw_pixel_classifier.predict(X_test)
    print("Logistic regression using raw pixel features:\n%s\n" % (
        metrics.classification_report(Y_test, Y_pred)))

[BernoulliRBM] Iteration 1, pseudo-likelihood = -25.39, time = 0.11s
[BernoulliRBM] Iteration 2, pseudo-likelihood = -23.77, time = 0.16s
[BernoulliRBM] Iteration 3, pseudo-likelihood = -22.94, time = 0.16s
[BernoulliRBM] Iteration 4, pseudo-likelihood = -21.91, time = 0.15s
[BernoulliRBM] Iteration 5, pseudo-likelihood = -21.69, time = 0.15s
[BernoulliRBM] Iteration 6, pseudo-likelihood = -21.06, time = 0.15s
[BernoulliRBM] Iteration 7, pseudo-likelihood = -20.89, time = 0.15s
[BernoulliRBM] Iteration 8, pseudo-likelihood = -20.64, time = 0.15s
[BernoulliRBM] Iteration 9, pseudo-likelihood = -20.36, time = 0.15s
[BernoulliRBM] Iteration 10, pseudo-likelihood = -20.09, time = 0.15s

Logistic regression using RBM features:
              precision    recall  f1-score   support

           0       1.00      0.98      0.99       174
           1       0.92      0.93      0.92       184
           2       0.94      0.95      0.95       166
           3       0.95      0.89      0.92       194
           4       0.97      0.94      0.95       186
           5       0.91      0.92      0.92       181
           6       0.98      0.97      0.97       207
           7       0.93      0.98      0.96       154
           8       0.89      0.90      0.89       182
           9       0.87      0.91      0.89       169

    accuracy                           0.94      1797
   macro avg       0.94      0.94      0.94      1797
weighted avg       0.94      0.94      0.94      1797


Logistic regression using raw pixel features:
              precision    recall  f1-score   support

           0       0.90      0.92      0.91       174
           1       0.60      0.59      0.60  

If you see warnings that begin with `The 'context' argument`, you can safely ignore them.
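scikit-learn hands work to `joblib` in its parallelized code paths, such as those controlled by an `n_jobs` parameter, so the same backend can also be driven directly. This minimal sketch runs a hypothetical `square` function through `joblib.Parallel` under the Ray backend; the fallback to joblib's built-in `threading` backend is an assumption added only so it runs without Ray.

```python
import joblib
from joblib import Parallel, delayed

try:
    from ray.util.joblib import register_ray
    register_ray()  # makes the 'ray' backend name available to joblib
    BACKEND = "ray"
except ImportError:
    # Assumed fallback so the sketch runs where Ray isn't installed
    BACKEND = "threading"

def square(x):  # hypothetical work function
    return x * x

with joblib.parallel_backend(BACKEND):
    # n_jobs=-1 asks the active backend to use all available workers;
    # Parallel returns results in input order
    results = Parallel(n_jobs=-1)(delayed(square)(i) for i in range(10))

print(BACKEND, results)
```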

### Using Ray with asyncio

Python's [`asyncio`](https://docs.python.org/3/library/asyncio.html) can be used with Ray actors and tasks.

> **Note:** The Async API support is experimental and work is ongoing to improve it. Please [report any issues](https://github.com/ray-project/ray/issues) you encounter.

#### Actors
Here is an actor example, adapted from the [Ray documentation](https://docs.ray.io/en/latest/async_api.html).

Note the comment before `run_concurrent`. Normally, an actor executes its method invocations one at a time, but because this method is `async`, multiple invocations may run concurrently!

In [11]:
import asyncio

@ray.remote
class AsyncActor:
    # Multiple invocations of this method can be running in
    # the event loop at the same time.
    async def run_concurrent(self, index):
        print(f'started {index}')
        await asyncio.sleep(0.2)   # Concurrent workload here
        print(f'finished {index}')
        return index

actor = AsyncActor.remote()

ids = []
values = []
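for i in range(10):
    # Submit without waiting; collect the results later with ray.get
    ids.append(actor.run_concurrent.remote(i))

    # Await the object ID directly (top-level await works in Jupyter/IPython)
    values.append(await actor.run_concurrent.remote(10+i))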
print(ray.get(ids))
print(values)

[2m[36m(pid=14981)[0m started 0
[2m[36m(pid=14981)[0m started 10
[2m[36m(pid=14981)[0m finished 0
[2m[36m(pid=14981)[0m finished 10
[2m[36m(pid=14981)[0m started 1
[2m[36m(pid=14981)[0m started 11
[2m[36m(pid=14981)[0m finished 1
[2m[36m(pid=14981)[0m finished 11
[2m[36m(pid=14981)[0m started 2
[2m[36m(pid=14981)[0m started 12
[2m[36m(pid=14981)[0m finished 2
[2m[36m(pid=14981)[0m finished 12
[2m[36m(pid=14981)[0m started 3
[2m[36m(pid=14981)[0m started 13
[2m[36m(pid=14981)[0m finished 3
[2m[36m(pid=14981)[0m finished 13
[2m[36m(pid=14981)[0m started 4
[2m[36m(pid=14981)[0m started 14
[2m[36m(pid=14981)[0m finished 4
[2m[36m(pid=14981)[0m finished 14
[2m[36m(pid=14981)[0m started 5
[2m[36m(pid=14981)[0m started 15
[2m[36m(pid=14981)[0m finished 5
[2m[36m(pid=14981)[0m finished 15
[2m[36m(pid=14981)[0m started 6
[2m[36m(pid=14981)[0m started 16
[2m[36m(pid=14981)[0m finished 6
[2m[36m(pid=14981)[0m fini

Note that awaiting the object ID returned by a method invocation gives you the same value as calling `ray.get()` on it, but without blocking the event loop.

Under the hood, Ray runs all of an async actor's methods inside a single Python event loop.

> **Note:** Calling the blocking `ray.get` or `ray.wait` inside async actor methods is not allowed, because it blocks the actor's event loop and stalls every other invocation. Use `await` (or `asyncio.wait`) on the object IDs instead.

You can limit the number of tasks running concurrently with the `max_concurrency` option, passed through `.options()` as in the next cell. By default, 1000 tasks can be running concurrently.

In the next cell, we set `max_concurrency` to `3`, so the cell after it runs tasks three at a time. With `12` tasks in total, that's four groups, each sleeping about `0.2` seconds, so the run should take about `0.8` seconds.

In [12]:
actor3 = AsyncActor.options(max_concurrency=3).remote()

In [13]:
%time ray.get([actor3.run_concurrent.remote(i) for i in range(12)])

[2m[36m(pid=14981)[0m finished 9
[2m[36m(pid=14981)[0m finished 19
[2m[36m(pid=14987)[0m started 0
[2m[36m(pid=14987)[0m started 1
[2m[36m(pid=14987)[0m started 2
[2m[36m(pid=14987)[0m finished 0
[2m[36m(pid=14987)[0m started 3
[2m[36m(pid=14987)[0m finished 1
[2m[36m(pid=14987)[0m finished 2
[2m[36m(pid=14987)[0m started 4
[2m[36m(pid=14987)[0m started 5
[2m[36m(pid=14987)[0m finished 3
[2m[36m(pid=14987)[0m started 6
[2m[36m(pid=14987)[0m finished 4
[2m[36m(pid=14987)[0m finished 5
[2m[36m(pid=14987)[0m started 7
[2m[36m(pid=14987)[0m started 8
[2m[36m(pid=14987)[0m finished 6
[2m[36m(pid=14987)[0m started 9
[2m[36m(pid=14987)[0m finished 7
[2m[36m(pid=14987)[0m finished 8
[2m[36m(pid=14987)[0m started 10
[2m[36m(pid=14987)[0m started 11
CPU times: user 54.8 ms, sys: 14.2 ms, total: 69 ms
Wall time: 824 ms


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
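The `0.8`-second estimate is simply the number of rounds times the sleep time. To check the arithmetic without Ray, here's a plain-`asyncio` analogy in which an `asyncio.Semaphore` stands in for `max_concurrency`; it illustrates the scheduling math only and is not how Ray implements the limit.

```python
import asyncio
import time

MAX_CONCURRENCY = 3  # plays the role of max_concurrency=3
N_TASKS = 12
SLEEP_S = 0.2

async def run_concurrent(index, limit):
    async with limit:  # at most MAX_CONCURRENCY bodies run at once
        await asyncio.sleep(SLEEP_S)
        return index

async def main():
    limit = asyncio.Semaphore(MAX_CONCURRENCY)
    # gather returns results in submission order
    return await asyncio.gather(*(run_concurrent(i, limit) for i in range(N_TASKS)))

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
# 12 tasks / 3 at a time = 4 rounds x 0.2 s, so roughly 0.8 s in total
print(results, f"{elapsed:.2f}s")
```

Run it as a script; in Jupyter, where an event loop is already running, use `await main()` instead of `asyncio.run(main())`.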

See the [03: Ray Internals](../advanced-ray/03-Ray-Internals.ipynb) lesson in the [Advanced Ray](../advanced-ray/00-Advanced-Ray-Overview.ipynb) tutorial for more details on _async actors_.

#### Async Tasks

The object IDs returned by Ray tasks can be converted to `asyncio.Future` instances, so you can `await` them from `asyncio` code.

In [14]:
@ray.remote
def some_task():
    return 1

# The normal Ray way:
ready, _ = ray.wait([some_task.remote()])  # returns (ready, not_ready) lists of object IDs
ray.get(ready)

[1]

[2m[36m(pid=14987)[0m finished 9
[2m[36m(pid=14987)[0m finished 10
[2m[36m(pid=14987)[0m finished 11


The `asyncio` alternative way:

In [18]:
await some_task.remote()  # evaluates to 1, the task's return value

In [16]:
done_and_pending = await asyncio.wait([some_task.remote()])
print(done_and_pending)
# asyncio.wait returns a (done, pending) tuple of sets:
for x in done_and_pending:
    print(f'  {type(x)} => {x}')

({<Task finished coro=<_wrap_awaitable() done, defined at /home/paul/anaconda3/envs/anyscale-academy/lib/python3.7/asyncio/tasks.py:623> result=1>}, set())
  <class 'set'> => {<Task finished coro=<_wrap_awaitable() done, defined at /home/paul/anaconda3/envs/anyscale-academy/lib/python3.7/asyncio/tasks.py:623> result=1>}
  <class 'set'> => set()


See the [asyncio docs](https://docs.python.org/3/library/asyncio-task.html) for more details on `asyncio` patterns, including timeouts and `asyncio.gather`.
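As a quick illustration of two of those patterns, the sketch below uses `asyncio.gather` to collect several results and `asyncio.wait_for` to impose a timeout. To keep it runnable without a cluster, `stand_in_task` is a hypothetical coroutine standing in for a Ray task; in a Ray program you would pass awaitable object IDs such as `some_task.remote()` instead.

```python
import asyncio

async def stand_in_task(value, delay):
    # Hypothetical stand-in for a Ray task; Ray object IDs are also awaitable.
    await asyncio.sleep(delay)
    return value

async def main():
    # gather waits for every awaitable and returns results in argument order
    results = await asyncio.gather(stand_in_task(1, 0.1), stand_in_task(2, 0.05))
    try:
        # wait_for raises a timeout error if the awaitable takes too long
        await asyncio.wait_for(stand_in_task(3, 1.0), timeout=0.1)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return results, timed_out

results, timed_out = asyncio.run(main())
print(results, timed_out)  # [1, 2] True
```

As a script this uses `asyncio.run`; in Jupyter, use `await main()` instead.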

In [17]:
ray.shutdown()  # "Undo ray.init()". Terminate all the processes started in this notebook.

The next lesson, [Ray Parallel Iterators](05-Ray-Parallel-Iterators.ipynb), introduces the _parallel iterator_ API for simple data ingestion and processing. It can be thought of as syntactic sugar around Ray actors and `ray.wait` loops.