In [1]:
# tag::ds_create[]
import ray

# Build a dataset holding the integers 0 through 9999.
ds = ray.data.range(10000)

# Inspect it: row count, a handful of rows, and the schema.
num_rows = ds.count()
print(num_rows)  # -> 10000
first_rows = ds.take(5)
print(first_rows)  # -> [0, 1, 2, 3, 4]
schema = ds.schema()
print(schema)  # -> <class 'int'>
# end::ds_create[]

2022-10-05 03:18:22,730	INFO worker.py:1518 -- Started a local Ray instance.


10000
[0, 1, 2, 3, 4]
<class 'int'>


In [2]:
# tag::ds_read_write[]
# Round-trip the dataset through CSV files in a local directory.
output_dir = "local_dir"
ray.data.range(10000).write_csv(output_dir)
ds = ray.data.read_csv(output_dir)
print(ds.count())
# end::ds_read_write[]

Read progress: 100%|██████████| 25/25 [00:01<00:00, 20.24it/s]
Write Progress: 100%|██████████| 25/25 [00:00<00:00, 502.48it/s]
Read progress: 100%|██████████| 24/24 [00:01<00:00, 21.22it/s]

10000





In [3]:
# Display the dataset's summary repr (block count, row count, schema).
ds

Dataset(num_blocks=24, num_rows=10000, schema={value: int64})

In [6]:
# Fetch three rows from the dataset for inspection.
ds.take(3)

[ArrowRow({'value': 0}), ArrowRow({'value': 1}), ArrowRow({'value': 2})]

In [8]:
# List the object references of the blocks that back this dataset.
ds.get_internal_block_refs()

