feat: native async based grpc & fastapi based rest for gateway #1348

Merged
merged 28 commits into from
Dec 7, 2020

Conversation

deepankarm
Contributor

@deepankarm deepankarm commented Nov 25, 2020

The following changes are made in this PR:

Side Effects:

gRPC

  • Moved from custom AsyncExecutor to native asyncio for gRPC
  • Previously the server was async, whereas PyClient was sync. With this PR, PyClient becomes async (call, call_unary, dry_run, index, search, train are now coroutines).
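
Because these methods are now coroutines, calling code has to await them (or drive them from an event loop). A minimal sketch of what that looks like for a caller, using a hypothetical `FakeClient` stand-in; the class and its return values are assumptions for illustration, not the real PyClient:

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for an async client; not Jina's PyClient."""

    async def index(self, docs):
        # Pretend to send documents to a gateway and collect acknowledgements.
        await asyncio.sleep(0)
        return [f'indexed:{d}' for d in docs]

    async def search(self, query):
        await asyncio.sleep(0)
        return f'results-for:{query}'


async def main():
    client = FakeClient()
    # Calling a coroutine function only creates a coroutine object;
    # the work happens when it is awaited.
    indexed = await client.index(['a', 'b'])
    found = await client.search('a')
    return indexed, found


result = asyncio.run(main())
```

Code that previously called `client.index(...)` synchronously would need either an `await` or an explicit event-loop entry point such as `asyncio.run`.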

REST

  • FastAPI & uvicorn support async natively, so we no longer need the hacky gevent.WSGIServer workaround.
  • FastAPI also claims much better performance (comparison still to be done).
  • With gRPC, we have native streaming + prefetch ability, which REST lacked. This can now be done with the help of websockets on the FastAPI server.
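
The streaming + prefetch idea can be sketched with plain asyncio: keep a bounded number of requests in flight while yielding responses in order. This is only an illustration under assumed names (`request_source`, `handle`, `stream_with_prefetch` are hypothetical); it leaves out the websocket transport and is not how the gateway in this PR is implemented:

```python
import asyncio


async def request_source(n):
    # Stand-in for a stream of incoming requests (e.g. websocket messages).
    for i in range(n):
        yield i


async def handle(req):
    # Stand-in for sending one request through the Flow.
    await asyncio.sleep(0.01)
    return req * 2


async def stream_with_prefetch(source, prefetch):
    """Yield responses in order, keeping at most `prefetch` requests in flight."""
    pending = []
    async for req in source:
        pending.append(asyncio.ensure_future(handle(req)))
        if len(pending) >= prefetch:
            yield await pending.pop(0)
    # The source is exhausted: drain whatever is still in flight.
    for task in pending:
        yield await task


async def main():
    return [r async for r in stream_with_prefetch(request_source(5), prefetch=2)]


responses = asyncio.run(main())
```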

PyClient

  • PyClient (and hence py_client) is now an awaitable.
  • We enable PyClientRuntime, which runs PyClient in a separate process to avoid having an event loop in the main process.
  • PyClient (optionally) sends the gRPC response to a zmq socket, from which the main process can read & execute callbacks. (This is the default when we call flow.index, flow.search, etc.)
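
The forward-then-callback pattern described above can be sketched as follows. Note the substitutions: the PR uses a separate process and a zmq socket, whereas this sketch uses a thread and `queue.Queue` so that it stays self-contained; all names are hypothetical:

```python
import asyncio
import queue
import threading

TERMINATE = object()  # sentinel marking the end of the response stream


async def fake_response_stream(n):
    # Stand-in for responses streamed back from the gRPC gateway.
    for i in range(n):
        await asyncio.sleep(0)
        yield f'response-{i}'


def client_worker(out_q, n):
    """Runs its own event loop off the main thread and forwards every response."""
    async def run():
        async for resp in fake_response_stream(n):
            out_q.put(resp)
        out_q.put(TERMINATE)

    asyncio.run(run())


def run_with_callback(n, callback):
    out_q = queue.Queue()
    worker = threading.Thread(target=client_worker, args=(out_q, n))
    worker.start()
    # Caller's side: read forwarded responses and execute callbacks here.
    while True:
        item = out_q.get()
        if item is TERMINATE:
            break
        callback(item)
    worker.join()


seen = []
run_with_callback(3, seen.append)
```

The point of the design is that the caller never owns an event loop; it only blocks on a queue and runs user callbacks.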

JinaD

  • Since the gateway now lives in a separate process for both gRPC & REST, the flow invocation logic for jinad can now move from def to async def (Difference)
  • Deleting a Flow no longer kills jinad

CtrlZmqlet

  • Zmqlet with only a PAIR socket (capable of BIND or CONNECT), to be tied to all runtimes that don't have any Pea-related logic (e.g. gateway-grpc, gateway-rest, remote, client)

Jupyter Notebook

image

Pending

  • pytest.asyncio tests for PyClient would still cause segmentation faults, because they bring the grpc.aio channel (and hence the event loop) into the main process. This needs to be handled.
  • Do performance tests for gRPC + REST based changes
  • Make changes to jinad (def to async def)
  • Discuss async propagation in core (where to await vs. run_until_complete a coroutine)
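
For the last point, the two options can be contrasted in a small sketch. The function names are hypothetical; the only claim is the general asyncio rule that synchronous entry points drive a loop themselves, while code already running inside a loop should await:

```python
import asyncio


async def fetch(x):
    # Stand-in coroutine for an async core operation.
    await asyncio.sleep(0)
    return x + 1


def fetch_sync(x):
    """Synchronous entry point: owns an event loop and blocks until done."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(fetch(x))
    finally:
        loop.close()


async def fetch_twice(x):
    """Already inside a running loop: await instead of starting another loop."""
    return await fetch(await fetch(x))


sync_result = fetch_sync(1)
async_result = asyncio.run(fetch_twice(1))
```

Calling `run_until_complete` from code that is itself running inside an event loop raises a `RuntimeError`, which is why the boundary between the two styles has to be chosen deliberately.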

@jina-bot jina-bot added size/S area/core This issue/PR affects the core codebase area/network This issue/PR affects network functionality component/peapod labels Nov 25, 2020
@codecov

codecov bot commented Nov 25, 2020

Codecov Report

Merging #1348 (1a866d7) into master (9de90e6) will decrease coverage by 0.83%.
The diff coverage is 88.96%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #1348      +/-   ##
==========================================
- Coverage   83.33%   82.49%   -0.84%     
==========================================
  Files         104      106       +2     
  Lines        6869     6980     +111     
==========================================
+ Hits         5724     5758      +34     
- Misses       1145     1222      +77     
Impacted Files Coverage Δ
jina/clients/python/grpc.py 83.09% <85.29%> (+15.30%) ⬆️
jina/peapods/gateway/grpc.py 85.91% <85.91%> (ø)
jina/peapods/gateway/servicer.py 88.23% <88.23%> (ø)
jina/peapods/gateway/rest.py 88.88% <88.88%> (ø)
jina/peapods/zmq.py 84.42% <93.02%> (+3.31%) ⬆️
jina/clients/python/helper.py 87.00% <100.00%> (+1.44%) ⬆️
jina/peapods/pea.py 92.85% <100.00%> (+1.13%) ⬆️
jina/peapods/pod.py 83.33% <100.00%> (+0.05%) ⬆️
jina/peapods/grpc_asyncio.py 0.00% <0.00%> (-75.52%) ⬇️
jina/logging/sse.py 91.93% <0.00%> (-3.23%) ⬇️
... and 7 more

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 83edba0...1a866d7. Read the comment docs.

class AsyncGatewayPea:
    def __init__(self, args):
        if not args.proxy and os.name != 'nt':
            os.unsetenv('http_proxy')
Member

This feels like users may get very mad at us?

Member

Why is this needed?

Member

Agree. Unsetting http_proxy and https_proxy sounds really bad.

Member

@hanxiao hanxiao Nov 26, 2020

It has been needed since GNES. @deepankarm @nan-wang @JoanFM please:

  1. read the old code
  2. when you run grpc with http and https proxies set, you will find grpc won't work anymore.
  3. always keep in mind that os.environ is set per process, not globally; nobody will get mad at us, because nothing changes globally
  4. whenever you doubt whether it causes trouble for users, use warnings.warn

Member

@hanxiao hanxiao Nov 26, 2020

Please always keep in mind that gRPC runs on top of HTTP/2, which is why the HTTP proxy settings affect it. If you don't know about this, read the gRPC docs.

If you really want to solve this elegantly, I suggest searching for or posting an issue on the gRPC GitHub. Back in early 2019 I spent a day on this, and this was the best solution. Things may have changed, so don't be shy about leveraging the OSS community.

Also, please fix the misconception that os.environ changes anything globally. It is shared only by the process you spawned and all of its subprocesses. Fixing this misconception is important; otherwise every use of os.environ will look weird to you.

Until then, keep these lines. Many companies have an internal proxy; without these lines, Jina won't work for them.
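
The process-scoping argument can be checked with a short experiment: a child process removes `http_proxy` from its own environment, and the parent's environment stays as it was. This is a minimal sketch using only the standard library; the proxy URL and the warning text are illustrative assumptions:

```python
import os
import subprocess
import sys
import warnings

os.environ['http_proxy'] = 'http://proxy.example:3128'  # illustrative value

child_code = (
    "import os\n"
    "os.environ.pop('http_proxy', None)\n"
    "print(os.environ.get('http_proxy'))\n"
)
# The child starts with a copy of the parent's environment and removes the variable.
child_view = subprocess.run(
    [sys.executable, '-c', child_code],
    stdout=subprocess.PIPE, universal_newlines=True, check=True,
).stdout.strip()

# The parent's environment is untouched by what the child did.
parent_view = os.environ.get('http_proxy')

# If silently dropping a proxy setting is a concern, say so explicitly.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    warnings.warn('http_proxy is unset inside the gateway process for gRPC')
```

The reverse direction is the one to watch: anything the gateway process changes is inherited by the subprocesses it spawns afterwards.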

Member

Are we sure this init lives in another process?

Member

Shouldn't it be moved into start?

@@ -556,6 +556,25 @@ def start(self) -> 'GatewayPod':
return self


class AsyncGatewayPod(BasePod, AsyncExitStack):
Member

Will this play nicely with inheriting from both ExitStack and AsyncExitStack? (I think it's a good point; I may try to have a mixin in the refactor for ExitStack so that it is more separated from BasePod.)

Contributor Author


import asyncio
from contextlib import AsyncExitStack, ExitStack


class DualContext(ExitStack, AsyncExitStack):
    def __init__(self) -> None:
        super(ExitStack, self).__init__()
        super(AsyncExitStack, self).__init__()

    def start(self):
        raise NotImplementedError

    def close(self):
        raise NotImplementedError

    async def astart(self):
        raise NotImplementedError

    async def aclose(self):
        raise NotImplementedError

    def __enter__(self):
        return self.start()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        super(ExitStack, self).__exit__(exc_type, exc_val, exc_tb)

    async def __aenter__(self):
        return await self.astart()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.aclose()
        await super(AsyncExitStack, self).__aexit__(exc_type, exc_val, exc_tb)


class SyncContext(DualContext):
    def start(self):
        print('Sync')

    def close(self):
        print('Sync close')


class AsyncContext(DualContext):
    async def astart(self):
        print('Async')

    async def aclose(self):
        print('Async close')


with SyncContext():
    pass

async def blah():
    async with AsyncContext():
        pass

asyncio.run(blah())

Contributor Author

This works, maybe we can use something similar

@jina-bot jina-bot added size/M and removed size/S labels Nov 26, 2020
@JoanFM
Member

JoanFM commented Nov 29, 2020

Zmqlet with only ctrl socket (AsyncCtrlZmqlet) -

  • main process fetches the ctrl address
  • loop_body in the event loop can await AsyncCtrlZmqlet (which awaits the terminate signal on the ctrl socket)
  • upon termination signal, loop_termination kicks in (killing grpc/rest servers, sending delete signal to jinad etc)
  • every jina-process which is not a local pea with zmqstreamlet can inherit this (remote, gateway-grpc, gateway-rest)
  • ctrl address can be used for termination
  • doesn't need input/output sockets
  • i'm assuming, this will always be local. we should be good with ipc only (to avoid clash)?

I think assuming it is always local w.r.t. the Flow is valid right now.
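
The loop_body / loop_termination flow described in the list above can be sketched with plain asyncio. Here an `asyncio.Event` stands in for the terminate message on the ctrl socket, and `serve` stands in for the gRPC/REST server; both are assumptions for illustration and none of this is the actual AsyncCtrlZmqlet code:

```python
import asyncio


async def serve(log):
    # Stand-in for a long-running gRPC/REST server task.
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append('server stopped')
        raise


async def runtime(terminate, log):
    server = asyncio.ensure_future(serve(log))
    # Loop body: wait for the terminate signal on the control channel.
    await terminate.wait()
    # Loop termination: stop the server, then run any remaining cleanup.
    server.cancel()
    try:
        await server
    except asyncio.CancelledError:
        pass
    log.append('cleanup done')


async def main():
    log = []
    terminate = asyncio.Event()
    task = asyncio.ensure_future(runtime(terminate, log))
    await asyncio.sleep(0.03)
    terminate.set()  # stand-in for a TERMINATE message on the ctrl socket
    await task
    return log


log = asyncio.run(main())
```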

@hanxiao
Member

hanxiao commented Nov 29, 2020

the test is broken

* refactor(gateway): clean code, move file structure

* refactor(proto): jina primitive types
@jina-bot jina-bot added size/L area/testing This issue/PR affects testing component/client and removed size/M labels Nov 29, 2020
@jina-bot jina-bot added area/setup This issue/PR affects setting up Jina component/resource labels Dec 1, 2020
@deepankarm deepankarm changed the title feat: native async based grpc for gateway feat: native async based grpc & fastapi based rest for gateway Dec 1, 2020
@deepankarm deepankarm marked this pull request as ready for review December 1, 2020 15:10
@deepankarm deepankarm requested a review from a team as a code owner December 1, 2020 15:10
@jina-bot jina-bot added size/XL and removed size/L labels Dec 4, 2020
while True:
    try:
        msg = zmqlet.sock.recv()
        if msg == b'TERMINATE':
Member

When is the client sending TERMINATE? I haven't seen this happen. It is done between Peas but not from the client, right?

Contributor Author

PyClient would send a terminate signal to PyClientRuntime

Member

It should be the other way around, right? PyClient does not know it lives inside a PyClientRuntime. Why don't you use multiprocessing Events?

Contributor Author

@deepankarm deepankarm Dec 5, 2020

PyClient receives a stream of responses from gRPC servicer. To make sure the callback functions are executed back in the main process, PyClient sends each such response to PyClientRuntime. Once the stream is completed, PyClient sends a TERMINATE signal to PyClientRuntime. But you're right, this can be handled using an Event.
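
A rough sketch of the Event-based alternative: the producer signals completion by setting an event instead of sending a TERMINATE message in-band. To keep it self-contained, this uses `threading.Event` and `queue.Queue`; `multiprocessing.Event` exposes the same `set` / `is_set` / `wait` methods, but the cross-process wiring is not shown and all names here are hypothetical:

```python
import queue
import threading


def producer(out_q, done, n):
    # Stand-in for the client forwarding streamed responses.
    for i in range(n):
        out_q.put(i)
    done.set()  # signal completion instead of sending a TERMINATE message


def consume(n):
    out_q = queue.Queue()
    done = threading.Event()
    worker = threading.Thread(target=producer, args=(out_q, done, n))
    worker.start()
    received = []
    # Keep reading until the producer has finished and the queue is drained.
    while not (done.is_set() and out_q.empty()):
        try:
            received.append(out_q.get(timeout=0.05))
        except queue.Empty:
            continue
    worker.join()
    return received


received = consume(4)
```

Because every `put` happens before `done.set()`, an empty queue observed after the event is set means nothing more will arrive.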

jina/clients/__init__.py (resolved)
jina/clients/python/__init__.py (resolved)
Member

@JoanFM JoanFM left a comment

Codecov reports missing tests for some cases, especially when using context management.

def __exit__(self, exc_type, exc_val, exc_tb):
    self.close()
async def __aexit__(self, exc_type, exc_val, exc_tb):
    await self.close()
Member

This aexit is not tested?

self.close()
# avoid closing a client after a single `index`, `search` or `train` operation
if 'close' in kwargs:
    await self.close()
Member

Does a test for this part exist?

await self.is_gateway_ready.wait()
return self

async def __aenter__(self):
Member

There is no test using it as a context manager.

@deepankarm
Contributor Author

Codecov reports missing tests for some cases, especially when using context management.

Testing the context manager for AsyncGrpcClient (and PyClient) is not possible until we fix the issues with pytest.asyncio. This is because the grpc.aio channel ends up in the main process and intermittently causes segmentation faults. I'll raise an issue to keep track of this.
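
For reference, a context-manager test of the kind the reviewers asked for might take roughly this shape. It uses a hypothetical `FakeAsyncClient` with no gRPC channel at all, so it only shows the structure of such a test; it does not address the grpc.aio segmentation faults:

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in; the real client would open a grpc.aio channel."""

    def __init__(self):
        self.events = []

    async def close(self):
        self.events.append('closed')

    async def __aenter__(self):
        self.events.append('entered')
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()


def test_async_context_manager():
    async def scenario():
        async with FakeAsyncClient() as client:
            assert client.events == ['entered']
        return client.events

    # asyncio.run drives the coroutine without a pytest plugin.
    assert asyncio.run(scenario()) == ['entered', 'closed']


test_async_context_manager()
```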

Labels
area/core This issue/PR affects the core codebase area/network This issue/PR affects network functionality area/setup This issue/PR affects setting up Jina area/testing This issue/PR affects testing component/client component/flow component/peapod component/resource size/XL
Projects
None yet
6 participants