Skip to content

Commit

Permalink
refactor: simplify event loop usage (#4811)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed May 20, 2022
1 parent e553d91 commit 93f06fc
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 8 deletions.
2 changes: 1 addition & 1 deletion jina/clients/base/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def _request_handler(request: 'Request') -> 'asyncio.Future':
end_of_iter_handler=_handle_end_of_iter,
)

receive_task = get_or_reuse_loop().create_task(_receive())
receive_task = asyncio.create_task(_receive())

if receive_task.done():
raise RuntimeError(
Expand Down
10 changes: 3 additions & 7 deletions jina/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -1287,23 +1287,19 @@ def iscoroutinefunction(func: Callable):
def run_async(func, *args, **kwargs):
"""Generalized asyncio.run for jupyter notebook.
When running inside jupyter, an eventloop is already exist, can't be stopped, can't be killed.
When running inside jupyter, an eventloop already exists, can't be stopped, can't be killed.
Directly calling asyncio.run will fail, as This function cannot be called when another asyncio event loop
is running in the same thread.
.. see_also:
https://stackoverflow.com/questions/55409641/asyncio-run-cannot-be-called-from-a-running-event-loop
call `run_async(my_function, any_event_loop=True, *args, **kwargs)` to enable run with any eventloop
:param func: function to run
:param args: parameters
:param kwargs: key-value parameters
:return: asyncio.run(func)
"""

any_event_loop = kwargs.pop('any_event_loop', False)

class _RunThread(threading.Thread):
"""Create a running thread when in Jupyter notebook."""

Expand All @@ -1319,7 +1315,7 @@ def run(self):
if loop and loop.is_running():
# eventloop already exist
# running inside Jupyter
if any_event_loop or is_jupyter():
if is_jupyter():
thread = _RunThread()
thread.start()
thread.join()
Expand All @@ -1340,7 +1336,7 @@ def run(self):
'please report this issue here: https://github.com/jina-ai/jina'
)
else:
return get_or_reuse_loop().run_until_complete(func(*args, **kwargs))
return asyncio.run(func(*args, **kwargs))


def slugify(value):
Expand Down

0 comments on commit 93f06fc

Please sign in to comment.