# All About Ray

In [None]:
# Install Ray libraries first!
!pip install -U "ray[all]"

## Ray Data

### Loading Data

In [None]:
#### Load from S3

import ray
# Read the Iris Parquet file from the S3 URI below and print the dataset schema.
iris_parquet_uri = "s3://anonymous@ray-example-data/iris.parquet"
ds = ray.data.read_parquet(iris_parquet_uri)
print(ds.schema())

In [None]:
#### Load from HuggingFace

import ray.data
from datasets import load_dataset

# Load the "wikitext-2-raw-v1" configuration of "wikitext" with Hugging Face `datasets`.
hf_ds = load_dataset("wikitext", "wikitext-2-raw-v1")
# Hand the Hugging Face object over to Ray Data.
ray_ds = ray.data.from_huggingface(hf_ds)
# `ray_ds` is indexed by split name here, so it is presumably a mapping of
# split name -> Ray Dataset (TODO confirm for the installed Ray version).
ray_ds["train"].take(2)

# output: [{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}]

In [None]:
#### Load from MongoDB

import ray

# Read a local MongoDB.
# The aggregation pipeline keeps documents with 0 <= col < 10 and then sorts
# them by `sort_col`. A `$sort` stage takes a document mapping each field to a
# direction (1 = ascending, -1 = descending), not a bare field name.
ds = ray.data.read_mongo(
    uri="mongodb://localhost:27017",
    database="my_db",
    collection="my_collection",
    pipeline=[
        {"$match": {"col": {"$gte": 0, "$lt": 10}}},
        {"$sort": {"sort_col": 1}},
    ],
)

### Transforming Data

In [27]:
#### Transforming rows, calling map() or flat_map().

import os
from typing import Any, Dict
import ray

def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
    """Add a ``filename`` entry holding the last component of ``row["path"]``.

    The row is updated in place and the same dictionary is returned.
    """
    _, tail = os.path.split(row["path"])
    row["filename"] = tail
    return row

# Read the images together with their source paths, then attach each row's
# file name with parse_filename.
image_rows = ray.data.read_images(
    "s3://anonymous@ray-example-data/image-datasets/simple",
    include_paths=True,
)
ds = image_rows.map(parse_filename)

