Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Stabilize metrics pusher #38349

Merged
merged 7 commits into from Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 16 additions & 12 deletions python/ray/serve/_private/utils.py
Expand Up @@ -2,6 +2,7 @@
import importlib
import inspect
import logging
import math
import os
import random
import string
Expand Down Expand Up @@ -599,21 +600,14 @@ def start(self):
"""

def send_forever():

while True:
if self.stop_event.is_set():
return

start = time.time()
least_interval_s = None

for task in self.tasks:
try:
if least_interval_s is None:
least_interval_s = task.interval_s
else:
least_interval_s = min(least_interval_s, task.interval_s)
if start - task.last_call_succeeded_time > task.interval_s:
if start - task.last_call_succeeded_time >= task.interval_s:
if task.last_ref:
ready_refs, _ = ray.wait([task.last_ref], timeout=0)
if len(ready_refs) == 0:
Expand All @@ -628,10 +622,20 @@ def send_forever():
logger.warning(
f"MetricsPusher thread failed to run metric task: {e}"
)
duration_s = time.time() - start
remaining_time = least_interval_s - duration_s
if remaining_time > 0:
time.sleep(remaining_time)

# For all tasks, check when the task should be executed
# next. Sleep until the next closest time.
least_interval_s = math.inf
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Nit] Could we raise an error if the MetricsPusher is started without any tasks registered? As written, it silently sleeps forever since least_interval_s gets set to math.inf.

Copy link
Contributor Author

@zcin zcin Aug 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense! I've added this to MetricsPusher.start()

for task in self.tasks:
time_until_next_push = task.interval_s - (
time.time() - task.last_call_succeeded_time
)
least_interval_s = min(least_interval_s, time_until_next_push)

time.sleep(max(least_interval_s, 0))

if len(self.tasks) == 0:
raise ValueError("MetricsPusher has zero tasks registered.")

self.pusher_thread = threading.Thread(target=send_forever)
# Making this a daemon thread so it doesn't leak upon shutdown, and it
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_application_state.py
Expand Up @@ -18,10 +18,10 @@
ReplicaConfig,
DeploymentInfo,
)
from ray.serve.tests.utils import MockKVStore
from ray.serve._private.utils import get_random_letters
from ray.serve.exceptions import RayServeException
from ray.serve.schema import ServeApplicationSchema, DeploymentSchema
from ray.serve.tests.test_deployment_state import MockKVStore


class MockEndpointState:
Expand Down
41 changes: 1 addition & 40 deletions python/ray/serve/tests/test_deployment_state.py
@@ -1,5 +1,4 @@
import sys
import time
from typing import Any, Dict, List, Optional, Tuple
from unittest.mock import patch, Mock
from collections import defaultdict
Expand All @@ -19,6 +18,7 @@
from ray.serve._private.deployment_scheduler import (
ReplicaSchedulingRequest,
)
from ray.serve.tests.utils import MockTimer, MockKVStore
from ray.serve._private.deployment_state import (
ActorReplicaWrapper,
DeploymentState,
Expand Down Expand Up @@ -290,32 +290,6 @@ def schedule(self, upscales, downscales):
return deployment_to_replicas_to_stop


class MockKVStore:
def __init__(self):
self.store = dict()

def put(self, key: str, val: Any) -> bool:
if not isinstance(key, str):
raise TypeError("key must be a string, got: {}.".format(type(key)))
self.store[key] = val
return True

def get(self, key: str) -> Any:
if not isinstance(key, str):
raise TypeError("key must be a string, got: {}.".format(type(key)))
return self.store.get(key, None)

def delete(self, key: str) -> bool:
if not isinstance(key, str):
raise TypeError("key must be a string, got: {}.".format(type(key)))

if key in self.store:
del self.store[key]
return True

return False


def deployment_info(
version: Optional[str] = None,
num_replicas: Optional[int] = 1,
Expand Down Expand Up @@ -350,19 +324,6 @@ def deployment_version(code_version) -> DeploymentVersion:
return DeploymentVersion(code_version, DeploymentConfig(), {})


class MockTimer:
def __init__(self, start_time=None):
if start_time is None:
start_time = time.time()
self._curr = start_time

def time(self):
return self._curr

def advance(self, by):
self._curr += by


class MockClusterNodeInfoCache:
def __init__(self):
self.alive_node_ids = set()
Expand Down
84 changes: 84 additions & 0 deletions python/ray/serve/tests/test_util.py
Expand Up @@ -4,6 +4,7 @@
import sys
import tempfile
from copy import deepcopy
import time
from unittest.mock import patch

import numpy as np
Expand All @@ -24,7 +25,9 @@
dict_keys_snake_to_camel_case,
get_all_live_placement_group_names,
get_head_node_id,
MetricsPusher,
)
from ray.serve.tests.utils import MockTimer
from ray._private.resource_spec import HEAD_NODE_RESOURCE_NAME


Expand Down Expand Up @@ -665,6 +668,87 @@ def test_get_all_live_placement_group_names(ray_instance):
assert set(get_all_live_placement_group_names()) == {"pg3", "pg4", "pg5", "pg6"}


def test_metrics_pusher_no_tasks():
"""Test that a metrics pusher can't be started with zero tasks."""
metrics_pusher = MetricsPusher()
with pytest.raises(ValueError):
metrics_pusher.start()


