Skip to content

Commit

Permalink
fix: make accessible shard and replica info at container rt (#3822)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Nov 2, 2021
1 parent 06f1af4 commit cdadb07
Show file tree
Hide file tree
Showing 16 changed files with 238 additions and 45 deletions.
4 changes: 4 additions & 0 deletions cli/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
'--expose-public',
'--shard-id',
'--pea-id',
'--replica-id',
'--pea-role',
'--noblock-on-start',
'--shards',
Expand Down Expand Up @@ -203,6 +204,7 @@
'--expose-public',
'--shard-id',
'--pea-id',
'--replica-id',
'--pea-role',
'--noblock-on-start',
'--shards',
Expand Down Expand Up @@ -298,6 +300,7 @@
'--expose-public',
'--shard-id',
'--pea-id',
'--replica-id',
'--pea-role',
'--noblock-on-start',
'--shards',
Expand Down Expand Up @@ -368,6 +371,7 @@
'--expose-public',
'--shard-id',
'--pea-id',
'--replica-id',
'--pea-role',
'--noblock-on-start',
'--shards',
Expand Down
12 changes: 6 additions & 6 deletions jina/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,22 +213,22 @@ def workspace(self) -> str:
if workspace:
complete_workspace = os.path.join(workspace, self.metas.name)
replica_id = getattr(self.runtime_args, 'replica_id', None)
pea_id = getattr(
shard_id = getattr(
self.runtime_args,
'pea_id',
getattr(self.runtime_args, 'shard_id', None),
'shard_id',
getattr(self.runtime_args, 'pea_id', None),
)
if replica_id is not None and replica_id != -1:
complete_workspace = os.path.join(complete_workspace, str(replica_id))
if pea_id is not None and pea_id != -1:
complete_workspace = os.path.join(complete_workspace, str(pea_id))
if shard_id is not None and shard_id != -1:
complete_workspace = os.path.join(complete_workspace, str(shard_id))
if not os.path.exists(complete_workspace):
os.makedirs(complete_workspace)
return os.path.abspath(complete_workspace)
else:
raise ValueError(
'Neither `metas.workspace` nor `runtime_args.workspace` is set, '
'are you using this Executor is a Flow?'
'are you using this Executor in a Flow?'
)

def __enter__(self):
Expand Down
9 changes: 9 additions & 0 deletions jina/parsers/peapods/pea.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ def mixin_pea_parser(parser):
else argparse.SUPPRESS,
)

gp.add_argument(
'--replica-id',
type=int,
default=0, # not sure how to mantain backwards compatibility with the workspace of Executor
help='the id of the replica of an executor'
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)

gp.add_argument(
'--pea-role',
type=PeaRoleType.from_string,
Expand Down
4 changes: 4 additions & 0 deletions jina/peapods/pods/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,10 @@ def __enter__(self):
for _args in self.args:
if getattr(self.pod_args, 'noblock_on_start', False):
_args.noblock_on_start = True
if (
self.pod_args.replicas == 1
): # keep backwards compatibility with `workspace` in `Executor`
_args.replica_id = -1
self._peas.append(BasePea(_args).start())
return self

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@

@pytest.fixture
def executor_images():
import docker

client = docker.from_env()

dbms_dir = os.path.join(cur_dir, 'pods', 'dbms')
dbms_docker_file = os.path.join(dbms_dir, 'Dockerfile')
os.system(f"docker build -f {dbms_docker_file} -t dbms-executor {dbms_dir}")
query_dir = os.path.join(cur_dir, 'pods', 'query')
query_docker_file = os.path.join(query_dir, 'Dockerfile')
os.system(f"docker build -f {query_docker_file} -t query-executor {query_dir}")
time.sleep(3)
client.images.build(path=dbms_dir, tag='dbms-executor')
client.images.build(path=query_dir, tag='query-executor')
client.close()
yield
os.system(f"docker rmi $(docker images | grep 'dbms-executor')")
os.system(f"docker rmi $(docker images | grep 'query-executor')")
time.sleep(2)
client = docker.from_env()
client.containers.prune()
client.close()


def _create_flows():
Expand Down
1 change: 0 additions & 1 deletion tests/distributed/test_workspaces/test_custom_container.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os

import docker
import pytest

from jina import __default_host__
from daemon.clients import JinaDClient
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM jinaai/jina:test-pip

# setup the workspace
COPY . /workspace
WORKDIR /workspace

ENTRYPOINT ["jina", "executor", "--uses", "config.yml"]
16 changes: 16 additions & 0 deletions tests/integration/container_runtime_args/replica-exec/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from jina import Executor, requests


class ReplicatedExec(Executor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.replica_id = self.runtime_args.replica_id
self.shard_id = self.runtime_args.shard_id
self.shards = self.runtime_args.shards

@requests
def foo(self, docs, **kwargs):
for doc in docs:
doc.tags['replica_id'] = self.replica_id
doc.tags['shard_id'] = self.shard_id
doc.tags['shards'] = self.shards
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
jtype: ReplicatedExec
metas:
py_modules:
- __init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import pytest
import time
import os

from jina import Flow, Executor, Document, DocumentArray, requests

cur_dir = os.path.dirname(os.path.abspath(__file__))

img_name = 'jina/replica-exec'


@pytest.fixture(scope='module')
def docker_image_built():
import docker

client = docker.from_env()
client.images.build(path=os.path.join(cur_dir, 'replica-exec'), tag=img_name)
client.close()
yield
time.sleep(2)
client = docker.from_env()
client.containers.prune()


@pytest.mark.parametrize('shards', [1, 2])
@pytest.mark.parametrize('replicas', [1, 3, 4])
def test_containerruntime_args(docker_image_built, shards, replicas):
f = Flow().add(
name='executor',
uses=f'docker://{img_name}',
replicas=replicas,
shards=shards,
polling='ANY',
)
with f:
ret1 = f.index(
inputs=DocumentArray([Document() for _ in range(200)]),
return_results=True,
request_size=10,
)

assert len(ret1) == 20
replica_ids = set()
shard_ids = set()
for r in ret1:
assert len(r.docs) == 10
for replica_id in r.docs.get_attributes('tags__replica_id'):
replica_ids.add(replica_id)
for shard_id in r.docs.get_attributes('tags__shard_id'):
shard_ids.add(shard_id)
for doc in r.docs:
assert doc.tags['shards'] == shards

if replicas > 1:
assert replica_ids == set(range(replicas))
else:
assert replica_ids == {-1.0}
assert shard_ids == set(range(shards))
12 changes: 8 additions & 4 deletions tests/integration/issues/github_3124/test_cli_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@

@pytest.fixture()
def docker_image():
docker_file = os.path.join(cur_dir, 'Dockerfile')
os.system(f"docker build -f {docker_file} -t clitest {cur_dir}")
time.sleep(3)
import docker

client = docker.from_env()
client.images.build(path=os.path.join(cur_dir), tag='clitest')
client.close()
yield
os.system(f"docker rmi -f $(docker images | grep 'clitest')")
time.sleep(2)
client = docker.from_env()
client.containers.prune()


def test_executor_cli_docker(docker_image):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@

@pytest.fixture()
def docker_image():
docker_file = os.path.join(cur_dir, 'Dockerfile')
os.system(f"docker build -f {docker_file} -t override-config-test {cur_dir}")
time.sleep(3)
import docker

client = docker.from_env()
client.images.build(path=os.path.join(cur_dir), tag='override-config-test')
client.close()
yield
os.system(f"docker rmi $(docker images | grep 'override-config-test')")
time.sleep(2)
client = docker.from_env()
client.containers.prune()
client.close()


@pytest.fixture()
Expand Down
13 changes: 9 additions & 4 deletions tests/integration/rolling_update/test_rolling_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,16 @@ def test_simple_run(docs):

@pytest.fixture()
def docker_image():
docker_file = os.path.join(cur_dir, 'Dockerfile')
os.system(f"docker build -f {docker_file} -t test_rolling_update_docker {cur_dir}")
time.sleep(3)
import docker

client = docker.from_env()
client.images.build(path=os.path.join(cur_dir), tag='test_rolling_update_docker')
client.close()
yield
os.system(f"docker rmi $(docker images | grep 'test_rolling_update_docker')")
time.sleep(2)
client = docker.from_env()
client.containers.prune()
client.close()


@pytest.mark.repeat(5)
Expand Down
32 changes: 16 additions & 16 deletions tests/unit/executors/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pytest

from jina import Executor, requests, Flow
from jina import Executor, requests
from jina.executors.metas import get_default_metas


Expand Down Expand Up @@ -39,7 +39,7 @@ def replica_id(request):


@pytest.fixture
def pea_id(request):
def shard_id(request):
return request.param


Expand All @@ -58,12 +58,12 @@ def test_bad_metas_workspace(tmpdir):


@pytest.fixture
def test_metas_workspace_replica_peas(tmpdir, replica_id, pea_id):
def test_metas_workspace_replica_peas(tmpdir, replica_id, shard_id):
metas = get_default_metas()
metas['workspace'] = str(tmpdir)
metas['name'] = 'test'
metas['replica_id'] = replica_id
metas['pea_id'] = pea_id
metas['shard_id'] = shard_id
return metas


Expand Down Expand Up @@ -96,24 +96,24 @@ def test_executor_workspace_simple_workspace(tmpdir):

executor = Executor(
metas={'name': name, 'workspace': workspace},
runtime_args={'pea_id': 1, 'replica_id': 2},
runtime_args={'shard_id': 1, 'replica_id': 2},
)
assert executor.workspace == os.path.abspath(
os.path.join(workspace, name, '2', '1')
)

executor = Executor(
metas={'name': name},
runtime_args={'workspace': workspace, 'pea_id': 1, 'replica_id': 2},
runtime_args={'workspace': workspace, 'shard_id': 1, 'replica_id': 2},
)
assert executor.workspace == os.path.abspath(
os.path.join(workspace, name, '2', '1')
)


@pytest.mark.parametrize('replica_id', [0, 1, 2], indirect=True)
@pytest.mark.parametrize('pea_id', [0, 1, 2], indirect=True)
def test_executor_workspace(test_metas_workspace_replica_peas, replica_id, pea_id):
@pytest.mark.parametrize('shard_id', [0, 1, 2], indirect=True)
def test_executor_workspace(test_metas_workspace_replica_peas, replica_id, shard_id):
executor = Executor(
metas={'name': test_metas_workspace_replica_peas['name']},
runtime_args=test_metas_workspace_replica_peas,
Expand All @@ -123,15 +123,15 @@ def test_executor_workspace(test_metas_workspace_replica_peas, replica_id, pea_i
test_metas_workspace_replica_peas['workspace'],
test_metas_workspace_replica_peas['name'],
str(replica_id),
str(pea_id),
str(shard_id),
)
)


@pytest.mark.parametrize('replica_id', [0, 1, 2], indirect=True)
@pytest.mark.parametrize('pea_id', [None, -1], indirect=True)
@pytest.mark.parametrize('shard_id', [None, -1], indirect=True)
def test_executor_workspace_parent_replica_nopea(
test_metas_workspace_replica_peas, replica_id, pea_id
test_metas_workspace_replica_peas, replica_id, shard_id
):
executor = Executor(
metas={'name': test_metas_workspace_replica_peas['name']},
Expand All @@ -147,9 +147,9 @@ def test_executor_workspace_parent_replica_nopea(


@pytest.mark.parametrize('replica_id', [None, -1], indirect=True)
@pytest.mark.parametrize('pea_id', [0, 1, 2], indirect=True)
@pytest.mark.parametrize('shard_id', [0, 1, 2], indirect=True)
def test_executor_workspace_parent_noreplica_pea(
test_metas_workspace_replica_peas, replica_id, pea_id
test_metas_workspace_replica_peas, replica_id, shard_id
):
executor = Executor(
metas={'name': test_metas_workspace_replica_peas['name']},
Expand All @@ -159,15 +159,15 @@ def test_executor_workspace_parent_noreplica_pea(
os.path.join(
test_metas_workspace_replica_peas['workspace'],
test_metas_workspace_replica_peas['name'],
str(pea_id),
str(shard_id),
)
)


@pytest.mark.parametrize('replica_id', [None, -1], indirect=True)
@pytest.mark.parametrize('pea_id', [None, -1], indirect=True)
@pytest.mark.parametrize('shard_id', [None, -1], indirect=True)
def test_executor_workspace_parent_noreplica_nopea(
test_metas_workspace_replica_peas, replica_id, pea_id
test_metas_workspace_replica_peas, replica_id, shard_id
):
executor = Executor(
metas={'name': test_metas_workspace_replica_peas['name']},
Expand Down

0 comments on commit cdadb07

Please sign in to comment.