Skip to content

Commit

Permalink
Merge pull request #489 from cgalibern/b2.1-fixes-2
Browse files Browse the repository at this point in the history
b2.1-fixes-2
  • Loading branch information
cgalibern committed Nov 9, 2021
2 parents f5adfbd + 38218ee commit 004c46d
Show file tree
Hide file tree
Showing 16 changed files with 311 additions and 48 deletions.
5 changes: 4 additions & 1 deletion bin/postinstall
Expand Up @@ -43,7 +43,10 @@ if lsb:
pathetc = "/etc/opensvc"
pathvar = "/var/lib/opensvc"
pathlck = '/var/lib/opensvc/lock'
pathtmp = "/var/tmp/opensvc"
if sysname != "SunOS":
pathtmp = "/var/tmp/opensvc"
else:
pathtmp = "/var/lib/opensvc/tmp"
pathlog = "/var/log/opensvc"
pathbin = "/usr/share/opensvc/bin"
pathlib = "/usr/share/opensvc/opensvc"
Expand Down
5 changes: 3 additions & 2 deletions opensvc/core/collector/rpc.py
Expand Up @@ -491,9 +491,10 @@ def push_config(self, svc, sync=True):

def repr_config(svc):
import codecs
if not os.path.exists(svc.paths_cf):
cfg_file = svc.path_cf
if not os.path.exists(cfg_file):
return
with codecs.open(svc.paths_cf, 'r', encoding="utf8") as f:
with codecs.open(cfg_file, 'r', encoding="utf8") as f:
buff = f.read()
return buff

Expand Down
8 changes: 2 additions & 6 deletions opensvc/core/objects/svc.py
Expand Up @@ -2838,7 +2838,7 @@ def post_commit(self):
def configure_scheduler(self, *args, **kwargs):
pass

def as_storage(self):
def send_service_config_args(self):
return Storage({
"path": self.path,
"path_cf": self.paths.cf,
Expand All @@ -2858,10 +2858,6 @@ def as_storage(self):
"encap": self.encap,
})

def containers_as_storage_list(self):
return [container.as_storage()
for container in self.get_resources('container')]


class Svc(PgMixin, BaseSvc):
"""
Expand Down Expand Up @@ -4937,7 +4933,7 @@ def push_config(self):
Push the service config to the collector. Usually done
automatically by the collector thread.
"""
self.node.collector.call('push_config', self)
self.node.collector.call('push_config', self.send_service_config_args())

