Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release] add golden notebook release test for torch/tune/serve #16619

Merged
merged 2 commits into from
Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions release/golden_notebook_tests/golden_notebook_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,17 @@
run:
timeout: 900
script: python workloads/modin_xgboost_test.py

# Golden-notebook release test: Torch training via Tune + Ray Serve inference.
- name: torch_tune_serve_test
  owner:
    mail: "matt@anyscale.com"
    slack: "@team_ml"

  cluster:
    # Environment (base image + pip deps) and machine shape for the run.
    app_config: torch_tune_serve_app_config.yaml
    compute_template: gpu_tpl.yaml

  run:
    timeout: 1800  # seconds; full training + serving round-trip
    script: python workloads/torch_tune_serve_test.py

15 changes: 15 additions & 0 deletions release/golden_notebook_tests/gpu_tpl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Compute template for the torch_tune_serve release test.
cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
region: us-west-2

max_workers: 3

# CPU-only head node; GPU work runs on the workers below.
head_node_type:
  name: head_node
  instance_type: m5.4xlarge

worker_node_types:
  - name: worker_node
    instance_type: g3.8xlarge
    # Fixed-size (min == max) GPU pool; no spot to keep the test deterministic.
    min_workers: 3
    max_workers: 3
    use_spot: false
15 changes: 15 additions & 0 deletions release/golden_notebook_tests/torch_tune_serve_app_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# App config for the torch_tune_serve release test.
base_image: "anyscale/ray:1.4.0-gpu"
env_vars: { }
debian_packages:
  - curl

python:
  pip_packages:
    - pytest
    - torch
    - torchvision
  conda_packages: [ ]

post_build_cmds:
  # Swap the image's bundled Ray for the release wheel under test;
  # falls back to the latest published "ray" when RAY_WHEELS is unset.
  - pip uninstall -y ray || true
  - pip3 install -U {{ env["RAY_WHEELS"] | default("ray") }}
197 changes: 197 additions & 0 deletions release/golden_notebook_tests/workloads/torch_tune_serve_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import argparse

import ray
import requests
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from filelock import FileLock
from ray import serve, tune
from ray.util.sgd.torch import TorchTrainer, TrainingOperator
from ray.util.sgd.torch.resnet import ResNet18
from ray.util.sgd.utils import override
from torch.utils.data import DataLoader, Subset
from torchvision.datasets import MNIST


def load_mnist_data(train: bool, download: bool):
    """Return the MNIST dataset as normalized tensors.

    The file lock serializes the download so concurrent workers on the
    same node don't race on the dataset files.
    """
    mnist_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307, ), (0.3081, )),
    ])

    with FileLock(".ray.lock"):
        return MNIST(
            root="~/data",
            train=train,
            download=download,
            transform=mnist_transform)


class MnistTrainingOperator(TrainingOperator):
    """RaySGD training operator: ResNet18 + SGD on MNIST."""

    @override(TrainingOperator)
    def setup(self, config):
        """Build the model, optimizer, loaders, and loss from ``config``."""
        # ResNet18 expects 3-channel input; swap in a single-channel
        # first conv for grayscale MNIST.
        net = ResNet18(config)
        net.conv1 = nn.Conv2d(
            1, 64, kernel_size=7, stride=1, padding=3, bias=False)

        sgd = torch.optim.SGD(
            net.parameters(),
            lr=config.get("lr", 0.1),
            momentum=config.get("momentum", 0.9))

        train_set = load_mnist_data(True, True)
        val_set = load_mnist_data(False, False)

        # Smoke tests run on a tiny fixed slice of each split.
        if config["test_mode"]:
            train_set = Subset(train_set, list(range(64)))
            val_set = Subset(val_set, list(range(64)))

        batch_size = config["batch_size"]
        train_loader = DataLoader(
            train_set, batch_size=batch_size, num_workers=2)
        val_loader = DataLoader(
            val_set, batch_size=batch_size, num_workers=2)

        loss_fn = nn.CrossEntropyLoss()

        # Hand everything to RaySGD for distributed setup.
        self.model, self.optimizer, self.criterion = self.register(
            models=net, optimizers=sgd, criterion=loss_fn)
        self.register_data(
            train_loader=train_loader, validation_loader=val_loader)


def train_mnist(test_mode=False, num_workers=1, use_gpu=False):
    """Grid-search the learning rate with Tune and return the analysis.

    Runs two short trials (lr 1e-4 and 1e-3), each stopped after two
    training iterations, selecting the best by minimum ``val_loss``.
    """
    trainable = TorchTrainer.as_trainable(
        training_operator_cls=MnistTrainingOperator,
        num_workers=num_workers,
        use_gpu=use_gpu,
        config={"test_mode": test_mode, "batch_size": 128})

    analysis = tune.run(
        trainable,
        num_samples=1,
        config={"lr": tune.grid_search([1e-4, 1e-3])},
        stop={"training_iteration": 2},
        verbose=1,
        metric="val_loss",
        mode="min",
        checkpoint_at_end=True)
    return analysis


