
Add serve ft nightly #19125

Merged
merged 8 commits into from Oct 12, 2021
Changes from 3 commits
2 changes: 1 addition & 1 deletion python/ray/serve/storage/checkpoint_path.py
@@ -32,7 +32,7 @@ def make_kv_store(checkpoint_path, namespace):

if parsed_url.scheme == "s3":
bucket = parsed_url.netloc
- prefix = parsed_url.path
+ prefix = parsed_url.path.lstrip("/")
edoakes marked this conversation as resolved.
logger.info(
"Using Ray S3 KVStore for controller checkpoint and recovery: "
f"bucket={bucket} checkpoint_path={checkpoint_path}")
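The `lstrip("/")` change matters because `urlparse` keeps the leading slash on `.path` for `s3://` URLs, so using the raw path as a key prefix would bake a stray `/` into every storage key. A quick sketch of the behavior (the bucket and prefix names here are made up for illustration):

```python
from urllib.parse import urlparse

# For s3://bucket/prefix URLs, urlparse puts the bucket in .netloc and
# keeps a leading "/" on .path.
parsed = urlparse("s3://my-bucket/my-prefix")
print(parsed.netloc)            # "my-bucket"
print(parsed.path)              # "/my-prefix" -- note the leading slash
print(parsed.path.lstrip("/"))  # "my-prefix" -- what the fix stores
```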
26 changes: 11 additions & 15 deletions python/ray/serve/storage/kv_store.py
@@ -174,32 +174,28 @@ class RayS3KVStore(KVStoreBase):
caller must handle serialization.
"""

- def __init__(
-     self,
-     namespace: str,
-     bucket="",
-     prefix="",
-     region_name="us-west-2",
-     aws_access_key_id=None,
-     aws_secret_access_key=None,
-     aws_session_token=None,
- ):
+ def __init__(self,
+              namespace: str,
+              bucket="",
+              prefix="",
+              region_name="us-west-2"):
  self._namespace = namespace
  self._bucket = bucket
- self._prefix = prefix + "/" if prefix else ""
+ self._prefix = prefix
if not boto3:
raise ImportError(
    "You tried to use the S3KVStore client without boto3 installed. "
    "Please run `pip install boto3`.")
  self._s3 = boto3.client(
      "s3",
      region_name=region_name,
-     aws_access_key_id=aws_access_key_id,
-     aws_secret_access_key=aws_secret_access_key,
-     aws_session_token=aws_session_token)
+     aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", None),
+     aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY",
+                                          None),
+     aws_session_token=os.environ.get("AWS_SESSION_TOKEN", None))

Member Author: I think previously we're not really getting these tokens in make_kv_store, so adding it at the client level with the actual S3 integration.

Contributor: boto should fall back to environ if we pass in None, so this is not necessary.

Contributor: bump on this

def get_storage_key(self, key: str) -> str:
-     return f"{self._prefix}{self._namespace}-{key}"
+     return f"{self._prefix}/{self._namespace}-{key}"

def put(self, key: str, val: bytes) -> bool:
"""Put the key-value pair into the store.
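The diff moves the `/` separator out of `__init__` and into `get_storage_key`. A minimal stand-in class showing the resulting key layout (the class and names below are illustrative only; the real `RayS3KVStore` also wraps a boto3 S3 client):

```python
# Minimal stand-in for the key layout after this change; the real
# RayS3KVStore additionally wraps a boto3 S3 client, omitted here.
class KeyLayout:
    def __init__(self, namespace: str, prefix: str = ""):
        self._namespace = namespace
        self._prefix = prefix  # stored as-is; no trailing "/" appended

    def get_storage_key(self, key: str) -> str:
        # The "/" separator now lives in the format string instead of
        # being baked into the prefix at construction time.
        return f"{self._prefix}/{self._namespace}-{key}"


store = KeyLayout(namespace="serve-nightly", prefix="ft-checkpoint")
print(store.get_storage_key("ckpt-1"))  # ft-checkpoint/serve-nightly-ckpt-1
```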
15 changes: 15 additions & 0 deletions release/serve_tests/serve_tests.yaml
@@ -27,12 +27,27 @@
- name: serve_micro_benchmark
cluster:
app_config: app_config.yaml
# 16 CPUs
compute_template: compute_tpl_single_node.yaml

run:
timeout: 7200
long_running: False
script: python workloads/serve_micro_benchmark.py

smoke_test:
timeout: 600

- name: serve_cluster_fault_tolerance
cluster:
app_config: app_config.yaml
# 16 CPUs
compute_template: compute_tpl_single_node.yaml

run:
timeout: 7200
long_running: False
script: python workloads/serve_cluster_fault_tolerance.py

smoke_test:
timeout: 600
99 changes: 99 additions & 0 deletions release/serve_tests/workloads/serve_cluster_fault_tolerance.py
@@ -0,0 +1,99 @@
"""
Test that a serve deployment can recover from cluster failures by resuming
from checkpoints of external source, such as s3.

For product testing, we skip the part of actually starting new cluster as
it's Job Manager's responsibility, and only re-deploy to the same cluster
with remote checkpoint.
Comment on lines +5 to +7
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think we should still fully test that path where the entire cluster goes down. If we just test the controller being killed there may be some hidden assumptions about state in the GCS or something.

Can you just use cluster_utils and blow away the whole cluster and create a new one?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to completely start new ray cluster if possible as well, but think cluster_utils is for local tests, is there an equivalent on product to blow away current cluster and start new one ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I'm not sure. cluster_utils would be a good first step, it at least clears all process-level state

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ask internally on slack if there's a best way to do this? Worst case you just use the SDK to kill the cluster and start a new one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cluster_utils will run much faster though and it'll be nice to be able to run it locally for testing. if possible maybe make one test with both "backends" and start with just cluster_utils?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i asked a few product folks, our SDK has start / terminate cluster APIs but the issues is current releaser/e2e.py is assuming it runs a single script as if on laptop, in single ray cluster. Doing this in our test script might not work well and most feasible way is probably extend existing test script's yaml field to extend a multi-cluster setup.

For now im trying to work with multiple local clusters on product, which should terminate quick but also give us more coverage compare to current revision.

"""

import click
import os
import time
import requests

from serve_test_cluster_utils import (setup_local_single_node_cluster,
setup_anyscale_cluster)
from serve_test_utils import save_test_results

import ray
from ray import serve
from ray.serve.utils import logger

# Deployment configs
DEFAULT_NUM_REPLICAS = 4
DEFAULT_MAX_BATCH_SIZE = 16

# Checkpoint configs
DEFAULT_CHECKPOINT_PATH = "s3://serve-nightly-tests/fault-tolerant-test-checkpoint" # noqa: E501


def request_with_retries(endpoint, timeout=30):
start = time.time()
while True:
try:
return requests.get(
"http://127.0.0.1:8000" + endpoint, timeout=timeout)
except requests.RequestException:
if time.time() - start > timeout:
raise TimeoutError
time.sleep(0.1)


@click.command()
def main():
# (1) Setup cluster
# IS_SMOKE_TEST is set by args of releaser's e2e.py
smoke_test = os.environ.get("IS_SMOKE_TEST", "1")
if smoke_test == "1":
setup_local_single_node_cluster(
1, checkpoint_path=DEFAULT_CHECKPOINT_PATH)
else:
setup_anyscale_cluster(checkpoint_path=DEFAULT_CHECKPOINT_PATH)

# Deploy for the first time
@serve.deployment(name="echo", num_replicas=DEFAULT_NUM_REPLICAS)
class Echo:
def __init__(self):
    pass

def __call__(self, request):
return "hii"

Echo.deploy()

# Ensure endpoint is working
for _ in range(10):
response = request_with_retries("/echo/", timeout=30)
assert response.text == "hii"

logger.info("Initial deployment successful with working endpoint.")

# Kill controller and wait for endpoint to be available again
# Recover from remote checkpoint
# Ensure endpoint is still available with expected results
ray.kill(serve.api._global_client._controller, no_restart=False)
for _ in range(10):
response = request_with_retries("/echo/", timeout=30)
assert response.text == "hii"

logger.info("Deployment recovery from s3 checkpoint is successful "
"with working endpoint.")

# Checkpoints in the S3 bucket are cleaned up after 7 days by explicit
# lifecycle rules. Each checkpoint from this test is ~260 bytes in size.

# Save results
save_test_results(
{
"result": "success"
},
default_output_file="/tmp/serve_cluster_fault_tolerance.json")


if __name__ == "__main__":
    main()
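The 7-day cleanup mentioned in the comments is handled by an S3 lifecycle rule configured on the bucket outside this PR. A sketch of what such a configuration might look like (the rule ID and prefix below are assumptions for illustration, not taken from the PR; a configuration like this can be applied with boto3's `put_bucket_lifecycle_configuration`):

```python
# Hypothetical lifecycle configuration matching the 7-day cleanup the
# test comments describe; the rule ID and prefix are illustrative only.
lifecycle_configuration = {
    "Rules": [
        {
            "ID": "expire-serve-test-checkpoints",
            "Filter": {"Prefix": "fault-tolerant-test-checkpoint/"},
            "Status": "Enabled",
            # Objects under the prefix are expired 7 days after creation.
            "Expiration": {"Days": 7},
        }
    ]
}
print(lifecycle_configuration["Rules"][0]["Expiration"]["Days"])  # 7
```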
15 changes: 10 additions & 5 deletions release/serve_tests/workloads/serve_test_cluster_utils.py
@@ -6,13 +6,14 @@
from ray.cluster_utils import Cluster
from ray.serve.utils import logger
from ray.serve.config import DeploymentMode

from ray.serve.constants import DEFAULT_CHECKPOINT_PATH

# Cluster setup configs
NUM_CPU_PER_NODE = 10
NUM_CONNECTIONS = 10


- def setup_local_single_node_cluster(num_nodes):
+ def setup_local_single_node_cluster(
+         num_nodes: int, checkpoint_path: str = DEFAULT_CHECKPOINT_PATH):
"""Setup ray cluster locally via ray.init() and Cluster()

Each actor is simulated in local process on single node,
@@ -28,12 +29,14 @@ def setup_local_single_node_cluster(num_nodes):
)
ray.init(address=cluster.address, dashboard_host="0.0.0.0")
serve_client = serve.start(
-     http_options={"location": DeploymentMode.EveryNode})
+     http_options={"location": DeploymentMode.EveryNode},
+     _checkpoint_path=checkpoint_path,
+ )

return serve_client


- def setup_anyscale_cluster():
+ def setup_anyscale_cluster(checkpoint_path: str = DEFAULT_CHECKPOINT_PATH):
"""Setup ray cluster at anyscale via ray.client()

Note this is by default large scale and should be kicked off
@@ -44,7 +47,9 @@ def setup_anyscale_cluster():
# ray.client().env({}).connect()
ray.init(address="auto")
serve_client = serve.start(
-     http_options={"location": DeploymentMode.EveryNode})
+     http_options={"location": DeploymentMode.EveryNode},
+     _checkpoint_path=checkpoint_path,
+ )

return serve_client
