-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Support communicating with individual Executors in CustomGateway #5558
Conversation
Gateway code: from docarray import DocumentArray, Document
from jina.serve.runtimes.gateway.http.fastapi import FastAPIBaseGateway
from jina import Flow, Executor, requests
class MyGateway(FastAPIBaseGateway):
@property
def app(self):
from fastapi import FastAPI
app = FastAPI(title='Custom FastAPI Gateway')
@app.get("/endpoint")
async def get(text: str):
docs = await self.executor['executor1'](exec_endpoint='/', docs=DocumentArray([Document(text=text)]), parameters={'k': 'v'})
return {'result': docs[0].text}
return app
class FirstExec(Executor):
@requests
def func(self, docs, **kwargs):
for doc in docs:
doc.text += " First"
class SecondExec(Executor):
@requests
def func(self, docs, **kwargs):
print(kwargs)
for doc in docs:
doc.text += " Second"
with Flow(port=12345).config_gateway(uses=MyGateway, protocol='http', port=50975).add(
uses=FirstExec, name='executor0'
).add(
uses=SecondExec, name='executor1'
) as flow:
flow.block() Client code curl 'localhost:50975/endpoint?text=meow' |
Codecov Report
@@ Coverage Diff @@
## master #5558 +/- ##
===========================================
- Coverage 86.17% 70.59% -15.59%
===========================================
Files 124 122 -2
Lines 10048 10022 -26
===========================================
- Hits 8659 7075 -1584
- Misses 1389 2947 +1558
Flags with carried forward coverage won't be shown. Click here to find out more.
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
Co-authored by: Joan Fontanals <joan.martinez@jina.ai>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest 2 options:
- keep introducing ExecutorStreamer, I think in this case it makes sense to rename GatewayStreamer to FlowStreamer. Since we are making a breaking change, it makes sense to unify the interface as well. This means having:
- self.flow_streamer.post(...) # or self.flow.post(...)
- self.executor_streamer[exec].post(...) # or self.executors[exec].post(...)
- unify flow streamer and executor streamer under the same self.streamer object. I think this makes sense because otherwise, since the same pool is used, destroying one object corrupts the other
Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>
Signed-off-by: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>
zz bot |
@@ -221,3 +221,36 @@ def get_streamer(): | |||
@staticmethod | |||
def _set_env_streamer_args(**kwargs): | |||
os.environ['JINA_STREAMER_ARGS'] = json.dumps(kwargs) | |||
|
|||
|
|||
class _ExecutorStreamer: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, I do not think so
37c68c9
to
504e9cb
Compare
this makes the tests more resilient and more likely to hit all the replicas
504e9cb
to
d1348e4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically approve, wait for @alexcg1 for the docs
Co-authored-by: Alex Cureton-Griffiths <alexcg1@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
📝 Docs are deployed on https://feat-5538-comm-with-individual-execs--jina-docs.netlify.app 🎉 |
Goals: