diff --git a/lib/osvcd_scheduler.py b/lib/osvcd_scheduler.py
index f47243b4cd..f3320c7613 100644
--- a/lib/osvcd_scheduler.py
+++ b/lib/osvcd_scheduler.py
@@ -32,10 +32,10 @@ def status(self, **kwargs):
         data = shared.OsvcThread.status(self, **kwargs)
         data["running"] = len(self.running)
         data["delayed"] = [{
-            "cmd": " ".join(entry["cmd"]),
+            "cmd": " ".join(self.format_cmd(action, svcname, rids)),
             "queued": entry["queued"].strftime(shared.JSON_DATEFMT),
             "expire": entry["expire"].strftime(shared.JSON_DATEFMT),
-        } for entry in self.delayed.values()]
+        } for (action, delay, svcname, rids), entry in self.delayed.items()]
         return data
 
     def run(self):
@@ -73,27 +73,51 @@ def do(self):
         self.run_scheduler()
         done = self.janitor_procs()
 
-    def drop_running(self, sig):
-        self.running -= set([sig])
+    def drop_running(self, sigs):
+        self.running -= set(sigs)
 
-    def exec_action(self, sig, cmd):
+    def exec_action(self, sigs, cmd):
         kwargs = dict(stdout=self.devnull, stderr=self.devnull,
                       stdin=self.devnull, close_fds=os.name!="nt")
         try:
             proc = Popen(cmd, **kwargs)
         except KeyboardInterrupt:
             return
-        self.running.add(sig)
+        self.running |= set(sigs)
         self.push_proc(proc=proc,
                        on_success="drop_running",
-                       on_success_args=[sig],
+                       on_success_args=[sigs],
                        on_error="drop_running",
-                       on_error_args=[sig])
+                       on_error_args=[sigs])
 
-    def queue_action(self, cmd, delay, sig):
+    def format_cmd(self, action, svcname=None, rids=None):
+        """
+        Return the argv list running <action>: a nodemgr command if
+        <svcname> is None, else a svcmgr command on one service name or
+        on a list of names. <rids> is a comma-joined string, or None.
+        """
+        if svcname is None:
+            cmd = [rcEnv.paths.nodemgr, action]
+        elif isinstance(svcname, list):
+            cmd = [rcEnv.paths.svcmgr, "-s", ",".join(svcname), action, "--waitlock=5", "--parallel"]
+        else:
+            cmd = [rcEnv.paths.svcmgr, "-s", svcname, action, "--waitlock=5"]
+        if rids:
+            cmd += ["--rid", rids]
+        cmd.append("--cron")
+        return cmd
+
+    def queue_action(self, action, delay=0, svcname=None, rids=None):
+        """
+        Queue <action>, keyed by its (action, delay, svcname, rids)
+        signature, <rids> being comma-joined first. A signature already
+        queued or running is dropped. Returns an empty list.
+        """
+        if rids:
+            rids = ",".join(rids)
+        sig = (action, delay, svcname, rids)
         if sig in self.delayed:
-            self.log.debug("drop already queued action '%s'", " ".join(cmd))
             self.log.debug("drop already queued sig '%s'", sig)
             return []
         if sig in self.running:
-            self.log.debug("drop already running action '%s'", " ".join(cmd))
+            self.log.debug("drop already running sig '%s'", sig)
@@ -103,26 +127,58 @@ def queue_action(self, cmd, delay, sig):
         self.delayed[sig] = {
             "queued": now,
             "expire": exp,
-            "cmd": cmd,
         }
-        if delay == 0:
-            self.log.debug("queued action '%s' for run asap" % ' '.join(cmd))
+        if delay == 0:
+            self.log.debug("queued action %s for run asap" % str(sig))
         else:
-            self.log.debug("queued action '%s' delayed until %s" % (' '.join(cmd), exp))
+            self.log.debug("queued action %s delayed until %s" % (str(sig), exp))
         return []
 
-    def dequeue_actions(self):
+    def get_todo(self):
+        """
+        Merge queued tasks, sort by queued date, and return the first
+        <n> tasks, where <n> is the number of open slots in the running
+        queue.
+        """
+        todo = {}
+        open_slots = max(self.max_tasks() - len(self.running), 0)
         now = datetime.datetime.utcnow()
