Skip to content

Commit

Permalink
[scheduler] Detect actions that have been lost
Browse files Browse the repository at this point in the history
When the scheduler executes an action, the action process may fail without leaving a trace.
This patch logs a warning when such a failure is detected.

@sched_action-decorated functions update a dedicated flag file
when they are called.

The flag file path is the value of the OSVC_SCHED_FLAG env var (<pathvar>/scheduler/launched.<sigs>.<session_id>)
  • Loading branch information
cgalibern committed Dec 6, 2021
1 parent 0a53625 commit 0c3f711
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 37 deletions.
76 changes: 44 additions & 32 deletions opensvc/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,39 @@ def __str__(self):
def to_time(dt):
    """
    Convert the datetime <dt> to a float number of seconds since the epoch.
    The whole seconds come from time.mktime() on the datetime's time tuple,
    the microseconds are added back as the fractional part.
    """
    whole_seconds = time.mktime(dt.timetuple())
    fraction = dt.microsecond / 1E6
    return whole_seconds + fraction


def trace_sched_action():
    """
    Create a trace flag from the OSVC_SCHED_FLAG env var.
    This trace flag can be used by the daemon scheduler to ensure the crm
    task has been launched.

    Do nothing when OSVC_SCHED_FLAG is unset or empty. Otherwise update the
    flag file through timestamp(), using the OSVC_SCHED_TIME env var (parsed
    as a float number of epoch seconds) when it is usable, or the current
    time as fallback.
    """
    flag_launched = os.environ.get("OSVC_SCHED_FLAG")
    if not flag_launched:
        return
    try:
        last = datetime.datetime.fromtimestamp(float(os.environ["OSVC_SCHED_TIME"]))
    except (KeyError, TypeError, ValueError, OverflowError, OSError):
        # Missing or unparsable schedule time: fall back to "now".
        # Keep a datetime object (not its str()), so timestamp() converts it
        # to epoch seconds instead of writing a quoted date string.
        last = datetime.datetime.now()
    timestamp(flag_launched, last=last)


def timestamp(timestamp_f, last=None):
    """
    Update the timestamp file <timestamp_f>.
    Create missing parent directories if needed.

    <last> is the value to record: None means the current time, a datetime
    is converted to epoch seconds with to_time(), any other value is kept
    as is. The file receives repr(<last>) followed by a line separator.
    """
    if last is None:
        last = time.time()
    elif isinstance(last, datetime.datetime):
        last = to_time(last)
    timestamp_d = os.path.dirname(timestamp_f)
    # A bare file name has an empty dirname: there is nothing to create then.
    if timestamp_d and not os.path.isdir(timestamp_d):
        try:
            os.makedirs(timestamp_d, 0o755)
        except OSError:
            # Another process may have created the directory in the
            # meantime; only propagate the error if it is still missing.
            if not os.path.isdir(timestamp_d):
                raise
    buff = repr(last)
    with open(timestamp_f, 'w') as ofile:
        ofile.write(buff+os.linesep)


def sched_action(func):
"""
A decorator in charge of updating the scheduler tasks and subtasks
Expand All @@ -131,6 +164,7 @@ def _func(self, action, options=None):
self.sched.configure(action=action)
if action in self.sched.actions:
self.sched.action_timestamps(action, options.rid)
trace_sched_action()
try:
ret = func(self, action, options)
except ex.AbortAction:
Expand Down Expand Up @@ -913,27 +947,6 @@ def configure(self, *args, **kwargs):
getattr(self.obj, self.configure_method)(*args, **kwargs)
self.configured = True

def _timestamp(self, timestamp_f, last=None):
    """
    Update the timestamp file <timestamp_f>.
    If <timestamp_f> is not a full path, consider it parented to
    <pathvar> (resolved through self.get_timestamp_f()).
    Create missing parent directories if needed.

    <last> is the value to record: None means the current time, a datetime
    is converted to epoch seconds with to_time(), any other value is kept
    as is. The file receives repr(<last>) followed by a line separator.

    Always return True.
    """
    if last is None:
        last = time.time()
    elif isinstance(last, datetime.datetime):
        last = to_time(last)
    if not timestamp_f.startswith(os.sep):
        timestamp_f = self.get_timestamp_f(timestamp_f)
    timestamp_d = os.path.dirname(timestamp_f)
    if not os.path.isdir(timestamp_d):
        try:
            os.makedirs(timestamp_d, 0o755)
        except OSError:
            # Another process may have created the directory in the
            # meantime; only propagate the error if it is still missing.
            if not os.path.isdir(timestamp_d):
                raise
    buff = repr(last)
    with open(timestamp_f, 'w') as ofile:
        ofile.write(buff+os.linesep)
    return True

def get_last(self, fname, success=False):
"""
Return the last task run timestamp, fetched from the on-disk cache.
Expand Down Expand Up @@ -1037,7 +1050,7 @@ def action_timestamps(self, action, rids=None, success=False):
last = datetime.datetime.now()

