Skip to content

Commit

Permalink
fix: fix deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Jul 11, 2022
1 parent 2e6ea86 commit 2b154b6
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 9 deletions.
6 changes: 3 additions & 3 deletions jina/clients/base/http.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from contextlib import AsyncExitStack
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Optional, Tuple

from starlette import status

Expand Down Expand Up @@ -104,14 +104,14 @@ async def _get_results(
HTTPClientlet(url=url, logger=self.logger, **kwargs)
)

def _request_handler(request: 'Request') -> 'asyncio.Future':
def _request_handler(request: 'Request') -> 'Tuple[asyncio.Future, None]':
"""
For HTTP Client, for each request in the iterator, we `send_message` using
http POST request and add it to the list of tasks which is awaited and yielded.
:param request: current request in the iterator
:return: asyncio Task for sending message
"""
return asyncio.ensure_future(iolet.send_message(request=request))
return asyncio.ensure_future(iolet.send_message(request=request)), None

def _result_handler(result):
return result
Expand Down
6 changes: 3 additions & 3 deletions jina/clients/base/websocket.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""A module for the websockets-based Client for Jina."""
import asyncio
from contextlib import AsyncExitStack
from typing import TYPE_CHECKING, Dict, Optional
from typing import TYPE_CHECKING, Dict, Optional, Tuple

from starlette import status

Expand Down Expand Up @@ -139,7 +139,7 @@ def _handle_end_of_iter():
"""Send End of iteration signal to the Gateway"""
asyncio.create_task(iolet.send_eoi())

def _request_handler(request: 'Request') -> 'asyncio.Future':
def _request_handler(request: 'Request') -> 'Tuple[asyncio.Future, None]':
"""
For each request in the iterator, we send the `Message` using `iolet.send_message()`.
For websocket requests from client, for each request in the iterator, we send the request in `bytes`
Expand All @@ -152,7 +152,7 @@ def _request_handler(request: 'Request') -> 'asyncio.Future':
future = get_or_reuse_loop().create_future()
request_buffer[request.header.request_id] = future
asyncio.create_task(iolet.send_message(request))
return future
return future, None

streamer = RequestStreamer(
args=self.args,
Expand Down
6 changes: 4 additions & 2 deletions jina/serve/stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ async def iterate_requests() -> None:
prefetch=self._prefetch,
):
requests_to_handle.count += 1
future, future_hanging = self._request_handler(request=request)
future.add_done_callback(callback)
future_responses, future_hanging = self._request_handler(
request=request
)
future_responses.add_done_callback(callback)
if future_hanging is not None:
hanging_tasks_to_handle.count += 1
future_hanging.add_done_callback(hanging_callback)
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/serve/stream/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async def task():
return request

future = asyncio.ensure_future(task())
return future
return future, None

def result_handle_fn(result):
results_handled.append(result)
Expand Down

0 comments on commit 2b154b6

Please sign in to comment.