Skip to content

Commit

Permalink
refactor: remove backwards compatibility from old behaviors (#4138)
Browse files Browse the repository at this point in the history
* refactor: remove backwards compatibility from old behaviors

* refactor: remove runs_in_docker

* style: fix overload and cli autocomplete

* test: fix tests related to pea_id

* test: fix tests related to auto polling

* test: fix distributed test dump path

Co-authored-by: Jina Dev Bot <dev-bot@jina.ai>
  • Loading branch information
JoanFM and jina-bot committed Jan 7, 2022
1 parent eae0510 commit bba10dd
Show file tree
Hide file tree
Showing 24 changed files with 22 additions and 218 deletions.
16 changes: 0 additions & 16 deletions cli/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,8 @@
'--py-modules',
'--port-in',
'--host-in',
'--memory-hwm',
'--native',
'--num-part',
'--runs-in-docker',
'--dump-path',
'--entrypoint',
'--docker-kwargs',
'--pull-latest',
Expand All @@ -90,7 +87,6 @@
'--env',
'--expose-public',
'--shard-id',
'--pea-id',
'--replica-id',
'--pea-role',
'--noblock-on-start',
Expand Down Expand Up @@ -144,11 +140,8 @@
'--py-modules',
'--port-in',
'--host-in',
'--memory-hwm',
'--native',
'--num-part',
'--runs-in-docker',
'--dump-path',
'--prefetch',
'--title',
'--description',
Expand All @@ -175,7 +168,6 @@
'--env',
'--expose-public',
'--shard-id',
'--pea-id',
'--replica-id',
'--pea-role',
'--noblock-on-start',
Expand Down Expand Up @@ -241,11 +233,8 @@
'--py-modules',
'--port-in',
'--host-in',
'--memory-hwm',
'--native',
'--num-part',
'--runs-in-docker',
'--dump-path',
'--entrypoint',
'--docker-kwargs',
'--pull-latest',
Expand All @@ -264,7 +253,6 @@
'--env',
'--expose-public',
'--shard-id',
'--pea-id',
'--replica-id',
'--pea-role',
'--noblock-on-start',
Expand Down Expand Up @@ -299,11 +287,8 @@
'--py-modules',
'--port-in',
'--host-in',
'--memory-hwm',
'--native',
'--num-part',
'--runs-in-docker',
'--dump-path',
'--entrypoint',
'--docker-kwargs',
'--pull-latest',
Expand All @@ -322,7 +307,6 @@
'--env',
'--expose-public',
'--shard-id',
'--pea-id',
'--replica-id',
'--pea-role',
'--noblock-on-start',
Expand Down
3 changes: 0 additions & 3 deletions daemon/api/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ def load_and_dump(self) -> None:
- get the list of ports to be published (port_expose, port_in, port_out, port_ctrl)
- ports need to be published for gateway & executors that are not `ContainerRuntime` or `JinadRuntime` based
- Pod level args for ports are enough, as we don't need to publish Pea ports
- all the above Pods also run in docker, hence we set `runs_in_docker`
3. `save` the Flow config.
- saves port configs of all `executors` into the new yaml.
- set `JINA_FULL_CLI` envvar, so that `gateway` args are also added.
Expand Down Expand Up @@ -155,7 +154,6 @@ def load_and_dump(self) -> None:
if runtime_cls in ['WorkerRuntime'] + list(
GATEWAY_RUNTIME_DICT.values()
):
pod.args.runs_in_docker = True
current_ports = Ports()
for port_name in Ports.__fields__:
setattr(
Expand Down Expand Up @@ -255,7 +253,6 @@ def update_args(self):
# Each pea is inside a container
self.params.identity = self.id
self.params.workspace_id = self.workspace_id
self.params.runs_in_docker = True
self.device_requests = (
get_gpu_device_requests(self.params.gpus) if self.params.gpus else None
)
Expand Down
7 changes: 0 additions & 7 deletions daemon/api/endpoints/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,9 @@ async def _create(flow: FlowDepends = Depends(FlowDepends)):
async def _rolling_update(
id: DaemonID,
pod_name: str,
dump_path: Optional[str] = None,
uses_with: Optional[Dict[str, Any]] = None,
):
try:
if dump_path is not None:
if uses_with is not None:
uses_with['dump_path'] = dump_path
else:
uses_with = {'dump_path': dump_path}

return await store.rolling_update(id=id, pod_name=pod_name, uses_with=uses_with)
except Exception as ex:
raise Runtime400Exception from ex
Expand Down
7 changes: 0 additions & 7 deletions daemon/api/endpoints/pods.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,9 @@ async def _create(pod: PodDepends = Depends(PodDepends)):
)
async def _rolling_update(
id: DaemonID,
dump_path: Optional[str] = None,
uses_with: Optional[Dict[str, Any]] = None,
):
try:
if dump_path is not None:
if uses_with is not None:
uses_with['dump_path'] = dump_path
else:
uses_with = {'dump_path': dump_path}

return await store.rolling_update(id=id, uses_with=uses_with)
except Exception as ex:
raise Runtime400Exception from ex
Expand Down
8 changes: 0 additions & 8 deletions daemon/clients/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,23 +76,15 @@ async def rolling_update(
self,
id: Union[str, 'DaemonID'],
pod_name: str,
dump_path: Optional[str] = None,
*,
uses_with: Optional[Dict] = None,
) -> str:
"""Perform `rolling_update` on a remote Flow
:param id: flow id
:param pod_name: pod name for rolling update
:param dump_path: path of dump from other flow
:param uses_with: the uses with to override the Executors params
:return: flow id
"""
if dump_path is not None:
if uses_with is not None:
uses_with['dump_path'] = dump_path
else:
uses_with = {'dump_path': dump_path}
async with aiohttp.request(
method='PUT',
url=f'{self.store_api}/rolling_update/{daemonize(id, self._kind)}',
Expand Down
8 changes: 0 additions & 8 deletions daemon/clients/pods.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,14 @@ class AsyncPodClient(AsyncPeaClient):
async def rolling_update(
self,
id: Union[str, 'DaemonID'],
dump_path: Optional[str] = None,
*,
uses_with: Optional[Dict] = None,
) -> str:
"""Update a Flow on remote JinaD (only rolling_update supported)
:param id: Pod ID
:param dump_path: path of dump from other flow
:param uses_with: the uses_with to update the Executor
:return: Pod ID
"""
if dump_path is not None:
if uses_with is not None:
uses_with['dump_path'] = dump_path
else:
uses_with = {'dump_path': dump_path}
async with aiohttp.request(
method='PUT',
url=f'{self.store_api}/rolling_update/{daemonize(id, self._kind)}',
Expand Down
9 changes: 0 additions & 9 deletions daemon/stores/partial.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from jina.logging.logger import JinaLogger

from .. import jinad_args, __partial_workspace__
from ..models import GATEWAY_RUNTIME_DICT
from ..models.ports import Ports, PortMappings
from ..models.partial import PartialFlowItem, PartialStoreItem

Expand Down Expand Up @@ -170,14 +169,6 @@ def add(
self._set_pea_ports(pea_args, port_mapping, port_name)
pod.update_worker_pea_args()

# avoid setting runs_in_docker for Pods with parallel > 1 and using `WorkerRuntime`
# else, replica-peas would try connecting to head/tail-pea via __docker_host__
if runtime_cls == 'WorkerRuntime' and (
hasattr(pod.args, 'replicas') and pod.args.replicas > 1
):
pod.args.runs_in_docker = False
pod.update_worker_pea_args()

self.object = self.object.__enter__()
except Exception as e:
if hasattr(self, 'object'):
Expand Down
2 changes: 1 addition & 1 deletion docs/advanced/experimental/indexers.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ where
For performing the **rolling update**, we can see the usage in the same test:

```python
flow_query.rolling_update(pod_name='indexer_query', dump_path=dump_path)
flow_query.rolling_update(pod_name='indexer_query', uses_with={'dump_path': dump_path})
```

where
Expand Down
4 changes: 2 additions & 2 deletions jina/executors/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Dict, Optional, Type, List, Any
from typing import TYPE_CHECKING, Any
import inspect
import os
from types import SimpleNamespace
Expand Down Expand Up @@ -240,7 +240,7 @@ def workspace(self) -> Optional[str]:
shard_id = getattr(
self.runtime_args,
'shard_id',
getattr(self.runtime_args, 'pea_id', None),
None,
)
if replica_id is not None and replica_id != -1:
complete_workspace = os.path.join(complete_workspace, str(replica_id))
Expand Down
19 changes: 0 additions & 19 deletions jina/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ def __init__(
host: Optional[str] = '0.0.0.0',
host_in: Optional[str] = '0.0.0.0',
log_config: Optional[str] = None,
memory_hwm: Optional[int] = -1,
name: Optional[str] = 'gateway',
native: Optional[bool] = False,
no_crud_endpoints: Optional[bool] = False,
Expand All @@ -190,7 +189,6 @@ def __init__(
quiet: Optional[bool] = False,
quiet_error: Optional[bool] = False,
replicas: Optional[int] = 1,
runs_in_docker: Optional[bool] = False,
runtime_backend: Optional[str] = 'PROCESS',
runtime_cls: Optional[str] = 'GRPCGatewayRuntime',
shards: Optional[int] = 1,
Expand Down Expand Up @@ -227,7 +225,6 @@ def __init__(
:param host: The host address of the runtime, by default it is 0.0.0.0.
:param host_in: The host address for binding to, by default it is 0.0.0.0
:param log_config: The YAML config of the logger used in this object.
:param memory_hwm: The memory high watermark of this pod in Gigabytes, pod will restart when this is reached. -1 means no restriction
:param name: The name of this object.
This will be used in the following places:
Expand Down Expand Up @@ -267,7 +264,6 @@ def __init__(
:param quiet: If set, then no log will be emitted from this object.
:param quiet_error: If set, then exception stack information will not be added to the log
:param replicas: The number of replicas in the pod, `port_in` and `port_out` will be set to random, and routers will be added automatically when necessary
:param runs_in_docker: Informs a Pea that runs in a container. Important to properly set networking information
:param runtime_backend: The parallel backend of the runtime inside the Pea
:param runtime_cls: The runtime class to run inside the Pea
:param shards: The number of shards in the pod running at the same time, `port_in` and `port_out` will be set to random, and routers will be added automatically when necessary. For more details check https://docs.jina.ai/fundamentals/flow/topology/
Expand Down Expand Up @@ -612,7 +608,6 @@ def add(
host_in: Optional[str] = '0.0.0.0',
install_requirements: Optional[bool] = False,
log_config: Optional[str] = None,
memory_hwm: Optional[int] = -1,
name: Optional[str] = None,
native: Optional[bool] = False,
peas_hosts: Optional[List[str]] = None,
Expand All @@ -625,7 +620,6 @@ def add(
quiet_error: Optional[bool] = False,
quiet_remote_logs: Optional[bool] = False,
replicas: Optional[int] = 1,
runs_in_docker: Optional[bool] = False,
runtime_backend: Optional[str] = 'PROCESS',
runtime_cls: Optional[str] = 'WorkerRuntime',
scheduling: Optional[str] = 'LOAD_BALANCE',
Expand Down Expand Up @@ -670,7 +664,6 @@ def add(
:param host_in: The host address for binding to, by default it is 0.0.0.0
:param install_requirements: If set, install `requirements.txt` in the Hub Executor bundle to local
:param log_config: The YAML config of the logger used in this object.
:param memory_hwm: The memory high watermark of this pod in Gigabytes, pod will restart when this is reached. -1 means no restriction
:param name: The name of this object.
This will be used in the following places:
Expand Down Expand Up @@ -705,7 +698,6 @@ def add(
:param quiet_error: If set, then exception stack information will not be added to the log
:param quiet_remote_logs: Do not display the streaming of remote logs on local console
:param replicas: The number of replicas in the pod, `port_in` and `port_out` will be set to random, and routers will be added automatically when necessary
:param runs_in_docker: Informs a Pea that runs in a container. Important to properly set networking information
:param runtime_backend: The parallel backend of the runtime inside the Pea
:param runtime_cls: The runtime class to run inside the Pea
:param scheduling: The strategy of scheduling workload among Peas
Expand Down Expand Up @@ -834,13 +826,6 @@ def add(
args.noblock_on_start = True
args.extra_search_paths = self.args.extra_search_paths

# BACKWARDS COMPATIBILITY:
# We assume that this is used in a search Flow if replicas and shards are used
# Thus the polling type should be all
# But dont override any user provided polling
if args.replicas > 1 and args.shards > 1 and 'polling' not in kwargs:
args.polling = PollingType.ALL

port_in = kwargs.get('port_in', None)
if not port_in:
port_in = helper.random_port()
Expand Down Expand Up @@ -1752,22 +1737,18 @@ def expose_endpoint(self, exec_endpoint: str, **kwargs):
def rolling_update(
self,
pod_name: str,
dump_path: Optional[str] = None,
*,
uses_with: Optional[Dict] = None,
):
"""
Reload all replicas of a pod sequentially
:param pod_name: pod to update
:param dump_path: **backwards compatibility** This function was only accepting dump_path as the only potential arg to override
:param uses_with: a Dictionary of arguments to restart the executor with
"""
from ..helper import run_async

run_async(
self._pod_nodes[pod_name].rolling_update,
dump_path=dump_path,
uses_with=uses_with,
any_event_loop=True,
)
Expand Down
3 changes: 1 addition & 2 deletions jina/parsers/peapods/pea.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,9 @@ def mixin_pea_parser(parser):

gp.add_argument(
'--shard-id',
'--pea-id',
type=int,
default=0,
help='defines the suffix for the workspace path of the pea`'
help='defines the shard identifier for the executor. It is used as suffix for the workspace path of the executor`'
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)
Expand Down
24 changes: 0 additions & 24 deletions jina/parsers/peapods/runtimes/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,6 @@ def mixin_worker_runtime_parser(parser):
help=f'The host address for binding to, by default it is {__default_host__}',
)

gp.add_argument(
'--memory-hwm',
type=int,
default=-1,
help='The memory high watermark of this pod in Gigabytes, pod will restart when this is reached. '
'-1 means no restriction',
)

gp.add_argument(
'--native',
action='store_true',
Expand All @@ -109,19 +101,3 @@ def mixin_worker_runtime_parser(parser):
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)

gp.add_argument(
'--runs-in-docker',
action='store_true',
default=False,
help='Informs a Pea that runs in a container. Important to properly set networking information',
)

gp.add_argument(
'--dump-path',
type=str,
default='',
help='Dump path to be passed to the executor'
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)
3 changes: 0 additions & 3 deletions jina/peapods/peas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from abc import ABC, abstractmethod
from typing import Type, Union, Dict, Optional

from ..networking import GrpcConnectionPool
from ..runtimes.asyncio import AsyncNewLoopRuntime
from ...jaml import JAML
from .helper import _get_event, _get_worker, ConditionalEvent
Expand Down Expand Up @@ -117,8 +116,6 @@ def __init__(self, args: 'argparse.Namespace'):

if hasattr(self.args, 'port_expose'):
self.args.port_in = self.args.port_expose
# BACKWARDS COMPATIBILITY
self.args.pea_id = self.args.shard_id
self.args.parallel = self.args.shards
self.name = self.args.name or self.__class__.__name__
self.is_forked = False
Expand Down

0 comments on commit bba10dd

Please sign in to comment.