Skip to content

Commit

Permalink
Refactor the scheduler command merging
Browse files Browse the repository at this point in the history
Merge at dequeue time instead of at queue time, which allows retaining individual
task info in the delayed hash, and thus checking whether a command is already
queued or running.

This also permits merging tasks with compatible --rid specifiers, like
"sync all --rid sync#<n>" or "run --rid task#<n>".
  • Loading branch information
cvaroqui committed May 1, 2018
1 parent c738ecd commit 3323b1e
Showing 1 changed file with 74 additions and 51 deletions.
125 changes: 74 additions & 51 deletions lib/osvcd_scheduler.py
Expand Up @@ -32,10 +32,10 @@ def status(self, **kwargs):
data = shared.OsvcThread.status(self, **kwargs)
data["running"] = len(self.running)
data["delayed"] = [{
"cmd": " ".join(entry["cmd"]),
"cmd": " ".join(self.format_cmd(action, svcname, rids)),
"queued": entry["queued"].strftime(shared.JSON_DATEFMT),
"expire": entry["expire"].strftime(shared.JSON_DATEFMT),
} for entry in self.delayed.values()]
} for (action, delay, svcname, rids), entry in self.delayed.items()]
return data

def run(self):
Expand Down Expand Up @@ -73,27 +73,41 @@ def do(self):
self.run_scheduler()
done = self.janitor_procs()

def drop_running(self, sig):
self.running -= set([sig])
def drop_running(self, sigs):
    """
    Forget the given task signatures: every element of <sigs> is removed
    from the running set. Signatures that are not currently tracked as
    running are ignored.
    """
    self.running.difference_update(sigs)

def exec_action(self, sig, cmd):
def exec_action(self, sigs, cmd):
kwargs = dict(stdout=self.devnull, stderr=self.devnull,
stdin=self.devnull, close_fds=os.name!="nt")
try:
proc = Popen(cmd, **kwargs)
except KeyboardInterrupt:
return
self.running.add(sig)
self.running |= set(sigs)
self.push_proc(proc=proc,
on_success="drop_running",
on_success_args=[sig],
on_success_args=[sigs],
on_error="drop_running",
on_error_args=[sig])
on_error_args=[sigs])

def queue_action(self, cmd, delay, sig):
def format_cmd(self, action, svcname=None, rids=None):
    """
    Build the argv list used to execute a scheduled action.

    * svcname is None: a node action, run through nodemgr.
    * svcname is a list: a merged service action, run through svcmgr
      on the comma-joined service names, with --parallel.
    * otherwise: a single service action, run through svcmgr.

    <rids>, when set, is the comma-joined resource ids string stored in
    the task signature by queue_action(). It is passed as the --rid
    option value.

    Returns the command as a list of arguments, always ending with
    --cron.
    """
    if svcname is None:
        cmd = [rcEnv.paths.nodemgr, action]
    elif isinstance(svcname, list):
        cmd = [rcEnv.paths.svcmgr, "-s", ",".join(svcname), action, "--waitlock=5", "--parallel"]
    else:
        cmd = [rcEnv.paths.svcmgr, "-s", svcname, action, "--waitlock=5"]
    if rids:
        # The resource selector option is "--rid" (singular), not "--rids".
        cmd += ["--rid", rids]
    cmd.append("--cron")
    return cmd

def queue_action(self, action, delay=0, svcname=None, rids=None):
if rids:
rids = ",".join(rids)
sig = (action, delay, svcname, rids)
if sig in self.delayed:
self.log.debug("drop already queued action '%s'", " ".join(cmd))
self.log.debug("drop already queued sig '%s'", sig)
return []
if sig in self.running:
self.log.debug("drop already running action '%s'", " ".join(cmd))
Expand All @@ -103,26 +117,58 @@ def queue_action(self, cmd, delay, sig):
self.delayed[sig] = {
"queued": now,
"expire": exp,
"cmd": cmd,
}
if delay == 0:
self.log.debug("queued action '%s' for run asap" % ' '.join(cmd))
if delay > 0:
self.log.debug("queued action %s for run asap" % str(sig))
else:
self.log.debug("queued action '%s' delayed until %s" % (' '.join(cmd), exp))
self.log.debug("queued action %s delayed until %s" % (str(sig), exp))
return []

