Merged
137 changes: 137 additions & 0 deletions scripts/qiita-recover-jobs
@@ -0,0 +1,137 @@
#!/usr/bin/env python

# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from subprocess import check_output
from qiita_db.sql_connection import TRN
from qiita_db.processing_job import ProcessingJob
from time import sleep


SLEEP_TIME = 2  # seconds to wait between consecutive job submissions


def _submit_jobs(jids_to_recover, recover_type):
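    """Reset each job in jids_to_recover to 'in_construction' and resubmit
    it, pausing SLEEP_TIME seconds between submissions; recover_type is only
    used to label the progress output."""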
    len_jids_to_recover = len(jids_to_recover)
    for i, j in enumerate(jids_to_recover):
        print 'recovering %s: %d/%d' % (recover_type, i + 1,
                                        len_jids_to_recover)
job = ProcessingJob(j)
job._set_status('in_construction')
job.submit()
sleep(SLEEP_TIME)


def qiita_recover_jobs():
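    """Recover Qiita jobs that the database reports as queued or running
    but that no longer appear in the scheduler queue (qstat)."""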
# Step 1: get current jobs running in the queue
qiita_jobs = [line.split()[0] for line in check_output("qstat").split("\n")
                  # just retrieving 'qiita' and ignoring [] (ipython workers)
if 'qiita' in line and '[]' not in line and
# and private jobs
'private' not in line]
qiita_jids = []
for qj in qiita_jobs:
        # to retrieve info about the jobs we need to use the full name, so
        # appending .ucsd.edu
args = ["qstat", "-f", "%s.ucsd.edu" % qj]
        # the job id is the last token of the line minus the '.txt' suffix
qji = [line.split()[-1].split(".")[0]
for line in check_output(args).split("\n")
if 'Job_Name' in line]
qiita_jids.extend(qji)
qiita_jids = set(qiita_jids)
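    # qiita_jids now holds the job ids still alive in the scheduler;
    # anything in the database but missing from this set needs recovery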

sql = """SELECT processing_job_id
FROM qiita.processing_job
JOIN qiita.processing_job_status
USING (processing_job_status_id)
WHERE processing_job_status = %s"""
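    # validator jobs spawned by each of a given set of parent jobs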
sql_validators = """SELECT processing_job_id, array_agg(validator_id)
FROM qiita.processing_job_validator
WHERE processing_job_id in %s
GROUP BY processing_job_id"""

# Step 2: recover jobs that are in queue status
with TRN:
recover_type = 'queued'
TRN.add(sql, [recover_type])
jids = set(TRN.execute_fetchflatten())
jids_to_recover = jids - qiita_jids
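        # queued in the database but gone from the scheduler, so resubmit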

_submit_jobs(jids_to_recover, recover_type)

    # Step 3: recover jobs that are running; note that there are several
    # steps to recover this group: 3.1. check if they have validators,
    # 3.2. if so, recover the validators, 3.3. recover failed jobs
with TRN:
        # get all 'running' jobs and discard those still in qiita_jids
recover_type = 'running'
TRN.add(sql, [recover_type])
jids = set(TRN.execute_fetchflatten())
jids_to_recover = jids - qiita_jids

        # 3.1 and 3.2: check which jobs have validators and recover them

        # only query if there is something to recover; IN () is invalid SQL
        jids_validator = []
        if jids_to_recover:
            TRN.add(sql_validators, [tuple(jids_to_recover)])
            jids_validator = TRN.execute_fetchindex()
jobs_with_validators = []
for j, validators in jids_validator:
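            # the validator ids arrive as the literal array string
            # '{id1,id2}', so strip the braces and split on the commas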
validators = validators[1:-1].split(',')
            # track the main job, as it shouldn't be touched again, and all
            # of its validators
jobs_with_validators.append(j)
jobs_with_validators.extend(validators)
job = ProcessingJob(j)
status = set([ProcessingJob(v).status
for v in validators if v not in qiita_jids])
            # if there are no statuses, the validators were never created
            # and the job should be rerun from scratch (Step 4)
if not bool(status):
continue
            # if the validators have multiple different statuses, it's a
            # complex behavior that needs a case-by-case solution
if len(status) != 1:
print ("Job '%s' has too many validators status (%d), check "
"them by hand" % (j, len(status)))
continue
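            # at this point all the validators share the same status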
status = list(status)[0]

if status == 'waiting':
print "releasing job validators: %s" % j
job.release_validators()
sleep(SLEEP_TIME)
elif status == 'running':
_submit_jobs(validators, recover_type + ' validator, running')
elif status == 'error':
                # this is the same process as before, but we need to split
                # setting in_construction and submitting into 2 steps;
                # we can still do the submission via _submit_jobs
for v in validators:
vjob = ProcessingJob(v)
vjob._set_status('in_construction')
                _submit_jobs(validators, recover_type + ' validator, error')
else:
print ("Check the status of this job %s : %s and validators"
"%s." % (j, status, validators))

jids_to_recover = jids_to_recover - set(jobs_with_validators)

# Step 4: Finally, we recover all the leftover jobs
for i, j in enumerate(jids_to_recover):
job = ProcessingJob(j)
status = job.status

if status == 'waiting':
print "releasing job validators: %s" % j
job.release_validators()
sleep(SLEEP_TIME)
elif 'running' == status:
_submit_jobs([j], 'main_job, running')


if __name__ == '__main__':
qiita_recover_jobs()