Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Add cluster-id for multiple cluster instances per profile #795

Closed
wants to merge 5 commits into from

2 participants

@minrk
Owner

Default behavior is unchanged, but specifying a '--cluster-id' arg will allow starting additional clusters within a single profile.

I also cleaned up the code in parallel.apps.launchers a fair amount, by adding a few Mixin classes to consolidate some of the extremely repetitive code.

This will need some testing in various environments, to catch possible typos from the changes, but it's a major improvement overall.

The consolidation also fixes a few inconsistencies, such as the MPI launchers using program/program_args, while the local launchers used controller_cmd/controller_args. Now the names are more sensible and consistent across launcher types.

When checking the generated config files, I also fixed a small aesthetic issue when printing the class list from which a Configurable will inherit configuration. It now excludes any base class that doesn't have anything to inherit.

Should close #794

IPython/parallel/apps/baseapp.py
@@ -116,6 +117,18 @@ class BaseParallelApplication(BaseIPythonApplication):
log_url = Unicode('', config=True,
help="The ZMQ URL of the iplogger to aggregate logging.")
+ cluster_id = Unicode('', config=True,
+ help="""String id to add to runtime files, to prevent name collisions when
+ using multiple clusters with a single profile.
@fperez Owner
fperez added a note

Perhaps add a hint here to users on what they can/should use as id's? I'm sure that strings with spaces in them are likely to cause grief somewhere down the road, but any alphanumeric id is probably OK. So just a quick hint would help new users know what kind of value is sensible here.

@minrk Owner
minrk added a note

Pretty much any text should be fine. I tried it with --cluster-id "hello world", and there's no problem. Obviously, it has all the usual inconveniences of filenames with spaces, but it does work, at least in the quick few cases I've tried.

I'll add a bit more detail to the message, though.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@fperez fperez commented on the diff
IPython/parallel/apps/baseapp.py
@@ -116,6 +117,18 @@ class BaseParallelApplication(BaseIPythonApplication):
log_url = Unicode('', config=True,
help="The ZMQ URL of the iplogger to aggregate logging.")
+ cluster_id = Unicode('', config=True,
+ help="""String id to add to runtime files, to prevent name collisions when
+ using multiple clusters with a single profile.
+
+ When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
+ """
+ )
+ def _cluster_id_changed(self, name, old, new):
+ self.name = self.__class__.name
+ if new:
+ self.name += '-%s'%new
@fperez Owner
fperez added a note

Type this as

'-%s' % new

(note spaces) for readability. I'm not super picky with things like 2 + 3 * 4, which I actually find hinder readability, but string interpolation is one case where I find that sticking to the 'whitespace around operators' guideline does significantly help, since otherwise the format specification blends visually a lot with the rhs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
IPython/parallel/apps/ipengineapp.py
@@ -158,7 +158,15 @@ class IPEngineApp(BaseParallelApplication):
controller and engine are started at the same time and it
may take a moment for the controller to write the connector files.""")
- url_file_name = Unicode(u'ipcontroller-engine.json')
+ url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
+
+ def _cluster_id_changed(self, name, old, new):
+ if new:
+ base = 'ipcontroller-%s'%new
@fperez Owner
fperez added a note

