Merge branch '0.10.1-sge' of http://github.com/satra/ipython into satra-0.10.1-sge

This branch adds Sun Grid Engine support to the ipcluster startup
system.
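
For example, once this lands, an SGE-backed cluster can be started straight from the command line (the queue name here is illustrative; see the updated docs below):

    $ ipcluster sge -n 12 -q my.q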

Credits: Justin Riley and Satra Ghosh @ MIT, Matthew Brucher.

Closes gh-152 (pull request)
commit f780f2e9651d97dfb089c23da714b2c32b956238 (2 parents: 58a3d11 + 4beba07)
Authored by Fernando Perez (fperez)
IPython/kernel/scripts/ipcluster.py (310 changed lines)
@@ -18,6 +18,7 @@
import re
import sys
import signal
+import stat
import tempfile
pjoin = os.path.join
@@ -234,6 +235,7 @@ def __init__(self, extra_args=None):
def start(self, n):
dlist = []
for i in range(n):
+ print "starting engine:", i
el = EngineLauncher(extra_args=self.extra_args)
d = el.start()
self.launchers.append(el)
@@ -270,22 +272,25 @@ def interrupt_then_kill(self, delay=1.0):
dfinal.addCallback(self._handle_stop)
return dfinal
-
class BatchEngineSet(object):
-
- # Subclasses must fill these in. See PBSEngineSet
+
+ # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
+ name = ''
submit_command = ''
delete_command = ''
job_id_regexp = ''
-
- def __init__(self, template_file, **kwargs):
+ job_array_regexp = ''
+ job_array_template = ''
+ queue_regexp = ''
+ queue_template = ''
+ default_template = ''
+
+ def __init__(self, template_file, queue, **kwargs):
self.template_file = template_file
- self.context = {}
- self.context.update(kwargs)
- self.batch_file = self.template_file+'-run'
-
+ self.queue = queue
+
def parse_job_id(self, output):
- m = re.match(self.job_id_regexp, output)
+ m = re.search(self.job_id_regexp, output)
if m is not None:
job_id = m.group()
else:
@@ -293,46 +298,120 @@ def parse_job_id(self, output):
self.job_id = job_id
log.msg('Job started with job id: %r' % job_id)
return job_id
-
- def write_batch_script(self, n):
- self.context['n'] = n
- template = open(self.template_file, 'r').read()
- log.msg('Using template for batch script: %s' % self.template_file)
- script_as_string = Itpl.itplns(template, self.context)
- log.msg('Writing instantiated batch script: %s' % self.batch_file)
- f = open(self.batch_file,'w')
- f.write(script_as_string)
- f.close()
-
+
def handle_error(self, f):
f.printTraceback()
f.raiseException()
-
+
def start(self, n):
- self.write_batch_script(n)
+ log.msg("starting %d engines" % n)
+ self._temp_file = tempfile.NamedTemporaryFile()
+ os.chmod(self._temp_file.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
+ if self.template_file:
+ log.msg("Using %s script %s" % (self.name, self.template_file))
+ contents = open(self.template_file, 'r').read()
+ new_script = contents
+ regex = re.compile(self.job_array_regexp)
+ if not regex.search(contents):
+ log.msg("adding job array settings to %s script" % self.name)
+ new_script = self.job_array_template % n +'\n' + new_script
+ print self.queue_regexp
+ regex = re.compile(self.queue_regexp)
+ print regex.search(contents)
+ if self.queue and not regex.search(contents):
+ log.msg("adding queue settings to %s script" % self.name)
+ new_script = self.queue_template % self.queue + '\n' + new_script
+ if new_script != contents:
+ self._temp_file.write(new_script)
+ self.template_file = self._temp_file.name
+ else:
+ default_script = self.default_template % n
+ if self.queue:
+ default_script = self.queue_template % self.queue + \
+ '\n' + default_script
+ log.msg("using default ipengine %s script: \n%s" %
+ (self.name, default_script))
+ self._temp_file.file.write(default_script)
+ self.template_file = self._temp_file.name
+ self._temp_file.file.close()
d = getProcessOutput(self.submit_command,
- [self.batch_file],env=os.environ)
+ [self.template_file],
+ env=os.environ)
d.addCallback(self.parse_job_id)
d.addErrback(self.handle_error)
return d
-
+
def kill(self):
d = getProcessOutput(self.delete_command,
[self.job_id],env=os.environ)
return d
class PBSEngineSet(BatchEngineSet):
-
+
+ name = 'PBS'
submit_command = 'qsub'
delete_command = 'qdel'
job_id_regexp = '\d+'
-
- def __init__(self, template_file, **kwargs):
- BatchEngineSet.__init__(self, template_file, **kwargs)
+ job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
+ job_array_template = '#PBS -t 1-%d'
+ queue_regexp = '#PBS[ \t]+-q[ \t]+\w+'
+ queue_template = '#PBS -q %s'
+ default_template="""#!/bin/sh
+#PBS -V
+#PBS -t 1-%d
+#PBS -N ipengine
+eid=$(($PBS_ARRAYID - 1))
+ipengine --logfile=ipengine${eid}.log
+"""
+
+class SGEEngineSet(PBSEngineSet):
+
+ name = 'SGE'
+ job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
+ job_array_template = '#$ -t 1-%d'
+ queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
+ queue_template = '#$ -q %s'
+ default_template="""#$ -V
+#$ -S /bin/sh
+#$ -t 1-%d
+#$ -N ipengine
+eid=$(($SGE_TASK_ID - 1))
+ipengine --logfile=ipengine${eid}.log
+"""
+
+class LSFEngineSet(PBSEngineSet):
+
+ name = 'LSF'
+ submit_command = 'bsub'
+ delete_command = 'bkill'
+ job_array_regexp = '#BSUB[ \t]-J+\w+\[\d+-\d+\]'
+ job_array_template = '#BSUB -J ipengine[1-%d]'
+ queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+'
+ queue_template = '#BSUB -q %s'
+ default_template="""#!/bin/sh
+#BSUB -J ipengine[1-%d]
+eid=$(($LSB_JOBINDEX - 1))
+ipengine --logfile=ipengine${eid}.log
+"""
+ bsub_wrapper="""#!/bin/sh
+bsub < $1
+"""
+
+ def __init__(self, template_file, queue, **kwargs):
+ self._bsub_wrapper = self._make_bsub_wrapper()
+ self.submit_command = self._bsub_wrapper.name
+ PBSEngineSet.__init__(self,template_file, queue, **kwargs)
+ def _make_bsub_wrapper(self):
+ bsub_wrapper = tempfile.NamedTemporaryFile()
+ bsub_wrapper.write(self.bsub_wrapper)
+ bsub_wrapper.file.close()
+ os.chmod(bsub_wrapper.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
+ return bsub_wrapper
-sshx_template="""#!/bin/sh
-"$@" &> /dev/null &
+sshx_template_prefix="""#!/bin/sh
+"""
+sshx_template_suffix=""""$@" &> /dev/null &
echo $!
"""
@@ -340,11 +419,19 @@ def __init__(self, template_file, **kwargs):
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
+def escape_strings(val):
+ val = val.replace('(','\(')
+ val = val.replace(')','\)')
+ if ' ' in val:
+ val = '"%s"'%val
+ return val
+
class SSHEngineSet(object):
- sshx_template=sshx_template
+ sshx_template_prefix=sshx_template_prefix
+ sshx_template_suffix=sshx_template_suffix
engine_killer_template=engine_killer_template
- def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
+ def __init__(self, engine_hosts, sshx=None, copyenvs=None, ipengine="ipengine"):
"""Start a controller on localhost and engines using ssh.
The engine_hosts argument is a dict with hostnames as keys and
@@ -363,7 +450,12 @@ def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
'%s-main-sshx.sh' % os.environ['USER']
)
f = open(self.sshx, 'w')
- f.writelines(self.sshx_template)
+ f.writelines(self.sshx_template_prefix)
+ if copyenvs:
+ for key, val in sorted(os.environ.items()):
+ newval = escape_strings(val)
+ f.writelines('export %s=%s\n'%(key,newval))
+ f.writelines(self.sshx_template_suffix)
f.close()
self.engine_command = ipengine
self.engine_hosts = engine_hosts
@@ -609,13 +701,17 @@ def main_pbs(args):
# See if we are reusing FURL files
if not check_reuse(args, cont_args):
return
+
+ if args.pbsscript and not os.path.isfile(args.pbsscript):
+ log.err('PBS script does not exist: %s' % args.pbsscript)
+ return
cl = ControllerLauncher(extra_args=cont_args)
dstart = cl.start()
def start_engines(r):
- pbs_set = PBSEngineSet(args.pbsscript)
+ pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)
def shutdown(signum, frame):
- log.msg('Stopping pbs cluster')
+ log.msg('Stopping PBS cluster')
d = pbs_set.kill()
d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
@@ -627,6 +723,72 @@ def shutdown(signum, frame):
dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
dstart.addErrback(_err_and_stop)
+def main_sge(args):
+ cont_args = []
+ cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
+
+ # Check security settings before proceeding
+ if not check_security(args, cont_args):
+ return
+
+ # See if we are reusing FURL files
+ if not check_reuse(args, cont_args):
+ return
+
+ if args.sgescript and not os.path.isfile(args.sgescript):
+ log.err('SGE script does not exist: %s' % args.sgescript)
+ return
+
+ cl = ControllerLauncher(extra_args=cont_args)
+ dstart = cl.start()
+ def start_engines(r):
+ sge_set = SGEEngineSet(args.sgescript, args.sgequeue)
+ def shutdown(signum, frame):
+ log.msg('Stopping sge cluster')
+ d = sge_set.kill()
+ d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
+ d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
+ signal.signal(signal.SIGINT,shutdown)
+ d = sge_set.start(args.n)
+ return d
+ config = kernel_config_manager.get_config_obj()
+ furl_file = config['controller']['engine_furl_file']
+ dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
+ dstart.addErrback(_err_and_stop)
+
+def main_lsf(args):
+ cont_args = []
+ cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
+
+ # Check security settings before proceeding
+ if not check_security(args, cont_args):
+ return
+
+ # See if we are reusing FURL files
+ if not check_reuse(args, cont_args):
+ return
+
+ if args.lsfscript and not os.path.isfile(args.lsfscript):
+ log.err('LSF script does not exist: %s' % args.lsfscript)
+ return
+
+ cl = ControllerLauncher(extra_args=cont_args)
+ dstart = cl.start()
+ def start_engines(r):
+ lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)
+ def shutdown(signum, frame):
+ log.msg('Stopping LSF cluster')
+ d = lsf_set.kill()
+ d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
+ d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
+ signal.signal(signal.SIGINT,shutdown)
+ d = lsf_set.start(args.n)
+ return d
+ config = kernel_config_manager.get_config_obj()
+ furl_file = config['controller']['engine_furl_file']
+ dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
+ dstart.addErrback(_err_and_stop)
+
def main_ssh(args):
"""Start a controller on localhost and engines using ssh.
@@ -658,7 +820,8 @@ def main_ssh(args):
cl = ControllerLauncher(extra_args=cont_args)
dstart = cl.start()
def start_engines(cont_pid):
- ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
+ ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx,
+ copyenvs=args.copyenvs)
def shutdown(signum, frame):
d = ssh_set.kill()
cl.interrupt_then_kill(1.0)
@@ -765,20 +928,77 @@ def get_args():
help="how to call MPI_Init (default=mpi4py)"
)
parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
-
+
parser_pbs = subparsers.add_parser(
- 'pbs',
+ 'pbs',
help='run a pbs cluster',
parents=[base_parser]
)
parser_pbs.add_argument(
+ '-s',
'--pbs-script',
- type=str,
+ type=str,
dest='pbsscript',
help='PBS script template',
- default='pbs.template'
+ default=''
+ )
+ parser_pbs.add_argument(
+ '-q',
+ '--queue',
+ type=str,
+ dest='pbsqueue',
+ help='PBS queue to use when starting the engines',
+ default=None,
)
parser_pbs.set_defaults(func=main_pbs)
+
+ parser_sge = subparsers.add_parser(
+ 'sge',
+ help='run an sge cluster',
+ parents=[base_parser]
+ )
+ parser_sge.add_argument(
+ '-s',
+ '--sge-script',
+ type=str,
+ dest='sgescript',
+ help='SGE script template',
+ default='' # SGEEngineSet will create one if not specified
+ )
+ parser_sge.add_argument(
+ '-q',
+ '--queue',
+ type=str,
+ dest='sgequeue',
+ help='SGE queue to use when starting the engines',
+ default=None,
+ )
+ parser_sge.set_defaults(func=main_sge)
+
+ parser_lsf = subparsers.add_parser(
+ 'lsf',
+ help='run an lsf cluster',
+ parents=[base_parser]
+ )
+
+ parser_lsf.add_argument(
+ '-s',
+ '--lsf-script',
+ type=str,
+ dest='lsfscript',
+ help='LSF script template',
+ default='' # LSFEngineSet will create one if not specified
+ )
+
+ parser_lsf.add_argument(
+ '-q',
+ '--queue',
+ type=str,
+ dest='lsfqueue',
+ help='LSF queue to use when starting the engines',
+ default=None,
+ )
+ parser_lsf.set_defaults(func=main_lsf)
parser_ssh = subparsers.add_parser(
'ssh',
@@ -786,6 +1006,14 @@ def get_args():
parents=[base_parser]
)
parser_ssh.add_argument(
+ '-e',
+ '--copyenvs',
+ action='store_true',
+ dest='copyenvs',
+ help='Copy current shell environment to remote location',
+ default=False,
+ )
+ parser_ssh.add_argument(
'--clusterfile',
type=str,
dest='clusterfile',
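
The heart of the new BatchEngineSet.start above is a template-augmentation step: if a user-supplied batch script lacks job-array or queue directives, they are prepended before submission. A minimal standalone sketch of that logic, using the SGE patterns from the diff (the helper name augment_script is hypothetical, not part of the commit):

    import re

    def augment_script(contents, n, queue=None):
        # Patterns and templates copied from SGEEngineSet in the diff above.
        job_array_regexp = r'#\$[ \t]+-t[ \t]+\d+'
        job_array_template = '#$ -t 1-%d'
        queue_regexp = r'#\$[ \t]+-q[ \t]+\w+'
        queue_template = '#$ -q %s'
        new_script = contents
        if not re.search(job_array_regexp, contents):
            # No job-array directive found: request tasks 1..n.
            new_script = job_array_template % n + '\n' + new_script
        if queue and not re.search(queue_regexp, contents):
            # No queue directive found: target the requested queue.
            new_script = queue_template % queue + '\n' + new_script
        return new_script

    # Example: both directives are prepended to a bare script.
    print(augment_script('#$ -V\nipengine\n', 12, 'my.q'))

Note also that LSFEngineSet routes submission through a temporary "bsub < $1" wrapper script: unlike qsub, bsub reads the job script (and its #BSUB directives) from stdin rather than from a file argument.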
docs/source/parallel/parallel_process.txt (128 changed lines)
@@ -53,7 +53,9 @@ The :command:`ipcluster` command provides a simple way of starting a controller
2. When engines are started using the :command:`mpirun` command that comes
with most MPI [MPI]_ implementations
3. When engines are started using the PBS [PBS]_ batch system.
-4. When the controller is started on localhost and the engines are started on
+4. When engines are started using the SGE [SGE]_ batch system.
+5. When engines are started using the LSF [LSF]_ batch system.
+6. When the controller is started on localhost and the engines are started on
remote nodes using :command:`ssh`.
.. note::
@@ -126,49 +128,115 @@ More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
Using :command:`ipcluster` in PBS mode
--------------------------------------
-The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:
+The PBS mode uses the Portable Batch System [PBS]_ to start the engines.
-.. sourcecode:: bash
-
- #PBS -N ipython
- #PBS -j oe
- #PBS -l walltime=00:10:00
- #PBS -l nodes=${n/4}:ppn=4
- #PBS -q parallel
+To start an ipcluster using the Portable Batch System::
- cd $$PBS_O_WORKDIR
- export PATH=$$HOME/usr/local/bin
- export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
- /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine
+ $ ipcluster pbs -n 12
-There are a few important points about this template:
+The above command will launch a PBS job array with 12 tasks using the default queue. If you would like to submit the job to a different queue, use the -q option::
-1. This template will be rendered at runtime using IPython's :mod:`Itpl`
- template engine.
+ $ ipcluster pbs -n 12 -q hpcqueue
-2. Instead of putting in the actual number of engines, use the notation
- ``${n}`` to indicate the number of engines to be started. You can also uses
- expressions like ``${n/4}`` in the template to indicate the number of
- nodes.
+By default, ipcluster will generate and submit a job script to launch the engines. However, if you need to use your own job script, use the -s option::
-3. Because ``$`` is a special character used by the template engine, you must
- escape any ``$`` by using ``$$``. This is important when referring to
- environment variables in the template.
+ $ ipcluster pbs -n 12 -q hpcqueue -s mypbscript.sh
-4. Any options to :command:`ipengine` should be given in the batch script
- template.
+For example, the default autogenerated script looks like::
-5. Depending on the configuration of you system, you may have to set
- environment variables in the script template.
+ #PBS -q hpcqueue
+ #!/bin/sh
+ #PBS -V
+ #PBS -t 1-12
+ #PBS -N ipengine
+ eid=$(($PBS_ARRAYID - 1))
+ ipengine --logfile=ipengine${eid}.log
-Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job::
+.. note::
- $ ipcluster pbs -n 128 --pbs-script=pbs.template
+ ipcluster relies on PBS job arrays to start the engines. If you
+ specify your own job script without specifying the job array
+ settings, ipcluster will automatically add the job array settings
+ (#PBS -t 1-N) to your script.
Additional command line options for this mode can be found by doing::
$ ipcluster pbs -h
+Using :command:`ipcluster` in SGE mode
+--------------------------------------
+
+The SGE mode uses the Sun Grid Engine [SGE]_ to start the engines.
+
+To start an ipcluster using Sun Grid Engine::
+
+ $ ipcluster sge -n 12
+
+The above command will launch an SGE job array with 12 tasks using the default queue. If you would like to submit the job to a different queue, use the -q option::
+
+ $ ipcluster sge -n 12 -q hpcqueue
+
+By default, ipcluster will generate and submit a job script to launch the engines. However, if you need to use your own job script, use the -s option::
+
+ $ ipcluster sge -n 12 -q hpcqueue -s mysgescript.sh
+
+For example, the default autogenerated script looks like::
+
+ #$ -q hpcqueue
+ #$ -V
+ #$ -S /bin/sh
+ #$ -t 1-12
+ #$ -N ipengine
+ eid=$(($SGE_TASK_ID - 1))
+ ipengine --logfile=ipengine${eid}.log
+
+.. note::
+
+ ipcluster relies on SGE job arrays to start the engines. If you
+ specify your own job script without specifying the job array
+ settings, ipcluster will automatically add the job array settings
+ (#$ -t 1-N) to your script.
+
+Additional command line options for this mode can be found by doing::
+
+ $ ipcluster sge -h
+
+Using :command:`ipcluster` in LSF mode
+--------------------------------------
+
+The LSF mode uses the Load Sharing Facility [LSF]_ to start the engines.
+
+To start an ipcluster using the Load Sharing Facility::
+
+ $ ipcluster lsf -n 12
+
+The above command will launch an LSF job array with 12 tasks using the default queue. If you would like to submit the job to a different queue, use the -q option::
+
+ $ ipcluster lsf -n 12 -q hpcqueue
+
+By default, ipcluster will generate and submit a job script to launch the engines. However, if you need to use your own job script, use the -s option::
+
+ $ ipcluster lsf -n 12 -q hpcqueue -s mylsfscript.sh
+
+For example, the default autogenerated script looks like::
+
+ #BSUB -q hpcqueue
+ #!/bin/sh
+ #BSUB -J ipengine[1-12]
+ eid=$(($LSB_JOBINDEX - 1))
+ ipengine --logfile=ipengine${eid}.log
+
+.. note::
+
+ ipcluster relies on LSF job arrays to start the engines. If you
+ specify your own job script without specifying the job array
+ settings, ipcluster will automatically add the job array settings
+ (#BSUB -J ipengine[1-N]) to your script.
+
+Additional command line options for this mode can be found by doing::
+
+ $ ipcluster lsf -h
+
Using :command:`ipcluster` in SSH mode
--------------------------------------
@@ -348,4 +416,6 @@ the log files to us will often help us to debug any problems.
.. [PBS] Portable Batch System. http://www.openpbs.org/
+.. [SGE] Sun Grid Engine. http://www.sun.com/software/sge/
+.. [LSF] Load Sharing Facility. http://www.platform.com/
.. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent
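
With the new --copyenvs flag, the generated sshx helper exports the caller's environment before backgrounding the remote command; its shape is roughly the following (a single variable FOO=bar shown for illustration):

    #!/bin/sh
    export FOO=bar
    "$@" &> /dev/null &
    echo $!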