diff --git a/python/ray/serve/_private/utils.py b/python/ray/serve/_private/utils.py index a776ce5f4ccd39..bcc17c9288c8d8 100644 --- a/python/ray/serve/_private/utils.py +++ b/python/ray/serve/_private/utils.py @@ -628,10 +628,8 @@ def send_forever(): logger.warning( f"MetricsPusher thread failed to run metric task: {e}" ) - duration_s = time.time() - start - remaining_time = least_interval_s - duration_s - if remaining_time > 0: - time.sleep(remaining_time) + + time.sleep(least_interval_s) self.pusher_thread = threading.Thread(target=send_forever) # Making this a daemon thread so it doesn't leak upon shutdown, and it diff --git a/python/ray/serve/tests/test_util.py b/python/ray/serve/tests/test_util.py index d2af287639b09c..1b9024afa0e6e7 100644 --- a/python/ray/serve/tests/test_util.py +++ b/python/ray/serve/tests/test_util.py @@ -4,6 +4,7 @@ import sys import tempfile from copy import deepcopy +import time from unittest.mock import patch import numpy as np @@ -24,6 +25,7 @@ dict_keys_snake_to_camel_case, get_all_live_placement_group_names, get_head_node_id, + MetricsPusher, ) from ray._private.resource_spec import HEAD_NODE_RESOURCE_NAME @@ -665,6 +667,21 @@ def test_get_all_live_placement_group_names(ray_instance): assert set(get_all_live_placement_group_names()) == {"pg3", "pg4", "pg5", "pg6"} +def test_metrics_pusher(): + counter = {"val": 0} + + def task(c): + time.sleep(0.001) + c["val"] += 1 + + metrics_pusher = MetricsPusher() + metrics_pusher.register_task(lambda: task(counter), 0.5) + metrics_pusher.start() + + time.sleep(10.4) + assert counter["val"] == 20 + + if __name__ == "__main__": import sys