# benchmark.ipynb

This notebook contains the text and code for the next blog post in the zero-copy model series, 
title TBD.

The first post explained how to load PyTorch models for inference extremely fast by leveraging the Plasma object store's ability to load numeric data directly from shared memory.

In this post, we talk in more concrete terms about how to use this zero-copy model loading for model serving. We put together a simple model serving system, then set up a microbenchmark that simulates a heavy-tailed traffic pattern.

In [1]:
# Initialization and import code goes in this cell.

# Imports: Python core, then third-party, then local.
# Try to keep each block in alphabetical order, or the linter may get angry.
import asyncio
import concurrent.futures
import time
import urllib.parse
from typing import Any, Callable, Dict

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import starlette.requests

import ray
from ray import serve
import torch
import transformers

import zerocopy


# Reduce the volume of warning messages from `transformers`
transformers.logging.set_verbosity_error()


def reboot_ray():
    if ray.is_initialized():
        ray.shutdown()

    if torch.cuda.is_available():
        return ray.init(num_gpus=1)
    else:
        return ray.init()

# Title of new blog post goes here

*Recap of previous blog post goes here.*



## Scenario

The end-to-end scenario for our benchmark involves supporting an AI chatbot.
The chatbot's conversational AI runs off of a conversation tree (**TODO:** What's the best term for this tree?). Some of the nodes of this tree invoke question answering models.

Our benchmark covers the model serving portion of the chatbot's backend. This 
model serving layer runs question answering (QA) models on behalf of the 
chatbot's conversational AI. The chatbot's conversation tree leads to 4 very different
question answering scenarios, and each scenario has its own dedicated QA
model. Because the chatbot speaks 3 languages, each model is deployed in 3 versions,
one per language. The model serving layer therefore runs a total of
12 models to cover the 4 question types and 3 languages.
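
As a quick illustration of that arithmetic, the sketch below enumerates the 12 (question type, language) combinations and gives each one a model number. The question type and language names are invented for this example; the benchmark itself only refers to models by number.

```python
import itertools

# Hypothetical names: the scenario does not specify which question
# types or languages the chatbot supports.
QUESTION_TYPES = ["billing", "shipping", "returns", "technical"]
LANGUAGES = ["en", "es", "fr"]

# One dedicated QA model per (question type, language) pair.
model_index = {
    pair: model_num
    for model_num, pair in enumerate(
        itertools.product(QUESTION_TYPES, LANGUAGES))
}

print(len(model_index))                # 12
print(model_index[("billing", "en")])  # 0
```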

> **TODO:** Cartoon block diagram of the end-to-end scenario. 
> Diagram should show a user interacting with a chatbot. The chatbot runs off of a conversation tree. 
> Some of the nodes of the conversation tree have question answering models hanging off of them.

For our question answering models, we'll use 12 copies of `deepset/roberta-base-squad2`,
the most popular question answering model on the [Hugging Face model marketplace](https://huggingface.co/models).
Here's some code to load that model.

In [2]:
model_name = "deepset/roberta-base-squad2"

# Strip out this timing code for the blog version.
print("Time to load with standard method: ", end="")
%timeit -r3 transformers.pipeline("question-answering", model=model_name)
qa = transformers.pipeline("question-answering", model=model_name)

Time to load with standard method: 5.94 s ± 310 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


The performance of this model isn't very sensitive to the specific question and context provided,
so we define a single set of inputs and outputs for simplicity.

In [3]:
qa_input = {
    "question": "What is 1 + 1?",
    "context": 
        """Addition (usually signified by the plus symbol +) is one of the four basic operations of 
        arithmetic, the other three being subtraction, multiplication and division. The addition of two 
        whole numbers results in the total amount or sum of those values combined. The example in the
        adjacent image shows a combination of three apples and two apples, making a total of five apples. 
        This observation is equivalent to the mathematical expression "3 + 2 = 5" (that is, "3 plus 2 
        is equal to 5").
        """
}

result = qa(qa_input)
qa_answer = result["answer"]
result

{'score': 4.278851065464551e-06, 'start': 483, 'end': 484, 'answer': '5'}

## Baseline results

Let's start with a baseline implementation of model serving for this model. This baseline implementation emulates running each QA model in a separate container. The server has 12 CPUs, so each container gets 1 CPU. We implement this baseline configuration with 12 Ray Serve deployments, one per model, each of which runs as a Ray actor.

In [4]:
serve.shutdown()
reboot_ray()
serve.start()

[2m[36m(pid=9016)[0m 2021-10-21 15:04:43,484	INFO checkpoint_path.py:15 -- Using RayInternalKVStore for controller checkpoint and recovery.
[2m[36m(pid=9016)[0m 2021-10-21 15:04:43,488	INFO http_state.py:75 -- Starting HTTP proxy with name 'SERVE_CONTROLLER_ACTOR:OlGhmW:SERVE_PROXY_ACTOR-node:192.168.0.238-0' on node 'node:192.168.0.238-0' listening on '127.0.0.1:8000'
2021-10-21 15:04:43,731	INFO api.py:455 -- Started Serve instance in namespace 'fff95715-1ff4-4e14-911c-a881e348da3b'.


<ray.serve.api.Client at 0x7fb6a87277f0>

In [5]:
class QAModel:
    def __init__(self):
        self._qa = transformers.pipeline("question-answering",
                                         model=model_name)

    def __call__(self, request: starlette.requests.Request):
        # Pull model inputs from URL query parameters.
        # A production version of this code would sanitize these strings.
        model_input = {
            "question": request.query_params["question"],
            "context": request.query_params["context"]
        }
        return self._qa(model_input)


# Define endpoints
NUM_QA_MODELS = 12
deployments = [
    serve.deployment(QAModel, f"qa{model_num}")
    for model_num in range(NUM_QA_MODELS)
]

for d in deployments:
    d.deploy(_blocking=False)

# Wait a moment so log output doesn't go to the next cell's output
time.sleep(1.)

2021-10-21 15:04:43,748	INFO api.py:243 -- Updating deployment 'qa0'. component=serve deployment=qa0
2021-10-21 15:04:43,756	INFO api.py:243 -- Updating deployment 'qa1'. component=serve deployment=qa1
2021-10-21 15:04:43,767	INFO api.py:243 -- Updating deployment 'qa2'. component=serve deployment=qa2
2021-10-21 15:04:43,781	INFO api.py:243 -- Updating deployment 'qa3'. component=serve deployment=qa3
2021-10-21 15:04:43,797	INFO api.py:243 -- Updating deployment 'qa4'. component=serve deployment=qa4
2021-10-21 15:04:43,816	INFO api.py:243 -- Updating deployment 'qa5'. component=serve deployment=qa5
[2m[36m(pid=9016)[0m 2021-10-21 15:04:43,818	INFO backend_state.py:896 -- Adding 1 replicas to deployment 'qa0'. component=serve deployment=qa0
[2m[36m(pid=9013)[0m INFO:     Started server process [9013]
2021-10-21 15:04:43,907	INFO api.py:243 -- Updating deployment 'qa6'. component=serve deployment=qa6
[2m[36m(pid=9016)[0m 2021-10-21 15:04:43,829	INFO backend_state.py:896 -- Addin

In [6]:
# Try out the deployment.
# This web service call blocks until the asynchronous deployment has completed.
params = urllib.parse.urlencode(qa_input)
requests.get(f"http://127.0.0.1:8000/qa0?{params}").json()

{'score': 4.278851065464551e-06, 'start': 483, 'end': 484, 'answer': '5'}

Let's wrap this model web service in a callback function that calls the model, retrieves and validates the result, and
returns the start and end times of the call.

In [7]:
def call_model(model_num: int, question: str, context: str, expected_answer: str) -> tuple:
    """
    Callback function that calls the model deployment, retrieves and 
    validates the result, and returns the start and end times of the call.

    :param model_num: Index of the model to call
    :param question: The `question` argument to pass to the QA model
    :param context: The `context` argument to pass to the QA model
    :param expected_answer: The answer that the model should return

    :returns: Tuple of start and end times of the web service call
    """
    # Encode the model inputs as URL query parameters.
    params = urllib.parse.urlencode({"question": question, "context": context})

    start_time = time.time()
    result = requests.get(f"http://127.0.0.1:8000/qa{model_num}?{params}").json()
    end_time = time.time()

    # Do some basic validation
    if result["answer"] != expected_answer:
        raise ValueError(f"Unexpected result: {result}")

    return (start_time, end_time)


times = call_model(0, qa_input["question"], qa_input["context"], qa_answer)
f"{times[1] - times[0]:1.3f} seconds elapsed"

'0.381 seconds elapsed'

Now we can define a simple benchmark.

Our benchmark generates a trace of requests, then plays back the trace and measures the 
latency of each request. 

The request rate changes each second: the number of requests in each 1-second window is drawn from a Poisson
distribution. Here's the code to generate the start times for the trace.

In [8]:
def gen_start_times(requests_per_sec: float, num_sec: int,
                    seed: int) -> np.ndarray:
    """
    Generate a trace of inference request start times. Divides the trace
    into 1-second intervals. Each interval gets a number of requests drawn
    from a Poisson distribution. These requests are evenly spread through the
    interval.

    :param requests_per_sec: Average requests per second overall
    :param num_sec: Number of seconds of trace to generate
    :param seed: Seed for the random number generator

    :returns: Numpy array of timestamps (starting from 0) for the requests
     in the trace
    """
    trace = []
    rng = np.random.default_rng(seed)

    # Compute the number of requests in each 1-second window.
    req_per_window = rng.poisson(requests_per_sec, size=num_sec)

    for window_num in range(num_sec):
        num_requests = req_per_window[window_num]
        if num_requests > 0:
            request_interval = 1.0 / num_requests
            for i in range(num_requests):
                trace.append(window_num + request_interval * i)

    return np.array(trace)
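
To make the spacing rule concrete, here is a small worked example that skips the random draw and uses fixed per-window counts instead. The helper name `spread_evenly` is made up for this sketch; it mirrors only the inner loop of `gen_start_times`.

```python
def spread_evenly(counts):
    """Place counts[i] requests at even spacing inside second i."""
    trace = []
    for window_num, num_requests in enumerate(counts):
        if num_requests > 0:
            request_interval = 1.0 / num_requests
            for i in range(num_requests):
                trace.append(window_num + request_interval * i)
    return trace


# Two requests in second 0, none in second 1, four in second 2.
print(spread_evenly([2, 0, 4]))
# [0.0, 0.5, 2.0, 2.25, 2.5, 2.75]
```

Note that the first request of each window always lands exactly on the window boundary.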

Each request goes to a randomly-selected model. The choice of models is
weighted according to a truncated Poisson distribution. Here's the code to generate
the list of model IDs for the requests in the trace.

In [9]:
def gen_model_ids(lambda_: float, num_models: int, num_points: int,
                  seed: int) -> np.ndarray:
    """
    Draw model IDs at random from a truncated Poisson distribution.

    :param lambda_: Primary parameter of the distribution, which also happens to 
     be the mean value of the (untruncated) distribution.
    :param num_models: Number of models. This function will truncate the Poisson 
     distribution such that only values < num_models will be returned.
    :param num_points: Number of random model IDs to return.
    :param seed: Seed for the random number generator

    :returns: Randomly generated model IDs for a series of requests, as a
     1-dimensional array.
    """
    # Draw numbers from a truncated Poisson distribution.
    # Start with a non-truncated distribution, then resample for
    # any values that went over the limit. 
    rng = np.random.default_rng(seed)
    result = rng.poisson(lambda_, size=num_points)
    while np.any(result >= num_models):
        new_values = rng.poisson(lambda_, size=np.sum(result >= num_models))
        result[result >= num_models] = new_values
    return result
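
To get a feel for how skewed this traffic split is, the following sketch computes the exact probabilities of the truncated distribution instead of sampling from it. The function name `truncated_poisson_pmf` is an illustrative helper, not part of the benchmark. Resampling out-of-range values, as `gen_model_ids` does, is equivalent to renormalizing the Poisson probabilities over the allowed range.

```python
import math


def truncated_poisson_pmf(lambda_, num_models):
    """Probability of each model ID under a Poisson truncated to < num_models."""
    raw = [
        math.exp(-lambda_) * lambda_ ** k / math.factorial(k)
        for k in range(num_models)
    ]
    total = sum(raw)  # Renormalize over the values that survive truncation
    return [p / total for p in raw]


shares = truncated_poisson_pmf(0.3, 12)
for model_num, share in enumerate(shares[:4]):
    print(f"model {model_num}: {share:.1%}")
# model 0: 74.1%
# model 1: 22.2%
# model 2: 3.3%
# model 3: 0.3%
```

With a parameter of 0.3, nearly all traffic goes to the first three models, and the remaining nine models are called only rarely.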

The benchmark itself generates and then plays back the trace, measuring the end-to-end latency of each request.

In [10]:
def run_benchmark(model_callback: Callable, requests_per_sec: float, 
                  num_sec: int, model_lambda: float = 0.3,
                  seed: int = 42) -> pd.DataFrame:
    """
    A simple benchmark in Python.

    Sends a stream of requests to multiple models, with the rate varying
    according to a Poisson distribution and division of traffic among models
    following a truncated Poisson distribution.

    :param model_callback: Thread-safe callback function that makes a 
     single request and returns its start and end times. Should have the
     signature
     `f(model_num: int, question: str, context: str, expected_answer: str)`
    :param requests_per_sec: Mean of the Poisson distribution that determines
     the number of requests in each 1-second window.
    :param num_sec: Seconds of traffic to generate at the requested rate.
     The actual session will extend past this window until all open requests
     have finished.
    :param model_lambda: Primary parameter of the truncated Poisson
     distribution used to split requests among models. Approximately 
     equal to the mean of the distribution. The default value of 0.3 sends
     about 74% of traffic to model 0.
    :param seed: Seed for the random number generator

    :returns: DataFrame of benchmark results at per-request granularity
    """
    # Preallocate the trace as a set of lists.
    benchmark_start_time = time.time()
    desired_start_times = (
        gen_start_times(requests_per_sec, num_sec, seed)
        + benchmark_start_time)
    num_requests = desired_start_times.shape[0]
    model_nums = gen_model_ids(model_lambda, NUM_QA_MODELS, num_requests,
                               seed)
    actual_start_times = [None] * num_requests
    end_times = [None] * num_requests

    # Because some notebook servers (e.g. VSCode) don't play well with
    # asyncio, we use threads to manage concurrent requests.
    thread_pool = concurrent.futures.ThreadPoolExecutor(1000)

    # Map from request object to request number
    active_requests = {}  # type: Dict[concurrent.futures.Future, int]

    # Main event loop: Spawn background requests, get their responses.
    request_num = 0
    while request_num < num_requests or len(active_requests) > 0:
        sec_to_next = (
            1.0 if request_num >= num_requests
            else desired_start_times[request_num] - time.time()
        )
        if sec_to_next <= 0:
            # Time to send the next request
            model_num = model_nums[request_num]
            future = thread_pool.submit(
                model_callback, model_num,
                qa_input["question"], qa_input["context"], qa_answer)
            active_requests[future] = request_num
            request_num += 1
        else:
            # Block until it's time to send the next request or a previous
            # request is done.
            ready_set, _ = concurrent.futures.wait(
                list(active_requests.keys()), 
                timeout=sec_to_next)

            # Record timings from any open requests that have completed.
            for future in ready_set:
                request_id = active_requests.pop(future)
                start_time, end_time = future.result()
                actual_start_times[request_id] = start_time
                end_times[request_id] = end_time

    # Collate results as a DataFrame
    result = pd.DataFrame({
        "request_id": range(num_requests),
        "model_num": model_nums, 
        "desired_start": desired_start_times, 
        "actual_start": actual_start_times, 
        "end": end_times
    })

    # Make all times relative to start of the trace
    for key in ("desired_start", "actual_start", "end"):
        result[key] -= benchmark_start_time
    result["latency"] = result["end"] - result["actual_start"]

    return result


# Quick test run
run_benchmark(call_model, 12, 5)

| | request_id | model_num | desired_start | actual_start | end | latency |
|---|---|---|---|---|---|---|
| 0 | 0 | 1 | 0.000000 | 0.000898 | 0.400686 | 0.399788 |
| 1 | 1 | 1 | 0.066667 | 0.069783 | 0.791132 | 0.721349 |
| 2 | 2 | 0 | 0.133333 | 0.134029 | 0.532247 | 0.398218 |
| 3 | 3 | 2 | 0.200000 | 0.202648 | 0.614423 | 0.411775 |
| 4 | 4 | 0 | 0.266667 | 0.267616 | 1.302230 | 1.034614 |
| ... | ... | ... | ... | ... | ... | ... |
| 57 | 57 | 0 | 4.615385 | 4.616411 | 17.607030 | 12.990619 |
| 58 | 58 | 0 | 4.692308 | 4.692817 | 17.607163 | 12.914346 |
| 59 | 59 | 0 | 4.769231 | 4.769844 | 17.607320 | 12.837476 |
| 60 | 60 | 0 | 4.846154 | 4.851322 | 17.607443 | 12.756121 |
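
The event loop in `run_benchmark` relies on `concurrent.futures.wait` with a `timeout` to collect finished requests while it waits for the next send time. The sketch below shows that behavior in isolation, with `time.sleep` standing in for the web service calls; the durations are arbitrary values chosen for the example.

```python
import concurrent.futures
import time

pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)
fast = pool.submit(time.sleep, 0.05)  # Stands in for a quick request
slow = pool.submit(time.sleep, 1.0)   # Stands in for a slow request

# With the default return_when, wait() returns when the timeout expires
# or when every future has finished, whichever comes first.
done, not_done = concurrent.futures.wait([fast, slow], timeout=0.4)

print(fast in done, slow in not_done)  # True True
pool.shutdown(wait=True)
```

Futures that are still running when the timeout expires come back in the second set, which is why the benchmark keeps them in `active_requests` and checks again on the next pass.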


Let's run the benchmark with our baseline model deployment.

In [None]:
# Run the benchmark at multiple different request rates
REQUEST_RATES = (2, 3, 4, 5, 6, 8, 10, 12, 14)
RUNNING_TIME_SEC = 60
to_concat = []
for request_rate in REQUEST_RATES:
    print(f"Running at {request_rate} requests/sec.")
    times = run_benchmark(call_model, request_rate, RUNNING_TIME_SEC)
    times.insert(0, "request_rate", request_rate)
    to_concat.append(times)

results = pd.concat(to_concat)

Running at 2 requests/sec.
Running at 3 requests/sec.
Running at 4 requests/sec.
Running at 5 requests/sec.
Running at 6 requests/sec.
Running at 8 requests/sec.
Running at 10 requests/sec.


In [None]:
results[results["request_rate"] == 10]

In [None]:
agg_results = results.groupby("request_rate").aggregate({"latency": ["mean", "median", "max"]})
agg_results
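
For readers less familiar with pandas, here is roughly what that aggregation computes for a single group, using only the standard library. The latencies are those of requests 0 through 4 in the quick test run shown earlier, so this is an illustration of the three statistics and not a benchmark result.

```python
import statistics

# Latencies (sec) of requests 0-4 from the quick test run.
latencies = [0.399788, 0.721349, 0.398218, 0.411775, 1.034614]

summary = {
    "mean": statistics.mean(latencies),
    "median": statistics.median(latencies),
    "max": max(latencies),
}
print({name: round(value, 3) for name, value in summary.items()})
# {'mean': 0.593, 'median': 0.412, 'max': 1.035}
```

Even in this tiny sample the mean sits well above the median, which is why the notebook reports all three statistics for each request rate.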

In [None]:
plt.scatter(agg_results.index, agg_results["latency", "mean"])
plt.xlabel("Average Requests per Second")
plt.ylabel("Average Latency (sec)")

## Using zero-copy model loading

Now let's redo this baseline using zero-copy model loading.
First we'll need to convert the model into a format that can be loaded without copying
data. The model is actually a pipeline of multiple operations, but the RoBERTa model
at its center is orders of magnitude larger and more CPU-intensive than everything else, so we'll only apply zero-copy loading to that part.
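
As a reminder of what "without copying data" means, here is a plain-Python analogy. It uses a `bytearray` as a stand-in for a shared memory segment and is not how Plasma or the `zerocopy` package is implemented; it only shows the general idea of reinterpreting an existing buffer in place.

```python
import struct

# Stand-in for a block of model weights living in shared memory.
shared = bytearray(struct.pack("4f", 1.0, 2.0, 3.0, 4.0))

# A memoryview reinterprets the bytes as floats without copying them.
weights = memoryview(shared).cast("f")
print(weights.tolist())  # [1.0, 2.0, 3.0, 4.0]

# A write to the underlying buffer is visible through the view,
# which shows that both names refer to the same memory.
struct.pack_into("f", shared, 0, 9.0)
print(weights[0])  # 9.0

# A view over immutable bytes is read-only.
frozen = memoryview(bytes(shared)).cast("f")
print(frozen.readonly)  # True
```

The read-only case is the relevant one here: tensors backed by the object store are immutable, which is the source of the PyTorch warnings that the code below suppresses.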



In [None]:
serve.shutdown()
reboot_ray()
serve.start()

## Introducing `zerocopy`

We've created a Python package, `zerocopy`, with the model rewrite code from our previous post (TODO: Publish the package to PyPI).

To use that package, you'll need to install it with `pip`, then import it into your script.

```python
import zerocopy
```


In [None]:
# TODO: Move this code to the `zerocopy` library.
@ray.remote
def call_model_zero_copy(model_ref: ray.ObjectRef, args, kwargs) -> Any:
    """
    Ray task that uses zero-copy model loading to reconstitute a model
    from Plasma, then invokes the model's ``__call__()`` method.

    :param model_ref: Object reference to a tuple of model skeleton
     and model weights, as returned by :func:`extract_tensors`
    :param args: Ordered arguments to pass to the model's :func:`__call__`
     method
    :param kwargs: Keyword arguments to pass to the model's :func:`__call__`
     method

    :returns: Return value from the model's :func:`__call__` method
    """
    # Suppress PyTorch warnings about immutable tensors
    import warnings
    warnings.filterwarnings("ignore")

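    # Ray resolves object references passed as task arguments before the
    # task body runs, so `model_ref` is already the (skeleton, weights)
    # tuple at this point.
    model_skeleton, model_weights = model_ref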
    zerocopy.replace_tensors(model_skeleton, model_weights)
    with torch.no_grad():
        return model_skeleton(*args, **kwargs)

In [None]:
# Call the model directly
inputs = qa.tokenizer(qa_input["question"], qa_input["context"], return_tensors="pt")
qa.model(**inputs)

In [None]:
# Call the model via `call_model_zero_copy`. Results should be the same as the previous cell.
model_ref = ray.put(zerocopy.extract_tensors(qa.model))
ray.get(call_model_zero_copy.remote(model_ref, [], inputs))

The time to invoke the model once via `call_model_zero_copy()` is almost the same as running the model locally.

In [None]:
# Compare timings
print("       Time to run locally: ", end="")
%timeit qa.model(**inputs)
print("Time to run with zero-copy: ", end="")
%timeit ray.get(call_model_zero_copy.remote(model_ref, [], inputs))

If we run inference multiple times, `call_model_zero_copy()` can send those inference requests to separate Ray tasks that run in parallel.

In [None]:
def run_local(num_repeats: int):
    for _ in range(num_repeats):
        qa.model(**inputs)


def run_zero_copy(num_repeats: int):
    futures = [call_model_zero_copy.remote(model_ref, [], inputs) for _ in range(num_repeats)]
    ray.get(futures)


NUM_REPEATS = 100
print(f"       Time to run {NUM_REPEATS} times locally: ", end="")
%timeit -r 3 run_local(NUM_REPEATS)
print(f"Time to run {NUM_REPEATS} times with zero-copy: ", end="")
%timeit -r 3 run_zero_copy(NUM_REPEATS)

Now let's define a Ray Serve endpoint that runs the model preprocessing code locally and farms out model inference 
to Ray tasks that use zero-copy model loading.

In [None]:
class ZeroCopyQAModel:
    def __init__(self):
        # TODO: Move this rewrite to the `zerocopy` library.
        # Load the entire pipeline, then copy the model portion to Plasma.
        self._qa = transformers.pipeline("question-answering", model=model_name)
        model_ref = ray.put(zerocopy.extract_tensors(self._qa.model))

        # Replace the pipeline's model with a callback that farms out work to
        # Ray tasks.
        class _ModelCallback:
            def __call__(self, *args, **kwargs):
                return ray.get(call_model_zero_copy.remote(model_ref, args, kwargs))
        self._qa.model = _ModelCallback()

        # Use a threadpool because the model is called from pre/postprocessing code
        # that is not asyncio-aware
        self._threadpool = concurrent.futures.ThreadPoolExecutor()

    async def __call__(self, request: starlette.requests.Request):
        # Pull model inputs from URL query parameters.
        # A production version of this code would sanitize these strings.
        model_input = {
            "question": request.query_params["question"],
            "context": request.query_params["context"]
        }
        result = await asyncio.get_running_loop().run_in_executor(
            self._threadpool, lambda: self._qa(model_input))
        return result

    def __del__(self):  # Ray Serve needs this callback
        pass


# Define endpoints
NUM_QA_MODELS = 12
deployments = [
    serve.deployment(ZeroCopyQAModel, f"qa{model_num}",
                     ray_actor_options={"num_cpus": 0.1})
    for model_num in range(NUM_QA_MODELS)
]

for d in deployments:
    d.deploy(_blocking=False)

# Wait a moment so log output doesn't go to the next cell's output
time.sleep(1.)
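
The `run_in_executor` call in `ZeroCopyQAModel.__call__` is what lets one deployment handle several requests at once even though the pipeline code is synchronous. Here is a minimal standalone sketch of that pattern, assuming a hypothetical `blocking_pipeline` function that sleeps in place of real pre- and postprocessing.

```python
import asyncio
import concurrent.futures
import time


def blocking_pipeline(x: int) -> int:
    """Hypothetical stand-in for the synchronous QA pipeline."""
    time.sleep(0.2)
    return x * 2


async def handle(pool, x: int) -> int:
    loop = asyncio.get_running_loop()
    # The event loop stays free while a worker thread runs the call.
    return await loop.run_in_executor(pool, blocking_pipeline, x)


async def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        return await asyncio.gather(*(handle(pool, x) for x in range(4)))


start = time.time()
results = asyncio.run(main())
elapsed = time.time() - start
print(results)  # [0, 2, 4, 6]
```

The four calls overlap, so the whole batch takes roughly as long as one call. This sketch is written as a plain script; inside a notebook that already has a running event loop, `await main()` replaces `asyncio.run(main())`.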

In [None]:
# Try out the new deployment.
# This web service call blocks until the asynchronous deployment has completed.
params = urllib.parse.urlencode(qa_input)
requests.get(f"http://127.0.0.1:8000/qa0?{params}").json()

We've deployed these models to the same URLs, so the benchmark code from before should work without
any changes.

In [None]:
# Quick test run
run_benchmark(call_model, 5, 10)

In [None]:
# Run the benchmark at multiple different request rates
to_concat = []
for request_rate in REQUEST_RATES:
    print(f"Running at {request_rate} requests/sec.")
    times = run_benchmark(call_model, request_rate, RUNNING_TIME_SEC)
    times.insert(0, "request_rate", request_rate)
    to_concat.append(times)

results_zerocopy = pd.concat(to_concat)

In [None]:
agg_results_zerocopy = results_zerocopy.groupby("request_rate").aggregate({
    "latency": ["mean", "median", "max"]})
agg_results_zerocopy

In [None]:
# Plot the two sets of results against each other.
plt.rcParams.update({"font.size": 16})
plt.figure(figsize=(7, 5))
plt.scatter(agg_results.index, agg_results["latency", "mean"],
            label="baseline")
plt.scatter(agg_results_zerocopy.index, 
            agg_results_zerocopy["latency", "mean"],
            label="zero-copy")
plt.xlabel("Average Requests per Second")
plt.ylabel("Average Latency (sec)")
plt.legend()

In [None]:
# Old code defined an actor

# class ModelCallback:
#     def __init__(self, model_ref: ray.ObjectRef):
#         self._model_ref = model_ref

#     def __call__(self, *args: Any, **kwargs: Any) -> Any:
#         return ray.get(call_model.remote(self._model_ref, args, kwargs))

# @ray.remote
# class QAModelZeroCopyActor:
#     def __init__(self):
#         self._qa = transformers.pipeline("question-answering", model=model_name)
#         self._model_ref = ray.put(zerocopy.extract_tensors(self._qa.model))
#         self._qa.model = ModelCallback(self._model_ref)

#     def run_inference(self, input_: Dict[str, str]) -> Dict[str, Any]:
#         return self._qa(input_)

# zero_copy_actors = [QAModelZeroCopyActor.options(max_concurrency=8).remote() 
#                     for _ in range(NUM_QA_MODELS)]
# ray.get(zero_copy_actors[0].run_inference.remote(qa_input))