[serve] Application-level batching initial commit #14610
Conversation
```diff
@@ -58,6 +58,198 @@ def __call__(self, requests):
     assert ray.get(handle.remote(temp=1))


 def test_app_level_batching(serve_instance):
```
This and the next test are duplicates of the old batching tests.
@architkulkarni @simon-mo this ended up being pretty gnarly to get right, both for the asyncio code and for the decorator (to handle both methods and functions). Please leave any comments where the code is confusing so I can clarify with comments.
@simon-mo I added type hints based on the examples here: Could you double-check these? I'm not too familiar with mypy.
```diff
@@ -185,8 +121,8 @@ def __init__(self, _callable: Callable, backend_config: BackendConfig,
         self.is_function = is_function

         self.config = backend_config
-        self.batch_queue = BatchQueue(self.config.max_batch_size or 1,
-                                      self.config.batch_wait_timeout)
+        self.batch_queue = _BatchQueue(self.config.max_batch_size or 1,
+                                       self.config.batch_wait_timeout)
```
is this still used?
Yep, the existing codepath is untouched. I don't see a benefit to refactoring it to use this new version given that this will just be deleted.
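For readers following the discussion, the core idea behind a batch queue like this can be sketched roughly as follows. This is a hypothetical simplification written for illustration, not the PR's actual `_BatchQueue`; the class name, method names, and parameters here are made up:

```python
import asyncio
from typing import Any, List


class SimpleBatchQueue:
    """Hypothetical sketch of a batch queue with a size cap and a wait
    timeout; NOT the actual _BatchQueue implementation from this PR."""

    def __init__(self, max_batch_size: int, timeout_s: float) -> None:
        self.max_batch_size = max_batch_size
        self.timeout_s = timeout_s
        self.queue: "asyncio.Queue[Any]" = asyncio.Queue()

    def put(self, request: Any) -> None:
        # Enqueue without blocking; a worker loop consumes batches.
        self.queue.put_nowait(request)

    async def wait_for_batch(self) -> List[Any]:
        # Block until at least one request is available.
        batch = [await self.queue.get()]
        # Then wait up to timeout_s for more requests, stopping early
        # once the batch is full.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self.timeout_s
        while len(batch) < self.max_batch_size:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                batch.append(
                    await asyncio.wait_for(self.queue.get(), remaining))
            except asyncio.TimeoutError:
                break
        return batch
```

In this sketch, `put` enqueues without blocking while a single worker coroutine awaits `wait_for_batch` and executes one batch at a time.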
@simon-mo all of your comments are addressed, please have another look!
```python
t1 = asyncio.get_event_loop().create_task(call("hi1"))
await asyncio.sleep(0.5)
t2 = asyncio.get_event_loop().create_task(call("hi2"))
t3 = asyncio.get_event_loop().create_task(call("raise"))
```
I'm a bit confused here, why are these executed in a size-two batch if timeout = 0? Doesn't timeout=0 mean that the max batch size is effectively 1?
The second two are executed together because they are both waiting once the first one finishes (only one batch is executed at a time).
Oh right, thanks
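The scheduling behavior discussed above can be reproduced with a small self-contained sketch (hypothetical; it only mimics the one-batch-at-a-time behavior, not the PR's actual code). While the worker is busy running the first batch, the second and third requests both queue up and are drained together as the next batch:

```python
import asyncio
from typing import Any, List


async def batch_worker(queue: "asyncio.Queue[Any]",
                       batches: List[List[Any]]) -> None:
    # Drain everything currently queued into one batch, then "run" that
    # batch to completion before looking at the queue again.
    while True:
        batch = [await queue.get()]
        while not queue.empty():
            batch.append(queue.get_nowait())
        batches.append(batch)
        await asyncio.sleep(0.2)  # simulate executing the batch


async def main() -> List[List[Any]]:
    queue: "asyncio.Queue[Any]" = asyncio.Queue()
    batches: List[List[Any]] = []
    worker = asyncio.ensure_future(batch_worker(queue, batches))
    queue.put_nowait("hi1")    # picked up alone: nothing else is queued yet
    await asyncio.sleep(0.05)  # worker is now busy executing ["hi1"]
    queue.put_nowait("hi2")    # arrives while the first batch is running
    queue.put_nowait("raise")  # arrives while the first batch is running
    await asyncio.sleep(0.5)   # let both batches finish
    worker.cancel()
    return batches


print(asyncio.run(main()))  # [['hi1'], ['hi2', 'raise']]
```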
Why are these changes needed?
Adds a `@serve.batch` decorator that allows for application-level batching rather than making it part of the `backend_worker.py` implementation. This should allow for more flexibility and enable batching with our future ingress plans that are more "HTTP-native." This does not yet modify other tests, examples, docs, etc.

TODOs before making this the default in the docs:

- Fix `Task was destroyed but it is pending!` errors happening at shutdown.

Related issue number
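The idea behind such a decorator can be sketched as follows. This is a hypothetical simplification written for illustration, not the PR's implementation: the real `@serve.batch` also has to handle bound methods vs. free functions and configurable batch size and wait timeout, none of which appear here.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple


def batch(func: Callable[[List[Any]], Awaitable[List[Any]]]):
    """Hypothetical sketch of an application-level batching decorator.

    The wrapped coroutine takes a list of requests and must return a
    list of results of the same length. NOT the PR's actual code:
    no size cap, no wait timeout, no method support.
    """
    pending: List[Tuple[Any, asyncio.Future]] = []
    flush_scheduled = False

    async def _flush() -> None:
        nonlocal flush_scheduled
        await asyncio.sleep(0)  # let other concurrent callers enqueue first
        current, pending[:] = pending[:], []
        flush_scheduled = False
        try:
            results = await func([arg for arg, _ in current])
            for (_, fut), result in zip(current, results):
                fut.set_result(result)
        except Exception as e:
            # Fail every request in the batch together.
            for _, fut in current:
                fut.set_exception(e)

    async def wrapper(arg: Any) -> Any:
        nonlocal flush_scheduled
        fut = asyncio.get_running_loop().create_future()
        pending.append((arg, fut))
        if not flush_scheduled:
            flush_scheduled = True
            asyncio.ensure_future(_flush())
        return await fut

    return wrapper


@batch
async def handle(requests: List[str]) -> List[str]:
    # Receives the whole batch at once, like a batched handler would.
    return [r.upper() for r in requests]


async def main() -> List[str]:
    # Two concurrent calls end up in the same batch.
    return await asyncio.gather(handle("a"), handle("b"))


print(asyncio.run(main()))  # ['A', 'B']
```

Each caller awaits a per-request future; the first caller to enqueue schedules a flush task, which yields once so that other concurrent callers can join the batch before it runs.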
Checks
- I've run `scripts/format.sh` to lint the changes in this PR.