Skip to content

Commit

Permalink
[Serve] Check that telemetry isn't set before the test starts (ray-pr…
Browse files Browse the repository at this point in the history
…oject#38930)

This change adds a few assertions to ensure that telemetry isn't set before the unit tests start.

Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Signed-off-by: Jim Thompson <jimthompson5802@gmail.com>
  • Loading branch information
2 people authored and jimthompson5802 committed Sep 12, 2023
1 parent 92b8909 commit 450b5c7
Showing 1 changed file with 93 additions and 10 deletions.
103 changes: 93 additions & 10 deletions python/ray/serve/tests/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ray._private.usage.usage_lib import get_extra_usage_tags_to_report
from ray.serve._private.usage import ServeUsageTag
from ray.serve.tests.utils import (
receiver_app,
check_ray_started,
start_telemetry_app,
TelemetryStorage,
Expand All @@ -39,6 +40,20 @@ def test_fastapi_detected(manage_ray_with_telemetry):
subprocess.check_output(["ray", "start", "--head"])
wait_for_condition(check_ray_started, timeout=5)

storage_handle = start_telemetry_app()

wait_for_condition(
lambda: ray.get(storage_handle.get_reports_received.remote()) > 0, timeout=5
)

# Check that telemetry related to FastAPI app is not set
report = ray.get(storage_handle.get_report.remote())
assert ServeUsageTag.FASTAPI_USED.get_value_from_report(report) is None
assert ServeUsageTag.API_VERSION.get_value_from_report(report) == "v2"
assert int(ServeUsageTag.NUM_APPS.get_value_from_report(report)) == 1
assert int(ServeUsageTag.NUM_DEPLOYMENTS.get_value_from_report(report)) == 1
assert int(ServeUsageTag.NUM_GPU_DEPLOYMENTS.get_value_from_report(report)) == 0

app = FastAPI()

@serve.deployment
Expand All @@ -55,16 +70,18 @@ async def app2(self):
fastapi_app = FastAPIDeployment.bind()
serve.run(fastapi_app, name="fastapi_app", route_prefix="/fastapi")

storage_handle = start_telemetry_app()

wait_for_condition(
lambda: serve.status().applications["fastapi_app"].status
== ApplicationStatus.RUNNING,
timeout=15,
)

current_num_reports = ray.get(storage_handle.get_reports_received.remote())

wait_for_condition(
lambda: ray.get(storage_handle.get_reports_received.remote()) > 0, timeout=5
lambda: ray.get(storage_handle.get_reports_received.remote())
> current_num_reports,
timeout=5,
)
report = ray.get(storage_handle.get_report.remote())

Expand All @@ -90,6 +107,16 @@ def test_grpc_detected(manage_ray_with_telemetry):
subprocess.check_output(["ray", "start", "--head"])
wait_for_condition(check_ray_started, timeout=5)

storage_handle = start_telemetry_app()

wait_for_condition(
lambda: ray.get(storage_handle.get_reports_received.remote()) > 0, timeout=5
)

# Check that telemetry related to gRPC ingress app is not set
report = ray.get(storage_handle.get_report.remote())
assert ServeUsageTag.GRPC_INGRESS_USED.get_value_from_report(report) is None

@serve.deployment(ray_actor_options={"num_cpus": 0})
def greeter(inputs: Dict[str, bytes]):
return "Hello!"
Expand All @@ -100,16 +127,18 @@ def greeter(inputs: Dict[str, bytes]):

serve.run(grpc_app, name="grpc_app", route_prefix="/grpc")

storage_handle = start_telemetry_app()

wait_for_condition(
lambda: serve.status().applications["grpc_app"].status
== ApplicationStatus.RUNNING,
timeout=15,
)

current_num_reports = ray.get(storage_handle.get_reports_received.remote())

wait_for_condition(
lambda: ray.get(storage_handle.get_reports_received.remote()) > 0, timeout=5
lambda: ray.get(storage_handle.get_reports_received.remote())
> current_num_reports,
timeout=5,
)
report = ray.get(storage_handle.get_report.remote())

Expand All @@ -136,6 +165,17 @@ def test_graph_detected(manage_ray_with_telemetry, use_adapter):
subprocess.check_output(["ray", "start", "--head"])
wait_for_condition(check_ray_started, timeout=5)

storage_handle = start_telemetry_app()

wait_for_condition(
lambda: ray.get(storage_handle.get_reports_received.remote()) > 0, timeout=5
)

# Check that telemetry related to DAGDriver app is not set
report = ray.get(storage_handle.get_report.remote())
assert ServeUsageTag.DAG_DRIVER_USED.get_value_from_report(report) is None
assert ServeUsageTag.HTTP_ADAPTER_USED.get_value_from_report(report) is None

@serve.deployment(ray_actor_options={"num_cpus": 0})
def greeter(input):
return "Hello!"
Expand All @@ -156,10 +196,12 @@ def greeter(input):
timeout=15,
)

