diff --git a/scripts/qiita-recover-jobs b/scripts/qiita-recover-jobs new file mode 100644 index 000000000..a8b9e2dbd --- /dev/null +++ b/scripts/qiita-recover-jobs @@ -0,0 +1,137 @@ +#!/usr/bin/env python + +# ----------------------------------------------------------------------------- +# Copyright (c) 2014--, The Qiita Development Team. +# +# Distributed under the terms of the BSD 3-clause License. +# +# The full license is in the file LICENSE, distributed with this software. +# ----------------------------------------------------------------------------- +from subprocess import check_output +from qiita_db.sql_connection import TRN +from qiita_db.processing_job import ProcessingJob +from time import sleep + + +SLEEP_TIME = 2 + + +def _submit_jobs(jids_to_recover, recover_type): + len_jids_to_recover = len(jids_to_recover) - 1 + for i, j in enumerate(jids_to_recover): + print 'recovering %s: %d/%d' % (recover_type, len_jids_to_recover, i) + job = ProcessingJob(j) + job._set_status('in_construction') + job.submit() + sleep(SLEEP_TIME) + + +def qiita_recover_jobs(): + # Step 1: get current jobs running in the queue + qiita_jobs = [line.split()[0] for line in check_output("qstat").split("\n") + # just retriving 'qiita' and ignoring [] (ipython workers) + if 'qiita' in line and '[]' not in line and + # and private jobs + 'private' not in line] + qiita_jids = [] + for qj in qiita_jobs: + # to retrieve info about the jobs we need to use the fullname, so + # appending .ucsd.edu + args = ["qstat", "-f", "%s.ucsd.edu" % qj] + # the name is the last string of the line and has .txt prepended + qji = [line.split()[-1].split(".")[0] + for line in check_output(args).split("\n") + if 'Job_Name' in line] + qiita_jids.extend(qji) + qiita_jids = set(qiita_jids) + + sql = """SELECT processing_job_id + FROM qiita.processing_job + JOIN qiita.processing_job_status + USING (processing_job_status_id) + WHERE processing_job_status = %s""" + sql_validators = """SELECT processing_job_id, array_agg(validator_id) + FROM qiita.processing_job_validator + WHERE processing_job_id in %s + GROUP BY processing_job_id""" + + # Step 2: recover jobs that are in queue status + with TRN: + recover_type = 'queued' + TRN.add(sql, [recover_type]) + jids = set(TRN.execute_fetchflatten()) + jids_to_recover = jids - qiita_jids + + _submit_jobs(jids_to_recover, recover_type) + + # Step 3: recover jobs that are running, note that there are several steps + # to recover this group: 3.1. check if they have validators, + # 3.2. if so, recover validators, 3. recover failed jobs + with TRN: + # getting all "running" jobs, discard those in qiita_jids + recover_type = 'running' + TRN.add(sql, [recover_type]) + jids = set(TRN.execute_fetchflatten()) + jids_to_recover = jids - qiita_jids + + # 3.1, and 3.2: checking which jobs have validators, and recover them + + TRN.add(sql_validators, [tuple(jids_to_recover)]) + jids_validator = TRN.execute_fetchindex() + jobs_with_validators = [] + for j, validators in jids_validator: + validators = validators[1:-1].split(',') + # we append the main job as it shouldn't be touch + # and all their validators + jobs_with_validators.append(j) + jobs_with_validators.extend(validators) + job = ProcessingJob(j) + status = set([ProcessingJob(v).status + for v in validators if v not in qiita_jids]) + # if there are no status, that means that the validators weren't + # created and we should rerun from scratch (Step 4) + if not bool(status): + continue + # it multiple status in the validators, it's a complex behaivor + # and needs a case by case solution + if len(status) != 1: + print ("Job '%s' has too many validators status (%d), check " + "them by hand" % (j, len(status))) + continue + status = list(status)[0] + + if status == 'waiting': + print "releasing job validators: %s" % j + job.release_validators() + sleep(SLEEP_TIME) + elif status == 'running': + _submit_jobs(validators, recover_type + ' validator, running') + elif status == 'error': + # in this case is the same process than before but we need + # to split the set in_construction and submit in 2 steps, + # however, we can still submit via _submit_jobs + for v in validators: + vjob = ProcessingJob(v) + vjob._set_status('in_construction') + _submit_jobs(validators + ' validator, error') + else: + print ("Check the status of this job %s : %s and validators" + "%s." % (j, status, validators)) + + jids_to_recover = jids_to_recover - set(jobs_with_validators) + + # Step 4: Finally, we recover all the leftover jobs + for i, j in enumerate(jids_to_recover): + job = ProcessingJob(j) + status = job.status + + if status == 'waiting': + print "releasing job validators: %s" % j + job.release_validators() + sleep(SLEEP_TIME) + elif 'running' == status: + _submit_jobs([j], 'main_job, running') + + +if __name__ == '__main__': + qiita_recover_jobs()