In [1]:
import ray

# Load data.
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")

# Split data into train and validation.
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)

# Create a test dataset by dropping the target column.
test_dataset = valid_dataset.drop_columns(cols=["target"])

2022-12-16 07:37:12,991	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


[dataset]: Run `pip install tqdm` to enable progress reporting.


In [2]:
import numpy as np

from ray.data.preprocessors import Concatenator, Chain, StandardScaler

# Create a preprocessor to scale some columns and concatenate the result.
preprocessor = Chain(
    StandardScaler(columns=["mean radius", "mean texture"]),
    Concatenator(exclude=["target"], dtype=np.float32),
)

In [3]:
import torch
import torch.nn as nn

from ray import train
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchCheckpoint, TorchTrainer


def create_model(input_features):
    return nn.Sequential(
        nn.Linear(in_features=input_features, out_features=16),
        nn.ReLU(),
        nn.Linear(16, 16),
        nn.ReLU(),
        nn.Linear(16, 1),
        nn.Sigmoid(),
    )


def train_loop_per_worker(config):
    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["num_epochs"]
    num_features = config["num_features"]

    # Get the Ray Dataset shard for this data parallel worker,
    # and convert it to a PyTorch Dataset.
    train_data = session.get_dataset_shard("train")
    # Create model.
    model = create_model(num_features)
    model = train.torch.prepare_model(model)

    loss_fn = nn.BCELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for cur_epoch in range(epochs):
        for batch in train_data.iter_torch_batches(
            batch_size=batch_size, dtypes=torch.float32
        ):
            # "concat_out" is the output column of the Concatenator.
            inputs, labels = batch["concat_out"], batch["target"]
            optimizer.zero_grad()
            predictions = model(inputs)
            train_loss = loss_fn(predictions, labels.unsqueeze(1))
            train_loss.backward()
            optimizer.step()
        loss = train_loss.item()
        session.report({"loss": loss}, checkpoint=TorchCheckpoint.from_model(model))


num_features = len(train_dataset.schema().names) - 1

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={
        "batch_size": 128,
        "num_epochs": 20,
        "num_features": num_features,
        "lr": 0.001,
    },
    scaling_config=ScalingConfig(
        num_workers=3,  # Number of workers to use for data parallelism.
        use_gpu=False,
        trainer_resources={"CPU": 0},  # so that the example works on Colab.
    ),
    datasets={"train": train_dataset},
    preprocessor=preprocessor,
)
# Execute training.
result = trainer.fit()
print(f"Last result: {result.metrics}")
# Last result: {'loss': 0.6559339960416158, ...}

[2m[36m(RayTrainWorker pid=80174)[0m 2022-12-16 07:38:54,663	INFO config.py:86 -- Setting up process group for: env:// [rank=0, world_size=3]
[2m[36m(RayTrainWorker pid=80174)[0m 2022-12-16 07:38:55,972	INFO train_loop_utils.py:270 -- Moving model to device: cpu
[2m[36m(RayTrainWorker pid=80174)[0m 2022-12-16 07:38:55,972	INFO train_loop_utils.py:330 -- Wrapping provided model in DistributedDataParallel.


Trial name,_time_this_iter_s,_timestamp,_training_iteration,date,done,episodes_total,experiment_id,hostname,iterations_since_restore,loss,node_ip,pid,should_checkpoint,time_since_restore,time_this_iter_s,time_total_s,timestamp,timesteps_since_restore,timesteps_total,training_iteration,trial_id,warmup_time
TorchTrainer_44c17_00000,0.0964417,1671172736,1,2022-12-16_07-38-56,False,,6305d946512447fa828a7c994344995a,krk-mp5ek,1,4.79171,127.0.0.1,80144,True,13.8375,13.8375,13.8375,1671172736,0,,1,44c17_00000,0.0380981


2022-12-16 07:39:00,876	INFO tune.py:762 -- Total run time: 24.38 seconds (24.22 seconds for the tuning loop).


Last result: {'loss': 0.6072853803634644, '_timestamp': 1671172738, '_time_this_iter_s': 0.10623908042907715, '_training_iteration': 20, 'time_this_iter_s': 0.10453581809997559, 'should_checkpoint': True, 'done': True, 'timesteps_total': None, 'episodes_total': None, 'training_iteration': 20, 'trial_id': '44c17_00000', 'experiment_id': '6305d946512447fa828a7c994344995a', 'date': '2022-12-16_07-38-58', 'timestamp': 1671172738, 'time_total_s': 16.073720932006836, 'pid': 80144, 'hostname': 'krk-mp5ek', 'node_ip': '127.0.0.1', 'config': {}, 'time_since_restore': 16.073720932006836, 'timesteps_since_restore': 0, 'iterations_since_restore': 20, 'warmup_time': 0.03809809684753418, 'experiment_tag': '0'}


In [4]:
from ray import tune

param_space = {"train_loop_config": {"lr": tune.loguniform(0.0001, 0.01)}}
metric = "loss"

In [5]:
from ray.tune.tuner import Tuner, TuneConfig
from ray.air.config import RunConfig

tuner = Tuner(
    trainer,
    param_space=param_space,
    tune_config=TuneConfig(num_samples=5, metric=metric, mode="min"),
)
# Execute tuning.
result_grid = tuner.fit()

# Fetch the best result.
best_result = result_grid.get_best_result()
print("Best Result:", best_result)
# Best Result: Result(metrics={'loss': 0.278409322102863, ...})

  tuner = Tuner(


[2m[1m[36m(scheduler +2m30s)[0m Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.


2022-12-16 07:47:18,363	ERROR tune.py:758 -- Trials did not complete: [TorchTrainer_5356d_00000, TorchTrainer_5356d_00001, TorchTrainer_5356d_00002, TorchTrainer_5356d_00003, TorchTrainer_5356d_00004]
2022-12-16 07:47:18,365	INFO tune.py:762 -- Total run time: 497.41 seconds (497.15 seconds for the tuning loop).


RuntimeError: No best trial found for the given metric: loss. This means that no trial has reported this metric, or all values reported for this metric are NaN. To not ignore NaN values, you can set the `filter_nan_and_inf` arg to False.

In [None]:
from ray.train.batch_predictor import BatchPredictor
from ray.train.torch import TorchPredictor

# You can also create a checkpoint from a trained model using
# `TorchCheckpoint.from_model`.
checkpoint = best_result.checkpoint

batch_predictor = BatchPredictor.from_checkpoint(
    checkpoint, TorchPredictor, model=create_model(num_features)
)

predicted_probabilities = batch_predictor.predict(test_dataset)
predicted_probabilities.show()
# {'predictions': array([1.], dtype=float32)}
# {'predictions': array([0.], dtype=float32)}
# ...