Skip to content

Commit

Permalink
Added MAX_ENGINE_PER_NODE for IPCluster plugin
Browse files Browse the repository at this point in the history
This config-file-level option specifies the maximum number of IPython
engines to start per node. It applies to both master and worker nodes
and must be an integer >= 1. Note: this commit does NOT update the
IPClusterRestartEngines and IPClusterStop convenience plugins.
  • Loading branch information
nkrumm committed Nov 1, 2013
1 parent f8e04f7 commit 9670dde
Showing 1 changed file with 24 additions and 5 deletions.
29 changes: 24 additions & 5 deletions starcluster/plugins/ipcluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ class IPCluster(DefaultClusterSetup):
"""
def __init__(self, enable_notebook=False, notebook_passwd=None,
notebook_directory=None, packer=None, log_level='INFO'):
notebook_directory=None, packer=None, log_level='INFO',
max_engines_per_node=None):
super(IPCluster, self).__init__()
if isinstance(enable_notebook, basestring):
self.enable_notebook = enable_notebook.lower().strip() == 'true'
Expand All @@ -105,6 +106,17 @@ def __init__(self, enable_notebook=False, notebook_passwd=None,
self.packer = None
else:
self.packer = packer
self.max_engines_per_node = max_engines_per_node
if self.max_engines_per_node is not None:
try:
self.max_engines_per_node = int(self.max_engines_per_node)
except ValueError:
raise exception.PluginError(
"MAX_ENGINES_PER_NODE must be an integer!")
if self.max_engines_per_node < 1:
raise exception.PluginError(
"MAX_ENGINES_PER_NODE must be >= 1!")


def _check_ipython_installed(self, node):
has_ipy = node.ssh.has_required(['ipython', 'ipcluster'])
Expand Down Expand Up @@ -164,6 +176,8 @@ def _write_config(self, master, user, profile_dir):

def _start_cluster(self, master, profile_dir):
n_engines = max(1, master.num_processors - 1)
if self.max_engines_per_node:
n_engines = min(self.max_engines_per_node, n_engines)
log.info("Starting the IPython controller and %i engines on master"
% n_engines)
# cleanup existing connection files, to prevent their use
Expand Down Expand Up @@ -286,12 +300,15 @@ def run(self, nodes, master, user, user_shell, volumes):
cfile, n_engines_master = self._start_cluster(master, profile_dir)
# Start engines on each of the non-master nodes
non_master_nodes = [node for node in nodes if not node.is_master()]
n_engines_non_master = 0
for node in non_master_nodes:
n_engines = node.num_processors
if self.max_engines_per_node:
n_engines = min(self.max_engines_per_node, n_engines)
self.pool.simple_job(
_start_engines, (node, user, node.num_processors),
_start_engines, (node, user, n_engines),
jobid=node.alias)
n_engines_non_master = sum(node.num_processors
for node in non_master_nodes)
n_engines_non_master += n_engines
if len(non_master_nodes) > 0:
log.info("Adding %d engines on %d nodes",
n_engines_non_master, len(non_master_nodes))
Expand All @@ -309,8 +326,10 @@ def run(self, nodes, master, user, user_shell, volumes):
def on_add_node(self, node, nodes, master, user, user_shell, volumes):
self._check_ipython_installed(node)
n_engines = node.num_processors
if self.max_engines_per_node:
n_engines = min(self.max_engines_per_node, n_engines)
log.info("Adding %d engines on %s", n_engines, node.alias)
_start_engines(node, user)
_start_engines(node, user, n_engines)

def on_remove_node(self, node, nodes, master, user, user_shell, volumes):
raise NotImplementedError("on_remove_node method not implemented")
Expand Down

1 comment on commit 9670dde

@nkrumm
Copy link
Owner Author

@nkrumm nkrumm commented on 9670dde Nov 1, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The title of this commit gets the variable name wrong: it says MAX_ENGINE_PER_NODE, but it should be MAX_ENGINES_PER_NODE.

Please sign in to comment.