feat: remove head for non sharded deployments #4517

Merged (20 commits) on Mar 31, 2022
1 change: 1 addition & 0 deletions cli/autocomplete.py
@@ -156,6 +156,7 @@
'--graph-description',
'--graph-conditions',
'--deployments-addresses',
'--deployments-disable-reduce',
'--compression',
'--runtime-backend',
'--runtime',
2 changes: 1 addition & 1 deletion docs/fundamentals/architecture-overview.md
@@ -24,4 +24,4 @@ The following concepts may appear in the docs, but you don't need to master them

- **Pod**: A Pod is a simple abstraction over a runtime that runs any Jina service, be it a process, a Docker container, or a Kubernetes Pod.

- **Head**: The Head is a service added to a Deployment by Jina, and it ensures that load is balanced between all replicas of a given Executor. It communicates with the Executors via `gRPC`.
- **Head**: The Head is a service added to a sharded Deployment by Jina. It manages the communication to the different shards based on the configured polling strategy. It communicates with the Executors via `gRPC`.
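To put the updated description in context, here is a minimal sketch (assuming Jina 3.x Flow syntax; Executor names and parameters are illustrative) of when a Head is and is not created after this change:

```python
from jina import Flow

# Sharded Deployment: a Head is created in front of the two shards and routes
# requests according to the polling strategy ('ANY' or 'ALL').
sharded = Flow().add(name='indexer', shards=2, polling='ANY')

# Non-sharded Deployment: with this PR no Head is created and the Gateway
# talks to the replicas directly over gRPC.
replicated = Flow().add(name='encoder', replicas=3)
```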
4 changes: 0 additions & 4 deletions jina/excepts.py
@@ -19,10 +19,6 @@ class RuntimeFailToStart(SystemError, BaseJinaException):
"""When pod/deployment is failed to started."""


class ScalingFails(SystemError, BaseJinaException):
"""When scaling is unsuccessful for an Executor."""


class RuntimeTerminated(KeyboardInterrupt, BaseJinaException):
"""The event loop of BasePod ends."""

263 changes: 40 additions & 223 deletions jina/orchestrate/deployments/__init__.py
@@ -7,14 +7,12 @@

from jina import __default_executor__, __default_host__, __docker_host__, helper
from jina.enums import DeploymentRoleType, PodRoleType, PollingType
from jina.excepts import RuntimeFailToStart, RuntimeRunForeverEarlyError, ScalingFails
from jina.helper import CatchAllCleanupContextManager
from jina.hubble.hubio import HubIO
from jina.jaml.helper import complete_path
from jina.orchestrate.pods import Pod
from jina.orchestrate.pods.container import ContainerPod
from jina.orchestrate.pods.factory import PodFactory
from jina.serve.networking import GrpcConnectionPool, host_is_local
from jina.serve.networking import GrpcConnectionPool, host_is_local, in_docker


class BaseDeployment(ExitStack):
@@ -33,26 +31,6 @@ def start(self) -> 'BaseDeployment':
"""
...

@abstractmethod
async def rolling_update(self, *args, **kwargs):
"""
Roll update the Executors managed by the Deployment

.. # noqa: DAR201
.. # noqa: DAR101
"""
...

@abstractmethod
async def scale(self, *args, **kwargs):
"""
Scale the amount of replicas of a given Executor.

.. # noqa: DAR201
.. # noqa: DAR101
"""
...

@staticmethod
def _set_upload_files(args):
# sets args.upload_files at the deployment level so that pods inherit from it.
@@ -107,14 +85,14 @@ def head_host(self) -> str:
"""Get the host of the HeadPod of this deployment
.. # noqa: DAR201
"""
return self.head_args.host
return self.head_args.host if self.head_args else None

@property
def head_port(self):
"""Get the port of the HeadPod of this deployment
.. # noqa: DAR201
"""
return self.head_args.port
return self.head_args.port if self.head_args else None

def __enter__(self) -> 'BaseDeployment':
with CatchAllCleanupContextManager(self):
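Because `head_host` and `head_port` can now be `None`, callers that build a Head address need a guard. A hypothetical helper (not part of this PR) illustrating that:

```python
from typing import Optional

from jina.orchestrate.deployments import Deployment


def head_address(deployment: Deployment) -> Optional[str]:
    # head_host/head_port are None when the Deployment has no Head
    # (the non-sharded case introduced here), so only build an address
    # when both are set.
    if deployment.head_host is None or deployment.head_port is None:
        return None
    return f'{deployment.head_host}:{deployment.head_port}'
```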
@@ -225,129 +203,6 @@ def wait_start_success(self):
for pod in self._pods:
pod.wait_start_success()

async def rolling_update(self, uses_with: Optional[Dict] = None):
# TODO make rolling_update robust, in what state this ReplicaSet ends when this fails?
for i in range(len(self._pods)):
_args = self.args[i]
old_pod = self._pods[i]
await GrpcConnectionPool.deactivate_worker(
worker_host=Deployment.get_worker_host(
_args, old_pod, self.head_pod
),
worker_port=_args.port,
target_head=f'{self.head_pod.args.host}:{self.head_pod.args.port}',
shard_id=self.shard_id,
)
old_pod.close()
_args.noblock_on_start = True
_args.uses_with = uses_with
new_pod = PodFactory.build_pod(_args)
new_pod.__enter__()
await new_pod.async_wait_start_success()
await GrpcConnectionPool.activate_worker(
worker_host=Deployment.get_worker_host(
_args, new_pod, self.head_pod
),
worker_port=_args.port,
target_head=f'{self.head_pod.args.host}:{self.head_pod.args.port}',
shard_id=self.shard_id,
)
self.args[i] = _args
self._pods[i] = new_pod

async def _scale_up(self, replicas: int):
new_pods = []
new_args_list = []
for i in range(len(self._pods), replicas):
new_args = copy.copy(self.args[0])
new_args.noblock_on_start = True
new_args.name = new_args.name[:-1] + f'{i}'
new_args.port = helper.random_port()
# no exception should happen at create and enter time
new_pods.append(PodFactory.build_pod(new_args).start())
new_args_list.append(new_args)
exception = None
for new_pod, new_args in zip(new_pods, new_args_list):
try:
await new_pod.async_wait_start_success()
await GrpcConnectionPool.activate_worker(
worker_host=Deployment.get_worker_host(
new_args, new_pod, self.head_pod
),
worker_port=new_args.port,
target_head=f'{self.head_pod.args.host}:{self.head_pod.args.port}',
shard_id=self.shard_id,
)
except (
RuntimeFailToStart,
TimeoutError,
RuntimeRunForeverEarlyError,
) as ex:
exception = ex
break

if exception is not None:
if self.deployment_args.shards > 1:
msg = f' Scaling fails for shard {self.deployment_args.shard_id}'
else:
msg = ' Scaling fails'

msg += f'due to executor failing to start with exception: {exception!r}'
raise ScalingFails(msg)
else:
for new_pod, new_args in zip(new_pods, new_args_list):
self.args.append(new_args)
self._pods.append(new_pod)

async def _scale_down(self, replicas: int):
for i in reversed(range(replicas, len(self._pods))):
# Close returns exception, but in theory `termination` should handle close properly
try:
await GrpcConnectionPool.deactivate_worker(
worker_host=Deployment.get_worker_host(
self.args[i], self._pods[i], self.head_pod
),
worker_port=self.args[i].port,
target_head=f'{self.head_pod.args.host}:{self.head_pod.args.port}',
shard_id=self.shard_id,
)
self._pods[i].close()
finally:
# If there is an exception at close time. Most likely the pod's terminated abruptly and therefore these
# pods are useless
del self._pods[i]
del self.args[i]

async def scale(self, replicas: int):
"""
Scale the amount of replicas of a given Executor.

:param replicas: The number of replicas to scale to

.. note: Scale is either successful or not. If one replica fails to start, the ReplicaSet remains in the same state
"""
# TODO make scale robust, in what state this ReplicaSet ends when this fails?
assert replicas > 0
if replicas > len(self._pods):
await self._scale_up(replicas)
elif replicas < len(self._pods):
await self._scale_down(
replicas
) # scale down has some challenges with the exit fifo
self.deployment_args.replicas = replicas

@property
def has_forked_processes(self):
"""
Checks if any pod in this replica set is a forked process

:returns: True if any Pod is a forked Process, False otherwise (Containers)
"""
for pod in self._pods:
if type(pod) == Pod and pod.is_forked:
return True
return False

def __enter__(self):
for _args in self.args:
if getattr(self.deployment_args, 'noblock_on_start', False):
@@ -404,8 +259,9 @@ def update_pod_args(self):
host, port = HubIO.deploy_public_sandbox(self.args)
self.first_pod_args.host = host
self.first_pod_args.port = port
self.pod_args['head'].host = host
self.pod_args['head'].port = port
if self.head_args:
self.pod_args['head'].host = host
self.pod_args['head'].port = port

def update_worker_pod_args(self):
"""Update args of all its worker pods based on Deployment args. Does not touch head and tail"""
@@ -476,6 +332,32 @@ def port(self):
"""
return self.first_pod_args.port

@property
def ports(self) -> List[int]:
"""Returns a list of ports exposed by this Deployment.
Exposed means these are the ports a Client/Gateway is supposed to communicate with.
For sharded deployments this will be the head_port.
For non-sharded deployments it will be all replica ports.
.. # noqa: DAR201
"""
if self.head_port:
return [self.head_port]
else:
ports = []
for replica in self.pod_args['pods'][0]:
ports.append(replica.port)
return ports

@property
def dockerized_uses(self) -> bool:
"""Checks if this Deployment uses a dockerized Executor

.. # noqa: DAR201
"""
return self.args.uses.startswith('docker://') or self.args.uses.startswith(
'jinahub+docker://'
)

def _parse_args(
self, args: Namespace
) -> Dict[str, Optional[Union[List[Namespace], Namespace]]]:
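A hedged usage sketch of the new `ports` property; the `host` attribute and the address format are assumptions for illustration, not taken from this diff:

```python
from typing import List

from jina.orchestrate.deployments import Deployment


def connection_targets(deployment: Deployment) -> List[str]:
    # One address per exposed port: the single Head port for sharded
    # Deployments, or one port per replica otherwise.
    return [f'{deployment.host}:{port}' for port in deployment.ports]
```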
@@ -611,18 +493,10 @@ def get_worker_host(pod_args, pod, head_pod):

@staticmethod
def _is_container_to_container(pod, head_pod):
def _in_docker():
path = '/proc/self/cgroup'
return (
os.path.exists('/.dockerenv')
or os.path.isfile(path)
and any('docker' in line for line in open(path))
)

# Check if both shard_id/pod_idx and the head are containerized
# if the head is not containerized, it still could mean that the deployment itself is containerized
return type(pod) == ContainerPod and (
type(head_pod) == ContainerPod or _in_docker()
type(head_pod) == ContainerPod or in_docker()
)

def start(self) -> 'Deployment':
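The hunk above swaps the locally defined `_in_docker` helper for the shared `in_docker` utility that the updated import at the top of this file pulls from `jina.serve.networking`. A minimal usage sketch:

```python
from jina.serve.networking import in_docker

# True when the current process runs inside a Docker container; used above to
# decide whether the worker and the Head communicate container-to-container.
print(in_docker())
```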
@@ -730,68 +604,6 @@ def is_ready(self) -> bool:
is_ready = self.uses_after_pod.is_ready.is_set()
return is_ready

@property
def _has_forked_processes(self):
return any(
[self.shards[shard_id].has_forked_processes for shard_id in self.shards]
)

async def rolling_update(self, uses_with: Optional[Dict] = None):
"""Reload all Pods of this Deployment.

:param uses_with: a Dictionary of arguments to restart the executor with
"""
tasks = []
try:
import asyncio

for shard_id in self.shards:
task = asyncio.create_task(
self.shards[shard_id].rolling_update(uses_with=uses_with)
)
# it is dangerous to fork new processes (pods) while grpc operations are ongoing
# while we use fork, we need to guarantee that forking/grpc status checking is done sequentially
# this is true at least when the flow process and the forked processes are running in the same OS
# thus this does not apply to K8s
# to ContainerPod it still applies due to the managing process being forked
# source: https://grpc.github.io/grpc/cpp/impl_2codegen_2fork_8h.html#a450c01a1187f69112a22058bf690e2a0
await task
tasks.append(task)

await asyncio.gather(*tasks)
except:
for task in tasks:
if not task.done():
task.cancel()

async def scale(self, replicas: int):
"""
Scale the amount of replicas of a given Executor.

:param replicas: The number of replicas to scale to
"""
self.args.replicas = replicas

tasks = []
try:
import asyncio

for shard_id in self.shards:
task = asyncio.create_task(
self.shards[shard_id].scale(replicas=replicas)
)
# see rolling_update for explanation of sequential excution
await task
tasks.append(task)

await asyncio.gather(*tasks)
except:
# TODO: Handle the failure of one of the shards. Unscale back all of them to the original state? Cancelling would potentially be dangerous.
for task in tasks:
if not task.done():
task.cancel()
raise

@staticmethod
def _set_pod_args(args: Namespace) -> Dict[int, List[Namespace]]:
result = {}
@@ -814,8 +626,12 @@ def _set_pod_args(args: Namespace) -> Dict[int, List[Namespace]]:
_args.name = f'{replica_id}'

# the gateway needs to respect the assigned port
if args.deployment_role == DeploymentRoleType.GATEWAY:
if args.deployment_role == DeploymentRoleType.GATEWAY or args.external:
_args.port = args.port
elif args.shards == 1 and args.replicas == 1:
# if there is only a single shard and a single replica, we don't need to distribute ports randomly
# we should rather use the pre-assigned one
_args.port = args.port
else:
_args.port = helper.random_port()

@@ -864,7 +680,8 @@ def _parse_base_deployment_args(self, args):
}

# a gateway has no heads and uses
if self.role != DeploymentRoleType.GATEWAY:
# also, no head is created if there are no shards
if self.role != DeploymentRoleType.GATEWAY and getattr(args, 'shards', 1) > 1:
if (
getattr(args, 'uses_before', None)
and args.uses_before != __default_executor__
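Finally, a hedged, test-style sketch of the combined effect of these changes. The parser and constructor calls follow common patterns from the Jina test suite but are not copied from this PR's tests, so treat the assertions as expected behaviour rather than verified output:

```python
from jina.orchestrate.deployments import Deployment
from jina.parsers import set_deployment_parser

# Non-sharded Deployment: no head args are parsed, one port per replica.
args = set_deployment_parser().parse_args(['--replicas', '2'])
dep = Deployment(args)
assert dep.head_args is None
assert len(dep.ports) == 2

# Sharded Deployment: a Head is still created and exposes a single port.
sharded_args = set_deployment_parser().parse_args(['--shards', '2'])
sharded = Deployment(sharded_args)
assert sharded.head_args is not None
assert sharded.ports == [sharded.head_port]
```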