def dequeue_actions(self):
def get_todo(self):
    """
    Merge queued tasks, sort by queued date, and return the first
    <n> tasks, where <n> is the number of open slots in the running
    queue.

    Only the entries of the delayed hash whose expire date is reached
    are considered. Entries are merged when they share the same
    (node-or-service, action, rids) key, whatever their delay.

    Each returned task is a dict with keys:
    * action:  the action name
    * rids:    the rids element of the merged signatures
    * svcname: the list of merged service names, or None for a node
               action
    * sigs:    the delayed hash keys merged into this task
    * queued:  the oldest queued date amongst the merged entries
    """
    todo = {}
    open_slots = max(self.max_tasks() - len(self.running), 0)
    now = datetime.datetime.utcnow()
    for sig, task in self.delayed.items():
        if task["expire"] > now:
            # not due yet
            continue
        action, delay, svcname, rids = sig
        merge_key = (svcname is None, action, rids)
        merged = todo.get(merge_key)
        if merged is None:
            todo[merge_key] = {
                "action": action,
                "rids": rids,
                "svcname": [svcname] if svcname else None,
                "sigs": [sig],
                "queued": task["queued"],
            }
            continue
        # Signatures differing only by their delay share the same merge
        # key: don't list the same service twice in the merged command.
        if svcname and svcname not in merged["svcname"]:
            merged["svcname"].append(svcname)
        merged["sigs"].append(sig)
        if task["queued"] < merged["queued"]:
            merged["queued"] = task["queued"]
    return sorted(todo.values(), key=lambda task: task["queued"])[:open_slots]

def dequeue_actions(self):
"""
Get merged tasks to run from get_todo(), execute them and purge the
delayed hash.
"""
dequeued = []
to_run = [sig for sig, task in self.delayed.items() if task["expire"] < now]
to_run = sorted(to_run, key=lambda sig: self.delayed[sig]["queued"])
open_slots = max(self.max_tasks() - len(self.running), 0)
for sig in to_run[:open_slots]:
self.log.info("run '%s' queued at %s",
" ".join(self.delayed[sig]["cmd"]),
self.delayed[sig]["queued"])
self.exec_action(sig, self.delayed[sig]["cmd"])
dequeued.append(sig)
for task in self.get_todo():
cmd = self.format_cmd(task["action"], task["svcname"], task["rids"])
self.log.info("run '%s' queued at %s", " ".join(cmd), task["queued"])
self.exec_action(task["sigs"], cmd)
dequeued += task["sigs"]
for sig in dequeued:
del self.delayed[sig]

Expand All @@ -138,10 +184,7 @@ def run_scheduler(self):
delay = shared.NODE.sched.validate_action(action)
except ex.excAbortAction:
continue
cmd = [rcEnv.paths.nodemgr, action, "--cron"]
sig = ":".join(["node", action])
run += self.queue_action(cmd, delay, sig)
todo = {}
run += self.queue_action(action, delay)
for svcname in list(shared.SERVICES.keys()):
try:
svc = shared.SERVICES[svcname]
Expand All @@ -161,39 +204,19 @@ def run_scheduler(self):
data = svc.sched.validate_action(action)
except ex.excAbortAction:
continue
if action not in todo:
todo[action] = []
try:
rids, delay = data
todo[action].append((svc.svcname, rids, delay))
except TypeError:
delay = data
todo[action].append((svc.svcname, None, delay))
for action, adata in todo.items():
merged = []
for svcname, rids, delay in adata:
if rids is not None or delay > 0:
cmd = [rcEnv.paths.svcmgr, "-s", svc.svcname, action, "--cron", "--waitlock=5"]
rids = ','.join(rids)
sig = ":".join(["svc", svc.svcname, action, rids])
cmd += ["--rid", rids]
run += self.queue_action(cmd, delay, sig)
else:
merged.append(svcname)
if merged:
svcnames = ",".join(merged)
cmd = [rcEnv.paths.svcmgr, "-s", svcnames, action, "--cron", "--waitlock=5"]
if len(merged) > 1:
cmd.append("--parallel")
sig = ":".join(["svc", svcnames, action])
run += self.queue_action(cmd, 0, sig)
rids = None
run += self.queue_action(action, delay, svc.svcname, rids)

# log a scheduler loop digest
msg = []
if len(nonprov) > 0:
msg.append("non provisioned service skipped: %s." % ", ".join(nonprov))
if len(run) > 0:
msg.append("ran: %s." % ", ".join(run))
msg.append("queued: %s." % ", ".join(run))
if len(msg) > 0:
self.log.info(" ".join(msg))

0 comments on commit 3323b1e

Please sign in to comment.