RayTaskError(OSError): ray::_get_read_tasks() (pid=721, ip=10.233.126.39)
  File "/opt/conda/lib/python3.11/site-packages/ray/data/read_api.py", line 1928, in _get_read_tasks
    reader = ds.create_reader(**kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/ray/data/datasource/image_datasource.py", line 66, in create_reader
    return _ImageDatasourceReader(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/ray/data/datasource/image_datasource.py", line 136, in __init__
    super().__init__(
  File "/opt/conda/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 377, in __init__
    paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem)
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 650, in _resolve_paths_and_filesystem
    resolved_filesystem, resolved_path = _resolve_filesystem_and_path(
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/pyarrow/fs.py", line 189, in _resolve_filesystem_and_path
    filesystem, path = FileSystem.from_uri(path)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/_fs.pyx", line 470, in pyarrow._fs.FileSystem.from_uri
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 115, in pyarrow.lib.check_status
OSError: When resolving region for bucket 'ray-example-data': AWS Error NETWORK_CONNECTION during HeadBucket operation: curlCode: 28, Timeout was reached

In [None]:
#### Transforming batches, calling map_batches()

from typing import Dict
import numpy as np
import torch
import ray

class TorchPredictor:
    """Batch predictor wrapping an identity torch model.

    The model is built once in ``__init__`` and reused for every batch passed
    to ``__call__``. It runs on CUDA when available and otherwise falls back
    to the CPU, so the class can also be instantiated on hosts without a GPU.
    """

    def __init__(self):
        # Pick the device once; every batch is moved to this same device.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = torch.nn.Identity().to(self.device)
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        """Run the model on ``batch["data"]`` and store the result under ``"output"``.

        Args:
            batch: Mapping that contains a ``"data"`` array convertible to float32.

        Returns:
            The same ``batch`` mapping with an added ``"output"`` NumPy array.
        """
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32).to(self.device)
        with torch.inference_mode():
            # Bring the result back to host memory before converting to NumPy.
            batch["output"] = self.model(inputs).detach().cpu().numpy()
        return batch

# Build a 32x100 array of ones as a dataset, then run TorchPredictor over it.
ones_ds = ray.data.from_numpy(np.ones((32, 100)))
ds = ones_ds.map_batches(
    TorchPredictor,
    # Two workers with one GPU each
    compute=ray.data.ActorPoolStrategy(size=2),
    # Batch size is required if you're using GPUs.
    batch_size=4,
    num_gpus=1,
)

### Inspecting Data


In [None]:
import ray

# Read the Iris CSV and print the first row as a list of dicts.
iris_csv_uri = "s3://anonymous@air-example-data/iris.csv"
ds = ray.data.read_csv(iris_csv_uri)
rows = ds.take(1)
print(rows)

# output: [{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}]

### Iterating over Data

In [None]:
import ray

# Read the Iris CSV, then print the dataset one row at a time.
iris_csv_uri = "s3://anonymous@air-example-data/iris.csv"
ds = ray.data.read_csv(iris_csv_uri)
for row in ds.iter_rows():
    print(row)

# output: {'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
# {'sepal length (cm)': 4.9, 'sepal width (cm)': 3.0, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
# ...
# {'sepal length (cm)': 5.9, 'sepal width (cm)': 3.0, 'petal length (cm)': 5.1, 'petal width (cm)': 1.8, 'target': 2}

### Saving Data

In [None]:
#### Save as Parquet to local disk

import ray

# The CSV source is read from S3; the `local://` destination writes under /tmp/iris/.
iris_csv_uri = "s3://anonymous@ray-example-data/iris.csv"
parquet_output_dir = "local:///tmp/iris/"
ds = ray.data.read_csv(iris_csv_uri)
ds.write_parquet(parquet_output_dir)

## Ray Train

In [3]:
# Install torch first
!pip install torch

Collecting torch
  Using cached torch-2.0.1-cp311-cp311-manylinux1_x86_64.whl (619.9 MB)
Collecting sympy (from torch)
  Using cached sympy-1.12-py3-none-any.whl (5.7 MB)
Collecting nvidia-cuda-nvrtc-cu11==11.7.99 (from torch)
  Using cached nvidia_cuda_nvrtc_cu11-11.7.99-2-py3-none-manylinux1_x86_64.whl (21.0 MB)
Collecting nvidia-cuda-runtime-cu11==11.7.99 (from torch)
  Using cached nvidia_cuda_runtime_cu11-11.7.99-py3-none-manylinux1_x86_64.whl (849 kB)
Collecting nvidia-cuda-cupti-cu11==11.7.101 (from torch)
  Using cached nvidia_cuda_cupti_cu11-11.7.101-py3-none-manylinux1_x86_64.whl (11.8 MB)
Collecting nvidia-cudnn-cu11==8.5.0.96 (from torch)
  Using cached nvidia_cudnn_cu11-8.5.0.96-2-py3-none-manylinux1_x86_64.whl (557.1 MB)
Collecting nvidia-cublas-cu11==11.10.3.66 (from torch)
  Using cached nvidia_cublas_cu11-11.10.3.66-py3-none-manylinux1_x86_64.whl (317.1 MB)
Collecting nvidia-cufft-cu11==10.9.0.58 (from torch)
  Using cached nvidia_cufft_cu11-10.9.0.58-py3-none-manylinu

In [None]:
# Example about pytorch

import torch
import torch.nn as nn

import ray
from ray import train
from ray.air import session, Checkpoint
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig


# Flip to True when the workers should train on GPUs.
use_gpu = False


# Layer widths of the network and the number of training epochs.
input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3

# We define a network here.
class NeuralNetwork(nn.Module):
    """Small regressor: Linear(input_size, layer_size) -> ReLU -> Linear(layer_size, output_size)."""

    def __init__(self):
        super().__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        """Apply the two linear layers with a ReLU in between."""
        hidden = self.layer1(input)
        activated = self.relu(hidden)
        return self.layer2(activated)

# We define how to train here.
def train_loop_per_worker():
    """Train ``NeuralNetwork`` on this worker's shard of the "train" dataset.

    Runs ``num_epochs`` passes over the shard with SGD and an MSE loss,
    printing the loss of every batch. After each epoch it reports a checkpoint
    holding the epoch index and the model state dict.
    """
    dataset_shard = session.get_dataset_shard("train")
    model = NeuralNetwork()
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

    model = train.torch.prepare_model(model)

    for epoch in range(num_epochs):
        for batches in dataset_shard.iter_torch_batches(
            batch_size=32, dtypes=torch.float
        ):
            # Give inputs AND labels a trailing feature axis so the labels
            # line up with the (batch, 1) model output. With 1-D labels,
            # MSELoss would broadcast (batch, 1) against (batch,) and average
            # over a (batch, batch) matrix instead of per-sample errors.
            inputs = torch.unsqueeze(batches["x"], 1)
            labels = torch.unsqueeze(batches["y"], 1)
            output = model(inputs)
            loss = loss_fn(output, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")

        # Checkpointing
        session.report(
            {},
            checkpoint=Checkpoint.from_dict(
                dict(epoch=epoch, model=model.state_dict())
            ),
        )

# Loading data: 200 items with y = 2 * x + 1.
train_dataset = ray.data.from_items(
    [{"x": value, "y": 2 * value + 1} for value in range(200)]
)
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)

# Define the trainer
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    datasets={"train": train_dataset},
    scaling_config=scaling_config,
)

# Start to train.
result = trainer.fit()

### Distributed Learning

In [None]:
#### PyTorch Vs Ray

 import torch
 from torch.nn.parallel import DistributedDataParallel
+from ray.air import session
+from ray import train
+import ray.train.torch


 def train_func():
-    device = torch.device(f"cuda:{session.get_local_rank()}" if
-        torch.cuda.is_available() else "cpu")
-    torch.cuda.set_device(device)

     # Create model.
     model = NeuralNetwork()

-    model = model.to(device)
-    model = DistributedDataParallel(model,
-        device_ids=[session.get_local_rank()] if torch.cuda.is_available() else None)

+    model = train.torch.prepare_model(model)

### Checkpointing

In [None]:
import ray.train.torch
from ray.air import session, Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer

import torch
import torch.nn as nn
from torch.optim import Adam
import numpy as np

def train_func(config):
    """Train a 1-layer toy regressor, resuming from a checkpoint when one exists.

    Args:
        config: Mapping with a ``"num_epochs"`` entry; training runs for the
            epochs ``start_epoch .. num_epochs - 1``.

    Each epoch performs one optimisation step on the whole toy dataset and
    reports a checkpoint dict with the keys ``"epoch"`` and ``"model_weights"``.
    """
    n = 100
    # create a toy dataset
    # data   : X - dim = (n, 4)
    # target : Y - dim = (n, 1)
    X = torch.Tensor(np.random.normal(0, 1, size=(n, 4)))
    Y = torch.Tensor(np.random.uniform(0, 1, size=(n, 1)))

    # toy neural network : 1-layer
    model = nn.Linear(4, 1)
    criterion = nn.MSELoss()
    optimizer = Adam(model.parameters(), lr=3e-4)
    start_epoch = 0

    checkpoint = session.get_checkpoint()
    if checkpoint:
        # assume that we have run the session.report() example
        # and successfully save some model weights
        checkpoint_dict = checkpoint.to_dict()
        model.load_state_dict(checkpoint_dict.get("model_weights"))
        # Continue with the epoch after the last one that was checkpointed.
        start_epoch = checkpoint_dict.get("epoch", -1) + 1

    # wrap the model in DDP
    model = ray.train.torch.prepare_model(model)
    for epoch in range(start_epoch, config["num_epochs"]):
        # Call the module itself rather than `.forward()` so that
        # `nn.Module.__call__` runs any registered hooks.
        y = model(X)
        # compute loss
        loss = criterion(y, Y)
        # back-propagate loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        state_dict = model.state_dict()
        checkpoint = Checkpoint.from_dict(
            dict(epoch=epoch, model_weights=state_dict)
        )
        # save checkpoint in the training
        session.report({}, checkpoint=checkpoint)

# checkpointing: the first run trains with num_epochs=2.
trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(num_workers=2),
    train_loop_config={"num_epochs": 2},
)
result = trainer.fit()