def test_metrics_pusher_basic():
start = 0
timer = MockTimer(start)

with patch("time.time", new=timer.time), patch(
"time.sleep", new=timer.realistic_sleep
):
counter = {"val": 0}
result = {}
expected_result = 20

def task(c, res):
timer.realistic_sleep(0.001)
c["val"] += 1
# At 10 seconds, this task should have been called 20 times
if timer.time() >= 10 and "val" not in res:
res["val"] = c["val"]

metrics_pusher = MetricsPusher()
metrics_pusher.register_task(lambda: task(counter, result), 0.5)

metrics_pusher.start()
# This busy wait loop should run for at most a few hundred milliseconds
# The test should finish by then, and if the test fails this prevents
# an infinite loop
for _ in range(10000000):
if "val" in result:
assert result["val"] == expected_result
break

assert result["val"] == expected_result


def test_metrics_pusher_multiple_tasks():
start = 0
timer = MockTimer(start)

with patch("time.time", new=timer.time), patch(
"time.sleep", new=timer.realistic_sleep
):
counter = {"A": 0, "B": 0, "C": 0}
result = {}
expected_results = {"A": 35, "B": 14, "C": 10}

def task(key, c, res):
time.sleep(0.001)
c[key] += 1
# Check for how many times this task has been called
# At 7 seconds, tasks A, B, C should have executed 35, 14, and 10
# times respectively.
if timer.time() >= 7 and key not in res:
res[key] = c[key]

metrics_pusher = MetricsPusher()
# Each task interval is different, and they don't divide each other.
metrics_pusher.register_task(lambda: task("A", counter, result), 0.2)
metrics_pusher.register_task(lambda: task("B", counter, result), 0.5)
metrics_pusher.register_task(lambda: task("C", counter, result), 0.7)
metrics_pusher.start()

# This busy wait loop should run for at most a few hundred milliseconds
# The test should finish by then, and if the test fails this prevents
# an infinite loop
for _ in range(10000000):
for key in result.keys():
assert result[key] == expected_results[key]
if len(result) == 3:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the test fail if len(result) is never 3? We should add an assertion after the for loop in that case. Same question for test_metrics_pusher_basic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good call, I've added this in both tests!

break

# Check there are three results set and all are expected.
for key in expected_results.keys():
assert result[key] == expected_results[key]


if __name__ == "__main__":
import sys

Expand Down
44 changes: 44 additions & 0 deletions python/ray/serve/tests/utils.py
@@ -0,0 +1,44 @@
import time
from typing import Any


class MockTimer:
def __init__(self, start_time=None):
if start_time is None:
start_time = time.time()
self._curr = start_time

def time(self):
return self._curr

def advance(self, by):
self._curr += by

def realistic_sleep(self, amt):
self._curr += amt + 0.001


class MockKVStore:
def __init__(self):
self.store = dict()

def put(self, key: str, val: Any) -> bool:
if not isinstance(key, str):
raise TypeError("key must be a string, got: {}.".format(type(key)))
self.store[key] = val
return True

def get(self, key: str) -> Any:
if not isinstance(key, str):
raise TypeError("key must be a string, got: {}.".format(type(key)))
return self.store.get(key, None)

def delete(self, key: str) -> bool:
if not isinstance(key, str):
raise TypeError("key must be a string, got: {}.".format(type(key)))

if key in self.store:
del self.store[key]
return True

return False