def push_resinfo(self):
"""
Expand Down
5 changes: 3 additions & 2 deletions opensvc/daemon/collector.py
Expand Up @@ -190,7 +190,8 @@ def send_containerinfo(self, path):
service = shared.SERVICES[path]
if not service.has_encap_resources:
return
containers = service.containers_as_storage_list()
containers = [container.send_containerinfo_arg()
for container in service.get_resources('container')]

if len(containers) == 0:
return
Expand All @@ -209,7 +210,7 @@ def send_service_config(self, path, csum):
with shared.SERVICES_LOCK:
if path not in shared.SERVICES:
return
svc = shared.SERVICES[path].as_storage()
svc = shared.SERVICES[path].send_service_config_args()

self.log.info("send service %s config", path)
try:
Expand Down
2 changes: 1 addition & 1 deletion opensvc/daemon/handlers/daemon/shutdown/post.py
Expand Up @@ -44,7 +44,7 @@ def action(self, nodename, thr=None, **kwargs):
_, _, kind = split_path(path)
if kind not in ("svc", "vol"):
continue
thr.set_smon(path, local_expect="shutdown")
thr.defer_set_smon(path, local_expect="shutdown", origin="post /daemon_shutdown")
self.wait_shutdown(thr=thr)

# send a last status to peers so they can takeover asap
Expand Down
2 changes: 1 addition & 1 deletion opensvc/daemon/handlers/node/drain/post.py
Expand Up @@ -57,7 +57,7 @@ def action(self, nodename, thr=None, **kwargs):
_, _, kind = split_path(path)
if kind not in ("svc", "vol"):
continue
thr.set_smon(path, local_expect="shutdown")
thr.defer_set_smon(path, local_expect="shutdown", origin="post /node_drain")
if options.wait:
try:
self.wait_shutdown(timeout=options.time, thr=thr)
Expand Down
2 changes: 1 addition & 1 deletion opensvc/daemon/handlers/object/clear/post.py
Expand Up @@ -30,6 +30,6 @@ def action(self, nodename, thr=None, **kwargs):
if smon.status.endswith("ing"):
return {"info": "skip clear on %s instance" % smon.status, "status": 0}
thr.log_request("clear %s monitor status" % options.path, nodename, **kwargs)
thr.set_smon(options.path, status="idle", reset_retries=True)
thr.defer_set_smon(options.path, status="idle", reset_retries=True, origin="post /object_clear")
return {"status": 0, "info": "%s instance cleared" % options.path}

10 changes: 3 additions & 7 deletions opensvc/daemon/handlers/object/monitor/post.py
Expand Up @@ -122,19 +122,15 @@ def action(self, nodename, thr=None, **kwargs):
options.global_expect = new_ge
if options.global_expect:
data["data"]["global_expect"] = options.global_expect
info.append("%s target state set to %s" % (path, options.global_expect))
set_smon_result = thr.set_smon(
info.append("%s defer target state set to %s" % (path, options.global_expect))
thr.defer_set_smon(
path, status=options.status,
local_expect=options.local_expect,
global_expect=options.global_expect,
reset_retries=options.reset_retries,
stonith=options.stonith,
origin="post /object_monitor",
)
if set_smon_result is False:
message = "skipped set %s monitor with:" % path
for option in ["status", "local_expect", "global_expect", "reset_retries", "stonith"]:
message += " %s=%s" % (option, options.get(option))
thr.log.info(message)
data["status"] = len(errors)
if info:
data["info"] = info
Expand Down
12 changes: 12 additions & 0 deletions opensvc/daemon/monitor.py
Expand Up @@ -3621,6 +3621,7 @@ def get_arbitrators_data(self):
return self.arbitrators_data

def update_cluster_data(self):
self.apply_deferred_smon()
self.update_node_data()
self.purge_left_nodes()
self.merge_hb_data()
Expand All @@ -3640,6 +3641,17 @@ def purge_left_nodes(self):
self.log.info("purge left node %s data", node)
self.delete_peer_data(node)

def apply_deferred_smon(self):
with shared.DEFERRED_SET_SMON_LOCK:
for path, status, local_expect, global_expect, reset_retries, \
stonith, expected_status, origin in shared.DEFERRED_SET_SMON:
self.set_smon(path, status=status, local_expect=local_expect,
global_expect=global_expect,
reset_retries=reset_retries, stonith=stonith,
expected_status=reset_retries,
defer=True, origin=origin)
del shared.DEFERRED_SET_SMON[:]

def update_node_data(self):
"""
Rescan services config and status.
Expand Down
52 changes: 42 additions & 10 deletions opensvc/daemon/shared.py
Expand Up @@ -130,6 +130,11 @@ def __init__(self):
# DEFERRED_STOP_LISTENER_CLIENTS_LOCK serialize access to DEFERRED_STOP_LISTENER_CLIENTS
DEFERRED_STOP_LISTENER_CLIENTS_LOCK = threading.RLock()

# DEFERRED_SET_SMON define list of deferred smon changes
DEFERRED_SET_SMON = []
# DEFERRED_SET_SMON_LOCK serialize access to DEFERRED_SET_SMON
DEFERRED_SET_SMON_LOCK = threading.RLock()

