Skip to content

Commit

Permalink
BibSched: new continuable error status for tasks
Browse files Browse the repository at this point in the history
* New --stop-on-error/--continue-on-error CLI parameter for bibtasks.
* Handles CERROR and ERROR.  CERROR is a continuable error that
  does not stop the queue.  ERROR is a fatal error that stops the queue.
* BibUpload will decide whether to issue an ERROR or a CERROR.
  (closes #856)
* Always send emergency notification to CFG_SITE_ADMIN_EMAIL.

Co-authored-by: Samuele Kaplun <samuele.kaplun@cern.ch>
  • Loading branch information
2 people authored and tiborsimko committed Apr 17, 2012
1 parent e1504d9 commit d74fa91
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 17 deletions.
40 changes: 32 additions & 8 deletions modules/bibsched/lib/bibsched.py
Expand Up @@ -54,7 +54,13 @@
from invenio.errorlib import register_exception, register_emergency
from invenio.shellutils import run_shell_command

CFG_VALID_STATUS = ('WAITING', 'SCHEDULED', 'RUNNING', 'CONTINUING', '% DELETED', 'ABOUT TO STOP', 'ABOUT TO SLEEP', 'STOPPED', 'SLEEPING', 'KILLED', 'NOW STOP')
CFG_VALID_STATUS = ('WAITING', 'SCHEDULED', 'RUNNING', 'CONTINUING',
'% DELETED', 'ABOUT TO STOP', 'ABOUT TO SLEEP', 'STOPPED',
'SLEEPING', 'KILLED', 'NOW STOP', 'ERRORS REPORTED')


class RecoverableError(StandardError):
pass


def get_pager():
Expand Down Expand Up @@ -318,7 +324,7 @@ def handle_keys(self, chr):
elif chr in (ord("s"), ord("S")):
self.sleep()
elif chr in (ord("k"), ord("K")):
if status in ('ERROR', 'DONE WITH ERRORS'):
if status in ('ERROR', 'DONE WITH ERRORS', 'ERRORS REPORTED'):
self.acknowledge()
elif status is not None:
self.kill()
Expand Down Expand Up @@ -602,7 +608,7 @@ def run(self):
def acknowledge(self):
task_id = self.currentrow[0]
status = self.currentrow[5]
if status in ('ERROR', 'DONE WITH ERRORS'):
if status in ('ERROR', 'DONE WITH ERRORS', 'ERRORS REPORTED'):
bibsched_set_status(task_id, 'ACK ' + status, status)
self.display_in_footer("Acknowledged error")

Expand Down Expand Up @@ -834,7 +840,7 @@ def repaint(self):
else:
self.display_in_footer(self.footer_select_mode, print_time_p=1)
footer2 = ""
if self.item_status.find("DONE") > -1 or self.item_status in ("ERROR", "STOPPED", "KILLED"):
if self.item_status.find("DONE") > -1 or self.item_status in ("ERROR", "STOPPED", "KILLED", "ERRORS REPORTED"):
footer2 += self.footer_stopped_item
elif self.item_status in ("RUNNING", "CONTINUING", "ABOUT TO STOP", "ABOUT TO SLEEP"):
footer2 += self.footer_running_item
Expand Down Expand Up @@ -1122,12 +1128,30 @@ def handle_task(self, task_id, proc, runtime, status, priority, host, sequenceid
return True

def watch_loop(self):
def check_errors():
sql = "SELECT count(id) FROM schTASK WHERE status='ERROR'" \
" OR status='DONE WITH ERRORS' OR STATUS='CERROR'"
if run_sql(sql)[0][0] > 0:
errors = run_sql("SELECT id,proc,status FROM schTASK" \
" WHERE status='ERROR' " \
"OR status='DONE WITH ERRORS'" \
"OR status='CERROR'")
msg_errors = [" #%s %s -> %s" % row for row in errors]
msg = 'BibTask with ERRORS:\n%s' % "\n".join(msg_errors)
err_types = set(e[2] for e in errors if e[2])
if 'ERROR' in err_types or 'DONE WITH ERRORS' in err_types:
raise StandardError(msg)
else:
raise RecoverableError(msg)

def calculate_rows():
"""Return all the node_relevant_active_tasks to work on."""
if run_sql("SELECT count(id) FROM schTASK WHERE status='ERROR' OR status='DONE WITH ERRORS'")[0][0] > 0:
errors = run_sql("SELECT id,proc,status FROM schTASK WHERE status='ERROR' OR status='DONE WITH ERRORS'")
errors = [" #%s %s -> %s" % row for row in errors]
raise StandardError('BibTask with ERRORS:\n%s' % "\n".join(errors))
try:
check_errors()
except RecoverableError, msg:
register_emergency('Light emergency from %s: BibTask failed: %s' % (CFG_SITE_URL, msg))
run_sql("UPDATE schTASK set status='ERRORS REPORTED' where status='CERROR'")

max_bibupload_priority = run_sql("SELECT max(priority) FROM schTASK WHERE status='WAITING' AND proc='bibupload' AND runtime<=NOW()")
if max_bibupload_priority:
run_sql("UPDATE schTASK SET priority=%s WHERE status='WAITING' AND proc='bibupload' AND runtime<=NOW()", ( max_bibupload_priority[0][0], ))
Expand Down
22 changes: 18 additions & 4 deletions modules/bibsched/lib/bibtask.py
Expand Up @@ -88,7 +88,9 @@
'priority': 0,
'runtime_limit': None,
'profile': [],
'post-process': [],
'sequence-id':None,
'stop_queue_on_error': False,
}

# Global _OPTIONS dictionary.
Expand Down Expand Up @@ -356,6 +358,7 @@ def task_init(
"profile" : [],
"post-process": [],
"sequence-id": None,
"stop_queue_on_error": False,
}
to_be_submitted = True
if len(sys.argv) == 2 and sys.argv[1].isdigit():
Expand Down Expand Up @@ -495,8 +498,10 @@ def _task_build_params(
"name=",
"limit=",
"profile=",
"post-process="
"sequence-id="
"post-process=",
"sequence-id=",
"stop-on-error",
"continue-on-error",
] + long_params)
except getopt.GetoptError, err:
_usage(1, err, help_specific_usage=help_specific_usage, description=description)
Expand Down Expand Up @@ -529,6 +534,10 @@ def _task_build_params(
_TASK_PARAMS["post-process"] += [opt[1]];
elif opt[0] in ("-I","--sequence-id"):
_TASK_PARAMS["sequence-id"] = opt[1]
elif opt[0] in ("--stop-on-error", ):
_TASK_PARAMS["stop_queue_on_error"] = True
elif opt[0] in ("--continue-on-error", ):
_TASK_PARAMS["stop_queue_on_error"] = False
elif not callable(task_submit_elaborate_specific_parameter_fnc) or \
not task_submit_elaborate_specific_parameter_fnc(opt[0],
opt[1], opts, args):
Expand Down Expand Up @@ -867,7 +876,10 @@ def _task_run(task_run_fnc):
pass
except:
register_exception(alert_admin=True)
task_update_status("ERROR")
if task_get_task_param('stop_queue_on_error'):
task_update_status("ERROR")
else:
task_update_status("CERROR")
finally:
task_status = task_read_status()
if sleeptime:
Expand Down Expand Up @@ -940,7 +952,9 @@ def _usage(exitcode=1, msg="", help_specific_usage="", description=""):
sys.stderr.write(" -V, --version\t\tPrint version information.\n")
sys.stderr.write(" -v, --verbose=LEVEL\tVerbose level (0=min,"
" 1=default, 9=max).\n")
sys.stderr.write(" --profile=STATS\tPrint profile information. STATS is a comma-separated\n\t\t\tlist of desired output stats (calls, cumulative,\n\t\t\tfile, line, module, name, nfl, pcalls, stdname, time).\n")
sys.stderr.write(" --profile=STATS\tPrint profile information. STATS is a comma-separated\n\t\t\tlist of desired output stats (calls, cumulative,\n\t\t\tfile, line, module, name, nfl, pcalls, stdname, time).\n")
sys.stderr.write(" --stop-on-error\tIn case of unrecoverable error stop the bibsched queue.\n")
sys.stderr.write(" --continue-on-error\tIn case of unrecoverable error don't stop the bibsched queue.\n")
sys.stderr.write(" --post-process=BIB_TASKLET_NAME[parameters]\tPostprocesses the specified\n\t\t\tbibtasklet with the given parameters between square\n\t\t\tbrackets.\n")
sys.stderr.write("\t\t\tExample:--post-process \"bst_send_email[fromaddr=\n\t\t\t'foo@xxx.com', toaddr='bar@xxx.com', subject='hello',\n\t\t\tcontent='help']\"\n")
if description:
Expand Down
1 change: 1 addition & 0 deletions modules/bibsched/lib/bibtask_config.py
Expand Up @@ -70,6 +70,7 @@
'stage_to_start_from': 1,
'pretend': False,
'force': False,
'stop_queue_on_error': True,
},
'bibindex': {
'cmd': 'add',
Expand Down
14 changes: 11 additions & 3 deletions modules/bibsched/lib/bibtaskex.py
Expand Up @@ -52,6 +52,9 @@ def task_submit_elaborate_specific_parameter(key, value, opts, args):
if key in ('-n', '--number'):
task_set_option('number', value)
return True
elif key in ('-e', '--error'):
task_set_option('error', True)
return True
return False

def task_run_core():
Expand All @@ -67,6 +70,8 @@ def task_run_core():
write_message("Error: water in the CPU. Ignoring and continuing.", sys.stderr, verbose=3)
elif i > 0 and i % 5 == 0:
write_message("Error: floppy drive dropped on the floor. Ignoring and continuing.", sys.stderr)
if task_get_option('error'):
1 / 0
write_message("fib(%d)=%d" % (i, fib(i)))
task_update_progress("Done %d out of %d." % (i, n))
task_sleep_now_if_required(can_stop_too=True)
Expand All @@ -78,10 +83,13 @@ def main():
"""Main that construct all the bibtask."""
task_init(authorization_action='runbibtaskex',
authorization_msg="BibTaskEx Task Submission",
help_specific_usage=""" -n, --number Print Fibonacci numbers for up to NUM. [default=30]\n""",
help_specific_usage="""\
-n, --number Print Fibonacci numbers for up to NUM. [default=30]
-e, --error Raise an error from time to time
""",
version=__revision__,
specific_params=("n:",
["number="]),
specific_params=("n:e",
["number=", "error"]),
task_submit_elaborate_specific_parameter_fnc=task_submit_elaborate_specific_parameter,
task_run_fnc=task_run_core)

Expand Down
9 changes: 7 additions & 2 deletions modules/bibupload/lib/bibupload.py
Expand Up @@ -523,7 +523,12 @@ def open_marc_file(path):
except IOError, erro:
write_message("Error: %s" % erro, verbose=1, stream=sys.stderr)
write_message("Exiting.", sys.stderr)
task_update_status("ERROR")
if erro.errno == 2:
# No such file or directory
# Not scary
task_update_status("CERROR")
else:
task_update_status("ERROR")
sys.exit(1)
return marc

Expand All @@ -540,7 +545,7 @@ def xml_marc_to_records(xml_marc):
write_message("Error: MARCXML file has wrong format: %s" % recs,
verbose=1, stream=sys.stderr)
write_message("Exiting.", sys.stderr)
task_update_status("ERROR")
task_update_status("CERROR")
sys.exit(1)
else:
recs = map((lambda x:x[0]), recs)
Expand Down
2 changes: 2 additions & 0 deletions modules/miscutil/lib/errorlib.py
Expand Up @@ -121,6 +121,8 @@ def register_emergency(msg, recipients=None):
from invenio.mailutils import send_email
if not recipients:
recipients = get_emergency_recipients()
recipients = set(recipients)
recipients.add(CFG_SITE_ADMIN_EMAIL)
for address_str in recipients:
send_email(CFG_SITE_SUPPORT_EMAIL, address_str, "Emergency notification", msg)

Expand Down

0 comments on commit d74fa91

Please sign in to comment.