Permalink
Browse files

update SGEEngineSet to use SGE job arrays

SGE job arrays allow one job id to be associated with a set of processes on
an SGE cluster.

modified SGEEngineSet to be a subclass PBSEngineSet

ipcluster will now generate a default SGE job script if --sge-script is
not provided. most folks should ignore the --sge-script option unless they
know they need it.

if --sge-script is passed, check that the script exists and that the user has
defined a "#$ -t" setting within the script. if not, add the setting
for them by copying the script to a temp file and launching the job
array using the modified temp file.

ipengines terminate cleanly now when the ipcluster command exits.

i think we still need to handle furl files when engines are assigned to different
hosts via SGE. without using ssh or nfs, the only other way is to put
the contents of the furl in the job script before submission but this is
less secure. need to discuss this.
  • Loading branch information...
1 parent 70725a5 commit 225cd966aafbe718b9fe21d437d5e5e6dd59cff3 @jtriley jtriley committed with Jul 18, 2010
Showing with 42 additions and 48 deletions.
  1. +42 −48 IPython/kernel/scripts/ipcluster.py
View
90 IPython/kernel/scripts/ipcluster.py
@@ -331,64 +331,54 @@ class PBSEngineSet(BatchEngineSet):
def __init__(self, template_file, **kwargs):
BatchEngineSet.__init__(self, template_file, **kwargs)
-class SGEEngineSet(BatchEngineSet):
-
- submit_command = 'qsub'
- delete_command = 'qdel'
- job_id_regexp = '\d+'
-
+class SGEEngineSet(PBSEngineSet):
+
def __init__(self, template_file, **kwargs):
BatchEngineSet.__init__(self, template_file, **kwargs)
- self.num_engines = None
+ self._temp_file = None
def parse_job_id(self, output):
m = re.search(self.job_id_regexp, output)
if m is not None:
job_id = m.group()
else:
raise Exception("job id couldn't be determined: %s" % output)
- self.job_id.append(job_id)
- log.msg('Job started with job id: %r' % job_id)
+ self.job_id = job_id
+ log.msg('job started with job id: %r' % job_id)
return job_id
-
- def kill_job(self, output):
- log.msg(output)
- return output
-
- def write_batch_script(self, i):
- context = {'eid':i}
- template = open(self.template_file, 'r').read()
- log.msg('Using template for batch script: %s' % self.template_file)
- script_as_string = Itpl.itplns(template, context)
- log.msg('Writing instantiated batch script: %s' % self.batch_file+str(i))
- f = open(self.batch_file+str(i),'w')
- f.write(script_as_string)
- f.close()
-
+
def start(self, n):
- dlist = []
- self.num_engines = 0
- self.job_id = []
- for i in range(n):
- log.msg("starting engine: %d"%i)
- self.write_batch_script(i)
- d = getProcessOutput(self.submit_command,
- [self.batch_file+str(i)],env=os.environ)
- d.addCallback(self.parse_job_id)
- d.addErrback(self.handle_error)
- dlist.append(d)
- return gatherBoth(dlist, consumeErrors=True)
-
- def kill(self):
- dlist = []
- for i in range(self.num_engines):
- log.msg("killing job id: %d"%self.job_id[i])
- d = getProcessOutput(self.delete_command,
- [self.job_id[i]],env=os.environ)
- d.addCallback(self.kill_job)
- dlist.append(d)
- return gatherBoth(dlist, consumeErrors=True)
-
+ log.msg("starting %d engines" % n)
+ self._temp_file = tempfile.NamedTemporaryFile()
+ regex = re.compile('#\$[ \t]+-t[ \t]+\d+')
+ if self.template_file:
+ log.msg("Using sge script %s" % self.template_file)
+ contents = open(self.template_file, 'r').read()
+ if not regex.search(contents):
+ log.msg("adding job array settings to sge script")
+ contents = ("#$ -t 1-%d\n" % n) + contents
+ self._temp_file.write(contents)
+ self.template_file = self._temp_file.name
+ else:
+ log.msg("using default ipengine sge script: \n%s" %
+ (sge_template % n))
+ self._temp_file.file.write(sge_template % n)
+ self.template_file = self._temp_file.name
+ self._temp_file.file.flush()
+ d = getProcessOutput(self.submit_command,
+ [self.template_file],
+ env=os.environ)
+ d.addCallback(self.parse_job_id)
+ d.addErrback(self.handle_error)
+ return d
+
+sge_template="""#$ -V
+#$ -t 1-%d
+#$ -N ipengine
+eid=$(($SGE_TASK_ID - 1))
+ipengine --logfile=ipengine${eid}.log
+"""
+
sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!
@@ -696,6 +686,10 @@ def main_sge(args):
# See if we are reusing FURL files
if not check_reuse(args, cont_args):
return
+
+ if args.sgescript and not os.path.isfile(args.sgescript):
+ log.err('SGE script does not exist: %s' % args.sgescript)
+ return
cl = ControllerLauncher(extra_args=cont_args)
dstart = cl.start()
@@ -877,7 +871,7 @@ def get_args():
type=str,
dest='sgescript',
help='SGE script template',
- default='template.sge'
+ default='' # SGEEngineSet will create one if not specified
)
parser_sge.set_defaults(func=main_sge)

0 comments on commit 225cd96

Please sign in to comment.