In [None]:
# tag::ds_create[]
import ray

# Build a dataset holding the integers 0 through 9999.
ds = ray.data.range(10000)

# Basic inspection: total row count, the first few rows, and the schema.
row_count = ds.count()
print(row_count)  # -> 10000
first_rows = ds.take(5)
print(first_rows)  # -> [0, 1, 2, 3, 4]
schema = ds.schema()
print(schema)  # -> <class 'int'>
# end::ds_create[]

In [None]:
# tag::ds_read_write[]
# Write a dataset out as CSV under a local directory, then read it back in.
csv_dir = "local_dir"
ray.data.range(10000).write_csv(csv_dir)
ds = ray.data.read_csv(csv_dir)
print(ds.count())
# end::ds_read_write[]

In [None]:
# tag::ds_transform[]
# Basic transformations: concatenate (union) two datasets, then filter and sort.
ds1, ds2 = ray.data.range(10000), ray.data.range(10000)
ds3 = ds1.union(ds2)
print(ds3.count())  # -> 20000

# Keep only the even elements of the combined dataset.
ds3 = ds3.filter(lambda value: value % 2 == 0)
even_count = ds3.count()
print(even_count)  # -> 10000
print(ds3.take(5))  # -> [0, 2, 4, 6, 8]

# Sort what is left; each even number now shows up twice in a row.
ds3 = ds3.sort()
print(ds3.take(5))  # -> [0, 0, 2, 2, 4]
# end::ds_transform[]

In [None]:
# tag::ds_repartition[]
# Each freshly created dataset reports its own block count.
ds1 = ray.data.range(10000)
print(ds1.num_blocks())  # -> 200
ds2 = ray.data.range(10000)
print(ds2.num_blocks())  # -> 200

# The union reports the combined number of blocks.
ds3 = ds1.union(ds2)
print(ds3.num_blocks())  # -> 400

# Repartitioning produces a dataset with the requested number of blocks.
repartitioned = ds3.repartition(200)
print(repartitioned.num_blocks())  # -> 200
# end::ds_repartition[]

In [None]:
# tag::ds_schema_1[]
# Build a dataset from dict records and show the resulting schema.
records = [{"id": "abc", "value": 1}, {"id": "def", "value": 2}]
ds = ray.data.from_items(records)
print(ds.schema())  # -> id: string, value: int64
# end::ds_schema_1[]

In [None]:
# tag::ds_schema_2[]
# Convert the Dataset built above into a pandas DataFrame.
pandas_df = ds.to_pandas()  # pandas_df will inherit the schema from our Dataset.
# end::ds_schema_2[]

In [None]:
# tag::ds_compute_1[]
# Square every element with a per-row map.
ds = ray.data.range(10000)
ds = ds.map(lambda value: value ** 2)
ds.take(5)  # -> [0, 1, 4, 9, 16]
# end::ds_compute_1[]

In [None]:
# tag::ds_compute_2[]
import numpy as np


# Square the elements one batch at a time, letting NumPy do the math.
ds = ray.data.range(10000)
ds = ds.map_batches(lambda rows: np.square(rows).tolist())
ds.take(5)  # -> [0, 1, 4, 9, 16]
# end::ds_compute_2[]

In [None]:
# tag::ds_compute_3[]
def load_model():
    """Return a stand-in model whose call simply echoes the batch it is given.

    This is a dummy used only for the example; in reality, loading would
    likely involve reading model weights onto a GPU.
    """

    class DummyModel:
        """Identity model: calling it returns its input unchanged."""

        def __call__(self, batch):
            return batch

    dummy = DummyModel()
    return dummy

class MLModel:
    """Callable wrapper that loads a model once and applies it to each batch."""

    def __init__(self):
        # The constructor runs once per instance, so load_model() is called
        # only once per actor that is started with this class.
        loaded = load_model()
        self._model = loaded

    def __call__(self, batch):
        """Run the wrapped model on ``batch`` and return its output."""
        output = self._model(batch)
        return output


# Apply MLModel to the dataset batch by batch, requesting the "actors" compute
# strategy. The resulting dataset is not assigned to a name here.
ds.map_batches(MLModel, compute="actors")
# end::ds_compute_3[]

