add --queue to ipcluster's SGE/PBS/LSF subcmds

1 parent fe180bb · commit 7952c77c48f5daa8cdd009440aeee984030b4726 · @jtriley committed Jul 20, 2010
Showing with 73 additions and 23 deletions.
  1. +73 −23 IPython/kernel/scripts/ipcluster.py
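
From the command line, each batch subcommand now accepts the target queue via the new -q/--queue option added below; for example (the queue names are illustrative, and any engine-count option comes from the shared base parser, which is not shown in this diff):

    ipcluster sge --queue my.q
    ipcluster lsf -q normal
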
@@ -277,13 +277,16 @@ class BatchEngineSet(object):
name = ''
submit_command = ''
delete_command = ''
- script_param_prefix = ''
job_id_regexp = ''
job_array_regexp = ''
+ job_array_template = ''
+ queue_regexp = ''
+ queue_template = ''
default_template = ''
- def __init__(self, template_file, **kwargs):
+ def __init__(self, template_file, queue, **kwargs):
self.template_file = template_file
+ self.queue = queue
def parse_job_id(self, output):
m = re.search(self.job_id_regexp, output)
@@ -302,19 +305,31 @@ def handle_error(self, f):
def start(self, n):
log.msg("starting %d engines" % n)
self._temp_file = tempfile.NamedTemporaryFile()
- regex = re.compile(self.job_array_regexp)
if self.template_file:
log.msg("Using %s script %s" % (self.name, self.template_file))
contents = open(self.template_file, 'r').read()
+ new_script = contents
+ regex = re.compile(self.job_array_regexp)
if not regex.search(contents):
log.msg("adding job array settings to %s script" % self.name)
- contents = ("%s -t 1-%d\n" % (self.script_param_prefix,n)) + contents
- self._temp_file.write(contents)
+ new_script = self.job_array_template % n +'\n' + new_script
+ regex = re.compile(self.queue_regexp)
+ if self.queue and not regex.search(contents):
+ log.msg("adding queue settings to %s script" % self.name)
+ new_script = self.queue_template % self.queue + '\n' + new_script
+ if new_script != contents:
+ self._temp_file.write(new_script)
self.template_file = self._temp_file.name
else:
+ default_script = self.default_template % n
+ if self.queue:
+ default_script = self.queue_template % self.queue + \
+ '\n' + default_script
log.msg("using default ipengine %s script: \n%s" %
- (self.name, (self.default_template % n)))
- self._temp_file.file.write(self.default_template % n)
+ (self.name, default_script))
+ self._temp_file.file.write(default_script)
self.template_file = self._temp_file.name
self._temp_file.file.flush()
d = getProcessOutput(self.submit_command,
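
The core of the change is the prepend step above: when a queue was requested and the user's script does not already contain a queue directive, the scheduler-specific queue_template line is added to the top of the script before submission. A standalone sketch of that step, using the PBS pattern and template defined further down in this diff (the script text and queue name are illustrative):

    import re

    # Pattern and template copied from the PBSEngineSet class below.
    queue_regexp = r'#PBS[ \t]+-q[ \t]+\w+'
    queue_template = '#PBS -q %s'

    script = "#PBS -V\n#PBS -N ipengine\n"   # illustrative user-supplied script
    queue = 'batch'                          # illustrative queue name

    # Mirror of the new start() logic: only prepend a directive when a queue
    # was requested and the script does not already set one.
    if queue and not re.search(queue_regexp, script):
        script = queue_template % queue + '\n' + script

    print(script)
    # #PBS -q batch
    # #PBS -V
    # #PBS -N ipengine
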
@@ -334,9 +349,11 @@ class PBSEngineSet(BatchEngineSet):
name = 'PBS'
submit_command = 'qsub'
delete_command = 'qdel'
- script_param_prefix = "#PBS"
job_id_regexp = '\d+'
job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
+ job_array_template = '#PBS -t 1-%d'
+ queue_regexp = '#PBS[ \t]+-q[ \t]+\w+'
+ queue_template = '#PBS -q %s'
default_template="""#PBS -V
#PBS -t 1-%d
#PBS -N ipengine
@@ -347,8 +364,10 @@ class PBSEngineSet(BatchEngineSet):
class SGEEngineSet(PBSEngineSet):
name = 'SGE'
- script_param_prefix = "#$"
job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
+ job_array_template = '#$ -t 1-%d'
+ queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
+ queue_template = '#$ -q %s'
default_template="""#$ -V
#$ -t 1-%d
#$ -N ipengine
@@ -361,9 +380,11 @@ class LSFEngineSet(PBSEngineSet):
name = 'LSF'
submit_command = 'bsub'
delete_command = 'bkill'
- script_param_prefix = "#BSUB"
- job_array_regexp = '#BSUB[ \t]+\w+\[\d+-\d+\]'
- default_template="""#BSUB ipengine[1-%d]
+ job_array_regexp = '#BSUB[ \t]+-J[ \t]*\w+\[\d+-\d+\]'
+ job_array_template = '#BSUB -J ipengine[1-%d]'
+ queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+'
+ queue_template = '#BSUB -q %s'
+ default_template="""#BSUB -J ipengine[1-%d]
eid=$(($LSB_JOBINDEX - 1))
ipengine --logfile=ipengine${eid}.log
"""
@@ -654,7 +675,7 @@ def main_pbs(args):
cl = ControllerLauncher(extra_args=cont_args)
dstart = cl.start()
def start_engines(r):
- pbs_set = PBSEngineSet(args.pbsscript)
+ pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)
def shutdown(signum, frame):
log.msg('Stopping PBS cluster')
d = pbs_set.kill()
@@ -687,7 +708,7 @@ def main_sge(args):
cl = ControllerLauncher(extra_args=cont_args)
dstart = cl.start()
def start_engines(r):
- sge_set = SGEEngineSet(args.sgescript)
+ sge_set = SGEEngineSet(args.sgescript, args.sgequeue)
def shutdown(signum, frame):
log.msg('Stopping sge cluster')
d = sge_set.kill()
@@ -720,7 +741,7 @@ def main_lsf(args):
cl = ControllerLauncher(extra_args=cont_args)
dstart = cl.start()
def start_engines(r):
- lsf_set = LSFEngineSet(args.lsfscript)
+ lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)
def shutdown(signum, frame):
log.msg('Stopping LSF cluster')
d = lsf_set.kill()
@@ -872,47 +893,76 @@ def get_args():
help="how to call MPI_Init (default=mpi4py)"
)
parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
-
+
parser_pbs = subparsers.add_parser(
- 'pbs',
+ 'pbs',
help='run a pbs cluster',
parents=[base_parser]
)
parser_pbs.add_argument(
+ '-s',
'--pbs-script',
- type=str,
+ type=str,
dest='pbsscript',
help='PBS script template',
default=''
)
+ parser_pbs.add_argument(
+ '-q',
+ '--queue',
+ type=str,
+ dest='pbsqueue',
+ help='PBS queue to use when starting the engines',
+ default=None,
+ )
parser_pbs.set_defaults(func=main_pbs)
-
+
parser_sge = subparsers.add_parser(
- 'sge',
+ 'sge',
help='run an sge cluster',
parents=[base_parser]
)
parser_sge.add_argument(
+ '-s',
'--sge-script',
- type=str,
+ type=str,
dest='sgescript',
help='SGE script template',
default='' # SGEEngineSet will create one if not specified
)
+ parser_sge.add_argument(
+ '-q',
+ '--queue',
+ type=str,
+ dest='sgequeue',
+ help='SGE queue to use when starting the engines',
+ default=None,
+ )
parser_sge.set_defaults(func=main_sge)
parser_lsf = subparsers.add_parser(
- 'lsf',
+ 'lsf',
help='run an lsf cluster',
parents=[base_parser]
)
+
parser_lsf.add_argument(
+ '-s',
'--lsf-script',
- type=str,
+ type=str,
dest='lsfscript',
help='LSF script template',
default='' # LSFEngineSet will create one if not specified
)
+
+ parser_lsf.add_argument(
+ '-q',
+ '--queue',
+ type=str,
+ dest='lsfqueue',
+ help='LSF queue to use when starting the engines',
+ default=None,
+ )
parser_lsf.set_defaults(func=main_lsf)
parser_ssh = subparsers.add_parser(
