Skip to content

Commit

Permalink
Merge branch 'master' of git+ssh://www.opensvc.com/home/opensvc/opensvc
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaudveron committed Jun 10, 2017
2 parents d0359ea + 00fac71 commit cde681e
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 101 deletions.
6 changes: 6 additions & 0 deletions bin/pkg/make_doc
@@ -0,0 +1,6 @@
#!/bin/bash
#
# Run lib/svcDict.py from the source tree that contains this script
# (bin/pkg/ -> ../../lib), as root through sudo.

# Directory holding this script. `whence` is a ksh/zsh builtin; when it is
# not available (as under bash) its failure is silenced and $0 is used as
# invoked. Every expansion is quoted so that a checkout path containing
# spaces or glob characters survives word splitting, and the script stops
# instead of running from an unrelated directory if the cd fails.
PATH_SCRIPT="$(cd "$(/usr/bin/dirname "$(whence -- "$0" 2>/dev/null || echo "$0")")" && pwd)" || exit 1

sudo python "$PATH_SCRIPT/../../lib/svcDict.py"

136 changes: 81 additions & 55 deletions lib/osvcd.py
Expand Up @@ -73,6 +73,8 @@ def _decrypt(ciphertext, key, iv):
HB_MSG_LOCK = threading.RLock()
SERVICES = {}
SERVICES_LOCK = threading.RLock()
# Module-level service monitor state, keyed by service name. Entries are
# Storage objects on which set_service_monitor() writes `status` and
# `updated`.
MON_DATA = {}
# Guards MON_DATA. Reentrant because get_service_monitor() calls
# set_service_monitor() while already holding it.
MON_DATA_LOCK = threading.RLock()


def fork(func, args=None, kwargs=None):
Expand Down Expand Up @@ -206,6 +208,29 @@ def get_services_nodenames(self):
nodenames |= svc.nodes | svc.drpnodes
return nodenames

# Record the monitor state of service `svcname` in the module-level MON_DATA
# map, while holding MON_DATA_LOCK.
#
# An empty Storage entry is created the first time a service is seen. When
# `status` is truthy, the transition from the stored status (shown as "none"
# when that value is falsy) to `status` is logged at info level through
# self.log. The entry's `status` and `updated` (current UTC time) are then
# assigned.
#
# NOTE(review): this listing has lost its indentation, so it cannot be told
# from here whether the two trailing assignments sit inside the `if status:`
# branch or run on every call -- confirm against the real file before
# relying on the status=None behavior.
def set_service_monitor(self, svcname, status=None):
with MON_DATA_LOCK:
if svcname not in MON_DATA:
MON_DATA[svcname] = Storage({})
if status:
self.log.info(
"service %s monitor status change: %s => %s",
svcname,
MON_DATA[svcname].status if MON_DATA[svcname].status else "none",
status
)
MON_DATA[svcname].status = status
MON_DATA[svcname].updated = datetime.datetime.utcnow()

def get_service_monitor(self, svcname, datestr=False):
    """
    Return the monitor state recorded for service `svcname`.

    A service that has no entry in MON_DATA yet is first registered with
    the "idle" status. The result is a new Storage built from the stored
    entry. When `datestr` is true, its `updated` field is rendered as a
    string using DATEFMT instead of being left as a datetime.
    """
    with MON_DATA_LOCK:
        if svcname not in MON_DATA:
            # first lookup for this service: seed the entry
            self.set_service_monitor(svcname, "idle")
        snapshot = Storage(MON_DATA[svcname])
        if not datestr:
            return snapshot
        snapshot.updated = snapshot.updated.strftime(DATEFMT)
        return snapshot


#
class Crypt(object):
Expand Down Expand Up @@ -239,6 +264,10 @@ def gen_iv(urandom=[], locker=threading.RLock()):
locker.release()