def get_best_model(best_model_checkpoint_path):
    """Rebuild the trained model from a checkpoint and return its object ref.

    The model is placed in the Ray object store so Serve replicas can
    fetch it without reloading from disk.
    """
    checkpoint = torch.load(best_model_checkpoint_path)

    # Mirror the architecture tweak made at training time (grayscale conv1).
    net = ResNet18(None)
    net.conv1 = nn.Conv2d(
        1, 64, kernel_size=7, stride=1, padding=3, bias=False)
    net.load_state_dict(checkpoint["models"][0])

    return ray.put(net)


@serve.deployment(name="mnist", route_prefix="/mnist")
class MnistDeployment:
    """Serve deployment that classifies MNIST images with the trained model."""

    def __init__(self, model_id):
        # Pull the model from the object store and move it to GPU if present.
        device = torch.device(
            "cuda" if torch.cuda.is_available() else "cpu")
        self.device = device
        self.model = ray.get(model_id).to(device)

    async def __call__(self, request):
        payload = await request.json()
        prediction = await self.my_batch_handler(payload["image"])
        return {"result": prediction.cpu().numpy().tolist()}

    @serve.batch(max_batch_size=4, batch_wait_timeout_s=1)
    async def my_batch_handler(self, images):
        # Serve coalesces concurrent requests into one batched call here.
        print(f"Processing request with batch size {len(images)}.")
        batch = torch.tensor(images).to(self.device)
        logits = self.model(batch)
        return torch.max(logits.data, 1)[1]


def setup_serve(model_id):
    """Start Serve and deploy two GPU replicas of the MNIST model."""
    # Start on every node so `predict` can hit localhost.
    serve.start(http_options={"location": "EveryNode"})
    deployment = MnistDeployment.options(
        num_replicas=2, ray_actor_options={"num_gpus": 1})
    deployment.deploy(model_id)


@ray.remote
def predict_and_validate(index, image, label):
    """Query the Serve endpoint for one image and log the outcome.

    Returns the predicted class, or -1 when the response cannot be
    parsed, so a single bad response doesn't abort the whole run.
    """

    def predict(image):
        response = requests.post(
            "http://localhost:8000/mnist",
            json={"image": image.numpy().tolist()})
        # `response.json()` raises ValueError on non-JSON bodies and the
        # subscript raises KeyError/TypeError on an unexpected payload
        # shape. Catch only those (the original bare `except:` also
        # swallowed SystemExit/KeyboardInterrupt).
        try:
            return response.json()["result"]
        except (ValueError, KeyError, TypeError):
            return -1

    prediction = predict(image)
    print("Querying model with example #{}. "
          "Label = {}, Prediction = {}, Correct = {}".format(
              index, label, prediction, label == prediction))
    return prediction


def test_predictions(test_mode=False):
    """Query the deployed model on a slice of the MNIST test set and
    report how many predictions matched their labels."""
    # Load in data
    dataset = load_mnist_data(False, True)
    num_to_test = 10 if test_mode else 1000
    samples = [dataset[i] for i in range(num_to_test)]
    images, labels = zip(*samples)

    # Remote function calls are done here for parallelism.
    # As a byproduct `predict` can hit localhost.
    futures = [
        predict_and_validate.remote(i, images[i], labels[i])
        for i in range(num_to_test)
    ]
    predictions = ray.get(futures)

    correct = sum(
        1 for expected, got in zip(labels, predictions) if expected == got)

    print("Labels = {}. Predictions = {}. {} out of {} are correct.".format(
        list(labels), predictions, correct, num_to_test))


def _main():
    """End-to-end release test: train with Tune, then serve and validate."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--smoke-test",
        action="store_true",
        default=False,
        help="Finish quickly for testing.")
    args = parser.parse_args()

    # Attach to the Anyscale-managed cluster via the Ray client.
    ray.client("anyscale://").connect()
    num_workers = 2
    use_gpu = True

    print("Training model.")
    analysis = train_mnist(args.smoke_test, num_workers, use_gpu)

    print("Retrieving best model.")
    model_id = get_best_model(analysis.best_checkpoint)

    print("Setting up Serve.")
    setup_serve(model_id)

    print("Testing Prediction Service.")
    test_predictions(args.smoke_test)

    print("Test Successful!")


if __name__ == "__main__":
    _main()