fix: use body for streaming instead of params #6098
This reverts commit f1f9a88.
    endpoint_path,
    input_doc_model=None,
):
    from fastapi import Request

    @app.api_route(
        path=f'/{endpoint_path.strip("/")}',
        methods=['GET'],
The following code works, but I did not allow get since it doesn't work with gateway deployments.
methods=['GET', 'POST'],
Codecov Report
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #6098      +/-   ##
==========================================
+ Coverage   75.98%   76.89%   +0.91%
==========================================
  Files         143      145       +2
  Lines       13974    14014      +40
==========================================
+ Hits        10618    10776     +158
+ Misses       3356     3238     -118
10a0e5a to ee77d13
I am not sure whether I would prefer to keep the 2 endpoints separate and fix the underlying issue. @alaeddine-13 is the integration with SSE, as described in the documentation, still possible?
@JoanFM I'm just taking the existing endpoint and allowing it to accept either parameters or a request body. I'm happy to add a POST endpoint as well, but I need your help with the gateway.
Here is a branch on my fork that will reproduce the gateway error
e87a684 to 5cfb423
stream = client.stream_doc(
start_time = time.time()
async for doc in client.stream_doc(
on='/hello',
inputs=MyDocument(text='hello world', number=i),
return_type=MyDocument,
)
start_time = None
async for doc in stream:
start_time = start_time or time.time()
):
assert doc.text == f'hello world {i}'
i += 1
delay = time.time() - start_time

# 0.5 seconds between each request + 0.5 seconds tolerance interval
assert delay < (0.5 * i), f'Expected delay to be less than {0.5 * i}, got {delay} on iteration {i}'
assert time.time() - start_time < (0.5 * i) + 0.5
At the very least, we should merge this and fix the gateway.
@NarekA why do you want to change the GET endpoint method?
Actually, now I realize you want to allow passing either a body or query parameters.
@alaeddine-13 The current implementation with the POST method is fine, but it seems to break when using a deployment with a gateway: the stream all comes out at once. I modified this test (and am changing it back), not realizing that I was fixing the very thing it was testing for. The parameterized test passes for all scenarios except http+gateway. Using GET instead of POST seems to fix the gateway, and with only one endpoint it is less confusing.
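The symptom described above (all items arriving at once instead of spread over time) can be checked by recording arrival times. The following is a minimal, standard-library sketch of that idea, not the project's test: `fake_stream`, `buffered`, `arrival_times`, and `looks_streamed` are hypothetical names, and `buffered` only imitates the reported gateway behaviour.

```python
import asyncio
import time


async def fake_stream(n: int, interval: float):
    """Hypothetical stand-in for a streaming endpoint: one item per interval."""
    for i in range(n):
        await asyncio.sleep(interval)
        yield f'hello world {i}'


async def buffered(stream):
    """Imitates the reported symptom: collect every item, then emit them in a burst."""
    items = [item async for item in stream]
    for item in items:
        yield item


async def arrival_times(stream):
    """Return each item's arrival time, measured from the start of iteration."""
    start = time.monotonic()
    times = []
    async for _ in stream:
        times.append(time.monotonic() - start)
    return times


def looks_streamed(times, interval: float) -> bool:
    """True if the first and last items arrive more than one interval apart."""
    return (times[-1] - times[0]) > interval


if __name__ == '__main__':
    streamed = asyncio.run(arrival_times(fake_stream(4, 0.1)))
    burst = asyncio.run(arrival_times(buffered(fake_stream(4, 0.1))))
    print('streamed:', looks_streamed(streamed, 0.1))
    print('buffered:', looks_streamed(burst, 0.1))
```

Measuring the spread between the first and last arrival, rather than the total duration, is what separates the two cases: a buffered response takes just as long overall but delivers everything at the end.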
Is there or can we have a test where the API is called with the query params?
If we do allow POST, I like the idea of just using the same route with 2 methods:
async def streaming_get(request: Request):
    query_params = dict(request.query_params)
async def streaming_get(request: Request, body: input_doc_model = None):
    if not body:
We can add request.method == 'GET' to the condition here.
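A framework-free sketch of the fallback being discussed: prefer the request body when one is sent, and read query parameters only for a body-less GET. The helper name `resolve_input` and its signature are hypothetical and are not taken from the PR; the actual endpoint works on a FastAPI `Request` and a pydantic model.

```python
import json
from typing import Mapping, Optional


def resolve_input(
    method: str, query_params: Mapping[str, str], body: Optional[bytes]
) -> dict:
    """Return the input document fields from the body, else from the query string."""
    if body:
        # A JSON body keeps types and nesting intact.
        return json.loads(body)
    if method == 'GET':
        # Query parameters are always flat strings.
        return dict(query_params)
    raise ValueError(f'{method} request without a body')


if __name__ == '__main__':
    print(resolve_input('GET', {'text': 'hello'}, None))
    print(resolve_input('GET', {}, b'{"text": "hello", "number": 1}'))
```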
I think we may not need POST at all then.
That's the current state of this PR, and it fixes everything. Can't tell if tests need to be re-run or if they're actually failing.
You can see a test failing here: https://github.com/jina-ai/jina/actions/runs/6647328275/job/18084538375?pr=6098
I fixed that issue. I think the current failures just need re-runs; can you take a look?
Never mind, found one that's not flakiness.
async def test_issue_6090_get_params(streaming_deployment):
    """Tests if streaming works with pydantic models with complex fields which are not
    str, int, or float.
    """

    docs = []
    url = f"http://localhost:{streaming_deployment.port}/stream-simple?text=my_input_text"
    async with aiohttp.ClientSession() as session:

        async with session.get(url) as resp:
            async for doc, _ in resp.content.iter_chunks():
                if b"event: end" in doc:
                    break
                parsed = doc.decode().split("data:", 1)[-1].strip()
                parsed = SimpleInput.parse_raw(parsed)
                docs.append(parsed)

    assert [d.text for d in docs] == [
        'hello world my_input_text 0',
        'hello world my_input_text 1',
        'hello world my_input_text 2',
        'hello world my_input_text 3',
    ]
Added this test for passing documents as query params
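The chunk handling in that test can be isolated as a small function. This is a sketch under two assumptions: each chunk carries exactly one server-sent event, and the payload after `data:` is JSON. The helper name `parse_sse_chunk` and the `event: update` label in the sample are illustrative and not taken from the PR; the test itself parses into `SimpleInput` rather than a plain dict.

```python
import json
from typing import Optional


def parse_sse_chunk(chunk: bytes) -> Optional[dict]:
    """Decode the JSON payload of one SSE chunk; return None for the end event."""
    if b'event: end' in chunk:
        return None
    # Keep everything after the first "data:" marker and drop surrounding whitespace.
    payload = chunk.decode().split('data:', 1)[-1].strip()
    return json.loads(payload)


if __name__ == '__main__':
    sample = b'event: update\r\ndata: {"text": "hello world my_input_text 0"}\r\n\r\n'
    print(parse_sse_chunk(sample))
    print(parse_sse_chunk(b'event: end\r\n\r\n'))
```

Splitting on the first `data:` only (the `1` argument) matters when the payload itself contains that substring.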
async with session.get(target_url) as response:
request_kwargs = {}
if request.body_exists():
    payload = await request.json()
    if payload:
        request_kwargs['json'] = payload

async with session.get(
    url=target_url, **request_kwargs
) as response:
Found out where the gateway forwarding is happening.
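The forwarding change can be read as "attach a JSON body to the outgoing GET only when the incoming request had a non-empty one". A standalone sketch of that decision follows; `build_forward_kwargs` and its `raw_body` argument are hypothetical stand-ins for the gateway's request object, and the result is meant to be unpacked into a call such as `session.get(url=target_url, **request_kwargs)`.

```python
import json
from typing import Any, Dict


def build_forward_kwargs(raw_body: bytes) -> Dict[str, Any]:
    """Build the extra keyword arguments for the forwarded request."""
    request_kwargs: Dict[str, Any] = {}
    if raw_body:
        payload = json.loads(raw_body)
        # Skip empty payloads so the forwarded request stays a plain GET.
        if payload:
            request_kwargs['json'] = payload
    return request_kwargs


if __name__ == '__main__':
    print(build_forward_kwargs(b''))
    print(build_forward_kwargs(b'{"text": "hello world"}'))
```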
Only one test job failing, and I can't find a clear stack trace in it so it might be flakiness.
Making a new PR that follows conventions to replace #6091 and #6093.
HTTP streaming breaks when the input doc schema has fields that are not str, int, or float. This includes dicts, bools, and nested objects (see Issue 6090). In addition, passing the document as query parameters caps the input size at 2000 characters (much less once URL encoding is factored in).
This PR changes the GET endpoints so that the data is passed in the request body rather than in the params.
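To illustrate why query parameters are a poor fit for such documents, the following standard-library sketch round-trips one hypothetical document through a query string and through a JSON body. The document fields are made up for the example; the point is only that the query string flattens every value to a string and grows under URL encoding, while JSON keeps types and nesting.

```python
import json
from urllib.parse import parse_qs, quote_plus, urlencode

# Hypothetical input document with a bool and a nested object.
doc = {'text': 'hello world', 'number': 1, 'flag': True, 'meta': {'lang': 'en'}}

# Round trip through a query string: every value comes back as a string.
query = urlencode(doc)
from_query = {key: values[0] for key, values in parse_qs(query).items()}

# Round trip through a JSON body: types and nesting survive.
from_body = json.loads(json.dumps(doc))

# URL encoding also inflates the payload, which eats into any length cap.
meta_json = json.dumps(doc['meta'])
meta_encoded = quote_plus(meta_json)

if __name__ == '__main__':
    print(from_query)
    print(from_body)
    print(len(meta_json), len(meta_encoded))
```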
Goals:
Also related: #6097