Skip to content

Commit

Permalink
fix: uses=Executor should work with spawn start method (#3380)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tadej Svetina committed Sep 10, 2021
1 parent b0e46a3 commit 2cf15df
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 0 deletions.
7 changes: 7 additions & 0 deletions jina/flow/base.py
Expand Up @@ -712,6 +712,13 @@ def add(
kwargs, parser, True, fallback_parsers=FALLBACK_PARSERS
)

# Temporary workaround to re-import executor module when using mp spawn start
# method, so that the executor is re-registered with pyyaml. The re-import
# occurs because the class will be in the arguments passed to mp.Process.start
# method. A better solution probably involves deeper refactoring
if isinstance(kwargs.get('uses'), type(JAMLCompatible)):
args._exec_cls = kwargs['uses']

if args.grpc_data_requests and args.runtime_cls == 'ZEDRuntime':
args.runtime_cls = 'GRPCDataRuntime'

Expand Down
1 change: 1 addition & 0 deletions jina/jaml/__init__.py
Expand Up @@ -507,6 +507,7 @@ def load_config(
:param kwargs: kwargs for parse_config_source
:return: :class:`JAMLCompatible` object
"""

stream, s_path = parse_config_source(source, **kwargs)
with stream as fp:
# first load yml with no tag
Expand Down
4 changes: 4 additions & 0 deletions jina/peapods/peas/__init__.py
Expand Up @@ -52,6 +52,10 @@ def run(
"""
logger = JinaLogger(name, **vars(args))

# Remove workaround used to re-import executor in spawn
if hasattr(args, '_exec_cls'):
del args._exec_cls

def _unset_envs():
if envs and args.runtime_backend != RuntimeBackendType.THREAD:
for k in envs.keys():
Expand Down
Empty file.
5 changes: 5 additions & 0 deletions tests/integration/multiprocessing_spawn/modules/exec.py
@@ -0,0 +1,5 @@
from jina import Executor


# Executor subclass with an empty body; the launch scripts in this directory
# import it via ``from exec import Exec``.
class Exec(Executor):
    ...
15 changes: 15 additions & 0 deletions tests/integration/multiprocessing_spawn/modules/main_cls.py
@@ -0,0 +1,15 @@
from multiprocessing import get_start_method

import jina


def run():
    """Build a Flow with ``add(uses=Exec)`` and enter/exit its context."""
    # Imported inside the function, exactly as the script has always done.
    from exec import Exec

    flow = jina.Flow().add(uses=Exec)
    with flow:
        # Nothing to do inside; entering and leaving the context is the test.
        pass


if __name__ == '__main__':
    # Refuse to proceed unless the multiprocessing start method is 'spawn'.
    active_method = get_start_method()
    assert active_method == 'spawn'
    run()
15 changes: 15 additions & 0 deletions tests/integration/multiprocessing_spawn/modules/main_empty.py
@@ -0,0 +1,15 @@
from multiprocessing import get_start_method

import jina


def run():
    """Build a Flow with a bare ``add()`` call and enter/exit its context."""
    # ``Exec`` is imported but not referenced below; the import is kept
    # because it runs the ``exec`` module's top-level code.
    from exec import Exec

    flow = jina.Flow().add()
    with flow:
        # Nothing to do inside; entering and leaving the context is the test.
        pass


if __name__ == '__main__':
    # Refuse to proceed unless the multiprocessing start method is 'spawn'.
    active_method = get_start_method()
    assert active_method == 'spawn'
    run()
15 changes: 15 additions & 0 deletions tests/integration/multiprocessing_spawn/modules/main_name.py
@@ -0,0 +1,15 @@
from multiprocessing import get_start_method

import jina


def run():
    """Build a Flow with ``add(uses="Exec")`` and enter/exit its context."""
    # ``Exec`` is only referenced by name below; presumably this import is
    # what makes that name resolvable in this process (TODO confirm).
    from exec import Exec

    flow = jina.Flow().add(uses="Exec")
    with flow:
        # Nothing to do inside; entering and leaving the context is the test.
        pass


if __name__ == '__main__':
    # Refuse to proceed unless the multiprocessing start method is 'spawn'.
    active_method = get_start_method()
    assert active_method == 'spawn'
    run()
41 changes: 41 additions & 0 deletions tests/integration/multiprocessing_spawn/test_spawn.py
@@ -0,0 +1,41 @@
"""Tests that flow can launch when using the spawn multiprocessing method
Currently this still breaks when the executor is given only by its class name (see xfail).
"""

import os
import subprocess
import sys
from pathlib import Path


import pytest


def test_launch_spawn_empty():
    """Run ``main_empty.py`` in a child interpreter with spawn requested."""
    modules_dir = Path(__file__).parent / 'modules'
    child_env = {'JINA_MP_START_METHOD': 'spawn', 'PATH': os.environ['PATH']}
    command = [sys.executable, 'main_empty.py']

    # check=True turns a non-zero exit status into CalledProcessError,
    # which fails the test.
    subprocess.run(command, cwd=modules_dir, env=child_env, check=True)


def test_launch_spawn_cls():
    """Run ``main_cls.py`` in a child interpreter with spawn requested.

    The script passes the executor class object itself via ``uses=``.
    """
    # The child gets a minimal environment: only the start-method switch
    # and the parent's PATH.
    subprocess.run(
        [sys.executable, 'main_cls.py'],
        check=True,  # a non-zero exit status raises and fails the test
        env={'JINA_MP_START_METHOD': 'spawn', 'PATH': os.environ['PATH']},
        cwd=Path(__file__).parent / 'modules',
    )


@pytest.mark.xfail(reason="re-importing not possible when given only exec name")
def test_launch_spawn_name():
    """Run ``main_name.py`` in a child interpreter with spawn requested.

    Marked xfail: the script passes the executor by name only.
    """
    modules_dir = Path(__file__).parent / 'modules'
    child_env = {'JINA_MP_START_METHOD': 'spawn', 'PATH': os.environ['PATH']}
    command = [sys.executable, 'main_name.py']

    # check=True turns a non-zero exit status into CalledProcessError.
    subprocess.run(command, cwd=modules_dir, env=child_env, check=True)

0 comments on commit 2cf15df

Please sign in to comment.