[serve] Fixes Router skips cache invalidation on gRPC request failure#63272
[serve] Fixes Router skips cache invalidation on gRPC request failure#63272wanadzhar913 wants to merge 1 commit into
Conversation
Signed-off-by: wanadzhar913 <adzhar.faiq@gmail.com>
There was a problem hiding this comment.
Code Review
This pull request modifies gRPCReplicaResult to map gRPC UNAVAILABLE status codes to ActorUnavailableError within completion callbacks, which allows the router to invalidate its queue length cache. Unit tests were added to verify this error mapping and the resulting cache behavior. The reviewer identified a thread-safety concern where the callback might be executed on a background thread, potentially leading to race conditions in the router, and provided a code suggestion to ensure the callback runs on the caller's asyncio loop.
| def add_done_callback(self, callback: Callable): | ||
| self._call.add_done_callback(callback) | ||
| def wrapped_callback(call: grpc.aio.Call): | ||
| self._grpc_call_loop.call_soon_threadsafe( | ||
| lambda: self._grpc_call_loop.create_task( | ||
| self._process_done_callback(call, callback) | ||
| ) | ||
| ) | ||
|
|
||
| self._call.add_done_callback(wrapped_callback) |
There was a problem hiding this comment.
The callback is executed on self._grpc_call_loop. When _on_separate_loop is enabled for gRPC transport, this loop runs in a background thread. However, the AsyncioRouter (which typically provides this callback) and its internal state (like the queue-length cache) are not thread-safe. Executing the callback on a background thread can lead to race conditions when invalidating the cache or updating metrics in the router.
Consider capturing the current asyncio loop when add_done_callback is called and ensuring the callback is executed on that loop using call_soon_threadsafe.
| def add_done_callback(self, callback: Callable): | |
| self._call.add_done_callback(callback) | |
| def wrapped_callback(call: grpc.aio.Call): | |
| self._grpc_call_loop.call_soon_threadsafe( | |
| lambda: self._grpc_call_loop.create_task( | |
| self._process_done_callback(call, callback) | |
| ) | |
| ) | |
| self._call.add_done_callback(wrapped_callback) | |
| def add_done_callback(self, callback: Callable): | |
| caller_loop = asyncio.get_running_loop() | |
| def wrapped_callback(call: grpc.aio.Call): | |
| self._grpc_call_loop.call_soon_threadsafe( | |
| lambda: self._grpc_call_loop.create_task( | |
| self._process_done_callback( | |
| call, lambda arg: caller_loop.call_soon_threadsafe(callback, arg) | |
| ) | |
| ) | |
| ) | |
| self._call.add_done_callback(wrapped_callback) |
what does this mean? |
From what I understand in the issue, when a replica is temporarily unavailable, we should remove that replica’s cached queue length ( On the gRPC path, the completion callback gets a raw ray/python/ray/serve/_private/request_router/request_router.py Lines 784 to 787 in 409bc23 |
|
Hi @jeffreywang-anyscale @abrarsheikh lmk what you guys think. Tysm! |
|
thanks for you contribution, closing this PR in fovor of #63371 |
Description
This PR addresses the issue where (for gRPC transport), after a gRPC failure, the
AsyncioRouter's request-completion callback never invalidates the queue-length cache entry for a failed replicaHence, here's my approach:
gRPCReplicaResult.add_done_callback.python/ray/serve/_private/replica_result.pyand ifawait call.code()in the gRPC loop isgrpc.StatusCode.UNAVAILABLE, it passesActorUnavailableErrorto the router callback.AsyncioRouter._process_finished_requestshould then handle that error by invalidating the replica queue-length cache viaon_replica_actor_unavailable.test_grpc_replica_result.pyandtest_router.pyRelated issues
Fixes #63261
Additional information
Tests:
python3 -m pytest python/ray/serve/tests/unit/test_grpc_replica_result.py python/ray/serve/tests/unit/test_router.py -q