Skip to content

Commit

Permalink
Restore the probabilistic schedule feature in collaboration with osvcd
Browse files Browse the repository at this point in the history
All task except the sync are eligible to this behaviour.

When the scheduler thread identifies a task is allowed, a random delay is
computed and the task execution is postponed to this further date.

The random delay is chosen to keep the execution in the allowed timerange.

This patch also removes more scheduler code from the Node and Svc objet.
Notably the 'scheduler' and 'schedulers' action.
  • Loading branch information
cvaroqui committed Dec 13, 2017
1 parent 71978c0 commit f86d3c3
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 305 deletions.
32 changes: 0 additions & 32 deletions lib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,11 +817,7 @@ def action(self, action, options=None):
Looks up which method to handle the action (some are not implemented
in the Node class), and call the handling method.
"""
if action == "scheduler":
return self.scheduler()
try:
if self.options.cron:
self.sched.validate_action(action)
self.async_action(action)
except ex.excAbortAction:
return 0
Expand Down Expand Up @@ -880,34 +876,6 @@ def print_schedule(self):
"""
return self.sched.print_schedule()

def scheduler(self):
"""
The node scheduler entrypoint.
Evaluates execution constraints for all scheduled tasks and executes
the tasks if required.
"""
self.options.cron = True
for action in self.sched.scheduler_actions:
try:
self.action(action)
except:
self.log.exception("")

def schedulers(self):
"""
schedulers node action entrypoint.
Run the node scheduler and each configured service scheduler.
"""
purge_cache_expired()
self.scheduler()

self.build_services()
for svc in self.svcs:
try:
svc.scheduler()
except ex.excError as exc:
svc.log.error(exc)

def get_push_objects(self, section):
"""
Returns the object names to do inventory on.
Expand Down
7 changes: 0 additions & 7 deletions lib/nodemgr_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,6 @@
"reboot": {
"msg": "Reboot the node.",
},
"scheduler": {
"msg": "Run the node task scheduler.",
},
"schedulers": {
"msg": "Execute a run of the node and services schedulers. This "
"action is installed in the system scheduler.",
},
"schedule_reboot_status": {
"msg": "Tell if the node is scheduled for reboot.",
},
Expand Down
94 changes: 68 additions & 26 deletions lib/osvcd_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
import logging
import time
import datetime
from subprocess import Popen, PIPE

import osvcd_shared as shared
Expand All @@ -13,6 +14,16 @@

class Scheduler(shared.OsvcThread):
interval = 60
delayed = {}

def status(self, **kwargs):
data = shared.OsvcThread.status(self, **kwargs)
data["delayed"] = [{
"cmd": " ".join(entry["cmd"]),
"queued": entry["queued"].strftime(shared.JSON_DATEFMT),
"expire": entry["expire"].strftime(shared.JSON_DATEFMT),
} for entry in self.delayed.values()]
return data

def run(self):
self.log = logging.getLogger(rcEnv.nodename+".osvcd.scheduler")
Expand Down Expand Up @@ -42,30 +53,65 @@ def do(self):
if self.stopped():
break

def run_scheduler(self):
#self.log.info("run schedulers")
def exec_action(self, cmd):
kwargs = dict(stdout=self.devnull, stderr=self.devnull,
stdin=self.devnull, close_fds=True)
run = []
try:
proc = Popen(cmd, **kwargs)
except KeyboardInterrupt:
return
self.push_proc(proc=proc)

def queue_action(self, cmd, delay, sig):
if sig in self.delayed:
return []
if delay == 0:
self.log.debug("immediate exec of action '%s'" % ' '.join(cmd))
self.exec_action(cmd)
return [sig]
now = datetime.datetime.utcnow()
exp = now + datetime.timedelta(seconds=delay)
self.delayed[sig] = {
"queued": now,
"expire": exp,
"cmd": cmd,
}
self.log.debug("queued action '%s' delayed until %s" % (' '.join(cmd), exp))
return []

def dequeue_action(self, data, now):
if now < data["expire"]:
return False
self.log.info("dequeue action '%s' queued at %s" % (data["cmd"], data["queued"]))
self.exec_action(data["cmd"])
return True

def dequeue_actions(self):
now = datetime.datetime.utcnow()
dequeued = []
for sig, data in self.delayed.items():
ret = self.dequeue_action(data, now)
if ret:
dequeued.append(sig)
for sig in dequeued:
del self.delayed[sig]

def run_scheduler(self):
#self.log.info("run schedulers")
nonprov = []
run = []

self.dequeue_actions()
if shared.NODE:
shared.NODE.options.cron = True
_run = []
for action in shared.NODE.sched.scheduler_actions:
try:
shared.NODE.sched.validate_action(action)
delay = shared.NODE.sched.validate_action(action)
except ex.excAbortAction:
continue
_run.append(action)
cmd = [rcEnv.paths.nodemgr, action, "--cron"]
try:
proc = Popen(cmd, **kwargs)
except KeyboardInterrupt:
return
self.push_proc(proc=proc)
if len(_run) > 0:
run.append("node:%s" % ",".join(_run))
sig = ":".join(["node", action])
run += self.queue_action(cmd, delay, sig)
for svc in shared.SERVICES.values():
svc.options.cron = True
try:
Expand All @@ -75,25 +121,21 @@ def run_scheduler(self):
if provisioned is not True:
nonprov.append(svc.svcname)
continue
_run = []
for action in svc.sched.scheduler_actions:
try:
rids = svc.sched.validate_action(action)
data = svc.sched.validate_action(action)
except ex.excAbortAction:
continue
cmd = [rcEnv.paths.svcmgr, "-s", svc.svcname, action, "--cron", "--waitlock=5"]
if rids:
cmd += ["--rid", ",".join(rids)]
_run.append("%s(%s)" % (action, ','.join(rids)))
else:
_run.append(action)
try:
proc = Popen(cmd, **kwargs)
except KeyboardInterrupt:
return
self.push_proc(proc=proc)
if len(_run) > 0:
run.append("%s:%s" % (svc.svcname, ",".join(_run)))
rids, delay = data
rids = ','.join(rids)
sig = ":".join(["svc", action, rids])
cmd += ["--rid", rids]
except TypeError:
delay = data
sig = ":".join(["svc", action])
run += self.queue_action(cmd, delay, sig)

# log a scheduler loop digest
msg = []
Expand Down
3 changes: 2 additions & 1 deletion lib/osvcd_shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
COMPAT_VERSION = 4

DATEFMT = "%Y-%m-%dT%H:%M:%S.%fZ"
JSON_DATEFMT = "%Y-%m-%dT%H:%M:%SZ"
MAX_MSG_SIZE = 1024 * 1024

# The threads store
Expand Down Expand Up @@ -167,7 +168,7 @@ def status(self, **kwargs):
data = Storage({
"state": state,
"created": datetime.datetime.utcfromtimestamp(self.created)\
.strftime('%Y-%m-%dT%H:%M:%SZ'),
.strftime(JSON_DATEFMT),
})
return data

Expand Down
Loading

0 comments on commit f86d3c3

Please sign in to comment.