# TODO how can we make this more concrete?
def cpu_intensive_preprocessing(batch):
    """Placeholder for a CPU-heavy preprocessing step; returns its input as-is."""
    return batch


def gpu_intensive_inference(batch):
    """Placeholder for a GPU-heavy inference step; returns its input as-is."""
    return batch

In [None]:
# tag::ds_pipeline_1[]
# Read -> preprocess -> run inference on GPU actors -> repartition -> write.
ds = (
    ray.data.read_parquet("s3://my_bucket/input_data")
    .map(cpu_intensive_preprocessing)
    .map_batches(gpu_intensive_inference, compute="actors", num_gpus=1)
    .repartition(10)
    .write_parquet("s3://my_bucket/output_predictions")
)
# end::ds_pipeline_1[]

In [None]:
# tag::ds_pipeline_2[]
# Same stages as the previous cell, but windowed into groups of 5 blocks
# before the processing steps are applied.
ds = (
    ray.data.read_parquet("s3://my_bucket/input_data")
    .window(blocks_per_window=5)
    .map(cpu_intensive_preprocessing)
    .map_batches(gpu_intensive_inference, compute="actors", num_gpus=1)
    .repartition(10)
    .write_parquet("s3://my_bucket/output_predictions")
)
# end::ds_pipeline_2[]

In [None]:
# tag::parallel_sgd_1[]
from sklearn import datasets
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import train_test_split

@ray.remote
class TrainingWorker:
    """Ray actor that incrementally fits one SGDClassifier on a data shard.

    Args:
        alpha: Value forwarded to ``SGDClassifier(alpha=...)``.
    """

    def __init__(self, alpha: float):
        self._model = SGDClassifier(alpha=alpha)

    def train(self, train_shard: "ray.data.DatasetPipeline"):
        """Fit the model on every epoch of ``train_shard`` and return it.

        ``train_shard`` must provide ``iter_epochs()``; each epoch's rows are
        expected to be ``(features, label)`` pairs.
        """
        for epoch in train_shard.iter_epochs():
            # Transpose the row pairs into one tuple of features and one of labels.
            X, Y = zip(*epoch.iter_rows())
            # partial_fit needs the full label set on each call since a single
            # epoch's rows are all it sees at a time.
            self._model.partial_fit(X, Y, classes=[0, 1])

        return self._model

    def test(self, X_test: np.ndarray, Y_test: np.ndarray):
        """Return the model's ``score`` on the given held-out data."""
        return self._model.score(X_test, Y_test)
# end::parallel_sgd_1[]

In [None]:
# tag::parallel_sgd_2[]
# One regularization strength per training worker.
ALPHA_VALS = [0.00008, 0.00009, 0.0001, 0.00011, 0.00012]

print(f"Starting {len(ALPHA_VALS)} training workers.")
workers = []
for alpha in ALPHA_VALS:
    workers.append(TrainingWorker.remote(alpha))
# end::parallel_sgd_2[]

In [None]:
# tag::parallel_sgd_3[]
# Generate a classification problem and split it into training & validation data.
features, labels = datasets.make_classification()
X_train, X_test, Y_train, Y_test = train_test_split(features, labels)

# Create a dataset pipeline out of the training data. The data is repeated for
# 10 iterations, randomly shuffled in each window, and split across the workers.
train_ds = ray.data.from_items(list(zip(X_train, Y_train)))
shards = (
    train_ds.repeat(10)
    .random_shuffle_each_window()
    .split(len(workers), locality_hints=workers)
)
# end::parallel_sgd_3[]

In [None]:
# tag::parallel_sgd_4[]
# Launch training on every worker with its own shard, then block until all finish.
training_refs = [
    worker.train.remote(shard) for worker, shard in zip(workers, shards)
]
ray.get(training_refs)
# end::parallel_sgd_4[]

In [None]:
# tag::parallel_sgd_5[]
# Ask each worker to score its model on the validation data and print the results.
score_refs = [worker.test.remote(X_test, Y_test) for worker in workers]
print(ray.get(score_refs))
# end::parallel_sgd_5[]

