Skip to content

Commit

Permalink
refactor: do not expose everything in runtime args (#4261)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Feb 9, 2022
1 parent b283672 commit 5820913
Show file tree
Hide file tree
Showing 17 changed files with 25 additions and 71 deletions.
2 changes: 0 additions & 2 deletions jina/orchestrate/deployments/config/docker_compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ def _get_image_name(self, uses: Optional[str]):

def _get_container_args(self, cargs):
uses_metas = cargs.uses_metas or {}
if self.shard_id is not None:
uses_metas['pod_id'] = self.shard_id
uses_with = self.service_args.uses_with
if cargs.uses != __default_executor__:
cargs.uses = 'config.yml'
Expand Down
2 changes: 0 additions & 2 deletions jina/orchestrate/deployments/config/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ def _get_image_name(self, uses: Optional[str]):

def _get_container_args(self, cargs, pod_type):
uses_metas = cargs.uses_metas or {}
if self.shard_id is not None:
uses_metas['pod_id'] = self.shard_id
uses_with = self.deployment_args.uses_with
if cargs.uses != __default_executor__:
cargs.uses = 'config.yml'
Expand Down
11 changes: 8 additions & 3 deletions jina/serve/runtimes/request_handlers/data_request_handler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import warnings
from typing import Dict, List, TYPE_CHECKING, Optional

from docarray import DocumentArray
Expand All @@ -25,7 +24,6 @@ def __init__(self, args: 'argparse.Namespace', logger: 'JinaLogger', **kwargs):
"""
super().__init__()
self.args = args
self.args.pod_id = self.args.shard_id
self.args.parallel = self.args.shards
self.logger = logger
self._is_closed = False
Expand All @@ -39,7 +37,14 @@ def _load_executor(self):
uses_with=self.args.uses_with,
uses_metas=self.args.uses_metas,
uses_requests=self.args.uses_requests,
runtime_args=vars(self.args),
runtime_args={
'workspace': self.args.workspace,
'shard_id': self.args.shard_id,
'shards': self.args.shards,
'replicas': self.args.replicas,
'name': self.args.name,
'py_modules': self.args.py_modules,
},
extra_search_paths=self.args.extra_search_paths,
)
except BadConfigSource as ex:
Expand Down
2 changes: 0 additions & 2 deletions tests/docker_compose/executor-merger/exec_merger.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ def debug(self, docs_matrix: List[DocumentArray], **kwargs):
for docs in zip(*docs_matrix):
traversed_executors = [doc.tags['traversed-executors'] for doc in docs]
shard_ids = [doc.tags['shard_id'] for doc in docs]
pod_ids = [doc.tags['pod_id'] for doc in docs]
shards = [doc.tags['shards'] for doc in docs]
parallels = [doc.tags['parallel'] for doc in docs]
traversed_executors = list(chain(*traversed_executors))
doc = Document()
doc.tags['traversed-executors'] = traversed_executors
doc.tags['shard_id'] = shard_ids
doc.tags['pod_id'] = pod_ids
doc.tags['shards'] = shards
doc.tags['parallel'] = parallels
doc.tags['merged'] = True
Expand Down
3 changes: 1 addition & 2 deletions tests/docker_compose/test-executor/debug_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ def debug(self, docs: DocumentArray, **kwargs):
traversed = list(doc.tags.get(key))
traversed.append(self._name)
doc.tags[key] = traversed
doc.tags['parallel'] = self.runtime_args.parallel
doc.tags['parallel'] = self.runtime_args.replicas
doc.tags['shards'] = self.runtime_args.shards
doc.tags['shard_id'] = self.runtime_args.shard_id
doc.tags['pod_id'] = self.runtime_args.pod_id

@requests(on='/env')
def env(self, docs: DocumentArray, **kwargs):
Expand Down
3 changes: 0 additions & 3 deletions tests/docker_compose/test_docker_compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ async def test_flow_with_sharding(flow_with_sharding, polling, tmpdir):
for doc in docs:
if polling == 'ALL':
assert len(set(doc.tags['traversed-executors'])) == 2
assert set(doc.tags['pod_id']) == {0, 1}
assert set(doc.tags['shard_id']) == {0, 1}
assert doc.tags['parallel'] == [2, 2]
assert doc.tags['shards'] == [2, 2]
Expand All @@ -170,9 +169,7 @@ async def test_flow_with_sharding(flow_with_sharding, polling, tmpdir):
runtimes_to_visit.remove(executor)
else:
assert len(set(doc.tags['traversed-executors'])) == 1
assert len(set(doc.tags['pod_id'])) == 1
assert len(set(doc.tags['shard_id'])) == 1
assert 0 in set(doc.tags['pod_id']) or 1 in set(doc.tags['pod_id'])
assert 0 in set(doc.tags['shard_id']) or 1 in set(doc.tags['shard_id'])
assert doc.tags['parallel'] == [2]
assert doc.tags['shards'] == [2]
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/deployments/test_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,9 @@ async def test_deployments_with_replicas_advance_faster(port_generator):


class NameChangeExecutor(Executor):
def __init__(self, runtime_args, *args, **kwargs):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = runtime_args['name']
self.name = self.runtime_args.name

@requests
def foo(self, docs, **kwargs):
Expand Down
10 changes: 5 additions & 5 deletions tests/integration/reduce/test_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,26 @@ def search(self, docs: DocumentArray, **kwargs):
for doc in docs:
doc.matches.extend(
[
Document(id=f'm-{self.runtime_args.pod_id}-{i}')
Document(id=f'm-{self.runtime_args.shard_id}-{i}')
for i in range(self.n_docs)
]
)
doc.chunks.extend(
[
Document(id=f'c-{self.runtime_args.pod_id}-{i}')
Document(id=f'c-{self.runtime_args.shard_id}-{i}')
for i in range(self.n_docs)
]
)

doc.text = self.runtime_args.name

if self.runtime_args.pod_id == 0:
if self.runtime_args.shard_id == 0:
doc.scores['cosine'].value = 0
doc.modality = 'text'
elif self.runtime_args.pod_id == 1:
elif self.runtime_args.shard_id == 1:
doc.modality = 'image'
doc.tags = {'c': 'd'}
elif self.runtime_args.pod_id == 2:
elif self.runtime_args.shard_id == 2:
doc.tags = {'a': 'b'}


Expand Down
2 changes: 0 additions & 2 deletions tests/k8s/executor-merger/exec_merger.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ def debug(self, docs_matrix: List[DocumentArray], **kwargs):
for docs in zip(*docs_matrix):
traversed_executors = [doc.tags['traversed-executors'] for doc in docs]
shard_ids = [doc.tags['shard_id'] for doc in docs]
pod_ids = [doc.tags['pod_id'] for doc in docs]
shards = [doc.tags['shards'] for doc in docs]
parallels = [doc.tags['parallel'] for doc in docs]
traversed_executors = list(chain(*traversed_executors))
doc = Document()
doc.tags['traversed-executors'] = traversed_executors
doc.tags['shard_id'] = shard_ids
doc.tags['pod_id'] = pod_ids
doc.tags['shards'] = shards
doc.tags['parallel'] = parallels
doc.tags['merged'] = True
Expand Down
3 changes: 1 addition & 2 deletions tests/k8s/test-executor/debug_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ def debug(self, docs: DocumentArray, **kwargs):
traversed = list(doc.tags.get(key))
traversed.append(self._name)
doc.tags[key] = traversed
doc.tags['parallel'] = self.runtime_args.parallel
doc.tags['parallel'] = self.runtime_args.replicas
doc.tags['shards'] = self.runtime_args.shards
doc.tags['shard_id'] = self.runtime_args.shard_id
doc.tags['pod_id'] = self.runtime_args.pod_id
doc.tags['hostname'] = socket.gethostname()

@requests(on='/env')
Expand Down
3 changes: 0 additions & 3 deletions tests/k8s/test_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,6 @@ async def test_flow_with_sharding(
'test_executor-0',
'test_executor-1',
}
assert set(doc.tags['pod_id']) == {0, 1}
assert set(doc.tags['shard_id']) == {0, 1}
assert doc.tags['parallel'] == [2, 2]
assert doc.tags['shards'] == [2, 2]
Expand All @@ -348,9 +347,7 @@ async def test_flow_with_sharding(
assert set(doc.tags['traversed-executors']) == {'test_executor-0'} or set(
doc.tags['traversed-executors']
) == {'test_executor-1'}
assert len(set(doc.tags['pod_id'])) == 1
assert len(set(doc.tags['shard_id'])) == 1
assert 0 in set(doc.tags['pod_id']) or 1 in set(doc.tags['pod_id'])
assert 0 in set(doc.tags['shard_id']) or 1 in set(doc.tags['shard_id'])
assert doc.tags['parallel'] == [2]
assert doc.tags['shards'] == [2]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,7 @@ def _mock_fetch(name, tag=None, secret=None, force=False):
expected_uses_metas = {}
if uses_metas is not None:
expected_uses_metas = json.loads(uses_metas)
expected_uses_metas['pod_id'] = shard_id
assert '--uses-metas' in replica_args
assert replica_args[replica_args.index('--uses-metas') + 1] == json.dumps(
expected_uses_metas
)
assert '--uses-metas' in replica_args
assert replica_args[
replica_args.index('--uses-metas') + 1
] == json.dumps(expected_uses_metas)
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,6 @@ def _mock_fetch(name, tag=None, secret=None, force=False):
expected_uses_metas = {}
if uses_metas is not None:
expected_uses_metas = json.loads(uses_metas)
expected_uses_metas['pod_id'] = i
assert '--uses-metas' in shard_container_runtime_container_args
assert shard_container_runtime_container_args[
shard_container_runtime_container_args.index('--uses-metas') + 1
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/orchestrate/deploymens/test_deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ def test_pod_context_shards_replicas(shards):


class AppendNameExecutor(Executor):
def __init__(self, runtime_args, *args, **kwargs):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = runtime_args['name']
self.name = self.runtime_args.name

@requests
def foo(self, docs: DocumentArray, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ def test_flow_to_docker_compose_yaml(tmpdir, protocol):
assert executor0_args[executor0_args.index('--name') + 1] == 'executor0'
assert '--uses-with' in executor0_args
assert executor0_args[executor0_args.index('--uses-with') + 1] == '{"param": 0}'
assert '--uses-metas' in executor0_args
assert executor0_args[executor0_args.index('--uses-metas') + 1] == '{"pod_id": 0}'
assert '--native' in executor0_args
assert '--pod-role' not in executor0_args
assert '--runtime-cls' not in executor0_args
Expand Down Expand Up @@ -175,10 +173,6 @@ def test_flow_to_docker_compose_yaml(tmpdir, protocol):
== '{"param": 0}'
)
assert '--uses-metas' in executor1_shard0_args
assert (
executor1_shard0_args[executor1_shard0_args.index('--uses-metas') + 1]
== '{"pod_id": 0}'
)
assert '--native' in executor1_shard0_args
assert '--pod-role' not in executor1_shard0_args
assert '--runtime-cls' not in executor1_shard0_args
Expand All @@ -202,10 +196,6 @@ def test_flow_to_docker_compose_yaml(tmpdir, protocol):
== '{"param": 0}'
)
assert '--uses-metas' in executor1_shard1_args
assert (
executor1_shard1_args[executor1_shard1_args.index('--uses-metas') + 1]
== '{"pod_id": 1}'
)
assert '--native' in executor1_shard1_args
assert '--pod-role' not in executor1_shard1_args
assert '--runtime-cls' not in executor1_shard1_args
Expand Down Expand Up @@ -265,11 +255,6 @@ def test_flow_to_docker_compose_yaml(tmpdir, protocol):
executor2_rep_0_args[executor2_rep_0_args.index('--uses-with') + 1]
== '{"param": 0}'
)
assert '--uses-metas' in executor2_rep_0_args
assert (
executor2_rep_0_args[executor2_rep_0_args.index('--uses-metas') + 1]
== '{"pod_id": 0}'
)
assert '--native' in executor2_rep_0_args
assert '--pod-role' not in executor2_rep_0_args
assert '--runtime-cls' not in executor2_rep_0_args
Expand All @@ -292,11 +277,6 @@ def test_flow_to_docker_compose_yaml(tmpdir, protocol):
executor2_rep_1_args[executor2_rep_1_args.index('--uses-with') + 1]
== '{"param": 0}'
)
assert '--uses-metas' in executor2_rep_1_args
assert (
executor2_rep_1_args[executor2_rep_1_args.index('--uses-metas') + 1]
== '{"pod_id": 0}'
)
assert '--native' in executor2_rep_1_args
assert '--pod-role' not in executor2_rep_1_args
assert '--runtime-cls' not in executor2_rep_1_args
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ def test_flow_to_k8s_yaml(tmpdir, protocol, k8s_connection_pool):
assert executor0_args[executor0_args.index('--k8s-namespace') + 1] == namespace
assert '--uses-with' in executor0_args
assert executor0_args[executor0_args.index('--uses-with') + 1] == '{"param": 0}'
assert '--uses-metas' in executor0_args
assert executor0_args[executor0_args.index('--uses-metas') + 1] == '{"pod_id": 0}'
assert '--native' in executor0_args
assert '--k8s-disable-connection-pool' not in executor0_args
assert '--pod-role' not in executor0_args
Expand Down Expand Up @@ -327,10 +325,6 @@ def test_flow_to_k8s_yaml(tmpdir, protocol, k8s_connection_pool):
== '{"param": 0}'
)
assert '--uses-metas' in executor1_shard0_args
assert (
executor1_shard0_args[executor1_shard0_args.index('--uses-metas') + 1]
== '{"pod_id": 0}'
)
assert '--native' in executor1_shard0_args
assert '--k8s-disable-connection-pool' not in executor1_shard0_args
assert '--pod-role' not in executor1_shard0_args
Expand Down Expand Up @@ -381,11 +375,6 @@ def test_flow_to_k8s_yaml(tmpdir, protocol, k8s_connection_pool):
executor1_shard1_args[executor1_shard1_args.index('--uses-with') + 1]
== '{"param": 0}'
)
assert '--uses-metas' in executor1_shard1_args
assert (
executor1_shard1_args[executor1_shard1_args.index('--uses-metas') + 1]
== '{"pod_id": 1}'
)
assert '--native' in executor1_shard1_args
assert '--k8s-disable-connection-pool' not in executor1_shard1_args
assert '--pod-role' not in executor1_shard1_args
Expand Down Expand Up @@ -540,8 +529,6 @@ def test_flow_to_k8s_yaml(tmpdir, protocol, k8s_connection_pool):
assert executor2_args[executor2_args.index('--k8s-namespace') + 1] == namespace
assert '--uses-with' in executor2_args
assert executor2_args[executor2_args.index('--uses-with') + 1] == '{"param": 0}'
assert '--uses-metas' in executor2_args
assert executor2_args[executor2_args.index('--uses-metas') + 1] == '{"pod_id": 0}'
assert '--native' in executor2_args
assert '--k8s-disable-connection-pool' not in executor2_args
assert '--pod-role' not in executor2_args
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_yamlparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ def test_encoder_name_dict_replace():

def test_encoder_inject_config_via_kwargs():
with BaseExecutor.load_config(
'yaml/test-encoder-env.yml', metas={'pod_id': 345}
'yaml/test-encoder-env.yml', metas={'shard_id': 345}
) as be:
assert be.metas.pod_id == 345
assert be.metas.shard_id == 345


def test_load_from_dict():
Expand Down

0 comments on commit 5820913

Please sign in to comment.