Skip to content

Commit

Permalink
limit shared memory to 20% when starting local cluster (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
qinxuye authored and wjsi committed Dec 18, 2018
1 parent 45f9166 commit 1028eeb
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 6 deletions.
2 changes: 1 addition & 1 deletion mars/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def validate(x):
default_options.register_option('worker.io_process_count', None, validator=(is_null, is_integer))
default_options.register_option('worker.physical_memory_limit_soft', '75%', validator=(is_null, is_string, is_integer))
default_options.register_option('worker.physical_memory_limit_hard', '90%', validator=(is_null, is_string, is_integer))
default_options.register_option('worker.cache_memory_limit', '48%', validator=(is_null, is_string, is_integer))
default_options.register_option('worker.cache_memory_limit', None, validator=(is_null, is_string, is_integer))
default_options.register_option('worker.disk_limit', None, validator=(is_null, is_string))
default_options.register_option('worker.spill_directory', None, validator=(is_null, is_string, is_list))
default_options.register_option('worker.min_spill_size', '5%', validator=(is_string, is_integer))
Expand Down
15 changes: 11 additions & 4 deletions mars/deploy/local/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from ...session import new_session
from ...compat import six
from ...lib import gipc
from ...config import options
from .distributor import gen_distributor


Expand All @@ -36,12 +37,16 @@ class LocalDistributedCluster(object):
MIN_WORKER_N_PROCESS = 2

def __init__(self, endpoint, n_process=None,
scheduler_n_process=None, worker_n_process=None):
scheduler_n_process=None, worker_n_process=None,
shared_memory=None):
self._endpoint = endpoint

self._started = False
self._stopped = False

if shared_memory is not None:
options.worker.cache_memory_limit = shared_memory

self._pool = None
self._scheduler_service = SchedulerService()
self._worker_service = WorkerService()
Expand Down Expand Up @@ -148,8 +153,9 @@ def gen_endpoint(address):
return '{0}:{1}'.format(address, port)


def _start_cluster(endpoint, event, n_process=None, **kw):
cluster = LocalDistributedCluster(endpoint, n_process=n_process, **kw)
def _start_cluster(endpoint, event, n_process=None, shared_memory=None, **kw):
cluster = LocalDistributedCluster(endpoint, n_process=n_process,
shared_memory=shared_memory, **kw)
cluster.start_service()
event.set()
try:
Expand Down Expand Up @@ -210,7 +216,7 @@ def stop(self):
self._web_process.terminate()


def new_cluster(address='0.0.0.0', web=False, n_process=None, **kw):
def new_cluster(address='0.0.0.0', web=False, n_process=None, shared_memory=None, **kw):
endpoint = gen_endpoint(address)
web_endpoint = None
if web is True:
Expand All @@ -223,6 +229,7 @@ def new_cluster(address='0.0.0.0', web=False, n_process=None, **kw):

event = multiprocessing.Event()
kw['n_process'] = n_process
kw['shared_memory'] = shared_memory or options.worker.cache_memory_limit or '20%'
process = gipc.start_process(_start_cluster, args=(endpoint, event), kwargs=kw)

while True:
Expand Down
2 changes: 1 addition & 1 deletion mars/worker/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def create_pool(self, *args, **kwargs):
self.args.phy_mem or options.worker.physical_memory_limit_hard, mem_stats.total
)
options.worker.physical_memory_limit_soft = self._calc_size_limit(
self.args.phy_mem or options.worker.physical_memory_limit_soft, mem_stats.total
self.args.phy_mem or options.worker.physical_memory_limit_soft or '48%', mem_stats.total
)
options.worker.cache_memory_limit = self._calc_size_limit(
self.args.cache_mem or options.worker.cache_memory_limit, mem_stats.total
Expand Down

0 comments on commit 1028eeb

Please sign in to comment.