Skip to content

Commit

Permalink
stabilize metrics pusher
Browse files Browse the repository at this point in the history
Signed-off-by: Cindy Zhang <cindyzyx9@gmail.com>
  • Loading branch information
zcin committed Aug 11, 2023
1 parent 6fef803 commit b7394cf
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
6 changes: 2 additions & 4 deletions python/ray/serve/_private/utils.py
Expand Up @@ -628,10 +628,8 @@ def send_forever():
logger.warning(
f"MetricsPusher thread failed to run metric task: {e}"
)
duration_s = time.time() - start
remaining_time = least_interval_s - duration_s
if remaining_time > 0:
time.sleep(remaining_time)

time.sleep(least_interval_s)

self.pusher_thread = threading.Thread(target=send_forever)
# Making this a daemon thread so it doesn't leak upon shutdown, and it
Expand Down
17 changes: 17 additions & 0 deletions python/ray/serve/tests/test_util.py
Expand Up @@ -4,6 +4,7 @@
import sys
import tempfile
from copy import deepcopy
import time
from unittest.mock import patch

import numpy as np
Expand All @@ -24,6 +25,7 @@
dict_keys_snake_to_camel_case,
get_all_live_placement_group_names,
get_head_node_id,
MetricsPusher,
)
from ray._private.resource_spec import HEAD_NODE_RESOURCE_NAME

Expand Down Expand Up @@ -665,6 +667,21 @@ def test_get_all_live_placement_group_names(ray_instance):
assert set(get_all_live_placement_group_names()) == {"pg3", "pg4", "pg5", "pg6"}


def test_metrics_pusher():
counter = {"val": 0}

def task(c):
time.sleep(0.001)
c["val"] += 1

metrics_pusher = MetricsPusher()
metrics_pusher.register_task(lambda: task(counter), 0.5)
metrics_pusher.start()

time.sleep(10.4)
assert counter["val"] == 20


if __name__ == "__main__":
import sys

Expand Down

0 comments on commit b7394cf

Please sign in to comment.