Permalink
Browse files

update PBSEngineSet to use job arrays

refactored base class BatchEngineSet to handle job arrays. PBS and SGE
engine sets are much simpler now.
  • Loading branch information...
jtriley authored and satra committed Jul 18, 2010
1 parent 225cd96 commit 1edb98711d24eeef8ca15382aa80543e1bc7cf70
Showing with 44 additions and 67 deletions.
  1. +44 −67 IPython/kernel/scripts/ipcluster.py
@@ -271,98 +271,50 @@ def interrupt_then_kill(self, delay=1.0):
dfinal.addCallback(self._handle_stop)
return dfinal
-
class BatchEngineSet(object):
-
- # Subclasses must fill these in. See PBSEngineSet
+
+ # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
+ name = ''
submit_command = ''
delete_command = ''
+ script_param_prefix = ''
job_id_regexp = ''
-
+ job_array_regexp = ''
+ default_template = ''
+
def __init__(self, template_file, **kwargs):
self.template_file = template_file
- self.context = {}
- self.context.update(kwargs)
- self.batch_file = self.template_file+'-run'
-
+
def parse_job_id(self, output):
- m = re.match(self.job_id_regexp, output)
+ m = re.search(self.job_id_regexp, output)
if m is not None:
job_id = m.group()
else:
raise Exception("job id couldn't be determined: %s" % output)
self.job_id = job_id
log.msg('Job started with job id: %r' % job_id)
return job_id
-
- def write_batch_script(self, n):
- self.context['n'] = n
- template = open(self.template_file, 'r').read()
- log.msg('Using template for batch script: %s' % self.template_file)
- script_as_string = Itpl.itplns(template, self.context)
- log.msg('Writing instantiated batch script: %s' % self.batch_file)
- f = open(self.batch_file,'w')
- f.write(script_as_string)
- f.close()
-
+
def handle_error(self, f):
f.printTraceback()
f.raiseException()
-
- def start(self, n):
- self.write_batch_script(n)
- d = getProcessOutput(self.submit_command,
- [self.batch_file],env=os.environ)
- d.addCallback(self.parse_job_id)
- d.addErrback(self.handle_error)
- return d
-
- def kill(self):
- d = getProcessOutput(self.delete_command,
- [self.job_id],env=os.environ)
- return d
-
-class PBSEngineSet(BatchEngineSet):
-
- submit_command = 'qsub'
- delete_command = 'qdel'
- job_id_regexp = '\d+'
-
- def __init__(self, template_file, **kwargs):
- BatchEngineSet.__init__(self, template_file, **kwargs)
-
-class SGEEngineSet(PBSEngineSet):
-
- def __init__(self, template_file, **kwargs):
- BatchEngineSet.__init__(self, template_file, **kwargs)
- self._temp_file = None
-
- def parse_job_id(self, output):
- m = re.search(self.job_id_regexp, output)
- if m is not None:
- job_id = m.group()
- else:
- raise Exception("job id couldn't be determined: %s" % output)
- self.job_id = job_id
- log.msg('job started with job id: %r' % job_id)
- return job_id
def start(self, n):
log.msg("starting %d engines" % n)
self._temp_file = tempfile.NamedTemporaryFile()
- regex = re.compile('#\$[ \t]+-t[ \t]+\d+')
+ regex = re.compile(self.job_array_regexp)
if self.template_file:
- log.msg("Using sge script %s" % self.template_file)
+ log.msg("Using %s script %s" % (self.name, self.template_file))
contents = open(self.template_file, 'r').read()
if not regex.search(contents):
- log.msg("adding job array settings to sge script")
- contents = ("#$ -t 1-%d\n" % n) + contents
+ log.msg("adding job array settings to %s script" % self.name)
+ contents = ("%s -t 1-%d\n" % (self.script_param_prefix,n)) + contents
self._temp_file.write(contents)
self.template_file = self._temp_file.name
else:
- log.msg("using default ipengine sge script: \n%s" %
- (sge_template % n))
- self._temp_file.file.write(sge_template % n)
+ log.msg("using default ipengine %s script: \n%s" %
+ (self.name, (self.default_template % n)))
+ self._temp_file.file.write(self.default_template % n)
self.template_file = self._temp_file.name
self._temp_file.file.flush()
d = getProcessOutput(self.submit_command,
@@ -372,7 +324,32 @@ def start(self, n):
d.addErrback(self.handle_error)
return d
-sge_template="""#$ -V
+ def kill(self):
+ d = getProcessOutput(self.delete_command,
+ [self.job_id],env=os.environ)
+ return d
+
+class PBSEngineSet(BatchEngineSet):
+
+ name = 'PBS'
+ submit_command = 'qsub'
+ delete_command = 'qdel'
+ script_param_prefix = "#PBS"
+ job_id_regexp = '\d+'
+ job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
+ default_template="""#PBS -V
+#PBS -t 1-%d
+#PBS -N ipengine
+eid=$(($PBS_ARRAYID - 1))
+ipengine --logfile=ipengine${eid}.log
+"""
+
+class SGEEngineSet(PBSEngineSet):
+
+ name = 'SGE'
+ script_param_prefix = "#$"
+ job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
+ default_template="""#$ -V
#$ -t 1-%d
#$ -N ipengine
eid=$(($SGE_TASK_ID - 1))
@@ -857,7 +834,7 @@ def get_args():
type=str,
dest='pbsscript',
help='PBS script template',
- default='pbs.template'
+ default=''
)
parser_pbs.set_defaults(func=main_pbs)

0 comments on commit 1edb987

Please sign in to comment.