Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge PR #795 (cluster-id and launcher cleanup)

closes gh-795
  • Loading branch information...
commit 3994edb75897eb8a5e19fcb6191cd0d704eb7376 2 parents 391deab + 7b3d060
@minrk minrk authored
View
5 IPython/config/configurable.py
@@ -207,8 +207,9 @@ def c(s):
for parent in cls.mro():
# only include parents that are not base classes
# and are not the class itself
- if issubclass(parent, Configurable) and \
- not parent in (Configurable, SingletonConfigurable, cls):
+ # and have some configurable traits to inherit
+ if parent is not cls and issubclass(parent, Configurable) and \
+ parent.class_traits(config=True):
parents.append(parent)
if parents:
View
17 IPython/parallel/apps/baseapp.py
@@ -75,6 +75,7 @@ def __init__(self, app):
'log-to-file' : 'BaseParallelApplication.log_to_file',
'clean-logs' : 'BaseParallelApplication.clean_logs',
'log-url' : 'BaseParallelApplication.log_url',
+ 'cluster-id' : 'BaseParallelApplication.cluster_id',
})
base_flags = {
@@ -116,6 +117,22 @@ def _work_dir_changed(self, name, old, new):
log_url = Unicode('', config=True,
help="The ZMQ URL of the iplogger to aggregate logging.")
+ cluster_id = Unicode('', config=True,
+ help="""String id to add to runtime files, to prevent name collisions when
+ using multiple clusters with a single profile simultaneously.
+
+ When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
+
+ Since this is text inserted into filenames, typical recommendations apply:
+ Simple character strings are ideal, and spaces are not recommended (but should
+ generally work).
+ """
+ )
+ def _cluster_id_changed(self, name, old, new):
+ self.name = self.__class__.name
+ if new:
+ self.name += '-%s'%new
+
def _config_files_default(self):
return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
View
25 IPython/parallel/apps/ipclusterapp.py
@@ -275,19 +275,22 @@ def initialize(self, argv=None):
self.init_launchers()
def init_launchers(self):
- self.engine_launcher = self.build_launcher(self.engine_launcher_class)
+ self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
self.engine_launcher.on_stop(lambda r: self.loop.stop())
def init_signal(self):
# Setup signals
signal.signal(signal.SIGINT, self.sigint_handler)
- def build_launcher(self, clsname):
+ def build_launcher(self, clsname, kind=None):
"""import and instantiate a Launcher based on importstring"""
if '.' not in clsname:
# not a module, presume it's the raw name in apps.launcher
+ if kind and kind not in clsname:
+ # doesn't match necessary full class name, assume it's
+ # just 'PBS' or 'MPIExec' prefix:
+ clsname = clsname + kind + 'Launcher'
clsname = 'IPython.parallel.apps.launcher.'+clsname
- # print repr(clsname)
try:
klass = import_item(clsname)
except (ImportError, KeyError):
@@ -295,16 +298,14 @@ def build_launcher(self, clsname):
self.exit(1)
launcher = klass(
- work_dir=u'.', config=self.config, log=self.log
+ work_dir=u'.', config=self.config, log=self.log,
+ profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
)
return launcher
def start_engines(self):
self.log.info("Starting %i engines"%self.n)
- self.engine_launcher.start(
- self.n,
- self.profile_dir.location
- )
+ self.engine_launcher.start(self.n)
def stop_engines(self):
self.log.info("Stopping Engines...")
@@ -424,14 +425,12 @@ def _classes_default(self,):
aliases = Dict(start_aliases)
def init_launchers(self):
- self.controller_launcher = self.build_launcher(self.controller_launcher_class)
- self.engine_launcher = self.build_launcher(self.engine_launcher_class)
+ self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
+ self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
self.controller_launcher.on_stop(self.stop_launchers)
def start_controller(self):
- self.controller_launcher.start(
- self.profile_dir.location
- )
+ self.controller_launcher.start()
def stop_controller(self):
# self.log.info("In stop_controller")
View
21 IPython/parallel/apps/ipcontrollerapp.py
@@ -174,7 +174,18 @@ class IPControllerApp(BaseParallelApplication):
use_threads = Bool(False, config=True,
help='Use threads instead of processes for the schedulers',
- )
+ )
+
+ engine_json_file = Unicode('ipcontroller-engine.json', config=True,
+ help="JSON filename where engine connection info will be stored.")
+ client_json_file = Unicode('ipcontroller-client.json', config=True,
+ help="JSON filename where client connection info will be stored.")
+
+ def _cluster_id_changed(self, name, old, new):
+ super(IPControllerApp, self)._cluster_id_changed(name, old, new)
+ self.engine_json_file = "%s-engine.json" % self.name
+ self.client_json_file = "%s-client.json" % self.name
+
# internal
children = List()
@@ -215,7 +226,7 @@ def load_config_from_json(self):
"""load config from existing json connector files."""
c = self.config
# load from engine config
- with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
+ with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
cfg = json.loads(f.read())
key = c.Session.key = asbytes(cfg['exec_key'])
xport,addr = cfg['url'].split('://')
@@ -227,7 +238,7 @@ def load_config_from_json(self):
if not self.engine_ssh_server:
self.engine_ssh_server = cfg['ssh']
# load client config
- with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
+ with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
cfg = json.loads(f.read())
assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
xport,addr = cfg['url'].split('://')
@@ -277,11 +288,11 @@ def init_hub(self):
'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
'location' : self.location
}
- self.save_connection_dict('ipcontroller-client.json', cdict)
+ self.save_connection_dict(self.client_json_file, cdict)
edict = cdict
edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
edict['ssh'] = self.engine_ssh_server
- self.save_connection_dict('ipcontroller-engine.json', edict)
+ self.save_connection_dict(self.engine_json_file, edict)
#
def init_schedulers(self):
View
16 IPython/parallel/apps/ipengineapp.py
@@ -93,7 +93,7 @@ class MPI(Configurable):
help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
)
- def _on_use_changed(self, old, new):
+ def _use_changed(self, name, old, new):
# load default init script if it's not set
if not self.init_script:
self.init_script = self.default_inits.get(new, '')
@@ -135,8 +135,8 @@ def _on_use_changed(self, old, new):
class IPEngineApp(BaseParallelApplication):
- name = Unicode(u'ipengine')
- description = Unicode(_description)
+ name = 'ipengine'
+ description = _description
examples = _examples
config_file_name = Unicode(default_config_file_name)
classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
@@ -158,7 +158,15 @@ class IPEngineApp(BaseParallelApplication):
controller and engine are started at the same time and it
may take a moment for the controller to write the connector files.""")
- url_file_name = Unicode(u'ipcontroller-engine.json')
+ url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
+
+ def _cluster_id_changed(self, name, old, new):
+ if new:
+ base = 'ipcontroller-%s' % new
+ else:
+ base = 'ipcontroller'
+ self.url_file_name = "%s-engine.json" % base
+
log_url = Unicode('', config=True,
help="""The URL for the iploggerapp instance, for forwarding
logging to a central location.""")
View
300 IPython/parallel/apps/launcher.py
@@ -57,7 +57,9 @@ def check_output(*args, **kwargs):
from IPython.config.application import Application
from IPython.config.configurable import LoggingConfigurable
from IPython.utils.text import EvalFormatter
-from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
+from IPython.utils.traitlets import (
+ Any, Int, CFloat, List, Unicode, Dict, Instance, HasTraits,
+)
from IPython.utils.path import get_ipython_module_path
from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
@@ -213,6 +215,33 @@ def signal(self, sig):
"""
raise NotImplementedError('signal must be implemented in a subclass')
+class ClusterAppMixin(HasTraits):
+ """MixIn for cluster args as traits"""
+ cluster_args = List([])
+ profile_dir=Unicode('')
+ cluster_id=Unicode('')
+ def _profile_dir_changed(self, name, old, new):
+ self.cluster_args = []
+ if self.profile_dir:
+ self.cluster_args.extend(['--profile-dir', self.profile_dir])
+ if self.cluster_id:
+ self.cluster_args.extend(['--cluster-id', self.cluster_id])
+ _cluster_id_changed = _profile_dir_changed
+
+class ControllerMixin(ClusterAppMixin):
+ controller_cmd = List(ipcontroller_cmd_argv, config=True,
+ help="""Popen command to launch ipcontroller.""")
+ # Command line arguments to ipcontroller.
+ controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
+ help="""command-line args to pass to ipcontroller""")
+
+class EngineMixin(ClusterAppMixin):
+ engine_cmd = List(ipengine_cmd_argv, config=True,
+ help="""command to launch the Engine.""")
+ # Command line arguments for ipengine.
+ engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
+ help="command-line arguments to pass to ipengine"
+ )
#-----------------------------------------------------------------------------
# Local process launchers
@@ -317,54 +346,28 @@ def poll(self):
self.notify_stop(dict(exit_code=status, pid=self.process.pid))
return status
-class LocalControllerLauncher(LocalProcessLauncher):
+class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
"""Launch a controller as a regular external process."""
- controller_cmd = List(ipcontroller_cmd_argv, config=True,
- help="""Popen command to launch ipcontroller.""")
- # Command line arguments to ipcontroller.
- controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
- help="""command-line args to pass to ipcontroller""")
-
def find_args(self):
- return self.controller_cmd + self.controller_args
+ return self.controller_cmd + self.cluster_args + self.controller_args
- def start(self, profile_dir):
+ def start(self):
"""Start the controller by profile_dir."""
- self.controller_args.extend(['--profile-dir=%s'%profile_dir])
- self.profile_dir = unicode(profile_dir)
self.log.info("Starting LocalControllerLauncher: %r" % self.args)
return super(LocalControllerLauncher, self).start()
-class LocalEngineLauncher(LocalProcessLauncher):
+class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
"""Launch a single engine as a regular externall process."""
- engine_cmd = List(ipengine_cmd_argv, config=True,
- help="""command to launch the Engine.""")
- # Command line arguments for ipengine.
- engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
- help="command-line arguments to pass to ipengine"
- )
-
def find_args(self):
- return self.engine_cmd + self.engine_args
+ return self.engine_cmd + self.cluster_args + self.engine_args
- def start(self, profile_dir):
- """Start the engine by profile_dir."""
- self.engine_args.extend(['--profile-dir=%s'%profile_dir])
- self.profile_dir = unicode(profile_dir)
- return super(LocalEngineLauncher, self).start()
-
-class LocalEngineSetLauncher(BaseLauncher):
+class LocalEngineSetLauncher(LocalEngineLauncher):
"""Launch a set of engines as regular external processes."""
- # Command line arguments for ipengine.
- engine_args = List(
- ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
- help="command-line arguments to pass to ipengine"
- )
delay = CFloat(0.1, config=True,
help="""delay (in seconds) between starting each engine after the first.
This can help force the engines to get their ids in order, or limit
@@ -383,26 +386,26 @@ def __init__(self, work_dir=u'.', config=None, **kwargs):
)
self.stop_data = {}
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start n engines by profile or profile_dir."""
- self.profile_dir = unicode(profile_dir)
dlist = []
for i in range(n):
if i > 0:
time.sleep(self.delay)
- el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
+ el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
+ profile_dir=self.profile_dir, cluster_id=self.cluster_id,
+ )
+
# Copy the engine args over to each engine launcher.
+ el.engine_cmd = copy.deepcopy(self.engine_cmd)
el.engine_args = copy.deepcopy(self.engine_args)
el.on_stop(self._notice_engine_stopped)
- d = el.start(profile_dir)
+ d = el.start()
if i==0:
self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
self.launchers[i] = el
dlist.append(d)
self.notify_start(dlist)
- # The consumeErrors here could be dangerous
- # dfinal = gatherBoth(dlist, consumeErrors=True)
- # dfinal.addCallback(self.notify_start)
return dlist
def find_args(self):
@@ -413,7 +416,6 @@ def signal(self, sig):
for el in self.launchers.itervalues():
d = el.signal(sig)
dlist.append(d)
- # dfinal = gatherBoth(dlist, consumeErrors=True)
return dlist
def interrupt_then_kill(self, delay=1.0):
@@ -421,7 +423,6 @@ def interrupt_then_kill(self, delay=1.0):
for el in self.launchers.itervalues():
d = el.interrupt_then_kill(delay)
dlist.append(d)
- # dfinal = gatherBoth(dlist, consumeErrors=True)
return dlist
def stop(self):
@@ -452,9 +453,9 @@ class MPIExecLauncher(LocalProcessLauncher):
mpi_args = List([], config=True,
help="The command line arguments to pass to mpiexec."
)
- program = List(['date'], config=True,
+ program = List(['date'],
help="The program to start via mpiexec.")
- program_args = List([], config=True,
+ program_args = List([],
help="The command line argument to the program."
)
n = Int(1)
@@ -470,44 +471,42 @@ def start(self, n):
return super(MPIExecLauncher, self).start()
-class MPIExecControllerLauncher(MPIExecLauncher):
+class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
"""Launch a controller using mpiexec."""
- controller_cmd = List(ipcontroller_cmd_argv, config=True,
- help="Popen command to launch the Contropper"
- )
- controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
- help="Command line arguments to pass to ipcontroller."
- )
- n = Int(1)
+ # alias back to *non-configurable* program[_args] for use in find_args()
+ # this way all Controller/EngineSetLaunchers have the same form, rather
+ # than *some* having `program_args` and others `controller_args`
+ @property
+ def program(self):
+ return self.controller_cmd
+
+ @property
+ def program_args(self):
+ return self.cluster_args + self.controller_args
- def start(self, profile_dir):
+ def start(self):
"""Start the controller by profile_dir."""
- self.controller_args.extend(['--profile-dir=%s'%profile_dir])
- self.profile_dir = unicode(profile_dir)
self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
return super(MPIExecControllerLauncher, self).start(1)
- def find_args(self):
- return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
- self.controller_cmd + self.controller_args
-
-class MPIExecEngineSetLauncher(MPIExecLauncher):
+class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
+ """Launch engines using mpiexec"""
- program = List(ipengine_cmd_argv, config=True,
- help="Popen command for ipengine"
- )
- program_args = List(
- ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
- help="Command line arguments for ipengine."
- )
- n = Int(1)
+ # alias back to *non-configurable* program[_args] for use in find_args()
+ # this way all Controller/EngineSetLaunchers have the same form, rather
+ # than *some* having `program_args` and others `controller_args`
+ @property
+ def program(self):
+ return self.engine_cmd
+
+ @property
+ def program_args(self):
+ return self.cluster_args + self.engine_args
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start n engines by profile or profile_dir."""
- self.program_args.extend(['--profile-dir=%s'%profile_dir])
- self.profile_dir = unicode(profile_dir)
self.n = n
self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
return super(MPIExecEngineSetLauncher, self).start(n)
@@ -530,9 +529,9 @@ class SSHLauncher(LocalProcessLauncher):
help="command for starting ssh")
ssh_args = List(['-tt'], config=True,
help="args to pass to ssh")
- program = List(['date'], config=True,
+ program = List(['date'],
help="Program to launch via ssh")
- program_args = List([], config=True,
+ program_args = List([],
help="args to pass to remote program")
hostname = Unicode('', config=True,
help="hostname on which to launch the program")
@@ -554,8 +553,7 @@ def find_args(self):
return self.ssh_cmd + self.ssh_args + [self.location] + \
self.program + self.program_args
- def start(self, profile_dir, hostname=None, user=None):
- self.profile_dir = unicode(profile_dir)
+ def start(self, hostname=None, user=None):
if hostname is not None:
self.hostname = hostname
if user is not None:
@@ -571,22 +569,33 @@ def signal(self, sig):
-class SSHControllerLauncher(SSHLauncher):
+class SSHControllerLauncher(SSHLauncher, ControllerMixin):
- program = List(ipcontroller_cmd_argv, config=True,
- help="remote ipcontroller command.")
- program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True,
- help="Command line arguments to ipcontroller.")
+ # alias back to *non-configurable* program[_args] for use in find_args()
+ # this way all Controller/EngineSetLaunchers have the same form, rather
+ # than *some* having `program_args` and others `controller_args`
+ @property
+ def program(self):
+ return self.controller_cmd
+
+ @property
+ def program_args(self):
+ return self.cluster_args + self.controller_args
-class SSHEngineLauncher(SSHLauncher):
- program = List(ipengine_cmd_argv, config=True,
- help="remote ipengine command.")
- # Command line arguments for ipengine.
- program_args = List(
- ['--log-to-file','--log_level=%i'%logging.INFO], config=True,
- help="Command line arguments to ipengine."
- )
+class SSHEngineLauncher(SSHLauncher, EngineMixin):
+
+ # alias back to *non-configurable* program[_args] for use in find_args()
+ # this way all Controller/EngineSetLaunchers have the same form, rather
+ # than *some* having `program_args` and others `controller_args`
+ @property
+ def program(self):
+ return self.engine_cmd
+
+ @property
+ def program_args(self):
+ return self.cluster_args + self.engine_args
+
class SSHEngineSetLauncher(LocalEngineSetLauncher):
launcher_class = SSHEngineLauncher
@@ -594,12 +603,11 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher):
help="""dict of engines to launch. This is a dict by hostname of ints,
corresponding to the number of engines to start on that host.""")
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start engines by profile or profile_dir.
`n` is ignored, and the `engines` config property is used instead.
"""
- self.profile_dir = unicode(profile_dir)
dlist = []
for host, n in self.engines.iteritems():
if isinstance(n, (tuple, list)):
@@ -614,13 +622,15 @@ def start(self, n, profile_dir):
for i in range(n):
if i > 0:
time.sleep(self.delay)
- el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
+ el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
+ profile_dir=self.profile_dir, cluster_id=self.cluster_id,
+ )
# Copy the engine args over to each engine launcher.
- i
- el.program_args = args
+ el.engine_cmd = self.engine_cmd
+ el.engine_args = args
el.on_stop(self._notice_engine_stopped)
- d = el.start(profile_dir, user=user, hostname=host)
+ d = el.start(user=user, hostname=host)
if i==0:
self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
self.launchers[host+str(i)] = el
@@ -727,11 +737,11 @@ def stop(self):
return output
-class WindowsHPCControllerLauncher(WindowsHPCLauncher):
+class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
help="WinHPC xml job file.")
- extra_args = List([], config=False,
+ controller_args = List([], config=False,
help="extra args to pass to ipcontroller")
def write_job_file(self, n):
@@ -743,7 +753,8 @@ def write_job_file(self, n):
# files that the scheduler redirects to.
t.work_directory = self.profile_dir
# Add the profile_dir and from self.start().
- t.controller_args.extend(self.extra_args)
+ t.controller_args.extend(self.cluster_args)
+ t.controller_args.extend(self.controller_args)
job.add_task(t)
self.log.info("Writing job description file: %s" % self.job_file)
@@ -753,18 +764,16 @@ def write_job_file(self, n):
def job_file(self):
return os.path.join(self.profile_dir, self.job_file_name)
- def start(self, profile_dir):
+ def start(self):
"""Start the controller by profile_dir."""
- self.extra_args = ['--profile-dir=%s'%profile_dir]
- self.profile_dir = unicode(profile_dir)
return super(WindowsHPCControllerLauncher, self).start(1)
-class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
+class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
job_file_name = Unicode(u'ipengineset_job.xml', config=True,
help="jobfile for ipengines job")
- extra_args = List([], config=False,
+ engine_args = List([], config=False,
help="extra args to pas to ipengine")
def write_job_file(self, n):
@@ -777,7 +786,8 @@ def write_job_file(self, n):
# files that the scheduler redirects to.
t.work_directory = self.profile_dir
# Add the profile_dir and from self.start().
- t.engine_args.extend(self.extra_args)
+ t.controller_args.extend(self.cluster_args)
+ t.controller_args.extend(self.engine_args)
job.add_task(t)
self.log.info("Writing job description file: %s" % self.job_file)
@@ -787,10 +797,8 @@ def write_job_file(self, n):
def job_file(self):
return os.path.join(self.profile_dir, self.job_file_name)
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start the controller by profile_dir."""
- self.extra_args = ['--profile-dir=%s'%profile_dir]
- self.profile_dir = unicode(profile_dir)
return super(WindowsHPCEngineSetLauncher, self).start(n)
@@ -798,6 +806,20 @@ def start(self, n, profile_dir):
# Batch (PBS) system launchers
#-----------------------------------------------------------------------------
+class BatchClusterAppMixin(ClusterAppMixin):
+ """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
+ def _profile_dir_changed(self, name, old, new):
+ self.context[name] = new
+ _cluster_id_changed = _profile_dir_changed
+
+ def _profile_dir_default(self):
+ self.context['profile_dir'] = ''
+ return ''
+ def _cluster_id_default(self):
+ self.context['cluster_id'] = ''
+ return ''
+
+
class BatchSystemLauncher(BaseLauncher):
"""Launch an external process using a batch system.
@@ -829,6 +851,12 @@ class BatchSystemLauncher(BaseLauncher):
queue = Unicode(u'', config=True,
help="The PBS Queue.")
+ def _queue_changed(self, name, old, new):
+ self.context[name] = new
+
+ n = Int(1)
+ _n_changed = _queue_changed
+
# not configurable, override in subclasses
# PBS Job Array regex
job_array_regexp = Unicode('')
@@ -868,8 +896,7 @@ def parse_job_id(self, output):
def write_batch_script(self, n):
"""Instantiate and write the batch script to the work_dir."""
- self.context['n'] = n
- self.context['queue'] = self.queue
+ self.n = n
# first priority is batch_template if set
if self.batch_template_file and not self.batch_template:
# second priority is batch_template_file
@@ -902,12 +929,10 @@ def write_batch_script(self, n):
f.write(script_as_string)
os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start n copies of the process using a batch system."""
# Here we save profile_dir in the context so they
# can be used in the batch script template as {profile_dir}
- self.context['profile_dir'] = profile_dir
- self.profile_dir = unicode(profile_dir)
self.write_batch_script(n)
output = check_output(self.args, env=os.environ)
@@ -938,7 +963,7 @@ class PBSLauncher(BatchSystemLauncher):
queue_template = Unicode('#PBS -q {queue}')
-class PBSControllerLauncher(PBSLauncher):
+class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
"""Launch a controller using PBS."""
batch_file_name = Unicode(u'pbs_controller', config=True,
@@ -946,29 +971,30 @@ class PBSControllerLauncher(PBSLauncher):
default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
-%s --log-to-file --profile-dir={profile_dir}
+%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipcontroller_cmd_argv)))
- def start(self, profile_dir):
+
+ def start(self):
"""Start the controller by profile or profile_dir."""
self.log.info("Starting PBSControllerLauncher: %r" % self.args)
- return super(PBSControllerLauncher, self).start(1, profile_dir)
+ return super(PBSControllerLauncher, self).start(1)
-class PBSEngineSetLauncher(PBSLauncher):
+class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
"""Launch Engines using PBS"""
batch_file_name = Unicode(u'pbs_engines', config=True,
help="batch file name for the engine(s) job.")
default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
-%s --profile-dir={profile_dir}
+%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipengine_cmd_argv)))
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start n engines by profile or profile_dir."""
self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
- return super(PBSEngineSetLauncher, self).start(n, profile_dir)
+ return super(PBSEngineSetLauncher, self).start(n)
#SGE is very similar to PBS
@@ -979,7 +1005,7 @@ class SGELauncher(PBSLauncher):
queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
queue_template = Unicode('#$ -q {queue}')
-class SGEControllerLauncher(SGELauncher):
+class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
"""Launch a controller using SGE."""
batch_file_name = Unicode(u'sge_controller', config=True,
@@ -987,28 +1013,28 @@ class SGEControllerLauncher(SGELauncher):
default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
-%s --log-to-file --profile-dir={profile_dir}
+%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipcontroller_cmd_argv)))
- def start(self, profile_dir):
+ def start(self):
"""Start the controller by profile or profile_dir."""
self.log.info("Starting PBSControllerLauncher: %r" % self.args)
- return super(SGEControllerLauncher, self).start(1, profile_dir)
+ return super(SGEControllerLauncher, self).start(1)
-class SGEEngineSetLauncher(SGELauncher):
+class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
"""Launch Engines with SGE"""
batch_file_name = Unicode(u'sge_engines', config=True,
help="batch file name for the engine(s) job.")
default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
-%s --profile-dir={profile_dir}
+%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipengine_cmd_argv)))
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start n engines by profile or profile_dir."""
self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
- return super(SGEEngineSetLauncher, self).start(n, profile_dir)
+ return super(SGEEngineSetLauncher, self).start(n)
# LSF launchers
@@ -1029,7 +1055,7 @@ class LSFLauncher(BatchSystemLauncher):
queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
queue_template = Unicode('#BSUB -q {queue}')
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start n copies of the process using LSF batch system.
This cant inherit from the base class because bsub expects
to be piped a shell script in order to honor the #BSUB directives :
@@ -1037,8 +1063,6 @@ def start(self, n, profile_dir):
"""
# Here we save profile_dir in the context so they
# can be used in the batch script template as {profile_dir}
- self.context['profile_dir'] = profile_dir
- self.profile_dir = unicode(profile_dir)
self.write_batch_script(n)
#output = check_output(self.args, env=os.environ)
piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
@@ -1049,7 +1073,7 @@ def start(self, n, profile_dir):
return job_id
-class LSFControllerLauncher(LSFLauncher):
+class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
"""Launch a controller using LSF."""
batch_file_name = Unicode(u'lsf_controller', config=True,
@@ -1058,29 +1082,29 @@ class LSFControllerLauncher(LSFLauncher):
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
- %s --log-to-file --profile-dir={profile_dir}
+ %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipcontroller_cmd_argv)))
- def start(self, profile_dir):
+ def start(self):
"""Start the controller by profile or profile_dir."""
self.log.info("Starting LSFControllerLauncher: %r" % self.args)
- return super(LSFControllerLauncher, self).start(1, profile_dir)
+ return super(LSFControllerLauncher, self).start(1)
-class LSFEngineSetLauncher(LSFLauncher):
+class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
"""Launch Engines using LSF"""
batch_file_name = Unicode(u'lsf_engines', config=True,
help="batch file name for the engine(s) job.")
default_template= Unicode(u"""#!/bin/sh
#BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
- %s --profile-dir={profile_dir}
+ %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipengine_cmd_argv)))
- def start(self, n, profile_dir):
+ def start(self, n):
"""Start n engines by profile or profile_dir."""
self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
- return super(LSFEngineSetLauncher, self).start(n, profile_dir)
+ return super(LSFEngineSetLauncher, self).start(n)
#-----------------------------------------------------------------------------
Please sign in to comment.
Something went wrong with that request. Please try again.