Commit

Add pseudo-cluster mode support (#35)
* remove root distributor, add SchedulerDistributor and WorkerDistributor

* move the start_service logic in the scheduler's main to a standalone file

* add missing file

* add worker service

* unify worker actor names (with prefix w:)

* add LocalDistributedCluster, fix test_api (see the usage sketch below)

* add local cluster session

* fix dashboard

* add web support

* adjust the way to calculate the number of workers and schedulers, add unit tests for it

* fix fetching a tensor when the result is a scalar, fix tensor.execute because not every session supports n_parallel, add unit tests for local cluster execution

* fix session when the result is a scalar and not a numpy type

* add a mechanism to check whether the scheduler is ready, preventing graph preparation from failing

* use actor pool's sleep instead of time.sleep

* limit shared memory size for testing local cluster
qinxuye authored and wjsi committed Dec 17, 2018
1 parent 11c5a3f commit 9637373
Showing 44 changed files with 1,182 additions and 356 deletions.
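Before the per-file diffs, here is a minimal sketch of how the pseudo-cluster mode added by this commit might be used. It assumes this change is installed, that new_cluster is importable from mars.deploy.local (as the new __init__.py below re-exports it), and that Tensor.execute accepts a session argument; treat it as an illustration, not as the project's documented API.

import mars.tensor as mt
from mars.deploy.local import new_cluster

# new_cluster() forks a child process that hosts the combined scheduler/worker
# actor pool and returns a client whose session is registered as the default.
with new_cluster(n_process=4, web=False) as cluster:
    print('cluster endpoint:', cluster.endpoint)

    a = mt.ones((100, 100))
    b = (a * 2 + 1).sum()

    # Assumption: execute() takes an explicit session; the cluster session is
    # also installed as the default, so a bare execute() may work as well.
    print(b.execute(session=cluster.session))

Leaving the with block calls LocalDistributedClusterClient.stop, which sends SIGINT to the cluster process and falls back to terminate() if it has not exited within three seconds.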
4 changes: 1 addition & 3 deletions mars/api.py
@@ -57,9 +57,7 @@ def create_session(self, session_id, **kw):
self.session_manager.create_session(session_id, **kw)

def delete_session(self, session_id):
session_uid = SessionActor.gen_name(session_id)
session_ref = self.get_actor_ref(session_uid)
session_ref.destroy()
self.session_manager.delete_session(session_id)

def submit_graph(self, session_id, serialized_graph, graph_key, target):
session_uid = SessionActor.gen_name(session_id)
2 changes: 0 additions & 2 deletions mars/base_app.py
@@ -21,7 +21,6 @@
from .config import options
from .errors import StartArgumentError
from .utils import get_next_port
from .distributor import BaseDistributor

logger = logging.getLogger(__name__)

@@ -173,7 +172,6 @@ def _try_create_pool(self, endpoint=None, host=None, port=None):

def create_pool(self, *args, **kwargs):
kwargs.update(dict(n_process=self.n_process, backend='gevent'))
kwargs['distributor'] = BaseDistributor(self.n_process)
return create_actor_pool(*args, **kwargs)

def main_loop(self):
1 change: 1 addition & 0 deletions mars/config.py
@@ -325,6 +325,7 @@ def validate(x):
default_options.register_option('worker.prepare_data_timeout', 600, validator=is_integer)

default_options.register_option('worker.plasma_socket', '/tmp/plasma', validator=is_string)
default_options.register_option('worker.plasma_one_mapped_file', False, validator=is_bool)
default_options.register_option('worker.advertise_addr', '127.0.0.1', validator=is_string)

# optimization
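For reference, the newly registered worker.plasma_one_mapped_file switch behaves like any other boolean option. The sketch below assumes registered options are exposed as nested attributes of mars.config.options, which this diff does not itself show.

from mars.config import options

# Defaults to False; the is_bool validator rejects non-boolean assignments.
print(options.worker.plasma_one_mapped_file)

# Presumably backs the worker's plasma store with a single memory-mapped file
# (this commit also mentions limiting shared memory for the local-cluster tests).
options.worker.plasma_one_mapped_file = True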
15 changes: 15 additions & 0 deletions mars/deploy/__init__.py
@@ -0,0 +1,15 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2018 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
13 changes: 3 additions & 10 deletions mars/tests/test_distributor.py → mars/deploy/local/__init__.py
@@ -1,3 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2018 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,14 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
from .core import new_cluster

from mars.distributor import BaseDistributor


class Test(unittest.TestCase):
def testDistributor(self):
distributor = BaseDistributor(5)
self.assertEqual(distributor.distribute('NormalActor'), 0)
self.assertIn(distributor.distribute('s:NormalActor'), (1, 2, 3, 4))
self.assertEqual(distributor.distribute('w:1:ManualBalance'), 1)
254 changes: 254 additions & 0 deletions mars/deploy/local/core.py
@@ -0,0 +1,254 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2018 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing
import signal
import os

from ...utils import get_next_port
from ...resource import cpu_count
from ...scheduler.service import SchedulerService
from ...worker.service import WorkerService
from ...actors import create_actor_pool
from ...session import new_session
from ...compat import six
from ...lib import gipc
from .distributor import gen_distributor


class LocalDistributedCluster(object):

# at least 2 process are required by scheduler and worker
MIN_SCHEDULER_N_PROCESS = 2
MIN_WORKER_N_PROCESS = 2

def __init__(self, endpoint, n_process=None,
scheduler_n_process=None, worker_n_process=None):
self._endpoint = endpoint

self._started = False
self._stopped = False

self._pool = None
self._scheduler_service = SchedulerService()
self._worker_service = WorkerService()

self._scheduler_n_process, self._worker_n_process = \
self._calc_scheduler_worker_n_process(n_process,
scheduler_n_process,
worker_n_process)

@property
def pool(self):
return self._pool

@classmethod
def _calc_scheduler_worker_n_process(cls, n_process, scheduler_n_process, worker_n_process,
calc_cpu_count=cpu_count):
n_scheduler, n_worker = scheduler_n_process, worker_n_process

if n_scheduler is None and n_worker is None:
n_scheduler = cls.MIN_SCHEDULER_N_PROCESS
n_process = n_process if n_process is not None else calc_cpu_count() + n_scheduler
n_worker = max(n_process - n_scheduler, cls.MIN_WORKER_N_PROCESS)
elif n_scheduler is None or n_worker is None:
# one of scheduler and worker n_process provided
if n_scheduler is None:
n_process = n_process if n_process is not None else calc_cpu_count()
n_scheduler = max(n_process - n_worker, cls.MIN_SCHEDULER_N_PROCESS)
else:
assert n_worker is None
n_process = n_process if n_process is not None else calc_cpu_count() + n_scheduler
n_worker = max(n_process - n_scheduler, cls.MIN_WORKER_N_PROCESS)

return n_scheduler, n_worker

def _make_sure_scheduler_ready(self):
while True:
workers_meta = self._scheduler_service._resource_ref.get_workers_meta()
if not workers_meta:
# wait for worker to report status
self._pool.sleep(.5)
else:
break

def start_service(self):
if self._started:
return
self._started = True

# start plasma
self._worker_service.start_plasma(self._worker_service.cache_memory_limit())

# start actor pool
n_process = self._scheduler_n_process + self._worker_n_process
distributor = gen_distributor(self._scheduler_n_process, self._worker_n_process)
self._pool = create_actor_pool(self._endpoint, n_process, distributor=distributor)

# start scheduler first
self._scheduler_service.start(self._endpoint, self._pool)

# start worker next
self._worker_service.start_local(self._endpoint, self._pool, self._scheduler_n_process)

# make sure scheduler is ready
self._make_sure_scheduler_ready()

def stop_service(self):
if self._stopped:
return

self._stopped = True
try:
self._scheduler_service.stop(self._pool)
self._worker_service.stop()
finally:
self._pool.stop()

def serve_forever(self):
try:
self._pool.join()
finally:
self.stop_service()

def __enter__(self):
self.start_service()
return self

def __exit__(self, *_):
self.stop_service()


def gen_endpoint(address):
port = None
tries = 5 # retry for 5 times

for i in range(tries):
try:
port = get_next_port()
break
except SystemError:
if i < tries - 1:
continue
raise

return '{0}:{1}'.format(address, port)


def _start_cluster(endpoint, event, n_process=None, **kw):
cluster = LocalDistributedCluster(endpoint, n_process=n_process, **kw)
cluster.start_service()
event.set()
try:
cluster.serve_forever()
finally:
cluster.stop_service()


def _start_web(scheduler_address, ui_port, event):
import gevent.monkey
gevent.monkey.patch_all(thread=False)

from ...web import MarsWeb

web = MarsWeb(ui_port, scheduler_address)
try:
web.start(event=event, block=True)
finally:
web.stop()


class LocalDistributedClusterClient(object):
def __init__(self, endpoint, web_endpoint, cluster_process, web_process):
self._cluster_process = cluster_process
self._web_process = web_process
self._endpoint = endpoint
self._web_endpoint = web_endpoint
self._session = new_session(endpoint).as_default()

@property
def endpoint(self):
return self._endpoint

@property
def web_endpoint(self):
return self._web_endpoint

@property
def session(self):
return self._session

def __enter__(self):
return self

def __exit__(self, *_):
self.stop()

def stop(self):
if self._cluster_process.is_alive():
os.kill(self._cluster_process.pid, signal.SIGINT)
self._cluster_process.join(3)
if self._cluster_process.is_alive():
self._cluster_process.terminate()
if self._web_process is not None and self._web_process.is_alive():
os.kill(self._web_process.pid, signal.SIGINT)
self._web_process.join(3)
if self._web_process.is_alive():
self._web_process.terminate()


def new_cluster(address='0.0.0.0', web=False, n_process=None, **kw):
endpoint = gen_endpoint(address)
web_endpoint = None
if web is True:
web_endpoint = gen_endpoint('0.0.0.0')
elif isinstance(web, six.string_types):
if ':' in web:
web_endpoint = web
else:
web_endpoint = gen_endpoint(web)

event = multiprocessing.Event()
kw['n_process'] = n_process
process = gipc.start_process(_start_cluster, args=(endpoint, event), kwargs=kw)

while True:
event.wait(5)
if not event.is_set():
# service not started yet
continue
if not process.is_alive():
raise SystemError('New local cluster failed')
else:
break

web_process = None
if web_endpoint:
web_event = multiprocessing.Event()
ui_port = int(web_endpoint.rsplit(':', 1)[1])
web_process = gipc.start_process(_start_web, args=(endpoint, ui_port, web_event), daemon=True)

while True:
web_event.wait(5)
if not web_event.is_set():
# web not started yet
continue
if not web_process.is_alive():
raise SystemError('New web interface failed')
else:
break

return LocalDistributedClusterClient(endpoint, web_endpoint, process, web_process)
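To make the branching in _calc_scheduler_worker_n_process above concrete, here is a small worked example that calls the classmethod directly with a pinned calc_cpu_count. The 8-core figure and the direct use of this private helper are only for illustration.

from mars.deploy.local.core import LocalDistributedCluster

def fake_cpu_count():
    return 8  # pretend the machine has 8 CPU cores

calc = LocalDistributedCluster._calc_scheduler_worker_n_process

# Nothing given: 2 scheduler processes, workers get max(8 + 2 - 2, 2) = 8.
print(calc(None, None, None, calc_cpu_count=fake_cpu_count))   # (2, 8)

# Only the total (6) given: 2 for the scheduler, max(6 - 2, 2) = 4 for workers.
print(calc(6, None, None, calc_cpu_count=fake_cpu_count))      # (2, 4)

# Only the worker count (3) given: the scheduler gets max(8 - 3, 2) = 5.
print(calc(None, None, 3, calc_cpu_count=fake_cpu_count))      # (5, 3)

# Only the scheduler count (4) given: workers get max(8 + 4 - 4, 2) = 8.
print(calc(None, 4, None, calc_cpu_count=fake_cpu_count))      # (4, 8)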
37 changes: 37 additions & 0 deletions mars/deploy/local/distributor.py
@@ -0,0 +1,37 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2018 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ...compat import functools32, six
from ...actors import Distributor
from ...scheduler.distributor import SchedulerDistributor
from ...worker.distributor import WorkerDistributor


def gen_distributor(scheduler_n_process, worker_n_process):
class LocalClusterDistributor(Distributor):
def __init__(self, n_process):
super(LocalClusterDistributor, self).__init__(n_process)
self._scheduler_distributor = SchedulerDistributor(scheduler_n_process)
self._worker_distributor = WorkerDistributor(worker_n_process)

@functools32.lru_cache(100)
def distribute(self, uid):
if isinstance(uid, six.string_types) and uid.startswith('w:'):
return self._worker_distributor.distribute(uid) + scheduler_n_process

return self._scheduler_distributor.distribute(uid)

return LocalClusterDistributor(scheduler_n_process + worker_n_process)
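A short sketch of the routing above: uids carrying the unified w: prefix go to the worker sub-pool, offset past the scheduler's slots, while everything else stays with the scheduler distributor. The actor uids here are made up, and the exact slot each sub-distributor returns is an assumption, since their implementations are not part of this diff.

from mars.deploy.local.distributor import gen_distributor

scheduler_n_process, worker_n_process = 2, 4
dist = gen_distributor(scheduler_n_process, worker_n_process)

# 'w:'-prefixed uids are offset by scheduler_n_process, so they should land
# in the worker slots, i.e. somewhere in [2, 6).
print(dist.distribute('w:1:ExecutionActor'))

# Other uids fall through to the SchedulerDistributor and are expected to
# stay within the scheduler slots, i.e. [0, 2).
print(dist.distribute('s:SessionManagerActor'))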
