Permalink
Cannot retrieve contributors at this time
Fetching contributors…
| # Copyright 2011 OpenStack Foundation. | |
| # All Rights Reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may | |
| # not use this file except in compliance with the License. You may obtain | |
| # a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |
| # License for the specific language governing permissions and limitations | |
| # under the License. | |
| import contextlib | |
| import functools | |
| import logging | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import threading | |
| import weakref | |
| import fasteners | |
| from oslo_config import cfg | |
| from oslo_utils import reflection | |
| from oslo_utils import timeutils | |
| import six | |
| from oslo_concurrency._i18n import _ | |
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Configuration options owned by this module.  Both options accept the
# 'DEFAULT' group as a deprecated location.
_opts = [
    cfg.BoolOpt(
        'disable_process_locking',
        default=False,
        help='Enables or disables inter-process locks.',
        deprecated_group='DEFAULT'),
    cfg.StrOpt(
        'lock_path',
        # The environment variable is read once, when this module is
        # imported.
        default=os.environ.get("OSLO_LOCK_PATH"),
        help='Directory to use for lock files. For security, the '
             'specified directory should only be writable by the user '
             'running the processes that need locking. '
             'Defaults to environment variable OSLO_LOCK_PATH. '
             'If external locks are used, a lock path must be set.',
        deprecated_group='DEFAULT'),
]
def _register_opts(conf):
    """Register this module's options on a configuration object.

    :param conf: Configuration object to register the options on; they are
                 placed in the ``oslo_concurrency`` group.
    """
    conf.register_opts(_opts, group='oslo_concurrency')
# Use the global configuration object and register this module's options
# on it as soon as the module is imported.
CONF = cfg.CONF
_register_opts(CONF)
def set_defaults(lock_path):
    """Override the default value of the ``lock_path`` option.

    Tests can use this to point ``lock_path`` at a temporary directory.

    :param lock_path: New default value for the ``lock_path`` option.
    """
    cfg.set_defaults(_opts, lock_path=lock_path)
def get_lock_path(conf):
    """Return the path used for external file-based locks.

    The module's options are registered on ``conf`` before the value is
    read, so the ``oslo_concurrency`` group is available on it.

    :param conf: Configuration object
    :type conf: oslo_config.cfg.ConfigOpts
    :returns: The ``lock_path`` value from the ``oslo_concurrency`` group.

    .. versionadded:: 1.8
    """
    _register_opts(conf)
    group = conf.oslo_concurrency
    return group.lock_path
# Lock classes provided by ``fasteners``, exposed under this module's
# namespace.  ``InterProcessLock`` is the class instantiated by
# ``external_lock``.
InterProcessLock = fasteners.InterProcessLock
ReaderWriterLock = fasteners.ReaderWriterLock
"""A reader/writer lock.

.. versionadded:: 0.4
"""
class Semaphores(object):
    """A garbage collected container of named semaphores.

    Semaphores are held in a weak value dictionary, so an entry disappears
    from the container once nothing else references its semaphore any
    longer.

    .. versionadded:: 0.3
    """

    def __init__(self):
        # Guards the get-or-create sequence in ``get``.
        self._lock = threading.Lock()
        # name -> semaphore; values are only weakly referenced.
        self._semaphores = weakref.WeakValueDictionary()

    def get(self, name):
        """Return the semaphore registered under ``name``, creating it if needed.

        :param name: The semaphore name to get/create (repeated calls with
                     the same name return the same semaphore for as long as
                     it is still referenced somewhere).
        :returns: The existing semaphore for ``name``, or a newly
                  constructed one when none is currently registered.
        """
        with self._lock:
            found = self._semaphores.get(name)
            if found is not None:
                return found
            # The local name keeps the new semaphore alive until the caller
            # receives it; the dictionary itself only holds a weak reference.
            created = threading.Semaphore()
            self._semaphores[name] = created
            return created

    def __len__(self):
        """Return how many semaphores exist at the current time."""
        return len(self._semaphores)
