Skip to content
Browse files

Update PBS/SGE launchers with 0.10.1 options and defaults

  • Loading branch information...
1 parent 8fb951e commit 428600105caddeb1872365100b825154d9320f94 @minrk minrk committed Mar 23, 2011
Showing with 163 additions and 60 deletions.
  1. +27 −16 IPython/config/default/ipclusterz_config.py
  2. +136 −44 IPython/zmq/parallel/launcher.py
View
43 IPython/config/default/ipclusterz_config.py
@@ -11,7 +11,7 @@
# - Start as a regular process on localhost.
# - Start using mpiexec.
# - Start using the Windows HPC Server 2008 scheduler
-# - Start using PBS
+# - Start using PBS/SGE
# - Start using SSH
@@ -21,13 +21,16 @@
# - LocalControllerLauncher
# - MPIExecControllerLauncher
# - PBSControllerLauncher
+# - SGEControllerLauncher
# - WindowsHPCControllerLauncher
# c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
+c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.PBSControllerLauncher'
# Options are:
# - LocalEngineSetLauncher
# - MPIExecEngineSetLauncher
# - PBSEngineSetLauncher
+# - SGEEngineSetLauncher
# - WindowsHPCEngineSetLauncher
# c.Global.engine_launcher = 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
@@ -136,38 +139,41 @@
# Unix batch (PBS/SGE) scheduler launchers
#-----------------------------------------------------------------------------
+# SGE and PBS are very similar. All configurables in this section called 'PBS*'
+# also exist as 'SGE*'.
+
# The command line program to use to submit a PBS job.
-# c.PBSControllerLauncher.submit_command = ['qsub']
+# c.PBSLauncher.submit_command = ['qsub']
# The command line program to use to delete a PBS job.
-# c.PBSControllerLauncher.delete_command = ['qdel']
+# c.PBSLauncher.delete_command = ['qdel']
+
+# The PBS queue in which the job should run
+# c.PBSLauncher.queue = 'myqueue'
# A regular expression used to find the job id in the output of qsub.
-# c.PBSControllerLauncher.job_id_regexp = r'\d+'
+# c.PBSLauncher.job_id_regexp = r'\d+'
+
+# If the Controller and Engines need different values for the options above,
+# set them per launcher, e.g. c.PBSControllerLauncher.<option> or
+# c.PBSEngineSetLauncher.<option>.
# The batch submission script used to start the controller. This is where
# environment variables would be setup, etc. This string is interpreted using
# the Itpl module in IPython.external. Basically, you can use ${n} for the
# number of engines and ${cluster_dir} for the cluster_dir.
# c.PBSControllerLauncher.batch_template = """
# #PBS -N ipcontroller
+# #PBS -q $queue
#
# ipcontrollerz --cluster-dir $cluster_dir
# """
+# You can also load this template from a file
+# c.PBSControllerLauncher.batch_template_file = u"/path/to/my/template.sh"
+
# The name of the instantiated batch script that will actually be used to
# submit the job. This will be written to the cluster directory.
-# c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
-
-
-# The command line program to use to submit a PBS job.
-# c.PBSEngineSetLauncher.submit_command = 'qsub'
-
-# The command line program to use to delete a PBS job.
-# c.PBSEngineSetLauncher.delete_command = 'qdel'
-
-# A regular expression that takes the output of qsub and find the job id.
-# c.PBSEngineSetLauncher.job_id_regexp = r'\d+'
+# c.PBSControllerLauncher.batch_file_name = u'pbs_controller'
# The batch submission script used to start the engines. This is where
# environment variables would be setup, etc. This string is interpreted using
@@ -180,9 +186,14 @@
# ipenginez --cluster-dir $cluster_dir$s
# """
+# You can also load this template from a file
+# c.PBSEngineSetLauncher.batch_template_file = u"/path/to/my/template.sh"
+
# The name of the instantiated batch script that will actually be used to
# submit the job. This will be written to the cluster directory.
-# c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
+# c.PBSEngineSetLauncher.batch_file_name = u'pbs_engines'
+
+
#-----------------------------------------------------------------------------
# Windows HPC Server 2008 launcher configuration
View
180 IPython/zmq/parallel/launcher.py
@@ -19,6 +19,7 @@
import logging
import os
import re
+import stat
from signal import SIGINT, SIGTERM
try:
@@ -41,7 +42,7 @@ def check_output(*args, **kwargs):
from IPython.external import Itpl
# from IPython.config.configurable import Configurable
-from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance
+from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
from IPython.utils.path import get_ipython_module_path
from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
@@ -62,15 +63,15 @@ def check_output(*args, **kwargs):
#-----------------------------------------------------------------------------
-ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
+ipclusterz_cmd_argv = pycmd2argv(get_ipython_module_path(
'IPython.zmq.parallel.ipclusterapp'
))
-ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
+ipenginez_cmd_argv = pycmd2argv(get_ipython_module_path(
'IPython.zmq.parallel.ipengineapp'
))
-ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
+ipcontrollerz_cmd_argv = pycmd2argv(get_ipython_module_path(
'IPython.zmq.parallel.ipcontrollerapp'
))
@@ -303,7 +304,7 @@ def poll(self):
class LocalControllerLauncher(LocalProcessLauncher):
"""Launch a controller as a regular external process."""
- controller_cmd = List(ipcontroller_cmd_argv, config=True)
+ controller_cmd = List(ipcontrollerz_cmd_argv, config=True)
# Command line arguments to ipcontroller.
controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
@@ -321,7 +322,7 @@ def start(self, cluster_dir):
class LocalEngineLauncher(LocalProcessLauncher):
"""Launch a single engine as a regular external process."""
- engine_cmd = List(ipengine_cmd_argv, config=True)
+ engine_cmd = List(ipenginez_cmd_argv, config=True)
# Command line arguments for ipengine.
engine_args = List(
['--log-to-file','--log-level', str(logging.INFO)], config=True
@@ -442,7 +443,7 @@ def start(self, n):
class MPIExecControllerLauncher(MPIExecLauncher):
"""Launch a controller using mpiexec."""
- controller_cmd = List(ipcontroller_cmd_argv, config=True)
+ controller_cmd = List(ipcontrollerz_cmd_argv, config=True)
# Command line arguments to ipcontroller.
controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
n = Int(1, config=False)
@@ -461,7 +462,7 @@ def find_args(self):
class MPIExecEngineSetLauncher(MPIExecLauncher):
- program = List(ipengine_cmd_argv, config=True)
+ program = List(ipenginez_cmd_argv, config=True)
# Command line arguments for ipengine.
program_args = List(
['--log-to-file','--log-level', str(logging.INFO)], config=True
@@ -494,18 +495,18 @@ class SSHLauncher(LocalProcessLauncher):
ssh_args = List(['-tt'], config=True)
program = List(['date'], config=True)
program_args = List([], config=True)
- hostname = Str('', config=True)
- user = Str('', config=True)
- location = Str('')
+ hostname = CUnicode('', config=True)
+ user = CUnicode('', config=True)
+ location = CUnicode('')
def _hostname_changed(self, name, old, new):
if self.user:
- self.location = '%s@%s' % (self.user, new)
+ self.location = u'%s@%s' % (self.user, new)
else:
self.location = new
def _user_changed(self, name, old, new):
- self.location = '%s@%s' % (new, self.hostname)
+ self.location = u'%s@%s' % (new, self.hostname)
def find_args(self):
return self.ssh_cmd + self.ssh_args + [self.location] + \
@@ -530,13 +531,13 @@ def signal(self, sig):
class SSHControllerLauncher(SSHLauncher):
- program = List(ipcontroller_cmd_argv, config=True)
+ program = List(ipcontrollerz_cmd_argv, config=True)
# Command line arguments to ipcontroller.
program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
class SSHEngineLauncher(SSHLauncher):
- program = List(ipengine_cmd_argv, config=True)
+ program = List(ipenginez_cmd_argv, config=True)
# Command line arguments for ipengine.
program_args = List(
['--log-to-file','--log-level', str(logging.INFO)], config=True
@@ -602,13 +603,13 @@ class WindowsHPCLauncher(BaseLauncher):
# submit_command.
job_id_regexp = Str(r'\d+', config=True)
# The filename of the instantiated job script.
- job_file_name = Unicode(u'ipython_job.xml', config=True)
+ job_file_name = CUnicode(u'ipython_job.xml', config=True)
# The full path to the instantiated job script. This gets made dynamically
# by combining the work_dir with the job_file_name.
- job_file = Unicode(u'')
+ job_file = CUnicode(u'')
# The hostname of the scheduler to submit the job to
- scheduler = Str('', config=True)
- job_cmd = Str(find_job_cmd(), config=True)
+ scheduler = CUnicode('', config=True)
+ job_cmd = CUnicode(find_job_cmd(), config=True)
def __init__(self, work_dir=u'.', config=None, **kwargs):
super(WindowsHPCLauncher, self).__init__(
@@ -623,7 +624,7 @@ def write_job_file(self, n):
raise NotImplementedError("Implement write_job_file in a subclass.")
def find_args(self):
- return ['job.exe']
+ return [u'job.exe']
def parse_job_id(self, output):
"""Take the output of the submit command and return the job id."""
@@ -676,7 +677,7 @@ def stop(self):
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
- job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
+ job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
extra_args = List([], config=False)
def write_job_file(self, n):
@@ -707,7 +708,7 @@ def start(self, cluster_dir):
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
- job_file_name = Unicode(u'ipengineset_job.xml', config=True)
+ job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
extra_args = List([], config=False)
def write_job_file(self, n):
@@ -757,24 +758,38 @@ class BatchSystemLauncher(BaseLauncher):
# Subclasses must fill these in. See PBSLauncher for an example.
# The name of the command line program used to submit jobs.
- submit_command = Str('', config=True)
+ submit_command = List([''], config=True)
# The name of the command line program used to delete jobs.
- delete_command = Str('', config=True)
+ delete_command = List([''], config=True)
# A regular expression used to get the job id from the output of the
# submit_command.
- job_id_regexp = Str('', config=True)
+ job_id_regexp = CUnicode('', config=True)
# The string that is the batch script template itself.
- batch_template = Str('', config=True)
+ batch_template = CUnicode('', config=True)
+ # The file that contains the batch template
+ batch_template_file = CUnicode(u'', config=True)
# The filename of the instantiated batch script.
- batch_file_name = Unicode(u'batch_script', config=True)
+ batch_file_name = CUnicode(u'batch_script', config=True)
+ # The PBS Queue
+ queue = CUnicode(u'', config=True)
+
+ # not configurable, override in subclasses
+ # PBS Job Array regex
+ job_array_regexp = CUnicode('')
+ job_array_template = CUnicode('')
+ # PBS Queue regex
+ queue_regexp = CUnicode('')
+ queue_template = CUnicode('')
+ # The default batch template, override in subclasses
+ default_template = CUnicode('')
# The full path to the instantiated batch script.
- batch_file = Unicode(u'')
+ batch_file = CUnicode(u'')
# the format dict used with batch_template:
context = Dict()
def find_args(self):
- return [self.submit_command, self.batch_file]
+ return self.submit_command + [self.batch_file]
def __init__(self, work_dir=u'.', config=None, **kwargs):
super(BatchSystemLauncher, self).__init__(
@@ -796,11 +811,37 @@ def parse_job_id(self, output):
def write_batch_script(self, n):
"""Instantiate and write the batch script to the work_dir."""
self.context['n'] = n
+ self.context['queue'] = self.queue
+ print self.context
+ # first priority is batch_template if set
+ if self.batch_template_file and not self.batch_template:
+ # second priority is batch_template_file
+ with open(self.batch_template_file) as f:
+ self.batch_template = f.read()
+ if not self.batch_template:
+ # third (last) priority is default_template
+ self.batch_template = self.default_template
+
+ regex = re.compile(self.job_array_regexp)
+ # print regex.search(self.batch_template)
+ if not regex.search(self.batch_template):
+ self.log.info("adding job array settings to batch script")
+ firstline, rest = self.batch_template.split('\n',1)
+ self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
+
+ regex = re.compile(self.queue_regexp)
+ # print regex.search(self.batch_template)
+ if self.queue and not regex.search(self.batch_template):
+ self.log.info("adding PBS queue settings to batch script")
+ firstline, rest = self.batch_template.split('\n',1)
+ self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
+
script_as_string = Itpl.itplns(self.batch_template, self.context)
self.log.info('Writing instantiated batch script: %s' % self.batch_file)
- f = open(self.batch_file, 'w')
- f.write(script_as_string)
- f.close()
+
+ with open(self.batch_file, 'w') as f:
+ f.write(script_as_string)
+ os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
def start(self, n, cluster_dir):
"""Start n copies of the process using a batch system."""
@@ -817,26 +858,34 @@ def start(self, n, cluster_dir):
return job_id
def stop(self):
- output = check_output([self.delete_command, self.job_id], env=os.environ)
+ output = check_output(self.delete_command+[self.job_id], env=os.environ)
self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
return output
class PBSLauncher(BatchSystemLauncher):
"""A BatchSystemLauncher subclass for PBS."""
- submit_command = Str('qsub', config=True)
- delete_command = Str('qdel', config=True)
- job_id_regexp = Str(r'\d+', config=True)
- batch_template = Str('', config=True)
- batch_file_name = Unicode(u'pbs_batch_script', config=True)
- batch_file = Unicode(u'')
+ submit_command = List(['qsub'], config=True)
+ delete_command = List(['qdel'], config=True)
+ job_id_regexp = CUnicode(r'\d+', config=True)
+
+ batch_file = CUnicode(u'')
+ job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
+ job_array_template = CUnicode('#PBS -t 1-$n')
+ queue_regexp = CUnicode('#PBS\W+-q\W+\$?\w+')
+ queue_template = CUnicode('#PBS -q $queue')
class PBSControllerLauncher(PBSLauncher):
"""Launch a controller using PBS."""
- batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
+ batch_file_name = CUnicode(u'pbs_controller', config=True)
+ default_template= CUnicode("""#!/bin/sh
+#PBS -V
+#PBS -N ipcontrollerz
+%s --log-to-file --cluster-dir $cluster_dir
+"""%(' '.join(ipcontrollerz_cmd_argv)))
def start(self, cluster_dir):
"""Start the controller by profile or cluster_dir."""
@@ -845,14 +894,57 @@ def start(self, cluster_dir):
class PBSEngineSetLauncher(PBSLauncher):
-
-    batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
+    """Launch Engines using PBS"""
+    batch_file_name = CUnicode(u'pbs_engines', config=True)
+    # Batch script used when neither batch_template nor batch_template_file
+    # is set; $cluster_dir is filled in when the script is instantiated.
+    default_template= CUnicode(u"""#!/bin/sh
+#PBS -V
+#PBS -N ipenginez
+%s --cluster-dir $cluster_dir
+"""%(' '.join(ipenginez_cmd_argv)))
    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
-        self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
+        # %i, not %n: 'n' is not a valid %-format conversion and would raise
+        # ValueError before the job is ever submitted.
+        self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
        return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
+#SGE is very similar to PBS
+
+class SGELauncher(PBSLauncher):
+    """Sun GridEngine is a PBS clone with slightly different syntax.
+
+    Submission and deletion commands are inherited from PBSLauncher; only
+    the batch-directive patterns and templates are overridden, because SGE
+    directives are written ``#$ ...`` instead of ``#PBS ...``.
+    """
+    # The *_template strings are later interpolated by Itpl, so they spell the
+    # directive prefix as ``#$$`` (presumably Itpl's escape for a literal
+    # ``$`` -- confirm against IPython.external.Itpl).  The *_regexp strings
+    # are handed to re.compile unchanged, where a bare ``$`` is an
+    # end-of-line anchor, so the dollar sign must be backslash-escaped.
+    # ``\$\$?`` accepts both ``#$`` and ``#$$`` in a user-supplied template.
+    job_array_regexp = CUnicode(r'#\$\$?\W+-t\W+[\w\d\-\$]+')
+    job_array_template = CUnicode('#$$ -t 1-$n')
+    queue_regexp = CUnicode(r'#\$\$?\W+-q\W+\$?\w+')
+    queue_template = CUnicode('#$$ -q $queue')
+
+class SGEControllerLauncher(SGELauncher):
+    """Launch a controller using SGE."""
+
+    batch_file_name = CUnicode(u'sge_controller', config=True)
+    # Batch script used when neither batch_template nor batch_template_file
+    # is set; $cluster_dir is filled in when the script is instantiated.
+    default_template= CUnicode(u"""#$$ -V
+#$$ -S /bin/sh
+#$$ -N ipcontrollerz
+%s --log-to-file --cluster-dir $cluster_dir
+"""%(' '.join(ipcontrollerz_cmd_argv)))
+
+    def start(self, cluster_dir):
+        """Start the controller by profile or cluster_dir.
+
+        Submits a single job (n=1) through the parent launcher and returns
+        whatever the parent's start() returns.
+        """
+        self.log.info("Starting SGEControllerLauncher: %r" % self.args)
+        # super() must name this class: SGEControllerLauncher does not derive
+        # from PBSControllerLauncher, so super(PBSControllerLauncher, self)
+        # raises TypeError.
+        return super(SGEControllerLauncher, self).start(1, cluster_dir)
+
+class SGEEngineSetLauncher(SGELauncher):
+    """Launch Engines with SGE"""
+    batch_file_name = CUnicode(u'sge_engines', config=True)
+    # Batch script used when neither batch_template nor batch_template_file
+    # is set; $cluster_dir is filled in when the script is instantiated.
+    default_template = CUnicode("""#$$ -V
+#$$ -S /bin/sh
+#$$ -N ipenginez
+%s --cluster-dir $cluster_dir
+"""%(' '.join(ipenginez_cmd_argv)))
+
+    def start(self, n, cluster_dir):
+        """Start n engines by profile or cluster_dir.
+
+        Logs the request, then defers to the parent launcher's
+        start(n, cluster_dir) and returns its result.
+        """
+        # %i, not %n: 'n' is not a valid %-format conversion and would raise
+        # ValueError before the job is ever submitted.
+        self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
+        return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
+
#-----------------------------------------------------------------------------
# A launcher for ipcluster itself!
@@ -862,7 +954,7 @@ def start(self, n, cluster_dir):
class IPClusterLauncher(LocalProcessLauncher):
"""Launch the ipcluster program in an external process."""
- ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
+ ipcluster_cmd = List(ipclusterz_cmd_argv, config=True)
# Command line arguments to pass to ipcluster.
ipcluster_args = List(
['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)

0 comments on commit 4286001

Please sign in to comment.
Something went wrong with that request. Please try again.