Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add locking to workspace pilot

  • Loading branch information...
commit cc8b52bf6f14413896ba4d1a1df2a4639321385b 1 parent 7010925
@oldpatricka oldpatricka authored
Showing with 70 additions and 31 deletions.
  1. +70 −31 pilot/workspacepilot.py
View
101 pilot/workspacepilot.py
@@ -35,27 +35,27 @@
# result of "generate-index.py < workspacepilot.py"
INDEX = """
I. Globals (lines 10-69)
- II. Embedded, default configuration file (lines 71-188)
- III. Imports (lines 190-216)
- IV. Exceptions (lines 218-344)
- V. Logging (lines 346-565)
- VI. Signal handlers (lines 567-669)
- VII. Timer (lines 671-696)
- VIII. Path/system utilities (lines 698-1057)
- IX. Action (lines 1059-1110)
- X. ReserveSlot(Action) (lines 1112-1715)
- XI. KillNine(ReserveSlot) (lines 1717-1795)
- XII. ListenerThread(Thread) (lines 1797-1902)
- XIII. StateChangeListener (lines 1904-2130)
- XIV. XenActions(StateChangeListener) (lines 2132-2846)
- XV. FakeXenActions(XenActions) (lines 2848-2962)
- XVI. XenKillNine(XenActions) (lines 2964-3095)
- XVII. VWSNotifications(StateChangeListener) (lines 3097-3712)
- XVIII. Configuration objects (lines 3714-3944)
- XIX. Convert configurations (lines 3946-4206)
- XX. External configuration (lines 4208-4278)
- XXI. Commandline arguments (lines 4280-4495)
- XXII. Standalone entry and exit (lines 4497-4690)
+ II. Embedded, default configuration file (lines 71-191)
+ III. Imports (lines 193-220)
+ IV. Exceptions (lines 222-348)
+ V. Logging (lines 350-569)
+ VI. Signal handlers (lines 571-673)
+ VII. Timer (lines 675-700)
+ VIII. Path/system utilities (lines 702-1073)
+ IX. Action (lines 1075-1126)
+ X. ReserveSlot(Action) (lines 1128-1732)
+ XI. KillNine(ReserveSlot) (lines 1734-1812)
+ XII. ListenerThread(Thread) (lines 1814-1919)
+ XIII. StateChangeListener (lines 1921-2147)
+ XIV. XenActions(StateChangeListener) (lines 2149-2877)
+ XV. FakeXenActions(XenActions) (lines 2879-2993)
+ XVI. XenKillNine(XenActions) (lines 2995-3126)
+ XVII. VWSNotifications(StateChangeListener) (lines 3128-3743)
+ XVIII. Configuration objects (lines 3745-3981)
+ XIX. Convert configurations (lines 3983-4245)
+ XX. External configuration (lines 4247-4317)
+ XXI. Commandline arguments (lines 4319-4534)
+ XXII. Standalone entry and exit (lines 4536-4729)
"""
RESTART_XEND_SECONDS_DEFAULT = 2.0
@@ -183,6 +183,9 @@
# Not set by default: uncomment and insert the absolute path.
#sshcredential:
+# This lock file prevents multiple instances of pilot from interfering
+# with one another.
+lockfile: /tmp/workspace-pilot.lock
"""
# }}} END: II. Embedded, default configuration file
@@ -204,6 +207,7 @@
import sys
import time
import urllib2
+import fcntl
try:
from threading import Thread
@@ -849,6 +853,18 @@ def checkrootpermissions(path, trace=False):
if trace:
msg += " (%s)" % permdetails
log.info(msg)
+
def _get_lockhandle(lockfilename):
    """Open the pilot lock file and return the open file object.

    lockfilename -- path of the lock file

    The file is opened in "w+" mode, so it is created when missing and
    truncated when it already exists.  No lock is taken here; the caller
    passes the returned handle on to _lock().
    """
    handle = open(lockfilename, "w+")
    return handle
+
def _lock(lockfile):
    """Take an exclusive flock on the given open lock file.

    lockfile -- open file object (or descriptor) for the lock file

    LOCK_EX is requested without LOCK_NB, so the call does not ask for
    non-blocking behaviour.  A debug message is logged once the lock has
    been obtained.
    """
    mode = fcntl.LOCK_EX
    fcntl.flock(lockfile, mode)
    log.debug("got pilot lock")
+
def _unlock(lockfile):
    """Release the pilot lock and close the lock file handle.

    lockfile -- open file object that was previously passed to _lock()

    The handle is closed even when the explicit unlock call raises, so a
    failing flock() cannot leak the open descriptor.  Such an error still
    propagates to the caller; the "released" debug message is only logged
    when the unlock itself succeeded.
    """
    try:
        fcntl.flock(lockfile, fcntl.LOCK_UN)
    finally:
        # Never skip the close on an error path: the handle was opened
        # solely for locking and nothing else will close it.
        lockfile.close()
    log.debug("released pilot lock")
class SimpleRunThread(Thread):
"""Run a command with timeout options, delay, stdin, etc."""
@@ -1379,6 +1395,7 @@ def run_earlyUnreserving(self):
grace = self.conf.graceperiod
ratio = self.conf.earlywaitratio
doze = grace * ratio
+ lockfile = self.conf.lockfile
msg = "earlyUnreserving: allowed to wait for %0.3f seconds" % doze
msg += " (grace period %d * %0.5f early wait ratio)" % (grace, ratio)
@@ -2283,7 +2300,10 @@ def reserving(self, timeout=None):
memory = self.conf.memory
if self.common.trace:
log.debug("XenActions.reserving(), reserving %d MB" % memory)
-
+
+ lockhandle = _get_lockhandle(self.conf.lockfile)
+ _lock(lockhandle)
+
persistent_timestamp("PILOT2")
curmem = self.currentAllocation_MB()
persistent_timestamp("PILOT2B")
@@ -2303,6 +2323,8 @@ def reserving(self, timeout=None):
persistent_timestamp("PILOT2C")
self.memset(targetmem)
persistent_timestamp("PILOT3")
+
+ _unlock(lockhandle)
# assumes lowering always works (see unreserving where we can't assume)
@@ -2332,7 +2354,11 @@ def unreserving(self, timeout=None):
memory = self.conf.memory
if self.common.trace:
log.debug("XenActions.unreserving(), unreserving %d MB" % memory)
-
+
+ # Be sure to unlock for every exit point.
+ lockhandle = _get_lockhandle(self.conf.lockfile)
+ _lock(lockhandle)
+
persistent_timestamp("PILOT20")
curmem = self.currentAllocation_MB()
persistent_timestamp("PILOT21")
@@ -2398,6 +2424,7 @@ def unreserving(self, timeout=None):
msg += "guest VMs are using this memory but we could "
msg += "not kill any."
log.critical(msg)
+ _unlock(lockhandle)
raise UnexpectedError(msg)
# assumes no VMs are started in the meantime
@@ -2416,9 +2443,13 @@ def unreserving(self, timeout=None):
errmsg = "Problem setting memory to %d: %s: %s" % (targetmem,n,err)
if raiseme:
log.critical(errmsg)
+ _unlock(lockhandle)
raise raiseme
else:
+ _unlock(lockhandle)
raise UnexpectedError(errmsg)
+
+ _unlock(lockhandle)
if raiseme:
raise raiseme
@@ -3037,7 +3068,7 @@ def unreserving(self, timeout=None):
log.info("XenKillNine unreserving, releasing as much as we can")
else:
log.info("XenKillNine unreserving, releasing %d MB" % memory)
-
+
curmem = self.currentAllocation_MB()
log.info("current memory MB = %d" % curmem)
@@ -3063,7 +3094,7 @@ def unreserving(self, timeout=None):
targetmem += vm.mem
else:
targetmem = memory
-
+
curmem = self.currentAllocation_MB()
if curmem != targetmem:
# for test harness mostly
@@ -3088,7 +3119,7 @@ def unreserving(self, timeout=None):
raise raiseme
else:
raise UnexpectedError(errmsg)
-
+
if raiseme:
raise raiseme
@@ -3741,7 +3772,7 @@ class ReserveSlotConf:
"""Class for reserve-slot configurations."""
- def __init__(self, duration, graceperiod, earlywaitratio):
+ def __init__(self, duration, graceperiod, earlywaitratio, lockfile):
"""Set the configurations.
Required parameters:
@@ -3763,6 +3794,8 @@ def __init__(self, duration, graceperiod, earlywaitratio):
raise InvalidConfig("duration is required")
if graceperiod == None:
raise InvalidConfig("graceperiod is required")
+ if lockfile == None:
+ raise InvalidConfig("lockfile is required")
try:
duration = int(duration)
@@ -3783,6 +3816,7 @@ def __init__(self, duration, graceperiod, earlywaitratio):
self.duration = duration
self.graceperiod = graceperiod
self.horizon = duration
+ self.lockfile = lockfile
errmsg = "earlywaitratio is required to be a number: zero or between "
errmsg += "zero and one (but not one itself)."
@@ -3797,6 +3831,7 @@ def __init__(self, duration, graceperiod, earlywaitratio):
log.debug("reserve-slot duration: %ds" % self.duration)
log.debug("reserve-slot grace period: %ds" % self.graceperiod)
log.debug("reserve-slot earlywaitratio: %.5f" % self.earlywaitratio)
+ log.debug("reserve-slot lockfile: %s" % self.lockfile)
class XenActionsConf:
@@ -3804,7 +3839,7 @@ class XenActionsConf:
BESTEFFORT = "BESTEFFORT"
- def __init__(self, xmpath, xendpath, xmsudo, sudopath, memory, minmem, xend_secs):
+ def __init__(self, xmpath, xendpath, xmsudo, sudopath, memory, minmem, xend_secs, lockfile):
"""Set the configurations.
Required parameters:
@@ -3835,6 +3870,8 @@ def __init__(self, xmpath, xendpath, xmsudo, sudopath, memory, minmem, xend_secs
self.xmsudo = xmsudo
self.sudopath = sudopath
self.xendpath = xendpath
+ self.lockfile = lockfile
+ log.debug("Xenactions lockfile: %s" % lockfile)
if memory == None:
raise InvalidConfig("memory is required")
@@ -3985,6 +4022,7 @@ def getReserveSlotConf(opts, config):
try:
earlywaitratio = config.get("other", "earlywaitratio")
+ lockfile = config.get("other", "lockfile")
except:
exception_type = sys.exc_type
try:
@@ -3994,7 +4032,7 @@ def getReserveSlotConf(opts, config):
msg = "%s: %s" % (str(exceptname), str(sys.exc_value))
raise InvalidConfig(msg)
- return ReserveSlotConf(opts.duration, opts.graceperiod, earlywaitratio)
+ return ReserveSlotConf(opts.duration, opts.graceperiod, earlywaitratio, lockfile)
def getXenActionsConf(opts, config):
"""Return populated XenActionsConf object or raise InvalidConfig
@@ -4028,6 +4066,7 @@ def getXenActionsConf(opts, config):
try:
xm = config.get("xen", "xm")
minmem = config.get("xen", "minmem")
+ lockfile = config.get("other", "lockfile")
except:
exception_type = sys.exc_type
try:
@@ -4079,7 +4118,7 @@ def getXenActionsConf(opts, config):
raise InvalidConfig(msg)
if not opts.killnine:
- return XenActionsConf(xm, xend, xmsudo, sudo, opts.memory, minmem, xend_secs)
+ return XenActionsConf(xm, xend, xmsudo, sudo, opts.memory, minmem, xend_secs, lockfile)
else:
alt = "going to kill all guest VMs (if they exist) and give dom0 "
alt += "their memory (which may or may not be the maximum available) "
@@ -4107,7 +4146,7 @@ def getXenActionsConf(opts, config):
log.info(msg + ", %s" % alt)
dom0mem = XenActionsConf.BESTEFFORT
- return XenActionsConf(xm, xend, xmsudo, sudo, dom0mem, minmem, xend_secs)
+ return XenActionsConf(xm, xend, xmsudo, sudo, dom0mem, minmem, xend_secs, lockfile)
def getVWSNotificationsConf(opts, config):
"""Return populated VWSNotificationsConf object or raise InvalidConfig
Please sign in to comment.
Something went wrong with that request. Please try again.