diff --git a/cli/autocomplete.py b/cli/autocomplete.py index 086ad306486c7..c78832d4569f6 100644 --- a/cli/autocomplete.py +++ b/cli/autocomplete.py @@ -58,6 +58,7 @@ '--workspace-id', '--static-routing-table', '--routing-table', + '--dynamic-routing', '--zmq-identity', '--port-ctrl', '--ctrl-with-ipc', @@ -104,9 +105,13 @@ '--timeout-ready', '--env', '--expose-public', + '--shard-id', '--pea-id', '--pea-role', '--noblock-on-start', + '--shards', + '--parallel', + '--replicas', '--install-requirements', '--force', ], @@ -121,6 +126,7 @@ '--workspace-id', '--static-routing-table', '--routing-table', + '--dynamic-routing', '--uses', '--env', '--inspect', @@ -139,6 +145,7 @@ '--workspace-id', '--static-routing-table', '--routing-table', + '--dynamic-routing', '--zmq-identity', '--port-ctrl', '--ctrl-with-ipc', @@ -192,14 +199,17 @@ '--timeout-ready', '--env', '--expose-public', + '--shard-id', '--pea-id', '--pea-role', '--noblock-on-start', + '--shards', + '--parallel', + '--replicas', '--k8s-uses-init', '--k8s-mount-path', '--k8s-init-container-command', '--k8s-custom-resource-dir', - '--dynamic-routing', ], 'hub new': [ '--help', @@ -236,6 +246,7 @@ '--workspace-id', '--static-routing-table', '--routing-table', + '--dynamic-routing', '--zmq-identity', '--port-ctrl', '--ctrl-with-ipc', @@ -282,9 +293,13 @@ '--timeout-ready', '--env', '--expose-public', + '--shard-id', '--pea-id', '--pea-role', '--noblock-on-start', + '--shards', + '--parallel', + '--replicas', '--install-requirements', '--force', ], @@ -300,6 +315,7 @@ '--workspace-id', '--static-routing-table', '--routing-table', + '--dynamic-routing', '--zmq-identity', '--port-ctrl', '--ctrl-with-ipc', @@ -346,16 +362,17 @@ '--timeout-ready', '--env', '--expose-public', + '--shard-id', '--pea-id', '--pea-role', '--noblock-on-start', + '--shards', + '--parallel', + '--replicas', '--install-requirements', '--force', '--uses-before', '--uses-after', - '--shards', - '--parallel', - '--replicas', '--polling', 
'--scheduling', '--external', diff --git a/jina/executors/__init__.py b/jina/executors/__init__.py index 2888e4c85cd03..71544361c0624 100644 --- a/jina/executors/__init__.py +++ b/jina/executors/__init__.py @@ -213,7 +213,11 @@ def workspace(self) -> str: if workspace: complete_workspace = os.path.join(workspace, self.metas.name) replica_id = getattr(self.runtime_args, 'replica_id', None) - pea_id = getattr(self.runtime_args, 'pea_id', None) + pea_id = getattr( + self.runtime_args, + 'pea_id', + getattr(self.runtime_args, 'shard_id', None), + ) if replica_id is not None and replica_id != -1: complete_workspace = os.path.join(complete_workspace, str(replica_id)) if pea_id is not None and pea_id != -1: diff --git a/jina/flow/base.py b/jina/flow/base.py index f4511b6378432..cb0b05268edf8 100644 --- a/jina/flow/base.py +++ b/jina/flow/base.py @@ -182,9 +182,11 @@ def __init__( py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, + replicas: Optional[int] = 1, runs_in_docker: Optional[bool] = False, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'GRPCRuntime', + shards: Optional[int] = 1, socket_in: Optional[str] = 'PULL_CONNECT', socket_out: Optional[str] = 'PUSH_CONNECT', ssh_keyfile: Optional[str] = None, @@ -263,9 +265,11 @@ def __init__( `Executor cookbook `__ :param quiet: If set, then no log will be emitted from this object. :param quiet_error: If set, then exception stack information will not be added to the log + :param replicas: The number of replicas in the pod, `port_in` and `port_out` will be set to random, and routers will be added automatically when necessary :param runs_in_docker: Informs a Pea that runs in a container. 
Important to properly set networking information :param runtime_backend: The parallel backend of the runtime inside the Pea :param runtime_cls: The runtime class to run inside the Pea + :param shards: The number of shards in the pod running at the same time, `port_in` and `port_out` will be set to random, and routers will be added automatically when necessary :param socket_in: The socket type for input port :param socket_out: The socket type for output port :param ssh_keyfile: This specifies a key to be used in ssh login, default None. regular default ssh keys will be used without specifying this argument. diff --git a/jina/parsers/__init__.py b/jina/parsers/__init__.py index 2bedfe0adb356..444fd85913947 100644 --- a/jina/parsers/__init__.py +++ b/jina/parsers/__init__.py @@ -102,15 +102,6 @@ def set_gateway_parser(parser=None): pod_role=PodRoleType.GATEWAY, ) - parser.add_argument( - '--dynamic-routing', - action='store_true', - default=True, - help='The Pod will setup the socket types of the HeadPea and TailPea depending on this argument.' - if _SHOW_ALL_ARGS - else argparse.SUPPRESS, - ) - return parser diff --git a/jina/parsers/peapods/base.py b/jina/parsers/peapods/base.py index 701e05d37b2ac..b7dd48e2dafdc 100644 --- a/jina/parsers/peapods/base.py +++ b/jina/parsers/peapods/base.py @@ -102,3 +102,12 @@ def mixin_base_ppr_parser(parser, with_identity: bool = True): type=str, help='Routing graph for the gateway' if _SHOW_ALL_ARGS else argparse.SUPPRESS, ) + + parser.add_argument( + '--dynamic-routing', + action='store_true', + default=True, + help='The Pod will setup the socket types of the HeadPea and TailPea depending on this argument.' 
+ if _SHOW_ALL_ARGS + else argparse.SUPPRESS, + ) diff --git a/jina/parsers/peapods/pea.py b/jina/parsers/peapods/pea.py index 33669479bcce2..e7479a269acf0 100644 --- a/jina/parsers/peapods/pea.py +++ b/jina/parsers/peapods/pea.py @@ -64,6 +64,7 @@ def mixin_pea_parser(parser): # hidden CLI used for internal only gp.add_argument( + '--shard-id', '--pea-id', type=int, default=0, @@ -89,3 +90,22 @@ def mixin_pea_parser(parser): if _SHOW_ALL_ARGS else argparse.SUPPRESS, ) + + gp.add_argument( + '--shards', + '--parallel', + type=int, + default=1, + help='The number of shards in the pod running at the same time, ' + '`port_in` and `port_out` will be set to random, ' + 'and routers will be added automatically when necessary', + ) + + gp.add_argument( + '--replicas', + type=int, + default=1, + help='The number of replicas in the pod, ' + '`port_in` and `port_out` will be set to random, ' + 'and routers will be added automatically when necessary', + ) diff --git a/jina/parsers/peapods/pod.py b/jina/parsers/peapods/pod.py index fc729c59157db..55bc874cb3d55 100644 --- a/jina/parsers/peapods/pod.py +++ b/jina/parsers/peapods/pod.py @@ -24,23 +24,6 @@ def mixin_base_pod_parser(parser): help='The executor attached after the Peas described by --uses, typically used for receiving from ' 'all shards, accepted type follows `--uses`', ) - gp.add_argument( - '--shards', - '--parallel', - type=int, - default=1, - help='The number of shards in the pod running at the same time, ' - '`port_in` and `port_out` will be set to random, ' - 'and routers will be added automatically when necessary', - ) - gp.add_argument( - '--replicas', - type=int, - default=1, - help='The number of replicas in the pod, ' - '`port_in` and `port_out` will be set to random, ' - 'and routers will be added automatically when necessary', - ) gp.add_argument( '--polling', type=PollingType.from_string, diff --git a/jina/peapods/peas/__init__.py b/jina/peapods/peas/__init__.py index ab8e6f864f3ef..cf4d5a789cd06 100644 
--- a/jina/peapods/peas/__init__.py +++ b/jina/peapods/peas/__init__.py @@ -109,6 +109,9 @@ class BasePea: def __init__(self, args: 'argparse.Namespace'): super().__init__() #: required here to call process/thread __init__ self.args = args + # BACKWARDS COMPATIBILITY + self.args.pea_id = self.args.shard_id + self.args.parallel = self.args.shards self.name = self.args.name or self.__class__.__name__ self.logger = JinaLogger(self.name, **vars(self.args)) diff --git a/jina/peapods/pods/compound.py b/jina/peapods/pods/compound.py index 225239868bca6..635906c42839e 100644 --- a/jina/peapods/pods/compound.py +++ b/jina/peapods/pods/compound.py @@ -42,6 +42,8 @@ def __init__( self.tail_args = BasePod._copy_to_tail_args(self.args, self.args.polling) # uses before with shards apply to shards and not to replicas self.shards = [] # type: List['Pod'] + # BACKWARDS COMPATIBILITY: + self.args.parallel = self.args.shards self.assign_shards() def assign_shards(self): @@ -216,6 +218,8 @@ def _set_shard_args( ] _args.peas_hosts = pod_host_list _args.shard_id = idx + # BACKWARDS COMPATIBILITY: + _args.pea_id = _args.shard_id _args.identity = random_identity() if _args.name: _args.name += f'/shard-{idx}' diff --git a/jina/peapods/runtimes/container/__init__.py b/jina/peapods/runtimes/container/__init__.py index 756d21cc30659..de5eea4da9aae 100644 --- a/jina/peapods/runtimes/container/__init__.py +++ b/jina/peapods/runtimes/container/__init__.py @@ -196,6 +196,10 @@ def _docker_run(self, replay: bool = False): _args = ArgNamespace.kwargs2list(non_defaults) ports = {f'{v}/tcp': v for v in _expose_port} if not self._net_mode else None + # WORKAROUND: we can't automatically find these true/false flags; this needs to be fixed + if 'dynamic_routing' in non_defaults and not non_defaults['dynamic_routing']: + _args.append('--no-dynamic-routing') + docker_kwargs = self.args.docker_kwargs or {} self._container = client.containers.run( uses_img, diff --git
a/jina/peapods/runtimes/request_handlers/data_request_handler.py b/jina/peapods/runtimes/request_handlers/data_request_handler.py index a4523ff645025..9aa8752af553f 100644 --- a/jina/peapods/runtimes/request_handlers/data_request_handler.py +++ b/jina/peapods/runtimes/request_handlers/data_request_handler.py @@ -61,6 +61,8 @@ def __init__(self, args: 'argparse.Namespace', logger: 'JinaLogger', **kwargs): """ super().__init__() self.args = args + self.args.pea_id = self.args.shard_id + self.args.parallel = self.args.shards self.logger = logger self._load_executor() diff --git a/tests/k8s/test-executor/debug_executor.py b/tests/k8s/test-executor/debug_executor.py index cb3a110a21a83..45b72f02081a1 100644 --- a/tests/k8s/test-executor/debug_executor.py +++ b/tests/k8s/test-executor/debug_executor.py @@ -13,7 +13,7 @@ def __init__(self, *args, **kwargs): self._name = self.runtime_args.name @requests(on='/index') - def debug(self, docs: DocumentArray, parameters: Dict, **kwargs): + def debug(self, docs: DocumentArray, **kwargs): self.logger.debug( f'Received doc array in test-executor {self._name} with length {len(docs)}.' ) @@ -27,7 +27,7 @@ def debug(self, docs: DocumentArray, parameters: Dict, **kwargs): doc.tags[key] = traversed @requests(on='/env') - def env(self, docs: DocumentArray, parameters: Dict, **kwargs): + def env(self, docs: DocumentArray, **kwargs): self.logger.debug( f'Received doc array in test-executor {self._name} with length {len(docs)}.' ) @@ -39,7 +39,7 @@ def env(self, docs: DocumentArray, parameters: Dict, **kwargs): doc.tags['env'] = {'k1': os.environ.get('k1'), 'k2': os.environ.get('k2')} @requests(on='/search') - def read_file(self, docs: DocumentArray, parameters: Dict, **kwargs): + def read_file(self, docs: DocumentArray, **kwargs): self.logger.debug( f'Received doc array in test-executor {self._name} with length {len(docs)}.' 
) diff --git a/tests/unit/peapods/peas/test_pea.py b/tests/unit/peapods/peas/test_pea.py index b43a2bc905adc..967c20d3da573 100644 --- a/tests/unit/peapods/peas/test_pea.py +++ b/tests/unit/peapods/peas/test_pea.py @@ -250,3 +250,14 @@ def test_idle_does_not_create_response(command, response_expected): socket.connect(f'tcp://localhost:{p.args.port_ctrl}') socket.send_multipart(msg.dump()) assert socket.poll(timeout=1000) == response_expected + + +def test_pea_set_shard_pea_id(): + args = set_pea_parser().parse_args(['--shard-id', '1', '--shards', '3']) + + pea = Pea(args) + assert pea.args.shard_id == 1 + assert pea.args.pea_id == 1 + + assert pea.args.shards == 3 + assert pea.args.parallel == 3 diff --git a/tests/unit/peapods/runtimes/container/mwu-encoder/Dockerfile b/tests/unit/peapods/runtimes/container/mwu-encoder/Dockerfile index eb3ed572810a1..8ff822a5f5d16 100644 --- a/tests/unit/peapods/runtimes/container/mwu-encoder/Dockerfile +++ b/tests/unit/peapods/runtimes/container/mwu-encoder/Dockerfile @@ -2,4 +2,4 @@ FROM jinaai/jina:test-pip ADD *.py mwu_encoder.yml ./ -ENTRYPOINT ["jina", "pod", "--uses", "mwu_encoder.yml"] \ No newline at end of file +ENTRYPOINT ["jina", "executor", "--uses", "mwu_encoder.yml"] \ No newline at end of file diff --git a/tests/unit/peapods/runtimes/container/test_container_runtime.py b/tests/unit/peapods/runtimes/container/test_container_runtime.py index 086ce0544b194..42143d96d76a7 100644 --- a/tests/unit/peapods/runtimes/container/test_container_runtime.py +++ b/tests/unit/peapods/runtimes/container/test_container_runtime.py @@ -68,6 +68,18 @@ def test_flow_with_one_container_pod(docker_image_built): f.post(on='/index', inputs=random_docs(10)) +def test_flow_with_one_container_pod_shards(docker_image_built): + f = Flow().add(name='dummyEncoder1', shards=2, uses=f'docker://{img_name}') + + with f: + pod = f._pod_nodes['dummyEncoder1'] + assert pod.args.shards == pod.args.parallel == 2 + for idx, shard in enumerate(pod.shards): + 
assert shard.args.pea_id == shard.args.shard_id == idx + assert shard.args.shards == shard.args.parallel == 2 + f.post(on='/index', inputs=random_docs(10)) + + def test_flow_with_replica_container_ext_yaml(docker_image_built): f = Flow().add( name='dummyEncoder3',