New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add option to return in order in client and streamer #5404
Conversation
905fcee
to
fed6f31
Compare
Codecov Report
@@ Coverage Diff @@
## master #5404 +/- ##
==========================================
- Coverage 86.99% 85.69% -1.30%
==========================================
Files 101 101
Lines 6611 6607 -4
==========================================
- Hits 5751 5662 -89
- Misses 860 945 +85
Flags with carried forward coverage won't be shown. Click here to find out more.
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
Signed-off-by: Joan Fontanals Martinez <joan.martinez@jina.ai>
fed6f31
to
fe91c49
Compare
Signed-off-by: Joan Fontanals Martinez <joan.martinez@jina.ai>
f2628b6
to
af4eb8c
Compare
Signed-off-by: Joan Fontanals Martinez <joan.martinez@jina.ai>
d8eebaa
to
88b502f
Compare
Co-authored-by: Alex Cureton-Griffiths <alexcg1@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
jina/serve/stream/__init__.py
Outdated
responses_ids[response_request_id] = len(responses_list) | ||
responses_list.append(response) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see a need for responses_list here. I think we can directly put the response in the responses_ids map (actually let'ts map from ids to response objects and rename the variable)
jina/serve/stream/__init__.py
Outdated
response_index = responses_ids[next_request_id] | ||
del responses_ids[next_request_id] | ||
request_ids.pop(0) | ||
yield responses_list[response_index] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assuming we no more need responses_list, we can change the code like so:
response_index = responses_ids[next_request_id] | |
del responses_ids[next_request_id] | |
request_ids.pop(0) | |
yield responses_list[response_index] | |
request_ids.pop(0) | |
yield responses_ids.pop(next_request_id) |
jina/serve/stream/__init__.py
Outdated
stop_yielding = False | ||
while not stop_yielding and len(request_ids) > 0: | ||
next_request_id = request_ids[0] | ||
if next_request_id in responses_ids: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if the next_request id is not received ? we don't yield anymore ?
|
||
@pytest.mark.parametrize('protocol', ['grpc']) | ||
def test_return_order_in_client(protocol): | ||
class ExecutorRandomSleepExecutor(Executor): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
class ExecutorRandomSleepExecutor(Executor): | |
class RandomSleepExecutor(Executor): |
rand_sleep = random.uniform(0.1, 1.3) | ||
time.sleep(rand_sleep) | ||
|
||
f = Flow(protocol=protocol).add(uses=ExecutorRandomSleepExecutor, replicas=2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
f = Flow(protocol=protocol).add(uses=ExecutorRandomSleepExecutor, replicas=2) | |
f = Flow(protocol=protocol).add(uses=RandomSleepExecutor, replicas=2) |
jina/serve/streamer.py
Outdated
def _create_topology_graph( | ||
graph_description, | ||
graph_conditions, | ||
deployments_metadata, | ||
deployments_no_reduce, | ||
timeout_send, | ||
retries, | ||
): | ||
# check if it should be in K8s, maybe ConnectionPoolFactory to be created | ||
return TopologyGraph( | ||
graph_representation=graph_description, | ||
graph_conditions=graph_conditions, | ||
deployments_metadata=deployments_metadata, | ||
deployments_no_reduce=deployments_no_reduce, | ||
timeout_send=timeout_send, | ||
retries=retries, | ||
) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand the purpose of this, it seems like it is the same as just calling the constructor directly?
jina/serve/stream/__init__.py
Outdated
:yield: responses | ||
""" | ||
result_queue = asyncio.Queue() | ||
request_ids = [] | ||
responses_list = [] | ||
responses_ids = {} # map from id to index in responses_list |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would rename to response_id_to_index
or something like that
@@ -760,6 +760,34 @@ None | |||
|
|||
```` | |||
|
|||
A Client connects to a Flow that processes Documents in an asynchronous and very distributed way. This means that the order of the Flow processing the requests may not be the same order as the Client sending the requests. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way to pass this parameter from a 3rd party client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this is gonna be ready in the streamer
for the CustomGateway. We can add it later I would say
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly external clients do not have this streaming
capability so easily implemented, so I think is not so so important
Co-authored-by: AlaeddineAbdessalem <alaeddine-13@live.fr>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
📝 Docs are deployed on https://feat_return_order--jina-docs.netlify.app 🎉 |
Goals:
Try returning in the order of requests from Streamer