In [None]:
# tag::dask_on_ray_1[]
import ray
from ray.util.dask import enable_dask_on_ray

# Start or connect to Ray. Earlier cells in this notebook already use Ray, so a
# session may exist by the time this runs; tolerate that instead of raising.
ray.init(ignore_reinit_error=True)
enable_dask_on_ray()  # Enable the Ray scheduler backend for Dask.
# end::dask_on_ray_1[]

In [None]:
# tag::dask_on_ray_2[]
import dask

# Build a lazy Dask computation: keep rows with positive y, then take the
# per-name standard deviation of x.
timeseries = dask.datasets.timeseries()
positive_y = timeseries[timeseries.y > 0]
df = positive_y.groupby("name").x.std()
df.compute()  # Trigger the task graph to be evaluated.
# end::dask_on_ray_2[]

In [None]:
# tag::dask_on_ray_3[]
import ray
ds = ray.data.range(10000)

# Convert the Dataset to a Dask DataFrame and compute a statistic there.
df = ds.to_dask()
dask_std = df.std().compute()
print(dask_std)  # -> 2886.89568

# Convert the Dask DataFrame back to a Dataset and compute the same statistic.
ds = ray.data.from_dask(df)
dataset_std = ds.std()
print(dataset_std)  # -> 2886.89568
# end::dask_on_ray_3[]

In [None]:
# tag::ml_pipeline_preprocess[]
import ray
from ray.util.dask import enable_dask_on_ray

import dask.dataframe as dd

LABEL_COLUMN = "is_big_tip"

enable_dask_on_ray()


def load_dataset(path: str, *, include_label=True):
    """Load a taxi-trip CSV with Dask, clean it, and return it as a Ray Dataset.

    Args:
        path: Location of the CSV file, passed to ``dd.read_csv``.
        include_label: When true, add the boolean ``LABEL_COLUMN`` marking
            trips whose tip exceeded 20% of the fare.

    Returns:
        The result of ``ray.data.from_dask`` on the cleaned frame, with the
        columns passenger_count, trip_distance, fare_amount, trip_duration,
        hour, day_of_week and, optionally, the label column.
    """
    # Load the data and drop unused columns.
    df = dd.read_csv(path, assume_missing=True,
                     usecols=["tpep_pickup_datetime", "tpep_dropoff_datetime",
                              "passenger_count", "trip_distance", "fare_amount",
                              "tip_amount"])

    # Basic cleaning, drop nulls and outliers.
    df = df.dropna()
    df = df[(df["passenger_count"] <= 4) &
            (df["trip_distance"] < 100) &
            (df["fare_amount"] < 1000)]

    # Convert datetime strings to datetime objects.
    df["tpep_pickup_datetime"] = dd.to_datetime(df["tpep_pickup_datetime"])
    df["tpep_dropoff_datetime"] = dd.to_datetime(df["tpep_dropoff_datetime"])

    # Add three new features: trip duration, hour the trip started, and day of the week.
    # Use total_seconds() rather than the `.seconds` component: `.seconds` drops
    # whole days, so a trip lasting a day and five minutes would look like five
    # minutes and slip through the duration filter below.
    df["trip_duration"] = (df["tpep_dropoff_datetime"] -
                           df["tpep_pickup_datetime"]).dt.total_seconds()
    # Keep trips between 0 seconds and 4 hours. The lower bound rejects records
    # whose drop-off precedes the pickup.
    df = df[(df["trip_duration"] >= 0) &
            (df["trip_duration"] < 4 * 60 * 60)]
    df["hour"] = df["tpep_pickup_datetime"].dt.hour
    df["day_of_week"] = df["tpep_pickup_datetime"].dt.weekday

    if include_label:
        # Calculate label column: if tip was more or less than 20% of the fare.
        df[LABEL_COLUMN] = df["tip_amount"] > 0.2 * df["fare_amount"]

    # Drop unused columns.
    df = df.drop(
        columns=["tpep_pickup_datetime", "tpep_dropoff_datetime", "tip_amount"]
    )

    return ray.data.from_dask(df)
