Skip to content

Commit

Permalink
Merge default & distributed requirements (mars-project#2263)
Browse files Browse the repository at this point in the history
* Make new_session default=True & refine tests

* Try to relieve test_cmdline

Co-authored-by: wenjun.swj <wenjun.swj@alibaba-inc.com>
  • Loading branch information
Xuye (Chris) Qin and wjsi committed Jul 29, 2021
1 parent ac04c8c commit 50ef6c3
Show file tree
Hide file tree
Showing 36 changed files with 216 additions and 430 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
fi
pip install numpy scipy cython
pip install -e ".[dev,distributed]"
pip install -e ".[dev,extra]"
conda list -n test
- name: Deploy packages
Expand Down
9 changes: 1 addition & 8 deletions .github/workflows/core-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
pip install git+https://github.com/mars-project/pytest-asyncio.git
pip install numpy scipy cython
pip install -e ".[dev,distributed]"
pip install -e ".[dev,extra]"
pip install virtualenv flaky flake8
if [ -z "$NO_COMMON_TESTS" ]; then
Expand All @@ -75,13 +75,6 @@ jobs:
pip install torch torchvision
pip install statsmodels tsfresh
fi
virtualenv testenv && source testenv/bin/activate
pip install -e . && pip install pytest pytest-timeout
if [ -z "$DEFAULT_VENV" ]; then
deactivate
else
source $DEFAULT_VENV/bin/activate
fi
fi
retry ./.github/workflows/download-etcd.sh
conda list -n test
Expand Down
18 changes: 2 additions & 16 deletions .github/workflows/os-compat-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,8 @@ jobs:
pip install git+https://github.com/mars-project/pytest-asyncio.git
pip install numpy scipy cython
pip install -e ".[dev,distributed]"
if [[ $UNAME == "windows" ]]; then
pip install virtualenv flaky flake8
else
pip install virtualenv flaky flake8
if [ -z "$NO_COMMON_TESTS" ]; then
virtualenv testenv && source testenv/bin/activate
pip install -e . && pip install pytest pytest-timeout
if [ -z "$DEFAULT_VENV" ]; then
deactivate
else
source $DEFAULT_VENV/bin/activate
fi
fi
fi
pip install -e ".[dev,extra]"
pip install virtualenv flaky flake8
conda list -n test
- name: Test with pytest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/platform-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
pip install pandas\<1.3
fi
pip install -e ".[dev,distributed]"
pip install -e ".[dev,extra]"
if [[ $UNAME == "windows" ]]; then
pip install virtualenv flaky flake8
Expand Down
9 changes: 0 additions & 9 deletions .github/workflows/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,4 @@ if [ -z "$NO_COMMON_TESTS" ]; then
--ignore mars/learn --ignore mars/remote mars
mv .coverage build/.coverage.main.file
coverage combine build/ && coverage report

export DEFAULT_VENV=$VIRTUAL_ENV
source testenv/bin/activate
pytest --timeout=1500 mars/tests/test_session.py mars/lib/filesystem/tests/test_filesystem.py
if [ -z "$DEFAULT_VENV" ]; then
deactivate
else
source $DEFAULT_VENV/bin/activate
fi
fi
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
else
pip install numpy scipy cython
fi
pip install -e ".[dev,distributed]"
pip install -e ".[dev,extra]"
pip install virtualenv flaky flake8
if [ -z "$NO_COMMON_TESTS" ]; then
Expand Down
2 changes: 1 addition & 1 deletion mars/deploy/kubedl/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def session(self):

def start(self):
self._endpoint = self._cluster.start()
self._session = new_session(self._endpoint, verify_ssl=self._cluster.verify_ssl).as_default()
self._session = new_session(self._endpoint, verify_ssl=self._cluster.verify_ssl)

def stop(self, wait=False, timeout=0):
self._cluster.stop(wait=wait, timeout=timeout)
Expand Down
2 changes: 1 addition & 1 deletion mars/deploy/kubernetes/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def session(self):
def start(self):
try:
self._endpoint = self._cluster.start()
self._session = new_session(self._endpoint, default=True)
self._session = new_session(self._endpoint)
except: # noqa: E722 # nosec # pylint: disable=bare-except
self.stop()
raise
Expand Down
15 changes: 10 additions & 5 deletions mars/deploy/oscar/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,17 @@ def __await__(self):
return self._future_local.aio_future.__await__()


warning_msg = """No session found, local session \
will be created in the background, \
warning_msg = """
No session found, local session \
will be created in background, \
it may take a while before execution. \
If you want to new a local session by yourself, \
run code below:
```
import mars
mars.new_session(default=True)
mars.new_session()
```
"""

Expand Down Expand Up @@ -1354,6 +1355,8 @@ def execute(tileable: TileableType,
new_session_kwargs: dict = None,
show_progress: Union[bool, str] = None,
progress_update_interval=1, **kwargs):
if isinstance(tileable, (tuple, list)) and len(tileables) == 0:
tileable, tileables = tileable[0], tileable[1:]
if session is None:
session = get_default_or_create(
**(new_session_kwargs or dict()))
Expand Down Expand Up @@ -1393,6 +1396,8 @@ def fetch(tileable: TileableType,
def fetch_log(*tileables: TileableType,
session: SyncSession = None,
**kwargs):
if len(tileables) == 1 and isinstance(tileables[0], (list, tuple)):
tileables = tileables[0]
if session is None:
session = get_default_session()
if session is None: # pragma: no cover
Expand Down Expand Up @@ -1442,7 +1447,7 @@ async def _new_session(address: str,
def new_session(address: str = None,
session_id: str = None,
backend: str = 'oscar',
default: bool = False,
default: bool = True,
**kwargs) -> AbstractSession:
ensure_isolation_created(kwargs)

Expand Down Expand Up @@ -1481,7 +1486,7 @@ def get_default_or_create(**kwargs):
# no session attached, try to create one
warnings.warn(warning_msg)
session = new_session(
'127.0.0.1', default=True, init_local=True, **kwargs)
'127.0.0.1', init_local=True, **kwargs)
session.as_default()
if isinstance(session, _IsolatedSession):
session = SyncSession.from_isolated_session(session)
Expand Down
19 changes: 14 additions & 5 deletions mars/deploy/oscar/tests/test_cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import subprocess
import sys
import time
from concurrent import futures
from typing import List

import numpy as np
Expand All @@ -37,6 +38,8 @@
class _ProcessExitedException(Exception):
pass

_TimeoutErrors = (asyncio.TimeoutError, futures.TimeoutError, TimeoutError)


def _wait_supervisor_ready(supervisor_proc: subprocess.Popen, timeout=120):
start_time = time.time()
Expand Down Expand Up @@ -79,7 +82,7 @@ async def wait_for_workers():
await asyncio.sleep(0.1)

isolation = get_isolation()
asyncio.run_coroutine_threadsafe(wait_for_workers(), isolation.loop).result()
asyncio.run_coroutine_threadsafe(wait_for_workers(), isolation.loop).result(120)


_test_port_cache = dict()
Expand Down Expand Up @@ -124,15 +127,21 @@ def _reload_args(args):
return [arg if not callable(arg) else arg() for arg in args]


_rerun_errors = (_ProcessExitedException,) + _TimeoutErrors


@pytest.mark.parametrize('supervisor_args,worker_args,use_web_addr',
list(start_params.values()), ids=list(start_params.keys()))
@flaky(rerun_filter=lambda *args: issubclass(args[0][0], _ProcessExitedException))
@flaky(rerun_filter=lambda *args: issubclass(args[0][0], _rerun_errors))
def test_cmdline_run(supervisor_args, worker_args, use_web_addr):
new_isolation()
sv_proc = w_procs = None
try:
env = os.environ.copy()
env['MARS_CPU_TOTAL'] = '2'

sv_args = _reload_args(supervisor_args)
sv_proc = subprocess.Popen(sv_args, env=os.environ.copy())
sv_proc = subprocess.Popen(sv_args, env=env)

oscar_port = _get_labelled_port('supervisor', create=False)
if not oscar_port:
Expand All @@ -147,10 +156,10 @@ def test_cmdline_run(supervisor_args, worker_args, use_web_addr):
api_ep = oscar_ep

w_procs = [subprocess.Popen(
_reload_args(worker_args), env=os.environ.copy()) for _ in range(2)]
_reload_args(worker_args), env=env) for _ in range(2)]
_wait_worker_ready(oscar_ep, w_procs)

new_session(api_ep, default=True)
new_session(api_ep)
data = np.random.rand(10, 10)
res = mt.tensor(data, chunk_size=5).sum().execute().fetch()
np.testing.assert_almost_equal(res, data.sum())
Expand Down
21 changes: 8 additions & 13 deletions mars/deploy/oscar/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,7 @@ async def test_web_session(create_cluster):


def test_sync_execute():
session = new_session(n_cpu=2, default=True,
web=False, use_uvloop=False)
session = new_session(n_cpu=2, web=False, use_uvloop=False)

# web not started
assert session._session.client.web_address is None
Expand Down Expand Up @@ -293,7 +292,7 @@ def test_no_default_session():

@pytest.fixture
def setup_session():
session = new_session(n_cpu=2, default=True, use_uvloop=False)
session = new_session(n_cpu=2, use_uvloop=False)
assert session.get_web_endpoint() is not None

with session:
Expand Down Expand Up @@ -420,22 +419,18 @@ def test_load_third_party_modules(cleanup_third_party_modules_output): # noqa:

config['third_party_modules'] = set()
with pytest.raises(TypeError, match='set'):
new_session(n_cpu=2, default=True,
web=False, config=config)
new_session(n_cpu=2, web=False, config=config)

config['third_party_modules'] = {'supervisor': ['not_exists_for_supervisor']}
with pytest.raises(ModuleNotFoundError, match='not_exists_for_supervisor'):
new_session(n_cpu=2, default=True,
web=False, config=config)
new_session(n_cpu=2, web=False, config=config)

config['third_party_modules'] = {'worker': ['not_exists_for_worker']}
with pytest.raises(ModuleNotFoundError, match='not_exists_for_worker'):
new_session(n_cpu=2, default=True,
web=False, config=config)
new_session(n_cpu=2, web=False, config=config)

config['third_party_modules'] = ['mars.deploy.oscar.tests.modules.replace_op']
session = new_session(n_cpu=2, default=True,
web=False, config=config)
session = new_session(n_cpu=2, web=False, config=config)
# web not started
assert session._session.client.web_address is None

Expand All @@ -451,8 +446,8 @@ def test_load_third_party_modules(cleanup_third_party_modules_output): # noqa:
session.stop_server()
assert get_default_session() is None

session = new_session(n_cpu=2, default=True,
web=False, config=CONFIG_THIRD_PARTY_MODULES_TEST_FILE)
session = new_session(n_cpu=2, web=False,
config=CONFIG_THIRD_PARTY_MODULES_TEST_FILE)
# web not started
assert session._session.client.web_address is None

Expand Down
6 changes: 3 additions & 3 deletions mars/deploy/oscar/tests/test_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async def test_execute_describe(ray_large_cluster, create_cluster):
@pytest.mark.asyncio
def test_sync_execute(ray_large_cluster, create_cluster):
assert create_cluster.session
session = new_session(address=create_cluster.address, backend='oscar', default=True)
session = new_session(address=create_cluster.address, backend='oscar')
with session:
raw = np.random.RandomState(0).rand(10, 5)
a = mt.tensor(raw, chunk_size=5).sum(axis=1)
Expand All @@ -99,7 +99,7 @@ def _run_web_session(web_address):

def _sync_web_session_test(web_address):
register_ray_serializers()
new_session(web_address, backend='oscar', default=True)
new_session(web_address, backend='oscar')
raw = np.random.RandomState(0).rand(10, 5)
a = mt.tensor(raw, chunk_size=5).sum(axis=1)
b = a.execute(show_progress=False)
Expand Down Expand Up @@ -182,7 +182,7 @@ async def test_load_third_party_modules(ray_large_cluster):
@pytest.mark.asyncio
def test_load_third_party_modules2(ray_large_cluster, create_cluster):
assert create_cluster.session
session = new_session(address=create_cluster.address, backend='oscar', default=True)
session = new_session(address=create_cluster.address, backend='oscar')
with session:
raw = np.random.RandomState(0).rand(10, 10)
a = mt.tensor(raw, chunk_size=5)
Expand Down
2 changes: 1 addition & 1 deletion mars/deploy/yarn/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, skein_client, application_id, endpoint, is_client_managed=Fal
self._is_client_managed = is_client_managed
self._application_id = application_id
self._endpoint = endpoint
self._session = new_session(endpoint).as_default()
self._session = new_session(endpoint)

@property
def session(self):
Expand Down
2 changes: 1 addition & 1 deletion mars/learn/contrib/joblib/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, service=None, session=None, backend=None, n_parallel=None):

if session is None:
if service is not None:
self.session = new_session(service, backend=backend)
self.session = new_session(service, backend=backend, default=False)
else:
self.session = get_default_session()
else:
Expand Down
14 changes: 4 additions & 10 deletions mars/learn/contrib/joblib/tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,16 @@
# limitations under the License.

import numpy as np
import pytest
try:
import joblib
import sklearn
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC
except ImportError:
joblib = sklearn = None
import joblib
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

from mars.learn.contrib.joblib import register_mars_backend

register_mars_backend()


@pytest.mark.skipif(sklearn is None, reason='scikit-learn not installed')
def test_sk_learn_svc_train(setup):
digits = load_digits()
param_space = {
Expand Down
2 changes: 1 addition & 1 deletion mars/learn/contrib/tsfresh/tests/test_tsfresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_distributed_ts_fresh(setup):
robot_execution_failures.download_robot_execution_failures()
df, y = robot_execution_failures.load_robot_execution_failures()
default_session = get_default_session()
sync_session = new_session(default_session.address)
sync_session = new_session(default_session.address, default=False)
dist = MarsDistributor(session=sync_session)

df = df.iloc[:200].copy()
Expand Down
Loading

0 comments on commit 50ef6c3

Please sign in to comment.