# load checkpoint: the second run starts from the first run's checkpoint
# and uses num_epochs=4.
trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(num_workers=2),
    train_loop_config={"num_epochs": 4},
    resume_from_checkpoint=result.checkpoint,
)
result = trainer.fit()

final_checkpoint_dict = result.checkpoint.to_dict()
print(final_checkpoint_dict)
# {'epoch': 3, 'model_weights': OrderedDict([('bias', tensor([0.0902])), ('weight', tensor([[-0.1549, -0.0861,  0.4353, -0.4116]]))]), '_timestamp': 1656108265, '_preprocessor': None, '_current_checkpoint_id': 2}

### Predictor

In [4]:
# Predictor using the resulting model:

import numpy as np
import ray

from ray.train.xgboost import XGBoostTrainer, XGBoostPredictor
from ray.air.config import ScalingConfig

# NOTE: the imports above need the `xgboost` package; the recorded run of this
# cell stopped with "ModuleNotFoundError: No module named 'xgboost'".

# Toy regression data: y = x + 1 for x in 0..31.
train_dataset = ray.data.from_items(
    [{"x": value, "y": value + 1} for value in range(32)]
)
trainer = XGBoostTrainer(
    datasets={"train": train_dataset},
    label_column="y",
    params={"objective": "reg:squarederror"},
    scaling_config=ScalingConfig(num_workers=3),
)
result = trainer.fit()