# end::ml_pipeline_preprocess[]

In [None]:
# tag::ml_pipeline_model[]
import torch
import torch.nn as nn
import torch.nn.functional as F

NUM_FEATURES = 6


class FarePredictor(nn.Module):
    """Small fully connected binary classifier over ``NUM_FEATURES`` inputs.

    Layout: Linear(NUM_FEATURES, 256) -> ReLU -> BatchNorm -> Linear(256, 16)
    -> ReLU -> BatchNorm -> Linear(16, 1) -> sigmoid.
    """

    def __init__(self):
        super().__init__()

        self.fc1 = nn.Linear(NUM_FEATURES, 256)
        self.fc2 = nn.Linear(256, 16)
        self.fc3 = nn.Linear(16, 1)

        self.bn1 = nn.BatchNorm1d(256)
        self.bn2 = nn.BatchNorm1d(16)

    def forward(self, *x):
        """Return one value in [0, 1] per row.

        Args:
            *x: One or more 2-D tensors that are concatenated along dim 1;
                together they must provide ``NUM_FEATURES`` columns.
        """
        x = torch.cat(x, dim=1)
        x = F.relu(self.fc1(x))
        x = self.bn1(x)
        x = F.relu(self.fc2(x))
        x = self.bn2(x)
        # torch.sigmoid computes the same values as F.sigmoid, which some
        # PyTorch releases flag as deprecated.
        x = torch.sigmoid(self.fc3(x))

        return x
# end::ml_pipeline_model[]

In [None]:
# tag::ml_pipeline_train_1[]
import ray.train as train


def train_epoch(iterable_dataset, model, loss_fn, optimizer, device):
    """Run one optimization pass over ``iterable_dataset``.

    Args:
        iterable_dataset: Iterable yielding ``(X, y)`` tensor pairs.
        model: Module to train; it is switched to training mode.
        loss_fn: Callable taking ``(prediction, target)`` and returning a loss tensor.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the batches are moved to before the forward pass.
    """
    model.train()
    for X, y in iterable_dataset:
        X = X.to(device)
        y = y.to(device)

        # Compute prediction error on the raw model output. The output must
        # not be rounded here: torch.round has a zero gradient, so a loss on
        # rounded predictions back-propagates nothing and the weights never
        # change.
        pred = model(X.float())
        loss = loss_fn(pred, y)

        # Backpropagation.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
# end::ml_pipeline_train_1[]

In [None]:
# tag::ml_pipeline_train_2[]
def validate_epoch(iterable_dataset, model, loss_fn, device):
    """Evaluate ``model`` and return the mean per-batch loss as ``{"loss": value}``.

    The model is put in eval mode and gradients are disabled. Model outputs
    are rounded before being handed to ``loss_fn`` together with the targets.
    """
    model.eval()
    total_loss = 0
    batches_seen = 0
    with torch.no_grad():
        for batches_seen, (features, targets) in enumerate(iterable_dataset, start=1):
            features = features.to(device)
            targets = targets.to(device)
            rounded = torch.round(model(features.float()))
            total_loss += loss_fn(rounded, targets).item()
    # Average over the number of batches that were iterated.
    return {"loss": total_loss / batches_seen}
# end::ml_pipeline_train_2[]