As above for string interpolation, same below... Won't repeat further to keep noise down.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@fperez fperez commented on the diff
IPython/parallel/apps/launcher.py
@@ -213,6 +215,33 @@ class BaseLauncher(LoggingConfigurable):
"""
raise NotImplementedError('signal must be implemented in a subclass')
+class ClusterAppMixin(HasTraits):
@fperez Owner
fperez added a note

Good idea to have these mixins, I'm glad to see less of that code repetition.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
IPython/parallel/apps/launcher.py
((15 lines not shown))
-class PBSEngineSetLauncher(PBSLauncher):
+class PBSEngineSetLauncher(BatchClusterAppMixin, PBSLauncher):
@fperez Owner
fperez added a note

Any particular reason the mixin class is first on this one? Typically (and in the ones above) mixins go last, so I'm curious...

@minrk Owner
minrk added a note

it was to get the changed default value for the context to inherit properly. I don't know if it's a traitlets bug or what, but see this example:

class Base(HasTraits):
    a = Int(5)

class Mixin(HasTraits):
    def _a_default(self):
        return 10

class A(Base, Mixin):
    pass

class B(Mixin, Base):
    pass

a = A()
print a.a # 5
b = B()
print b.a # 10

So declaring _a_default() in Mixin does not result in the same behavior as declaring it in A or Base.

Now, an alternative would be to just use the b._trait_values dict, instead of a special context dict, and give the template access to all of the launcher's traits. That would mean I don't have to do these initialization shenanigans.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
IPython/parallel/apps/launcher.py
@@ -979,7 +998,7 @@ class SGELauncher(PBSLauncher):
queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
queue_template = Unicode('#$ -q {queue}')
-class SGEControllerLauncher(SGELauncher):
+class SGEControllerLauncher(BatchClusterAppMixin, SGELauncher):
@fperez Owner
fperez added a note

Same question about mixin order, and same below; won't repeat everywhere to reduce noise.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@fperez
Owner

Code-wise, this looks like a no-brainer to me, modulo minor comments above. Testing-wise though, what testing scenarios do you have in mind before feeling comfortable merging it? Can I help with some of that, or should we ping the list to try to recruit a few testers?

minrk added some commits
@minrk minrk add cluster_id to parallel apps
This allows multiple controller instances per profile.  Default behavior remains unchanged.
4079cf8
@minrk minrk add cluster_id support to ipcluster/launchers
This includes a moderate reorganization of the common launcher args.

start() no longer takes profile_dir, which is now a trait,
as is cluster_id.   This is implemented via small Mixin classes,
consolidating many duplicated controller_cmd/args / engine_cmd/args lines.
8f563c9
@minrk minrk cleanup inheritance line in auto-config files
rather than explicitly excluding particular base classes,
exclude any base classes that have no config traits to inherit.
7a02b90
@minrk
Owner

I've made the changes mentioned. Since there aren't any tests for launchers, and they are not used very often, errors can go unnoticed. So I just want to try a few simple runs with at least SGE/SSH/MPI (the systems I have available to me), to make sure I haven't messed things up.

I would like real tests for the launchers, but since launchers can't run without their various environments (PBS, SGE, etc.) available, unit tests don't have much value.

@minrk minrk parallel.apps cleanup per review
STY: added spaces in string formatting calls
STY: expanded helpstring for cluster-id

ENH: allow BatchClusterAppMixin to be the second inherited class
d24b9a6
@fperez
Owner

OK, as far as I'm concerned, this is good to go in, once you feel comfortable with whatever testing you can do on your own. Because of this very issue, this is the kind of code that should go in as early as possible, so that we get the kind of 'in the wild' testing that is hard to automate. The longer it is in the master branch, the higher the chances of someone running it in an environment we don't have access to and pointing out potential problems.

So go from my side, merge when you're comfortable. Thanks!

@minrk minrk allow launcher specification by prefix alone
e.g. ipcluster start --engines=SGE
9937d3c
@minrk minrk referenced this pull request from a commit
@minrk minrk Merge PR #795 (cluster-id and launcher cleanup)
closes gh-795
3994edb
@minrk minrk closed this in 3994edb
@ellisonbg ellisonbg referenced this pull request from a commit
Commit has since been removed from the repository and is no longer available.
@mattvonrocketstein mattvonrocketstein referenced this pull request from a commit in mattvonrocketstein/ipython
@minrk minrk Merge PR #795 (cluster-id and launcher cleanup)
closes gh-795
e8422d6
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Sep 16, 2011
  1. @minrk

    add cluster_id to parallel apps

    minrk authored
    This allows multiple controller instances per profile.  Default behavior remains unchanged.
  2. @minrk

    add cluster_id support to ipcluster/launchers

    minrk authored
    This includes a moderate reorganization of the common launcher args.
    
    start() no longer takes profile_dir, which is now a trait,
    as is cluster_id.   This is implemented via small Mixin classes,
    consolidating many duplicated controller_cmd/args / engine_cmd/args lines.
  3. @minrk

    cleanup inheritance line in auto-config files

    minrk authored
    rather than explicitly excluding particular base classes,
    exclude any base classes that have no config traits to inherit.
  4. @minrk

    parallel.apps cleanup per review

    minrk authored
    STY: added spaces in string formatting calls
    STY: expanded helpstring for cluster-id
    
    ENH: allow BatchClusterAppMixin to be the second inherited class
  5. @minrk

    allow launcher specification by prefix alone

    minrk authored
    e.g. ipcluster start --engines=SGE
This page is out of date. Refresh to see the latest.
View
5 IPython/config/configurable.py
@@ -207,8 +207,9 @@ def c(s):
for parent in cls.mro():
# only include parents that are not base classes
# and are not the class itself
- if issubclass(parent, Configurable) and \
- not parent in (Configurable, SingletonConfigurable, cls):
+ # and have some configurable traits to inherit
+ if parent is not cls and issubclass(parent, Configurable) and \
+ parent.class_traits(config=True):
parents.append(parent)
if parents:
View
17 IPython/parallel/apps/baseapp.py
@@ -75,6 +75,7 @@ def __init__(self, app):
'log-to-file' : 'BaseParallelApplication.log_to_file',
'clean-logs' : 'BaseParallelApplication.clean_logs',
'log-url' : 'BaseParallelApplication.log_url',
+ 'cluster-id' : 'BaseParallelApplication.cluster_id',
})
base_flags = {
@@ -116,6 +117,22 @@ def _work_dir_changed(self, name, old, new):
log_url = Unicode('', config=True,
help="The ZMQ URL of the iplogger to aggregate logging.")
+ cluster_id = Unicode('', config=True,
+ help="""String id to add to runtime files, to prevent name collisions when
+ using multiple clusters with a single profile simultaneously.
+
+ When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
+
+ Since this is text inserted into filenames, typical recommendations apply:
+ Simple character strings are ideal, and spaces are not recommended (but should
+ generally work).
+ """
+ )
+ def _cluster_id_changed(self, name, old, new):
+ self.name = self.__class__.name
+ if new:
+ self.name += '-%s'%new
@fperez Owner
fperez added a note

Type this as

'-%s' % new

(note spaces) for readability. I'm not super picky with things like 2 + 3 * 4, which I actually find hinder readability, but string interpolation is one case where I find that sticking to the 'whitespace around operators' guideline does significantly help, since otherwise the format specification blends visually a lot with the rhs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+
def _config_files_default(self):
return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
View
25 IPython/parallel/apps/ipclusterapp.py
@@ -275,19 +275,22 @@ def initialize(self, argv=None):
self.init_launchers()
def init_launchers(self):
- self.engine_launcher = self.build_launcher(self.engine_launcher_class)
+ self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
self.engine_launcher.on_stop(lambda r: self.loop.stop())
def init_signal(self):
# Setup signals
signal.signal(signal.SIGINT, self.sigint_handler)
- def build_launcher(self, clsname):
+ def build_launcher(self, clsname, kind=None):
"""import and instantiate a Launcher based on importstring"""
if '.' not in clsname:
# not a module, presume it's the raw name in apps.launcher
+ if kind and kind not in clsname:
+ # doesn't match necessary full class name, assume it's
+ # just 'PBS' or 'MPIExec' prefix:
+ clsname = clsname + kind + 'Launcher'
clsname = 'IPython.parallel.apps.launcher.'+clsname
- # print repr(clsname)
try:
klass = import_item(clsname)
except (ImportError, KeyError):
@@ -295,16 +298,14 @@ def build_launcher(self, clsname):
self.exit(1)
launcher = klass(
- work_dir=u'.', config=self.config, log=self.log
+ work_dir=u'.', config=self.config, log=self.log,
+ profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
)
return launcher
def start_engines(self):
self.log.info("Starting %i engines"%self.n)
- self.engine_launcher.start(
- self.n,
- self.profile_dir.location
- )
+ self.engine_launcher.start(self.n)
def stop_engines(self):
self.log.info("Stopping Engines...")
@@ -424,14 +425,12 @@ def _classes_default(self,):
aliases = Dict(start_aliases)
def init_launchers(self):
- self.controller_launcher = self.build_launcher(self.controller_launcher_class)
- self.engine_launcher = self.build_launcher(self.engine_launcher_class)
+ self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
+ self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
self.controller_launcher.on_stop(self.stop_launchers)
def start_controller(self):
- self.controller_launcher.start(
- self.profile_dir.location
- )
+ self.controller_launcher.start()
def stop_controller(self):
# self.log.info("In stop_controller")
View
21 IPython/parallel/apps/ipcontrollerapp.py
@@ -174,7 +174,18 @@ class IPControllerApp(BaseParallelApplication):
use_threads = Bool(False, config=True,
help='Use threads instead of processes for the schedulers',
- )
+ )
+
+ engine_json_file = Unicode('ipcontroller-engine.json', config=True,
+ help="JSON filename where engine connection info will be stored.")
+ client_json_file = Unicode('ipcontroller-client.json', config=True,
+ help="JSON filename where client connection info will be stored.")
+
+ def _cluster_id_changed(self, name, old, new):
+ super(IPControllerApp, self)._cluster_id_changed(name, old, new)
+ self.engine_json_file = "%s-engine.json" % self.name
+ self.client_json_file = "%s-client.json" % self.name
+
# internal
children = List()
@@ -215,7 +226,7 @@ def load_config_from_json(self):
"""load config from existing json connector files."""
c = self.config
# load from engine config
- with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
+ with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
cfg = json.loads(f.read())
key = c.Session.key = asbytes(cfg['exec_key'])
xport,addr = cfg['url'].split('://')
@@ -227,7 +238,7 @@ def load_config_from_json(self):
if not self.engine_ssh_server:
self.engine_ssh_server = cfg['ssh']
# load client config
- with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
+ with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
cfg = json.loads(f.read())
assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
xport,addr = cfg['url'].split('://')
@@ -277,11 +288,11 @@ def init_hub(self):
'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
'location' : self.location
}
- self.save_connection_dict('ipcontroller-client.json', cdict)
+ self.save_connection_dict(self.client_json_file, cdict)
edict = cdict
edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
edict['ssh'] = self.engine_ssh_server
- self.save_connection_dict('ipcontroller-engine.json', edict)
+ self.save_connection_dict(self.engine_json_file, edict)
#
def init_schedulers(self):
View
16 IPython/parallel/apps/ipengineapp.py
@@ -93,7 +93,7 @@ class MPI(Configurable):
help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
)
- def _on_use_changed(self, old, new):
+ def _use_changed(self, name, old, new):
# load default init script if it's not set
if not self.init_script:
self.init_script = self.default_inits.get(new, '')
@@ -135,8 +135,8 @@ def _on_use_changed(self, old, new):
class IPEngineApp(BaseParallelApplication):
- name = Unicode(u'ipengine')
- description = Unicode(_description)
+ name = 'ipengine'
+ description = _description
examples = _examples
config_file_name = Unicode(default_config_file_name)
classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
@@ -158,7 +158,15 @@ class IPEngineApp(BaseParallelApplication):
controller and engine are started at the same time and it
may take a moment for the controller to write the connector files.""")
- url_file_name = Unicode(u'ipcontroller-engine.json')
+ url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
+
+ def _cluster_id_changed(self, name, old, new):
+ if new:
+ base = 'ipcontroller-%s' % new
+ else:
+ base = 'ipcontroller'
+ self.url_file_name = "%s-engine.json" % base
+
log_url = Unicode('', config=True,
help="""The URL for the iploggerapp instance, for forwarding
logging to a central location.""")
View
300 IPython/parallel/apps/launcher.py
@@ -57,7 +57,9 @@ def check_output(*args, **kwargs):
from IPython.config.application import Application
from IPython.config.configurable import LoggingConfigurable
from IPython.utils.text import EvalFormatter
-from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
+from IPython.utils.traitlets import (
+ Any, Int, CFloat, List, Unicode, Dict, Instance, HasTraits,
+)
from IPython.utils.path import get_ipython_module_path
from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
@@ -213,6 +215,33 @@ def signal(self, sig):
"""
raise NotImplementedError('signal must be implemented in a subclass')
+class ClusterAppMixin(HasTraits):
@fperez Owner
fperez added a note

Good idea to have these mixins, I'm glad to see less of that code repetition.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ """MixIn for cluster args as traits"""
+ cluster_args = List([])
+ profile_dir=Unicode('')
+ cluster_id=Unicode('')
+ def _profile_dir_changed(self, name, old, new):
+ self.cluster_args = []
+ if self.profile_dir:
+ self.cluster_args.extend(['--profile-dir', self.profile_dir])
+ if self.cluster_id:
+ self.cluster_args.extend(['--cluster-id', self.cluster_id])
+ _cluster_id_changed = _profile_dir_changed
+
+class ControllerMixin(ClusterAppMixin):
+ controller_cmd = List(ipcontroller_cmd_argv, config=True,
+ help="""Popen command to launch ipcontroller.""")
+ # Command line arguments to ipcontroller.
+ controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
+ help="""command-line args to pass to ipcontroller""")
+
+class EngineMixin(ClusterAppMixin):
+ engine_cmd = List(ipengine_cmd_argv, config=True,
+ help="""command to launch the Engine.""")
+ # Command line arguments for ipengine.
+ engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
+ help="command-line arguments to pass to ipengine"
+ )
#-----------------------------------------------------------------------------
# Local process launchers
@@ -317,54 +346,28 @@ def poll(self):
self.notify_stop(dict(exit_code=status, pid=self.process.pid))
return status
-class LocalControllerLauncher(LocalProcessLauncher):
+class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
"""Launch a controller as a regular external process."""
- controller_cmd = List(ipcontroller_cmd_argv, config=True,
- help="""Popen command to launch ipcontroller.""")
- # Command line arguments to ipcontroller.
- controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
- help="""command-line args to pass to ipcontroller""")
-
def find_args(self):
- return self.controller_cmd + self.controller_args
+ return self.controller_cmd + self.cluster_args + self.controller_args
- def start(self, profile_dir):
+ def start(self):
"""Start the controller by profile_dir."""
- self.controller_args.extend(['--profile-dir=%s'%profile_dir])
- self.profile_dir = unicode(profile_dir)
self.log.info("Starting LocalControllerLauncher: %r" % self.args)
return super(LocalControllerLauncher, self).start()
-class LocalEngineLauncher(LocalProcessLauncher):
+class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
"""Launch a single engine as a regular externall process."""
- engine_cmd = List(ipengine_cmd_argv, config=True,
- help="""command to launch the Engine.""")
- # Command line arguments for ipengine.
- engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
- help="command-line arguments to pass to ipengine"
- )
-
def find_args(self):
- return self.engine_cmd + self.engine_args
+ return self.engine_cmd + self.cluster_args + self.engine_args
- def start(self, profile_dir):
- """Start the engine by profile_dir."""
- self.engine_args.extend(['--profile-dir=%s'%profile_dir])
- self.profile_dir = unicode(profile_dir)
- return super(LocalEngineLauncher, self).start()
-
-class LocalEngineSetLauncher(BaseLauncher):
+class LocalEngineSetLauncher(LocalEngineLauncher):
"""Launch a set of engines as regular external processes."""
- # Command line arguments for ipengine.
- engine_args = List(
- ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
- help="command-line arguments to pass to ipengine"
- )
delay = CFloat(0.1, config=True,
help="""delay (in seconds) between starting each engine after the first.
This can help force the engines to get their ids in order, or limit
@@ -383,26 +386,26 @@ def __init__(self, work_dir=u'.', config=None, **kwargs):
)
self.stop_data = {}
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start n engines by profile or profile_dir."""
- self.profile_dir = unicode(profile_dir)
dlist = []
for i in range(n):
if i > 0:
time.sleep(self.delay)
- el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
+ el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
+ profile_dir=self.profile_dir, cluster_id=self.cluster_id,
+ )
+
# Copy the engine args over to each engine launcher.
+ el.engine_cmd = copy.deepcopy(self.engine_cmd)
el.engine_args = copy.deepcopy(self.engine_args)
el.on_stop(self._notice_engine_stopped)
- d = el.start(profile_dir)
+ d = el.start()
if i==0:
self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
self.launchers[i] = el
dlist.append(d)
self.notify_start(dlist)
- # The consumeErrors here could be dangerous
- # dfinal = gatherBoth(dlist, consumeErrors=True)
- # dfinal.addCallback(self.notify_start)
return dlist
def find_args(self):
@@ -413,7 +416,6 @@ def signal(self, sig):
for el in self.launchers.itervalues():
d = el.signal(sig)
dlist.append(d)
- # dfinal = gatherBoth(dlist, consumeErrors=True)
return dlist
def interrupt_then_kill(self, delay=1.0):
@@ -421,7 +423,6 @@ def interrupt_then_kill(self, delay=1.0):
for el in self.launchers.itervalues():
d = el.interrupt_then_kill(delay)
dlist.append(d)
- # dfinal = gatherBoth(dlist, consumeErrors=True)
return dlist
def stop(self):
@@ -452,9 +453,9 @@ class MPIExecLauncher(LocalProcessLauncher):
mpi_args = List([], config=True,
help="The command line arguments to pass to mpiexec."
)
- program = List(['date'], config=True,
+ program = List(['date'],
help="The program to start via mpiexec.")
- program_args = List([], config=True,
+ program_args = List([],
help="The command line argument to the program."
)
n = Int(1)
@@ -470,44 +471,42 @@ def start(self, n):
return super(MPIExecLauncher, self).start()
-class MPIExecControllerLauncher(MPIExecLauncher):
+class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
"""Launch a controller using mpiexec."""
- controller_cmd = List(ipcontroller_cmd_argv, config=True,
- help="Popen command to launch the Contropper"
- )
- controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
- help="Command line arguments to pass to ipcontroller."
- )
- n = Int(1)
+ # alias back to *non-configurable* program[_args] for use in find_args()
+ # this way all Controller/EngineSetLaunchers have the same form, rather
+ # than *some* having `program_args` and others `controller_args`
+ @property
+ def program(self):
+ return self.controller_cmd
+
+ @property
+ def program_args(self):
+ return self.cluster_args + self.controller_args
- def start(self, profile_dir):
+ def start(self):
"""Start the controller by profile_dir."""
- self.controller_args.extend(['--profile-dir=%s'%profile_dir])
- self.profile_dir = unicode(profile_dir)
self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
return super(MPIExecControllerLauncher, self).start(1)
- def find_args(self):
- return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
- self.controller_cmd + self.controller_args
-
-class MPIExecEngineSetLauncher(MPIExecLauncher):
+class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
+ """Launch engines using mpiexec"""
- program = List(ipengine_cmd_argv, config=True,
- help="Popen command for ipengine"
- )
- program_args = List(
- ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
- help="Command line arguments for ipengine."
- )
- n = Int(1)
+ # alias back to *non-configurable* program[_args] for use in find_args()
+ # this way all Controller/EngineSetLaunchers have the same form, rather
+ # than *some* having `program_args` and others `controller_args`
+ @property
+ def program(self):
+ return self.engine_cmd
+
+ @property
+ def program_args(self):
+ return self.cluster_args + self.engine_args
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start n engines by profile or profile_dir."""
- self.program_args.extend(['--profile-dir=%s'%profile_dir])
- self.profile_dir = unicode(profile_dir)
self.n = n
self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
return super(MPIExecEngineSetLauncher, self).start(n)
@@ -530,9 +529,9 @@ class SSHLauncher(LocalProcessLauncher):
help="command for starting ssh")
ssh_args = List(['-tt'], config=True,
help="args to pass to ssh")
- program = List(['date'], config=True,
+ program = List(['date'],
help="Program to launch via ssh")
- program_args = List([], config=True,
+ program_args = List([],
help="args to pass to remote program")
hostname = Unicode('', config=True,
help="hostname on which to launch the program")
@@ -554,8 +553,7 @@ def find_args(self):
return self.ssh_cmd + self.ssh_args + [self.location] + \
self.program + self.program_args
- def start(self, profile_dir, hostname=None, user=None):
- self.profile_dir = unicode(profile_dir)
+ def start(self, hostname=None, user=None):
if hostname is not None:
self.hostname = hostname
if user is not None:
@@ -571,22 +569,33 @@ def signal(self, sig):
-class SSHControllerLauncher(SSHLauncher):
+class SSHControllerLauncher(SSHLauncher, ControllerMixin):
- program = List(ipcontroller_cmd_argv, config=True,
- help="remote ipcontroller command.")
- program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True,
- help="Command line arguments to ipcontroller.")
+ # alias back to *non-configurable* program[_args] for use in find_args()
+ # this way all Controller/EngineSetLaunchers have the same form, rather
+ # than *some* having `program_args` and others `controller_args`
+ @property
+ def program(self):
+ return self.controller_cmd
+
+ @property
+ def program_args(self):
+ return self.cluster_args + self.controller_args
-class SSHEngineLauncher(SSHLauncher):
- program = List(ipengine_cmd_argv, config=True,
- help="remote ipengine command.")
- # Command line arguments for ipengine.
- program_args = List(
- ['--log-to-file','--log_level=%i'%logging.INFO], config=True,
- help="Command line arguments to ipengine."
- )
+class SSHEngineLauncher(SSHLauncher, EngineMixin):
+
+ # alias back to *non-configurable* program[_args] for use in find_args()
+ # this way all Controller/EngineSetLaunchers have the same form, rather
+ # than *some* having `program_args` and others `controller_args`
+ @property
+ def program(self):
+ return self.engine_cmd
+
+ @property
+ def program_args(self):
+ return self.cluster_args + self.engine_args
+
class SSHEngineSetLauncher(LocalEngineSetLauncher):
launcher_class = SSHEngineLauncher
@@ -594,12 +603,11 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher):
help="""dict of engines to launch. This is a dict by hostname of ints,
corresponding to the number of engines to start on that host.""")
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start engines by profile or profile_dir.
`n` is ignored, and the `engines` config property is used instead.
"""
- self.profile_dir = unicode(profile_dir)
dlist = []
for host, n in self.engines.iteritems():
if isinstance(n, (tuple, list)):
@@ -614,13 +622,15 @@ def start(self, n, profile_dir):
for i in range(n):
if i > 0:
time.sleep(self.delay)
- el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
+ el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
+ profile_dir=self.profile_dir, cluster_id=self.cluster_id,
+ )
# Copy the engine args over to each engine launcher.
- i
- el.program_args = args
+ el.engine_cmd = self.engine_cmd
+ el.engine_args = args
el.on_stop(self._notice_engine_stopped)
- d = el.start(profile_dir, user=user, hostname=host)
+ d = el.start(user=user, hostname=host)
if i==0:
self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
self.launchers[host+str(i)] = el
@@ -727,11 +737,11 @@ def stop(self):
return output
-class WindowsHPCControllerLauncher(WindowsHPCLauncher):
+class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
help="WinHPC xml job file.")
- extra_args = List([], config=False,
+ controller_args = List([], config=False,
help="extra args to pass to ipcontroller")
def write_job_file(self, n):
@@ -743,7 +753,8 @@ def write_job_file(self, n):
# files that the scheduler redirects to.
t.work_directory = self.profile_dir
# Add the profile_dir and from self.start().
- t.controller_args.extend(self.extra_args)
+ t.controller_args.extend(self.cluster_args)
+ t.controller_args.extend(self.controller_args)
job.add_task(t)
self.log.info("Writing job description file: %s" % self.job_file)
@@ -753,18 +764,16 @@ def write_job_file(self, n):
def job_file(self):
return os.path.join(self.profile_dir, self.job_file_name)
- def start(self, profile_dir):
+ def start(self):
"""Start the controller by profile_dir."""
- self.extra_args = ['--profile-dir=%s'%profile_dir]
- self.profile_dir = unicode(profile_dir)
return super(WindowsHPCControllerLauncher, self).start(1)
-class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
+class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
job_file_name = Unicode(u'ipengineset_job.xml', config=True,
help="jobfile for ipengines job")
- extra_args = List([], config=False,
+ engine_args = List([], config=False,
help="extra args to pas to ipengine")
def write_job_file(self, n):
@@ -777,7 +786,8 @@ def write_job_file(self, n):
# files that the scheduler redirects to.
t.work_directory = self.profile_dir
# Add the profile_dir and from self.start().
- t.engine_args.extend(self.extra_args)
+ t.controller_args.extend(self.cluster_args)
+ t.controller_args.extend(self.engine_args)
job.add_task(t)
self.log.info("Writing job description file: %s" % self.job_file)
@@ -787,10 +797,8 @@ def write_job_file(self, n):
def job_file(self):
return os.path.join(self.profile_dir, self.job_file_name)
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start the controller by profile_dir."""
- self.extra_args = ['--profile-dir=%s'%profile_dir]
- self.profile_dir = unicode(profile_dir)
return super(WindowsHPCEngineSetLauncher, self).start(n)
@@ -798,6 +806,20 @@ def start(self, n, profile_dir):
# Batch (PBS) system launchers
#-----------------------------------------------------------------------------
+class BatchClusterAppMixin(ClusterAppMixin):
+ """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
+ def _profile_dir_changed(self, name, old, new):
+ self.context[name] = new
+ _cluster_id_changed = _profile_dir_changed
+
+ def _profile_dir_default(self):
+ self.context['profile_dir'] = ''
+ return ''
+ def _cluster_id_default(self):
+ self.context['cluster_id'] = ''
+ return ''
+
+
class BatchSystemLauncher(BaseLauncher):
"""Launch an external process using a batch system.
@@ -829,6 +851,12 @@ class BatchSystemLauncher(BaseLauncher):
queue = Unicode(u'', config=True,
help="The PBS Queue.")
+ def _queue_changed(self, name, old, new):
+ self.context[name] = new
+
+ n = Int(1)
+ _n_changed = _queue_changed
+
# not configurable, override in subclasses
# PBS Job Array regex
job_array_regexp = Unicode('')
@@ -868,8 +896,7 @@ def parse_job_id(self, output):
def write_batch_script(self, n):
"""Instantiate and write the batch script to the work_dir."""
- self.context['n'] = n
- self.context['queue'] = self.queue
+ self.n = n
# first priority is batch_template if set
if self.batch_template_file and not self.batch_template:
# second priority is batch_template_file
@@ -902,12 +929,10 @@ def write_batch_script(self, n):
f.write(script_as_string)
os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start n copies of the process using a batch system."""
# Here we save profile_dir in the context so they
# can be used in the batch script template as {profile_dir}
- self.context['profile_dir'] = profile_dir
- self.profile_dir = unicode(profile_dir)
self.write_batch_script(n)
output = check_output(self.args, env=os.environ)
@@ -938,7 +963,7 @@ class PBSLauncher(BatchSystemLauncher):
queue_template = Unicode('#PBS -q {queue}')
-class PBSControllerLauncher(PBSLauncher):
+class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
"""Launch a controller using PBS."""
batch_file_name = Unicode(u'pbs_controller', config=True,
@@ -946,29 +971,30 @@ class PBSControllerLauncher(PBSLauncher):
default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
-%s --log-to-file --profile-dir={profile_dir}
+%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipcontroller_cmd_argv)))
- def start(self, profile_dir):
+
+ def start(self):
"""Start the controller by profile or profile_dir."""
self.log.info("Starting PBSControllerLauncher: %r" % self.args)
- return super(PBSControllerLauncher, self).start(1, profile_dir)
+ return super(PBSControllerLauncher, self).start(1)
-class PBSEngineSetLauncher(PBSLauncher):
+class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
"""Launch Engines using PBS"""
batch_file_name = Unicode(u'pbs_engines', config=True,
help="batch file name for the engine(s) job.")
default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
-%s --profile-dir={profile_dir}
+%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipengine_cmd_argv)))
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start n engines by profile or profile_dir."""
self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
- return super(PBSEngineSetLauncher, self).start(n, profile_dir)
+ return super(PBSEngineSetLauncher, self).start(n)
#SGE is very similar to PBS
@@ -979,7 +1005,7 @@ class SGELauncher(PBSLauncher):
queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
queue_template = Unicode('#$ -q {queue}')
-class SGEControllerLauncher(SGELauncher):
+class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
"""Launch a controller using SGE."""
batch_file_name = Unicode(u'sge_controller', config=True,
@@ -987,28 +1013,28 @@ class SGEControllerLauncher(SGELauncher):
default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
-%s --log-to-file --profile-dir={profile_dir}
+%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipcontroller_cmd_argv)))
- def start(self, profile_dir):
+ def start(self):
"""Start the controller by profile or profile_dir."""
self.log.info("Starting PBSControllerLauncher: %r" % self.args)
- return super(SGEControllerLauncher, self).start(1, profile_dir)
+ return super(SGEControllerLauncher, self).start(1)
-class SGEEngineSetLauncher(SGELauncher):
+class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
"""Launch Engines with SGE"""
batch_file_name = Unicode(u'sge_engines', config=True,
help="batch file name for the engine(s) job.")
default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
-%s --profile-dir={profile_dir}
+%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipengine_cmd_argv)))
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start n engines by profile or profile_dir."""
self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
- return super(SGEEngineSetLauncher, self).start(n, profile_dir)
+ return super(SGEEngineSetLauncher, self).start(n)
# LSF launchers
@@ -1029,7 +1055,7 @@ class LSFLauncher(BatchSystemLauncher):
queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
queue_template = Unicode('#BSUB -q {queue}')
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start n copies of the process using LSF batch system.
This cant inherit from the base class because bsub expects
to be piped a shell script in order to honor the #BSUB directives :
@@ -1037,8 +1063,6 @@ def start(self, n, profile_dir):
"""
# Here we save profile_dir in the context so they
# can be used in the batch script template as {profile_dir}
- self.context['profile_dir'] = profile_dir
- self.profile_dir = unicode(profile_dir)
self.write_batch_script(n)
#output = check_output(self.args, env=os.environ)
piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
@@ -1049,7 +1073,7 @@ def start(self, n, profile_dir):
return job_id
-class LSFControllerLauncher(LSFLauncher):
+class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
"""Launch a controller using LSF."""
batch_file_name = Unicode(u'lsf_controller', config=True,
@@ -1058,29 +1082,29 @@ class LSFControllerLauncher(LSFLauncher):
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
- %s --log-to-file --profile-dir={profile_dir}
+ %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipcontroller_cmd_argv)))
- def start(self, profile_dir):
+ def start(self):
"""Start the controller by profile or profile_dir."""
self.log.info("Starting LSFControllerLauncher: %r" % self.args)
- return super(LSFControllerLauncher, self).start(1, profile_dir)
+ return super(LSFControllerLauncher, self).start(1)
-class LSFEngineSetLauncher(LSFLauncher):
+class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
"""Launch Engines using LSF"""
batch_file_name = Unicode(u'lsf_engines', config=True,
help="batch file name for the engine(s) job.")
default_template= Unicode(u"""#!/bin/sh
#BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
- %s --profile-dir={profile_dir}
+ %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipengine_cmd_argv)))
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start n engines by profile or profile_dir."""
self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
- return super(LSFEngineSetLauncher, self).start(n, profile_dir)
+ return super(LSFEngineSetLauncher, self).start(n)
#-----------------------------------------------------------------------------
Something went wrong with that request. Please try again.