[Serve] Add initial support for FastAPI #14754
@@ -71,8 +72,8 @@ class BackendConfig(BaseModel):
    max_concurrent_queries: Optional[int] = None
    user_config: Any = None

    experimental_graceful_shutdown_wait_loop_s: PositiveFloat = 2.0
    experimental_graceful_shutdown_timeout_s: confloat(ge=0) = 20.0
confloat is a private API
@edoakes @architkulkarni This PR should lay the groundwork for subsequent PRs. I hope to get this most basic case merged so that other FastAPI support (middlewares, classes, dependency injection test) can be unblocked and worked on in parallel.
Looks great, this is an exciting new feature
python/ray/serve/backend_worker.py
Outdated
    headers=dict(self.header))

sender = MockSender()
path_params = req.scope.pop("path_params")
Should we copy the scope, or does it not matter in this case? Just parroting https://asgi.readthedocs.io/en/latest/specs/main.html#middleware which says the scope should be copied before passing to a child.
If we don't copy the scope, we should leave a comment here explaining what the assumptions are that allow us to do that. This could lead to hard-to-debug errors in the future.
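The concern in the two comments above can be illustrated with a minimal sketch. It assumes a plain dict-shaped ASGI scope, and `child_scope_for` is a hypothetical helper, not code from this PR. It copies the scope before removing `path_params`, so the caller's scope is left untouched.

```python
from typing import Any, Dict


def child_scope_for(scope: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `scope` whose path is rebuilt from the wildcard match.

    Hypothetical helper: the "path_params" and "wildcard" keys mirror the
    hunk above, but this is not the PR's implementation.
    """
    # Shallow copy so top-level mutations do not leak into the parent scope.
    child = dict(scope)
    # Copy the nested dict too, so popping from it leaves the parent's intact.
    path_params = dict(child.pop("path_params", {}))
    matched_path = path_params.pop("wildcard", "")
    child["path"] = f"/{matched_path}"
    return child


parent = {
    "type": "http",
    "path": "/app/items/1",
    "path_params": {"wildcard": "items/1"},
}
child = child_scope_for(parent)
```

If the scope is instead mutated in place, a comment stating why that is safe (for example, that nothing upstream reads the scope afterwards) would serve the same purpose.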
@@ -365,3 +365,42 @@ def get_current_node_resource_key() -> str:
            return key
    else:
        raise ValueError("Cannot found the node dictionary for current node.")


def register_custom_serializers():
It's great that you were able to find ways to serialize these!
Nice, looks pretty straightforward once the pydantic serialization was sorted out. My main concern is around the custom path handling logic -- I don't think we should be doing any kind of manual stripping or we're sure to run into trouble. I think we should be able to use the following:
https://fastapi.tiangolo.com/advanced/behind-a-proxy/#proxy-with-a-stripped-path-prefix
py_test(
    name = "test_pydantic_serialization",
    size = "small",
    srcs = serve_tests_srcs,
    tags = ["exclusive"],
    deps = [":serve_lib"],
)
In the future it'd be better to make a separate PR for this. Fewer moving pieces make a change easier to review and merge.
python/ray/serve/api.py
Outdated
>>> @serve.deployment(app)
@app.get("/")
API still smells a little funny to me but we can discuss that offline. I will focus on the implementation for the review for now.
@@ -256,7 +257,39 @@ async def invoke_single(self, request_item: Query) -> Any:

        start = time.time()
        try:
            result = await method_to_call(arg)
            if self.config.internal_metadata.is_asgi_app:
Please pull this logic out and add unit tests for it
It's a bit hard to pull out. I think we should do it after removing the batching part, so that there's only one "invoke"; then we can split it into invoke and invoke_asgi.
sure, sounds good. can you leave a note so we remember to clean it up when that happens?
python/ray/serve/backend_worker.py
Outdated
if (message["type"] == "http.response.body"):
    self.buffer.append(message["body"])
if (message["type"] == "http.response.start"):
    self.status_code = message["status"]
    for key, value in message["headers"]:
        self.header[key.decode()] = value.decode()
Are there other unhandled message types here? Please add an else clause that either does assert False if not, or raises a useful exception if there are unhandled types.
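A minimal sketch of what that request could look like, assuming the sender only needs to buffer a plain HTTP response. `BufferingSender` is a hypothetical stand-in for the PR's sender class, and the choice of `ValueError` is an assumption; the message types and their optional fields follow the ASGI HTTP spec.

```python
import asyncio
from typing import Any, Dict, List, Optional


class BufferingSender:
    """Collects ASGI HTTP response messages (hypothetical stand-in)."""

    def __init__(self) -> None:
        self.status_code: Optional[int] = None
        self.header: Dict[str, str] = {}
        self.buffer: List[bytes] = []

    async def __call__(self, message: Dict[str, Any]) -> None:
        if message["type"] == "http.response.start":
            self.status_code = message["status"]
            # "headers" is optional in the spec and defaults to empty.
            for key, value in message.get("headers", []):
                self.header[key.decode()] = value.decode()
        elif message["type"] == "http.response.body":
            # "body" is optional in the spec and defaults to b"".
            self.buffer.append(message.get("body", b""))
        else:
            # Fail loudly rather than silently dropping the message.
            raise ValueError(
                f"Unhandled ASGI message type: {message['type']!r}")


async def _demo() -> BufferingSender:
    sender = BufferingSender()
    await sender({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await sender({"type": "http.response.body", "body": b"hello"})
    return sender


sender = asyncio.run(_demo())
```

Using `elif` also makes the two handled branches mutually exclusive, which the pair of independent `if` statements in the hunk does not.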
python/ray/serve/backend_worker.py
Outdated
matched_path = path_params["wildcard"]
req.scope["path"] = f"/{matched_path}"
What exactly is this doing? This seems like something we shouldn't need to do and should be handled by FastAPI
python/ray/serve/controller.py
Outdated
methods = [
    "GET", "HEAD", "POST", "PUT", "DELETE", "CONNECT",
    "OPTIONS", "TRACE", "PATCH"
]
In the future we can just transparently proxy all methods at the HTTP proxy and let the downstream application handle them. Can you leave a comment here mentioning that?
python/ray/serve/controller.py
Outdated
route = f"/{name}"
methods = ["GET", "POST"]
if replica_config.is_asgi_app:
    route += "/{wildcard:path}"
Hmm, I'm still not understanding this logic. Shouldn't we just need to configure the base path in FastAPI to /{name}?
from ray import serve


def test_fastapi_function(serve_instance):
So cool!!!!
@@ -719,6 +726,8 @@ def connect() -> Client:
    if not ray.is_initialized():
        ray.init()

    register_custom_serializers()
we should probably upstream these into ray, right?
python/ray/serve/api.py
Outdated
def ingress(
        app: Union[FastAPI, APIRouter, None] = None,
Why is this optional? What is the behavior when it isn't specified?
Looks mostly good, a few more comments. Biggest question: didn't we conclude that we only need to handle the class case and not the function case? If the user is passing in a full FastAPI app, we wrap it in a "dummy" class. This PR seems to only be handling the function case.
python/ray/serve/api.py
Outdated
@wraps(f)
def inner(*args, **kwargs):
    return f(*args, **kwargs)
Don't we only need to handle classes here? This looks like it only handles functions?
python/ray/serve/controller.py
Outdated
if replica_config.path_prefix is None:
    replica_config.path_prefix = f"/{name}"
    # Updated here because it is used by backend worker
    backend_config.internal_metadata.path_prefix = f"/{name}"
route = replica_config.path_prefix
methods = ["GET", "POST"]
if replica_config.is_asgi_app:
    # When the backend is asgi application, we want to proxy it
    # with a prefixed path as well as proxy all HTTP methods.
    # {wildcard:path} is used so HTTPProxy's Starlette router can match
    # arbitrary path.
    route = "/{route}/{wildcard:path}"
    # https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods
    methods = [
        "GET", "HEAD", "POST", "PUT", "DELETE", "CONNECT", "OPTIONS",
        "TRACE", "PATCH"
    ]
I'm a bit confused at this logic overall. Shouldn't there only be a single branch here?
if replica_config.is_ingress:
    route = replica_config.path_prefix or f"{name}"
    methods = [...] # all
We still need to set the default path prefix using the backend name. I updated the code to make it cleaner.
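One way the route selection discussed above could be written as a single function, as a sketch only. `resolve_route` and `ALL_HTTP_METHODS` are hypothetical names, and defaulting the prefix to `/{name}` is taken from the outdated hunk. Note that the hunk's `"/{route}/{wildcard:path}"` has no `f` prefix, so `{route}` is not interpolated there; the sketch uses an f-string with doubled braces to keep the literal Starlette path converter.

```python
from typing import List, Optional, Tuple

ALL_HTTP_METHODS = [
    "GET", "HEAD", "POST", "PUT", "DELETE", "CONNECT", "OPTIONS",
    "TRACE", "PATCH",
]


def resolve_route(name: str, path_prefix: Optional[str],
                  is_asgi_app: bool) -> Tuple[str, List[str]]:
    """Return (route, methods) for a backend. Hypothetical helper."""
    # Default the prefix to the backend name when none was configured.
    prefix = path_prefix if path_prefix is not None else f"/{name}"
    if not is_asgi_app:
        return prefix, ["GET", "POST"]
    # "{{" and "}}" emit literal braces, so the route ends in
    # "/{wildcard:path}" for the proxy's router to match any sub-path.
    route = f"{prefix}/{{wildcard:path}}"
    return route, list(ALL_HTTP_METHODS)


asgi_route, asgi_methods = resolve_route("app", None, True)
plain_route, plain_methods = resolve_route("app", "/api", False)
```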
# The incoming scope["path"] contains prefixed path and it
# won't be stripped by FastAPI.
request.scope["path"] = scope["path"].replace(root_path, "", 1)
# root_path is used such that the reverse look up and
# redirection works.
request.scope["root_path"] = root_path
Hmm why won't this be handled by fastapi? It still seems like we shouldn't need to be mucking around with the fields directly
root_path only works for redirects; it is not actually used by routing. FastAPI expects the incoming path to be cleaned, with root_path already stripped by the proxy. We are essentially performing that path stripping here.
Ok, cool, makes sense
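The stripping step described in this thread can be sketched as a small standalone function. This is an illustration under assumptions, not the PR's code: `strip_root_path` is a hypothetical name, and it deliberately differs from the hunk in one respect. `str.replace(root_path, "", 1)` removes the first occurrence anywhere in the path, whereas the sketch only strips a leading prefix that ends on a segment boundary.

```python
from typing import Any, Dict


def strip_root_path(scope: Dict[str, Any], root_path: str) -> Dict[str, Any]:
    """Return a copied scope with `root_path` stripped from the front of path.

    Hypothetical helper mimicking what a prefix-stripping proxy would do
    before handing the request to the ASGI app.
    """
    new_scope = dict(scope)
    path = scope["path"]
    # Only strip when the prefix matches a whole leading path segment.
    if root_path and (path == root_path or path.startswith(root_path + "/")):
        path = path[len(root_path):] or "/"
    new_scope["path"] = path
    # root_path is still recorded so URL generation and redirects can use it.
    new_scope["root_path"] = root_path
    return new_scope


scope = {"type": "http", "path": "/app/items/app", "root_path": ""}
stripped = strip_root_path(scope, "/app")
```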
python/ray/serve/backend_worker.py
Outdated
app = self.callable._serve_asgi_app
sender = ASGIHTTPSender()
await app(
    request.scope,
    request._receive,
    sender,
)
result = sender.build_starlette_response()
Suggested change:
sender = ASGIHTTPSender()
await self.callable._serve_asgi_app(
    request.scope,
    request._receive,
    sender,
)
result = sender.build_starlette_response()
# Pydantic's Cython validators are not serializable.
# https://github.com/cloudpipe/cloudpickle/issues/408
ray.worker.global_worker.run_function_on_all_workers(
    lambda _: ray.util.register_serializer(
        pydantic.fields.ModelField,
        serializer=lambda o: {
            "name": o.name,
            "type_": o.type_,
            "class_validators": o.class_validators,
            "model_config": o.model_config,
            "default": o.default,
            "default_factory": o.default_factory,
            "required": o.required,
            "alias": o.alias,
            "field_info": o.field_info,
        },
        deserializer=lambda kwargs: pydantic.fields.ModelField(**kwargs),
@simon-mo can we upstream this into Ray so we don't need to dynamically register this callback?
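The serializer/deserializer pair in the hunk above reduces a field to the keyword arguments needed to rebuild it. A toy sketch of that pattern follows, using an in-process registry and a hypothetical `FieldLike` class instead of Ray and pydantic. `register_pair`, `dump`, and `load` are illustrative names only and are not Ray APIs.

```python
from typing import Any, Callable, Dict, Tuple

# Maps a class to its (serializer, deserializer) pair. Toy stand-in for a
# real serialization registry.
_REGISTRY: Dict[type, Tuple[Callable[[Any], Any], Callable[[Any], Any]]] = {}


def register_pair(cls: type, *, serializer: Callable[[Any], Any],
                  deserializer: Callable[[Any], Any]) -> None:
    _REGISTRY[cls] = (serializer, deserializer)


def dump(obj: Any) -> Tuple[type, Any]:
    serializer, _ = _REGISTRY[type(obj)]
    return type(obj), serializer(obj)


def load(payload: Tuple[type, Any]) -> Any:
    cls, state = payload
    _, deserializer = _REGISTRY[cls]
    return deserializer(state)


class FieldLike:
    """Hypothetical object carrying derived state that should not be shipped."""

    def __init__(self, name: str, default: Any = None) -> None:
        self.name = name
        self.default = default
        # Derived attribute, rebuilt by __init__ rather than serialized.
        self.validator = lambda value: value


# Serialize only the constructor arguments; rebuild the rest on load.
register_pair(
    FieldLike,
    serializer=lambda o: {"name": o.name, "default": o.default},
    deserializer=lambda kwargs: FieldLike(**kwargs),
)

payload = dump(FieldLike("age", default=3))
restored = load(payload)
```

The design choice is the same as in the hunk: ship only plain constructor inputs and let the constructor recreate anything that cannot be serialized.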
Add the most basic FastAPI support, passing the following test.
Related issue number
Checks
scripts/format.sh
to lint the changes in this PR.