storage_handle = start_telemetry_app()
current_num_reports = ray.get(storage_handle.get_reports_received.remote())

wait_for_condition(
lambda: ray.get(storage_handle.get_reports_received.remote()) > 0, timeout=5
lambda: ray.get(storage_handle.get_reports_received.remote())
> current_num_reports,
timeout=5,
)
report = ray.get(storage_handle.get_report.remote())

Expand Down Expand Up @@ -200,6 +242,23 @@ def test_rest_api(manage_ray_with_telemetry, tmp_dir, version):

storage = TelemetryStorage.remote()

serve.run(
receiver_app,
host="0.0.0.0",
name="telemetry",
route_prefix=TELEMETRY_ROUTE_PREFIX,
)

wait_for_condition(
lambda: ray.get(storage.get_reports_received.remote()) > 1, timeout=15
)

# Check that REST API telemetry is not set
report = ray.get(storage.get_report.remote())
assert ServeUsageTag.REST_API_VERSION.get_value_from_report(report) is None

serve.delete(name="telemetry", _blocking=True)

if version == "v1":
config = {"import_path": "ray.serve.tests.utils.receiver_app"}
elif version == "v2":
Expand Down Expand Up @@ -243,8 +302,11 @@ def test_rest_api(manage_ray_with_telemetry, tmp_dir, version):
timeout=15,
)

current_num_reports = ray.get(storage.get_reports_received.remote())

wait_for_condition(
lambda: ray.get(storage.get_reports_received.remote()) > 10, timeout=15
lambda: ray.get(storage.get_reports_received.remote()) > current_num_reports,
timeout=5,
)
report = ray.get(storage.get_report.remote())

Expand Down Expand Up @@ -335,6 +397,23 @@ def test_lightweight_config_options(
wait_for_condition(check_ray_started, timeout=5)
storage = TelemetryStorage.remote()

serve.run(
receiver_app,
host="0.0.0.0",
name="telemetry",
route_prefix=TELEMETRY_ROUTE_PREFIX,
)

wait_for_condition(
lambda: ray.get(storage.get_reports_received.remote()) > 1, timeout=15
)

# Check that REST API telemetry is not set
report = ray.get(storage.get_report.remote())
assert ServeUsageTag.REST_API_VERSION.get_value_from_report(report) is None

serve.delete(name="telemetry", _blocking=True)

config = {
"applications": [
{
Expand Down Expand Up @@ -364,8 +443,12 @@ def test_lightweight_config_options(
== ApplicationStatus.RUNNING,
timeout=15,
)

current_num_reports = ray.get(storage.get_reports_received.remote())

wait_for_condition(
lambda: ray.get(storage.get_reports_received.remote()) > 0, timeout=5
lambda: ray.get(storage.get_reports_received.remote()) > current_num_reports,
timeout=5,
)
report = ray.get(storage.get_report.remote())

Expand Down

0 comments on commit 450b5c7

Please sign in to comment.