# Rebuild a predictor from the training checkpoint and score x = 32..63,
# passed as a single-column 2-D array.
predictor = XGBoostPredictor.from_checkpoint(result.checkpoint)
unseen_x = np.expand_dims(np.arange(32, 64), 1)
predictions = predictor.predict(unseen_x)

  from .autonotebook import tqdm as notebook_tqdm
2023-08-22 03:28:21,657	INFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2023-08-22 03:28:22,051	INFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


ModuleNotFoundError: No module named 'xgboost'

In [None]:
# Batch Predictor:

import pandas as pd
from ray.train.batch_predictor import BatchPredictor

# Build the dataset to score (x = 0..31) and a batch predictor from the
# checkpoint of the XGBoost run above.
predict_frame = pd.DataFrame({"x": np.arange(32)})
predict_dataset = ray.data.from_pandas(predict_frame)
batch_predictor = BatchPredictor.from_checkpoint(result.checkpoint, XGBoostPredictor)

# Predict in batches of 8 with min_scoring_workers=2, then display the result.
predictions = batch_predictor.predict(
    data=predict_dataset,
    batch_size=8,
    min_scoring_workers=2,
)
predictions.show()

## Ray Tune

In [8]:
# Example a (x ** 2) + b, a and b are the hyperparameters we want to tune to minimize the objective

from ray.air import session
from ray import tune


def objective(x, a, b):
    """Objective function to minimise: ``a * x**2 + b``."""
    squared = x**2
    return a * squared + b


