# Overview of Ray

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Generic/ray_logo.png" width="20%" loading="lazy">

## Learning objectives

* Understand what Ray is.
* Recognize key characteristics of Ray.
* Tour the three layers of Ray.
    * Ray Core
    * Native libraries
    * Ecosystem of integrations
* Explore the most common Ray use cases.
* Implement a regression task.
    * Sequentially in generic Python
    * In parallel with Ray
* Identify where to go next with Ray.

## Hands-on code example - scaling regression with Ray Core

### Introduction

To get a better feel for Ray, this section scales a bare-bones version of a common ML task: regression on structured data.

#### Data

You will perform regression on the [California Housing](https://scikit-learn.org/stable/datasets/real_world.html#california-housing-dataset) dataset, which ships with scikit-learn.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/California_dataset.png" width="80%" loading="lazy">|
|:--|
|`n_samples = 20640`; the target is numeric and corresponds to the median house value, in units of 100,000 USD.|

#### Model and task

You will train and score [random forest](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html) models using [mean squared error](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.mean_squared_error.html) as the metric.
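For reference, mean squared error is the average of the squared differences between true and predicted values, so lower is better. The minimal sketch below computes it by hand in pure Python on three made-up values. `manual_mse` is a hypothetical helper written for illustration; for the same inputs it should agree with scikit-learn's `mean_squared_error`.

```python
def manual_mse(y_true: list[float], y_pred: list[float]) -> float:
    """Mean of squared residuals: (1/n) * sum((y_i - yhat_i) ** 2)."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and the same length")
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


# Made-up values in the dataset's units (100,000 USD); residuals are 0.5, 0.0, -1.0.
y_true = [3.0, 2.0, 4.0]
y_pred = [2.5, 2.0, 5.0]
print(f"{manual_mse(y_true, y_pred):.4f}")  # prints 0.4167
```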

As a lightweight form of hyperparameter tuning, you will train many models with different values of `n_estimators`. First, you will run a sequential version in which the experiments execute one after another. Then, you will distribute the same training runs with Ray Core so that they run in parallel and finish faster.

### Sequential implementation

Start with a familiar implementation: a set of random forest models trained one at a time, as depicted in the diagram below.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/sequential_timeline.png" width="70%" loading="lazy">|
|:--|
|Timeline of sequential tasks all on one worker. Each "task" in this case is training a random forest model.|

#### Preliminaries

In [None]:
import time
from operator import itemgetter

import pandas as pd
from sklearn.datasets import fetch_california_housing
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

#### Prepare dataset

In [None]:
X, y = fetch_california_housing(return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=201
)

In [None]:
X.head(n=5)

#### Set number of models to train

In [None]:
# You will use NUM_MODELS as a benchmark to compare performance
# across sequential and parallel implementations.

NUM_MODELS = 20

#### Implement function to train and score model

In [None]:
def train_and_score_model(
    train_set: pd.DataFrame,
    test_set: pd.DataFrame,
    train_labels: pd.Series,
    test_labels: pd.Series,
    n_estimators: int,
) -> tuple[int, float]:
    start_time = time.time()  # measure wall time for single model training

    model = RandomForestRegressor(n_estimators=n_estimators, random_state=201)
    model.fit(train_set, train_labels)
    y_pred = model.predict(test_set)
    score = mean_squared_error(test_labels, y_pred)

    time_delta = time.time() - start_time
    print(
        f"n_estimators={n_estimators}, mse={score:.4f}, took: {time_delta:.2f} seconds"
    )

    return n_estimators, score

This function takes the training and test data, creates a `RandomForestRegressor` with the given `n_estimators`, fits it on the training set, and scores it on the test set.

`train_and_score_model` returns a tuple:
```
(n_estimators, mse_score)
```

For example:

```
(8, 0.2983)
```

#### Implement function that runs **sequential** model training

In [None]:
def run_sequential(n_models: int) -> list[tuple[int, float]]:
    return [
        train_and_score_model(
            train_set=X_train,
            test_set=X_test,
            train_labels=y_train,
            test_labels=y_test,
            n_estimators=8 + 4 * j,
        )
        for j in range(n_models)
    ]

This function trains `n_models` models one after another, increasing `n_estimators` by 4 for each model (8, 12, 16, 20, ...).
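To see exactly which hyperparameter values get tried, the sketch below evaluates the same `8 + 4 * j` expression used in `run_sequential` for `NUM_MODELS = 20`. Only the list is computed here; no models are trained.

```python
NUM_MODELS = 20  # same value as set earlier in the notebook

# n_estimators used for model j in run_sequential (and later in run_parallel)
n_estimators_grid = [8 + 4 * j for j in range(NUM_MODELS)]

print(n_estimators_grid[:4], "...", n_estimators_grid[-1])
# prints [8, 12, 16, 20] ... 84
```

The largest forest has 84 trees, so later models take noticeably longer to train than the first ones.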

`run_sequential` returns a list of tuples:
```
[(n_estimators, mse_score), (n_estimators, mse_score), ...]
```

For example:

```
[(8, 0.2983), (12, 0.2826), (16, 0.2761), ...]
```
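Given a list in this shape, picking the best model means finding the tuple with the smallest MSE. The "Analyze results" cells later in the notebook use `min` with `operator.itemgetter(1)` for this; the sketch below shows the pattern on hypothetical scores that are not real measurements.

```python
from operator import itemgetter

# Hypothetical (n_estimators, mse_score) results, for illustration only.
results = [(8, 0.30), (12, 0.28), (16, 0.27), (20, 0.29)]

# itemgetter(1) extracts the MSE from each tuple, so min() returns
# the whole tuple with the lowest test error.
best = min(results, key=itemgetter(1))
print(best)  # prints (16, 0.27)
```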

#### Run sequential model training 

In [None]:
%%time

mse_scores = run_sequential(n_models=NUM_MODELS)

Note: on an M1 MacBook Pro, the wall time was about 1min (60s).

#### Analyze results

In [None]:
best = min(mse_scores, key=itemgetter(1))
print(f"Best model: mse={best[1]:.4f}, n_estimators={best[0]}")

Looking at the training output, note how long it took to train `NUM_MODELS` models sequentially. Continue to the next section to learn how to reduce the runtime by distributing this work.

### Parallel implementation

In contrast to the previous approach, you will now use all available resources to train these models in parallel. Ray automatically detects the number of CPU cores on your computer (or the resources available in a cluster) and distributes the tasks you define across them.

The diagram below gives an intuition for how tasks are assigned and executed in parallel. Note that this approach introduces a scheduler, which is responsible for managing incoming requests, assigning tasks to nodes, and detecting available resources.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/distributed_timeline.png" width="80%" loading="lazy">|
|:--|
|A generic timeline with ten tasks running across 4 workers in parallel, with minor overhead from the scheduler.|
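A rough way to read the diagram: with equal-length tasks and no overhead, workers process tasks in "waves", so the wall time is about `ceil(n_tasks / n_workers)` times the length of one task. The sketch below encodes that idealization. It is a back-of-the-envelope model under those assumptions, not a description of how Ray's scheduler actually works; `ideal_wall_time` is a hypothetical helper.

```python
import math


def ideal_wall_time(n_tasks: int, n_workers: int, task_seconds: float) -> float:
    """Idealized wall time for equal-length tasks, ignoring scheduler overhead.

    Workers run tasks in waves; the last wave may be only partly full.
    """
    waves = math.ceil(n_tasks / n_workers)
    return waves * task_seconds


# The diagram's case: 10 tasks on 4 workers -> 3 waves.
print(ideal_wall_time(n_tasks=10, n_workers=4, task_seconds=1.0))  # prints 3.0
# The same 10 tasks on a single worker, as in the sequential version.
print(ideal_wall_time(n_tasks=10, n_workers=1, task_seconds=1.0))  # prints 10.0
```

In this notebook the tasks are not equal-length, because forests with more trees take longer to fit, so the real timeline is less tidy than this model suggests.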

#### Initialize Ray runtime

In [None]:
import ray

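# Shut down any Ray runtime left over from a previous run of this cell.
# Note the parentheses: is_initialized is a function and must be called.
if ray.is_initialized():
    ray.shutdown()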

ray.init()

The cell above shuts down any existing Ray runtime and calls `ray.init()` to start a fresh local Ray cluster. Its output shows some useful information:

* Python version
* Ray version
* Link to the Ray Dashboard: an observability tool that provides insight into what Ray is doing through metrics and charts

#### Put data in the object store

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/object_store.png" width="70%" loading="lazy">|
|:--|
|Workers use `ray.put()` to place objects and use `ray.get()` to retrieve them from each node's object store. These object stores form the shared distributed memory that makes objects available across a Ray cluster.|

In Ray, an object reference (`ObjectRef`) is essentially a unique ID, or pointer, that refers to an object in the distributed object store without exposing its value. Any task or actor can use the reference to access the object, even when the object is stored on a different machine, which is how processes across the cluster share data.

In [None]:
X_train_ref = ray.put(X_train)
X_test_ref = ray.put(X_test)
y_train_ref = ray.put(y_train)
y_test_ref = ray.put(y_test)

Placing the training and test data in Ray's object store makes these objects available to all remote tasks and actors in the cluster.

**Coding Exercise**

To practice working with object references, use the cell below to:

1. Print what `X_train_ref` looks like.
2. Retrieve `X_train` by using `ray.get()` on the object reference.

An example object reference looks like this:

`ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000002000000)`

In [None]:
### YOUR CODE HERE ###

**Solution**

In [None]:
### SAMPLE IMPLEMENTATION ###

# print the object reference
print(X_train_ref)

# inspect the in-memory object
ray.get(X_train_ref)

#### Implement function to train and score model

In [None]:
@ray.remote
def train_and_score_model(
    train_set_ref: pd.DataFrame,
    test_set_ref: pd.DataFrame,
    train_labels_ref: pd.Series,
    test_labels_ref: pd.Series,
    n_estimators: int,
) -> tuple[int, float]:
    start_time = time.time()  # measure wall time for single model training

    model = RandomForestRegressor(n_estimators=n_estimators, random_state=201)
    model.fit(train_set_ref, train_labels_ref)
    y_pred = model.predict(test_set_ref)
    score = mean_squared_error(test_labels_ref, y_pred)

    time_delta = time.time() - start_time
    print(
        f"n_estimators={n_estimators}, mse={score:.4f}, took: {time_delta:.2f} seconds"
    )

    return n_estimators, score

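Notice that `train_and_score_model` is *the same function* as in the sequential example, apart from the renamed parameters and the `@ray.remote` decorator, which turns it into a Ray task that can run in a separate worker process. Although the parameters carry a `_ref` suffix, the function body works with plain DataFrames and Series: when you pass an object reference as a top-level argument to a remote function, Ray retrieves the underlying object from the object store before the function runs.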

#### Implement function that runs **parallel** model training

In [None]:
def run_parallel(n_models: int) -> list[tuple[int, float]]:
    results_ref = [
        train_and_score_model.remote(
            train_set_ref=X_train_ref,
            test_set_ref=X_test_ref,
            train_labels_ref=y_train_ref,
            test_labels_ref=y_test_ref,
            n_estimators=8 + 4 * j,
        )
        for j in range(n_models)
    ]
    return ray.get(results_ref)

Earlier, you defined `run_sequential()` to train and score `NUM_MODELS` models. Working from the inside out, turning it into `run_parallel()` involves three steps:

1. Append the `.remote` suffix to `train_and_score_model`.
    * Remember that you declared this function as a remote task in the previous cell. In Ray, you append this suffix to every remote call.
2. Capture the resulting list of object references in `results_ref`.
    * Rather than waiting for the results, you immediately receive a list of references to results that will be available in the future. This asynchronous (non-blocking) call lets the program continue with other work while the potentially time-consuming training runs in the background.
3. Access results with `ray.get()`.
    * Once all tasks have been submitted, call `ray.get()` on the list of object references `results_ref` to retrieve the completed results. This is a synchronous (blocking) operation: it waits until every referenced task has finished.

For example,

```
ray.get([ObjectRef, ObjectRef, ObjectRef, ...])
```

returns a list of `(n_estimators, score)` tuples, in the same order as the references.

#### Run parallel model training 

In [None]:
%%time

mse_scores = run_parallel(n_models=NUM_MODELS)

Notice the roughly **6x speedup** (measured on an M1 MacBook Pro):

* Parallel: 10s
* Sequential: 1min (60s)

Your numbers will depend on how many CPU cores are available to Ray.
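The speedup is simply the ratio of the two wall times. The tiny sketch below applies it to the numbers reported for the M1 MacBook Pro so you can do the same with the `%%time` output from your own runs; `speedup` is a hypothetical helper.

```python
def speedup(sequential_seconds: float, parallel_seconds: float) -> float:
    """How many times faster the parallel run was than the sequential one."""
    return sequential_seconds / parallel_seconds


# Wall times reported for the M1 MacBook Pro run.
print(speedup(sequential_seconds=60.0, parallel_seconds=10.0))  # prints 6.0
```

Expect the ratio to be typically below the number of CPU cores, because of scheduling overhead and because the tasks have unequal lengths.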

#### Analyze results

In [None]:
best = min(mse_scores, key=itemgetter(1))
print(f"Best model: mse={best[1]:.4f}, n_estimators={best[0]}")

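Training completed with a roughly **6x speedup** thanks to parallel execution. Because every model uses the same data split and `random_state=201`, the scores, and therefore the best model, should match the sequential run; only the wall time changes.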

#### Shutdown Ray runtime

In [None]:
ray.shutdown()

`ray.shutdown()` disconnects the worker and terminates the processes started by `ray.init()`.

### Summary of Part 2: code example

You achieved a significant performance gain by introducing parallel model training: you adapted a sequential model-training job to run in parallel with the Ray Core API.

With Ray, you parallelized training without implementing orchestration, fault tolerance, or autoscaling yourself, components that usually require specialized knowledge of distributed systems.

#### Key concepts

1. [**Tasks**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#tasks). Remote, stateless Python functions
1. [**Actors**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#actors). Remote, stateful Python classes
1. [**Objects**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#objects). Tasks and actors create and compute on objects, which can be stored and accessed anywhere in the cluster and are cached in Ray's distributed [shared-memory](https://en.wikipedia.org/wiki/Shared_memory) object store

#### Key API elements

* **`ray.init()`**  
Start the Ray runtime and connect to a Ray cluster.
* **`@ray.remote`**  
Decorator that specifies a Python function or class to be executed as a task (remote function) or actor (remote class) in a different process.
* **`.remote`**  
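Suffix used to invoke remote functions and classes, e.g. `train_and_score_model.remote(...)`; remote operations are *asynchronous*, and a remote function call immediately returns an object reference.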
* **`ray.put()`**  
Put an object in the in-memory object store; returns an object reference used to pass the object to any remote function or method call.
* **`ray.get()`**  
Get one or more remote objects from the object store by specifying their object reference(s); this call blocks until the objects are available.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/side_by_side.png" width="100%" loading="lazy">|
|:--|
|Comparison of sample workflow with minimal code changes needed to distribute tasks on Ray.|

# Connect with the Ray community

You can learn and get more involved with the Ray community of developers and researchers:

* [**Ray documentation**](https://docs.ray.io/en/latest)

* [**Official Ray site**](https://www.ray.io/)  
Browse the ecosystem and use this site as a hub to get the information that you need to get going and building with Ray.

* [**Join the community on Slack**](https://forms.gle/9TSdDYUgxYs8SA9e8)  
Find others to discuss what you have learned in our Slack space.

* [**Use the discussion board**](https://discuss.ray.io/)  
Ask questions, follow topics, and view announcements on this community forum.

* [**Join a meetup group**](https://www.meetup.com/Bay-Area-Ray-Meetup/)  
Attend meetups to listen to compelling talks, get to know other users, and meet the team behind Ray.

* [**Open an issue**](https://github.com/ray-project/ray/issues/new/choose)  
Ray is constantly evolving to improve developer experience. Submit feature requests and bug reports, and get help via GitHub issues.

* [**Become a Ray contributor**](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html)  
We welcome community contributions to improve our documentation and the Ray framework.

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Generic/ray_logo.png" width="20%" loading="lazy">