[ObjectRef(91581beb08e6c9deffffffffffffffffffffffff0100000001000000),
 ObjectRef(ae46b8beecd25f3affffffffffffffffffffffff0100000001000000),
 ObjectRef(aa3d5d11e415fe88ffffffffffffffffffffffff0100000001000000),
 ObjectRef(a6d6d59239756144ffffffffffffffffffffffff0100000001000000),
 ObjectRef(c7528efcb2fd36edffffffffffffffffffffffff0100000001000000),
 ObjectRef(6efb86ef2d286c40ffffffffffffffffffffffff0100000001000000),
 ObjectRef(89af82725933373effffffffffffffffffffffff0100000001000000),
 ObjectRef(5168ff79929289e3ffffffffffffffffffffffff0100000001000000),
 ObjectRef(3e43f22e6ab31cdcffffffffffffffffffffffff0100000001000000),
 ObjectRef(594c3bb38e594811ffffffffffffffffffffffff0100000001000000),
 ObjectRef(64ac0404a8f0916fffffffffffffffffffffffff0100000001000000),
 ObjectRef(cf9aed5eec5a308bffffffffffffffffffffffff0100000001000000),
 ObjectRef(4f4ef6205ce35f90ffffffffffffffffffffffff0100000001000000),
 ObjectRef(17ed96eaf1aa4b2affffffffffffffffffffffff0100000001000000),
 ObjectRef(28c737615

In [11]:
# Print every batch the dataset yields, one after another.
batches = ds.iter_batches()
for batch in batches:
    print(batch)

     value
0        0
1        1
2        2
3        3
4        4
..     ...
251    251
252    252
253    253
254    254
255    255

[256 rows x 1 columns]
     value
0      256
1      257
2      258
3      259
4      260
..     ...
251    507
252    508
253    509
254    510
255    511

[256 rows x 1 columns]
     value
0      512
1      513
2      514
3      515
4      516
..     ...
251    763
252    764
253    765
254    766
255    767

[256 rows x 1 columns]
     value
0      768
1      769
2      770
3      771
4      772
..     ...
251   1019
252   1020
253   1021
254   1022
255   1023

[256 rows x 1 columns]
     value
0     1024
1     1025
2     1026
3     1027
4     1028
..     ...
251   1275
252   1276
253   1277
254   1278
255   1279

[256 rows x 1 columns]
     value
0     1280
1     1281
2     1282
3     1283
4     1284
..     ...
251   1531
252   1532
253   1533
254   1534
255   1535

[256 rows x 1 columns]
     value
0     1536
1     1537
2     1538
3     1539
4     154

In [16]:
# tag::ds_transform[]
# Basic transformations: concatenate two datasets, filter, and sort.
ds1, ds2 = ray.data.range(10000), ray.data.range(10000)
ds3 = ds1.union(ds2)
combined_rows = ds3.count()
print(combined_rows)  # -> 20000

# Keep only the even elements of the combined dataset.
ds3 = ds3.filter(lambda element: element % 2 == 0)
print(ds3.count())  # -> 10000
print(ds3.take(5))  # -> [0, 2, 4, 6, 8]

# Order the surviving elements.
ds3 = ds3.sort()
print(ds3.take(5))  # -> [0, 0, 2, 2, 4]
# end::ds_transform[]




20000


Read->Filter: 100%|██████████| 50/50 [00:01<00:00, 25.20it/s]


10000
[0, 2, 4, 6, 8]


Sort Sample: 100%|██████████| 50/50 [00:00<00:00, 106.53it/s]
Shuffle Map: 100%|██████████| 50/50 [00:00<00:00, 550.34it/s]
Shuffle Reduce: 100%|██████████| 50/50 [00:00<00:00, 509.24it/s]

[0, 0, 2, 2, 4]





In [20]:
# tag::ds_repartition[]
# Block counts are not fixed; the trailing comments give the expected range.
ds1 = ray.data.range(10000)
ds1_blocks = ds1.num_blocks()
print(ds1_blocks)  # -> 25 - 200
ds2 = ray.data.range(10000)
ds2_blocks = ds2.num_blocks()
print(ds2_blocks)  # -> 25 - 200
ds3 = ds1.union(ds2)
ds3_blocks = ds3.num_blocks()
print(ds3_blocks)  # -> 50 - 400

# Repartitioning pins the block count to the requested number.
repartitioned = ds3.repartition(200)
print(repartitioned.num_blocks())  # -> 200
# end::ds_repartition[]

25
25
50


Read:  70%|███████   | 35/50 [00:00<00:00, 344.88it/s][2m[36m(_map_block_nosplit pid=9957)[0m E1005 03:40:49.219019433   10012 chttp2_transport.cc:1103]   Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug data equal to "too_many_pings"
Read: 100%|██████████| 50/50 [00:02<00:00, 23.32it/s] 
Repartition: 100%|██████████| 200/200 [00:00<00:00, 1093.16it/s]

200





In [22]:
ds3

Dataset(num_blocks=50, num_rows=20000, schema=<class 'int'>)

In [27]:
# tag::ds_schema_1[]
# Rows given as dicts: the printed schema lists one column per key.
records = [
    {"id": "abc", "value": 1},
    {"id": "def", "value": 2},
]
ds = ray.data.from_items(records)
print(ds.schema())  # -> id: string, value: int64
# end::ds_schema_1[]

id: string
value: int64


In [28]:
# tag::ds_schema_2[]
# Convert the Dataset into a pandas DataFrame.
pandas_df = ds.to_pandas()  # pandas_df will inherit the schema from our Dataset.
# end::ds_schema_2[]

In [29]:
# tag::ds_compute_1[]
# Square every element, one row at a time.
numbers = ray.data.range(10000)
ds = numbers.map(lambda value: value ** 2)
ds.take(5)  # -> [0, 1, 4, 9, 16]
# end::ds_compute_1[]

Read->Map:   0%|          | 0/25 [00:00<?, ?it/s][2m[36m(reduce pid=11424)[0m E1005 03:44:48.104876349   11458 chttp2_transport.cc:1103]   Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug data equal to "too_many_pings"
Read->Map: 100%|██████████| 25/25 [00:02<00:00, 10.91it/s]


[0, 1, 4, 9, 16]

In [30]:
# tag::ds_compute_2[]
import numpy as np


# Square the elements a whole batch at a time using NumPy.
numbers = ray.data.range(10000)
ds = numbers.map_batches(lambda rows: np.square(rows).tolist())
ds.take(5)  # -> [0, 1, 4, 9, 16]
# end::ds_compute_2[]

Read->Map_Batches: 100%|██████████| 25/25 [00:00<00:00, 52.29it/s]


[0, 1, 4, 9, 16]

In [31]:
# tag::ds_compute_3[]
def load_model():
    """Build and return a stand-in model for this example.

    A real implementation would likely load model weights onto a GPU; the
    dummy returned here simply hands each batch back unchanged.
    """

    class DummyModel:
        """Identity model: calling it returns the batch it was given."""

        def __call__(self, batch):
            return batch

    model = DummyModel()
    return model


class MLModel:
    """Callable that loads a model at construction and applies it to batches."""

    def __init__(self):
        # load_model() will only run once per actor that's started.
        self._model = load_model()

    def __call__(self, batch):
        """Run the wrapped model on ``batch`` and return its result."""
        model = self._model
        return model(batch)


# Apply MLModel to the dataset's batches using actor-based compute, so the
# model is loaded in MLModel.__init__ rather than once per batch.
ds.map_batches(MLModel, compute="actors")
# end::ds_compute_3[]


def cpu_intensive_preprocessing(batch):
    """Placeholder preprocessing step: returns the batch unchanged."""
    return batch


def gpu_intensive_inference(batch):
    """Placeholder inference step: returns the batch unchanged."""
    return batch

Map Progress (1 actors 1 pending): 100%|██████████| 25/25 [00:01<00:00, 13.67it/s]


In [None]:
# tag::parallel_sgd_1[]
from sklearn import datasets
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import train_test_split

@ray.remote
class TrainingWorker:
    """Ray actor that trains and scores one SGDClassifier."""

    def __init__(self, alpha: float):
        # Each worker owns a classifier configured with its own alpha.
        self._model = SGDClassifier(alpha=alpha)

    def train(self, train_shard: ray.data.Dataset):
        """Fit the model incrementally, one epoch of the shard at a time.

        Returns the fitted model.
        """
        for epoch in train_shard.iter_epochs():
            # Each row is a (features, label) pair; split them into columns.
            rows = list(epoch.iter_rows())
            features, labels = zip(*rows)
            self._model.partial_fit(features, labels, classes=[0, 1])

        return self._model

    def test(self, X_test: np.ndarray, Y_test: np.ndarray):
        """Return the model's score on the given test data."""
        model = self._model
        return model.score(X_test, Y_test)
# end::parallel_sgd_1[]

In [None]:
# tag::parallel_sgd_2[]
# One SGDClassifier alpha value per training worker.
ALPHA_VALS = [8e-5, 9e-5, 1e-4, 1.1e-4, 1.2e-4]

print(f"Starting {len(ALPHA_VALS)} training workers.")
workers = []
for alpha in ALPHA_VALS:
    workers.append(TrainingWorker.remote(alpha))
# end::parallel_sgd_2[]

In [None]:
# tag::parallel_sgd_3[]
# Generate training & validation data for a classification problem.
X, Y = datasets.make_classification()
X_train, X_test, Y_train, Y_test = train_test_split(X, Y)

# Build a dataset pipeline from the training rows: repeat the data for 10
# iterations, reshuffle each window, and hand one shard to every worker.
train_ds = ray.data.from_items(list(zip(X_train, Y_train)))
pipeline = train_ds.repeat(10).random_shuffle_each_window()
shards = pipeline.split(len(workers), locality_hints=workers)

# Block until every worker has finished training on its shard.
training_jobs = [
    worker.train.remote(shard) for worker, shard in zip(workers, shards)
]
ray.get(training_jobs)
# end::parallel_sgd_3[]

In [None]:
# tag::parallel_sgd_5[]
# Score every worker's model on the held-out data and print the results.
score_refs = [worker.test.remote(X_test, Y_test) for worker in workers]
print(ray.get(score_refs))
# end::parallel_sgd_5[]

In [None]:
# tag::dask_on_ray_3[]
import ray
ds = ray.data.range(10000)

# Dataset -> Dask DataFrame.
df = ds.to_dask()
dask_std = df.std().compute()
print(dask_std)  # -> 2886.89568

# Dask DataFrame -> Dataset.
ds = ray.data.from_dask(df)
dataset_std = ds.std()
print(dataset_std)  # -> 2886.89568
# end::dask_on_ray_3[]

In [None]:
# tag::ml_pipeline_preprocess[]
import ray
from ray.util.dask import enable_dask_on_ray

import dask.dataframe as dd

# Name of the label column and of the model's input feature columns.
LABEL_COLUMN = "is_big_tip"
FEATURE_COLUMNS = [
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "trip_duration",
    "hour",
    "day_of_week",
]

# Route Dask computations through Ray before any Dask work is issued.
# NOTE(review): behavior taken from the helper's name/module; confirm in the Ray docs.
enable_dask_on_ray()


def load_dataset(path: str, *, include_label=True):
    """Load taxi trip data from Parquet, clean it, and return a Ray Dataset.

    Args:
        path: Location of the Parquet data, read with Dask.
        include_label: When true, add the boolean ``LABEL_COLUMN`` marking
            trips whose tip exceeded 20% of the fare.

    Returns:
        A Ray Dataset repartitioned into 100 blocks, holding the raw
        features plus ``trip_duration``, ``hour`` and ``day_of_week``
        (and ``LABEL_COLUMN`` when requested).
    """
    # Load the data and drop unused columns.
    columns = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "tip_amount",
               "passenger_count", "trip_distance", "fare_amount"]
    df = dd.read_parquet(path, columns=columns)

    # Basic cleaning, drop nulls and outliers.
    df = df.dropna()
    df = df[(df["passenger_count"] <= 4) &
            (df["trip_distance"] < 100) &
            (df["fare_amount"] < 1000)]

    # Convert datetime strings to datetime objects.
    df["tpep_pickup_datetime"] = dd.to_datetime(df["tpep_pickup_datetime"])
    df["tpep_dropoff_datetime"] = dd.to_datetime(df["tpep_dropoff_datetime"])

    # Add three new features: trip duration, hour the trip started,
    # and day of the week.
    # Use total_seconds(): ``.dt.seconds`` only yields the seconds component
    # (0-86399) of each timedelta, so a trip lasting a day or more would wrap
    # around and slip past the 4-hour outlier filter below.
    df["trip_duration"] = (df["tpep_dropoff_datetime"] -
                           df["tpep_pickup_datetime"]).dt.total_seconds()
    # Keep trips lasting between 0 and 4 hours; a negative duration means the
    # drop-off precedes the pickup and is treated as bad data.
    df = df[(df["trip_duration"] >= 0) &
            (df["trip_duration"] < 4 * 60 * 60)]
    df["hour"] = df["tpep_pickup_datetime"].dt.hour
    df["day_of_week"] = df["tpep_pickup_datetime"].dt.weekday

    if include_label:
        # Calculate label column: if tip was more or less than 20% of the fare.
        df[LABEL_COLUMN] = df["tip_amount"] > 0.2 * df["fare_amount"]

    # Drop unused columns.
    df = df.drop(
        columns=["tpep_pickup_datetime", "tpep_dropoff_datetime", "tip_amount"]
    )

    return ray.data.from_dask(df).repartition(100)
# end::ml_pipeline_preprocess[]

In [None]:
# tag::ml_pipeline_train_1[]
import torch
import torch.nn as nn

from ray.air import session
from ray.air.config import ScalingConfig
import ray.train as train
from ray.train.torch import TorchCheckpoint, TorchTrainer

from fare_predictor import FarePredictor


def train_loop_per_worker(config: dict):
    """Training loop passed to TorchTrainer as ``train_loop_per_worker``.

    Args:
        config: Hyperparameters. Reads ``batch_size`` (default 32), ``lr``
            (default 1e-2) and ``num_epochs`` (default 3).

    After every epoch, reports the epoch index and the loss summed over that
    epoch's batches, together with a checkpoint built from the model.
    """
    batch_size = config.get("batch_size", 32)
    lr = config.get("lr", 1e-2)
    num_epochs = config.get("num_epochs", 3)

    dataset_shard = train.get_dataset_shard("train")
    # Prepare the model exactly once; preparing an already-prepared model a
    # second time is redundant and may wrap it again.
    model = train.torch.prepare_model(FarePredictor())

    loss_fn = nn.SmoothL1Loss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        loss = 0
        for batch in dataset_shard.iter_torch_batches(
            batch_size=batch_size, dtypes=torch.float
        ):
            # Turn the per-column tensors into (batch, 1) labels and a
            # (batch, num_features) input matrix.
            labels = torch.unsqueeze(batch[LABEL_COLUMN], dim=1)
            inputs = torch.cat(
                [torch.unsqueeze(batch[f], dim=1) for f in FEATURE_COLUMNS], dim=1
            )
            output = model(inputs)
            batch_loss = loss_fn(output, labels)
            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()

            loss += batch_loss.item()

        # "loss" is the sum of the per-batch losses for this epoch, not the mean.
        session.report(
            {"epoch": epoch, "loss": loss},
            checkpoint=TorchCheckpoint.from_model(model)
        )
# end::ml_pipeline_train_1[]

In [None]:
# tag::ml_pipeline_train_2[]
# Train on the January 2020 trip data with four workers.
train_dataset = load_dataset("nyc_tlc_data/yellow_tripdata_2020-01.parquet")
loop_config = dict(lr=1e-2, num_epochs=3, batch_size=64)
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config=loop_config,
    scaling_config=ScalingConfig(num_workers=4),
    datasets={"train": train_dataset},
)

# Run training and keep the resulting checkpoint for inference.
result = trainer.fit()
trained_model = result.checkpoint
# end::ml_pipeline_train_2[]

In [None]:
# tag::ml_pipeline_inference[]
from ray.train.torch import TorchPredictor
from ray.train.batch_predictor import BatchPredictor

# Wrap the trained checkpoint in a batch predictor.
batch_predictor = BatchPredictor(trained_model, TorchPredictor)

# Load the January 2021 trip data without labels and run pipelined inference.
inference_path = "nyc_tlc_data/yellow_tripdata_2021-01.parquet"
ds = load_dataset(inference_path, include_label=False)
batch_predictor.predict_pipelined(ds, blocks_per_window=10)
# end::ml_pipeline_inference[]