# thread loop conditions and helpers
DAEMON_STOP = threading.Event()
MON_TICKER = threading.Condition()
Expand Down Expand Up @@ -642,7 +647,11 @@ def set_nmon(self, status=None, local_expect=None, global_expect=None, global_ex

def set_smon(self, path, status=None, local_expect=None,
global_expect=None, reset_retries=False,
stonith=None, expected_status=None):
stonith=None, expected_status=None,
defer=False, origin=""):
msg = "defer" if defer else ""
if origin:
msg += " from %s" % origin
instance = self.get_service_instance(path, Env.nodename)
if not instance:
self.node_data.set(["services", "status", path], {"resources": {}})
Expand Down Expand Up @@ -674,11 +683,12 @@ def set_smon(self, path, status=None, local_expect=None,
if status != smon.status \
and (not expected_status or expected_status == smon.status):
self.log.info(
"%s monitor status change: %s => %s",
"%s monitor status change: %s => %s %s",
path,
smon.status if
smon.status else "none",
status
status,
msg,
)
if smon.status is not None \
and "failed" in smon.status \
Expand All @@ -701,11 +711,12 @@ def set_smon(self, path, status=None, local_expect=None,
local_expect = None
if local_expect != smon.local_expect:
self.log.info(
"%s monitor local expect change: %s => %s",
"%s monitor local expect change: %s => %s %s",
path,
smon.local_expect if
smon.local_expect else "none",
local_expect
local_expect,
msg,
)
smon.local_expect = local_expect
smon.local_expect_updated = time.time()
Expand All @@ -718,18 +729,19 @@ def set_smon(self, path, status=None, local_expect=None,
global_expect = "restarted@%s" % time.time()
if global_expect != smon.global_expect:
self.log.info(
"%s monitor global expect change: %s => %s",
"%s monitor global expect change: %s => %s %s",
path,
smon.global_expect if
smon.global_expect else "none",
global_expect
global_expect,
msg,
)
smon.global_expect = global_expect
smon.global_expect_updated = time.time()
changed = True
if reset_retries and "restart" in smon:
self.log.info("%s monitor resources restart count "
"reset", path)
"reset %s", path, msg)
del smon["restart"]
changed = True

Expand All @@ -738,18 +750,38 @@ def set_smon(self, path, status=None, local_expect=None,
stonith = None
if stonith != smon.stonith:
self.log.info(
"%s monitor stonith change: %s => %s",
"%s monitor stonith change: %s => %s %s",
path,
smon.stonith if
smon.stonith else "none",
stonith
stonith,
msg,
)
smon.stonith = stonith
changed = True
if changed:
smon_view.set([], smon)
wake_monitor(reason="%s mon change" % path)

def defer_set_smon(
self,
path, status=None, local_expect=None, global_expect=None,
reset_retries=False, stonith=None, expected_status=None,
origin=""):

with DEFERRED_SET_SMON_LOCK:
DEFERRED_SET_SMON.append((
path,
status,
local_expect,
global_expect,
reset_retries,
stonith,
expected_status,
origin,
))
wake_monitor(reason="%s defer smon call" % path)

def get_node_monitor(self, nodename=None):
"""
Return the Monitor data of the node.
Expand Down
4 changes: 2 additions & 2 deletions opensvc/drivers/resource/container/__init__.py
Expand Up @@ -437,12 +437,12 @@ def dns_options(self, options):
options.append("use-vc")
return options

def as_storage(self):
def send_containerinfo_arg(self):
container_info = self.get_container_info()
return Storage({
"vm_hostname": self.vm_hostname,
"container.guestos": self.guestos,
"vmem": container_info["vmem"],
"vcpu": container_info["vcpus"],
"zonepath": self.zonepath if hasattr(self, "zonepath") else ""
"zonepath": getattr(self, "zonepath", ""),
})
47 changes: 33 additions & 14 deletions opensvc/env.py
Expand Up @@ -8,6 +8,9 @@
from utilities.storage import Storage


_sysname, _, _, _, _machine, _ = platform.uname()


def create_or_update_dir(d):
if not os.path.exists(d):
os.makedirs(d)
Expand All @@ -16,10 +19,11 @@ def create_or_update_dir(d):
now = time.time()
try:
os.utime(d, (now, now))
except:
except Exception:
# unprivileged
pass


class Paths(object):
def __init__(self, osvc_root_path=None, detect=False):
if osvc_root_path:
Expand All @@ -35,7 +39,11 @@ def __init__(self, osvc_root_path=None, detect=False):
self.pathetc = '/etc/opensvc'
self.pathetcns = '/etc/opensvc/namespaces'
self.pathlog = '/var/log/opensvc'
self.pathtmpv = '/var/tmp/opensvc'
if _sysname != "SunOS":
self.pathtmpv = '/var/tmp/opensvc'
else:
# Prevent use of /var/tmp (may conflict with /system/filesystem/minimal)
self.pathtmpv = '/var/lib/opensvc/tmp'
self.pathvar = '/var/lib/opensvc'
self.pathdoc = '/usr/share/doc/opensvc'
self.pathhtml = '/usr/share/opensvc/html'
Expand Down Expand Up @@ -106,6 +114,7 @@ def prepare_tmp(self):
create_or_update_dir(self.pathtmpv)
self.tmp_prepared = True


class Env(object):
"""Class to store globals
"""
Expand Down Expand Up @@ -175,30 +184,40 @@ class Env(object):
"afs", "ncpfs", "glusterfs", "cephfs",
]
_platform = sys.platform
sysname, x, x, x, machine, x = platform.uname()
sysname = _sysname
module_sysname = sysname.lower().replace("-", "")
nodename = socket.gethostname().lower()
fqdn = socket.getfqdn().lower()
listener_port = 1214
listener_tls_port = 1215

# programs to execute remote command on other nodes or virtual hosts
if _platform == "sunos5" :
if _platform == "sunos5":
if os.path.exists('/usr/local/bin/ssh'):
rsh = "/usr/local/bin/ssh -q -o StrictHostKeyChecking=no -o ForwardX11=no -o BatchMode=yes -o ConnectTimeout=10"
rcp = "/usr/local/bin/scp -q -o StrictHostKeyChecking=no -o ForwardX11=no -o BatchMode=yes -o ConnectTimeout=10"
rsh = "/usr/local/bin/ssh -q -o StrictHostKeyChecking=no" \
" -o ForwardX11=no -o BatchMode=yes -o ConnectTimeout=10"
rcp = "/usr/local/bin/scp -q -o StrictHostKeyChecking=no" \
" -o ForwardX11=no -o BatchMode=yes -o ConnectTimeout=10"
else:
rsh = "/usr/bin/ssh -q -o StrictHostKeyChecking=no -o ForwardX11=no -o BatchMode=yes -n"
rcp = "/usr/bin/scp -q -o StrictHostKeyChecking=no -o ForwardX11=no -o BatchMode=yes"
rsh = "/usr/bin/ssh -q -o StrictHostKeyChecking=no " \
"-o ForwardX11=no -o BatchMode=yes -n"
rcp = "/usr/bin/scp -q -o StrictHostKeyChecking=no " \
" -o ForwardX11=no -o BatchMode=yes"
elif os.path.exists('/etc/vmware-release'):
rsh = "/usr/bin/ssh -q -o StrictHostKeyChecking=no -o ForwardX11=no -o BatchMode=yes"
rcp = "/usr/bin/scp -q -o StrictHostKeyChecking=no -o ForwardX11=no -o BatchMode=yes"
rsh = "/usr/bin/ssh -q -o StrictHostKeyChecking=no " \
"-o ForwardX11=no -o BatchMode=yes"
rcp = "/usr/bin/scp -q -o StrictHostKeyChecking=no " \
"-o ForwardX11=no -o BatchMode=yes"
elif sysname == 'OSF1':
rsh = "ssh -o StrictHostKeyChecking=no -o ForwardX11=no -o BatchMode=yes -o ConnectTimeout=10"
rcp = "scp -o StrictHostKeyChecking=no -o ForwardX11=no -o BatchMode=yes -o ConnectTimeout=10"
rsh = "ssh -o StrictHostKeyChecking=no -o ForwardX11=no " \
"-o BatchMode=yes -o ConnectTimeout=10"
rcp = "scp -o StrictHostKeyChecking=no -o ForwardX11=no "\
"-o BatchMode=yes -o ConnectTimeout=10"
else:
rsh = "/usr/bin/ssh -q -o StrictHostKeyChecking=no -o ForwardX11=no -o BatchMode=yes -o ConnectTimeout=10"
rcp = "/usr/bin/scp -q -o StrictHostKeyChecking=no -o ForwardX11=no -o BatchMode=yes -o ConnectTimeout=10"
rsh = "/usr/bin/ssh -q -o StrictHostKeyChecking=no " \
"-o ForwardX11=no -o BatchMode=yes -o ConnectTimeout=10"
rcp = "/usr/bin/scp -q -o StrictHostKeyChecking=no " \
"-o ForwardX11=no -o BatchMode=yes -o ConnectTimeout=10"

vt_cloud = ["vcloud", "openstack", "amazon"]
vt_libvirt = ["kvm"]
Expand Down

0 comments on commit 004c46d

Please sign in to comment.