From b416df36a28b115cc3aa4578f0e01805ebc16b3e Mon Sep 17 00:00:00 2001
From: Matthew Deng
Date: Wed, 16 Jun 2021 15:41:12 -0700
Subject: [PATCH 1/2] [release] add golden notebook release test for torch/tune/serve

---
Reviewer notes (text between the "---" line and the first "diff --git" line
is commentary and is not part of the change):
- predict() wraps response.json()["result"] in a bare "except:" and returns
  -1, so a malformed or error response is reported as an ordinary wrong
  prediction instead of failing the test.
- test_predictions() only prints how many predictions were correct; nothing
  asserts on accuracy, so "Test Successful!" is printed whenever the script
  runs to completion.

 .../golden_notebook_tests.yaml                |  14 ++
 release/golden_notebook_tests/gpu_tpl.yaml    |  15 ++
 .../torch_tune_serve_app_config.yaml          |  15 ++
 .../workloads/torch_tune_serve_test.py        | 194 ++++++++++++++++++
 4 files changed, 238 insertions(+)
 create mode 100644 release/golden_notebook_tests/gpu_tpl.yaml
 create mode 100755 release/golden_notebook_tests/torch_tune_serve_app_config.yaml
 create mode 100644 release/golden_notebook_tests/workloads/torch_tune_serve_test.py

diff --git a/release/golden_notebook_tests/golden_notebook_tests.yaml b/release/golden_notebook_tests/golden_notebook_tests.yaml
index 4e7d854f46723..c4f0ae2e5312d 100644
--- a/release/golden_notebook_tests/golden_notebook_tests.yaml
+++ b/release/golden_notebook_tests/golden_notebook_tests.yaml
@@ -23,3 +23,17 @@
   run:
     timeout: 900
     script: python workloads/modin_xgboost_test.py
