In [1]:
import ray
import torchvision
import torchvision.transforms as transforms

# Fetch both CIFAR-10 splits into the local "data" directory (download=True),
# then hand each split to Ray Data.
train_dataset = torchvision.datasets.CIFAR10(root="data", train=True, download=True)
test_dataset = torchvision.datasets.CIFAR10(root="data", train=False, download=True)

# The two names are rebound here: from this point on they hold the objects
# returned by ray.data.from_torch rather than the torchvision datasets.
train_dataset: ray.data.Dataset = ray.data.from_torch(train_dataset)
test_dataset: ray.data.Dataset = ray.data.from_torch(test_dataset)

Files already downloaded and verified
Files already downloaded and verified


2023-03-16 21:56:47,040	INFO worker.py:1544 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8266 [39m[22m


In [2]:
from typing import Dict, List, Tuple

import numpy as np
import torch
from PIL.Image import Image


def convert_batch_to_numpy(batch: List[Tuple[Image, int]]) -> Dict[str, np.ndarray]:
    """Turn a batch of ``(image, label)`` pairs into a dict of stacked NumPy arrays.

    Args:
        batch: The ``(image, label)`` pairs that make up one batch. Every image
            must be convertible with ``np.array`` and all images must share one
            shape, because they are stacked along a new leading axis.

    Returns:
        A dict with two entries: ``"image"``, all images stacked into a single
        array, and ``"label"``, an array holding the labels in batch order.

    Raises:
        ValueError: Raised by ``np.stack`` when ``batch`` is empty or the image
            shapes differ.
    """
    images = np.stack([np.array(image) for image, _ in batch])
    labels = np.array([label for _, label in batch])
    return {"image": images, "label": labels}


# Run convert_batch_to_numpy over every batch of both splits. The log printed
# under this cell shows the map executing right away after fully_executed().
# NOTE(review): both names are rebound to their own transformed result, so
# re-running this cell on its own would feed already-converted data back into
# convert_batch_to_numpy; re-run the loading cell first.
train_dataset = train_dataset.map_batches(convert_batch_to_numpy).fully_executed()
test_dataset = test_dataset.map_batches(convert_batch_to_numpy).fully_executed()

2023-03-16 21:57:01,299	INFO bulk_executor.py:39 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(convert_batch_to_numpy)]


[dataset]: Run `pip install tqdm` to enable progress reporting.


2023-03-16 21:57:02,030	INFO bulk_executor.py:39 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(convert_batch_to_numpy)]


In [5]:
# Sanity check: show only the shape of the first converted test image instead
# of printing the whole pixel array.
test_dataset.take(1)[0]['image'].shape

(32, 32, 3)

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
    """Small convolutional classifier: two conv + pool stages, then three linear layers.

    The first convolution takes 3 input channels, ``fc1`` expects 16 * 5 * 5
    flattened features, and ``fc3`` emits 10 values per sample.
    """

    def __init__(self):
        super().__init__()
        # Feature extractor: 5x5 convolutions; one 2x2 / stride-2 max-pool
        # module is shared by both stages.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        # Classifier head: 400 -> 120 -> 84 -> 10.
        self.fc1 = nn.Linear(in_features=16 * 5 * 5, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=10)

    def forward(self, x):
        """Return the output of ``fc3`` (no activation applied) for the batch ``x``."""
        # conv -> ReLU -> pool, once per convolution.
        for conv in (self.conv1, self.conv2):
            x = self.pool(F.relu(conv(x)))
        # Keep the batch axis and merge every other axis into one.
        x = x.flatten(start_dim=1)
        # Two ReLU-activated hidden layers, then the final linear layer.
        for hidden in (self.fc1, self.fc2):
            x = F.relu(hidden(x))
        return self.fc3(x)

In [4]:
from ray import train
from ray.air import session, Checkpoint
from ray.train.torch import TorchCheckpoint
import torch.nn as nn
import torch.optim as optim
import torchvision


