Skip to content

Commit

Permalink
fix(docker): args processing in container runtime (#3684)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobowitz committed Oct 15, 2021
1 parent bd6e562 commit dca6d01
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 35 deletions.
25 changes: 21 additions & 4 deletions cli/autocomplete.py
Expand Up @@ -58,6 +58,7 @@
'--workspace-id',
'--static-routing-table',
'--routing-table',
'--dynamic-routing',
'--zmq-identity',
'--port-ctrl',
'--ctrl-with-ipc',
Expand Down Expand Up @@ -104,9 +105,13 @@
'--timeout-ready',
'--env',
'--expose-public',
'--shard-id',
'--pea-id',
'--pea-role',
'--noblock-on-start',
'--shards',
'--parallel',
'--replicas',
'--install-requirements',
'--force',
],
Expand All @@ -121,6 +126,7 @@
'--workspace-id',
'--static-routing-table',
'--routing-table',
'--dynamic-routing',
'--uses',
'--env',
'--inspect',
Expand All @@ -139,6 +145,7 @@
'--workspace-id',
'--static-routing-table',
'--routing-table',
'--dynamic-routing',
'--zmq-identity',
'--port-ctrl',
'--ctrl-with-ipc',
Expand Down Expand Up @@ -192,14 +199,17 @@
'--timeout-ready',
'--env',
'--expose-public',
'--shard-id',
'--pea-id',
'--pea-role',
'--noblock-on-start',
'--shards',
'--parallel',
'--replicas',
'--k8s-uses-init',
'--k8s-mount-path',
'--k8s-init-container-command',
'--k8s-custom-resource-dir',
'--dynamic-routing',
],
'hub new': [
'--help',
Expand Down Expand Up @@ -236,6 +246,7 @@
'--workspace-id',
'--static-routing-table',
'--routing-table',
'--dynamic-routing',
'--zmq-identity',
'--port-ctrl',
'--ctrl-with-ipc',
Expand Down Expand Up @@ -282,9 +293,13 @@
'--timeout-ready',
'--env',
'--expose-public',
'--shard-id',
'--pea-id',
'--pea-role',
'--noblock-on-start',
'--shards',
'--parallel',
'--replicas',
'--install-requirements',
'--force',
],
Expand All @@ -300,6 +315,7 @@
'--workspace-id',
'--static-routing-table',
'--routing-table',
'--dynamic-routing',
'--zmq-identity',
'--port-ctrl',
'--ctrl-with-ipc',
Expand Down Expand Up @@ -346,16 +362,17 @@
'--timeout-ready',
'--env',
'--expose-public',
'--shard-id',
'--pea-id',
'--pea-role',
'--noblock-on-start',
'--shards',
'--parallel',
'--replicas',
'--install-requirements',
'--force',
'--uses-before',
'--uses-after',
'--shards',
'--parallel',
'--replicas',
'--polling',
'--scheduling',
'--external',
Expand Down
6 changes: 5 additions & 1 deletion jina/executors/__init__.py
Expand Up @@ -213,7 +213,11 @@ def workspace(self) -> str:
if workspace:
complete_workspace = os.path.join(workspace, self.metas.name)
replica_id = getattr(self.runtime_args, 'replica_id', None)
pea_id = getattr(self.runtime_args, 'pea_id', None)
pea_id = getattr(
self.runtime_args,
'pea_id',
getattr(self.runtime_args, 'shard_id', None),
)
if replica_id is not None and replica_id != -1:
complete_workspace = os.path.join(complete_workspace, str(replica_id))
if pea_id is not None and pea_id != -1:
Expand Down
4 changes: 4 additions & 0 deletions jina/flow/base.py
Expand Up @@ -182,9 +182,11 @@ def __init__(
py_modules: Optional[List[str]] = None,
quiet: Optional[bool] = False,
quiet_error: Optional[bool] = False,
replicas: Optional[int] = 1,
runs_in_docker: Optional[bool] = False,
runtime_backend: Optional[str] = 'PROCESS',
runtime_cls: Optional[str] = 'GRPCRuntime',
shards: Optional[int] = 1,
socket_in: Optional[str] = 'PULL_CONNECT',
socket_out: Optional[str] = 'PUSH_CONNECT',
ssh_keyfile: Optional[str] = None,
Expand Down Expand Up @@ -263,9 +265,11 @@ def __init__(
`Executor cookbook <https://docs.jina.ai/fundamentals/executor/repository-structure/>`__
:param quiet: If set, then no log will be emitted from this object.
:param quiet_error: If set, then exception stack information will not be added to the log
:param replicas: The number of replicas in the pod, `port_in` and `port_out` will be set to random, and routers will be added automatically when necessary
:param runs_in_docker: Informs a Pea that runs in a container. Important to properly set networking information
:param runtime_backend: The parallel backend of the runtime inside the Pea
:param runtime_cls: The runtime class to run inside the Pea
:param shards: The number of shards in the pod running at the same time, `port_in` and `port_out` will be set to random, and routers will be added automatically when necessary
:param socket_in: The socket type for input port
:param socket_out: The socket type for output port
:param ssh_keyfile: This specifies a key to be used in ssh login, default None. regular default ssh keys will be used without specifying this argument.
Expand Down
9 changes: 0 additions & 9 deletions jina/parsers/__init__.py
Expand Up @@ -102,15 +102,6 @@ def set_gateway_parser(parser=None):
pod_role=PodRoleType.GATEWAY,
)

parser.add_argument(
'--dynamic-routing',
action='store_true',
default=True,
help='The Pod will setup the socket types of the HeadPea and TailPea depending on this argument.'
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)

return parser


Expand Down
9 changes: 9 additions & 0 deletions jina/parsers/peapods/base.py
Expand Up @@ -102,3 +102,12 @@ def mixin_base_ppr_parser(parser, with_identity: bool = True):
type=str,
help='Routing graph for the gateway' if _SHOW_ALL_ARGS else argparse.SUPPRESS,
)

