# Intro to Ray Serve

This notebook will introduce you to Ray Serve, a framework for building and deploying scalable ML applications.

<div class="alert alert-block alert-info">
    
<b>Here is the roadmap for this notebook:</b>

<ul>
    <li><b>Part 1:</b> Overview of Ray Serve</li>
    <li><b>Part 2:</b> Implement an MNISTClassifier service</li>
    <li><b>Part 3:</b> Advanced features of Ray Serve</li>
    <li><b>Part 4:</b> Ray Serve in Production</li>
</ul>
</div>


## Imports

In [1]:
from typing import Any
from torchvision import transforms

import json
import numpy as np
import ray
import requests
import torch
from ray import serve
from matplotlib import pyplot as plt
from fastapi import FastAPI
from starlette.requests import Request

## 1. Overview of Ray Serve

Serve is a framework for serving ML applications. 

Here is a high-level overview of the architecture of a Ray Serve Application.

<img src='https://technical-training-assets.s3.us-west-2.amazonaws.com/Ray_Serve/serve_architecture.png' width=700/>

An Application is a collection of one or more Deployments that are deployed together.

### Deployments

A `Deployment` is the fundamental developer-facing element of Ray Serve.

<img src='https://technical-training-assets.s3.us-west-2.amazonaws.com/Ray_Serve/deployment.png' width=600/>

Each deployment can have multiple replicas. 

A replica is implemented as a Ray actor with a queue to process incoming requests.

Each replica can be configured with a set of compute resources. 

### When to use Ray Serve?

Ray Serve is designed for the following scenarios:
- Building end-to-end ML applications with a flexible and programmable Python API
- Scaling compute resources up and down to meet the demand of your application
- Developing on a local machine and then scaling to a multi-node GPU cluster

#### Key Ray Serve Features
Ray Serve provides the following key features and optimizations:
- [response streaming](https://docs.ray.io/en/latest/serve/tutorials/streaming.html)
- [dynamic request batching](https://docs.ray.io/en/latest/serve/advanced-guides/dyn-req-batch.html)
- [multi-node/multi-GPU serving](https://docs.ray.io/en/latest/serve/tutorials/vllm-example.html)
- [model multiplexing](https://docs.ray.io/en/latest/serve/model-multiplexing.html)
- [fractional compute resource usage](https://docs.ray.io/en/latest/serve/configure-serve-deployment.html)

## 2. Implement an MNISTClassifier service

Let’s jump right in and get a simple ML service up and running on Ray Serve. 

Recall the `MNISTClassifier` we built to perform batch inference on the `MNIST` dataset.

In [2]:
class OfflineMNISTClassifier:
    def __init__(self, local_path: str):
        self.model = torch.jit.load(local_path)
        self.model.to("cuda")
        self.model.eval()

    def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
        return self.predict(batch)
    
    def predict(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
        images = torch.tensor(batch["image"]).float().to("cuda")

        with torch.no_grad():
            logits = self.model(images).cpu().numpy()

        batch["predicted_label"] = np.argmax(logits, axis=1)
        return batch

In [3]:
# We download the model from s3 to the EFS storage
!aws s3 cp s3://anyscale-public-materials/ray-ai-libraries/mnist/model/model.pt /mnt/cluster_storage/model.pt

download: s3://anyscale-public-materials/ray-ai-libraries/mnist/model/model.pt to ../../../../../../mnt/cluster_storage/model.pt


Here is how we can use the `OfflineMNISTClassifier` to perform batch inference on a dataset of random images.

In [4]:
# Create a dataset of random images
ds = ray.data.from_items([{"image": np.random.rand(1, 28, 28)} for _ in range(100)])

# Map the OfflineMNISTClassifier to the dataset
ds = ds.map_batches(
    OfflineMNISTClassifier,
    fn_constructor_kwargs={"local_path": "/mnt/cluster_storage/model.pt"},
    concurrency=1,
    num_gpus=1,
    batch_size=10
)

# Take a look at the first 10 predictions
ds.take_batch(10)["predicted_label"]

2025-04-16 14:22:03,039	INFO worker.py:1843 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
2025-04-16 14:22:04,885	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-04-16_14-22-01_744497_3812578/logs/ray-data
2025-04-16 14:22:04,886	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(OfflineMNISTClassifier)] -> LimitOperator[limit=10]


array([6, 1, 6, 6, 6, 1, 6, 6, 6, 1])

(ProxyActor pid=4005504) INFO 2025-04-16 14:23:25,716 proxy 192.168.99.98 -- Proxy starting on node b5c3f964f3685e6e9f297622e3fe2b306edecca5177c480a37ad9a77 (HTTP port: 8000).
(ProxyActor pid=4005504) INFO 2025-04-16 14:23:25,772 proxy 192.168.99.98 -- Got updated endpoints: {}.
(ServeController pid=4005503) INFO 2025-04-16 14:23:26,037 controller 4005503 -- Deploying new version of Deployment(name='OnlineMNISTClassifier', app='mnist_classifier') (initial target replicas: 1).
(ProxyActor pid=4005504) INFO 2025-04-16 14:23:26,040 proxy 192.168.99.98 -- Got updated endpoints: {Deployment(name='OnlineMNISTClassifier', app='mnist_classifier'): EndpointInfo(route='/', app_is_cross_language=False)}.
(ProxyActor pid=4005504) INFO 2025-04-16 14:23:26,048 proxy 192.168.99.98 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x714efff45d90>.
(ServeController pid=4005503) INFO 2025-04-16 14:23:26,140 controller 4005503

(autoscaler +3m19s) Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.

Now, if we want to migrate to an online inference setting, we can transform this class into a Ray Serve Deployment by applying the `@serve.deployment` decorator.


In [5]:
@serve.deployment() # this is the decorator to add
class OnlineMNISTClassifier:
    def __init__(self, local_path: str):
        self.model = torch.jit.load(local_path)
        self.model.to("cuda")
        self.model.eval()

    async def __call__(self, request: Request) -> dict[str, Any]: # __call__ now takes a Starlette Request object
        batch = json.loads(await request.json()) # we will need to parse the JSON body of the request
        return await self.predict(batch)
    
    async def predict(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
        # same code as OfflineMNISTClassifier.predict except we added async to the method
        images = torch.tensor(batch["image"]).float().to("cuda")

        with torch.no_grad():
            logits = self.model(images).cpu().numpy()

        batch["predicted_label"] = np.argmax(logits, axis=1)
        return batch

We can now instantiate the `OnlineMNISTClassifier` as a Ray Serve Application using `.bind`.

In [6]:
mnist_deployment = OnlineMNISTClassifier.options(
    num_replicas=1,
    ray_actor_options={"num_gpus": 1},
)

mnist_app = mnist_deployment.bind(local_path="/mnt/cluster_storage/model.pt")

<div class="alert alert-block alert-warning">

**Note:** `.bind` is a method that takes in the arguments to pass to the Deployment constructor.

</div>


We can then run the application 

In [7]:
mnist_deployment_handle = serve.run(mnist_app, name='mnist_classifier', blocking=False)

INFO 2025-04-16 14:23:25,991 serve 3812578 -- Started Serve in namespace "serve".
INFO 2025-04-16 14:23:29,119 serve 3812578 -- Application 'mnist_classifier' is ready at http://127.0.0.1:8000/.


We can test it as an HTTP endpoint

In [8]:
images = np.random.rand(2, 1, 28, 28).tolist()
json_request = json.dumps({"image": images})
response = requests.post("http://localhost:8000/", json=json_request)
response.json()["predicted_label"]

[1, 6]
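
Note that the request above is JSON-encoded twice: `json.dumps` produces a string, and the `json=` argument of `requests.post` serializes that string again. This is why `OnlineMNISTClassifier.__call__` applies `json.loads` to the result of `request.json()`. The standard-library sketch below reproduces the round trip without a server; the `payload` values are placeholders chosen for illustration.

```python
import json

# Placeholder payload standing in for {"image": images}.
payload = {"image": [[0.1, 0.2], [0.3, 0.4]]}

# Client side: json.dumps yields a str, and the json= argument serializes
# that str once more, so the HTTP body is a JSON string literal.
json_request = json.dumps(payload)
http_body = json.dumps(json_request)

# Server side: decoding the body once (as request.json() does) returns a str ...
first_decode = json.loads(http_body)
# ... so a second json.loads is needed to recover the dictionary.
batch = json.loads(first_decode)

print(type(first_decode).__name__)  # str
print(batch == payload)             # True
```

Passing the dictionary directly (`json=payload`) would make a single decode sufficient, in which case the handler would not need the extra `json.loads`.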

We can also call the deployment directly from Python through the handle returned by `serve.run`, without going through HTTP.

In [9]:
batch = {"image": np.random.rand(10, 1, 28, 28)}
response = await mnist_deployment_handle.predict.remote(batch)
response["predicted_label"]

INFO 2025-04-16 14:23:45,662 serve 3812578 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x7c082c1078c0>.


array([6, 1, 1, 6, 6, 1, 6, 6, 6, 1])

## 3. Advanced features of Ray Serve

### Using fractions of a GPU

With Ray we can specify fractional compute resources for each deployment's replica. 

This helps us fully utilize a GPU, especially when running small models like our `MNISTClassifier` model.

Here is how to specify only 10% of a GPU's compute resources for our `MNISTClassifier` model.

In [12]:
mnist_app = OnlineMNISTClassifier.options(
    num_replicas=4, # we can scale up to 10 replicas on a single GPU
    ray_actor_options={"num_gpus": 0.1}, 
).bind(local_path="/mnt/cluster_storage/model.pt")
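
As a rough sanity check on the replica count, the sketch below computes how many replicas fit on the available GPUs for a fractional `num_gpus` request. The helper `max_replicas` is hypothetical and not part of Ray; it only mirrors the arithmetic of logical resource accounting. Ray treats GPU fractions as scheduling quantities, so each replica must itself stay within its share of GPU memory.

```python
from fractions import Fraction


def max_replicas(num_gpus_per_replica: str, gpus_available: int = 1) -> int:
    """Hypothetical helper: how many replicas fit on the available GPUs."""
    # Use exact fractions: with floats, 1 // 0.1 evaluates to 9.0, not 10.
    share = Fraction(num_gpus_per_replica)
    if share <= 0:
        raise ValueError("num_gpus_per_replica must be positive")
    return int(Fraction(gpus_available) // share)


print(max_replicas("0.1"))   # 10
print(max_replicas("0.25"))  # 4
print(max_replicas("1"))     # 1
```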

Next, we update the running application by calling `serve.run` with the new options.

In [11]:
mnist_deployment_handle = serve.run(mnist_app, name='mnist_classifier', blocking=False)

INFO 2025-04-16 14:24:00,313 serve 3812578 -- Connecting to existing Serve app in namespace "serve". New http options will not be applied.


We can test the new application by sending a sample request.

In [None]:
images = np.random.rand(2, 1, 28, 28).tolist()
json_request = json.dumps({"image": images})
response = requests.post("http://localhost:8000/", json=json_request)
response.json()["predicted_label"]

### Customizing autoscaling

Ray Serve provides a simple way to autoscale the number of replicas in a deployment. It is primarily based on the target number of ongoing requests per replica.

For example, here is how we can set the autoscaling config for our `OnlineMNISTClassifier` deployment.

In [None]:
mnist_app = OnlineMNISTClassifier.options(
    ray_actor_options={"num_gpus": 0.1}, 
    autoscaling_config={
        "target_ongoing_requests": 10,
    },
).bind(local_path="/mnt/cluster_storage/model.pt")
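
To make the role of `target_ongoing_requests` concrete, here is a simplified model of the scaling decision: the desired replica count is the total number of ongoing requests divided by the per-replica target, rounded up and clamped to the replica bounds. The function `desired_replicas` and its default bounds are hypothetical illustrations, not Ray Serve's implementation, and the model ignores delays, smoothing, and metric averaging.

```python
import math


def desired_replicas(
    total_ongoing_requests: float,
    target_ongoing_requests: float,
    min_replicas: int = 1,   # assumed bound, for illustration only
    max_replicas: int = 10,  # assumed bound, for illustration only
) -> int:
    """Hypothetical, simplified view of a request-based autoscaling decision."""
    if target_ongoing_requests <= 0:
        raise ValueError("target_ongoing_requests must be positive")
    # Enough replicas so that each one handles at most the target load.
    raw = math.ceil(total_ongoing_requests / target_ongoing_requests)
    # Never go below the minimum or above the maximum.
    return max(min_replicas, min(max_replicas, raw))


print(desired_replicas(100, 10))                 # 10
print(desired_replicas(35, 10))                  # 4
print(desired_replicas(0, 10, min_replicas=0))   # 0
print(desired_replicas(500, 10))                 # 10 (capped by max_replicas)
```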

We can also control the autoscaling logic at a finer granularity by setting:
- the upscale and downscale delays
- the interval at which each replica reports its current number of ongoing requests
- the look-back period used to evaluate the current number of ongoing requests

Here is an example of how to set these options:

In [None]:
mnist_app = OnlineMNISTClassifier.options(
    ray_actor_options={"num_gpus": 0.1}, 
    autoscaling_config={
        "target_ongoing_requests": 10,
        "upscale_delay_s": 10,
        "downscale_delay_s": 10,
        "metrics_interval_s": 10,
        "look_back_period_s": 10, 
    },
).bind(local_path="/mnt/cluster_storage/model.pt")
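
The delays act as a debounce: a scaling decision is applied only if it has been requested consistently for the whole delay period. The sketch below models that idea for upscaling with a hypothetical `DelayedScaler` class and explicit timestamps. It is a simplification under that assumption, not Ray Serve's internal logic.

```python
class DelayedScaler:
    """Hypothetical debounce: apply an upscale only after it persists for the delay."""

    def __init__(self, current: int, upscale_delay_s: float):
        self.current = current
        self.upscale_delay_s = upscale_delay_s
        self._pending_since = None  # time at which an upscale was first requested

    def observe(self, now_s: float, desired: int) -> int:
        if desired <= self.current:
            # No upscale wanted at this point: reset the timer.
            self._pending_since = None
            return self.current
        if self._pending_since is None:
            self._pending_since = now_s
        if now_s - self._pending_since >= self.upscale_delay_s:
            # The request for more replicas held for the full delay: apply it.
            self.current = desired
            self._pending_since = None
        return self.current


scaler = DelayedScaler(current=1, upscale_delay_s=10)
print(scaler.observe(0, 4))   # 1: upscale requested, timer starts
print(scaler.observe(5, 4))   # 1: only 5 s elapsed
print(scaler.observe(7, 1))   # 1: load dropped, timer resets
print(scaler.observe(8, 4))   # 1: timer restarts at t=8
print(scaler.observe(18, 4))  # 4: request held for 10 s, applied
```

In this model, a delay of 0 applies the change on the first observation.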

We can additionally set the minimum and maximum number of replicas that the autoscaler may scale between.

We can even start from 0 replicas and scale up on demand.

In [None]:
mnist_app = OnlineMNISTClassifier.options(
    ray_actor_options={"num_gpus": 0.1}, 
    autoscaling_config={
        "target_ongoing_requests": 10,
        "initial_replicas": 0, # scale up from 0 replicas
        "min_replicas": 0,
        "max_replicas": 10,
        # extreme upscale speeds
        "upscale_delay_s": 0,
        "metrics_interval_s": 0.1,
        "look_back_period_s": 0.1,
    },
).bind(local_path="/mnt/cluster_storage/model.pt")

Let's run the application with the new autoscaling config.

In [None]:
mnist_deployment_handle = serve.run(mnist_app, name='mnist_classifier', blocking=False)

Looking at the Ray Serve dashboard, we can see we are currently at 0 replicas - i.e. no GPU resources are being used.

<img src='https://anyscale-public-materials.s3.us-west-2.amazonaws.com/ray-ai-libraries/diagrams/autoscaling_at_0.png' width=700/>

We can send out a larger number of requests to the `OnlineMNISTClassifier` deployment to see the autoscaling in action.

In [None]:
batch = {"image": np.random.rand(10, 1, 28, 28)}
[
    mnist_deployment_handle.predict.remote(batch)
    for _ in range(100)
]
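
The cell above only fires the requests and discards the responses. If the results are needed, the calls can be awaited together. The sketch below shows the pattern with `asyncio.gather`, using a hypothetical `fake_predict` coroutine as a stand-in for `mnist_deployment_handle.predict.remote(batch)` so that it runs without a cluster.

```python
import asyncio


async def fake_predict(request_id: int) -> dict:
    """Hypothetical stand-in for a deployment call; returns a dummy label."""
    await asyncio.sleep(0.01)  # simulate inference latency
    return {"request_id": request_id, "predicted_label": request_id % 10}


async def send_many(num_requests: int) -> list:
    # Create all calls first, then wait for them concurrently.
    pending = [fake_predict(i) for i in range(num_requests)]
    # gather returns results in the same order as the inputs.
    return await asyncio.gather(*pending)


results = asyncio.run(send_many(100))
print(len(results))                    # 100
print(results[42]["predicted_label"])  # 2
```

Inside a notebook, where an event loop is already running, `await send_many(100)` replaces the `asyncio.run(...)` call.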

Looking at the Ray Serve dashboard, we can see that the number of replicas has scaled up to 10 as expected.

<img src='https://anyscale-public-materials.s3.us-west-2.amazonaws.com/ray-ai-libraries/diagrams/autoscaling_at_10.png' width=700/>

Let's shut down the service for now.

In [13]:
serve.shutdown()

### Composing Deployments

Ray Serve allows us to compose Deployments together to build more complex applications.

Let's compose our `OnlineMNISTClassifier` with an `OnlineMNISTPreprocessor` deployment that performs the necessary transformations on the input data.


In [None]:
@serve.deployment
class OnlineMNISTPreprocessor:
    def __init__(self):
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,))
        ])
        
    async def run(self, batch: dict[str, Any]) -> dict[str, Any]:
        images = batch["image"]
        images = [self.transform(np.array(image, dtype=np.uint8)).cpu().numpy() for image in images]
        return {"image": images}

preprocessor_app = OnlineMNISTPreprocessor.bind()

In [None]:
preprocessor_handle = serve.run(preprocessor_app, name='mnist_preprocessor', blocking=False, route_prefix="/preprocess")

Let's load a batch of images and pass it to the `OnlineMNISTPreprocessor`.

In [None]:
ds = ray.data.read_images("s3://anyscale-public-materials/ray-ai-libraries/mnist/50_per_index/", include_paths=True)
image_batch = ds.take_batch(10)

In [None]:
# plot the first image using matplotlib
plt.imshow(image_batch["image"][0], cmap="gray")
plt.show()

In [None]:
normalized_batch = await preprocessor_handle.run.remote(image_batch)

for image in normalized_batch["image"]:
    assert image.shape == (1, 28, 28) # channel, height, width
    assert image.min() >= -1 and image.max() <= 1 # normalized to [-1, 1]
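
The `[-1, 1]` bound follows from the two transforms: `ToTensor` rescales `uint8` pixels from `[0, 255]` to `[0, 1]`, and `Normalize((0.5,), (0.5,))` then computes `(x - 0.5) / 0.5`. The plain-Python sketch below reproduces that arithmetic for single pixel values; `normalize_pixel` is a hypothetical helper, not a torchvision function.

```python
def normalize_pixel(pixel: int, mean: float = 0.5, std: float = 0.5) -> float:
    """Hypothetical helper mirroring ToTensor followed by Normalize for one pixel."""
    if not 0 <= pixel <= 255:
        raise ValueError("pixel must be in [0, 255]")
    scaled = pixel / 255.0        # ToTensor step: [0, 255] -> [0, 1]
    return (scaled - mean) / std  # Normalize step: [0, 1] -> [-1, 1]


print(normalize_pixel(0))    # -1.0
print(normalize_pixel(255))  # 1.0

# Every valid pixel value lands inside [-1, 1].
print(all(-1.0 <= normalize_pixel(p) <= 1.0 for p in range(256)))  # True
```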

We will now shut down Serve, which removes the standalone preprocessor application. This demonstrates that the preprocessor is created again automatically as part of the ingress application.


In [None]:
serve.shutdown()

Let's now build an ingress for our application that composes the `OnlineMNISTPreprocessor` and the `OnlineMNISTClassifier`.

In [None]:
@serve.deployment
class ImageServiceIngress:
    def __init__(self, preprocessor: OnlineMNISTPreprocessor, model: OnlineMNISTClassifier):
        self.preprocessor = preprocessor
        self.model = model

    async def __call__(self, request: Request):
        batch = json.loads(await request.json())
        response = await self.preprocessor.run.remote(batch)
        return await self.model.predict.remote(response)

In [None]:
image_classifier_ingress = ImageServiceIngress.bind(
    preprocessor=OnlineMNISTPreprocessor.bind(),
    model=OnlineMNISTClassifier.options(
        num_replicas=1,
        ray_actor_options={"num_gpus": 0.1},
    ).bind(local_path="/mnt/cluster_storage/model.pt"),
)

handle = serve.run(image_classifier_ingress, name='image_classifier', blocking=False)

Let's test the application by sending a sample HTTP request to our ingress endpoint.


In [None]:
json_request = json.dumps({"image": image_batch["image"].tolist()}) 
response = requests.post("http://localhost:8000/", json=json_request)
response.json()["predicted_label"]

### Integrating with FastAPI

Ray Serve can be integrated with FastAPI to provide:
- HTTP routing
- Pydantic model validation
- OpenAPI documentation

To integrate a Deployment with FastAPI, we can use the `@serve.ingress` decorator to designate a FastAPI app as the entrypoint for HTTP requests to our Serve application.

In [None]:
app = FastAPI()

@serve.deployment
@serve.ingress(app)
class ImageServiceIngress:
    def __init__(self, preprocessor: OnlineMNISTPreprocessor, model: OnlineMNISTClassifier):
        self.preprocessor = preprocessor
        self.model = model
    
    @app.post("/predict")
    async def predict(self, request: Request):
        batch = json.loads(await request.json())
        response = await self.preprocessor.run.remote(batch)
        out = await self.model.predict.remote(response)
        return {"predicted_label": out["predicted_label"].tolist()}

We can now build the application and run it.

In [None]:
image_classifier_ingress = ImageServiceIngress.bind(
    preprocessor=OnlineMNISTPreprocessor.bind(),
    model=OnlineMNISTClassifier.options(
        num_replicas=1,
        ray_actor_options={"num_gpus": 0.1},
    ).bind(local_path="/mnt/cluster_storage/model.pt"),
)

handle = serve.run(image_classifier_ingress, name='image_classifier', blocking=False)

After running the application, we can test it programmatically as an HTTP endpoint.

In [None]:
json_request = json.dumps({"image": image_batch["image"].tolist()}) 
response = requests.post("http://localhost:8000/predict", json=json_request)
response.json()["predicted_label"]

We can also visit the auto-generated FastAPI docs at http://localhost:8000/docs to get an interactive UI to test our endpoint.

## 4. Ray Serve in Production

1. Klaviyo built their model serving platform with Ray Serve. See [this article from Klaviyo Engineering](https://klaviyo.tech/how-klaviyo-built-a-robust-model-serving-platform-with-ray-serve-c02ec65788b3).
2. Samsara uses Ray Serve to bridge the gap between developing and deploying their models. See [this article from Samsara Engineering](https://www.samsara.com/blog/building-a-modern-machine-learning-platform-with-ray).

## Clean up 

Let's shut down the application and clean up the resources we created.

In [None]:
serve.shutdown()
!rm -rf /mnt/cluster_storage/model.pt