Skip to content

Commit

Permalink
Acquire per-sync-resource locks for sync all --rid <rids>
Browse files Browse the repository at this point in the history
The scheduler thread triggers "sync all --rid <rids>", grouping rids with
compatible schedules, but this action acquired the service-wide sync lock,
thus blocking the possibility of running another sync_all on a different
set of resources until it finished.

This led to lock timeout errors in the logs.

This patch implements a per-resource lock for sync_all --rid <rids>.
  • Loading branch information
cvaroqui committed Mar 9, 2018
1 parent d58bc34 commit 8400c18
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 5 deletions.
49 changes: 49 additions & 0 deletions lib/resources.py
Expand Up @@ -64,6 +64,7 @@ def __init__(self,
self.encap = encap or "encap" in self.tags
self.sort_key = rid
self.info_in_status = []
self.lockfd = None
try:
self.label = type
except AttributeError:
Expand Down Expand Up @@ -1112,3 +1113,51 @@ def unset_lazy(self, prop):
"""
unset_lazy(self, prop)

def reslock(self, action=None, timeout=30, delay=1, suffix=None):
    """
    Take the per-resource action lock.

    The lock file is created under rcEnv.paths.pathlock and is named
    <svcname>.<rid>, with .<suffix> appended when a suffix is given.
    The <action> is passed to the lock module as the lock intent.

    Returns immediately if this resource object already holds a lock
    descriptor. Every locking failure is reported as an ex.excError.
    """
    if self.lockfd is not None:
        # this object already owns its lock, nothing to acquire
        return

    path = os.path.join(rcEnv.paths.pathlock, self.svc.svcname + "." + self.rid)
    if suffix is not None:
        path = path + "." + suffix

    # shared by the debug log and by every error message below
    info = "(timeout %d, delay %d, action %s, lockfile %s)" % (
        timeout,
        delay,
        action,
        path,
    )
    self.log.debug("acquire resource lock %s", info)

    try:
        fd = lock.lock(timeout=timeout, delay=delay, lockfile=path, intent=action)
    except lock.LockTimeout as exc:
        raise ex.excError("timed out waiting for lock %s: %s" % (info, str(exc)))
    except lock.LockNoLockFile:
        raise ex.excError("lock_nowait: set the 'lockfile' param %s" % info)
    except lock.LockCreateError:
        raise ex.excError("can not create lock file %s" % info)
    except lock.LockAcquire as exc:
        raise ex.excError("another action is currently running %s: %s" % (info, str(exc)))
    except ex.excSignal:
        raise ex.excError("interrupted by signal %s" % info)
    except Exception as exc:
        # keep a trace of the unexpected error before wrapping it
        self.save_exc()
        raise ex.excError("unexpected locking error %s: %s" % (info, str(exc)))
    else:
        # only remember a real descriptor
        if fd is not None:
            self.lockfd = fd

def resunlock(self):
    """
    Release the resource action lock.

    Counterpart of reslock(). Does nothing when this resource object holds
    no lock descriptor, so it can be called unconditionally in cleanup
    paths, including for resources whose lock was never acquired.
    """
    if self.lockfd is None:
        # nothing was acquired: do not hand a null descriptor to lock.unlock()
        return
    lock.unlock(self.lockfd)
    self.lockfd = None


52 changes: 47 additions & 5 deletions lib/svc.py
Expand Up @@ -217,6 +217,10 @@ def signal_handler(*args):
"status",
]

# Lock compatibility table: for each action used as a key, the intents of
# running actions that do not need to be waited for when the service lock
# is already held by one of them.
ACTIONS_LOCK_COMPAT = {
    "postsync": [
        "sync_all",
        "sync_nodes",
        "sync_drp",
        "sync_update",
        "sync_resync",
    ],
}

ACTIONS_NO_LOCK = [
"abort",
"docker",
Expand Down Expand Up @@ -1190,6 +1194,27 @@ def svclock(self, action=None, timeout=30, delay=1):
details = "(timeout %d, delay %d, action %s, lockfile %s)" % \
(timeout, delay, action, lockfile)
self.log.debug("acquire service lock %s", details)

# try an immediate lock acquire and see if the running action is
# compatible
if action in ACTIONS_LOCK_COMPAT:
try:
lockfd = lock.lock(
timeout=0,
delay=delay,
lockfile=lockfile,
intent=action
)
if lockfd is not None:
self.lockfd = lockfd
return
except lock.LockTimeout as exc:
if exc.intent in ACTIONS_LOCK_COMPAT[action]:
return
# not compatible, continue with the normal acquire
except Exception:
pass

try:
lockfd = lock.lock(
timeout=timeout,
Expand Down Expand Up @@ -3896,11 +3921,22 @@ def do_action(self, action, options):
if waitlock < 0:
waitlock = self.lock_timeout

try:
self.svclock(action, timeout=waitlock)
except ex.excError as exc:
self.log.error(str(exc))
return 1
if action == "sync_all" and self.command_is_scoped():
for rid in self.action_rid:
resource = self.resources_by_id[rid]
if not resource.type.startswith("sync"):
continue
try:
resource.reslock(action=action, suffix="sync")
except ex.excError as exc:
self.log.error(str(exc))
return 1
else:
try:
self.svclock(action, timeout=waitlock)
except ex.excError as exc:
self.log.error(str(exc))
return 1

psinfo = self.do_cluster_action(action, options=options)

Expand Down Expand Up @@ -3953,6 +3989,12 @@ def call_action(action):
self.update_status_data()
self.clear_action(action, err)
self.svcunlock()
if action == "sync_all" and self.command_is_scoped():
for rid in self.action_rid:
resource = self.resources_by_id[rid]
if not resource.type.startswith("sync"):
continue
resource.resunlock()

if psinfo:
self.join_cluster_action(**psinfo)
Expand Down

0 comments on commit 8400c18

Please sign in to comment.