+        for sig, task in self.delayed.items():
+            if task["expire"] > now:
+                continue
+            action, delay, svcname, rids = sig
+            merge_key = (svcname is None, action, rids)
+            if merge_key not in todo:
+                if svcname:
+                    _svcname = [svcname]
+                else:
+                    _svcname = None
+                todo[merge_key] = {
+                    "action": action,
+                    "rids": rids,
+                    "svcname": _svcname,
+                    "sigs": [(action, delay, svcname, rids)],
+                    "queued": task["queued"],
+                }
+            else:
+                if svcname:
+                    todo[merge_key]["svcname"].append(svcname)
+                todo[merge_key]["sigs"].append((action, delay, svcname, rids))
+                if task["queued"] < todo[merge_key]["queued"]:
+                    todo[merge_key]["queued"] = task["queued"]
+        return sorted(todo.values(), key=lambda task: task["queued"])[:open_slots]
+
+    def dequeue_actions(self):
+        """
+        Get merged tasks to run from get_todo(), execute them and purge the
+        delayed hash.
+        """
         dequeued = []
-        to_run = [sig for sig, task in self.delayed.items() if task["expire"] < now]
-        to_run = sorted(to_run, key=lambda sig: self.delayed[sig]["queued"])
-        open_slots = max(self.max_tasks() - len(self.running), 0)
-        for sig in to_run[:open_slots]:
-            self.log.info("run '%s' queued at %s",
-                          " ".join(self.delayed[sig]["cmd"]),
-                          self.delayed[sig]["queued"])
-            self.exec_action(sig, self.delayed[sig]["cmd"])
-            dequeued.append(sig)
+        for task in self.get_todo():
+            cmd = self.format_cmd(task["action"], task["svcname"], task["rids"])
+            self.log.info("run '%s' queued at %s", " ".join(cmd), task["queued"])
+            self.exec_action(task["sigs"], cmd)
+            dequeued += task["sigs"]
         for sig in dequeued:
             del self.delayed[sig]
 
@@ -138,10 +194,7 @@ def run_scheduler(self):
                 delay = shared.NODE.sched.validate_action(action)
             except ex.excAbortAction:
                 continue
-            cmd = [rcEnv.paths.nodemgr, action, "--cron"]
-            sig = ":".join(["node", action])
-            run += self.queue_action(cmd, delay, sig)
-        todo = {}
+            run += self.queue_action(action, delay)
         for svcname in list(shared.SERVICES.keys()):
             try:
                 svc = shared.SERVICES[svcname]
@@ -161,39 +214,19 @@ def run_scheduler(self):
                     data = svc.sched.validate_action(action)
                 except ex.excAbortAction:
                     continue
-                if action not in todo:
-                    todo[action] = []
                 try:
                     rids, delay = data
-                    todo[action].append((svc.svcname, rids, delay))
                 except TypeError:
                     delay = data
-                    todo[action].append((svc.svcname, None, delay))
-        for action, adata in todo.items():
-            merged = []
-            for svcname, rids, delay in adata:
-                if rids is not None or delay > 0:
-                    cmd = [rcEnv.paths.svcmgr, "-s", svc.svcname, action, "--cron", "--waitlock=5"]
-                    rids = ','.join(rids)
-                    sig = ":".join(["svc", svc.svcname, action, rids])
-                    cmd += ["--rid", rids]
-                    run += self.queue_action(cmd, delay, sig)
-                else:
-                    merged.append(svcname)
-            if merged:
-                svcnames = ",".join(merged)
-                cmd = [rcEnv.paths.svcmgr, "-s", svcnames, action, "--cron", "--waitlock=5"]
-                if len(merged) > 1:
-                    cmd.append("--parallel")
-                sig = ":".join(["svc", svcnames, action])
-                run += self.queue_action(cmd, 0, sig)
+                    rids = None
+                run += self.queue_action(action, delay, svc.svcname, rids)
 
         # log a scheduler loop digest
         msg = []
         if len(nonprov) > 0:
             msg.append("non provisioned service skipped: %s." % ", ".join(nonprov))
         if len(run) > 0:
-            msg.append("ran: %s." % ", ".join(run))
+            msg.append("queued: %s." % ", ".join(run))
         if len(msg) > 0:
             self.log.info(" ".join(msg))
 