Skip to content

Commit

Permalink
trying to add code to qdel child tasks as well with mqdel
Browse files Browse the repository at this point in the history
  • Loading branch information
mbreese committed Nov 28, 2013
1 parent fcf1bdb commit 5f7ca8b
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 15 deletions.
2 changes: 1 addition & 1 deletion bin/mqdel
Expand Up @@ -26,7 +26,7 @@ def qdel(job_start, job_stop=None):
for i in range(job_start, job_stop + 1):
jobs.append(i)

qtask.pipeline.runner.qdel(*jobs)
qtask.pipeline.runner.qdel(jobs, deps=True)


if __name__ == '__main__':
Expand Down
4 changes: 2 additions & 2 deletions qtask/__init__.py
Expand Up @@ -162,10 +162,10 @@ def __init__(self, multiplier=1.0):
def done(self):
pass

def qdel(self, *jobid):
def qdel(self, jobid, deps=False):
raise NotImplementedError

def qrls(self, *jobid):
def qrls(self, jobid):
raise NotImplementedError

def qsub(self, task, monitor, dryrun=False):
Expand Down
4 changes: 2 additions & 2 deletions qtask/pbs.py
Expand Up @@ -7,8 +7,8 @@ def __init__(self, *args, **kwargs):

qtask.JobRunner.__init__(self, *args, **kwargs)

def qdel(self, *jobid):
def qdel(self, jobid, dep=False):
subprocess.call(["qdel", ' '.join([str(x) for x in jobid])])

def qrls(self, *jobid):
def qrls(self, jobid):
subprocess.call(["qrls", ' '.join([str(x) for x in jobid])])
38 changes: 28 additions & 10 deletions qtask/sge.py
Expand Up @@ -71,21 +71,21 @@ def qsub(self, task, monitor, dryrun=False):

src += '#$ -notify\n'
src += 'FAILED=""\n'
src += 'notify_stop() {\nchild_notify "SIGSTOP"\n}\n'
src += 'notify_kill() {\nchild_notify "SIGKILL"\n}\n'
src += 'child_notify() {\n'
src += 'notify_stop() {\ndepjob_notify "SIGSTOP"\n}\n'
src += 'notify_kill() {\ndepjob_notify "SIGKILL"\n}\n'
src += 'depjob_notify() {\n'
src += ' FAILED="1"\n'
src += ' child_kill $JOB_ID\n'
src += ' depjob_kill $JOB_ID\n'

if monitor:
src += ' "%s" "%s" signal $JOB_ID "$1"\n' % (qtask.QTASK_MON, monitor)
src += ' "%s" "%s" killdeps $JOB_ID\n' % (qtask.QTASK_MON, monitor)

src += '}\n'
src += 'child_kill() {\n'
src += 'depjob_kill() {\n'
src += ' local jid=""\n'
src += ' for jid in $(qstat -f -j $1 | grep jid_successor_list | awk \'{print $2}\' | sed -e \'s/,/ /g\'); do\n'
src += ' child_kill $jid\n'
src += ' depjob_kill $jid\n'
src += ' qdel $jid\n'
src += ' done\n'
src += '}\n'
Expand Down Expand Up @@ -116,7 +116,7 @@ def qsub(self, task, monitor, dryrun=False):
src += 'if [ "$FAILED" == "" ]; then\n'

src += ' if [ $RETVAL -ne 0 ]; then\n'
src += ' child_kill $JOB_ID\n'
src += ' depjob_kill $JOB_ID\n'
if monitor:
src += ' "%s" "%s" killdeps $JOB_ID\n' % (qtask.QTASK_MON, monitor)
src += ' fi\n'
Expand Down Expand Up @@ -145,8 +145,26 @@ def qsub(self, task, monitor, dryrun=False):
self.dry_run_cur_jobid += 1
return 'dryrun.%s' % jobid, src

def qdel(self, *jobid):
subprocess.call(["qdel", ','.join([str(x) for x in jobid])])
def _find_job_deps(self, jobid):
output = subprocess.check_output(["qstat", "-f", "-j", jobid])
for line in output.split('\n'):
if line.startswith('jid_successor_list:'):
line = line[len('jid_successor_list:'):].strip()
for depid in line.split(','):
yield depid


def qdel(self, jobid, deps=False):
del_list = set()
if type(jobid) == list:
for j in jobid:
del_list.add(j)
for depid in self._find_job_deps(j):
del_list.add(depid)
else:
del_list.add(jobid)

subprocess.call(["qdel", ','.join([str(x) for x in del_list])])

def qrls(self, *jobid):
def qrls(self, jobid):
subprocess.call(["qrls", ','.join([str(x) for x in jobid])])

0 comments on commit 5f7ca8b

Please sign in to comment.