def train_loop_per_worker(config):
    """Train a fresh ``Net`` on this worker's shard of the ``"train"`` dataset.

    After every epoch the function reports a ``running_loss`` metric together
    with a checkpoint built from the model's state dict.

    Args:
        config: Mapping of hyperparameters. ``"batch_size"`` is required.
            Optional keys and their defaults: ``"num_epochs"`` (2), ``"lr"``
            (0.001), ``"momentum"`` (0.9) and ``"log_interval"`` (2000, the
            number of mini-batches between loss printouts; must be a positive
            integer).
    """
    # Optional knobs; a config that only provides "batch_size" trains exactly
    # as with the defaults listed above.
    num_epochs = config.get("num_epochs", 2)
    learning_rate = config.get("lr", 0.001)
    momentum = config.get("momentum", 0.9)
    log_interval = config.get("log_interval", 2000)

    model = train.torch.prepare_model(Net())

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)

    train_dataset_shard = session.get_dataset_shard("train")

    for epoch in range(num_epochs):
        running_loss = 0.0
        # A new batch iterator is created for every epoch.
        train_dataset_batches = train_dataset_shard.iter_torch_batches(
            batch_size=config["batch_size"], device=train.torch.get_device()
        )
        for i, batch in enumerate(train_dataset_batches):
            # get the inputs and labels
            inputs, labels = batch["image"], batch["label"]

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics: mean loss over the last `log_interval` mini-batches
            running_loss += loss.item()
            if i % log_interval == log_interval - 1:
                print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / log_interval:.3f}")
                running_loss = 0.0

        # running_loss is reset after every printout, so the reported value is
        # the loss summed (not averaged) over the mini-batches seen since the
        # last printout of this epoch.
        metrics = dict(running_loss=running_loss)
        checkpoint = TorchCheckpoint.from_state_dict(model.state_dict())
        session.report(metrics, checkpoint=checkpoint)

2023-03-16 20:39:59,622	INFO instantiator.py:21 -- Created a temporary directory at /var/folders/p4/kcmtkxw53z54k341vwwykts80000gn/T/tmpyqs1s8xb
2023-03-16 20:39:59,623	INFO instantiator.py:76 -- Writing /var/folders/p4/kcmtkxw53z54k341vwwykts80000gn/T/tmpyqs1s8xb/_remote_module_non_scriptable.py


In [5]:
from ray.data.preprocessors import TorchVisionPreprocessor

# Per-channel statistics handed to Normalize: 0.5 for each of the three channels.
channel_mean = (0.5, 0.5, 0.5)
channel_std = (0.5, 0.5, 0.5)

transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ]
)
# The torchvision pipeline is applied to the "image" column only.
preprocessor = TorchVisionPreprocessor(columns=["image"], transform=transform)

In [6]:
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig

# Two training workers are requested; use GPUs only when Ray reports at least
# one GPU per worker, otherwise train on CPU.
num_workers = 2
available_gpus = ray.available_resources().get("GPU", 0)
use_gpu = available_gpus >= num_workers

scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={"batch_size": 2},
    datasets={"train": train_dataset},
    scaling_config=scaling_config,
    preprocessor=preprocessor,
)

# Run the training job and keep the checkpoint from its result for later cells.
result = trainer.fit()
latest_checkpoint = result.checkpoint

0,1
Current time:,2023-03-16 20:40:30
Running for:,00:00:02.49
Memory:,10.0/16.0 GiB

Trial name,status,loc
TorchTrainer_bac1f_00000,RUNNING,127.0.0.1:93323


