Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Migrate to fileutils and lockutils.

Migrate nova to using openstack-common's file and lock utilities.
Resolves bug 1063230.

Change-Id: I1a4c87856bc08cd33b61d7098ed856baa4583654
  • Loading branch information...
commit 0d4e6dbe6f17d0a8d0f93833c1ea70f79944d945 1 parent 86b9147
Michael Still mikalstill authored

Showing 28 changed files with 364 additions and 395 deletions. Show diff stats Hide diff stats

  1. +1 1  doc/source/devref/threading.rst
  2. +2 1  nova/api/openstack/compute/contrib/cloudpipe.py
  3. +2 1  nova/cloudpipe/pipelib.py
  4. +4 3 nova/compute/manager.py
  5. +6 5 nova/compute/resource_tracker.py
  6. +3 2 nova/crypto.py
  7. +0 3  nova/flags.py
  8. +9 7 nova/network/linux_net.py
  9. +2 1  nova/network/manager.py
  10. +4 3 nova/objectstore/s3server.py
  11. +35 0 nova/openstack/common/fileutils.py
  12. +233 0 nova/openstack/common/lockutils.py
  13. +17 16 nova/tests/network/test_linux_net.py
  14. +8 5 nova/tests/test_imagebackend.py
  15. +2 1  nova/tests/test_libvirt.py
  16. +0 94 nova/tests/test_misc.py
  17. +0 37 nova/tests/test_utils.py
  18. +0 192 nova/utils.py
  19. +6 4 nova/virt/baremetal/driver.py
  20. +2 1  nova/virt/configdrive.py
  21. +3 3 nova/virt/firewall.py
  22. +2 2 nova/virt/hyperv/vmops.py
  23. +4 3 nova/virt/libvirt/driver.py
  24. +11 5 nova/virt/libvirt/imagebackend.py
  25. +2 1  nova/virt/libvirt/utils.py
  26. +3 2 nova/virt/libvirt/volume.py
  27. +2 1  nova/volume/iscsi.py
  28. +1 1  openstack-common.conf
2  doc/source/devref/threading.rst
Source Rendered
@@ -12,7 +12,7 @@ view, each OpenStack service runs in a single thread.
12 12
13 13 The use of green threads reduces the likelihood of race conditions, but does
14 14 not completely eliminate them. In some cases, you may need to use the
15   -``@utils.synchronized(...)`` decorator to avoid races.
  15 +``@lockutils.synchronized(...)`` decorator to avoid races.
16 16
17 17 In addition, since there is only one operating system thread, a call that
18 18 blocks that main thread will block the entire process.
3  nova/api/openstack/compute/contrib/cloudpipe.py
@@ -25,6 +25,7 @@
25 25 from nova import exception
26 26 from nova import flags
27 27 from nova import network
  28 +from nova.openstack.common import fileutils
28 29 from nova.openstack.common import log as logging
29 30 from nova.openstack.common import timeutils
30 31 from nova import utils
@@ -69,7 +70,7 @@ def setup(self):
69 70 # NOTE(vish): One of the drawbacks of doing this in the api is
70 71 # the keys will only be on the api node that launched
71 72 # the cloudpipe.
72   - utils.ensure_tree(FLAGS.keys_path)
  73 + fileutils.ensure_tree(FLAGS.keys_path)
73 74
74 75 def _get_all_cloudpipes(self, context):
75 76 """Get all cloudpipes"""
3  nova/cloudpipe/pipelib.py
@@ -33,6 +33,7 @@
33 33 from nova import exception
34 34 from nova import flags
35 35 from nova.openstack.common import cfg
  36 +from nova.openstack.common import fileutils
36 37 from nova.openstack.common import log as logging
37 38 from nova import utils
38 39
@@ -150,7 +151,7 @@ def setup_key_pair(self, context):
150 151 key_name)
151 152 private_key = result['private_key']
152 153 key_dir = os.path.join(FLAGS.keys_path, context.user_id)
153   - utils.ensure_tree(key_dir)
  154 + fileutils.ensure_tree(key_dir)
154 155 key_path = os.path.join(key_dir, '%s.pem' % key_name)
155 156 with open(key_path, 'w') as f:
156 157 f.write(private_key)
7 nova/compute/manager.py
@@ -64,6 +64,7 @@
64 64 from nova.openstack.common import excutils
65 65 from nova.openstack.common import importutils
66 66 from nova.openstack.common import jsonutils
  67 +from nova.openstack.common import lockutils
