diff --git a/jina/serve/runtimes/asyncio.py b/jina/serve/runtimes/asyncio.py index 78d6be8fb1fe6..1c0eddb88e290 100644 --- a/jina/serve/runtimes/asyncio.py +++ b/jina/serve/runtimes/asyncio.py @@ -27,12 +27,12 @@ class AsyncNewLoopRuntime(BaseRuntime, MonitoringMixin, ABC): """ def __init__( - self, - args: 'argparse.Namespace', - cancel_event: Optional[ - Union['asyncio.Event', 'multiprocessing.Event', 'threading.Event'] - ] = None, - **kwargs, + self, + args: 'argparse.Namespace', + cancel_event: Optional[ + Union['asyncio.Event', 'multiprocessing.Event', 'threading.Event'] + ] = None, + **kwargs, ): super().__init__(args, **kwargs) self._loop = asyncio.new_event_loop() @@ -53,9 +53,9 @@ def __init__( ) else: with ImportExtensions( - required=True, - logger=self.logger, - help_text='''If you see a 'DLL load failed' error, please reinstall `pywin32`. + required=True, + logger=self.logger, + help_text='''If you see a 'DLL load failed' error, please reinstall `pywin32`. If you're using conda, please use the command `conda install -c anaconda pywin32`''', ): import win32api @@ -84,7 +84,12 @@ def teardown(self): self._loop.close() super().teardown() self._stop_time = time.time() - send_telemetry_event(event='stop', obj=self, duration=self._stop_time - self._start_time, entity_id=self._entity_id) + send_telemetry_event( + event='stop', + obj=self, + duration=self._stop_time - self._start_time, + entity_id=self._entity_id, + ) async def _wait_for_cancel(self): """Do NOT override this method when inheriting from :class:`GatewayPod`""" @@ -163,12 +168,13 @@ def is_ready(ctrl_address: str, **kwargs) -> bool: except RpcError: return False - @staticmethod + @classmethod def wait_for_ready_or_shutdown( - timeout: Optional[float], - ready_or_shutdown_event: Union['multiprocessing.Event', 'threading.Event'], - ctrl_address: str, - **kwargs, + cls, + timeout: Optional[float], + ready_or_shutdown_event: Union['multiprocessing.Event', 'threading.Event'], + ctrl_address: str, + **kwargs, ): """ Check if the runtime has successfully started @@ -182,9 +188,7 @@ def wait_for_ready_or_shutdown( timeout_ns = 1000000000 * timeout if timeout else None now = time.time_ns() while timeout_ns is None or time.time_ns() - now < timeout_ns: - if ready_or_shutdown_event.is_set() or AsyncNewLoopRuntime.is_ready( - ctrl_address - ): + if ready_or_shutdown_event.is_set() or cls.is_ready(ctrl_address, **kwargs): return True time.sleep(0.1) return False @@ -200,6 +204,7 @@ def _log_data_request(self, request: DataRequest): @property def _entity_id(self): import uuid + if hasattr(self, '_entity_id_'): return self._entity_id_ self._entity_id_ = uuid.uuid1().hex diff --git a/jina/serve/runtimes/gateway/__init__.py b/jina/serve/runtimes/gateway/__init__.py index 591f7628089e2..af8a761ee714c 100644 --- a/jina/serve/runtimes/gateway/__init__.py +++ b/jina/serve/runtimes/gateway/__init__.py @@ -1,5 +1,7 @@ import argparse +import urllib from abc import ABC +from http import HTTPStatus from typing import TYPE_CHECKING, Optional, Union from jina.serve.runtimes.asyncio import AsyncNewLoopRuntime @@ -28,3 +30,52 @@ def __init__( if self.timeout_send: self.timeout_send /= 1e3 # convert ms to seconds super().__init__(args, cancel_event, **kwargs) + + @staticmethod + def is_ready(ctrl_address: str, protocol: Optional[str] = 'grpc', **kwargs) -> bool: + """ + Check if status is ready. + + :param ctrl_address: the address where the control request needs to be sent + :param protocol: protocol of the gateway runtime + :param kwargs: extra keyword arguments + + :return: True if status is ready else False. + """ + + if protocol is None or protocol == 'grpc': + res = super().is_ready(ctrl_address) + else: + try: + conn = urllib.request.urlopen(url=f'http://{ctrl_address}') + res = conn.code == HTTPStatus.OK + except: + res = False + return res + + @classmethod + def wait_for_ready_or_shutdown( + cls, + timeout: Optional[float], + ready_or_shutdown_event: Union['multiprocessing.Event', 'threading.Event'], + ctrl_address: str, + protocol: Optional[str] = 'grpc', + **kwargs, + ): + """ + Check if the runtime has successfully started + + :param timeout: The time to wait before readiness or failure is determined + :param ctrl_address: the address where the control message needs to be sent + :param ready_or_shutdown_event: the multiprocessing event to detect if the process failed or is ready + :param protocol: protocol of the gateway runtime + :param kwargs: extra keyword arguments + + :return: True if is ready or it needs to be shutdown + """ + return super().wait_for_ready_or_shutdown( + timeout=timeout, + ready_or_shutdown_event=ready_or_shutdown_event, + ctrl_address=ctrl_address, + protocol=protocol, + )