In [None]:
# tag::ml_pipeline_train_3[]
def train_func(config):
    """Training loop run on each Ray Train worker.

    Args:
        config: Dict of hyperparameters. Recognized keys and their defaults:
            ``batch_size`` (32), ``lr`` (1e-2), ``epochs`` (3).

    Each epoch takes the next epoch from the "train" and "validation" dataset
    shards, trains, validates, reports the validation result, and saves a
    checkpoint holding the epoch number and the model weights.
    """
    batch_size = config.get("batch_size", 32)
    lr = config.get("lr", 1e-2)
    epochs = config.get("epochs", 3)

    train_dataset_pipeline_shard = train.get_dataset_shard("train")
    validation_dataset_pipeline_shard = train.get_dataset_shard("validation")

    model = train.torch.prepare_model(FarePredictor())
    # The device is the same for every epoch, so look it up once.
    device = train.torch.get_device()

    loss_fn = nn.SmoothL1Loss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    train_dataset_iterator = train_dataset_pipeline_shard.iter_epochs()
    validation_dataset_iterator = \
        validation_dataset_pipeline_shard.iter_epochs()

    for epoch in range(epochs):
        train_dataset = next(train_dataset_iterator)
        validation_dataset = next(validation_dataset_iterator)

        train_torch_dataset = train_dataset.to_torch(
            label_column=LABEL_COLUMN,
            batch_size=batch_size,
        )
        validation_torch_dataset = validation_dataset.to_torch(
            label_column=LABEL_COLUMN,
            batch_size=batch_size)

        train_epoch(train_torch_dataset, model, loss_fn, optimizer, device)
        result = validate_epoch(validation_torch_dataset, model, loss_fn,
                                device)
        train.report(**result)

        # Save the weights of the underlying module. A wrapped model exposes
        # it as `.module`; fall back to the model itself when there is no
        # wrapper (NOTE(review): presumably the single-worker case; confirm
        # against the Ray Train prepare_model docs).
        unwrapped_model = getattr(model, "module", model)
        train.save_checkpoint(epoch=epoch,
                              model_weights=unwrapped_model.state_dict())
# end::ml_pipeline_train_3[]

In [None]:
# tag::ml_pipeline_train_4[]
def get_training_datasets(*, test_pct=0.8):
    """Build the repeating train/validation pipelines used for training.

    Loads the January 2020 trip data, keeps the first 1% of its rows, and
    splits that sample in two. Note that despite its name, ``test_pct`` is
    the fraction of the sample that goes to the *training* split; the
    remainder becomes the validation split.

    Returns:
        Dict with a shuffled, repeating "train" pipeline and a repeating
        "validation" pipeline.
    """
    full_ds = load_dataset("nyc_tlc_data/yellow_tripdata_2020-01.csv")

    # Downsample: keep only the rows before the 1% mark.
    sample_end = int(0.01 * full_ds.count())
    sampled_ds, _ = full_ds.split_at_indices([sample_end])

    # Split the sample at the requested fraction.
    split_point = int(test_pct * sampled_ds.count())
    train_ds, test_ds = sampled_ds.split_at_indices([split_point])

    return {
        "train": train_ds.repeat().random_shuffle_each_window(),
        "validation": test_ds.repeat(),
    }
# end::ml_pipeline_train_4[]

In [None]:
# tag::ml_pipeline_train_5[]
# Train with 4 workers and keep the model weights from the latest checkpoint.
trainer = train.Trainer("torch", num_workers=4)
config = {"lr": 1e-2, "epochs": 3, "batch_size": 64}
trainer.start()
try:
    trainer.run(train_func, config, dataset=get_training_datasets())
    model_weights = trainer.latest_checkpoint.get("model_weights")
finally:
    # Shut the trainer down even if the run raises, so a failed run does not
    # leave it started.
    trainer.shutdown()
# end::ml_pipeline_train_5[]

In [None]:
# tag::ml_pipeline_inference[]
class InferenceWrapper:
    """Callable that labels a DataFrame batch using the trained FarePredictor."""

    def __init__(self):
        # Rebuild the network, restore the trained weights, and switch it to
        # eval mode before storing it.
        model = FarePredictor()
        model.load_state_dict(model_weights)
        model.eval()
        self._model = model

    def __call__(self, df):
        """Add the rounded predictions to ``df`` as ``LABEL_COLUMN`` and return it."""
        features = torch.as_tensor(df.to_numpy(), dtype=torch.float32)
        with torch.no_grad():
            rounded = torch.round(self._model(features))
        df[LABEL_COLUMN] = rounded.numpy()
        return df


# Load the unlabeled January 2021 data, run batch inference on actors, and
# write the labeled result out as CSV.
ds = load_dataset("nyc_tlc_data/yellow_tripdata_2021-01.csv", include_label=False)
predictions = ds.map_batches(InferenceWrapper, compute="actors")
predictions.write_csv("output")
# end::ml_pipeline_inference[]