def decrypt(self, message):
if hasattr(self, "node"):
config = self.node.config
else:
config = self.config
message = bdecode(message).rstrip("\0\x00")
try:
message = json.loads(message)
Expand All @@ -251,10 +280,10 @@ def decrypt(self, message):
uuid_key = "uuid"
else:
uuid_key = "uuid@"+message["nodename"]
if not self.config.has_option("node", uuid_key):
if not config.has_option("node", uuid_key):
self.log.error("no %s to use as AES key", uuid_key)
return None, None
key = self.config.get("node", uuid_key).encode("utf-8")
key = config.get("node", uuid_key).encode("utf-8")
if len(key) > 32:
key = key[:32]
data = bdecode(self._decrypt(data, key, iv))
Expand All @@ -264,10 +293,14 @@ def decrypt(self, message):
return message["nodename"], data

def encrypt(self, data):
if not self.config.has_option("node", "uuid"):
if hasattr(self, "node"):
config = self.node.config
else:
config = self.config
if not config.has_option("node", "uuid"):
self.log.error("no uuid to use as AES key")
return
key = self.config.get("node", "uuid").encode("utf-8")
key = config.get("node", "uuid").encode("utf-8")
if len(key) > 32:
key = key[:32]
iv = self.gen_iv()
Expand All @@ -279,32 +312,36 @@ def encrypt(self, data):
return (json.dumps(message)+'\0').encode()

# Return the (addr, port) pair on which the listener of `nodename` is reached.
#
# NOTE(review): this is a diff listing with the +/- markers and the
# indentation stripped. Each `self.config...` line appears to be the removed
# form, immediately followed by the `config...` line replacing it -- confirm
# against the real file. The description below follows the `config` form.
#
# Configuration source: self.node.config when the object has a `node`
# attribute, self.config otherwise.
#
# Local node (nodename == rcEnv.nodename):
#   - no [listener] section: ("127.0.0.1", rcEnv.listener_port)
#   - addr: option `addr@<nodename>`, then `addr`, then "127.0.0.1"
# Any other node:
#   - no [listener] section: (nodename, rcEnv.listener_port)
#   - addr: option `addr@<nodename>`, then the node name itself
#     (the unscoped `addr` option is not consulted on this path)
# Port, in both cases: option `port@<nodename>`, then `port`, read with
# getint(), then rcEnv.listener_port.
def get_listener_info(self, nodename):
if hasattr(self, "node"):
config = self.node.config
else:
config = self.config
if nodename == rcEnv.nodename:
if not self.config.has_section("listener"):
if not config.has_section("listener"):
return "127.0.0.1", rcEnv.listener_port
if self.config.has_option("listener", "addr@"+nodename):
addr = self.config.get("listener", "addr@"+nodename)
elif self.config.has_option("listener", "addr"):
addr = self.config.get("listener", "addr")
if config.has_option("listener", "addr@"+nodename):
addr = config.get("listener", "addr@"+nodename)
elif config.has_option("listener", "addr"):
addr = config.get("listener", "addr")
else:
addr = "127.0.0.1"
if self.config.has_option("listener", "port@"+nodename):
port = self.config.getint("listener", "port@"+nodename)
elif self.config.has_option("listener", "port"):
port = self.config.getint("listener", "port")
if config.has_option("listener", "port@"+nodename):
port = config.getint("listener", "port@"+nodename)
elif config.has_option("listener", "port"):
port = config.getint("listener", "port")
else:
port = rcEnv.listener_port
else:
if not self.config.has_section("listener"):
if not config.has_section("listener"):
return nodename, rcEnv.listener_port
if self.config.has_option("listener", "addr@"+nodename):
addr = self.config.get("listener", "addr@"+nodename)
if config.has_option("listener", "addr@"+nodename):
addr = config.get("listener", "addr@"+nodename)
else:
addr = nodename
if self.config.has_option("listener", "port@"+nodename):
port = self.config.getint("listener", "port@"+nodename)
elif self.config.has_option("listener", "port"):
port = self.config.getint("listener", "port")
if config.has_option("listener", "port@"+nodename):
port = config.getint("listener", "port@"+nodename)
elif config.has_option("listener", "port"):
port = config.getint("listener", "port")
else:
port = rcEnv.listener_port
return addr, port
Expand Down Expand Up @@ -989,6 +1026,13 @@ def action_get_hb_data(self, nodename, **kwargs):
with HB_MSG_LOCK:
return HB_MSG