+
+- name: torch_tune_serve_test
+  owner:
+    mail: "matt@anyscale.com"
+    slack: "@team_ml"
+
+  cluster:
+    app_config: torch_tune_serve_app_config.yaml
+    compute_template: gpu_tpl.yaml
+
+  run:
+    timeout: 900
+    script: python workloads/torch_tune_serve_test.py
+
diff --git a/release/golden_notebook_tests/gpu_tpl.yaml b/release/golden_notebook_tests/gpu_tpl.yaml
new file mode 100644
index 0000000000000..25c3f1c99be07
--- /dev/null
+++ b/release/golden_notebook_tests/gpu_tpl.yaml
@@ -0,0 +1,15 @@
+cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
+region: us-west-2
+
+max_workers: 3
+
+head_node_type:
+    name: head_node
+    instance_type: m5.4xlarge
+
+worker_node_types:
+    - name: worker_node
+      instance_type: g3.8xlarge
+      min_workers: 3
+      max_workers: 3
+      use_spot: false
diff --git a/release/golden_notebook_tests/torch_tune_serve_app_config.yaml b/release/golden_notebook_tests/torch_tune_serve_app_config.yaml
new file mode 100755
index 0000000000000..34034955d0b0e
--- /dev/null
+++ b/release/golden_notebook_tests/torch_tune_serve_app_config.yaml
@@ -0,0 +1,15 @@
+base_image: "anyscale/ray:1.4.0-gpu"
+env_vars: { }
+debian_packages:
+  - curl
+
+python:
+  pip_packages:
+    - pytest
+    - torch
+    - torchvision
+  conda_packages: [ ]
+
+post_build_cmds:
+  - pip uninstall -y ray || true
+  - pip3 install -U {{ env["RAY_WHEELS"] | default("ray") }}
diff --git a/release/golden_notebook_tests/workloads/torch_tune_serve_test.py b/release/golden_notebook_tests/workloads/torch_tune_serve_test.py
new file mode 100644
index 0000000000000..4d0b98f98e964
--- /dev/null
+++ b/release/golden_notebook_tests/workloads/torch_tune_serve_test.py
@@ -0,0 +1,194 @@
+import argparse
+
+import ray
+import requests
+import torch
+import torch.nn as nn
+import torchvision.transforms as transforms
+from filelock import FileLock
+from ray import serve, tune
+from ray.util.sgd.torch import TorchTrainer, TrainingOperator
+from ray.util.sgd.torch.resnet import ResNet18
+from ray.util.sgd.utils import override
+from torch.utils.data import DataLoader, Subset
+from torchvision.datasets import MNIST
+
+
+def load_mnist_data(train: bool, download: bool):
+    transform = transforms.Compose(
+        [transforms.ToTensor(),
+         transforms.Normalize((0.1307, ), (0.3081, ))])
+
+    with FileLock(".ray.lock"):
+        return MNIST(
+            root="~/data", train=train, download=download, transform=transform)
+
+
+class MnistTrainingOperator(TrainingOperator):
+    @override(TrainingOperator)
+    def setup(self, config):
+        # Create model.
+        model = ResNet18(config)
+        model.conv1 = nn.Conv2d(
+            1, 64, kernel_size=7, stride=1, padding=3, bias=False)
+
+        # Create optimizer.
+        optimizer = torch.optim.SGD(
+            model.parameters(),
+            lr=config.get("lr", 0.1),
+            momentum=config.get("momentum", 0.9))
+
+        # Load in training and validation data.
+        train_dataset = load_mnist_data(True, True)
+        validation_dataset = load_mnist_data(False, False)
+
+        if config["test_mode"]:
+            train_dataset = Subset(train_dataset, list(range(64)))
+            validation_dataset = Subset(validation_dataset, list(range(64)))
+
+        train_loader = DataLoader(
+            train_dataset, batch_size=config["batch_size"], num_workers=2)
+        validation_loader = DataLoader(
+            validation_dataset, batch_size=config["batch_size"], num_workers=2)
+
+        # Create loss.
+        criterion = nn.CrossEntropyLoss()
+
+        # Register all components.
+        self.model, self.optimizer, self.criterion = self.register(
+            models=model, optimizers=optimizer, criterion=criterion)
+        self.register_data(
+            train_loader=train_loader, validation_loader=validation_loader)
+
+
+def train_mnist(test_mode=False, num_workers=1, use_gpu=False):
+    TorchTrainable = TorchTrainer.as_trainable(
+        training_operator_cls=MnistTrainingOperator,
+        num_workers=num_workers,
+        use_gpu=use_gpu,
+        config={
+            "test_mode": test_mode,
+            "batch_size": 128
+        })
+
+    return tune.run(
+        TorchTrainable,
+        num_samples=1,
+        config={"lr": tune.grid_search([1e-4, 1e-3])},
+        stop={"training_iteration": 2},
+        verbose=1,
+        metric="val_loss",
+        mode="min",
+        checkpoint_at_end=True)
+
+
+def get_best_model(best_model_checkpoint_path):
+    model_state = torch.load(best_model_checkpoint_path)
+
+    model = ResNet18(None)
+    model.conv1 = nn.Conv2d(
+        1, 64, kernel_size=7, stride=1, padding=3, bias=False)
+    model.load_state_dict(model_state["models"][0])
+    model_id = ray.put(model)
+
+    return model_id
+
+
+@serve.deployment(name="mnist", route_prefix="/mnist")
+class MnistDeployment:
+    def __init__(self, model_id):
+        use_cuda = torch.cuda.is_available()
+        self.device = torch.device("cuda" if use_cuda else "cpu")
+        model = ray.get(model_id).to(self.device)
+        self.model = model
+
+    async def __call__(self, request):
+        json_input = await request.json()
+        prediction = await self.my_batch_handler(json_input["image"])
+        return {"result": prediction.cpu().numpy().tolist()}
+
+    @serve.batch(max_batch_size=4, batch_wait_timeout_s=1)
+    async def my_batch_handler(self, images):
+        print(f"Processing request with batch size {len(images)}.")
+        image_tensors = torch.tensor(images)
+        image_tensors = image_tensors.to(self.device)
+        outputs = self.model(image_tensors)
+        predictions = torch.max(outputs.data, 1)[1]
+        return predictions
+
+
+def setup_serve(model_id):
+    serve.start()
+    MnistDeployment.options(
+        num_replicas=2, ray_actor_options={
+            "num_gpus": 1
+        }).deploy(model_id)
+
+
+def predict(image):
+    # handle = MnistDeployment.get_handle()
+    # return ray.get(handle.my_batch_handler.remote(image.numpy().tolist()))
+    response = requests.post(
+        "http://localhost:8000/mnist", json={"image": image.numpy().tolist()})
+    try:
+        return response.json()["result"]
+    except:  # noqa: E722
+        return -1
+
+
+@ray.remote
+def predict_and_validate(index, image, label):
+    prediction = predict(image)
+    print("Querying model with example #{}. "
+          "Label = {}, Prediction = {}, Correct = {}".format(
+              index, label, prediction, label == prediction))
+    return prediction
+
+
+def test_predictions():
+    # Load in data
+    dataset = load_mnist_data(False, True)
+    num_to_test = 10
+    filtered_dataset = [dataset[i] for i in range(num_to_test)]
+    images, labels = zip(*filtered_dataset)
+    predictions = ray.get([
+        predict_and_validate.remote(i, images[i], labels[i])
+        for i in range(num_to_test)
+    ])
+
+    correct = 0
+    for label, prediction in zip(labels, predictions):
+        if label == prediction:
+            correct += 1
+
+    print("Labels = {}. Predictions = {}. {} out of {} are correct.".format(
+        list(labels), predictions, correct, num_to_test))
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--smoke-test",
+        action="store_true",
+        default=False,
+        help="Finish quickly for testing.")
+    args = parser.parse_args()
+
+    ray.client("anyscale://").connect()
+    num_workers = 2
+    use_gpu = True
+
+    print("Training model.")
+    analysis = train_mnist(args.smoke_test, num_workers, use_gpu)
+
+    print("Retrieving best model.")
+    best_checkpoint = analysis.best_checkpoint
+    model_id = get_best_model(best_checkpoint)
+
+    print("Setting up Serve.")
+    setup_serve(model_id)
+
+    print("Testing Prediction Service.")
+    test_predictions()
+
+    print("Test Successful!")

