Skip to content

Commit

Permalink
refactor: separate gateway and asyncio runtime readiness checks (#5224)
Browse files Browse the repository at this point in the history
  • Loading branch information
alaeddine-13 committed Sep 28, 2022
1 parent 147317d commit 9a6079e
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 18 deletions.
41 changes: 23 additions & 18 deletions jina/serve/runtimes/asyncio.py
Expand Up @@ -27,12 +27,12 @@ class AsyncNewLoopRuntime(BaseRuntime, MonitoringMixin, ABC):
"""

def __init__(
self,
args: 'argparse.Namespace',
cancel_event: Optional[
Union['asyncio.Event', 'multiprocessing.Event', 'threading.Event']
] = None,
**kwargs,
self,
args: 'argparse.Namespace',
cancel_event: Optional[
Union['asyncio.Event', 'multiprocessing.Event', 'threading.Event']
] = None,
**kwargs,
):
super().__init__(args, **kwargs)
self._loop = asyncio.new_event_loop()
Expand All @@ -53,9 +53,9 @@ def __init__(
)
else:
with ImportExtensions(
required=True,
logger=self.logger,
help_text='''If you see a 'DLL load failed' error, please reinstall `pywin32`.
required=True,
logger=self.logger,
help_text='''If you see a 'DLL load failed' error, please reinstall `pywin32`.
If you're using conda, please use the command `conda install -c anaconda pywin32`''',
):
import win32api
Expand Down Expand Up @@ -84,7 +84,12 @@ def teardown(self):
self._loop.close()
super().teardown()
self._stop_time = time.time()
send_telemetry_event(event='stop', obj=self, duration=self._stop_time - self._start_time, entity_id=self._entity_id)
send_telemetry_event(
event='stop',
obj=self,
duration=self._stop_time - self._start_time,
entity_id=self._entity_id,
)

async def _wait_for_cancel(self):
"""Do NOT override this method when inheriting from :class:`GatewayPod`"""
Expand Down Expand Up @@ -163,12 +168,13 @@ def is_ready(ctrl_address: str, **kwargs) -> bool:
except RpcError:
return False

@staticmethod
@classmethod
def wait_for_ready_or_shutdown(
timeout: Optional[float],
ready_or_shutdown_event: Union['multiprocessing.Event', 'threading.Event'],
ctrl_address: str,
**kwargs,
cls,
timeout: Optional[float],
ready_or_shutdown_event: Union['multiprocessing.Event', 'threading.Event'],
ctrl_address: str,
**kwargs,
):
"""
Check if the runtime has successfully started
Expand All @@ -182,9 +188,7 @@ def wait_for_ready_or_shutdown(
timeout_ns = 1000000000 * timeout if timeout else None
now = time.time_ns()
while timeout_ns is None or time.time_ns() - now < timeout_ns:
if ready_or_shutdown_event.is_set() or AsyncNewLoopRuntime.is_ready(
ctrl_address
):
if ready_or_shutdown_event.is_set() or cls.is_ready(ctrl_address, **kwargs):
return True
time.sleep(0.1)
return False
Expand All @@ -200,6 +204,7 @@ def _log_data_request(self, request: DataRequest):
@property
def _entity_id(self):
import uuid

if hasattr(self, '_entity_id_'):
return self._entity_id_
self._entity_id_ = uuid.uuid1().hex
Expand Down
51 changes: 51 additions & 0 deletions jina/serve/runtimes/gateway/__init__.py
@@ -1,5 +1,7 @@
import argparse
import urllib
from abc import ABC
from http import HTTPStatus
from typing import TYPE_CHECKING, Optional, Union

from jina.serve.runtimes.asyncio import AsyncNewLoopRuntime
Expand Down Expand Up @@ -28,3 +30,52 @@ def __init__(
if self.timeout_send:
self.timeout_send /= 1e3 # convert ms to seconds
super().__init__(args, cancel_event, **kwargs)

@staticmethod
def is_ready(ctrl_address: str, protocol: Optional[str] = 'grpc', **kwargs) -> bool:
"""
Check if status is ready.
:param ctrl_address: the address where the control request needs to be sent
:param protocol: protocol of the gateway runtime
:param kwargs: extra keyword arguments
:return: True if status is ready else False.
"""

if protocol is None or protocol == 'grpc':
res = super().is_ready(ctrl_address)
else:
try:
conn = urllib.request.urlopen(url=f'http://{ctrl_address}')
res = conn.code == HTTPStatus.OK
except:
res = False
return res

@classmethod
def wait_for_ready_or_shutdown(
cls,
timeout: Optional[float],
ready_or_shutdown_event: Union['multiprocessing.Event', 'threading.Event'],
ctrl_address: str,
protocol: Optional[str] = 'grpc',
**kwargs,
):
"""
Check if the runtime has successfully started
:param timeout: The time to wait before readiness or failure is determined
:param ctrl_address: the address where the control message needs to be sent
:param ready_or_shutdown_event: the multiprocessing event to detect if the process failed or is ready
:param protocol: protocol of the gateway runtime
:param kwargs: extra keyword arguments
:return: True if is ready or it needs to be shutdown
"""
return super().wait_for_ready_or_shutdown(
timeout=timeout,
ready_or_shutdown_event=ready_or_shutdown_event,
ctrl_address=ctrl_address,
protocol=protocol,
)

0 comments on commit 9a6079e

Please sign in to comment.