parser.add_argument(
'--dynamic-routing',
action='store_true',
default=True,
help='The Pod will setup the socket types of the HeadPea and TailPea depending on this argument.'
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)
20 changes: 20 additions & 0 deletions jina/parsers/peapods/pea.py
Expand Up @@ -64,6 +64,7 @@ def mixin_pea_parser(parser):
# hidden CLI used for internal only

gp.add_argument(
'--shard-id',
'--pea-id',
type=int,
default=0,
Expand All @@ -89,3 +90,22 @@ def mixin_pea_parser(parser):
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)

gp.add_argument(
'--shards',
'--parallel',
type=int,
default=1,
help='The number of shards in the pod running at the same time, '
'`port_in` and `port_out` will be set to random, '
'and routers will be added automatically when necessary',
)

gp.add_argument(
'--replicas',
type=int,
default=1,
help='The number of replicas in the pod, '
'`port_in` and `port_out` will be set to random, '
'and routers will be added automatically when necessary',
)
17 changes: 0 additions & 17 deletions jina/parsers/peapods/pod.py
Expand Up @@ -24,23 +24,6 @@ def mixin_base_pod_parser(parser):
help='The executor attached after the Peas described by --uses, typically used for receiving from '
'all shards, accepted type follows `--uses`',
)
gp.add_argument(
'--shards',
'--parallel',
type=int,
default=1,
help='The number of shards in the pod running at the same time, '
'`port_in` and `port_out` will be set to random, '
'and routers will be added automatically when necessary',
)
gp.add_argument(
'--replicas',
type=int,
default=1,
help='The number of replicas in the pod, '
'`port_in` and `port_out` will be set to random, '
'and routers will be added automatically when necessary',
)
gp.add_argument(
'--polling',
type=PollingType.from_string,
Expand Down
3 changes: 3 additions & 0 deletions jina/peapods/peas/__init__.py
Expand Up @@ -109,6 +109,9 @@ class BasePea:
def __init__(self, args: 'argparse.Namespace'):
super().__init__() #: required here to call process/thread __init__
self.args = args
# BACKWARDS COMPATIBILITY
self.args.pea_id = self.args.shard_id
self.args.parallel = self.args.shards
self.name = self.args.name or self.__class__.__name__

self.logger = JinaLogger(self.name, **vars(self.args))
Expand Down
4 changes: 4 additions & 0 deletions jina/peapods/pods/compound.py
Expand Up @@ -42,6 +42,8 @@ def __init__(
self.tail_args = BasePod._copy_to_tail_args(self.args, self.args.polling)
# uses before with shards apply to shards and not to replicas
self.shards = [] # type: List['Pod']
# BACKWARDS COMPATIBILITY:
self.args.parallel = self.args.shards
self.assign_shards()

def assign_shards(self):
Expand Down Expand Up @@ -216,6 +218,8 @@ def _set_shard_args(
]
_args.peas_hosts = pod_host_list
_args.shard_id = idx
# BACKWARDS COMPATIBILITY:
_args.pea_id = _args.shard_id
_args.identity = random_identity()
if _args.name:
_args.name += f'/shard-{idx}'
Expand Down
4 changes: 4 additions & 0 deletions jina/peapods/runtimes/container/__init__.py
Expand Up @@ -196,6 +196,10 @@ def _docker_run(self, replay: bool = False):
_args = ArgNamespace.kwargs2list(non_defaults)
ports = {f'{v}/tcp': v for v in _expose_port} if not self._net_mode else None

# WORKAROUND: we can't automatically detect these true/false flags; this needs to be fixed
if 'dynamic_routing' in non_defaults and not non_defaults['dynamic_routing']:
_args.append('--no-dynamic-routing')

docker_kwargs = self.args.docker_kwargs or {}
self._container = client.containers.run(
uses_img,
Expand Down
Expand Up @@ -61,6 +61,8 @@ def __init__(self, args: 'argparse.Namespace', logger: 'JinaLogger', **kwargs):
"""
super().__init__()
self.args = args
self.args.pea_id = self.args.shard_id
self.args.parallel = self.args.shards
self.logger = logger
self._load_executor()

Expand Down
6 changes: 3 additions & 3 deletions tests/k8s/test-executor/debug_executor.py
Expand Up @@ -13,7 +13,7 @@ def __init__(self, *args, **kwargs):
self._name = self.runtime_args.name

@requests(on='/index')
def debug(self, docs: DocumentArray, parameters: Dict, **kwargs):
def debug(self, docs: DocumentArray, **kwargs):
self.logger.debug(
f'Received doc array in test-executor {self._name} with length {len(docs)}.'
)
Expand All @@ -27,7 +27,7 @@ def debug(self, docs: DocumentArray, parameters: Dict, **kwargs):
doc.tags[key] = traversed

@requests(on='/env')
def env(self, docs: DocumentArray, parameters: Dict, **kwargs):
def env(self, docs: DocumentArray, **kwargs):
self.logger.debug(
f'Received doc array in test-executor {self._name} with length {len(docs)}.'
)
Expand All @@ -39,7 +39,7 @@ def env(self, docs: DocumentArray, parameters: Dict, **kwargs):
doc.tags['env'] = {'k1': os.environ.get('k1'), 'k2': os.environ.get('k2')}

@requests(on='/search')
def read_file(self, docs: DocumentArray, parameters: Dict, **kwargs):
def read_file(self, docs: DocumentArray, **kwargs):
self.logger.debug(
f'Received doc array in test-executor {self._name} with length {len(docs)}.'
)
Expand Down
11 changes: 11 additions & 0 deletions tests/unit/peapods/peas/test_pea.py
Expand Up @@ -250,3 +250,14 @@ def test_idle_does_not_create_response(command, response_expected):
socket.connect(f'tcp://localhost:{p.args.port_ctrl}')
socket.send_multipart(msg.dump())
assert socket.poll(timeout=1000) == response_expected


def test_pea_set_shard_pea_id():
    """Check that a Pea exposes shard settings under both new and legacy names.

    The CLI is parsed with ``--shard-id 1 --shards 3``; after constructing the
    Pea, ``shard_id``/``pea_id`` must both be 1 and ``shards``/``parallel``
    must both be 3.
    """
    cli_tokens = ['--shard-id', '1', '--shards', '3']
    parsed = set_pea_parser().parse_args(cli_tokens)

    pea = Pea(parsed)

    # shard index: new name and backwards-compatible alias
    assert pea.args.shard_id == 1
    assert pea.args.pea_id == 1

    # shard count: new name and backwards-compatible alias
    assert pea.args.shards == 3
    assert pea.args.parallel == 3
Expand Up @@ -2,4 +2,4 @@ FROM jinaai/jina:test-pip

ADD *.py mwu_encoder.yml ./

ENTRYPOINT ["jina", "pod", "--uses", "mwu_encoder.yml"]
ENTRYPOINT ["jina", "executor", "--uses", "mwu_encoder.yml"]
12 changes: 12 additions & 0 deletions tests/unit/peapods/runtimes/container/test_container_runtime.py
Expand Up @@ -68,6 +68,18 @@ def test_flow_with_one_container_pod(docker_image_built):
f.post(on='/index', inputs=random_docs(10))


def test_flow_with_one_container_pod_shards(docker_image_built):
    """Run a Flow with one docker-backed pod split into two shards.

    Verifies that the pod and every shard report the shard count under both
    ``shards`` and ``parallel``, that each shard's ``pea_id`` and ``shard_id``
    equal its position, and that an ``/index`` request can be posted.

    :param docker_image_built: fixture argument; not referenced in the body
    """
    flow = Flow().add(name='dummyEncoder1', shards=2, uses=f'docker://{img_name}')

    with flow:
        encoder_pod = flow._pod_nodes['dummyEncoder1']
        assert encoder_pod.args.shards == encoder_pod.args.parallel == 2

        for position, shard_pod in enumerate(encoder_pod.shards):
            shard_args = shard_pod.args
            # legacy `pea_id` must mirror `shard_id`, and both match the index
            assert shard_args.pea_id == shard_args.shard_id == position
            assert shard_args.shards == shard_args.parallel == 2

        flow.post(on='/index', inputs=random_docs(10))


def test_flow_with_replica_container_ext_yaml(docker_image_built):
f = Flow().add(
name='dummyEncoder3',
Expand Down

0 comments on commit dca6d01

Please sign in to comment.