# Default semaphore container, used whenever a caller does not supply its
# own ``semaphores`` argument.
_semaphores = Semaphores()
| def _get_lock_path(name, lock_file_prefix, lock_path=None): | |
| # NOTE(mikal): the lock name cannot contain directory | |
| # separators | |
| name = name.replace(os.sep, '_') | |
| if lock_file_prefix: | |
| sep = '' if lock_file_prefix.endswith('-') else '-' | |
| name = '%s%s%s' % (lock_file_prefix, sep, name) | |
| local_lock_path = lock_path or CONF.oslo_concurrency.lock_path | |
| if not local_lock_path: | |
| raise cfg.RequiredOptError('lock_path') | |
| return os.path.join(local_lock_path, name) | |
def external_lock(name, lock_file_prefix=None, lock_path=None):
    """Create an inter-process lock object for ``name``.

    The lock is only constructed here; this function does not call
    ``acquire`` on it.

    :param name: Lock name, used to build the lock file name.
    :param lock_file_prefix: Optional prefix for the lock file name.
    :param lock_path: Directory for the lock file; the configured
                      ``lock_path`` is used when omitted.
    :returns: An ``InterProcessLock`` for the computed lock file path.
    """
    return InterProcessLock(_get_lock_path(name, lock_file_prefix, lock_path))
def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None,
                              semaphores=None):
    """Remove an external lock file when it's not used anymore.

    This is helpful when a lot of lock files accumulate. The removal runs
    while holding the internal (in-process) lock for ``name``. A failure to
    delete the file is logged and otherwise ignored.

    :param name: Lock name whose file should be removed.
    :param lock_file_prefix: Optional prefix used for the lock file name.
    :param lock_path: Directory holding the lock file; the configured
                      ``lock_path`` is used when omitted.
    :param semaphores: Container providing the internal semaphore; the
                       module-wide container is used when omitted.
    """
    with internal_lock(name, semaphores=semaphores):
        target = _get_lock_path(name, lock_file_prefix, lock_path)
        try:
            os.remove(target)
        except OSError:
            # Best effort only: report the failure, never raise.
            LOG.info('Failed to remove file %(file)s', {'file': target})
def internal_lock(name, semaphores=None):
    """Return the in-process semaphore registered under ``name``.

    :param name: Name of the semaphore to get (or create).
    :param semaphores: Container to look the semaphore up in; the
                       module-wide container is used when this is ``None``.
    :returns: Whatever the container's ``get(name)`` returns.
    """
    container = _semaphores if semaphores is None else semaphores
    return container.get(name)
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None,
         do_log=True, semaphores=None, delay=0.01):
    """Context based lock

    The in-process semaphore for ``name`` is always held for the duration
    of the ``with`` block. When ``external`` is requested (and process
    locking is not disabled by configuration) a file-based inter-process
    lock is acquired as well, and released before the semaphore is.

    This function yields a `threading.Semaphore` instance (if we don't use
    eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is
    True, in which case, it'll yield an InterProcessLock instance.

    :param name: Name of the lock. It selects the in-process semaphore and,
        for external locks, is used to build the lock file name.

    :param lock_file_prefix: The lock_file_prefix argument is used to provide
        lock files on disk with a meaningful prefix.

    :param external: The external keyword argument denotes whether this lock
        should work across multiple processes. This means that if two
        different workers both run a method decorated with
        @synchronized('mylock', external=True), only one of them will execute
        at a time.

    :param lock_path: The path in which to store external lock files. For
        external locking to work properly, this must be the same for all
        references to the lock.

    :param do_log: Whether to log acquire/release messages. This is primarily
        intended to reduce log message duplication when `lock` is used from
        the `synchronized` decorator.

    :param semaphores: Container that provides semaphores to use when
        locking. This ensures that threads inside the same application can
        not collide, due to the fact that external process locks are unaware
        of a processes active threads.

    :param delay: Delay between acquisition attempts (in seconds).

    .. versionchanged:: 0.2
       Added *do_log* optional parameter.

    .. versionchanged:: 0.3
       Added *delay* and *semaphores* optional parameters.
    """
    thread_lock = internal_lock(name, semaphores=semaphores)
    with thread_lock:
        if do_log:
            LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name})
        try:
            # The configuration is only consulted when an external lock was
            # actually requested.
            use_file_lock = (
                external and not CONF.oslo_concurrency.disable_process_locking)
            if not use_file_lock:
                yield thread_lock
            else:
                file_lock = external_lock(name, lock_file_prefix, lock_path)
                file_lock.acquire(delay=delay)
                if do_log:
                    LOG.debug('Acquired external semaphore "%(lock)s"',
                              {'lock': name})
                try:
                    yield file_lock
                finally:
                    # Released even when the body of the ``with`` raised.
                    file_lock.release()
        finally:
            if do_log:
                LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name})
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
                 semaphores=None, delay=0.01):
    """Synchronization decorator.

    Decorating a method like so::

        @synchronized('mylock')
        def foo(self, *args):
           ...

    ensures that only one thread will execute the foo method at a time.

    Different methods can share the same lock::

        @synchronized('mylock')
        def foo(self, *args):
           ...

        @synchronized('mylock')
        def bar(self, *args):
           ...

    This way only one of either foo or bar can be executing at a time.

    The lock arguments are passed through to ``lock``. Each call of the
    decorated function logs how long it waited for the lock and how long
    the lock was held.

    .. versionchanged:: 0.3
       Added *delay* and *semaphores* optional parameter.
    """

    def wrap(f):

        @six.wraps(f)
        def inner(*args, **kwargs):
            requested_at = timeutils.now()
            # Stays None when the lock could not be acquired at all.
            acquired_at = None
            try:
                with lock(name, lock_file_prefix, external, lock_path,
                          do_log=False, semaphores=semaphores, delay=delay):
                    acquired_at = timeutils.now()
                    LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
                              'waited %(wait_secs)0.3fs',
                              {'name': name,
                               'function': reflection.get_callable_name(f),
                               'wait_secs': (acquired_at - requested_at)})
                    return f(*args, **kwargs)
            finally:
                finished_at = timeutils.now()
                held_secs = ("N/A" if acquired_at is None
                             else "%0.3fs" % (finished_at - acquired_at))
                LOG.debug('Lock "%(name)s" released by "%(function)s" :: held '
                          '%(held_secs)s',
                          {'name': name,
                           'function': reflection.get_callable_name(f),
                           'held_secs': held_secs})

        return inner

    return wrap
