### Preliminaries

In [1]:
import time
from operator import itemgetter

import pandas as pd
from sklearn.datasets import fetch_california_housing
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

### Prepare dataset


In [2]:
X, y = fetch_california_housing(return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=201
)
X.head(n=5)

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude
0,8.3252,41.0,6.984127,1.02381,322.0,2.555556,37.88,-122.23
1,8.3014,21.0,6.238137,0.97188,2401.0,2.109842,37.86,-122.22
2,7.2574,52.0,8.288136,1.073446,496.0,2.80226,37.85,-122.24
3,5.6431,52.0,5.817352,1.073059,558.0,2.547945,37.85,-122.25
4,3.8462,52.0,6.281853,1.081081,565.0,2.181467,37.85,-122.25


In [3]:
# You will use NUM_MODELS as a benchmark to compare performance
# across sequential and parallel implementations.

NUM_MODELS = 20

In [4]:
def train_and_score_model(
    train_set: pd.DataFrame,
    test_set: pd.DataFrame,
    train_labels: pd.Series,
    test_labels: pd.Series,
    n_estimators: int,
) -> tuple[int, float]:
    start_time = time.time()  # measure wall time for single model training

    model = RandomForestRegressor(n_estimators=n_estimators, random_state=201)
    model.fit(train_set, train_labels)
    y_pred = model.predict(test_set)
    score = mean_squared_error(test_labels, y_pred)

    time_delta = time.time() - start_time
    print(
        f"n_estimators={n_estimators}, mse={score:.4f}, took: {time_delta:.2f} seconds"
    )

    return n_estimators, score

In [5]:
def run_sequential(n_models: int) -> list[tuple[int, float]]:
    return [
        train_and_score_model(
            train_set=X_train,
            test_set=X_test,
            train_labels=y_train,
            test_labels=y_test,
            n_estimators=8 + 4 * j,
        )
        for j in range(n_models)
    ]

This function trains n_models models one after another, increasing n_estimators by 4 for each model (8, 12, 16, 20, ...).

run_sequential returns a list of tuples:

[(n_estimators, mse_score), (n_estimators, mse_score), ...]

For example:

[(8, 0.2983), (12, 0.2826), (16, 0.2761), (20, 0.2716), ...]
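The sequence of n_estimators values follows directly from the expression `8 + 4 * j` in run_sequential. A minimal sketch of that schedule (the helper name `n_estimators_schedule` is hypothetical and only used for illustration):

```python
def n_estimators_schedule(n_models: int) -> list[int]:
    """Return the n_estimators values used for n_models consecutive models."""
    # Same expression as in run_sequential: start at 8, step by 4.
    return [8 + 4 * j for j in range(n_models)]


schedule = n_estimators_schedule(20)
print(schedule[:4])  # [8, 12, 16, 20]
print(schedule[-1])  # 84, the largest forest when NUM_MODELS = 20
```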

In [6]:
%%time

mse_scores = run_sequential(n_models=NUM_MODELS)

n_estimators=8, mse=0.2983, took: 1.07 seconds
n_estimators=12, mse=0.2826, took: 1.58 seconds
n_estimators=16, mse=0.2761, took: 2.13 seconds
n_estimators=20, mse=0.2716, took: 2.96 seconds
n_estimators=24, mse=0.2694, took: 3.22 seconds
n_estimators=28, mse=0.2686, took: 3.79 seconds
n_estimators=32, mse=0.2662, took: 4.39 seconds
n_estimators=36, mse=0.2663, took: 4.68 seconds
n_estimators=40, mse=0.2635, took: 5.93 seconds
n_estimators=44, mse=0.2622, took: 7.33 seconds
n_estimators=48, mse=0.2616, took: 6.31 seconds
n_estimators=52, mse=0.2609, took: 6.85 seconds
n_estimators=56, mse=0.2615, took: 8.01 seconds
n_estimators=60, mse=0.2608, took: 8.46 seconds
n_estimators=64, mse=0.2614, took: 9.32 seconds
n_estimators=68, mse=0.2616, took: 10.27 seconds
n_estimators=72, mse=0.2617, took: 10.41 seconds
n_estimators=76, mse=0.2614, took: 10.00 seconds
n_estimators=80, mse=0.2607, took: 10.62 seconds
n_estimators=84, mse=0.2601, took: 11.18 seconds
CPU times: user 2min 6s, sys: 1.99 s

In [7]:
### Analyze results

In [8]:
best = min(mse_scores, key=itemgetter(1))
print(f"Best model: mse={best[1]:.4f}, n_estimators={best[0]}")

Best model: mse=0.2601, n_estimators=84


Looking at the training results, note how long it took to train NUM_MODELS models sequentially. Continue to the next section to learn how to reduce the runtime by distributing this work.

### Parallel implementation
In contrast to the previous approach, you will now utilize all available resources to train these models in parallel. Ray will automatically detect the number of cores on your computer or the amount of resources in a cluster to distribute each defined task.

The diagram below offers an intuition for how tasks are assigned and executed in a parallel approach. You will notice that this introduces a scheduler which is responsible for managing incoming requests, assigning nodes, and detecting available resources.
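To build some intuition for why handing tasks to several workers shortens total runtime, here is a toy simulation. It is not Ray's scheduler: the task durations are made up, and the rule "give the next task to whichever worker frees up first" is an assumption chosen for simplicity. The function name `simulate_makespan` is hypothetical.

```python
import heapq


def simulate_makespan(durations: list[float], num_workers: int) -> float:
    """Toy model: hand each task, in order, to the worker that frees up first."""
    # Each heap entry is the time at which one worker becomes idle.
    free_at = [0.0] * num_workers
    heapq.heapify(free_at)
    for duration in durations:
        start = heapq.heappop(free_at)  # earliest available worker
        heapq.heappush(free_at, start + duration)
    return max(free_at)  # time at which the last task finishes


tasks = [1.0, 2.0, 3.0, 4.0]  # assumed task durations in seconds
print(simulate_makespan(tasks, num_workers=1))  # 10.0, same as running sequentially
print(simulate_makespan(tasks, num_workers=2))  # 6.0
```

With one worker the total equals the sum of all durations; with more workers the total is bounded below by the longest single task.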

In [9]:
!pip install ray


In [10]:
import ray

if ray.is_initialized():  # call the function; the bare attribute is always truthy
    ray.shutdown()

ray.init()

2024-11-13 11:38:42,062	INFO util.py:154 -- Outdated packages:
  ipywidgets==7.8.4 found, needs ipywidgets>=8
Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2024-11-13 11:38:44,316	INFO worker.py:1816 -- Started a local Ray instance.


Python version: 3.10.12
Ray version: 2.38.0


Begin by running ray.init() to start a fresh Ray cluster and take a look at some useful information:

* Python version
* Ray version
* Link to Ray Dashboard: an observability tool that provides insight into what Ray is doing via helpful metrics and charts

Workers use ray.put() to place objects in their node's object store and ray.get() to retrieve them. These object stores form the shared distributed memory that makes objects available across a Ray cluster.

In a distributed system, object references are pointers to objects in memory. Object references can be used to access objects that are stored on different machines, allowing them to communicate with each other and share data.
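The idea of a reference standing in for an object can be sketched with a plain dictionary. This is an illustrative stand-in only, not how Ray implements its object store; the class `ToyObjectStore` and its methods are hypothetical.

```python
import uuid


class ToyObjectStore:
    """Illustrative stand-in for an object store; not Ray's implementation."""

    def __init__(self) -> None:
        self._objects: dict[str, object] = {}

    def put(self, value: object) -> str:
        # Store the value and hand back a small, opaque reference to it.
        ref = uuid.uuid4().hex
        self._objects[ref] = value
        return ref

    def get(self, ref: str) -> object:
        # Resolve the reference back to the stored value.
        return self._objects[ref]


store = ToyObjectStore()
ref = store.put([1, 2, 3])
print(ref)             # an opaque identifier, different on every run
print(store.get(ref))  # [1, 2, 3]
```

In Ray the reference is an ObjectRef and the store may live on another machine, but the principle is the same: passing a small reference around is cheaper than copying the data itself.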

In [11]:
X_train_ref = ray.put(X_train)
X_test_ref = ray.put(X_test)
y_train_ref = ray.put(y_train)
y_test_ref = ray.put(y_test)

By placing the training and testing data into Ray's object store, these objects are now available to all remote tasks and actors in the cluster.

Coding Exercise

To practice working with object references, use the cell below to:

* Print what X_train_ref looks like.
* Retrieve X_train by using ray.get() on the object reference.

An example object reference looks like this:

In [12]:
### SAMPLE IMPLEMENTATION ###

# print the object reference
print(X_train_ref)

# inspect the in-memory object
ray.get(X_train_ref)

ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001e1f505)


Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude
19262,3.8750,20.0,5.838000,1.016000,1711.0,3.422000,38.44,-122.73
4083,4.3036,36.0,5.065630,1.069085,1115.0,1.925734,34.15,-118.38
19500,3.1857,24.0,4.902494,1.040816,1468.0,3.328798,37.67,-121.03
9062,3.2500,32.0,5.220884,0.939759,712.0,2.859438,34.69,-118.15
1196,2.6607,22.0,6.508021,1.197861,537.0,2.871658,39.34,-121.67
...,...,...,...,...,...,...,...,...
9082,4.1667,12.0,6.741713,1.109116,2188.0,3.022099,34.69,-118.17
1891,3.3295,17.0,6.144444,1.150000,466.0,2.588889,38.92,-120.00
501,2.2841,52.0,4.350649,1.077922,152.0,1.974026,37.85,-122.27
7680,4.0774,35.0,5.225389,0.992228,1388.0,3.595855,33.92,-118.10


### Implement function to train and score model

In [13]:
@ray.remote
def train_and_score_model(
    train_set_ref: pd.DataFrame,
    test_set_ref: pd.DataFrame,
    train_labels_ref: pd.Series,
    test_labels_ref: pd.Series,
    n_estimators: int,
) -> tuple[int, float]:
    start_time = time.time()  # measure wall time for single model training

    model = RandomForestRegressor(n_estimators=n_estimators, random_state=201)
    model.fit(train_set_ref, train_labels_ref)
    y_pred = model.predict(test_set_ref)
    score = mean_squared_error(test_labels_ref, y_pred)

    time_delta = time.time() - start_time
    print(
        f"n_estimators={n_estimators}, mse={score:.4f}, took: {time_delta:.2f} seconds"
    )

    return n_estimators, score

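Notice that train_and_score_model is the same function as in the sequential example, except that here you add the @ray.remote decorator to specify that this function will be executed in a distributed manner. Ray resolves object references passed as top-level arguments before the function body runs, so the parameters receive the actual DataFrames and Series.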

### Implement function that runs parallel model training

In [14]:
def run_parallel(n_models: int) -> list[tuple[int, float]]:
    results_ref = [
        train_and_score_model.remote(
            train_set_ref=X_train_ref,
            test_set_ref=X_test_ref,
            train_labels_ref=y_train_ref,
            test_labels_ref=y_test_ref,
            n_estimators=8 + 4 * j,
        )
        for j in range(n_models)
    ]
    return ray.get(results_ref)

Before, you defined run_sequential() to train and score NUM_MODELS models. Working from the inside out, turning it into run_parallel() involves three steps:

1. Append the .remote suffix to train_and_score_model.
* Remember that you declared this function as a remote task in the previous cell. In Ray, you append this suffix to every remote call.
2. Capture the resulting list of object references in results_ref.
* Rather than waiting for the results, you immediately receive a list of references to results that are expected to be available in the future. This asynchronous (non-blocking) call lets the program continue with other work while the potentially time-consuming computation runs in the background.
3. Access results with ray.get().
* Once all models have been assigned to workers, call ray.get() on the list of object references results_ref to retrieve the completed results. This is a synchronous (blocking) operation because it waits until all of the computation has completed.

For example,

ray.get([ObjectRef, ObjectRef, ObjectRef, ...])

returns a list of (n_estimators, score) tuples.
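The same submit-now, collect-later pattern exists in Python's standard library, which can be a useful analogy if futures are already familiar. The sketch below uses `concurrent.futures` rather than Ray, and `slow_square` is a made-up stand-in for an expensive task.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x: int) -> int:
    time.sleep(0.1)  # stand-in for a time-consuming computation
    return x * x


with ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns immediately with a Future, much like .remote()
    # returns an object reference.
    futures = [executor.submit(slow_square, x) for x in range(4)]
    # result() blocks until each value is ready, much like ray.get().
    results = [future.result() for future in futures]

print(results)  # [0, 1, 4, 9]
```

As with ray.get() on a list of references, the results come back in submission order, regardless of which task finished first.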

### Run parallel model training

In [15]:
%%time

mse_scores = run_parallel(n_models=NUM_MODELS)

(train_and_score_model pid=10685) n_estimators=8, mse=0.2983, took: 1.65 seconds
(train_and_score_model pid=10684) n_estimators=28, mse=0.2686, took: 6.91 seconds [repeated 5x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)
(train_and_score_model pid=10685) n_estimators=40, mse=0.2635, took: 10.26 seconds [repeated 3x across cluster]
(train_and_score_model pid=10691) n_estimators=52, mse=0.2609, took: 12.77 seconds [repeated 3x across cluster]
(train_and_score_model pid=10688) n_estimators=68, mse=0.2616, took: 14.86 seconds [repeated 2x across cluster]
(train_and_score_model pid=10686) n_estimators=76, mse=0.2614, took: 15.33 seconds [repeated 4x across cluster]
(train_and_score_model pid=10691) n_est

CPU times: user 102 ms, sys: 38.7 ms, total: 141 ms
Wall time: 32.7 s


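### Notice 6x performance gain:

Parallel: 10s
Sequential: 1min (60s)
(experiment on the M1 MacBook Pro)

These figures come from a separate run on an M1 MacBook Pro. The run recorded in this notebook is slower on both counts: the parallel run reports a wall time of 32.7 s, while the per-model times of the sequential run add up to roughly two minutes.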
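To see where a speedup figure comes from, divide the sequential time by the parallel wall time. The sketch below does this for the run recorded in this notebook, using the per-model times printed by the sequential run and the wall time printed by the parallel run, and for the quoted M1 MacBook Pro figures.

```python
# Per-model wall times (seconds) printed by the sequential run above.
sequential_times = [
    1.07, 1.58, 2.13, 2.96, 3.22, 3.79, 4.39, 4.68, 5.93, 7.33,
    6.31, 6.85, 8.01, 8.46, 9.32, 10.27, 10.41, 10.00, 10.62, 11.18,
]
parallel_wall_time = 32.7  # "Wall time" reported for the parallel run above

sequential_total = sum(sequential_times)
speedup = sequential_total / parallel_wall_time

print(f"sequential total: {sequential_total:.1f} s")             # 128.5 s
print(f"speedup in this run: {speedup:.1f}x")                     # 3.9x
print(f"speedup quoted for the M1 MacBook Pro: {60 / 10:.1f}x")   # 6.0x
```

The achievable speedup depends on the number of available cores and on how evenly the task durations are spread, so it will differ between machines.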

### Analyze results

In [16]:
best = min(mse_scores, key=itemgetter(1))
print(f"Best model: mse={best[1]:.4f}, n_estimators={best[0]}")

Best model: mse=0.2601, n_estimators=84


!pip install py-spy
!sudo chown root:root `which py-spy`
!sudo chmod u+s `which py-spy`

In [17]:
# Shutdown Ray runtime
ray.shutdown()