Skip to content

Commit

Permalink
refactor: remove runtime backend cli arg (#4644)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Apr 21, 2022
1 parent 2c72e77 commit b76d477
Show file tree
Hide file tree
Showing 14 changed files with 57 additions and 186 deletions.
8 changes: 0 additions & 8 deletions cli/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@
'--host',
'--quiet-remote-logs',
'--upload-files',
'--runtime-backend',
'--runtime',
'--runtime-cls',
'--timeout-ready',
'--env',
Expand Down Expand Up @@ -161,8 +159,6 @@
'--deployments-addresses',
'--deployments-disable-reduce',
'--compression',
'--runtime-backend',
'--runtime',
'--runtime-cls',
'--timeout-ready',
'--env',
Expand Down Expand Up @@ -241,8 +237,6 @@
'--host',
'--quiet-remote-logs',
'--upload-files',
'--runtime-backend',
'--runtime',
'--runtime-cls',
'--timeout-ready',
'--env',
Expand Down Expand Up @@ -293,8 +287,6 @@
'--host',
'--quiet-remote-logs',
'--upload-files',
'--runtime-backend',
'--runtime',
'--runtime-cls',
'--timeout-ready',
'--env',
Expand Down
7 changes: 0 additions & 7 deletions jina/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,6 @@ class DataInputType(BetterEnum):
DICT = 3 # the input is a dictionary representing a Document, needed while pydantic model not available


class RuntimeBackendType(BetterEnum):
"""Type of backend in runtime."""

THREAD = 0
PROCESS = 1


class WebsocketSubProtocols(str, Enum):
"""Subprotocol supported with Websocket Gateway"""

Expand Down
4 changes: 0 additions & 4 deletions jina/orchestrate/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ def __init__(
quiet: Optional[bool] = False,
quiet_error: Optional[bool] = False,
replicas: Optional[int] = 1,
runtime_backend: Optional[str] = 'PROCESS',
runtime_cls: Optional[str] = 'GRPCGatewayRuntime',
shards: Optional[int] = 1,
ssl_certfile: Optional[str] = None,
Expand Down Expand Up @@ -246,7 +245,6 @@ def __init__(
:param quiet: If set, then no log will be emitted from this object.
:param quiet_error: If set, then exception stack information will not be added to the log
:param replicas: The number of replicas in the deployment
:param runtime_backend: The parallel backend of the runtime inside the Pod
:param runtime_cls: The runtime class to run inside the Pod
:param shards: The number of shards in the deployment running at the same time. For more details check https://docs.jina.ai/fundamentals/flow/create-flow/#complex-flow-topologies
:param ssl_certfile: the path to the certificate file
Expand Down Expand Up @@ -673,7 +671,6 @@ def add(
quiet_error: Optional[bool] = False,
quiet_remote_logs: Optional[bool] = False,
replicas: Optional[int] = 1,
runtime_backend: Optional[str] = 'PROCESS',
runtime_cls: Optional[str] = 'WorkerRuntime',
shards: Optional[int] = 1,
timeout_ctrl: Optional[int] = 60,
Expand Down Expand Up @@ -755,7 +752,6 @@ def add(
:param quiet_error: If set, then exception stack information will not be added to the log
:param quiet_remote_logs: Do not display the streaming of remote logs on local console
:param replicas: The number of replicas in the deployment
:param runtime_backend: The parallel backend of the runtime inside the Pod
:param runtime_cls: The runtime class to run inside the Pod
:param shards: The number of shards in the deployment running at the same time. For more details check https://docs.jina.ai/fundamentals/flow/create-flow/#complex-flow-topologies
:param timeout_ctrl: The timeout in milliseconds of the control request, -1 for waiting forever
Expand Down
52 changes: 11 additions & 41 deletions jina/orchestrate/pods/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
from typing import Dict, Optional, Type, Union

from jina import __ready_msg__, __stop_msg__, __windows__
from jina.enums import PodRoleType, RuntimeBackendType
from jina.enums import PodRoleType
from jina.excepts import RuntimeFailToStart, RuntimeRunForeverEarlyError
from jina.helper import typename
from jina.jaml import JAML
from jina.logging.logger import JinaLogger
from jina.orchestrate.pods.helper import ConditionalEvent, _get_event, _get_worker
from jina.orchestrate.pods.helper import ConditionalEvent, _get_event
from jina.serve.runtimes.asyncio import AsyncNewLoopRuntime

__all__ = ['BasePod', 'Pod']
Expand All @@ -26,7 +26,6 @@ def run(
is_started: Union['multiprocessing.Event', 'threading.Event'],
is_shutdown: Union['multiprocessing.Event', 'threading.Event'],
is_ready: Union['multiprocessing.Event', 'threading.Event'],
cancel_event: Union['multiprocessing.Event', 'threading.Event'],
jaml_classes: Optional[Dict] = None,
):
"""Method representing the :class:`BaseRuntime` activity.
Expand Down Expand Up @@ -57,30 +56,23 @@ def run(
:param is_started: concurrency event to communicate runtime is properly started. Used for better logging
:param is_shutdown: concurrency event to communicate runtime is terminated
:param is_ready: concurrency event to communicate runtime is ready to receive messages
:param cancel_event: concurrency event to receive cancelling signal from the Pod. Needed by some runtimes
:param jaml_classes: all the `JAMLCompatible` classes imported in main process
"""
logger = JinaLogger(name, **vars(args))

def _unset_envs():
if envs and args.runtime_backend != RuntimeBackendType.THREAD:
if envs:
for k in envs.keys():
os.environ.pop(k, None)

def _set_envs():
if args.env:
if args.runtime_backend == RuntimeBackendType.THREAD:
logger.warning(
'environment variables should not be set when runtime="thread".'
)
else:
os.environ.update({k: str(v) for k, v in envs.items()})
os.environ.update({k: str(v) for k, v in envs.items()})

try:
_set_envs()
runtime = runtime_cls(
args=args,
cancel_event=cancel_event,
)
except Exception as ex:
logger.error(
Expand Down Expand Up @@ -122,13 +114,6 @@ def __init__(self, args: 'argparse.Namespace'):
self.is_forked = False
self.logger = JinaLogger(self.name, **vars(self.args))

if self.args.runtime_backend == RuntimeBackendType.THREAD:
self.logger.warning(
f' Using Thread as runtime backend is not recommended for production purposes. It is '
f'just supposed to be used for easier debugging. Besides the performance considerations, it is'
f'specially dangerous to mix `Executors` running in different types of `RuntimeBackends`.'
)

self._envs = {'JINA_DEPLOYMENT_NAME': self.name}
if self.args.quiet:
self._envs['JINA_LOG_CONFIG'] = 'QUIET'
Expand All @@ -137,16 +122,12 @@ def __init__(self, args: 'argparse.Namespace'):

# arguments needed to create `runtime` and communicate with it in the `run` in the stack of the new process
# or thread.
test_worker = {
RuntimeBackendType.THREAD: threading.Thread,
RuntimeBackendType.PROCESS: multiprocessing.Process,
}.get(getattr(args, 'runtime_backend', RuntimeBackendType.THREAD))()
test_worker = multiprocessing.Process()
self.is_ready = _get_event(test_worker)
self.is_shutdown = _get_event(test_worker)
self.cancel_event = _get_event(test_worker)
self.is_started = _get_event(test_worker)
self.ready_or_shutdown = ConditionalEvent(
getattr(args, 'runtime_backend', RuntimeBackendType.THREAD),
events_list=[self.is_ready, self.is_shutdown],
)
self.runtime_ctrl_address = self._get_control_address()
Expand All @@ -158,7 +139,7 @@ def _get_control_address(self):
def close(self) -> None:
"""Close the Pod
This method makes sure that the `Process/thread` is properly finished and its resources properly released
This method makes sure that the `Process` is properly finished and its resources properly released
"""
self.logger.debug('waiting for ready or shutdown signal from runtime')
if not self.is_shutdown.is_set() and self.is_started.is_set():
Expand Down Expand Up @@ -322,8 +303,7 @@ class Pod(BasePod):
def __init__(self, args: 'argparse.Namespace'):
super().__init__(args)
self.runtime_cls = self._get_runtime_cls()
self.worker = _get_worker(
args=args,
self.worker = multiprocessing.Process(
target=run,
kwargs={
'args': args,
Expand All @@ -332,19 +312,14 @@ def __init__(self, args: 'argparse.Namespace'):
'is_started': self.is_started,
'is_shutdown': self.is_shutdown,
'is_ready': self.is_ready,
# the cancel event is only necessary for threads, otherwise runtimes should create and use the asyncio event
'cancel_event': self.cancel_event
if getattr(args, 'runtime_backend', RuntimeBackendType.THREAD)
== RuntimeBackendType.THREAD
else None,
'runtime_cls': self.runtime_cls,
'jaml_classes': JAML.registered_classes(),
},
name=self.name,
daemon=True,
)

def start(self):

"""Start the Pod.
This method calls :meth:`start` in :class:`threading.Thread` or :class:`multiprocessing.Process`.
.. #noqa: DAR201
Expand All @@ -371,14 +346,9 @@ def _terminate(self):
"""Terminate the Pod.
This method calls :meth:`terminate` in :class:`threading.Thread` or :class:`multiprocessing.Process`.
"""
if hasattr(self.worker, 'terminate'):
self.logger.debug(f'terminating the runtime process')
self.worker.terminate()
self.logger.debug(f' runtime process properly terminated')
else:
self.logger.debug(f'canceling the runtime thread')
self.cancel_event.set()
self.logger.debug(f'runtime thread properly canceled')
self.logger.debug(f'terminating the runtime process')
self.worker.terminate()
self.logger.debug(f' runtime process properly terminated')

def _get_runtime_cls(self) -> AsyncNewLoopRuntime:
from jina.orchestrate.pods.helper import update_runtime_cls
Expand Down
17 changes: 6 additions & 11 deletions jina/orchestrate/pods/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from jina.importer import ImportExtensions
from jina.logging.logger import JinaLogger
from jina.orchestrate.helper import generate_default_volume_and_workspace
from jina.orchestrate.pods import BasePod, _get_worker
from jina.orchestrate.pods import BasePod
from jina.orchestrate.pods.container_helper import (
get_docker_network,
get_gpu_device_requests,
Expand Down Expand Up @@ -376,8 +376,7 @@ def start(self):
This method calls :meth:`start` in :class:`threading.Thread` or :class:`multiprocessing.Process`.
.. #noqa: DAR201
"""
self.worker = _get_worker(
args=self.args,
self.worker = multiprocessing.Process(
target=run,
kwargs={
'args': self.args,
Expand All @@ -390,6 +389,7 @@ def start(self):
'is_shutdown': self.is_shutdown,
'is_ready': self.is_ready,
},
daemon=True,
)
self.worker.start()
if not self.args.noblock_on_start:
Expand All @@ -405,14 +405,9 @@ def _terminate(self):
self._container.kill(signal='SIGTERM')
finally:
self.is_shutdown.wait(self.args.timeout_ctrl)
if hasattr(self.worker, 'terminate'):
self.logger.debug(f'terminating the runtime process')
self.worker.terminate()
self.logger.debug(f' runtime process properly terminated')
else:
self.logger.debug(f'canceling the runtime thread')
self.cancel_event.set()
self.logger.debug(f'runtime thread properly canceled')
self.logger.debug(f'terminating the runtime process')
self.worker.terminate()
self.logger.debug(f' runtime process properly terminated')

def join(self, *args, **kwargs):
"""Joins the Pod.
Expand Down
35 changes: 8 additions & 27 deletions jina/orchestrate/pods/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from grpc import RpcError

from jina.enums import GatewayProtocolType, PodRoleType, RuntimeBackendType
from jina.enums import GatewayProtocolType, PodRoleType
from jina.hubble.helper import is_valid_huburi
from jina.hubble.hubio import HubIO
from jina.serve.networking import GrpcConnectionPool
Expand All @@ -16,50 +16,31 @@
from argparse import Namespace


def _get_worker(
args, target: Callable, kwargs: Dict, name: Optional[str] = None
) -> Union['threading.Thread', 'multiprocessing.Process']:
return {
RuntimeBackendType.THREAD: threading.Thread,
RuntimeBackendType.PROCESS: multiprocessing.Process,
}.get(getattr(args, 'runtime_backend', RuntimeBackendType.THREAD))(
target=target, name=name, kwargs=kwargs, daemon=True
)


def _get_event(obj) -> Union[multiprocessing.Event, threading.Event]:
if isinstance(obj, threading.Thread):
return threading.Event()
elif isinstance(obj, multiprocessing.Process) or isinstance(
def _get_event(obj) -> multiprocessing.Event:
if isinstance(obj, multiprocessing.Process) or isinstance(
obj, multiprocessing.context.ForkProcess
):
return multiprocessing.Event()
elif isinstance(obj, multiprocessing.context.SpawnProcess):
return multiprocessing.get_context('spawn').Event()
else:
raise TypeError(
f'{obj} is not an instance of "threading.Thread" nor "multiprocessing.Process"'
)
raise TypeError(f'{obj} is not an instance of "multiprocessing.Process"')


class ConditionalEvent:
"""
:class:`ConditionalEvent` provides a common interface to an event (multiprocessing or threading event)
that gets triggered when any of the events provided in input is triggered (OR logic)
:param backend_runtime: The runtime type to decide which type of Event to instantiate
:param events_list: The list of events that compose this composable event
"""

def __init__(self, backend_runtime: RuntimeBackendType, events_list):
def __init__(self, events_list):
super().__init__()
self.event = None
if backend_runtime == RuntimeBackendType.THREAD:
self.event = threading.Event()
else:
self.event = multiprocessing.synchronize.Event(
ctx=multiprocessing.get_context()
)
self.event = multiprocessing.synchronize.Event(
ctx=multiprocessing.get_context()
)
self.event_list = events_list
for e in events_list:
self._setup(e, self._state_changed)
Expand Down
11 changes: 1 addition & 10 deletions jina/parsers/orchestrate/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import argparse

from jina import __default_port_monitoring__, helper
from jina.enums import PodRoleType, RuntimeBackendType
from jina.enums import PodRoleType
from jina.parsers.helper import _SHOW_ALL_ARGS, KVAppendAction, add_arg_group


Expand All @@ -13,15 +13,6 @@ def mixin_pod_parser(parser):

gp = add_arg_group(parser, title='Pod')

gp.add_argument(
'--runtime-backend',
'--runtime',
type=RuntimeBackendType.from_string,
choices=list(RuntimeBackendType),
default=RuntimeBackendType.PROCESS,
help='The parallel backend of the runtime inside the Pod',
)

gp.add_argument(
'--runtime-cls',
type=str,
Expand Down
6 changes: 1 addition & 5 deletions jina/serve/runtimes/head/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,13 @@ class HeadRuntime(AsyncNewLoopRuntime, ABC):
def __init__(
self,
args: argparse.Namespace,
cancel_event: Optional[
Union['asyncio.Event', 'multiprocessing.Event', 'threading.Event']
] = None,
**kwargs,
):
"""Initialize grpc server for the head runtime.
:param args: args from CLI
:param cancel_event: the cancel event used to wait for canceling
:param kwargs: keyword args
"""
super().__init__(args, cancel_event, **kwargs)
super().__init__(args, **kwargs)

if args.name is None:
args.name = ''
Expand Down

0 comments on commit b76d477

Please sign in to comment.