67 68 from nova.openstack.common import log as logging
68 69 from nova.openstack.common.notifier import api as notifier
69 70 from nova.openstack.common import rpc
@@ -844,7 +845,7 @@ def run_instance(self, context, instance, request_spec=None,
844 845 if injected_files is None:
845 846 injected_files = []
846 847
847   - @utils.synchronized(instance['uuid'])
  848 + @lockutils.synchronized(instance['uuid'], 'nova-')
848 849 def do_run_instance():
849 850 self._run_instance(context, request_spec,
850 851 filter_properties, requested_networks, injected_files,
@@ -954,7 +955,7 @@ def terminate_instance(self, context, instance, bdms=None):
954 955 if not bdms:
955 956 bdms = self._get_instance_volume_bdms(context, instance["uuid"])
956 957
957   - @utils.synchronized(instance['uuid'])
  958 + @lockutils.synchronized(instance['uuid'], 'nova-')
958 959 def do_terminate_instance(instance, bdms):
959 960 try:
960 961 self._delete_instance(context, instance, bdms)
@@ -2027,7 +2028,7 @@ def _attach_volume_boot(self, context, instance, volume, mountpoint):
2027 2028 @wrap_instance_fault
2028 2029 def reserve_block_device_name(self, context, instance, device, volume_id):
2029 2030
2030   - @utils.synchronized(instance['uuid'])
  2031 + @lockutils.synchronized(instance['uuid'], 'nova-')
2031 2032 def do_reserve():
2032 2033 result = compute_utils.get_device_name_for_instance(context,
2033 2034 instance,
11 nova/compute/resource_tracker.py
@@ -27,6 +27,7 @@
27 27 from nova.openstack.common import cfg
28 28 from nova.openstack.common import importutils
29 29 from nova.openstack.common import jsonutils
  30 +from nova.openstack.common import lockutils
30 31 from nova.openstack.common import log as logging
31 32 from nova import utils
32 33
@@ -121,7 +122,7 @@ def resource_claim(self, context, instance_ref, limits=None):
121 122 claim = self.begin_resource_claim(context, instance_ref, limits)
122 123 return ResourceContextManager(context, claim, self)
123 124
124   - @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
  125 + @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
125 126 def begin_resource_claim(self, context, instance_ref, limits=None):
126 127 """Indicate that some resources are needed for an upcoming compute
127 128 instance build operation.
@@ -293,7 +294,7 @@ def _can_claim_cpu(self, vcpus, vcpu_limit):
293 294
294 295 return can_claim_cpu
295 296
296   - @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
  297 + @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
297 298 def finish_resource_claim(self, claim):
298 299 """Indicate that the compute operation that previously claimed the
299 300 resources identified by 'claim' has now completed and the resources
@@ -308,7 +309,7 @@ def finish_resource_claim(self, claim):
308 309 if self.claims.pop(claim.claim_id, None):
309 310 LOG.debug(_("Finishing claim: %s") % claim)
310 311
311   - @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
  312 + @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
312 313 def abort_resource_claim(self, context, claim):
313 314 """Indicate that the operation that claimed the resources identified by
314 315 'claim_id' has either failed or been aborted and the resources are no
@@ -328,7 +329,7 @@ def abort_resource_claim(self, context, claim):
328 329 self._update_usage_from_instance(self.compute_node, claim.instance)
329 330 self._update(context, self.compute_node)
330 331
331   - @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
  332 + @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
332 333 def update_usage(self, context, instance):
333 334 """Update the resource usage and stats after a change in an
334 335 instance
@@ -347,7 +348,7 @@ def update_usage(self, context, instance):
347 348 def disabled(self):
348 349 return self.compute_node is None
349 350
350   - @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
  351 + @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
351 352 def update_available_resource(self, context):
352 353 """Override in-memory calculations of compute node resource usage based
353 354 on data audited from the hypervisor layer.
5 nova/crypto.py
@@ -33,6 +33,7 @@
33 33 from nova import exception
34 34 from nova import flags
35 35 from nova.openstack.common import cfg
  36 +from nova.openstack.common import fileutils
36 37 from nova.openstack.common import log as logging
37 38 from nova.openstack.common import timeutils
38 39 from nova import utils
@@ -112,7 +113,7 @@ def ensure_ca_filesystem():
112 113 'genrootca.sh')
113 114
114 115 start = os.getcwd()
115   - utils.ensure_tree(ca_dir)
  116 + fileutils.ensure_tree(ca_dir)
116 117 os.chdir(ca_dir)
117 118 utils.execute("sh", genrootca_sh_path)
118 119 os.chdir(start)
@@ -301,7 +302,7 @@ def _sign_csr(csr_text, ca_folder):
301 302 start = os.getcwd()
302 303
303 304 # Change working dir to CA
304   - utils.ensure_tree(ca_folder)
  305 + fileutils.ensure_tree(ca_folder)
305 306 os.chdir(ca_folder)
306 307 utils.execute('openssl', 'ca', '-batch', '-out', outbound, '-config',
307 308 './openssl.cnf', '-infiles', inbound)
3  nova/flags.py
@@ -91,9 +91,6 @@ def _get_my_ip():
91 91 cfg.StrOpt('state_path',
92 92 default='$pybasedir',
93 93 help="Top-level directory for maintaining nova's state"),
94   - cfg.StrOpt('lock_path',
95   - default='$pybasedir',
96   - help='Directory to use for lock files'),
97 94 ]
98 95
99 96 debug_opts = [
16 nova/network/linux_net.py
@@ -28,7 +28,9 @@
28 28 from nova import exception
29 29 from nova import flags
30 30 from nova.openstack.common import cfg
  31 +from nova.openstack.common import fileutils
31 32 from nova.openstack.common import importutils
  33 +from nova.openstack.common import lockutils
32 34 from nova.openstack.common import log as logging
33 35 from nova import utils
34 36
@@ -344,7 +346,7 @@ def apply(self):
344 346
345 347 self._apply()
346 348
347   - @utils.synchronized('iptables', external=True)
  349 + @lockutils.synchronized('iptables', 'nova-', external=True)
348 350 def _apply(self):
349 351 """Apply the current in-memory set of iptables rules.
350 352
@@ -791,7 +793,7 @@ def kill_dhcp(dev):
791 793 # NOTE(ja): Sending a HUP only reloads the hostfile, so any
792 794 # configuration options (like dhcp-range, vlan, ...)
793 795 # aren't reloaded.
794   -@utils.synchronized('dnsmasq_start')
  796 +@lockutils.synchronized('dnsmasq_start', 'nova-')
795 797 def restart_dhcp(context, dev, network_ref):
796 798 """(Re)starts a dnsmasq server for a given network.
797 799
@@ -858,7 +860,7 @@ def restart_dhcp(context, dev, network_ref):
858 860 _add_dnsmasq_accept_rules(dev)
859 861
860 862
861   -@utils.synchronized('radvd_start')
  863 +@lockutils.synchronized('radvd_start', 'nova-')
862 864 def update_ra(context, dev, network_ref):
863 865 conffile = _ra_file(dev, 'conf')
864 866 conf_str = """
@@ -957,7 +959,7 @@ def _device_exists(device):
957 959
958 960 def _dhcp_file(dev, kind):
959 961 """Return path to a pid, leases or conf file for a bridge/device."""
960   - utils.ensure_tree(FLAGS.networks_path)
  962 + fileutils.ensure_tree(FLAGS.networks_path)
961 963 return os.path.abspath('%s/nova-%s.%s' % (FLAGS.networks_path,
962 964 dev,
963 965 kind))
@@ -965,7 +967,7 @@ def _dhcp_file(dev, kind):
965 967
966 968 def _ra_file(dev, kind):
967 969 """Return path to a pid or conf file for a bridge/device."""
968   - utils.ensure_tree(FLAGS.networks_path)
  970 + fileutils.ensure_tree(FLAGS.networks_path)
969 971 return os.path.abspath('%s/nova-ra-%s.%s' % (FLAGS.networks_path,
970 972 dev,
971 973 kind))
@@ -1116,7 +1118,7 @@ def ensure_vlan_bridge(_self, vlan_num, bridge, bridge_interface,
1116 1118 return interface
1117 1119
1118 1120 @classmethod
1119   - @utils.synchronized('ensure_vlan', external=True)
  1121 + @lockutils.synchronized('ensure_vlan', 'nova-', external=True)
1120 1122 def ensure_vlan(_self, vlan_num, bridge_interface, mac_address=None):
1121 1123 """Create a vlan unless it already exists."""
1122 1124 interface = 'vlan%s' % vlan_num
@@ -1141,7 +1143,7 @@ def ensure_vlan(_self, vlan_num, bridge_interface, mac_address=None):
1141 1143 return interface
1142 1144
1143 1145 @classmethod
1144   - @utils.synchronized('ensure_bridge', external=True)
  1146 + @lockutils.synchronized('ensure_bridge', 'nova-', external=True)
1145 1147 def ensure_bridge(_self, bridge, interface, net_attrs=None, gateway=True):
1146 1148 """Create a bridge unless it already exists.
1147 1149
3  nova/network/manager.py
@@ -66,6 +66,7 @@
66 66 from nova.openstack.common import excutils
67 67 from nova.openstack.common import importutils
68 68 from nova.openstack.common import jsonutils
  69 +from nova.openstack.common import lockutils
69 70 from nova.openstack.common import log as logging
70 71 from nova.openstack.common.notifier import api as notifier
71 72 from nova.openstack.common import rpc
@@ -854,7 +855,7 @@ def __init__(self, network_driver=None, *args, **kwargs):
854 855 def _import_ipam_lib(self, ipam_lib):
855 856 self.ipam = importutils.import_module(ipam_lib).get_ipam_lib(self)
856 857
857   - @utils.synchronized('get_dhcp')
  858 + @lockutils.synchronized('get_dhcp', 'nova-')
858 859 def _get_dhcp_ip(self, context, network_ref, host=None):
859 860 """Get the proper dhcp address to listen on."""
860 861 # NOTE(vish): this is for compatibility
7 nova/objectstore/s3server.py
@@ -46,6 +46,7 @@
46 46
47 47 from nova import flags
48 48 from nova.openstack.common import cfg
  49 +from nova.openstack.common import fileutils
49 50 from nova import utils
50 51 from nova import wsgi
51 52
@@ -93,7 +94,7 @@ def __init__(self, root_directory, bucket_depth=0, mapper=None):
93 94 mapper.connect('/{bucket_name}/',
94 95 controller=lambda *a, **kw: BucketHandler(self)(*a, **kw))
95 96 self.directory = os.path.abspath(root_directory)
96   - utils.ensure_tree(self.directory)
  97 + fileutils.ensure_tree(self.directory)
97 98 self.bucket_depth = bucket_depth
98 99 super(S3Application, self).__init__(mapper)
99 100
@@ -285,7 +286,7 @@ def put(self, bucket_name):
285 286 os.path.exists(path)):
286 287 self.set_status(403)
287 288 return
288   - utils.ensure_tree(path)
  289 + fileutils.ensure_tree(path)
289 290 self.finish()
290 291
291 292 def delete(self, bucket_name):
@@ -334,7 +335,7 @@ def put(self, bucket, object_name):
334 335 self.set_status(403)
335 336 return
336 337 directory = os.path.dirname(path)
337   - utils.ensure_tree(directory)
  338 + fileutils.ensure_tree(directory)
338 339 object_file = open(path, "w")
339 340 object_file.write(self.request.body)
340 341 object_file.close()
35 nova/openstack/common/fileutils.py
... ... @@ -0,0 +1,35 @@
  1 +# vim: tabstop=4 shiftwidth=4 softtabstop=4
  2 +
  3 +# Copyright 2011 OpenStack LLC.
  4 +# All Rights Reserved.
  5 +#
  6 +# Licensed under the Apache License, Version 2.0 (the "License"); you may
  7 +# not use this file except in compliance with the License. You may obtain
  8 +# a copy of the License at
  9 +#
  10 +# http://www.apache.org/licenses/LICENSE-2.0
  11 +#
  12 +# Unless required by applicable law or agreed to in writing, software
  13 +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  14 +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  15 +# License for the specific language governing permissions and limitations
  16 +# under the License.
  17 +
  18 +
  19 +import errno
  20 +import os
  21 +
  22 +
  23 +def ensure_tree(path):
  24 + """Create a directory (and any ancestor directories required)
  25 +
  26 + :param path: Directory to create
  27 + """
  28 + try:
  29 + os.makedirs(path)
  30 + except OSError as exc:
  31 + if exc.errno == errno.EEXIST:
  32 + if not os.path.isdir(path):
  33 + raise
  34 + else:
  35 + raise
233 nova/openstack/common/lockutils.py
... ... @@ -0,0 +1,233 @@
  1 +# vim: tabstop=4 shiftwidth=4 softtabstop=4
  2 +
  3 +# Copyright 2011 OpenStack LLC.
  4 +# All Rights Reserved.
  5 +#
  6 +# Licensed under the Apache License, Version 2.0 (the "License"); you may
  7 +# not use this file except in compliance with the License. You may obtain
  8 +# a copy of the License at
  9 +#
  10 +# http://www.apache.org/licenses/LICENSE-2.0
  11 +#
  12 +# Unless required by applicable law or agreed to in writing, software
  13 +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  14 +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  15 +# License for the specific language governing permissions and limitations
  16 +# under the License.
  17 +
  18 +
  19 +import errno
  20 +import functools
  21 +import os
  22 +import shutil
  23 +import tempfile
  24 +import time
  25 +import weakref
  26 +
  27 +from eventlet import greenthread
  28 +from eventlet import semaphore
  29 +
  30 +from nova.openstack.common import cfg
  31 +from nova.openstack.common import fileutils
  32 +from nova.openstack.common import log as logging
  33 +
  34 +
  35 +LOG = logging.getLogger(__name__)
  36 +
  37 +
  38 +util_opts = [
  39 + cfg.BoolOpt('disable_process_locking', default=False,
  40 + help='Whether to disable inter-process locks'),
  41 + cfg.StrOpt('lock_path',
  42 + default=os.path.abspath(os.path.join(os.path.dirname(__file__),
  43 + '../')),
  44 + help='Directory to use for lock files')
  45 +]
  46 +
  47 +
  48 +CONF = cfg.CONF
  49 +CONF.register_opts(util_opts)
  50 +
  51 +
  52 +class _InterProcessLock(object):
  53 + """Lock implementation which allows multiple locks, working around
  54 + issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
  55 + not require any cleanup. Since the lock is always held on a file
  56 + descriptor rather than outside of the process, the lock gets dropped
  57 + automatically if the process crashes, even if __exit__ is not executed.
  58 +
  59 + There are no guarantees regarding usage by multiple green threads in a
  60 + single process here. This lock works only between processes. Exclusive
  61 + access between local threads should be achieved using the semaphores
  62 + in the @synchronized decorator.
  63 +
  64 + Note these locks are released when the descriptor is closed, so it's not
  65 + safe to close the file descriptor while another green thread holds the
  66 + lock. Just opening and closing the lock file can break synchronisation,
  67 + so lock files must be accessed only using this abstraction.
  68 + """
  69 +
  70 + def __init__(self, name):
  71 + self.lockfile = None
  72 + self.fname = name
  73 +
  74 + def __enter__(self):
  75 + self.lockfile = open(self.fname, 'w')
  76 +
  77 + while True:
  78 + try:
  79 + # Using non-blocking locks since green threads are not
  80 + # patched to deal with blocking locking calls.
  81 + # Also upon reading the MSDN docs for locking(), it seems
  82 + # to have a laughable 10 attempts "blocking" mechanism.
  83 + self.trylock()
  84 + return self
  85 + except IOError, e:
  86 + if e.errno in (errno.EACCES, errno.EAGAIN):
  87 + # external locks synchronise things like iptables
  88 + # updates - give it some time to prevent busy spinning
  89 + time.sleep(0.01)
  90 + else:
  91 + raise
  92 +
  93 + def __exit__(self, exc_type, exc_val, exc_tb):
  94 + try:
  95 + self.unlock()
  96 + self.lockfile.close()
  97 + except IOError:
  98 + LOG.exception(_("Could not release the acquired lock `%s`"),
  99 + self.fname)
  100 +
  101 + def trylock(self):
  102 + raise NotImplementedError()
  103 +
  104 + def unlock(self):
  105 + raise NotImplementedError()
  106 +
  107 +
  108 +class _WindowsLock(_InterProcessLock):
  109 + def trylock(self):
  110 + msvcrt.locking(self.lockfile, msvcrt.LK_NBLCK, 1)
  111 +
  112 + def unlock(self):
  113 + msvcrt.locking(self.lockfile, msvcrt.LK_UNLCK, 1)
  114 +
  115 +
  116 +class _PosixLock(_InterProcessLock):
  117 + def trylock(self):
  118 + fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
  119 +
  120 + def unlock(self):
  121 + fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
  122 +
  123 +
  124 +if os.name == 'nt':
  125 + import msvcrt
  126 + InterProcessLock = _WindowsLock
  127 +else:
  128 + import fcntl
  129 + InterProcessLock = _PosixLock
  130 +
  131 +_semaphores = weakref.WeakValueDictionary()
  132 +
  133 +
  134 +def synchronized(name, lock_file_prefix, external=False, lock_path=None):
  135 + """Synchronization decorator.
  136 +
  137 + Decorating a method like so::
  138 +
  139 + @synchronized('mylock')
  140 + def foo(self, *args):
  141 + ...
  142 +
  143 + ensures that only one thread will execute the foo method at a time.
  144 +
  145 + Different methods can share the same lock::
  146 +
  147 + @synchronized('mylock')
  148 + def foo(self, *args):
  149 + ...
  150 +
  151 + @synchronized('mylock')
  152 + def bar(self, *args):
  153 + ...
  154 +
  155 + This way only one of either foo or bar can be executing at a time.
  156 +
  157 + The lock_file_prefix argument is used to provide lock files on disk with a
  158 + meaningful prefix. The prefix should end with a hyphen ('-') if specified.
  159 +
  160 + The external keyword argument denotes whether this lock should work across
  161 + multiple processes. This means that if two different workers both run a
  162 + method decorated with @synchronized('mylock', external=True), only one
  163 + of them will execute at a time.
  164 +
  165 + The lock_path keyword argument is used to specify a special location for
  166 + external lock files to live. If nothing is set, then CONF.lock_path is
  167 + used as a default.
  168 + """
  169 +
  170 + def wrap(f):
  171 + @functools.wraps(f)
  172 + def inner(*args, **kwargs):
  173 + # NOTE(soren): If we ever go natively threaded, this will be racy.
  174 + # See http://stackoverflow.com/questions/5390569/dyn
  175 + # amically-allocating-and-destroying-mutexes
  176 + sem = _semaphores.get(name, semaphore.Semaphore())
  177 + if name not in _semaphores:
  178 + # this check is not racy - we're already holding ref locally
  179 + # so GC won't remove the item and there was no IO switch
  180 + # (only valid in greenthreads)
  181 + _semaphores[name] = sem
  182 +
  183 + with sem:
  184 + LOG.debug(_('Got semaphore "%(lock)s" for method '
  185 + '"%(method)s"...'), {'lock': name,
  186 + 'method': f.__name__})
  187 + if external and not CONF.disable_process_locking:
  188 + LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
  189 + 'method "%(method)s"...'),
  190 + {'lock': name, 'method': f.__name__})
  191 + cleanup_dir = False
  192 +
  193 + # We need a copy of lock_path because it is non-local
  194 + local_lock_path = lock_path
  195 + if not local_lock_path:
  196 + local_lock_path = CONF.lock_path
  197 +
  198 + if not local_lock_path:
  199 + cleanup_dir = True
  200 + local_lock_path = tempfile.mkdtemp()
  201 +
  202 + if not os.path.exists(local_lock_path):
  203 + cleanup_dir = True
  204 + fileutils.ensure_tree(local_lock_path)
  205 +
  206 + # NOTE(mikal): the lock name cannot contain directory
  207 + # separators
  208 + safe_name = name.replace(os.sep, '_')
  209 + lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
  210 + lock_file_path = os.path.join(local_lock_path,
  211 + lock_file_name)
  212 +
  213 + try:
  214 + lock = InterProcessLock(lock_file_path)
  215 + with lock:
  216 + LOG.debug(_('Got file lock "%(lock)s" at %(path)s '
  217 + 'for method "%(method)s"...'),
  218 + {'lock': name,
  219 + 'path': lock_file_path,
  220 + 'method': f.__name__})
  221 + retval = f(*args, **kwargs)
  222 + finally:
  223 + # NOTE(vish): This removes the tempdir if we needed
  224 + # to create one. This is used to cleanup
  225 + # the locks left behind by unit tests.
  226 + if cleanup_dir:
  227 + shutil.rmtree(local_lock_path)
  228 + else:
  229 + retval = f(*args, **kwargs)
  230 +
  231 + return retval
  232 + return inner
  233 + return wrap
33 nova/tests/network/test_linux_net.py
@@ -23,6 +23,7 @@
23 23 from nova import db
24 24 from nova import flags
25 25 from nova.network import linux_net
  26 +from nova.openstack.common import fileutils
26 27 from nova.openstack.common import importutils
27 28 from nova.openstack.common import log as logging
28 29 from nova import test
@@ -236,18 +237,18 @@ def test_update_dhcp_for_nw00(self):
236 237 self.flags(use_single_default_gateway=True)
237 238
238 239 self.mox.StubOutWithMock(self.driver, 'write_to_file')
239   - self.mox.StubOutWithMock(utils, 'ensure_tree')
  240 + self.mox.StubOutWithMock(fileutils, 'ensure_tree')
240 241 self.mox.StubOutWithMock(os, 'chmod')
241 242
242 243 self.driver.write_to_file(mox.IgnoreArg(), mox.IgnoreArg())
243 244 self.driver.write_to_file(mox.IgnoreArg(), mox.IgnoreArg())
244   - utils.ensure_tree(mox.IgnoreArg())
245   - utils.ensure_tree(mox.IgnoreArg())
246   - utils.ensure_tree(mox.IgnoreArg())
247   - utils.ensure_tree(mox.IgnoreArg())
248   - utils.ensure_tree(mox.IgnoreArg())
249   - utils.ensure_tree(mox.IgnoreArg())
250   - utils.ensure_tree(mox.IgnoreArg())
  245 + fileutils.ensure_tree(mox.IgnoreArg())
  246 + fileutils.ensure_tree(mox.IgnoreArg())
  247 + fileutils.ensure_tree(mox.IgnoreArg())
  248 + fileutils.ensure_tree(mox.IgnoreArg())
  249 + fileutils.ensure_tree(mox.IgnoreArg())
  250 + fileutils.ensure_tree(mox.IgnoreArg())
  251 + fileutils.ensure_tree(mox.IgnoreArg())
251 252 os.chmod(mox.IgnoreArg(), mox.IgnoreArg())
252 253 os.chmod(mox.IgnoreArg(), mox.IgnoreArg())
253 254
@@ -259,18 +260,18 @@ def test_update_dhcp_for_nw01(self):
259 260 self.flags(use_single_default_gateway=True)
260 261
261 262 self.mox.StubOutWithMock(self.driver, 'write_to_file')
262   - self.mox.StubOutWithMock(utils, 'ensure_tree')
  263 + self.mox.StubOutWithMock(fileutils, 'ensure_tree')
263 264 self.mox.StubOutWithMock(os, 'chmod')
264 265
265 266 self.driver.write_to_file(mox.IgnoreArg(), mox.IgnoreArg())
266 267 self.driver.write_to_file(mox.IgnoreArg(), mox.IgnoreArg())
267   - utils.ensure_tree(mox.IgnoreArg())
268   - utils.ensure_tree(mox.IgnoreArg())
269   - utils.ensure_tree(mox.IgnoreArg())
270   - utils.ensure_tree(mox.IgnoreArg())
271   - utils.ensure_tree(mox.IgnoreArg())
272   - utils.ensure_tree(mox.IgnoreArg())
273   - utils.ensure_tree(mox.IgnoreArg())
  268 + fileutils.ensure_tree(mox.IgnoreArg())
  269 + fileutils.ensure_tree(mox.IgnoreArg())
  270 + fileutils.ensure_tree(mox.IgnoreArg())
  271 + fileutils.ensure_tree(mox.IgnoreArg())
  272 + fileutils.ensure_tree(mox.IgnoreArg())
  273 + fileutils.ensure_tree(mox.IgnoreArg())
  274 + fileutils.ensure_tree(mox.IgnoreArg())
274 275 os.chmod(mox.IgnoreArg(), mox.IgnoreArg())
275 276 os.chmod(mox.IgnoreArg(), mox.IgnoreArg())
276 277
13 nova/tests/test_imagebackend.py
@@ -18,6 +18,7 @@
18 18 import os
19 19
20 20 from nova import flags
  21 +from nova.openstack.common import fileutils
21 22 from nova import test
22 23 from nova.tests import fake_libvirt_utils
23 24 from nova.virt.libvirt import imagebackend
@@ -56,8 +57,8 @@ def test_cache(self):
56 57 os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
57 58 fn = self.mox.CreateMockAnything()
58 59 fn(target=self.TEMPLATE_PATH)
59   - self.mox.StubOutWithMock(imagebackend.utils, 'ensure_tree')
60   - imagebackend.utils.ensure_tree(self.TEMPLATE_DIR)
  60 + self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
  61 + imagebackend.fileutils.ensure_tree(self.TEMPLATE_DIR)
61 62 self.mox.ReplayAll()
62 63
63 64 image = self.image_class(self.INSTANCE, self.NAME)
@@ -83,7 +84,7 @@ def test_cache_base_dir_exists(self):
83 84 os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
84 85 fn = self.mox.CreateMockAnything()
85 86 fn(target=self.TEMPLATE_PATH)
86   - self.mox.StubOutWithMock(imagebackend.utils, 'ensure_tree')
  87 + self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
87 88 self.mox.ReplayAll()
88 89
89 90 image = self.image_class(self.INSTANCE, self.NAME)
@@ -117,7 +118,8 @@ def setUp(self):
117 118
118 119 def prepare_mocks(self):
119 120 fn = self.mox.CreateMockAnything()
120   - self.mox.StubOutWithMock(imagebackend.utils.synchronized, '__call__')
  121 + self.mox.StubOutWithMock(imagebackend.lockutils.synchronized,
  122 + '__call__')
121 123 self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
122 124 self.mox.StubOutWithMock(imagebackend.disk, 'extend')
123 125 return fn
@@ -167,7 +169,8 @@ def setUp(self):
167 169
168 170 def prepare_mocks(self):
169 171 fn = self.mox.CreateMockAnything()
170   - self.mox.StubOutWithMock(imagebackend.utils.synchronized, '__call__')
  172 + self.mox.StubOutWithMock(imagebackend.lockutils.synchronized,
  173 + '__call__')
171 174 self.mox.StubOutWithMock(imagebackend.libvirt_utils,
172 175 'create_cow_image')
173 176 self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
3  nova/tests/test_libvirt.py
@@ -37,6 +37,7 @@
37 37 from nova import db
38 38 from nova import exception
39 39 from nova import flags
  40 +from nova.openstack.common import fileutils
40 41 from nova.openstack.common import importutils
41 42 from nova.openstack.common import jsonutils
42 43 from nova.openstack.common import log as logging
@@ -448,7 +449,7 @@ def setUp(self):
448 449 # use for tests. So, create the path here so utils.synchronized()
449 450 # won't delete it out from under one of the threads.
450 451 self.lock_path = os.path.join(FLAGS.instances_path, 'locks')
451   - utils.ensure_tree(self.lock_path)
  452 + fileutils.ensure_tree(self.lock_path)
452 453
453 454 def fake_exists(fname):
454 455 basedir = os.path.join(FLAGS.instances_path, FLAGS.base_dir_name)
94 nova/tests/test_misc.py
@@ -24,7 +24,6 @@
24 24
25 25 from nova import exception
26 26 from nova import test
27   -from nova import utils
28 27
29 28
30 29 class ExceptionTestCase(test.TestCase):
@@ -63,96 +62,3 @@ def test_all_migrations_have_downgrade(self):
63 62 helpful_msg = (_("The following migrations are missing a downgrade:"
64 63 "\n\t%s") % '\n\t'.join(sorted(missing_downgrade)))
65 64 self.assert_(not missing_downgrade, helpful_msg)
66   -
67   -
68   -class LockTestCase(test.TestCase):
69   - def test_synchronized_wrapped_function_metadata(self):
70   - @utils.synchronized('whatever')
71   - def foo():
72   - """Bar"""
73   - pass
74   - self.assertEquals(foo.__doc__, 'Bar', "Wrapped function's docstring "
75   - "got lost")
76   - self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "
77   - "got mangled")
78   -
79   - def test_synchronized_internally(self):
80   - """We can lock across multiple green threads"""
81   - saved_sem_num = len(utils._semaphores)
82   - seen_threads = list()
83   -
84   - @utils.synchronized('testlock2', external=False)
85   - def f(id):
86   - for x in range(10):
87   - seen_threads.append(id)
88   - greenthread.sleep(0)
89   -
90   - threads = []
91   - pool = greenpool.GreenPool(10)
92   - for i in range(10):
93   - threads.append(pool.spawn(f, i))
94   -
95   - for thread in threads:
96   - thread.wait()
97   -
98   - self.assertEquals(len(seen_threads), 100)
99   - # Looking at the seen threads, split it into chunks of 10, and verify
100   - # that the last 9 match the first in each chunk.
101   - for i in range(10):
102   - for j in range(9):
103   - self.assertEquals(seen_threads[i * 10],
104   - seen_threads[i * 10 + 1 + j])
105   -
106   - self.assertEqual(saved_sem_num, len(utils._semaphores),
107   - "Semaphore leak detected")
108   -
109   - def test_nested_external_works(self):
110   - """We can nest external syncs"""
111   - with utils.tempdir() as tempdir:
112   - self.flags(lock_path=tempdir)
113   - sentinel = object()
114   -
115   - @utils.synchronized('testlock1', external=True)
116   - def outer_lock():
117   -
118   - @utils.synchronized('testlock2', external=True)
119   - def inner_lock():
120   - return sentinel
121   - return inner_lock()
122   -
123   - self.assertEqual(sentinel, outer_lock())
124   -
125   - def test_synchronized_externally(self):
126   - """We can lock across multiple processes"""
127   - with utils.tempdir() as tempdir:
128   - self.flags(lock_path=tempdir)
129   - rpipe1, wpipe1 = os.pipe()
130   - rpipe2, wpipe2 = os.pipe()
131   -
132   - @utils.synchronized('testlock1', external=True)
133   - def f(rpipe, wpipe):
134   - try:
135   - os.write(wpipe, "foo")
136   - except OSError, e:
137   - self.assertEquals(e.errno, errno.EPIPE)
138   - return
139   -
140   - rfds, _wfds, _efds = select.select([rpipe], [], [], 1)
141   - self.assertEquals(len(rfds), 0, "The other process, which was"
142   - " supposed to be locked, "
143   - "wrote on its end of the "
144   - "pipe")
145   - os.close(rpipe)
146   -
147   - pid = os.fork()
148   - if pid > 0:
149   - os.close(wpipe1)
150   - os.close(rpipe2)
151   -
152   - f(rpipe1, wpipe2)
153   - else:
154   - os.close(rpipe1)
155   - os.close(wpipe2)
156   -
157   - f(rpipe2, wpipe1)
158   - os._exit(0)
37 nova/tests/test_utils.py
@@ -543,35 +543,6 @@ def test_monkey_patch(self):
543 543 in nova.tests.monkey_patch_example.CALLED_FUNCTION)
544 544
545 545
546   -class TestFileLocks(test.TestCase):
547   - def test_concurrent_green_lock_succeeds(self):
548   - """Verify spawn_n greenthreads with two locks run concurrently."""
549   - self.completed = False
550   - with utils.tempdir() as tmpdir:
551   -
552   - def locka(wait):
553   - a = utils.InterProcessLock(os.path.join(tmpdir, 'a'))
554   - with a:
555   - wait.wait()
556   - self.completed = True
557   -
558   - def lockb(wait):
559   - b = utils.InterProcessLock(os.path.join(tmpdir, 'b'))
560   - with b:
561   - wait.wait()
562   -
563   - wait1 = eventlet.event.Event()
564   - wait2 = eventlet.event.Event()
565   - pool = greenpool.GreenPool()
566   - pool.spawn_n(locka, wait1)
567   - pool.spawn_n(lockb, wait2)
568   - wait2.send()
569   - eventlet.sleep(0)
570   - wait1.send()
571   - pool.waitall()
572   - self.assertTrue(self.completed)
573   -
574   -
575 546 class AuditPeriodTest(test.TestCase):
576 547
577 548 def setUp(self):
@@ -778,11 +749,3 @@ def test_mkfs(self):
778 749
779 750 utils.mkfs('ext4', '/my/block/dev')
780 751 utils.mkfs('swap', '/my/swap/block/dev')
781   -
782   -
783   -class EnsureTree(test.TestCase):
784   - def test_ensure_tree(self):
785   - with utils.tempdir() as tmpdir:
786   - testdir = '%s/foo/bar/baz' % (tmpdir,)
787   - utils.ensure_tree(testdir)
788   - self.assertTrue(os.path.isdir(testdir))
192 nova/utils.py
@@ -586,183 +586,6 @@ def utf8(value):
586 586 return value
587 587
588 588
589   -class _InterProcessLock(object):
590   - """Lock implementation which allows multiple locks, working around
591   - issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
592   - not require any cleanup. Since the lock is always held on a file
593   - descriptor rather than outside of the process, the lock gets dropped
594   - automatically if the process crashes, even if __exit__ is not executed.
595   -
596   - There are no guarantees regarding usage by multiple green threads in a
597   - single process here. This lock works only between processes. Exclusive
598   - access between local threads should be achieved using the semaphores
599   - in the @synchronized decorator.
600   -
601   - Note these locks are released when the descriptor is closed, so it's not
602   - safe to close the file descriptor while another green thread holds the
603   - lock. Just opening and closing the lock file can break synchronisation,
604   - so lock files must be accessed only using this abstraction.
605   - """
606   -
607   - def __init__(self, name):
608   - self.lockfile = None
609   - self.fname = name
610   -
611   - def __enter__(self):
612   - self.lockfile = open(self.fname, 'w')
613   -
614   - while True:
615   - try:
616   - # Using non-blocking locks since green threads are not
617   - # patched to deal with blocking locking calls.
618   - # Also upon reading the MSDN docs for locking(), it seems
619   - # to have a laughable 10 attempts "blocking" mechanism.
620   - self.trylock()
621   - return self
622   - except IOError, e:
623   - if e.errno in (errno.EACCES, errno.EAGAIN):
624   - # external locks synchronise things like iptables
625   - # updates - give it some time to prevent busy spinning
626   - time.sleep(0.01)
627   - else:
628   - raise
629   -
630   - def __exit__(self, exc_type, exc_val, exc_tb):
631   - try:
632   - self.unlock()
633   - self.lockfile.close()
634   - except IOError:
635   - LOG.exception(_("Could not release the acquired lock `%s`")
636   - % self.fname)
637   -
638   - def trylock(self):
639   - raise NotImplementedError()
640   -
641   - def unlock(self):
642   - raise NotImplementedError()
643   -
644   -
645   -class _WindowsLock(_InterProcessLock):
646   - def trylock(self):
647   - msvcrt.locking(self.lockfile, msvcrt.LK_NBLCK, 1)
648   -
649   - def unlock(self):
650   - msvcrt.locking(self.lockfile, msvcrt.LK_UNLCK, 1)
651   -
652   -
653   -class _PosixLock(_InterProcessLock):
654   - def trylock(self):
655   - fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
656   -
657   - def unlock(self):
658   - fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
659   -
660   -
661   -if os.name == 'nt':
662   - import msvcrt
663   - InterProcessLock = _WindowsLock
664   -else:
665   - import fcntl
666   - InterProcessLock = _PosixLock
667   -
668   -_semaphores = weakref.WeakValueDictionary()
669   -
670   -
671   -def synchronized(name, external=False, lock_path=None):
672   - """Synchronization decorator.
673   -
674   - Decorating a method like so::
675   -
676   - @synchronized('mylock')
677   - def foo(self, *args):
678   - ...
679   -
680   - ensures that only one thread will execute the foo method at a time.
681   -
682   - Different methods can share the same lock::
683   -
684   - @synchronized('mylock')
685   - def foo(self, *args):
686   - ...
687   -
688   - @synchronized('mylock')
689   - def bar(self, *args):
690   - ...
691   -
692   - This way only one of either foo or bar can be executing at a time.
693   -
694   - The external keyword argument denotes whether this lock should work across
695   - multiple processes. This means that if two different workers both run a
696   - method decorated with @synchronized('mylock', external=True), only one
697   - of them will execute at a time.
698   -
699   - The lock_path keyword argument is used to specify a special location for
700   - external lock files to live. If nothing is set, then FLAGS.lock_path is
701   - used as a default.
702   - """
703   -
704   - def wrap(f):
705   - @functools.wraps(f)
706   - def inner(*args, **kwargs):
707   - # NOTE(soren): If we ever go natively threaded, this will be racy.
708   - # See http://stackoverflow.com/questions/5390569/dyn
709   - # amically-allocating-and-destroying-mutexes
710   - sem = _semaphores.get(name, semaphore.Semaphore())
711   - if name not in _semaphores:
712   - # this check is not racy - we're already holding ref locally
713   - # so GC won't remove the item and there was no IO switch
714   - # (only valid in greenthreads)
715   - _semaphores[name] = sem
716   -
717   - with sem:
718   - LOG.debug(_('Got semaphore "%(lock)s" for method '
719   - '"%(method)s"...'), {'lock': name,
720   - 'method': f.__name__})
721   - if external and not FLAGS.disable_process_locking:
722   - LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
723   - 'method "%(method)s"...'),
724   - {'lock': name, 'method': f.__name__})
725   - cleanup_dir = False
726   -
727   - # We need a copy of lock_path because it is non-local
728   - local_lock_path = lock_path
729   - if not local_lock_path:
730   - local_lock_path = FLAGS.lock_path
731   -
732   - if not local_lock_path:
733   - cleanup_dir = True
734   - local_lock_path = tempfile.mkdtemp()
735   -
736   - if not os.path.exists(local_lock_path):
737   - cleanup_dir = True
738   - ensure_tree(local_lock_path)
739   -
740   - # NOTE(mikal): the lock name cannot contain directory
741   - # separators
742   - safe_name = name.replace(os.sep, '_')
743   - lock_file_path = os.path.join(local_lock_path,
744   - 'nova-%s' % safe_name)
745   - try:
746   - lock = InterProcessLock(lock_file_path)
747   - with lock:
748   - LOG.debug(_('Got file lock "%(lock)s" for '
749   - 'method "%(method)s"...'),
750   - {'lock': name, 'method': f.__name__})
751   - retval = f(*args, **kwargs)
752   - finally:
753   - # NOTE(vish): This removes the tempdir if we needed
754   - # to create one. This is used to cleanup
755   - # the locks left behind by unit tests.
756   - if cleanup_dir:
757   - shutil.rmtree(local_lock_path)
758   - else:
759   - retval = f(*args, **kwargs)
760   -
761   - return retval
762   - return inner
763   - return wrap
764   -
765   -
766 589 def delete_if_exists(pathname):
767 590 """delete a file, but ignore file not found error"""
768 591
@@ -1314,21 +1137,6 @@ def rollback_and_reraise(self, msg=None, **kwargs):
1314 1137 self._rollback()
1315 1138
1316 1139
1317   -def ensure_tree(path):
1318   - """Create a directory (and any ancestor directories required)
1319   -
1320   - :param path: Directory to create
1321   - """
1322   - try:
1323   - os.makedirs(path)
1324   - except OSError as exc:
1325   - if exc.errno == errno.EEXIST:
1326   - if not os.path.isdir(path):
1327   - raise
1328   - else:
1329   - raise
1330   -
1331   -
1332 1140 def mkfs(fs, path, label=None):
1333 1141 """Format a file or block device
1334 1142
10 nova/virt/baremetal/driver.py
@@ -42,6 +42,8 @@
42 42 from nova import flags
43 43 from nova import notifications
44 44 from nova.openstack.common import cfg
  45 +from nova.openstack.common import fileutils
  46 +from nova.openstack.common import lockutils
45 47 from nova.openstack.common import log as logging
46 48 from nova import utils
47 49 from nova.virt.baremetal import dom
@@ -303,10 +305,10 @@ def _cache_image(fetch_func, target, fname, cow=False, *args, **kwargs):
303 305 if not os.path.exists(target):
304 306 base_dir = os.path.join(FLAGS.instances_path, '_base')
305 307 if not os.path.exists(base_dir):
306   - utils.ensure_tree(base_dir)
  308 + fileutils.ensure_tree(base_dir)
307 309 base = os.path.join(base_dir, fname)
308 310
309   - @utils.synchronized(fname)
  311 + @lockutils.synchronized(fname, 'nova-')
310 312 def call_if_not_exists(base, fetch_func, *args, **kwargs):
311 313 if not os.path.exists(base):
312 314 fetch_func(target=base, *args, **kwargs)
@@ -331,7 +333,7 @@ def basepath(fname='', suffix=suffix):
331 333 fname + suffix)
332 334
333 335 # ensure directories exist and are writable
334   - utils.ensure_tree(basepath(suffix=''))
  336 + fileutils.ensure_tree(basepath(suffix=''))
335 337 utils.execute('chmod', '0777', basepath(suffix=''))
336 338
337 339 LOG.info(_('instance %s: Creating image'), inst['name'],
@@ -339,7 +341,7 @@ def basepath(fname='', suffix=suffix):
339 341
340 342 if FLAGS.baremetal_type == 'lxc':
341 343 container_dir = '%s/rootfs' % basepath(suffix='')
342   - utils.ensure_tree(container_dir)
  344 + fileutils.ensure_tree(container_dir)
343 345
344 346 # NOTE(vish): No need add the suffix to console.log
345 347 libvirt_utils.write_to_file(basepath('console.log', ''), '', 007)
3  nova/virt/configdrive.py
@@ -24,6 +24,7 @@
24 24 from nova import exception
25 25 from nova import flags
26 26 from nova.openstack.common import cfg
  27 +from nova.openstack.common import fileutils
27 28 from nova.openstack.common import log as logging
28 29 from nova import utils
29 30 from nova import version
@@ -66,7 +67,7 @@ def __init__(self, instance_md=None):
66 67 def _add_file(self, path, data):
67 68 filepath = os.path.join(self.tempdir, path)
68 69 dirname = os.path.dirname(filepath)
69   - utils.ensure_tree(dirname)
  70 + fileutils.ensure_tree(dirname)
70 71 with open(filepath, 'w') as f:
71 72 f.write(data)
72 73
6 nova/virt/firewall.py
@@ -24,8 +24,8 @@
24 24 from nova.network import linux_net
25 25 from nova.openstack.common import cfg
26 26 from nova.openstack.common import importutils
  27 +from nova.openstack.common import lockutils
27 28 from nova.openstack.common import log as logging
28   -from nova import utils
29 29 from nova.virt import netutils
30 30
31 31
@@ -430,7 +430,7 @@ def refresh_instance_security_rules(self, instance):
430 430 self.do_refresh_instance_rules(instance)
431 431 self.iptables.apply()
432 432
433   - @utils.synchronized('iptables', external=True)
  433 + @lockutils.synchronized('iptables', 'nova-', external=True)
434 434 def _inner_do_refresh_rules(self, instance, ipv4_rules,
435 435 ipv6_rules):
436 436 self.remove_filters_for_instance(instance)
@@ -453,7 +453,7 @@ def refresh_provider_fw_rules(self):
453 453 self._do_refresh_provider_fw_rules()
454 454 self.iptables.apply()
455 455
456   - @utils.synchronized('iptables', external=True)