def action_clear(self, nodename, **kwargs):
    """
    Reset the monitor status of a service to "idle".

    The service is named by the `svcname` keyword argument. Returns
    {"status": 0} once the status has been reset, or a dict with
    "status": 1 and an "error" message when no service name was given.
    `nodename` is accepted but not used by this handler.
    """
    target = kwargs.get("svcname")
    if target:
        self.set_service_monitor(target, "idle")
        return {"status": 0}
    return {"error": "no svcname specified", "status": 1}


#
class Scheduler(OsvcThread):
Expand Down Expand Up @@ -1052,8 +1096,6 @@ def run(self):
self.log = logging.getLogger(rcEnv.nodename+".osvcd.monitor")
self.last_run = 0
self.log.info("monitor started")
self.data = {}
self.data_lock = threading.RLock()

while True:
self.do()
Expand Down Expand Up @@ -1175,7 +1217,7 @@ def service_orchestrator(self, svc):

now = datetime.datetime.utcnow()
if svc.clustertype == "failover":
if status in ("down", "stdby down"):
if status in ("down", "stdby down", "stdby up"):
if smon.status == "ready":
if smon.updated < (now - MON_WAIT_READY):
self.log.info("failover service %s status %s/ready for %s", svc.svcname, status, now-smon.updated)
Expand All @@ -1186,16 +1228,23 @@ def service_orchestrator(self, svc):
self.set_service_monitor(svc.svcname, "ready")
elif svc.clustertype == "flex":
n_up = self.count_up_service_instances(svc.svcname)
if n_up < svc.flex_min_nodes:
if smon.status == "ready":
if (n_up - 1) >= svc.flex_min_nodes:
self.log.info("flex service %s instance count reached required minimum while we were ready", svc.svcname, status)
self.set_service_monitor(svc.svcname, "idle")
return
if smon.updated < (now - MON_WAIT_READY):
self.log.info("flex service %s status %s/ready for %s", svc.svcname, status, now-smon.updated)
self.set_service_monitor(svc.svcname, "starting")
self.service_command(svc.svcname, ["start"])
elif smon.status == "idle":
if n_up >= svc.flex_min_nodes:
return
instance = self.get_service_instance(svc.svcname, rcEnv.nodename)
if smon.status == "ready":
if smon.updated < (now - MON_WAIT_READY):
self.log.info("flex service %s status %s/ready for %s", svc.svcname, status, now-smon.updated)
self.set_service_monitor(svc.svcname, "starting")
self.service_command(svc.svcname, ["start"])
elif instance.avail in ("down", "stdby down"):
self.log.info("flex service %s started, starting or ready to start instances: %d/%d. local status %s", svc.svcname, n_up, svc.flex_min_nodes, instance.avail)
self.set_service_monitor(svc.svcname, "ready")
if instance.avail not in ("down", "stdby down", "stdby up"):
return
self.log.info("flex service %s started, starting or ready to start instances: %d/%d. local status %s", svc.svcname, n_up, svc.flex_min_nodes, instance.avail)
self.set_service_monitor(svc.svcname, "ready")

def count_up_service_instances(self, svcname):
n_up = 0
Expand Down Expand Up @@ -1345,29 +1394,6 @@ def get_services_status(self, svcnames):

return data

# Record the monitor state of service `svcname` in the per-instance
# self.data map, while holding self.data_lock.
#
# An empty Storage entry is created the first time a service is seen. When
# `status` is truthy, the transition from the stored status (shown as "none"
# when that value is falsy) to `status` is logged at info level through
# self.log. The entry's `status` and `updated` (current UTC time) are then
# assigned.
#
# NOTE(review): the hunk header above (-1345,29 +1394,6) suggests these
# lines are the removed side of the diff -- confirm. The listing has also
# lost its indentation, so it cannot be told from here whether the two
# trailing assignments sit inside the `if status:` branch or run on every
# call.
def set_service_monitor(self, svcname, status=None):
with self.data_lock:
if svcname not in self.data:
self.data[svcname] = Storage({})
if status:
self.log.info(
"service %s monitor status change: %s => %s",
svcname,
self.data[svcname].status if self.data[svcname].status else "none",
status
)
self.data[svcname].status = status
self.data[svcname].updated = datetime.datetime.utcnow()