for tsfile in tsfiles:
self._timestamp(tsfile, last=last)
timestamp(tsfile, last=last)

def print_schedule(self):
"""
Expand Down Expand Up @@ -1162,17 +1175,16 @@ def __print_schedule_data(self, action, sopt, now, with_next=True):

# Manual check entry point: prints the parsed data of a sample Schedule,
# then up to 10 successive get_next() results, stopping at the first empty one.
# NOTE(review): this span is a rendered diff hunk, so the removed (`last`) and
# added (`last_value`) variants of the same statements appear interleaved.
if __name__ == '__main__':
now = datetime.datetime.now()
last = datetime.datetime.now()
# s = Schedule("00:10-01:00@1,03:10-04:00@10,@1h sun-mon:last,fri:first,wed 10-40 2,jun-aug")
# Schedule("18:00-19:00@10").validate(now.replace(hour=18, minute=51), last=now.replace(hour=18, minute=51))
# Schedule("18:00-19:00@10").validate(now.replace(hour=15, minute=01))
# s = Schedule(["14:00-21:00@10", "!12:00-18:55", "!19:10-20:00"])
last_value = datetime.datetime.now()
# s = Schedule("00:10-01:00@1,03:10-04:00@10,@1h sun-mon:last,fri:first,wed 10-40 2,jun-aug")
# Schedule("18:00-19:00@10").validate(now.replace(hour=18, minute=51), last=now.replace(hour=18, minute=51))
# Schedule("18:00-19:00@10").validate(now.replace(hour=15, minute=01))
# s = Schedule(["14:00-21:00@10", "!12:00-18:55", "!19:10-20:00"])
s = Schedule("@11s mon:last")
# s = Schedule("")
# s = Schedule("")
print(json.dumps(s.data, indent=4))
for _ in range(10):
last, interval = s.get_next(now=last, last=last)
if not last:
last_value, interval = s.get_next(now=last_value, last=last_value)
if not last_value:
break
print(last, last.strftime("%c"), interval)

print(last_value, last_value.strftime("%c"), interval)
23 changes: 18 additions & 5 deletions opensvc/daemon/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,17 @@ def janitor_run_done(self):
#self.privlog.debug("dropped_via_notify: %s", self.dropped_via_notify)
return

def post_exec_action(self, sigs, flag_launched, cmd_s):
    """
    Verify lost tasks (tasks where flag_launched has not been created)
    then call drop_running.

    <sigs> are the task signatures handed to drop_running().
    <flag_launched> is the path of the flag file the launched action is
    expected to create. If it is missing, a warning mentioning <cmd_s> is
    logged. If it exists, it is removed.

    drop_running() is called even if removing the flag raises, so the
    signatures are never left marked as running. The removal error still
    propagates to the caller.
    """
    try:
        if os.path.exists(flag_launched):
            # The action did start: consume its flag.
            os.unlink(flag_launched)
        else:
            self.log.warning("failed run '%s'", cmd_s)
    finally:
        self.drop_running(sigs)

def drop_running(self, sigs):
"""
Drop for running tasks signatures those not yet dropped via
Expand All @@ -226,9 +237,12 @@ def exec_action(self, sigs, path, action, rids, queued, now, session_id):
cmd = Env.om + cmd_args
self.privlog.info("run '%s'", " ".join(cmd_log))

flag_name = "launched.%s.%s" % (",".join(["-".join([str(s) for s in sig]) for sig in sigs]), session_id)
flag_launched = str(os.path.join(Env.paths.pathvar, "scheduler", flag_name))
env = os.environ.copy()
env["OSVC_ACTION_ORIGIN"] = "daemon"
env["OSVC_SCHED_TIME"] = str(now)
env["OSVC_SCHED_FLAG"] = flag_launched
env["OSVC_PARENT_SESSION_UUID"] = session_id

kwargs = dict(stdout=self.devnull, stderr=self.devnull,
Expand All @@ -249,13 +263,12 @@ def exec_action(self, sigs, path, action, rids, queued, now, session_id):
self.lasts[sig] = now
self.push_proc(proc=proc,
cmd=cmd,
on_success="drop_running",
on_success_args=[sigs],
on_error="drop_running",
on_error_args=[sigs])
on_success="post_exec_action",
on_success_args=[sigs, flag_launched, " ".join(cmd_log)],
on_error="post_exec_action",
on_error_args=[sigs, flag_launched, " ".join(cmd_log)])

@staticmethod

def get_cmd_args(action, path=None, rids=None):
if path is None:
cmd = ["node", action]
Expand Down

0 comments on commit 0c3f711

Please sign in to comment.