(ray/00-raycore)=
# Ray Core

![Status](https://img.shields.io/static/v1.svg?label=Status&message=Finished&color=brightgreen)
[![Source](https://img.shields.io/static/v1.svg?label=GitHub&message=Source&color=181717&logo=GitHub)](https://github.com/particle1331/ok-transformer/blob/master/docs/nb/ray/00-raycore.ipynb)
[![Stars](https://img.shields.io/github/stars/particle1331/ok-transformer?style=social)](https://github.com/particle1331/ok-transformer)

---

**Readings:**  {cite}`timviera`

## Introduction

Distributed computing is becoming increasingly relevant for modern machine learning systems. OpenAI's analysis *AI and Compute* estimates that the amount of compute used in the largest AI training runs has doubled roughly every 3.4 months since 2012, whereas Moore's law implies a doubling of CPU compute only about every 18 months. However, distributed systems are hard to program. Scaling a Python application to a cluster introduces challenges in communication, scheduling, security, failure handling, heterogeneity, transparency, and much more.

Ray provides the compute layer to scale applications without requiring you to become a distributed systems expert. These are some key processes that Ray handles automatically:

- Orchestration. Managing the various components of a distributed system.
- Scheduling. Coordinating when and where tasks are executed.
- Fault tolerance. Ensuring tasks complete despite inevitable points of failure.
- Auto-scaling. Adjusting the amount of allocated resources in response to dynamic demand.

To lower the effort needed to scale compute-intensive workloads, Ray takes a Python-first approach and integrates with many common data science tools. This allows ML practitioners to parallelize Python applications from a laptop to a cluster with minimal code changes.



```{figure} img/00-petaflops-law.png
---
name: 00-petaflops-law
width: 80%
---
A petaflop/s-day (pfs-day) consists of performing $10^{15}$ neural net operations per second for one day. The compute-time product serves as a mental convenience, similar to kW-hr for energy. The doubling time for the line of best fit shown is 3.4 months. In contrast, by Moore's law, CPU compute doubles roughly every 18 months. Source: [OpenAI](https://openai.com/research/ai-and-compute)
```

```{figure} img/00-ray-characteristics.png
---
name: 00-ray-characteristics
width: 100%
---
```

## Ray project

**Python first.** Ray allows you to flexibly compose distributed applications with easy-to-use primitives in native Python code. This way, you can scale your existing workloads with minimal code changes. Getting started with Ray Core involves just a few key abstractions:

| Abstraction | Description |
| -------- | ------- |
| Tasks | Remote, stateless Python functions. |
| Actors | Remote, stateful Python classes. |
| Objects | Values that tasks and actors create and compute on. Objects can be stored and accessed anywhere in the cluster and are cached in Ray's distributed shared-memory object store. |

Users can specify the resource requirements of tasks and actors in terms of CPUs, GPUs, and custom resources. The cluster scheduler uses these requirements to distribute tasks for parallel execution.

**Ray core.** Acting as the foundational library for the whole ecosystem, Ray Core provides a minimalist API that enables distributed computing. With just a few methods, you can start building distributed apps:

| Method | Description |
| -------- | ------- |
| `ray.init()`  | Start the Ray runtime and connect to the Ray cluster. |
| `@ray.remote` | Decorator that turns a Python function or class into a task (remote function) or actor (remote class) that executes in a different process. |
| `.remote` | Suffix for invoking remote functions, actor classes, and actor methods. Remote calls are asynchronous: they return immediately with an object reference (or, for an actor class, an actor handle). |
| `ray.put()` | Put an object in the in-memory object store; returns an object reference that can be passed to any remote function or method call. |
| `ray.get()` | Get one or more objects from the object store by their object reference(s); blocks until the objects are available. |


Besides the core library, Ray also provides an ecosystem of libraries for ML and AI. 

| Library | Description |
| -------- | ------- |
| Ray Data | Ray Data is a scalable data processing library for ML workloads. It provides flexible and performant APIs for scaling offline batch inference as well as data preprocessing and ingest for ML training. Ray Data uses streaming execution to efficiently process large datasets. |
| Ray Train | Ray Train is a scalable machine learning library for distributed training and fine-tuning. Ray Train allows you to scale model training code from a single machine to a cluster of machines in the cloud, and abstracts away the complexities of distributed computing. Whether you have large models or large datasets, Ray Train is the simplest solution for distributed training. |
| Ray Tune | Tune is a Python library for experiment execution and hyperparameter tuning at any scale. You can tune your favorite machine learning framework (PyTorch, XGBoost, Scikit-Learn, TensorFlow and Keras, and more) by running state-of-the-art algorithms such as Population Based Training (PBT) and HyperBand/ASHA. Tune further integrates with a wide range of additional hyperparameter optimization tools, including Ax, BayesOpt, BOHB, Dragonfly, FLAML, Hyperopt, Nevergrad, Optuna and SigOpt. |
| Ray Serve | Ray Serve is a scalable model serving library for building online inference APIs. Serve is framework-agnostic, so you can use a single toolkit to serve everything from deep learning models built with frameworks like PyTorch, TensorFlow, and Keras, to Scikit-Learn models, to arbitrary Python business logic. It has several features and performance optimizations for serving Large Language Models such as response streaming, dynamic request batching, multi-node/multi-GPU serving, etc. |
| RLlib | RLlib is an open-source library for reinforcement learning (RL), offering support for production-level, highly distributed RL workloads while maintaining unified and simple APIs for a large variety of industry applications. Whether you would like to train your agents in a multi-agent setup, purely from offline (historic) datasets, or using externally connected simulators, RLlib offers a simple solution for each of your decision making needs. |

```{figure} img/00-ray-data.png
---
name: 00-ray-data
width: 100%
---
Ray Data provides flexible and performant APIs for scaling offline batch inference as well as data preprocessing and ingest for ML training. It uses streaming execution to efficiently process large datasets.
```

**Scalability.** Ray allows users to utilize large compute clusters in an easy, productive, and resource-efficient way.
Fundamentally, Ray treats the entire cluster as a single, unified pool of resources and takes care of optimally mapping compute workloads to the pool. By doing so, Ray largely eliminates non-scalable factors in the system.

A notable strength is Ray's autoscaler, which automatically scales a Ray cluster based on the resource demands of an application. The autoscaler adds worker nodes when the Ray workload exceeds the cluster's capacity and removes worker nodes that sit idle. You can deploy a Ray cluster on AWS or GCP, or on Kubernetes via the officially supported KubeRay project.


**Support for heterogeneous hardware.** Heterogeneous systems present new challenges to distribution because each compute unit has its own programming model. Ray natively supports heterogeneous hardware and handles load balancing, coherency, and consistency under the hood. All you need to do is specify the hardware requirements when defining a task or actor. For example, a developer can specify in the same application that one task needs 128 CPUs, another task requires only 0.5 GPUs, and an actor requires 36 CPUs and 8 GPUs.

```{figure} img/00-parallel-model-training.png
---
name: 00-parallel-model-training
width: 100%
---

**Parallel training of many models**. Data parallelism pattern for distributed training on large datasets.
When each model you want to train fits on a single GPU, Ray can assign each training run to a separate Ray task. In this way, all available workers run independent training jobs concurrently, rather than one worker running the jobs sequentially.
```

```{figure} img/00-distributed-model-training.png
---
name: 00-distributed-model-training
width: 100%
---

**Distributed training of large models**. Model parallelism pattern for distributed large model training.
In contrast to training many models, model parallelism partitions a large model across many machines for training. Ray Train has built-in abstractions for distributing shards of models and running training in parallel.
```

```{figure} img/00-distributed-training.png
---
name: 00-distributed-training
width: 100%
---

**Managing parallel hyperparameter tuning experiments**. Distributed tuning with distributed training per trial.
Running multiple hyperparameter tuning experiments is a pattern well suited to distributed computing because each experiment is independent of the others. Ray Tune handles the hard part of distributing hyperparameter optimization and provides key features such as checkpointing the best result, optimized scheduling, specifying search patterns, and more.
```

```{figure} img/00-batch-predictor.png
---
name: 00-batch-predictor
width: 100%
---

**Batch inference on CPUs and GPUs**. Performing inference on incoming batches of data can be parallelized by exporting the architecture and weights of a trained model to the shared object store. Then, using these model replicas, Ray scales predictions on batches across workers.
```

```{figure} img/00-distributed-rl.png
---
name: 00-distributed-rl
width: 100%
---

**Reinforcement learning**. Ray RLlib offers support for production-level, distributed reinforcement learning workloads while maintaining unified and simple APIs for a large variety of industry applications. Shown here is the decentralized distributed proximal policy optimization (DD-PPO) architecture, supported by RLlib, in which sampling and training are done on worker GPUs.
```

```{figure} img/00-serve-model-composition.png
---
name: 00-serve-model-composition
width: 100%
---

**Multi-model composition for model serving**. Ray Serve supports complex model deployment patterns requiring the orchestration of multiple Ray actors, where different actors provide inference for different models. Serve handles both batch and online inference and can scale to thousands of models in production.
```

```{figure} img/00-ml-platform.png
---
name: 00-ml-platform
width: 100%
---

**ML platform**. A machine learning platform can be built on top of Ray as the compute layer. Shown here is [Merlin](https://shopify.engineering/merlin-shopify-machine-learning-platform), Shopify's ML platform built on Ray. It enables fast iteration and scaling of distributed applications such as product categorization and recommendations.
```

## Scaling regression with Ray Core

To gain a better feel for Ray, this section scales a bare-bones version of a common ML task: regression on structured data.



**Data.** You will perform regression on the California housing dataset, which is available through scikit-learn.

**Model and task.** You will train random forest models and score them using mean squared error (MSE) as the metric.
In a lightweight version of hyperparameter tuning, you will train many models with different values of `n_estimators`. First, you will run a sequential version of model training, where the experiments execute one after another. Then, you will distribute these training runs with Ray Core to reduce the total wall time.



### Sequential implementation

Starting with a familiar implementation, a series of random forest models is trained one at a time, as depicted in the diagram below.

```{figure} img/00-task-sequential.png
---
name: 00-task-sequential
width: 100%
---
Timeline of sequential tasks all on one worker. Each "task" in this case is training a random forest model.
```

#### Preliminaries

```python
import time
from operator import itemgetter

import pandas as pd
from sklearn.datasets import fetch_california_housing
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
```

#### Prepare dataset

```python
X, y = fetch_california_housing(return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=201
)
```

```python
print(X.shape)
X.head(n=5)
```

```
(20640, 8)
```

|   | MedInc | HouseAge | AveRooms | AveBedrms | Population | AveOccup | Latitude | Longitude |
| - | ------ | -------- | -------- | --------- | ---------- | -------- | -------- | --------- |
| 0 | 8.3252 | 41.0 | 6.984127 | 1.02381 | 322.0 | 2.555556 | 37.88 | -122.23 |
| 1 | 8.3014 | 21.0 | 6.238137 | 0.97188 | 2401.0 | 2.109842 | 37.86 | -122.22 |
| 2 | 7.2574 | 52.0 | 8.288136 | 1.073446 | 496.0 | 2.80226 | 37.85 | -122.24 |
| 3 | 5.6431 | 52.0 | 5.817352 | 1.073059 | 558.0 | 2.547945 | 37.85 | -122.25 |
| 4 | 3.8462 | 52.0 | 6.281853 | 1.081081 | 565.0 | 2.181467 | 37.85 | -122.25 |


#### Set number of models to train

```python
NUM_MODELS = 20
```

#### Implement function to train and score model

```python
def train_and_score_model(
    train_set: pd.DataFrame,
    test_set: pd.DataFrame,
    train_labels: pd.Series,
    test_labels: pd.Series,
    n_estimators: int,
) -> tuple[int, float]:
    start_time = time.time()  # measure wall time for single model training

    model = RandomForestRegressor(n_estimators=n_estimators, random_state=201)
    model.fit(train_set, train_labels)
    y_pred = model.predict(test_set)
    score = mean_squared_error(test_labels, y_pred)

    time_delta = time.time() - start_time
    print(
        f"n_estimators={n_estimators}, mse={score:.4f}, took: {time_delta:.2f} seconds"
    )

    return n_estimators, score
```

This function takes the training and test data, creates a `RandomForestRegressor` model, trains it, and scores it on the test set.

`train_and_score_model` returns a tuple:

`(n_estimators, mse_score)`

For example:

`(8, 0.2983)`


#### Implement function that runs sequential model training

```python
def run_sequential(n_models: int) -> list[tuple[int, float]]:
    return [
        train_and_score_model(
            train_set=X_train,
            test_set=X_test,
            train_labels=y_train,
            test_labels=y_test,
            n_estimators=8 + 4 * j,
        )
        for j in range(n_models)
    ]
```

This function trains `n_models` models sequentially with an increasing number of estimators (`n_estimators` increases by 4 for each model: 8, 12, 16, 20, ...).

`run_sequential` returns a list of tuples:

`[(n_estimators, mse_score), (n_estimators, mse_score), ...]`

For example:

`[(8, 0.2983), (12, 0.2826), (16, 0.2761), (20, 0.2716), ...]`
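As a quick check of the schedule described above, the plain-Python snippet below (no Ray needed) lists the `n_estimators` values for `NUM_MODELS = 20`. The largest forest has 84 trees, and 920 trees are trained in total; since the printed training times grow roughly in proportion to `n_estimators` (0.93 s at 8 trees versus 8.90 s at 84), the last few models dominate the sequential run.

```python
# Reproduces the n_estimators schedule used by run_sequential: 8 + 4 * j.
NUM_MODELS = 20
schedule = [8 + 4 * j for j in range(NUM_MODELS)]

print(schedule[:4], schedule[-1], sum(schedule))  # [8, 12, 16, 20] 84 920
```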


#### Run sequential model training

```python
%%time
mse_scores = run_sequential(n_models=NUM_MODELS)
```

```
n_estimators=8, mse=0.2983, took: 0.93 seconds
n_estimators=12, mse=0.2826, took: 1.27 seconds
n_estimators=16, mse=0.2761, took: 1.69 seconds
n_estimators=20, mse=0.2716, took: 2.09 seconds
n_estimators=24, mse=0.2694, took: 2.54 seconds
n_estimators=28, mse=0.2686, took: 2.94 seconds
n_estimators=32, mse=0.2662, took: 3.36 seconds
n_estimators=36, mse=0.2663, took: 3.84 seconds
n_estimators=40, mse=0.2635, took: 4.18 seconds
n_estimators=44, mse=0.2622, took: 4.62 seconds
n_estimators=48, mse=0.2616, took: 5.03 seconds
n_estimators=52, mse=0.2609, took: 5.47 seconds
n_estimators=56, mse=0.2615, took: 5.97 seconds
n_estimators=60, mse=0.2608, took: 6.33 seconds
n_estimators=64, mse=0.2614, took: 6.69 seconds
n_estimators=68, mse=0.2616, took: 7.16 seconds
n_estimators=72, mse=0.2617, took: 7.51 seconds
n_estimators=76, mse=0.2614, took: 7.91 seconds
n_estimators=80, mse=0.2607, took: 8.41 seconds
n_estimators=84, mse=0.2601, took: 8.90 seconds
CPU times: user 1min 35s, sys: 845 ms, to
```

#### Analyze results

```python
best = min(mse_scores, key=itemgetter(1))
print(f"Best model: mse={best[1]:.4f}, n_estimators={best[0]}")
```

```
Best model: mse=0.2601, n_estimators=84
```


### Parallel implementation

In contrast to the previous approach, you will now utilize all available resources to train these models in parallel. Ray will automatically detect the number of cores on your computer or the amount of resources in a cluster to distribute each defined task.

The diagram below offers an intuition for how tasks are assigned and executed in a parallel approach. You will notice that this introduces a scheduler which is responsible for managing incoming requests, assigning nodes, and detecting available resources.



```{figure} img/00-task-parallel.png
---
name: 00-task-parallel
width: 100%
---
Timeline of parallel tasks spread across multiple workers. Each "task" in this case is training a random forest model.
```

#### Initialize Ray runtime

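```python
import ray

# Shut down any Ray instance left over from a previous run.
# Note that ray.is_initialized is a function and must be called.
if ray.is_initialized():
    ray.shutdown()

ray.init()
```

```
2023-12-09 23:24:34,207 INFO worker.py:1664 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
```

| Python version | Ray version | Dashboard |
| -------------- | ----------- | --------- |
| 3.9.15 | 2.8.1 | http://127.0.0.1:8265 |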


Begin by running `ray.init()` to start a fresh Ray cluster and take a look at some useful information:

* Python version
* Ray version
* Link to Ray Dashboard: an observability tool that provides insight into what Ray is doing via helpful metrics and charts

In a distributed system, object references are pointers to objects in memory. In Ray, an object reference (`ObjectRef`) points to an object in the distributed object store. Any task or actor that holds the reference can retrieve the object, even if it is stored on a different machine, which is how tasks and actors share data.

```{figure} img/00-ray-workers.png
---
name: 00-ray-workers
width: 100%
---
Workers use `ray.put()` to place objects and use `ray.get()` to retrieve them from each node's object store. These object stores form the shared distributed memory that makes objects available across a Ray cluster.
```

```python
X_train_ref = ray.put(X_train)
X_test_ref = ray.put(X_test)
y_train_ref = ray.put(y_train)
y_test_ref = ray.put(y_test)
```

```python
X_train_ref
```

```
ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000005e1f505)
```

```python
ray.get(X_train_ref).head()
```

|       | MedInc | HouseAge | AveRooms | AveBedrms | Population | AveOccup | Latitude | Longitude |
| ----- | ------ | -------- | -------- | --------- | ---------- | -------- | -------- | --------- |
| 19262 | 3.875 | 20.0 | 5.838 | 1.016 | 1711.0 | 3.422 | 38.44 | -122.73 |
| 4083 | 4.3036 | 36.0 | 5.06563 | 1.069085 | 1115.0 | 1.925734 | 34.15 | -118.38 |
| 19500 | 3.1857 | 24.0 | 4.902494 | 1.040816 | 1468.0 | 3.328798 | 37.67 | -121.03 |
| 9062 | 3.25 | 32.0 | 5.220884 | 0.939759 | 712.0 | 2.859438 | 34.69 | -118.15 |
| 1196 | 2.6607 | 22.0 | 6.508021 | 1.197861 | 537.0 | 2.871658 | 39.34 | -121.67 |


#### Implement function to train and score model

```python
@ray.remote
def train_and_score_model(
    train_set_ref: pd.DataFrame,
    test_set_ref: pd.DataFrame,
    train_labels_ref: pd.Series,
    test_labels_ref: pd.Series,
    n_estimators: int,
) -> tuple[int, float]:
    # Ray resolves top-level ObjectRef arguments before the task runs,
    # so the *_ref parameters hold the actual DataFrames/Series here.
    start_time = time.time()  # measure wall time for single model training

    model = RandomForestRegressor(n_estimators=n_estimators, random_state=201)
    model.fit(train_set_ref, train_labels_ref)
    y_pred = model.predict(test_set_ref)
    score = mean_squared_error(test_labels_ref, y_pred)

    time_delta = time.time() - start_time
    print(
        f"n_estimators={n_estimators}, mse={score:.4f}, took: {time_delta:.2f} seconds"
    )

    return n_estimators, score
```


Notice that `train_and_score_model` is the same function as in the sequential example, except that here you add the `@ray.remote` decorator to turn it into a Ray task that can run in parallel on any worker in the cluster.



#### Implement function that runs parallel model training

```python
def run_parallel(n_models: int) -> list[tuple[int, float]]:
    results_ref = [
        train_and_score_model.remote(
            train_set_ref=X_train_ref,
            test_set_ref=X_test_ref,
            train_labels_ref=y_train_ref,
            test_labels_ref=y_test_ref,
            n_estimators=8 + 4 * j,
        )
        for j in range(n_models)
    ]
    return ray.get(results_ref)
```

Before, you defined `run_sequential()` to train and score `NUM_MODELS` models. Working from the inside out, modifying this into `run_parallel()` involves three steps:

1. Append a `.remote` suffix to `train_and_score_model`.
    * Remember that you specified this function as a remote task in the previous cell. In Ray, you append this suffix to every remote call.
2. Capture the resulting list of object references in `results_ref`.
    * Rather than waiting for the results, you immediately receive a list of references to results that are expected to be available in the future. This asynchronous (non-blocking) call allows the program to continue executing other operations while the potentially time-consuming computations run in the background.
3. Access results with `ray.get()`.
    * Once all training tasks have been submitted, call `ray.get()` on the list of object references `results_ref` to retrieve the results. This is a synchronous (blocking) operation because it waits until all the tasks have completed.

For example,

```
ray.get([ObjectRef, ObjectRef, ObjectRef, ...])
```

returns a list of `(n_estimators, score)` tuples, in the same order as the list of references.

```python
%%time
mse_scores = run_parallel(n_models=NUM_MODELS)
```

```
(train_and_score_model pid=83699) n_estimators=8, mse=0.2983, took: 1.44 seconds
(train_and_score_model pid=83702) n_estimators=36, mse=0.2663, took: 6.57 seconds [repeated 7x across cluster]
(train_and_score_model pid=83706) n_estimators=48, mse=0.2616, took: 8.83 seconds [repeated 3x across cluster]
(train_and_score_model pid=83701) n_estimators=64, mse=0.2614, took: 11.42 seconds [repeated 4x across cluster]
(train_and_score_model pid=83704) n_estimators=76, mse=0.2614, took: 12.44 seconds [repeated 3x across cluster]
CPU times: user 72.8 ms, sys: 85 ms, total: 158 ms
Wall time: 27.5 s
(train_and_score_model pid=83705) n_estimators=84, mse=0.2601, took: 12.33 seconds
```


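Notice the performance gain. In the sequential run, the per-model training times add up to roughly 97 seconds, whereas the parallel run finishes with a wall time of 27.5 seconds, about 3.5x faster. Individual models take longer to train in the parallel run (for example, 12.33 seconds versus 8.90 seconds for `n_estimators=84`), likely because concurrent tasks compete for shared hardware, but running them concurrently more than makes up for it.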



```python
best = min(mse_scores, key=itemgetter(1))
print(f"Best model: mse={best[1]:.4f}, n_estimators={best[0]}")
```

```
Best model: mse=0.2601, n_estimators=84
```

```{figure} img/00-ray-dashboard.png
---
name: 00-ray-dashboard
width: 100%
---
Workers shown in the cluster view of the Ray Dashboard at http://127.0.0.1:8265/#/cluster.
```

#### Shut down cluster

```python
ray.shutdown()
```

---

■