def trainable(config):
    """Report the objective for x = 0..19 using ``config["a"]`` and ``config["b"]``.

    Every iteration sends ``{"score": ...}`` to Tune through ``session.report``.
    """
    a, b = config["a"], config["b"]
    for step in range(20):
        session.report({"score": objective(step, a, b)})

# Both hyperparameters are sampled with tune.uniform(0, 1); ten samples are
# run and the "score" metric is minimised.
space = {name: tune.uniform(0, 1) for name in ("a", "b")}
tune_config = tune.TuneConfig(metric="score", mode="min", num_samples=10)
tuner = tune.Tuner(trainable, param_space=space, tune_config=tune_config)
results = tuner.fit()
print(results.get_best_result())  # Get best result object

0,1
Current time:,2023-08-22 05:50:41
Running for:,00:00:01.90
Memory:,43.1/881.9 GiB

Trial name,status,loc,a,b,iter,total time (s),score
trainable_d2a74_00000,TERMINATED,10.233.126.174:10826,0.953869,0.0268538,20,0.143476,344.373
trainable_d2a74_00001,TERMINATED,10.233.126.174:10824,0.533089,0.0548289,20,0.184188,192.5
trainable_d2a74_00002,TERMINATED,10.233.126.174:10825,0.892717,0.573544,20,0.14275,322.844
trainable_d2a74_00003,TERMINATED,10.233.126.174:10826,0.555733,0.252978,20,0.171353,200.873
trainable_d2a74_00004,TERMINATED,10.233.126.174:10827,0.0684515,0.579124,20,0.222048,25.2901
trainable_d2a74_00005,TERMINATED,10.233.126.174:10828,0.403462,0.00194268,20,0.337543,145.652
trainable_d2a74_00006,TERMINATED,10.233.126.174:10829,0.581593,0.469742,20,0.216001,210.425
trainable_d2a74_00007,TERMINATED,10.233.126.174:10830,0.415749,0.267121,20,0.27906,150.352
trainable_d2a74_00008,TERMINATED,10.233.126.174:10829,0.495647,0.733256,20,0.116086,179.662
trainable_d2a74_00009,TERMINATED,10.233.126.174:10832,0.244803,0.0615271,20,0.358397,88.4355


2023-08-22 05:50:41,181	INFO tune.py:1148 -- Total run time: 1.92 seconds (1.89 seconds for the tuning loop).


Result(
  metrics={'score': 25.2901314545067, 'done': True, 'trial_id': 'd2a74_00004', 'experiment_tag': '4_a=0.0685,b=0.5791'},
  path='/home/jovyan/ray_results/trainable_2023-08-22_05-50-39/trainable_d2a74_00004_4_a=0.0685,b=0.5791_2023-08-22_05-50-39',
  checkpoint=None
)


## Ray Serve

In [None]:
# Install transformers first
!pip install transformers

In [None]:
# File name: serve_quickstart.py
from starlette.requests import Request

import ray
from ray import serve

from transformers import pipeline


@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.2, "num_gpus": 0})
class Translator:
    """Serve deployment that runs the "translation_en_to_fr" pipeline with t5-small."""

    def __init__(self):
        # Load model
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        """Run inference on ``text`` and return only the translation text."""
        first_result = self.model(text)[0]
        return first_result["translation_text"]

    async def __call__(self, http_request: Request) -> str:
        """Translate the JSON payload of ``http_request``."""
        payload: str = await http_request.json()
        return self.translate(payload)


translator_app = Translator.bind()

# server: serve run serve_quickstart:translator_app
# client: python model_client.py

## Ray RLlib

In [None]:
from ray.rllib.algorithms.ppo import PPOConfig

# 1. Configure the algorithm,
config = PPOConfig()
config = config.environment("Taxi-v3")
config = config.rollouts(num_rollout_workers=2)
config = config.framework("torch")
config = config.training(model={"fcnet_hiddens": [64, 64]})
config = config.evaluation(evaluation_num_workers=1)