[2m[36m(TorchTrainer pid=93323)[0m 2023-03-16 20:40:30,111	INFO bulk_executor.py:39 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[TorchVisionPreprocessor] -> AllToAllOperator[randomize_block_order]
[2m[36m(RayTrainWorker pid=93338)[0m 2023-03-16 20:40:35,759	INFO config.py:86 -- Setting up process group for: env:// [rank=0, world_size=2]
[2m[36m(RayTrainWorker pid=93338)[0m 2023-03-16 20:40:36,852	INFO train_loop_utils.py:255 -- Moving model to device: cpu
[2m[36m(RayTrainWorker pid=93338)[0m 2023-03-16 20:40:36,852	INFO train_loop_utils.py:315 -- Wrapping provided model in DistributedDataParallel.


[2m[36m(RayTrainWorker pid=93339)[0m [1,  2000] loss: 2.192
[2m[36m(RayTrainWorker pid=93338)[0m [1,  2000] loss: 2.198
[2m[36m(RayTrainWorker pid=93339)[0m [1,  4000] loss: 1.846
[2m[36m(RayTrainWorker pid=93338)[0m [1,  4000] loss: 1.866
[2m[36m(RayTrainWorker pid=93339)[0m [1,  6000] loss: 1.666
[2m[36m(RayTrainWorker pid=93338)[0m [1,  6000] loss: 1.668
[2m[36m(RayTrainWorker pid=93339)[0m [1,  8000] loss: 1.594
[2m[36m(RayTrainWorker pid=93338)[0m [1,  8000] loss: 1.574
[2m[36m(RayTrainWorker pid=93339)[0m [1, 10000] loss: 1.506
[2m[36m(RayTrainWorker pid=93338)[0m [1, 10000] loss: 1.502
[2m[36m(RayTrainWorker pid=93339)[0m [1, 12000] loss: 1.464
[2m[36m(RayTrainWorker pid=93338)[0m [1, 12000] loss: 1.479


Trial name,_time_this_iter_s,_timestamp,_training_iteration,date,done,episodes_total,experiment_id,hostname,iterations_since_restore,node_ip,pid,running_loss,should_checkpoint,time_since_restore,time_this_iter_s,time_total_s,timestamp,timesteps_since_restore,timesteps_total,training_iteration,trial_id,warmup_time
TorchTrainer_bac1f_00000,26.2573,1678970481,2,2023-03-16_20-41-21,False,,c67991d8ca8546fa9c0f3b61efab92b2,mac,2,127.0.0.1,93323,610.467,True,51.3855,26.2526,51.3855,1678970481,0,,2,bac1f_00000,0.156917


[2m[36m(RayTrainWorker pid=93339)[0m [2,  2000] loss: 1.442
[2m[36m(RayTrainWorker pid=93338)[0m [2,  2000] loss: 1.423
[2m[36m(RayTrainWorker pid=93339)[0m [2,  4000] loss: 1.363
[2m[36m(RayTrainWorker pid=93338)[0m [2,  4000] loss: 1.411
[2m[36m(RayTrainWorker pid=93339)[0m [2,  6000] loss: 1.348
[2m[36m(RayTrainWorker pid=93338)[0m [2,  6000] loss: 1.348
[2m[36m(RayTrainWorker pid=93339)[0m [2,  8000] loss: 1.347
[2m[36m(RayTrainWorker pid=93338)[0m [2,  8000] loss: 1.322
[2m[36m(RayTrainWorker pid=93339)[0m [2, 10000] loss: 1.283
[2m[36m(RayTrainWorker pid=93338)[0m [2, 10000] loss: 1.288
[2m[36m(RayTrainWorker pid=93339)[0m [2, 12000] loss: 1.247
[2m[36m(RayTrainWorker pid=93338)[0m [2, 12000] loss: 1.286


2023-03-16 20:41:23,972	INFO tune.py:798 -- Total run time: 56.38 seconds (56.35 seconds for the tuning loop).


In [7]:
from ray.train.torch import TorchPredictor
from ray.train.batch_predictor import BatchPredictor

# Build a batch predictor from the training checkpoint; a fresh Net() is passed
# in as the model.
batch_predictor = BatchPredictor.from_checkpoint(
    checkpoint=latest_checkpoint,
    predictor_cls=TorchPredictor,
    model=Net(),
)

# We will use GPU if available.
gpus_for_scoring = ray.available_resources().get("GPU", 0)

# Score the test split on its "image" column and carry "label" through to the
# output so predictions can be compared against it afterwards.
outputs: ray.data.Dataset = batch_predictor.predict(
    data=test_dataset,
    dtype=torch.float,
    feature_columns=["image"],
    keep_columns=["label"],
    num_gpus_per_worker=gpus_for_scoring,
)

2023-03-16 20:42:26,990	INFO bulk_executor.py:39 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[TorchVisionPreprocessor]
2023-03-16 20:42:30,419	INFO bulk_executor.py:39 -- Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(ScoringWrapper)]


In [8]:
import numpy as np


def convert_logits_to_classes(df):
    """Reduce each row's ``"predictions"`` entry to the index of its largest value.

    The index is written to a new ``"prediction"`` column on ``df`` itself; the
    returned frame contains only the ``"prediction"`` and ``"label"`` columns.
    """

    def _top_class(logits):
        # Position of the highest score in one row's output.
        return logits.argmax()

    df["prediction"] = df["predictions"].map(_top_class)
    kept_columns = ["prediction", "label"]
    return df[kept_columns]


# Map every batch of model outputs to its predicted class, then print one row
# as a spot check.
predictions = outputs.map_batches(convert_logits_to_classes)

predictions.show(1)


2023-03-16 20:42:32,709	INFO bulk_executor.py:39 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(convert_logits_to_classes)]


{'prediction': 5, 'label': 3}


In [9]:
def calculate_prediction_scores(df):
    """Flag each row whose ``"prediction"`` equals its ``"label"``.

    Adds a boolean ``"correct"`` column to ``df`` in place and returns that same
    frame.
    """
    is_match = df["prediction"].eq(df["label"])
    df["correct"] = is_match
    return df


# Add the per-row "correct" flag to every batch, then print one row as a spot
# check.
scores = predictions.map_batches(calculate_prediction_scores)

scores.show(1)

2023-03-16 20:42:32,750	INFO bulk_executor.py:39 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(calculate_prediction_scores)]


{'prediction': 5, 'label': 3, 'correct': False}


In [10]:
# Accuracy: sum of the boolean "correct" column divided by the number of rows.
scores.sum(on="correct") / scores.count()

2023-03-16 20:42:32,779	INFO bulk_executor.py:39 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[aggregate]


0.5584

In [11]:
from ray import serve
from ray.serve import PredictorDeployment
from ray.serve.http_adapters import json_to_ndarray


# Describe the deployment first, then start it. serve.run(...) stays the last
# expression so the cell still displays the handle it returns.
predictor_app = PredictorDeployment.bind(
    TorchPredictor,
    latest_checkpoint,
    model=Net(),
    http_adapter=json_to_ndarray,
)
serve.run(predictor_app)

[2m[36m(ServeController pid=93408)[0m INFO 2023-03-16 20:42:33,569 controller 93408 http_state.py:129 - Starting HTTP proxy with name 'SERVE_CONTROLLER_ACTOR:SERVE_PROXY_ACTOR-6fd0598e2fa7ee7d98de2442fa8b11b9e6f389adce467a9cb617f642' on node '6fd0598e2fa7ee7d98de2442fa8b11b9e6f389adce467a9cb617f642' listening on '127.0.0.1:8000'
[2m[36m(HTTPProxyActor pid=93409)[0m INFO:     Started server process [93409]
2023-03-16 20:42:34,252	INFO api.py:254 -- Started detached Serve instance in namespace "serve".
2023-03-16 20:42:34,266	INFO client.py:540 -- Updating deployment 'PredictorDeployment'. component=serve deployment=PredictorDeployment
[2m[36m(ServeController pid=93408)[0m INFO 2023-03-16 20:42:34,304 controller 93408 deployment_state.py:1333 - Adding 1 replica to deployment 'PredictorDeployment'.
2023-03-16 20:42:36,278	INFO client.py:555 -- Deployment 'PredictorDeployment' is ready at `http://127.0.0.1:8000/`. component=serve deployment=PredictorDeployment


RayServeSyncHandle(deployment='PredictorDeployment')

In [12]:
# Take the "image" field of the first test row; it is sent to the deployment in
# the next cell.
image = test_dataset.take(1)[0]["image"]

In [13]:
import requests

# Post the image as JSON to the deployment started earlier, which was bound
# with the json_to_ndarray HTTP adapter.
# NOTE(review): the logits recorded for this request are in the hundreds;
# confirm that posting the pixels as float32 goes through the same scaling as
# the data the model was trained on.
payload = {"array": image.tolist(), "dtype": "float32"}
# A timeout keeps the cell from hanging forever if the endpoint is unreachable.
response = requests.post("http://localhost:8000/", json=payload, timeout=30)
# Fail loudly on an HTTP error instead of trying to decode an error body as JSON.
response.raise_for_status()
response.json()

{'predictions': [818.4107055664062,
  -54.2667121887207,
  205.0116424560547,
  244.10177612304688,
  -44.22550964355469,
  278.5929870605469,
  -436.99383544921875,
  -129.3734588623047,
  187.9035186767578,
  -387.4586486816406]}

[2m[36m(HTTPProxyActor pid=93409)[0m INFO 2023-03-16 20:42:36,385 http_proxy 127.0.0.1 http_proxy.py:373 - POST / 200 14.5ms
[2m[36m(ServeReplica:PredictorDeployment pid=93410)[0m INFO 2023-03-16 20:42:36,384 PredictorDeployment PredictorDeployment#FoDFLK replica.py:518 - HANDLE __call__ OK 11.2ms