From 8026bf2558c53237fff631e32a877875a829c905 Mon Sep 17 00:00:00 2001
From: Matthew Deng
Date: Wed, 23 Jun 2021 16:58:43 -0700
Subject: [PATCH 2/2] start serve on all nodes so remote localhost works

---
Reviewer notes (commentary only, not part of the change):
- The nested predict() keeps the bare "except:" that returns -1; a failed
  response is still counted as a wrong prediction rather than an error.
- test_predictions() now issues 1000 requests outside smoke-test mode but
  still does not assert on the number of correct predictions.

 .../golden_notebook_tests.yaml                |  2 +-
 .../workloads/torch_tune_serve_test.py        | 33 ++++++++++---------
 2 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/release/golden_notebook_tests/golden_notebook_tests.yaml b/release/golden_notebook_tests/golden_notebook_tests.yaml
index c4f0ae2e5312d..2ea1c515f405f 100644
--- a/release/golden_notebook_tests/golden_notebook_tests.yaml
+++ b/release/golden_notebook_tests/golden_notebook_tests.yaml
@@ -34,6 +34,6 @@
     compute_template: gpu_tpl.yaml
 
   run:
-    timeout: 900
+    timeout: 1800
     script: python workloads/torch_tune_serve_test.py
 
diff --git a/release/golden_notebook_tests/workloads/torch_tune_serve_test.py b/release/golden_notebook_tests/workloads/torch_tune_serve_test.py
index 4d0b98f98e964..d58bedf28ea30 100644
--- a/release/golden_notebook_tests/workloads/torch_tune_serve_test.py
+++ b/release/golden_notebook_tests/workloads/torch_tune_serve_test.py
@@ -118,26 +118,26 @@ async def my_batch_handler(self, images):
 
 
 def setup_serve(model_id):
-    serve.start()
+    serve.start(http_options={
+        "location": "EveryNode"
+    })  # Start on every node so `predict` can hit localhost.
     MnistDeployment.options(
         num_replicas=2, ray_actor_options={
             "num_gpus": 1
         }).deploy(model_id)
 
 
-def predict(image):
-    # handle = MnistDeployment.get_handle()
-    # return ray.get(handle.my_batch_handler.remote(image.numpy().tolist()))
-    response = requests.post(
-        "http://localhost:8000/mnist", json={"image": image.numpy().tolist()})
-    try:
-        return response.json()["result"]
-    except:  # noqa: E722
-        return -1
-
-
 @ray.remote
 def predict_and_validate(index, image, label):
+    def predict(image):
+        response = requests.post(
+            "http://localhost:8000/mnist",
+            json={"image": image.numpy().tolist()})
+        try:
+            return response.json()["result"]
+        except:  # noqa: E722
+            return -1
+
     prediction = predict(image)
     print("Querying model with example #{}. "
           "Label = {}, Prediction = {}, Correct = {}".format(
@@ -145,12 +145,15 @@ def predict_and_validate(index, image, label):
     return prediction
 
 
-def test_predictions():
+def test_predictions(test_mode=False):
     # Load in data
     dataset = load_mnist_data(False, True)
-    num_to_test = 10
+    num_to_test = 10 if test_mode else 1000
     filtered_dataset = [dataset[i] for i in range(num_to_test)]
     images, labels = zip(*filtered_dataset)
+
+    # Remote function calls are done here for parallelism.
+    # As a byproduct `predict` can hit localhost.
     predictions = ray.get([
         predict_and_validate.remote(i, images[i], labels[i])
         for i in range(num_to_test)
@@ -189,6 +192,6 @@ def test_predictions():
     setup_serve(model_id)
 
     print("Testing Prediction Service.")
-    test_predictions()
+    test_predictions(args.smoke_test)
 
     print("Test Successful!")