[Serve] Add nightly test for Serve failure recovery (#19125)

jiaodong committed Oct 12, 2021
1 parent c2377fb · commit 85b8a6d
Showing 5 changed files with 156 additions and 11 deletions.
4 changes: 3 additions & 1 deletion python/ray/serve/storage/checkpoint_path.py
@@ -32,7 +32,9 @@ def make_kv_store(checkpoint_path, namespace):
 
     if parsed_url.scheme == "s3":
         bucket = parsed_url.netloc
-        prefix = parsed_url.path
+        # We need to strip the leading "/" from the path to get the right
+        # key for boto3. Ex: s3://bucket/folder/file.zip -> key = "folder/file.zip"
+        prefix = parsed_url.path.lstrip("/")
         logger.info(
             "Using Ray S3 KVStore for controller checkpoint and recovery: "
             f"bucket={bucket} checkpoint_path={checkpoint_path}")
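Why the lstrip matters: urlparse keeps the leading "/" on the path component, while boto3 object keys should not begin with one. A standalone sketch (not part of the commit) illustrating the parse:

    from urllib.parse import urlparse

    parsed = urlparse("s3://bucket/folder/file.zip")
    print(parsed.netloc)            # "bucket"
    print(parsed.path)              # "/folder/file.zip" (leading slash kept)
    print(parsed.path.lstrip("/"))  # "folder/file.zip", usable as a boto3 key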
4 changes: 2 additions & 2 deletions python/ray/serve/storage/kv_store.py
@@ -186,7 +186,7 @@ def __init__(
     ):
         self._namespace = namepsace
         self._bucket = bucket
-        self._prefix = prefix + "/" if prefix else ""
+        self._prefix = prefix
         if not boto3:
             raise ImportError(
                 "You tried to use S3KVstore client without boto3 installed."
@@ -199,7 +199,7 @@ def __init__(
             aws_session_token=aws_session_token)
 
     def get_storage_key(self, key: str) -> str:
-        return f"{self._prefix}{self._namespace}-{key}"
+        return f"{self._prefix}/{self._namespace}-{key}"
 
     def put(self, key: str, val: bytes) -> bool:
         """Put the key-value pair into the store.
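Taken together with the checkpoint_path.py change, storage keys are now composed from a slash-free prefix plus an explicit "/" separator in the f-string. A standalone sketch mirroring get_storage_key (names are illustrative, not part of the commit):

    def storage_key(prefix: str, namespace: str, key: str) -> str:
        # Mirrors RayS3KVStore.get_storage_key after this change.
        return f"{prefix}/{namespace}-{key}"

    # checkpoint_path s3://bucket/folder yields prefix "folder", so:
    print(storage_key("folder", "abc123", "checkpoint"))
    # -> "folder/abc123-checkpoint"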
15 changes: 15 additions & 0 deletions release/serve_tests/serve_tests.yaml
@@ -27,12 +27,27 @@
 - name: serve_micro_benchmark
   cluster:
     app_config: app_config.yaml
+    # 16 CPUS
     compute_template: compute_tpl_single_node.yaml
 
   run:
     timeout: 7200
     long_running: False
     script: python workloads/serve_micro_benchmark.py
 
   smoke_test:
     timeout: 600
+
+- name: serve_cluster_fault_tolerance
+  cluster:
+    app_config: app_config.yaml
+    # 16 CPUS
+    compute_template: compute_tpl_single_node.yaml
+
+  run:
+    timeout: 7200
+    long_running: False
+    script: python workloads/serve_cluster_fault_tolerance.py
+
+  smoke_test:
+    timeout: 600
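The new entry mirrors the existing micro-benchmark: the same single-node compute template, a 7200s timeout for the full nightly run, and a 600s cap for smoke tests. A minimal sketch of reading these fields, assuming the file is a top-level YAML list (as the "- name:" entries suggest) and that PyYAML is available:

    import yaml

    with open("release/serve_tests/serve_tests.yaml") as f:
        tests = yaml.safe_load(f)  # top-level sequence of test entries

    entry = next(
        t for t in tests if t["name"] == "serve_cluster_fault_tolerance")
    print(entry["run"]["timeout"])         # 7200 (full nightly run)
    print(entry["smoke_test"]["timeout"])  # 600 (smoke test)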
119 changes: 119 additions & 0 deletions release/serve_tests/workloads/serve_cluster_fault_tolerance.py
@@ -0,0 +1,119 @@
"""
Test that a serve deployment can recover from cluster failures by resuming
from checkpoints of external source, such as s3.
For product testing, we skip the part of actually starting new cluster as
it's Job Manager's responsibility, and only re-deploy to the same cluster
with remote checkpoint.
"""

import click
import time
import requests
import uuid
import os

from serve_test_cluster_utils import setup_local_single_node_cluster

from serve_test_utils import (save_test_results)

import ray
from ray import serve
from ray.serve.utils import logger

# Deployment configs
DEFAULT_NUM_REPLICAS = 4
DEFAULT_MAX_BATCH_SIZE = 16


def request_with_retries(endpoint, timeout=3):
start = time.time()
while True:
try:
return requests.get(
"http://127.0.0.1:8000" + endpoint, timeout=timeout)
except requests.RequestException:
if time.time() - start > timeout:
raise TimeoutError
time.sleep(0.1)


@click.command()
def main():
# Setup local cluster, note this cluster setup is the same for both
# local and product ray cluster env.
# Each test uses different ray namespace, thus kv storage key for each
# checkpoint is different to avoid collision.
namespace = uuid.uuid4().hex

# IS_SMOKE_TEST is set by args of releaser's e2e.py
smoke_test = os.environ.get("IS_SMOKE_TEST", "1")
if smoke_test == "1":
checkpoint_path = "file://checkpoint.db"
else:
checkpoint_path = "s3://serve-nightly-tests/fault-tolerant-test-checkpoint" # noqa: E501

_, cluster = setup_local_single_node_cluster(
1, checkpoint_path=checkpoint_path, namespace=namespace)

# Deploy for the first time
@serve.deployment(name="echo", num_replicas=DEFAULT_NUM_REPLICAS)
class Echo:
def __init__(self):
return True

def __call__(self, request):
return "hii"

Echo.deploy()

# Ensure endpoint is working
for _ in range(5):
response = request_with_retries("/echo/", timeout=3)
assert response.text == "hii"

logger.info("Initial deployment successful with working endpoint.")

# Kill current cluster, recover from remote checkpoint and ensure endpoint
# is still available with expected results

ray.kill(serve.api._global_client._controller, no_restart=True)
ray.shutdown()
cluster.shutdown()
serve.api._set_global_client(None)

# Start another ray cluster with same namespace to resume from previous
# checkpoints with no new deploy() call.
setup_local_single_node_cluster(
1, checkpoint_path=checkpoint_path, namespace=namespace)

for _ in range(5):
response = request_with_retries("/echo/", timeout=3)
assert response.text == "hii"

logger.info("Deployment recovery from s3 checkpoint is successful "
"with working endpoint.")

# Delete dangling checkpoints. If script failed before this step, it's up
# to the TTL policy on s3 to clean up, but won't lead to collision with
# subsequent tests since each test run in different uuid namespace.
serve.shutdown()
ray.shutdown()
cluster.shutdown()

# Checkpoints in S3 bucket are moved after 7 days with explicit lifecycle
# rules. Each checkpoint is ~260 Bytes in size from this test.

# Save results
save_test_results(
{
"result": "success"
},
default_output_file="/tmp/serve_cluster_fault_tolerance.json")


if __name__ == "__main__":
main()
import pytest
import sys
sys.exit(pytest.main(["-v", "-s", __file__]))
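To try the workload locally, note that IS_SMOKE_TEST defaults to "1", which keeps the checkpoint in a local file://checkpoint.db instead of the nightly S3 bucket. A hypothetical invocation, assuming it is run from the release/serve_tests directory:

    import os
    import subprocess

    # IS_SMOKE_TEST="0" would target the nightly S3 checkpoint path instead.
    subprocess.run(
        ["python", "workloads/serve_cluster_fault_tolerance.py"],
        env=dict(os.environ, IS_SMOKE_TEST="1"),
        check=True,
    )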
25 changes: 17 additions & 8 deletions release/serve_tests/workloads/serve_test_cluster_utils.py
@@ -6,13 +6,16 @@
 from ray.cluster_utils import Cluster
 from ray.serve.utils import logger
 from ray.serve.config import DeploymentMode
-
+from ray.serve.constants import DEFAULT_CHECKPOINT_PATH
 # Cluster setup configs
 NUM_CPU_PER_NODE = 10
 NUM_CONNECTIONS = 10
 
 
-def setup_local_single_node_cluster(num_nodes):
+def setup_local_single_node_cluster(
+        num_nodes: int,
+        checkpoint_path: str = DEFAULT_CHECKPOINT_PATH,
+        namespace="serve"):
     """Setup ray cluster locally via ray.init() and Cluster()
 
     Each actor is simulated in local process on single node,
@@ -21,19 +24,23 @@ def setup_local_single_node_cluster(num_nodes):
     cluster = Cluster()
     for i in range(num_nodes):
         cluster.add_node(
-            redis_port=6379 if i == 0 else None,
+            redis_port=6380 if i == 0 else None,
             num_cpus=NUM_CPU_PER_NODE,
             num_gpus=0,
             resources={str(i): 2},
         )
-    ray.init(address=cluster.address, dashboard_host="0.0.0.0")
+    ray.init(
+        address=cluster.address, dashboard_host="0.0.0.0", namespace=namespace)
     serve_client = serve.start(
-        http_options={"location": DeploymentMode.EveryNode})
+        detached=True,
+        http_options={"location": DeploymentMode.EveryNode},
+        _checkpoint_path=checkpoint_path,
+    )
 
-    return serve_client
+    return serve_client, cluster
 
 
-def setup_anyscale_cluster():
+def setup_anyscale_cluster(checkpoint_path: str = DEFAULT_CHECKPOINT_PATH):
     """Setup ray cluster at anyscale via ray.client()
 
     Note this is by default large scale and should be kicked off
@@ -44,7 +51,9 @@ def setup_anyscale_cluster():
     # ray.client().env({}).connect()
     ray.init(address="auto")
     serve_client = serve.start(
-        http_options={"location": DeploymentMode.EveryNode})
+        http_options={"location": DeploymentMode.EveryNode},
+        _checkpoint_path=checkpoint_path,
+    )
 
     return serve_client
 
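Putting the helper changes together: the fault-tolerance workload above calls the updated helper twice with the same namespace and checkpoint path, which is what lets the second cluster recover without a deploy() call. A condensed sketch (the checkpoint path shown is the smoke-test one):

    import uuid

    from serve_test_cluster_utils import setup_local_single_node_cluster

    namespace = uuid.uuid4().hex  # unique per run; checkpoint keys never collide

    # First cluster: deploy, verify, then shut everything down.
    _, cluster = setup_local_single_node_cluster(
        1, checkpoint_path="file://checkpoint.db", namespace=namespace)
    # ... Echo.deploy(), requests, cluster.shutdown() ...

    # Second cluster, same namespace and checkpoint path: Serve recovers
    # the deployment from the checkpoint with no new deploy() call.
    setup_local_single_node_cluster(
        1, checkpoint_path="file://checkpoint.db", namespace=namespace)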