algo = config.build()  # 2. build the algorithm,

for _ in range(5):
    print(algo.train())  # 3. train it,

algo.evaluate()  # 4. and evaluate it.


## Scenarios

### Scenario 1: Offline Batch Inference

In [None]:
# install transformer first
!pip install transformers

from typing import Dict
import numpy as np

import ray

# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
# You can also create a Ray Dataset from many other sources and file
# formats.
prompts = np.asarray(["Complete this", "for me"])
ds = ray.data.from_numpy(prompts)

# Step 2: Define a Predictor class for inference.
# Use a class to initialize the model just once in `__init__`
# and re-use it for inference across multiple batches.
class HuggingFacePredictor:
    """Callable that generates text for every entry of ``batch["data"]``."""

    def __init__(self):
        from transformers import pipeline

        # Initialize a pre-trained GPT2 Huggingface pipeline.
        self.model = pipeline("text-generation", model="gpt2")

    # Logic for inference on 1 batch of data.
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        """Add an ``"output"`` list of generated texts to ``batch`` and return it."""
        prompts = list(batch["data"])
        # `predictions` is a list of length-one lists. For example:
        # [[{'generated_text': 'output_1'}], ..., [{'generated_text': 'output_2'}]]
        predictions = self.model(prompts, max_length=20, num_return_sequences=1)
        # Flatten it into ['output_1', 'output_2'].
        generated = []
        for sequences in predictions:
            generated.append(sequences[0]["generated_text"])
        batch["output"] = generated
        return batch

# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
scale = ray.data.ActorPoolStrategy(size=2)
# Step 3: Map the Predictor over the Dataset to get predictions.
# The class itself (not an instance) is passed, together with the actor pool strategy.
predictions = ds.map_batches(HuggingFacePredictor, compute=scale)
# Step 4: Show one prediction output.
predictions.show(limit=1)

### Scenario 2: Distributed Data Ingest with Ray Data and Ray Train

In [None]:
import ray
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer

import numpy as np
from typing import Dict

# Load the data.
iris_parquet_uri = "s3://anonymous@ray-example-data/iris.parquet"
train_ds = ray.data.read_parquet(iris_parquet_uri)
## Uncomment to randomize the block order each epoch.
# train_ds = train_ds.randomize_block_order()


# Define a preprocessing function.
def normalize_length(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    """Replace ``"sepal.length"`` with a copy scaled by the batch maximum.

    The scaled values are stored under ``"normalized.sepal.length"``, the
    original column is removed, and the same ``batch`` mapping is returned.
    """
    source_key, target_key = "sepal.length", "normalized.sepal.length"
    batch[target_key] = batch[source_key] / np.max(batch[source_key])
    del batch[source_key]
    return batch


# Preprocess your data any way you want. This will be re-run each epoch.
# You can use Ray Data preprocessors here as well,
# e.g., preprocessor.fit_transform(train_ds)
# NOTE(review): `train_ds` is reassigned to its own mapped version, so running
# this cell a second time stacks another `normalize_length` on top of the first.
train_ds = train_ds.map_batches(normalize_length)


def train_loop_per_worker():
    """Iterate over this worker's "train" shard for 10 epochs, printing each batch."""
    # Get an iterator to the dataset we passed in below.
    shard_iterator = session.get_dataset_shard("train")

    # We'll use a shuffle buffer size of 10k elements, and prefetch up to
    # 10 batches of size 128 each.
    batch_options = dict(
        local_shuffle_buffer_size=10000, batch_size=128, prefetch_batches=10
    )

    # Train for 10 epochs over the data.
    for _epoch in range(10):
        for batch in shard_iterator.iter_batches(**batch_options):
            print("Do some training on batch", batch)


# Run the training loop on two workers, feeding them the preprocessed dataset.
my_trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=2),
)
my_trainer.fit()