def synchronized_with_prefix(lock_file_prefix):
    """Partial object generator for the synchronization decorator.

    Redefine @synchronized in each project like so::

        (in nova/utils.py)
        from oslo_concurrency import lockutils

        synchronized = lockutils.synchronized_with_prefix('nova-')


        (in nova/foo.py)
        from nova import utils

        @utils.synchronized('mylock')
        def bar(self, *args):
           ...

    :param lock_file_prefix: Prefix used to give lock files on disk a
                             meaningful name.
    :returns: A ``functools.partial`` of ``synchronized`` with
              ``lock_file_prefix`` already bound.
    """
    prefixed_decorator = functools.partial(
        synchronized, lock_file_prefix=lock_file_prefix)
    return prefixed_decorator
def remove_external_lock_file_with_prefix(lock_file_prefix):
    """Partial object generator for the remove lock file function.

    Redefine remove_external_lock_file_with_prefix in each project like so::

        (in nova/utils.py)
        from oslo_concurrency import lockutils

        synchronized = lockutils.synchronized_with_prefix('nova-')
        synchronized_remove = lockutils.remove_external_lock_file_with_prefix(
            'nova-')

        (in nova/foo.py)
        from nova import utils

        @utils.synchronized('mylock')
        def bar(self, *args):
           ...

        <eventually call synchronized_remove('mylock') to cleanup>

    :param lock_file_prefix: Prefix used to give lock files on disk a
                             meaningful name.
    :returns: A ``functools.partial`` of ``remove_external_lock_file`` with
              ``lock_file_prefix`` already bound.
    """
    prefixed_remover = functools.partial(
        remove_external_lock_file, lock_file_prefix=lock_file_prefix)
    return prefixed_remover
| def _lock_wrapper(argv): | |
| """Create a dir for locks and pass it to command from arguments | |
| This is exposed as a console script entry point named | |
| lockutils-wrapper | |
| If you run this: | |
| lockutils-wrapper python setup.py testr <etc> | |
| a temporary directory will be created for all your locks and passed to all | |
| your tests in an environment variable. The temporary dir will be deleted | |
| afterwards and the return value will be preserved. | |
| """ | |
| lock_dir = tempfile.mkdtemp() | |
| os.environ["OSLO_LOCK_PATH"] = lock_dir | |
| try: | |
| ret_val = subprocess.call(argv[1:]) | |
| finally: | |
| shutil.rmtree(lock_dir, ignore_errors=True) | |
| return ret_val | |
def main():
    """Console entry point: run the wrapped command and exit with its status."""
    exit_status = _lock_wrapper(sys.argv)
    sys.exit(exit_status)
if __name__ == '__main__':
    # Running this module as a script is refused with an explanatory error.
    message = _('Calling lockutils directly is no longer '
                'supported. Please use the '
                'lockutils-wrapper console script instead.')
    raise NotImplementedError(message)