osvcd enhancement
* add the resource restart handler to the monitor thread
* add a service monitor "expect" property to track the expected
status of each service, if any
* have service actions send a daemon notification updating the
monitor status and expected status
* build the cached service objects in minimal=False mode, so that the
daemon has access to resource properties, notably "nb_restart"
cvaroqui committed Jun 12, 2017
1 parent 63bf4c2 commit 8b69d9d
Showing 2 changed files with 175 additions and 8 deletions.
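
Editorial note: the "daemon notification" mentioned in the commit message is a JSON request sent to the daemon listener. A minimal sketch of that payload, inferred from the Svc.set_service_monitor client added in lib/svc.py below; the service name and field values are illustrative:

    # Hedged sketch of the request a service action sends to the daemon listener;
    # "mysvc" is a made-up service name.
    request = {
        "action": "set_service_monitor",
        "options": {
            "svcname": "mysvc",
            "status": "starting",   # what the local action is doing right now
            "expect": "up",         # the availability the monitor should converge to
        },
    }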
147 changes: 139 additions & 8 deletions lib/osvcd.py
@@ -263,7 +263,7 @@ def get_services_nodenames(self):
nodenames |= svc.nodes | svc.drpnodes
return nodenames

def set_service_monitor(self, svcname, status=None):
def set_service_monitor(self, svcname, status=None, expect=None):
global MON_DATA
global MON_DATA_LOCK
with MON_DATA_LOCK:
@@ -277,6 +277,8 @@ def set_service_monitor(self, svcname, status=None):
status
)
MON_DATA[svcname].status = status
if expect:
MON_DATA[svcname].expect = expect
MON_DATA[svcname].updated = datetime.datetime.utcnow()

def get_service_monitor(self, svcname, datestr=False):
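
With the new "expect" and "restart" fields, one MON_DATA entry roughly looks like the sketch below. This is a guess at the in-memory layout, built from the attribute and item accesses visible in this commit; "mysvc" and "fs#1" are made-up names:

    # Hedged sketch of a MON_DATA entry; "restart" only appears once a
    # monitored resource has actually been retried.
    entry = Storage()
    entry.status = "idle"                       # what the daemon/local action is doing now
    entry.expect = "up"                         # desired availability, driving restarts
    entry.updated = datetime.datetime.utcnow()  # timestamp of the last change
    entry.restart = Storage()                   # per-rid restart counters
    entry.restart["fs#1"] = 1                   # "fs#1" is a hypothetical resource id
    MON_DATA["mysvc"] = entry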
@@ -405,7 +407,7 @@ def get_listener_info(self, nodename):
port = rcEnv.listener_port
return addr, port

def daemon_send(self, data, nodename=None, with_result=True):
def daemon_send(self, data, nodename=None, with_result=True, silent=False):
if nodename is None:
nodename = rcEnv.nodename
addr, port = self.get_listener_info(nodename)
@@ -432,7 +434,8 @@ def daemon_send(self, data, nodename=None, with_result=True):
nodename, data = self.decrypt(data)
return data
except socket.error as exc:
self.log.error("init error: %s", str(exc))
if not silent:
self.log.error("init error: %s", str(exc))
return
finally:
sock.close()
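
The new silent flag lets a caller suppress the socket error log when the target listener is unreachable, which suits best-effort notifications. A minimal usage sketch, with an illustrative payload reusing the action added by this commit:

    # Fail quietly if the listener is not up; the caller just gets None back.
    resp = self.daemon_send(
        {"action": "set_service_monitor",
         "options": {"svcname": "mysvc", "status": "idle"}},
        silent=True,
    )
    if resp is None:
        pass  # no answer: treat the notification as best-effort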
@@ -1085,9 +1088,18 @@ def action_get_service_config(self, nodename, **kwargs):

def action_clear(self, nodename, **kwargs):
svcname = kwargs.get("svcname")
if not svcname:
return self._action_set_service_monitor(nodename, svcname, "idle")

def action_set_service_monitor(self, nodename, **kwargs):
svcname = kwargs.get("svcname")
status = kwargs.get("status")
expect = kwargs.get("expect")
return self._action_set_service_monitor(nodename, svcname, status=status, expect=expect)

def _action_set_service_monitor(self, nodename, svcname=None, status=None, expect=None):
if svcname is None:
return {"error": "no svcname specified", "status": 1}
self.set_service_monitor(svcname, "idle")
self.set_service_monitor(svcname, status=status, expect=expect)
return {"status": 0}


@@ -1229,6 +1241,24 @@ def service_command(self, svcname, cmd):
proc = Popen(cmd, stdout=None, stderr=None, stdin=None, close_fds=True)
return proc

def service_start_resources(self, svcname, rids):
proc = self.service_command(svcname, ["start", "--rid", ",".join(rids)])
self.push_proc(
proc=proc,
on_success="service_start_resources_on_success", on_success_args=[svcname, rids],
on_error="service_start_resources_on_error", on_error_args=[svcname, rids],
)

def service_start_resources_on_error(self, svcname, rids):
self.set_service_monitor(svcname, status="idle", expect="up")
self.update_hb_data()

def service_start_resources_on_success(self, svcname, rids):
self.set_service_monitor(svcname, status="idle", expect="up")
for rid in rids:
self.service_monitor_reset_retries(svcname, rid)
self.update_hb_data()
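
For reference, the restart batch is handed to service_command as a plain action argument list. Assuming service_command wraps an svcmgr invocation (not confirmed by this diff), the call and its rough shell equivalent for two hypothetical rids would be:

    # Hedged illustration; "mysvc", "fs#1" and "app#1" are made-up names.
    rids = ["fs#1", "app#1"]
    self.service_command("mysvc", ["start", "--rid", ",".join(rids)])
    # roughly equivalent to:  svcmgr -s mysvc start --rid fs#1,app#1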

def service_start(self, svcname):
proc = self.service_command(svcname, ["start"])
self.push_proc(
@@ -1241,7 +1271,7 @@ def service_start_on_error(self, svcname):
self.set_service_monitor(svcname, "start failed")

def service_start_on_success(self, svcname):
self.set_service_monitor(svcname, "idle")
self.set_service_monitor(svcname, status="idle", expect="up")

def orchestrator(self):
global SERVICES
@@ -1250,6 +1280,47 @@ def orchestrator(self):
svcs = SERVICES.values()
for svc in svcs:
self.service_orchestrator(svc)
self.resources_orchestrator(svc)

def resources_orchestrator(self, svc):
global CLUSTER_DATA
global CLUSTER_DATA_LOCK
if svc.frozen():
#self.log.info("service %s orchestrator out (frozen)", svc.svcname)
return
if svc.disabled:
#self.log.info("service %s orchestrator out (disabled)", svc.svcname)
return
try:
with CLUSTER_DATA_LOCK:
resources = CLUSTER_DATA[rcEnv.nodename]["services"]["status"][svc.svcname]["resources"]
except KeyError:
return
rids = []
for rid, resource in resources.items():
if not resource["monitor"]:
continue
if resource["status"] != "down":
continue
smon = self.get_service_monitor(svc.svcname)
if smon.status != "idle":
continue
if smon.expect != "up":
continue
nb_restart = svc.get_resource(rid).nb_restart
retries = self.service_monitor_get_retries(svc.svcname, rid)
if retries > nb_restart:
continue
if retries >= nb_restart:
self.service_monitor_inc_retries(svc.svcname, rid)
self.log.info("max retries (%d) reached for %s.%s", nb_restart, svc.svcname, rid)
continue
self.service_monitor_inc_retries(svc.svcname, rid)
self.log.info("restart resource %s.%s %d/%d", svc.svcname, rid, retries+1, nb_restart)
rids.append(rid)
if len(rids) > 0:
self.set_service_monitor(svc.svcname, "restarting")
self.service_start_resources(svc.svcname, rids)
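
The retry bookkeeping above is easy to misread, so here is the same gate extracted into a standalone sketch (not the daemon's code, just its decision logic). With nb_restart=2, a resource gets two restart attempts, then one "max retries reached" log, then silence:

    def restart_decision(retries, nb_restart):
        """Reproduce the per-resource gate used by resources_orchestrator."""
        if retries > nb_restart:
            return "skip"              # budget exhausted and already logged once
        if retries >= nb_restart:
            return "log max retries"   # counter is bumped one last time
        return "restart"               # counter is bumped and the rid is queued for start

    # pass 1 -> restart (counter 0->1), pass 2 -> restart (1->2),
    # pass 3 -> log max retries (2->3), pass 4 and later -> skip.
    assert [restart_decision(r, 2) for r in range(4)] == [
        "restart", "restart", "log max retries", "skip"]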

def service_orchestrator(self, svc):
if svc.frozen():
@@ -1315,7 +1386,7 @@ def count_up_service_instances(self, svcname):
for instance in self.get_service_instances(svcname):
if instance["avail"] == "up":
n_up += 1
elif instance["monitor"]["status"] in ("starting", "ready"):
elif instance["monitor"]["status"] in ("restarting", "starting", "ready"):
n_up += 1
return n_up

@@ -1476,7 +1547,7 @@ def get_services_config(self):
cksum = self.fsum(cfg)
try:
with SERVICES_LOCK:
SERVICES[svcname] = build(svcname, minimal=True)
SERVICES[svcname] = build(svcname)
except Exception as exc:
self.log.error("%s build error: %s", svcname, str(exc))
else:
@@ -1507,6 +1578,13 @@ def service_status_fallback(self, svcname):
return json.loads(bdecode(out))

def get_services_status(self, svcnames):
"""
Return the local services status data, fetching data from status.json
caches if their mtime changed or from CLUSTER_DATA[rcEnv.nodename] if
not.
Also update the monitor 'expect' field for each service.
"""
global CLUSTER_DATA
global CLUSTER_DATA_LOCK
with CLUSTER_DATA_LOCK:
@@ -1532,6 +1610,7 @@ def get_services_status(self, svcnames):
data[svcname] = self.service_status_fallback(svcname)
except Exception:
data[svcname] = self.service_status_fallback(svcname)
self.set_service_monitor_expect_from_status(data, svcname)
data[svcname]["monitor"] = self.get_service_monitor(svcname, datestr=True)

# purge deleted services
@@ -1540,6 +1619,58 @@ def get_services_status(self, svcnames):

return data

def service_monitor_reset_retries(self, svcname, rid):
global MON_DATA
global MON_DATA_LOCK
with MON_DATA_LOCK:
if svcname not in MON_DATA:
return
if "restart" not in MON_DATA[svcname]:
return
if rid in MON_DATA[svcname].restart:
del MON_DATA[svcname].restart[rid]
if len(MON_DATA[svcname].restart.keys()) == 0:
del MON_DATA[svcname].restart

def service_monitor_get_retries(self, svcname, rid):
global MON_DATA
global MON_DATA_LOCK
with MON_DATA_LOCK:
if svcname not in MON_DATA:
return 0
if "restart" not in MON_DATA[svcname]:
return 0
if rid not in MON_DATA[svcname].restart:
return 0
else:
return MON_DATA[svcname].restart[rid]

def service_monitor_inc_retries(self, svcname, rid):
global MON_DATA
global MON_DATA_LOCK
with MON_DATA_LOCK:
if svcname not in MON_DATA:
return
if "restart" not in MON_DATA[svcname]:
MON_DATA[svcname].restart = Storage()
if rid not in MON_DATA[svcname].restart:
MON_DATA[svcname].restart[rid] = 1
else:
MON_DATA[svcname].restart[rid] += 1

def set_service_monitor_expect_from_status(self, data, svcname):
global MON_DATA
global MON_DATA_LOCK
if svcname not in data:
return
with MON_DATA_LOCK:
if svcname not in MON_DATA:
return
if data[svcname]["avail"] == "up" and MON_DATA[svcname].expect != "up":
self.log.info("service %s monitor expect change %s => %s", svcname,
MON_DATA[svcname].expect, "up")
MON_DATA[svcname].expect = "up"

def update_hb_data(self):
global CLUSTER_DATA
global CLUSTER_DATA_LOCK
36 changes: 36 additions & 0 deletions lib/svc.py
@@ -357,6 +357,7 @@ def __init__(self, svcname=None):
self.dependencies = {}
self.running_action = None
self.need_postsync = set()
self.service_monitor_set = False

# set by the builder
self.conf = os.path.join(rcEnv.paths.pathetc, svcname+".conf")
@@ -749,6 +750,15 @@ def setup_signal_handlers():
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

def get_resource(self, rid):
"""
Return a resource object by id.
Return None if the rid is not found.
"""
if rid not in self.resources_by_id:
return
return self.resources_by_id[rid]
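
The daemon's resources_orchestrator uses this accessor to read per-resource monitor settings such as nb_restart. A two-line usage sketch; the rid "fs#1" is hypothetical:

    res = svc.get_resource("fs#1")              # returns None when the rid is unknown
    nb_restart = res.nb_restart if res else 0   # guard against None before dereferencing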

def get_resources(self, _type=None, strict=False, discard_disabled=True):
"""
Return the list of resources matching criteria.
@@ -2221,6 +2231,7 @@ def slave_run(self):
self.encap_cmd(['run'], verbose=True)

def start(self):
self.set_service_monitor(status="starting", expect="up")
self.abort_start()
af_svc = self.get_non_affine_svc()
if len(af_svc) != 0:
@@ -2255,6 +2266,7 @@ def rollback(self):
self.rollbackip()

def stop(self):
self.set_service_monitor(status="stopping", expect="down")
self.slave_stop()
try:
self.master_stopapp()
@@ -3287,6 +3299,9 @@ def action(self, action, options=None):
finally:
if action != "scheduler":
self.set_run_flag()
if self.service_monitor_set:
self.set_service_monitor(status="idle")
self.service_monitor_set = False

def options_to_rids(self, options):
"""
@@ -4767,6 +4782,9 @@ def compliance_auto(self):
return
self.action("compliance_auto")

#
# daemon communications
#
def clear(self):
options = {
"svcname": self.svcname,
@@ -4777,6 +4795,24 @@ def clear(self):
)
print(json.dumps(data, indent=4, sort_keys=True))

def set_service_monitor(self, status=None, expect=None):
self.service_monitor_set = True
options = {
"svcname": self.svcname,
"status": status,
"expect": expect,
}
try:
data = self.daemon_send(
{"action": "set_service_monitor", "options": options},
nodename=self.options.node,
silent=True,
)
if data and data["status"] != 0:
self.log.warning("set monitor status failed")
except Exception as exc:
self.log.warning("set monitor status failed: %s", str(exc))

#
# config helpers
#
