Skip to content

Commit

Permalink
[serve] Cherry-pick queue length deadline configuration changes (#42176)
Browse files Browse the repository at this point in the history
Cherry-picks two PRs to address issues under high network delays:

[serve] Enable setting queue length response deadline via environment variable #42001
[serve] Add exponential backoff for queue_len_response_deadline_s #42041

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
  • Loading branch information
edoakes committed Jan 5, 2024
1 parent e7a883d commit 3bb96d2
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 12 deletions.
10 changes: 10 additions & 0 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,13 @@
RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS = (
os.environ.get("RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS", "0") == "1"
)

# Initial deadline for queue length responses in the router.
RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S = float(
os.environ.get("RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S", 0.1)
)

# Maximum deadline for queue length responses in the router (in backoff).
RAY_SERVE_MAX_QUEUE_LENGTH_RESPONSE_DEADLINE_S = float(
os.environ.get("RAY_SERVE_MAX_QUEUE_LENGTH_RESPONSE_DEADLINE_S", 1.0)
)
53 changes: 42 additions & 11 deletions python/ray/serve/_private/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
from ray.serve._private.common import DeploymentID, RequestProtocol, RunningReplicaInfo
from ray.serve._private.constants import (
HANDLE_METRIC_PUSH_INTERVAL_S,
RAY_SERVE_MAX_QUEUE_LENGTH_RESPONSE_DEADLINE_S,
RAY_SERVE_MULTIPLEXED_MODEL_ID_MATCHING_TIMEOUT_S,
RAY_SERVE_PROXY_PREFER_LOCAL_AZ_ROUTING,
RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S,
SERVE_LOGGER_NAME,
)
from ray.serve._private.deployment_info import DeploymentInfo
Expand Down Expand Up @@ -158,8 +160,11 @@ def multiplexed_model_ids(self) -> Set[str]:
"""Set of model IDs on this replica."""
pass

async def get_queue_state(self) -> Tuple[int, bool]:
"""Returns tuple of (queue_len, accepted)."""
async def get_queue_state(self, *, deadline_s: float) -> Tuple[int, bool]:
"""Returns tuple of (queue_len, accepted).
`deadline_s` is passed to verify backoff for testing.
"""
pass

def send_query(
Expand Down Expand Up @@ -195,7 +200,7 @@ def availability_zone(self) -> Optional[str]:
def multiplexed_model_ids(self) -> Set[str]:
return self._multiplexed_model_ids

async def get_queue_state(self) -> Tuple[int, bool]:
async def get_queue_state(self, *, deadline_s: float) -> Tuple[int, bool]:
# NOTE(edoakes): the `get_num_ongoing_requests` method name is shared by
# the Python and Java replica implementations. If you change it, you need to
# change both (or introduce a branch here).
Expand Down Expand Up @@ -316,7 +321,10 @@ class PowerOfTwoChoicesReplicaScheduler(ReplicaScheduler):

# Deadline for replicas to respond with their queue length. If the response isn't
# received within this deadline, the replica will not be considered.
queue_len_response_deadline_s = 0.1
# If this deadline is repeatedly missed, it will be exponentially increased up to
# the maximum configured here.
queue_len_response_deadline_s = RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S
max_queue_len_response_deadline_s = RAY_SERVE_MAX_QUEUE_LENGTH_RESPONSE_DEADLINE_S

# Hard limit on the maximum number of scheduling tasks to run. Having too many of
# these tasks can cause stability issue due to too much load on the local process
Expand Down Expand Up @@ -690,35 +698,53 @@ async def choose_two_replicas_with_backoff(
)

async def select_from_candidate_replicas(
self, candidates: List[ReplicaWrapper]
self,
candidates: List[ReplicaWrapper],
backoff_index: int,
) -> Optional[ReplicaWrapper]:
"""Chooses the best replica from the list of candidates.
If none of the replicas can be scheduled, returns `None`.
The queue length at each replica is queried directly from it. The time waited
for these queries is capped by `self.queue_len_response_deadline_s`; if a
replica doesn't respond within the deadline it is not considered.
for these queries is capped by a response deadline; if a replica doesn't
doesn't respond within the deadline it is not considered. The deadline will be
increased exponentially in backoff.
Among replicas that respond within the deadline and accept the request (don't
have full queues), the one with the lowest queue length is chosen.
"""
# Ensure the max deadline is always >= the initial deadline.
max_queue_len_response_deadline_s = max(
self.queue_len_response_deadline_s,
self.max_queue_len_response_deadline_s,
)
queue_len_response_deadline_s = min(
self.queue_len_response_deadline_s * (2**backoff_index),
max_queue_len_response_deadline_s,
)

get_queue_state_tasks = []
for c in candidates:
t = self._loop.create_task(c.get_queue_state())
t = self._loop.create_task(
c.get_queue_state(deadline_s=queue_len_response_deadline_s)
)
t.replica_id = c.replica_id
get_queue_state_tasks.append(t)

done, pending = await asyncio.wait(
get_queue_state_tasks,
timeout=self.queue_len_response_deadline_s,
timeout=queue_len_response_deadline_s,
return_when=asyncio.ALL_COMPLETED,
)
for t in pending:
t.cancel()
logger.warning(
f"Failed to get queue length from replica {t.replica_id} "
f"within {self.queue_len_response_deadline_s}s."
f"within {queue_len_response_deadline_s}s. If this happens repeatedly "
"it's likely caused by high network latency in the cluster. You can "
"configure the deadline using the "
"`RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S` environment variable."
)

chosen_replica_id = None
Expand Down Expand Up @@ -820,15 +846,20 @@ async def fulfill_pending_requests(self):
"""
try:
while len(self._scheduling_tasks) <= self.target_num_scheduling_tasks:
backoff_index = 0
request_metadata = self._get_next_pending_request_metadata_to_schedule()
async for candidates in self.choose_two_replicas_with_backoff(
request_metadata
):
replica = await self.select_from_candidate_replicas(candidates)
replica = await self.select_from_candidate_replicas(
candidates, backoff_index
)
if replica is not None:
self.fulfill_next_pending_request(replica, request_metadata)
break

backoff_index += 1

except Exception:
logger.exception("Unexpected error in fulfill_pending_requests.")
finally:
Expand Down
74 changes: 73 additions & 1 deletion python/ray/serve/tests/unit/test_replica_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(
self._sleep_time_s = sleep_time_s

self.get_queue_state_was_cancelled = False
self.queue_len_deadline_history = list()

@property
def replica_id(self) -> str:
Expand Down Expand Up @@ -71,7 +72,8 @@ def set_queue_state_response(
self._exception = exception
self._has_queue_len_response.set()

async def get_queue_state(self) -> Tuple[int, bool]:
async def get_queue_state(self, *, deadline_s: float) -> Tuple[int, bool]:
self.queue_len_deadline_history.append(deadline_s)
try:
while not self._has_queue_len_response.is_set():
await self._has_queue_len_response.wait()
Expand Down Expand Up @@ -1177,6 +1179,76 @@ async def test_get_queue_state_cancelled_on_timeout(pow_2_scheduler, fake_query)
assert (await task) == r1


@pytest.mark.asyncio
async def test_queue_len_response_deadline_backoff(pow_2_scheduler, fake_query):
"""
Verify that the response deadline is exponentially backed off up to the max.
"""
s = pow_2_scheduler
s.queue_len_response_deadline_s = 0.001
s.max_queue_len_response_deadline_s = 0.005
loop = get_or_create_event_loop()

r1 = FakeReplicaWrapper("r1")
s.update_replicas([r1])

# Attempt to schedule; the replica will be attempted and a timeout will occur
# due to the short timeout set above.
task = loop.create_task(s.choose_replica_for_query(fake_query))
done, _ = await asyncio.wait([task], timeout=0.2)
assert len(done) == 0

# Verify that the deadline never exceeds the max and deadline_n+1 is equal to
# the max or 2*deadline_n.
for i, j in zip(
range(0, len(r1.queue_len_deadline_history) - 1),
range(1, len(r1.queue_len_deadline_history)),
):
deadline_i = r1.queue_len_deadline_history[i]
deadline_j = r1.queue_len_deadline_history[j]
print(deadline_i, deadline_j)
assert (
deadline_i <= deadline_j
and deadline_j <= s.max_queue_len_response_deadline_s
)
if deadline_i < s.max_queue_len_response_deadline_s:
assert (
deadline_j == s.max_queue_len_response_deadline_s
or deadline_j == 2 * deadline_i
)

r1.set_queue_state_response(0, accepted=True)
assert (await task) == r1


@pytest.mark.asyncio
async def test_max_queue_len_response_deadline(pow_2_scheduler, fake_query):
"""
Verify that if the max response deadline is > the initial deadline, the initial is
always used.
"""
s = pow_2_scheduler
s.queue_len_response_deadline_s = 0.01
s.max_queue_len_response_deadline_s = 0.001
loop = get_or_create_event_loop()

r1 = FakeReplicaWrapper("r1")
s.update_replicas([r1])

# Attempt to schedule; the replica will be attempted and a timeout will occur
# due to the short timeout set above.
task = loop.create_task(s.choose_replica_for_query(fake_query))
done, _ = await asyncio.wait([task], timeout=0.2)
assert len(done) == 0

assert all(
d == s.queue_len_response_deadline_s for d in r1.queue_len_deadline_history
)

r1.set_queue_state_response(0, accepted=True)
assert (await task) == r1


@pytest.mark.asyncio
async def test_replicas_updated_event_on_correct_loop(pow_2_scheduler):
"""See https://github.com/ray-project/ray/issues/40631.
Expand Down

0 comments on commit 3bb96d2

Please sign in to comment.