def get_service_monitor(self, svcname, datestr=False):
    """
    Return the monitor state recorded for service `svcname`.

    A service that has no entry in self.data yet is first registered with
    the "idle" status. The result is a new Storage built from the stored
    entry. When `datestr` is true, its `updated` field is rendered as a
    string using DATEFMT instead of being left as a datetime.
    """
    with self.data_lock:
        if svcname not in self.data:
            # first lookup for this service: seed the entry
            self.set_service_monitor(svcname, "idle")
        snapshot = Storage(self.data[svcname])
        if not datestr:
            return snapshot
        snapshot.updated = snapshot.updated.strftime(DATEFMT)
        return snapshot

def update_hb_data(self):
#self.log.info("update heartbeat data to send")
load_avg = os.getloadavg()
Expand Down
9 changes: 9 additions & 0 deletions lib/resContainerDocker.py
Expand Up @@ -521,6 +521,15 @@ def container_forcestop(self):
def _ping(self):
    """
    Return the result of check_ping() against self.addr, using a timeout
    value of 1.
    """
    result = check_ping(self.addr, timeout=1)
    return result

def is_down(self):
    """
    Tell whether the container is down.

    When self.docker_service is set, the resource is down only when
    service_hosted_instances() reports no instance. Otherwise the answer
    is the negation of is_up().
    """
    if not self.docker_service:
        return not self.is_up()
    hosted_count = len(self.service_hosted_instances())
    return hosted_count == 0

def is_up(self):
if self.svc.dockerlib.docker_daemon_private and \
self.svc.dockerlib.docker_data_dir is None:
Expand Down
12 changes: 11 additions & 1 deletion lib/svc.py
Expand Up @@ -25,6 +25,7 @@
import rcLogger
import node
from rcScheduler import scheduler_fork, Scheduler, SchedOpts
from osvcd import Crypt

if sys.version_info[0] < 3:
BrokenPipeError = IOError
Expand Down Expand Up @@ -344,7 +345,7 @@ def _func(self):
func(self)
return _func

class Svc(object):
class Svc(Crypt):
"""
A OpenSVC service class.
A service is a collection of resources.
Expand Down Expand Up @@ -5038,4 +5039,13 @@ def compliance_auto(self):
return
self.action("compliance_auto")

def clear(self):
    """
    Ask the daemon to clear the monitor status of this service.

    Sends a "clear" action carrying this service's name through
    daemon_send(), addressed to the node given by self.options.node,
    and prints the returned data as indented, key-sorted JSON.
    """
    request = {
        "action": "clear",
        "options": {"svcname": self.svcname},
    }
    reply = self.daemon_send(request, nodename=self.options.node)
    print(json.dumps(reply, indent=4, sort_keys=True))

14 changes: 14 additions & 0 deletions lib/svcmgr_parser.py
Expand Up @@ -158,6 +158,10 @@
action="store", dest="moduleset",
help="compliance, set moduleset list. The 'all' value "
"can be used in conjonction with detach."),
"node": Option(
"--node", default="",
action="store", dest="node",
help="the node to send a request to. if not specified the local node is targeted."),
"onlyprimary": Option(
"--onlyprimary", default=None,
action="store_true", dest="parm_primary",
Expand Down Expand Up @@ -330,6 +334,10 @@
OPT.ignore_affinity,
]

# Options offered by actions that send a request to a daemon
# (used by the 'clear' action below).
DAEMON_OPTS = [OPT.node]

ACTIONS = {
'Service actions': {
'boot': {
Expand All @@ -338,6 +346,12 @@
'startstandby if not',
'options': ACTION_OPTS + START_ACTION_OPTS,
},
'clear': {
'msg': 'clear the monitor status of the service on the node pointed '
'by --node. If --node is not specified, the local node is '
'targeted.',
'options': DAEMON_OPTS,
},
'dns_update': {
'msg': 'update the collector dns records for the service',
'options': ACTION_OPTS,
Expand Down

0 comments on commit cde681e

Please sign in to comment.